60#define EMAXTHREADS (-8 & 1 << 29)
62#define INVALID_POLICY (-9 & 1 << 29)
78 temp =
static_cast<double>(time1->tv_sec - time2->tv_sec);
84 temp +=
static_cast<double>(time1->tv_usec - time2->tv_usec) / 1000.0;
86 return static_cast<long>(temp);
89#if defined(STATS) || defined(DOXYGEN_RUN)
96 stats->totalIdleTime = 0.0;
97 stats->totalJobsHQ = 0;
98 stats->totalJobsLQ = 0;
99 stats->totalJobsMQ = 0;
100 stats->totalTimeHQ = 0.0;
101 stats->totalTimeMQ = 0.0;
102 stats->totalTimeLQ = 0.0;
103 stats->totalWorkTime = 0.0;
104 stats->totalIdleTime = 0.0;
105 stats->avgWaitHQ = 0.0;
106 stats->avgWaitMQ = 0.0;
107 stats->avgWaitLQ = 0.0;
108 stats->workerThreads = 0;
109 stats->idleThreads = 0;
110 stats->persistentThreads = 0;
111 stats->maxThreads = 0;
112 stats->totalThreads = 0;
123 tp->
stats.totalJobsLQ++;
124 tp->
stats.totalTimeLQ +=
static_cast<double>(diffTime);
135 tp->
stats.totalJobsMQ++;
136 tp->
stats.totalTimeMQ +=
static_cast<double>(diffTime);
147 tp->
stats.totalJobsHQ++;
148 tp->
stats.totalTimeHQ +=
static_cast<double>(diffTime);
209inline time_t
StatsTime(time_t* t) {
return 0; }
219 return a->jobId == b->jobId;
248#elif defined(__APPLE__) || defined(__NetBSD__)
250 setpriority(PRIO_PROCESS, 0, 0);
252#elif defined(__PTW32_DLLPORT)
253 retVal = sched_setscheduler(0, in);
254#elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
255 struct sched_param current;
258 memset(&current, 0,
sizeof(current));
259 sched_getparam(0, &current);
261 sched_result = sched_setscheduler(0, in, &current);
262 retVal = (sched_result != -1 || errno == EPERM) ? 0 : errno;
279#if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
286 struct sched_param newPriority;
289 pthread_getschedparam(pthread_self(), &currentPolicy, &newPriority);
290 minPriority = sched_get_priority_min(currentPolicy);
291 maxPriority = sched_get_priority_max(currentPolicy);
292 midPriority = (maxPriority - minPriority) / 2;
295 actPriority = minPriority;
298 actPriority = midPriority;
301 actPriority = maxPriority;
308 newPriority.sched_priority = actPriority;
311 pthread_setschedparam(pthread_self(), currentPolicy, &newPriority);
312 retVal = (sched_result == 0 || errno == EPERM) ? 0 : sched_result;
339 diffTime =
DiffMillis(&now, &tempJob->requestTime);
351 diffTime =
DiffMillis(&now, &tempJob->requestTime);
375 int sec = relMillis / 1000;
376 int milliSeconds = relMillis % 1000;
379 time->tv_sec = now.tv_sec + sec;
380 time->tv_nsec = (now.tv_usec / 1000 + milliSeconds) * 1000000;
394#if defined(__PTW32_DLLPORT)
395 srand((
unsigned int)t.tv_usec +
396 (
unsigned int)((
unsigned long long)pthread_self().p));
397#elif defined(BSD) || defined(__APPLE__) || defined(__FreeBSD_kernel__)
398 srand((
unsigned int)t.tv_usec +
399 (
unsigned int)((
unsigned long)pthread_self()));
400#elif defined(__linux__) || defined(__sun) || defined(__CYGWIN__) || \
402 srand((
unsigned int)t.tv_usec + (
unsigned int)pthread_self());
406 volatile pthread_t tid;
410 idu.tid = pthread_self();
411 srand((
unsigned int)t.tv_usec + idu.i);
436 UPnPsdk::initialize_thread();
439 pthread_mutex_lock(&tp->
mutex);
443 pthread_mutex_unlock(&tp->
mutex);
448 pthread_mutex_lock(&tp->
mutex);
455 tp->
stats.idleThreads++;
456 tp->
stats.totalWorkTime += (double)
StatsTime(NULL) - (double)start;
458 if (persistent == 0) {
459 tp->
stats.workerThreads--;
460 }
else if (persistent == 1) {
472 if ((retCode == ETIMEDOUT &&
476 tp->
stats.idleThreads--;
485 tp->
stats.idleThreads--;
487 tp->
stats.totalIdleTime += (double)
StatsTime(NULL) - (double)start;
504 tp->
stats.workerThreads++;
510 tp->
stats.workerThreads--;
519 tp->
stats.workerThreads--;
528 tp->
stats.workerThreads--;
536 tp->
stats.workerThreads--;
543 pthread_mutex_unlock(&tp->
mutex);
558 pthread_mutex_unlock(&tp->
mutex);
559 UPnPsdk::cleanup_thread();
620 pthread_attr_init(&attr);
622 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
624 pthread_attr_destroy(&attr);
670 TRACE2(
"Executing ThreadPoolInit() for ThreadPool ", tp)
678 retCode += pthread_mutex_init(&tp->
mutex, NULL);
679 retCode += pthread_mutex_lock(&tp->
mutex);
681 retCode += pthread_cond_init(&tp->
condition, NULL);
684 pthread_mutex_unlock(&tp->
mutex);
685 pthread_mutex_destroy(&tp->
mutex);
696 pthread_mutex_unlock(&tp->
mutex);
697 pthread_mutex_destroy(&tp->
mutex);
727 pthread_mutex_unlock(&tp->
mutex);
750 pthread_mutex_lock(&tp->
mutex);
779 pthread_mutex_unlock(&tp->
mutex);
785 TRACE(
"Executing ThreadPoolAdd()")
794 pthread_mutex_lock(&tp->
mutex);
798 fprintf(stderr,
"libupnp ThreadPoolAdd too many jobs: %ld\n",
808 switch (job->priority) {
831 pthread_mutex_unlock(&tp->
mutex);
848 pthread_mutex_lock(&tp->
mutex);
887 pthread_mutex_unlock(&tp->
mutex);
896 pthread_mutex_lock(&tp->
mutex);
899 pthread_mutex_unlock(&tp->
mutex);
912 pthread_mutex_lock(&tp->
mutex);
919 pthread_mutex_unlock(&tp->
mutex);
935 pthread_mutex_unlock(&tp->
mutex);
945 TRACE2(
"Executing ThreadPoolShutdown() for ThreadPool ", tp)
951 pthread_mutex_lock(&tp->
mutex);
956 pthread_mutex_unlock(&tp->
mutex);
961 temp->free_func(temp->arg);
970 pthread_mutex_unlock(&tp->
mutex);
975 temp->free_func(temp->arg);
984 pthread_mutex_unlock(&tp->
mutex);
989 temp->free_func(temp->arg);
998 temp->free_func(temp->arg);
1009 while (pthread_cond_destroy(&tp->
condition) != 0) {
1015 pthread_mutex_unlock(&tp->
mutex);
1018 while (pthread_mutex_destroy(&tp->
mutex) != 0) {
1061 job->priority = priority;
1071 job->free_func = func;
1140#if defined(STATS) || defined(DOXYGEN_RUN)
1146 fprintf(stderr,
"ThreadPoolStats at Time: %ld\n", (
long)
StatsTime(NULL));
1147 fprintf(stderr,
"High Jobs pending: %d\n", stats->currentJobsHQ);
1148 fprintf(stderr,
"Med Jobs Pending: %d\n", stats->currentJobsMQ);
1149 fprintf(stderr,
"Low Jobs Pending: %d\n", stats->currentJobsLQ);
1150 fprintf(stderr,
"Average Wait in High Priority Q in milliseconds: %f\n",
1152 fprintf(stderr,
"Average Wait in Med Priority Q in milliseconds: %f\n",
1154 fprintf(stderr,
"Averate Wait in Low Priority Q in milliseconds: %f\n",
1156 fprintf(stderr,
"Max Threads Active: %d\n", stats->maxThreads);
1157 fprintf(stderr,
"Current Worker Threads: %d\n", stats->workerThreads);
1158 fprintf(stderr,
"Current Persistent Threads: %d\n",
1159 stats->persistentThreads);
1160 fprintf(stderr,
"Current Idle Threads: %d\n", stats->idleThreads);
1161 fprintf(stderr,
"Total Threads : %d\n", stats->totalThreads);
1162 fprintf(stderr,
"Total Time spent Working in seconds: %f\n",
1163 stats->totalWorkTime);
1164 fprintf(stderr,
"Total Time spent Idle in seconds : %f\n",
1165 stats->totalIdleTime);
1169 if (tp == NULL || stats == NULL)
1173 pthread_mutex_lock(&tp->
mutex);
1176 if (stats->totalJobsHQ > 0)
1177 stats->avgWaitHQ = stats->totalTimeHQ / (double)stats->totalJobsHQ;
1179 stats->avgWaitHQ = 0.0;
1180 if (stats->totalJobsMQ > 0)
1181 stats->avgWaitMQ = stats->totalTimeMQ / (double)stats->totalJobsMQ;
1183 stats->avgWaitMQ = 0.0;
1184 if (stats->totalJobsLQ > 0)
1185 stats->avgWaitLQ = stats->totalTimeLQ / (double)stats->totalJobsLQ;
1187 stats->avgWaitLQ = 0.0;
1196 pthread_mutex_unlock(&tp->
mutex);
1204#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
1205#define DELTA_EPOCH_IN_MICROSECS 11644473600000000Ui64
1207#define DELTA_EPOCH_IN_MICROSECS 11644473600000000ULL
1213 unsigned __int64 tmpres = 0;
1217 GetSystemTimeAsFileTime(&ft);
1219 tmpres |= ft.dwHighDateTime;
1221 tmpres |= ft.dwLowDateTime;
1225 tmpres -= DELTA_EPOCH_IN_MICROSECS;
1226 tv->tv_sec = (long)(tmpres / 1000000UL);
1227 tv->tv_usec = (long)(tmpres % 1000000UL);
1236 _get_timezone(&itz);
int FreeListInit(FreeList *free_list, size_t elementSize, int maxFreeListLength)
Initializes Free List.
int FreeListFree(FreeList *free_list, void *element)
Returns an item to the Free List.
void * FreeListAlloc(FreeList *free_list)
Allocates chunk of set size.
int FreeListDestroy(FreeList *free_list)
Releases the resources stored with the free list.
long ListSize(LinkedList *list)
Returns the size of the list.
int ListDestroy(LinkedList *list, int freeItem)
Removes all memory associated with list nodes. Does not free LinkedList *list.
void * ListDelNode(LinkedList *list, ListNode *dnode, int freeItem)
Removes a node from the list. The memory for the node is freed.
ListNode * ListHead(LinkedList *list)
Returns the head of the list.
ListNode * ListAddTail(LinkedList *list, void *item)
Adds a node to the tail of the list. Node gets added immediately before list.tail.
ListNode * ListFind(LinkedList *list, ListNode *start, void *item)
Finds the specified item in the list.
int ListInit(LinkedList *list, cmp_routine cmp_func, free_function free_func)
Initializes LinkedList. Must be called first and only once for List.
ListNode head
head, first item is stored at: head->next
#define EOUTOFMEM
Error condition for "out of memory".
Linked list node. Stores generic item and pointers to next and prev.
int ThreadPoolInit(ThreadPool *tp, ThreadPoolAttr *attr)
Initializes and starts ThreadPool.
int ThreadPoolAdd(ThreadPool *tp, ThreadPoolJob *job, int *jobId)
Adds a job to the thread pool.
int TPJobSetFreeFunction(ThreadPoolJob *job, free_routine func)
Sets the jobs free function.
int ThreadPoolShutdown(ThreadPool *tp)
Shuts the thread pool down.
int ThreadPoolGetAttr(ThreadPool *tp, ThreadPoolAttr *out)
Gets the current set of attributes associated with the thread pool.
void ThreadPoolPrintStats(ThreadPoolStats *stats)
Prints various statistics about the thread pool to stderr.
int TPAttrSetMaxJobsTotal(ThreadPoolAttr *attr, int totalMaxJobs)
Sets the maximum number of jobs that can be queued totally.
int TPAttrSetStarvationTime(ThreadPoolAttr *attr, int starvationTime)
Sets the starvation time for the thread pool attributes.
int TPAttrSetMaxThreads(ThreadPoolAttr *attr, int maxThreads)
Sets the max threads for the thread pool attributes.
int TPJobSetPriority(ThreadPoolJob *job, ThreadPriority priority)
Sets the priority of the threadpool job.
int ThreadPoolSetAttr(ThreadPool *tp, ThreadPoolAttr *attr)
Sets the attributes for the thread pool.
int ThreadPoolRemove(ThreadPool *tp, int jobId, ThreadPoolJob *out)
Removes a job from the thread pool.
int ThreadPoolGetStats(ThreadPool *tp, ThreadPoolStats *stats)
Returns various statistics about the thread pool.
int TPAttrSetJobsPerThread(ThreadPoolAttr *attr, int jobsPerThread)
Sets the jobs per thread ratio.
constexpr int INFINITE_THREADS
int gettimeofday(struct timeval *tv, struct timezone *tz)
Get time of day.
int TPJobInit(ThreadPoolJob *job, UPnPsdk::start_routine func, void *arg)
Initializes thread pool job.
int TPAttrInit(ThreadPoolAttr *attr)
Initializes thread pool attributes.
void TPSetMaxJobsTotal(int mjt)
Sets the maximum number of jobs in the thread pool.
constexpr int JOBFREELISTSIZE
int TPAttrSetIdleTime(ThreadPoolAttr *attr, int idleTime)
Sets the idle time for the thread pool attributes.
int TPAttrSetMinThreads(ThreadPoolAttr *attr, int minThreads)
Sets the min threads for the thread pool attributes.
int TPAttrSetSchedPolicy(ThreadPoolAttr *attr, PolicyType schedPolicy)
Sets the scheduling policy for the thread pool attributes.
int TPAttrSetStackSize(ThreadPoolAttr *attr, size_t stackSize)
Sets the stack size for the thread pool attributes.
int ThreadPoolAddPersistent(ThreadPool *tp, ThreadPoolJob *job, int *jobId)
Adds a persistent job to the thread pool.
Manage a threadpool (for internal use only).
int minThreads
ThreadPool will always maintain at least this many threads.
#define INVALID_JOB_ID
Invalid JOB Id.
constexpr free_routine DEFAULT_FREE_ROUTINE
constexpr int DEFAULT_STACK_SIZE
constexpr int DEFAULT_JOBS_PER_THREAD
int tz_minuteswest
Minutes W of Greenwich.
constexpr int DEFAULT_STARVATION_TIME
constexpr ThreadPriority DEFAULT_PRIORITY
int maxIdleTime
This is the maximum time a thread will remain idle before dying (in milliseconds).
constexpr int DEFAULT_IDLE_TIME
constexpr int DEFAULT_MAX_JOBS_TOTAL
int maxJobsTotal
Maximum number of jobs that can be queued totally.
constexpr int DEFAULT_MAX_THREADS
PolicyType schedPolicy
Scheduling policy to use.
#define DEFAULT_POLICY
Define the default scheduling policy, which is one of those defined in <sched.h>.
ThreadPoolJob * persistentJob
int starvationTime
The time a low priority or med priority job waits before getting bumped up a priority (in milliseconds).
int tz_dsttime
Type of dst correction.
pthread_cond_t start_and_shutdown
int jobsPerThread
Jobs per thread to maintain.
int PolicyType
Type of the thread policy.
ThreadPriority
Thread priority.
int pendingWorkerThreadStart
void(* free_routine)(void *arg)
size_t stackSize
This is the minimum stack size allocated for each thread.
int maxThreads
ThreadPool will never have more than this number of threads.
constexpr int DEFAULT_MIN_THREADS
Structure to hold statistics.
Attributes for thread pool.
long DiffMillis(timeval *time1, timeval *time2)
Returns the difference in milliseconds between two timeval structures.
void CalcWaitTime(ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job)
Calculates the time the job has been waiting at the specified priority.
void BumpPriority(ThreadPool *tp)
Determines whether any jobs need to be bumped to a higher priority Q and bumps them.
ThreadPoolJob * CreateThreadPoolJob(ThreadPoolJob *job, int id, ThreadPool *tp)
Creates a Thread Pool Job. (Dynamically allocated)
void SetSeed(void)
Sets seed for random number generator.
int CmpThreadPoolJob(void *jobA, void *jobB)
Compares thread pool jobs.
void StatsInit(ThreadPoolStats *stats)
Initializes the statistics structure.
time_t StatsTime(time_t *t)
StatsTime.
void StatsAccountMQ(ThreadPool *tp, long diffTime)
StatsAccountMQ.
void FreeThreadPoolJob(ThreadPool *tp, ThreadPoolJob *tpj)
Deallocates a dynamically allocated ThreadPoolJob.
void StatsAccountLQ(ThreadPool *tp, long diffTime)
StatsAccountLQ.
int SetPolicyType(PolicyType in)
Sets the scheduling policy of the current process.
int SetPriority(ThreadPriority priority)
Sets the priority of the currently running thread.
void SetRelTimeout(timespec *time, int relMillis)
Sets the fields of the passed in timespec to be relMillis milliseconds in the future.
int CreateWorker(ThreadPool *tp)
Creates a worker thread, if the thread pool does not already have max threads.
void * WorkerThread(void *arg)
Implements a thread pool worker.
void StatsAccountHQ(ThreadPool *tp, long diffTime)
StatsAccountHQ.
void AddWorker(ThreadPool *tp)
Determines whether or not a thread should be added based on the jobsPerThread ratio.
Define macro for synced logging to the console for detailed info and debug.