From 5e0d92cf2c499383a85377414c7afe4020ee1014 Mon Sep 17 00:00:00 2001 From: Sharanabasava Date: Wed, 22 Jan 2025 01:50:13 -0800 Subject: [PATCH] Replace semaphore with mutex on zos This change is to avoid semaphore contention across multiple processes. Currently on ZOS for each of the seven HC threads a unique semaphore ID will be created and are reused for all the processes in the system. Signed-off-by: Sharanabasava --- .../monitoring/agent/threads/WorkerThread.cpp | 41 ++++++++++++++++++- .../monitoring/agent/threads/WorkerThread.h | 5 +++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/ibmras/monitoring/agent/threads/WorkerThread.cpp b/src/ibmras/monitoring/agent/threads/WorkerThread.cpp index 4ab6d35..8d1c2bf 100644 --- a/src/ibmras/monitoring/agent/threads/WorkerThread.cpp +++ b/src/ibmras/monitoring/agent/threads/WorkerThread.cpp @@ -14,6 +14,12 @@ * limitations under the License. *******************************************************************************/ +#if defined(_ZOS) +#define _XOPEN_SOURCE_EXTENDED 1 +#define _UNIX03_THREADS +#include +#include +#endif #include "ibmras/monitoring/agent/threads/WorkerThread.h" #include "ibmras/monitoring/agent/Agent.h" @@ -26,12 +32,23 @@ namespace threads { extern IBMRAS_DECLARE_LOGGER; - +#if defined(_ZOS) +WorkerThread::WorkerThread(pullsource* pullSource) : data(threadEntry, cleanUp), countdown(0) { +#else WorkerThread::WorkerThread(pullsource* pullSource) : semaphore(0, 1, pullSource->header.name), data(threadEntry, cleanUp), countdown(0) { +#endif source = pullSource; running = false; stopped = true; data.setArgs(this); +#if defined(_ZOS) + lock = new pthread_mutex_t; + pthread_mutex_t* mutex = reinterpret_cast(lock); + cond = new pthread_cond_t; + pthread_cond_t* condition = reinterpret_cast(cond); + pthread_mutex_init(mutex, NULL); + pthread_cond_init(condition, NULL); +#endif } @@ -55,8 +72,12 @@ void WorkerThread::stop() { // We've already set running=false, so processLoop will finish the // next chance it gets and only then will set stopped=true. //stopped = true; - +#if defined(_ZOS) + pthread_mutex_destroy(reinterpret_cast(lock)); + pthread_cond_destroy(reinterpret_cast(cond)); +#else semaphore.inc(); +#endif IBMRAS_DEBUG_1(debug, "Worker thread for %s stopping", source->header.name); } @@ -74,7 +95,11 @@ void* WorkerThread::threadEntry(ibmras::common::port::ThreadData* data) { void WorkerThread::process(bool immediate) { IBMRAS_DEBUG_2(finest, "Worker thread process for %s, countdown is %d", source->header.name, countdown); if ((immediate && countdown > 120) || (countdown == 0)) { +#if defined(_ZOS) + pthread_cond_signal(reinterpret_cast(cond));// Notify one waiting thread +#else semaphore.inc(); +#endif countdown = source->pullInterval; } else { countdown--; @@ -89,7 +114,19 @@ void* WorkerThread::processLoop() { IBMRAS_DEBUG_1(finest, "Worker thread started for %s", source->header.name); Agent* agent = Agent::getInstance(); while (running) { +#if defined(_ZOS) + struct timeval now; + struct timespec timeout; + gettimeofday(&now, NULL); + timeout.tv_sec = now.tv_sec + 1; + timeout.tv_nsec = now.tv_usec * 1000; + pthread_mutex_lock(reinterpret_cast(lock)); + int rc = pthread_cond_timedwait(reinterpret_cast(cond),reinterpret_cast(lock), &timeout); + pthread_mutex_unlock(reinterpret_cast(lock)); + if (0 == rc && running) { +#else if (semaphore.wait(1) && running) { +#endif IBMRAS_DEBUG_1(fine, "Pulling data from source %s", source->header.name); monitordata* data = source->callback(); if (data != NULL) { diff --git a/src/ibmras/monitoring/agent/threads/WorkerThread.h b/src/ibmras/monitoring/agent/threads/WorkerThread.h index 7fe8bab..1b7ff43 100644 --- a/src/ibmras/monitoring/agent/threads/WorkerThread.h +++ b/src/ibmras/monitoring/agent/threads/WorkerThread.h @@ -43,7 +43,12 @@ class WorkerThread { void* processLoop(); bool running; bool stopped; +#if defined(_ZOS) + void* lock; + void* cond; +#else ibmras::common::port::Semaphore semaphore; /* sempahore to control data processing */ +#endif pullsource* source; /* source to pull data from */ ibmras::common::port::ThreadData data; int countdown;