UPnPsdk 0.1
Universal Plug and Play +, Software Development Kit
 
Loading...
Searching...
No Matches
ThreadPool.cpp
Go to the documentation of this file.
1/*******************************************************************************
2 *
3 * Copyright (c) 2000-2003 Intel Corporation
4 * All rights reserved.
5 * Copyright (c) 2012 France Telecom All rights reserved.
6 * Copyright (C) 2021+ GPL 3 and higher by Ingo Höft, <Ingo@Hoeft-online.de>
7 * Redistribution only with this Copyright remark. Last modified: 2025-05-01
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *
12 * - Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 * - Redistributions in binary form must reproduce the above copyright notice,
15 * this list of conditions and the following disclaimer in the documentation
16 * and/or other materials provided with the distribution.
17 * - Neither name of Intel Corporation nor the names of its contributors
18 * may be used to endorse or promote products derived from this software
19 * without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL INTEL OR
25 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
26 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
27 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
28 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
29 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 *
33 ******************************************************************************/
34// Last compare with ./Pupnp source file on 2024-10-26, ver 1.14.20
44#include <ThreadPool.hpp>
45
46#include <UPnPsdk/synclog.hpp>
47
49#include <cassert>
50#include <cstdio>
51#include <cstdlib>
52#include <cstring> /* for memset()*/
54
// Size constant for the job free list.
// NOTE(review): its point of use is not visible in this listing - confirm
// against the FreeListInit() call.
56constexpr int JOBFREELISTSIZE{100};
// Sentinel for attr.maxThreads: the worker-creation code skips its
// thread-limit check when attr.maxThreads equals this value.
58constexpr int INFINITE_THREADS{-1};
// Error code returned when no further worker thread may be created.
// NOTE(review): '<<' binds tighter than '&', so this parses as
// (-8) & (1 << 29), which evaluates to 1 << 29 on two's complement targets.
60#define EMAXTHREADS (-8 & 1 << 29)
// Error code returned when the scheduling policy cannot be set.
// NOTE(review): (-9) & (1 << 29) also evaluates to 1 << 29, i.e. the very
// same value as EMAXTHREADS, so callers cannot tell the two errors apart.
62#define INVALID_POLICY (-9 & 1 << 29)
63
64
65namespace {
/*!
 * \brief Returns the time difference (time1 - time2) in milliseconds.
 *
 * The seconds and microseconds parts are combined in floating point and the
 * result is converted to long, which discards the fractional millisecond
 * (truncation toward zero).
 *
 * \param time1 minuend, usually the later point in time.
 * \param time2 subtrahend, usually the earlier point in time.
 * \returns the difference in milliseconds; negative if time1 is before time2.
 */
long DiffMillis(timeval* time1, timeval* time2) {
    // Whole seconds, scaled up to milliseconds.
    const double secondsPartMs =
        static_cast<double>(time1->tv_sec - time2->tv_sec) * 1000.0;
    // Microseconds, scaled down to (fractional) milliseconds.
    const double microsPartMs =
        static_cast<double>(time1->tv_usec - time2->tv_usec) / 1000.0;

    return static_cast<long>(secondsPartMs + microsPartMs);
}
88
89#if defined(STATS) || defined(DOXYGEN_RUN)
95 ThreadPoolStats* stats) {
96 stats->totalIdleTime = 0.0;
97 stats->totalJobsHQ = 0;
98 stats->totalJobsLQ = 0;
99 stats->totalJobsMQ = 0;
100 stats->totalTimeHQ = 0.0;
101 stats->totalTimeMQ = 0.0;
102 stats->totalTimeLQ = 0.0;
103 stats->totalWorkTime = 0.0;
104 stats->totalIdleTime = 0.0;
105 stats->avgWaitHQ = 0.0;
106 stats->avgWaitMQ = 0.0;
107 stats->avgWaitLQ = 0.0;
108 stats->workerThreads = 0;
109 stats->idleThreads = 0;
110 stats->persistentThreads = 0;
111 stats->maxThreads = 0;
112 stats->totalThreads = 0;
113}
114
120 ThreadPool* tp,
122 long diffTime) {
123 tp->stats.totalJobsLQ++;
124 tp->stats.totalTimeLQ += static_cast<double>(diffTime);
125}
126
132 ThreadPool* tp,
134 long diffTime) {
135 tp->stats.totalJobsMQ++;
136 tp->stats.totalTimeMQ += static_cast<double>(diffTime);
137}
138
144 ThreadPool* tp,
146 long diffTime) {
147 tp->stats.totalJobsHQ++;
148 tp->stats.totalTimeHQ += static_cast<double>(diffTime);
149}
150
160 ThreadPool* tp,
164 ThreadPoolJob* job) {
165 struct timeval now;
166 long diff;
167
168 assert(tp != NULL);
169 assert(job != NULL);
170
171 gettimeofday(&now, NULL);
172 diff = DiffMillis(&now, &job->requestTime);
173 switch (p) {
174 case LOW_PRIORITY:
175 StatsAccountLQ(tp, diff);
176 break;
177 case MED_PRIORITY:
178 StatsAccountMQ(tp, diff);
179 break;
180 case HIGH_PRIORITY:
181 StatsAccountHQ(tp, diff);
182 break;
183 default:
184 assert(0);
185 }
186}
187
193 time_t* t) {
194 struct timeval tv;
195
196 gettimeofday(&tv, NULL);
197 if (t)
198 *t = tv.tv_sec;
199
200 return tv.tv_sec;
201}
202#else /* STATS */
203inline void StatsInit(ThreadPoolStats* stats) {}
204inline void StatsAccountLQ(ThreadPool* tp, long diffTime) {}
205inline void StatsAccountMQ(ThreadPool* tp, long diffTime) {}
206inline void StatsAccountHQ(ThreadPool* tp, long diffTime) {}
207inline void CalcWaitTime(ThreadPool* tp, ThreadPriority p, ThreadPoolJob* job) {
208}
209inline time_t StatsTime(time_t* t) { return 0; }
210#endif /* STATS */
211
215int CmpThreadPoolJob(void* jobA, void* jobB) {
216 ThreadPoolJob* a = (ThreadPoolJob*)jobA;
217 ThreadPoolJob* b = (ThreadPoolJob*)jobB;
218
219 return a->jobId == b->jobId;
220}
221
227 ThreadPool* tp,
229 ThreadPoolJob* tpj) {
230 FreeListFree(&tp->jobFreeList, tpj);
231}
232
242 [[maybe_unused]] PolicyType in) {
243 int retVal = 0;
244#ifdef __CYGWIN__
246 (void)in;
247 retVal = 0;
248#elif defined(__APPLE__) || defined(__NetBSD__)
249 (void)in;
250 setpriority(PRIO_PROCESS, 0, 0);
251 retVal = 0;
252#elif defined(__PTW32_DLLPORT)
253 retVal = sched_setscheduler(0, in);
254#elif defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
255 struct sched_param current;
256 int sched_result;
257
258 memset(&current, 0, sizeof(current));
259 sched_getparam(0, &current);
260 current.sched_priority = sched_get_priority_min(DEFAULT_POLICY);
261 sched_result = sched_setscheduler(0, in, &current);
262 retVal = (sched_result != -1 || errno == EPERM) ? 0 : errno;
263#else
264 retVal = 0;
265#endif
266 return retVal;
267}
268
278 ThreadPriority priority) {
279#if defined(_POSIX_PRIORITY_SCHEDULING) && _POSIX_PRIORITY_SCHEDULING > 0
280 int retVal = 0;
281 int currentPolicy;
282 int minPriority = 0;
283 int maxPriority = 0;
284 int actPriority = 0;
285 int midPriority = 0;
286 struct sched_param newPriority;
287 int sched_result;
288
289 pthread_getschedparam(pthread_self(), &currentPolicy, &newPriority);
290 minPriority = sched_get_priority_min(currentPolicy);
291 maxPriority = sched_get_priority_max(currentPolicy);
292 midPriority = (maxPriority - minPriority) / 2;
293 switch (priority) {
294 case LOW_PRIORITY:
295 actPriority = minPriority;
296 break;
297 case MED_PRIORITY:
298 actPriority = midPriority;
299 break;
300 case HIGH_PRIORITY:
301 actPriority = maxPriority;
302 break;
303 default:
304 retVal = EINVAL;
305 goto exit_function;
306 };
307
308 newPriority.sched_priority = actPriority;
309
310 sched_result =
311 pthread_setschedparam(pthread_self(), currentPolicy, &newPriority);
312 retVal = (sched_result == 0 || errno == EPERM) ? 0 : sched_result;
313exit_function:
314 return retVal;
315#else
316 (void)priority;
317 return 0;
318#endif
319}
320
329 ThreadPool* tp) {
330 int done = 0;
331 struct timeval now;
332 long diffTime = 0;
333 ThreadPoolJob* tempJob = NULL;
334
335 gettimeofday(&now, NULL);
336 while (!done) {
337 if (tp->medJobQ.size) {
338 tempJob = (ThreadPoolJob*)tp->medJobQ.head.next->item;
339 diffTime = DiffMillis(&now, &tempJob->requestTime);
340 if (diffTime >= tp->attr.starvationTime) {
341 /* If job has waited longer than the starvation time, bump
342 * priority (add to higher priority Q) */
343 StatsAccountMQ(tp, diffTime);
344 ListDelNode(&tp->medJobQ, tp->medJobQ.head.next, 0);
345 ListAddTail(&tp->highJobQ, tempJob);
346 continue;
347 }
348 }
349 if (tp->lowJobQ.size) {
350 tempJob = (ThreadPoolJob*)tp->lowJobQ.head.next->item;
351 diffTime = DiffMillis(&now, &tempJob->requestTime);
352 if (diffTime >= tp->attr.maxIdleTime) {
353 /* If job has waited longer than the starvation time, bump
354 * priority (add to higher priority Q) */
355 StatsAccountLQ(tp, diffTime);
356 ListDelNode(&tp->lowJobQ, tp->lowJobQ.head.next, 0);
357 ListAddTail(&tp->medJobQ, tempJob);
358 continue;
359 }
360 }
361 done = 1;
362 }
363}
364
/*!
 * \brief Sets the passed timespec to the point in time that lies relMillis
 * milliseconds in the future.
 *
 * The result is normalized so that tv_nsec always stays within
 * [0, 999999999]. pthread_cond_timedwait() rejects anything else with EINVAL,
 * which would turn the idle wait of a worker thread into a busy loop.
 *
 * \param time      receives the absolute timeout.
 * \param relMillis offset from now in milliseconds.
 */
void SetRelTimeout(timespec* time, int relMillis) {
    constexpr long NSEC_PER_SEC{1000000000L};
    timeval now;

    gettimeofday(&now, nullptr);

    time->tv_sec = now.tv_sec + relMillis / 1000;
    // At most 999,999,000 + 999,000,000, which still fits a 32 bit long.
    long nsec = static_cast<long>(now.tv_usec) * 1000L +
                static_cast<long>(relMillis % 1000) * 1000000L;

    // Carry a full second over into tv_sec.
    if (nsec >= NSEC_PER_SEC) {
        ++time->tv_sec;
        nsec -= NSEC_PER_SEC;
    } else if (nsec < 0) {
        // Only reachable with a negative relMillis.
        --time->tv_sec;
        nsec += NSEC_PER_SEC;
    }
    time->tv_nsec = nsec;
}
382
/*!
 * \brief Seeds the pseudo random number generator of the C library.
 *
 * The seed is the sum of the microseconds part of the current time and a
 * value derived from the id of the calling thread. How the thread id is
 * turned into an integer depends on the platform.
 */
void SetSeed(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    const unsigned int timePart = (unsigned int)now.tv_usec;

#if defined(__PTW32_DLLPORT) // pthreads4w on Microsoft Windows available.
    const unsigned int threadPart =
        (unsigned int)((unsigned long long)pthread_self().p);
#elif defined(BSD) || defined(__APPLE__) || defined(__FreeBSD_kernel__)
    const unsigned int threadPart =
        (unsigned int)((unsigned long)pthread_self());
#elif defined(__linux__) || defined(__sun) || defined(__CYGWIN__) ||         \
    defined(__GLIBC__)
    const unsigned int threadPart = (unsigned int)pthread_self();
#else
    // Unknown representation of pthread_t: read its leading bytes as an
    // unsigned integer.
    volatile union {
        volatile pthread_t tid;
        volatile unsigned i;
    } idu;

    idu.tid = pthread_self();
    const unsigned int threadPart = idu.i;
#endif

    srand(timePart + threadPart);
}
415
425 void* arg) {
426 time_t start = 0;
427
428 ThreadPoolJob* job = NULL;
429 ListNode* head = NULL;
430
431 timespec timeout;
432 int retCode = 0;
433 int persistent = -1;
434 ThreadPool* tp = (ThreadPool*)arg;
435
436 UPnPsdk::initialize_thread();
437
438 /* Increment total thread count */
439 pthread_mutex_lock(&tp->mutex);
440 tp->totalThreads++;
442 pthread_cond_broadcast(&tp->start_and_shutdown);
443 pthread_mutex_unlock(&tp->mutex);
444
445 SetSeed();
446 StatsTime(&start);
447 while (1) {
448 pthread_mutex_lock(&tp->mutex);
449 if (job) {
450 tp->busyThreads--;
451 FreeThreadPoolJob(tp, job);
452 job = NULL;
453 }
454 retCode = 0;
455 tp->stats.idleThreads++;
456 tp->stats.totalWorkTime += (double)StatsTime(NULL) - (double)start;
457 StatsTime(&start);
458 if (persistent == 0) {
459 tp->stats.workerThreads--;
460 } else if (persistent == 1) {
461 /* Persistent thread becomes a regular thread */
462 tp->persistentThreads--;
463 }
464
465 /* Check for a job or shutdown */
466 while (tp->lowJobQ.size == 0 && tp->medJobQ.size == 0 &&
467 tp->highJobQ.size == 0 && !tp->persistentJob && !tp->shutdown) {
468 /* If wait timed out and we currently have more than the
469 * min threads, or if we have more than the max threads
470 * (only possible if the attributes have been reset)
471 * let this thread die. */
472 if ((retCode == ETIMEDOUT &&
473 tp->totalThreads > tp->attr.minThreads) ||
474 (tp->attr.maxThreads != -1 &&
475 tp->totalThreads > tp->attr.maxThreads)) {
476 tp->stats.idleThreads--;
477 goto exit_function;
478 }
479 SetRelTimeout(&timeout, tp->attr.maxIdleTime);
480
481 /* wait for a job up to the specified max time */
482 retCode =
483 pthread_cond_timedwait(&tp->condition, &tp->mutex, &timeout);
484 }
485 tp->stats.idleThreads--;
486 /* idle time */
487 tp->stats.totalIdleTime += (double)StatsTime(NULL) - (double)start;
488 /* work time */
489 StatsTime(&start);
490 /* bump priority of starved jobs */
491 BumpPriority(tp);
492 /* if shutdown then stop */
493 if (tp->shutdown) {
494 goto exit_function;
495 } else {
496 /* Pick up persistent job if available */
497 if (tp->persistentJob) {
498 job = tp->persistentJob;
499 tp->persistentJob = NULL;
500 tp->persistentThreads++;
501 persistent = 1;
502 pthread_cond_broadcast(&tp->start_and_shutdown);
503 } else {
504 tp->stats.workerThreads++;
505 persistent = 0;
506 /* Pick the highest priority job */
507 if (tp->highJobQ.size > 0) {
508 head = ListHead(&tp->highJobQ);
509 if (head == NULL) {
510 tp->stats.workerThreads--;
511 goto exit_function;
512 }
513 job = (ThreadPoolJob*)head->item;
514 CalcWaitTime(tp, HIGH_PRIORITY, job);
515 ListDelNode(&tp->highJobQ, head, 0);
516 } else if (tp->medJobQ.size > 0) {
517 head = ListHead(&tp->medJobQ);
518 if (head == NULL) {
519 tp->stats.workerThreads--;
520 goto exit_function;
521 }
522 job = (ThreadPoolJob*)head->item;
523 CalcWaitTime(tp, MED_PRIORITY, job);
524 ListDelNode(&tp->medJobQ, head, 0);
525 } else if (tp->lowJobQ.size > 0) {
526 head = ListHead(&tp->lowJobQ);
527 if (head == NULL) {
528 tp->stats.workerThreads--;
529 goto exit_function;
530 }
531 job = (ThreadPoolJob*)head->item;
532 CalcWaitTime(tp, LOW_PRIORITY, job);
533 ListDelNode(&tp->lowJobQ, head, 0);
534 } else {
535 /* Should never get here */
536 tp->stats.workerThreads--;
537 goto exit_function;
538 }
539 }
540 }
541
542 tp->busyThreads++;
543 pthread_mutex_unlock(&tp->mutex);
544
545 /* In the future can log info */
546 if (SetPriority(job->priority) != 0) {
547 } else {
548 }
549 /* run the job */
550 job->func(job->arg);
551 /* return to Normal */
553 }
554
555exit_function:
556 tp->totalThreads--;
557 pthread_cond_broadcast(&tp->start_and_shutdown);
558 pthread_mutex_unlock(&tp->mutex);
559 UPnPsdk::cleanup_thread();
560
561 return NULL;
562}
563
573 ThreadPoolJob* job,
575 int id,
577 ThreadPool* tp) {
578 ThreadPoolJob* newJob{nullptr};
579
580 newJob = (ThreadPoolJob*)FreeListAlloc(&tp->jobFreeList);
581 if (newJob) {
582 *newJob = *job;
583 newJob->jobId = id;
584 gettimeofday(&newJob->requestTime, NULL);
585 }
586
587 return newJob;
588}
589
605 ThreadPool* tp) {
606 pthread_t temp;
607 int rc = 0;
608 pthread_attr_t attr;
609
610 /* if a new worker is the process of starting, wait until it fully
611 * starts */
612 while (tp->pendingWorkerThreadStart) {
613 pthread_cond_wait(&tp->start_and_shutdown, &tp->mutex);
614 }
615
616 if (tp->attr.maxThreads != INFINITE_THREADS &&
617 tp->totalThreads + 1 > tp->attr.maxThreads) {
618 return EMAXTHREADS;
619 }
620 pthread_attr_init(&attr);
621 pthread_attr_setstacksize(&attr, tp->attr.stackSize);
622 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
623 rc = pthread_create(&temp, &attr, WorkerThread, tp);
624 pthread_attr_destroy(&attr);
625 if (rc == 0) {
627 /* wait until the new worker thread starts */
628 while (tp->pendingWorkerThreadStart) {
629 pthread_cond_wait(&tp->start_and_shutdown, &tp->mutex);
630 }
631 }
632 if (tp->stats.maxThreads < tp->totalThreads) {
633 tp->stats.maxThreads = tp->totalThreads;
634 }
635
636 return rc;
637}
638
650 ThreadPool* tp) {
651 long jobs = 0;
652 int threads = 0;
653
654 jobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
655 threads = tp->totalThreads - tp->persistentThreads;
656 while (threads == 0 || (jobs / threads) >= tp->attr.jobsPerThread ||
657 (tp->totalThreads == tp->busyThreads)) {
658 if (CreateWorker(tp) != 0) {
659 return;
660 }
661 threads++;
662 }
663}
664
666} // anonymous namespace
667
668
670 TRACE2("Executing ThreadPoolInit() for ThreadPool ", tp)
671 int retCode = 0;
672 int i = 0;
673
674 if (!tp) {
675 return EINVAL;
676 }
677
678 retCode += pthread_mutex_init(&tp->mutex, NULL);
679 retCode += pthread_mutex_lock(&tp->mutex);
680
681 retCode += pthread_cond_init(&tp->condition, NULL);
682 retCode += pthread_cond_init(&tp->start_and_shutdown, NULL);
683 if (retCode) {
684 pthread_mutex_unlock(&tp->mutex);
685 pthread_mutex_destroy(&tp->mutex);
686 pthread_cond_destroy(&tp->condition);
687 pthread_cond_destroy(&tp->start_and_shutdown);
688 return EAGAIN;
689 }
690 if (attr) {
691 tp->attr = *attr;
692 } else {
693 TPAttrInit(&tp->attr);
694 }
695 if (SetPolicyType(tp->attr.schedPolicy) != 0) {
696 pthread_mutex_unlock(&tp->mutex);
697 pthread_mutex_destroy(&tp->mutex);
698 pthread_cond_destroy(&tp->condition);
699 pthread_cond_destroy(&tp->start_and_shutdown);
700
701 return INVALID_POLICY;
702 }
703 retCode +=
705 StatsInit(&tp->stats);
706 retCode += ListInit(&tp->highJobQ, CmpThreadPoolJob, NULL);
707 retCode += ListInit(&tp->medJobQ, CmpThreadPoolJob, NULL);
708 retCode += ListInit(&tp->lowJobQ, CmpThreadPoolJob, NULL);
709 if (retCode) {
710 retCode = EAGAIN;
711 } else {
712 tp->persistentJob = NULL;
713 tp->lastJobId = 0;
714 tp->shutdown = 0;
715 tp->totalThreads = 0;
716 tp->busyThreads = 0;
717 tp->persistentThreads = 0;
719 for (i = 0; i < tp->attr.minThreads; ++i) {
720 retCode = CreateWorker(tp);
721 if (retCode) {
722 break;
723 }
724 }
725 }
726
727 pthread_mutex_unlock(&tp->mutex);
728
729 if (retCode) {
730 /* clean up if the min threads could not be created */
732 }
733
734 return retCode;
735}
736
738 int ret = 0;
739 int tempId = -1;
740 ThreadPoolJob* temp = NULL;
741
742 if (!tp || !job) {
743 return EINVAL;
744 }
745 if (!jobId) {
746 jobId = &tempId;
747 }
748 *jobId = INVALID_JOB_ID;
749
750 pthread_mutex_lock(&tp->mutex);
751
752 /* Create A worker if less than max threads running */
753 if (tp->totalThreads < tp->attr.maxThreads) {
754 CreateWorker(tp);
755 } else {
756 /* if there is more than one worker thread
757 * available then schedule job, otherwise fail */
758 if (tp->totalThreads - tp->persistentThreads - 1 == 0) {
759 ret = EMAXTHREADS;
760 goto exit_function;
761 }
762 }
763 temp = CreateThreadPoolJob(job, tp->lastJobId, tp);
764 if (!temp) {
765 ret = EOUTOFMEM;
766 goto exit_function;
767 }
768 tp->persistentJob = temp;
769
770 /* Notify a waiting thread */
771 pthread_cond_signal(&tp->condition);
772
773 /* wait until long job has been picked up */
774 while (tp->persistentJob)
775 pthread_cond_wait(&tp->start_and_shutdown, &tp->mutex);
776 *jobId = tp->lastJobId++;
777
778exit_function:
779 pthread_mutex_unlock(&tp->mutex);
780
781 return ret;
782}
783
784int ThreadPoolAdd(ThreadPool* tp, ThreadPoolJob* job, int* jobId) {
785 TRACE("Executing ThreadPoolAdd()")
786 int rc = EOUTOFMEM;
787 int tempId = -1;
788 long totalJobs;
789 ThreadPoolJob* temp = NULL;
790
791 if (!tp || !job)
792 return EINVAL;
793
794 pthread_mutex_lock(&tp->mutex);
795
796 totalJobs = tp->highJobQ.size + tp->lowJobQ.size + tp->medJobQ.size;
797 if (totalJobs >= tp->attr.maxJobsTotal) {
798 fprintf(stderr, "libupnp ThreadPoolAdd too many jobs: %ld\n",
799 totalJobs);
800 goto exit_function;
801 }
802 if (!jobId)
803 jobId = &tempId;
804 *jobId = INVALID_JOB_ID;
805 temp = CreateThreadPoolJob(job, tp->lastJobId, tp);
806 if (!temp)
807 goto exit_function;
808 switch (job->priority) {
809 case HIGH_PRIORITY:
810 if (ListAddTail(&tp->highJobQ, temp))
811 rc = 0;
812 break;
813 case MED_PRIORITY:
814 if (ListAddTail(&tp->medJobQ, temp))
815 rc = 0;
816 break;
817 default:
818 if (ListAddTail(&tp->lowJobQ, temp))
819 rc = 0;
820 }
821 /* AddWorker if appropriate */
822 AddWorker(tp);
823 /* Notify a waiting thread */
824 if (rc == 0)
825 pthread_cond_signal(&tp->condition);
826 else
827 FreeThreadPoolJob(tp, temp);
828 *jobId = tp->lastJobId++;
829
830exit_function:
831 pthread_mutex_unlock(&tp->mutex);
832
833 return rc;
834}
835
836int ThreadPoolRemove(ThreadPool* tp, int jobId, ThreadPoolJob* out) {
837 int ret = INVALID_JOB_ID;
838 ThreadPoolJob* temp = NULL;
839 ListNode* tempNode = NULL;
840 ThreadPoolJob dummy;
841
842 if (!tp)
843 return EINVAL;
844 if (!out)
845 out = &dummy;
846 dummy.jobId = jobId;
847
848 pthread_mutex_lock(&tp->mutex);
849
850 tempNode = ListFind(&tp->highJobQ, NULL, &dummy);
851 if (tempNode) {
852 temp = (ThreadPoolJob*)tempNode->item;
853 *out = *temp;
854 ListDelNode(&tp->highJobQ, tempNode, 0);
855 FreeThreadPoolJob(tp, temp);
856 ret = 0;
857 goto exit_function;
858 }
859
860 tempNode = ListFind(&tp->medJobQ, NULL, &dummy);
861 if (tempNode) {
862 temp = (ThreadPoolJob*)tempNode->item;
863 *out = *temp;
864 ListDelNode(&tp->medJobQ, tempNode, 0);
865 FreeThreadPoolJob(tp, temp);
866 ret = 0;
867 goto exit_function;
868 }
869 tempNode = ListFind(&tp->lowJobQ, NULL, &dummy);
870 if (tempNode) {
871 temp = (ThreadPoolJob*)tempNode->item;
872 *out = *temp;
873 ListDelNode(&tp->lowJobQ, tempNode, 0);
874 FreeThreadPoolJob(tp, temp);
875 ret = 0;
876 goto exit_function;
877 }
878 if (tp->persistentJob && tp->persistentJob->jobId == jobId) {
879 *out = *tp->persistentJob;
881 tp->persistentJob = NULL;
882 ret = 0;
883 goto exit_function;
884 }
885
886exit_function:
887 pthread_mutex_unlock(&tp->mutex);
888
889 return ret;
890}
891
893 if (!tp || !out)
894 return EINVAL;
895 if (!tp->shutdown)
896 pthread_mutex_lock(&tp->mutex);
897 *out = tp->attr;
898 if (!tp->shutdown)
899 pthread_mutex_unlock(&tp->mutex);
900
901 return 0;
902}
903
905 int retCode = 0;
906 ThreadPoolAttr temp;
907 int i = 0;
908
909 if (!tp)
910 return EINVAL;
911
912 pthread_mutex_lock(&tp->mutex);
913
914 if (attr)
915 temp = *attr;
916 else
917 TPAttrInit(&temp);
918 if (SetPolicyType(temp.schedPolicy) != 0) {
919 pthread_mutex_unlock(&tp->mutex);
920 return INVALID_POLICY;
921 }
922 tp->attr = temp;
923 /* add threads */
924 if (tp->totalThreads < tp->attr.minThreads) {
925 for (i = tp->totalThreads; i < tp->attr.minThreads; i++) {
926 retCode = CreateWorker(tp);
927 if (retCode != 0) {
928 break;
929 }
930 }
931 }
932 /* signal changes */
933 pthread_cond_signal(&tp->condition);
934
935 pthread_mutex_unlock(&tp->mutex);
936
937 if (retCode != 0)
938 /* clean up if the min threads could not be created */
940
941 return retCode;
942}
943
945 TRACE2("Executing ThreadPoolShutdown() for ThreadPool ", tp)
946 ListNode* head = NULL;
947 ThreadPoolJob* temp = NULL;
948
949 if (!tp)
950 return EINVAL;
951 pthread_mutex_lock(&tp->mutex);
952 /* clean up high priority jobs */
953 while (tp->highJobQ.size) {
954 head = ListHead(&tp->highJobQ);
955 if (head == NULL) {
956 pthread_mutex_unlock(&tp->mutex);
957 return EINVAL;
958 }
959 temp = (ThreadPoolJob*)head->item;
960 if (temp->free_func)
961 temp->free_func(temp->arg);
962 FreeThreadPoolJob(tp, temp);
963 ListDelNode(&tp->highJobQ, head, 0);
964 }
965 ListDestroy(&tp->highJobQ, 0);
966 /* clean up med priority jobs */
967 while (tp->medJobQ.size) {
968 head = ListHead(&tp->medJobQ);
969 if (head == NULL) {
970 pthread_mutex_unlock(&tp->mutex);
971 return EINVAL;
972 }
973 temp = (ThreadPoolJob*)head->item;
974 if (temp->free_func)
975 temp->free_func(temp->arg);
976 FreeThreadPoolJob(tp, temp);
977 ListDelNode(&tp->medJobQ, head, 0);
978 }
979 ListDestroy(&tp->medJobQ, 0);
980 /* clean up low priority jobs */
981 while (tp->lowJobQ.size) {
982 head = ListHead(&tp->lowJobQ);
983 if (head == NULL) {
984 pthread_mutex_unlock(&tp->mutex);
985 return EINVAL;
986 }
987 temp = (ThreadPoolJob*)head->item;
988 if (temp->free_func)
989 temp->free_func(temp->arg);
990 FreeThreadPoolJob(tp, temp);
991 ListDelNode(&tp->lowJobQ, head, 0);
992 }
993 ListDestroy(&tp->lowJobQ, 0);
994 /* clean up long term job */
995 if (tp->persistentJob) {
996 temp = tp->persistentJob;
997 if (temp->free_func)
998 temp->free_func(temp->arg);
999 FreeThreadPoolJob(tp, temp);
1000 tp->persistentJob = NULL;
1001 }
1002 /* signal shutdown */
1003 tp->shutdown = 1;
1004 pthread_cond_broadcast(&tp->condition);
1005 /* wait for all threads to finish */
1006 while (tp->totalThreads > 0)
1007 pthread_cond_wait(&tp->start_and_shutdown, &tp->mutex);
1008 /* destroy condition */
1009 while (pthread_cond_destroy(&tp->condition) != 0) {
1010 }
1011 while (pthread_cond_destroy(&tp->start_and_shutdown) != 0) {
1012 }
1014
1015 pthread_mutex_unlock(&tp->mutex);
1016
1017 /* destroy mutex */
1018 while (pthread_mutex_destroy(&tp->mutex) != 0) {
1019 }
1020
1021 return 0;
1022}
1023
1025
1026void TPSetMaxJobsTotal(int mjt) { maxJobsTotal = mjt; }
1027
1029 if (!attr)
1030 return EINVAL;
1038 attr->maxJobsTotal = maxJobsTotal;
1039
1040 return 0;
1041}
1042
1043int TPJobInit(ThreadPoolJob* job, UPnPsdk::start_routine func, void* arg) {
1044 if (!job || !func)
1045 return EINVAL;
1046 job->func = func;
1047 job->arg = arg;
1048 job->priority = DEFAULT_PRIORITY;
1049 job->free_func = DEFAULT_FREE_ROUTINE;
1050
1051 return 0;
1052}
1053
1055 if (!job)
1056 return EINVAL;
1057 switch (priority) {
1058 case LOW_PRIORITY:
1059 case MED_PRIORITY:
1060 case HIGH_PRIORITY:
1061 job->priority = priority;
1062 return 0;
1063 default:
1064 return EINVAL;
1065 }
1066}
1067
1069 if (!job)
1070 return EINVAL;
1071 job->free_func = func;
1072
1073 return 0;
1074}
1075
1076int TPAttrSetMaxThreads(ThreadPoolAttr* attr, int maxThreads) {
1077 if (!attr)
1078 return EINVAL;
1079 attr->maxThreads = maxThreads;
1080
1081 return 0;
1082}
1083
1084int TPAttrSetMinThreads(ThreadPoolAttr* attr, int minThreads) {
1085 if (!attr)
1086 return EINVAL;
1087 attr->minThreads = minThreads;
1088
1089 return 0;
1090}
1091
1092int TPAttrSetStackSize(ThreadPoolAttr* attr, size_t stackSize) {
1093 if (!attr)
1094 return EINVAL;
1095 attr->stackSize = stackSize;
1096
1097 return 0;
1098}
1099
1100int TPAttrSetIdleTime(ThreadPoolAttr* attr, int idleTime) {
1101 if (!attr)
1102 return EINVAL;
1103 attr->maxIdleTime = idleTime;
1104
1105 return 0;
1106}
1107
1108int TPAttrSetJobsPerThread(ThreadPoolAttr* attr, int jobsPerThread) {
1109 if (!attr)
1110 return EINVAL;
1111 attr->jobsPerThread = jobsPerThread;
1112
1113 return 0;
1114}
1115
1116int TPAttrSetStarvationTime(ThreadPoolAttr* attr, int starvationTime) {
1117 if (!attr)
1118 return EINVAL;
1119 attr->starvationTime = starvationTime;
1120
1121 return 0;
1122}
1123
1125 if (!attr)
1126 return EINVAL;
1127 attr->schedPolicy = schedPolicy;
1128
1129 return 0;
1130}
1131
1132int TPAttrSetMaxJobsTotal(ThreadPoolAttr* attr, int totalMaxJobs) {
1133 if (!attr)
1134 return EINVAL;
1135 attr->maxJobsTotal = totalMaxJobs;
1136
1137 return 0;
1138}
1139
1140#if defined(STATS) || defined(DOXYGEN_RUN)
1142 if (!stats)
1143 return;
1144 /* some OSses time_t length may depending on platform, promote it to
1145 * long for safety */
1146 fprintf(stderr, "ThreadPoolStats at Time: %ld\n", (long)StatsTime(NULL));
1147 fprintf(stderr, "High Jobs pending: %d\n", stats->currentJobsHQ);
1148 fprintf(stderr, "Med Jobs Pending: %d\n", stats->currentJobsMQ);
1149 fprintf(stderr, "Low Jobs Pending: %d\n", stats->currentJobsLQ);
1150 fprintf(stderr, "Average Wait in High Priority Q in milliseconds: %f\n",
1151 stats->avgWaitHQ);
1152 fprintf(stderr, "Average Wait in Med Priority Q in milliseconds: %f\n",
1153 stats->avgWaitMQ);
1154 fprintf(stderr, "Averate Wait in Low Priority Q in milliseconds: %f\n",
1155 stats->avgWaitLQ);
1156 fprintf(stderr, "Max Threads Active: %d\n", stats->maxThreads);
1157 fprintf(stderr, "Current Worker Threads: %d\n", stats->workerThreads);
1158 fprintf(stderr, "Current Persistent Threads: %d\n",
1159 stats->persistentThreads);
1160 fprintf(stderr, "Current Idle Threads: %d\n", stats->idleThreads);
1161 fprintf(stderr, "Total Threads : %d\n", stats->totalThreads);
1162 fprintf(stderr, "Total Time spent Working in seconds: %f\n",
1163 stats->totalWorkTime);
1164 fprintf(stderr, "Total Time spent Idle in seconds : %f\n",
1165 stats->totalIdleTime);
1166}
1167
1169 if (tp == NULL || stats == NULL)
1170 return EINVAL;
1171 /* if not shutdown then acquire mutex */
1172 if (!tp->shutdown)
1173 pthread_mutex_lock(&tp->mutex);
1174
1175 *stats = tp->stats;
1176 if (stats->totalJobsHQ > 0)
1177 stats->avgWaitHQ = stats->totalTimeHQ / (double)stats->totalJobsHQ;
1178 else
1179 stats->avgWaitHQ = 0.0;
1180 if (stats->totalJobsMQ > 0)
1181 stats->avgWaitMQ = stats->totalTimeMQ / (double)stats->totalJobsMQ;
1182 else
1183 stats->avgWaitMQ = 0.0;
1184 if (stats->totalJobsLQ > 0)
1185 stats->avgWaitLQ = stats->totalTimeLQ / (double)stats->totalJobsLQ;
1186 else
1187 stats->avgWaitLQ = 0.0;
1188 stats->totalThreads = tp->totalThreads;
1189 stats->persistentThreads = tp->persistentThreads;
1190 stats->currentJobsHQ = (int)ListSize(&tp->highJobQ);
1191 stats->currentJobsLQ = (int)ListSize(&tp->lowJobQ);
1192 stats->currentJobsMQ = (int)ListSize(&tp->medJobQ);
1193
1194 /* if not shutdown then release mutex */
1195 if (!tp->shutdown)
1196 pthread_mutex_unlock(&tp->mutex);
1197
1198 return 0;
1199}
1200#endif /* STATS */
1201
1202#ifdef _WIN32
1204#if defined(_MSC_VER) || defined(_MSC_EXTENSIONS)
1205#define DELTA_EPOCH_IN_MICROSECS 11644473600000000Ui64
1206#else
1207#define DELTA_EPOCH_IN_MICROSECS 11644473600000000ULL
1208#endif
1210
1211int gettimeofday(struct timeval* tv, struct timezone* tz) {
1212 FILETIME ft;
1213 unsigned __int64 tmpres = 0;
1214 static int tzflag;
1215
1216 if (tv) {
1217 GetSystemTimeAsFileTime(&ft);
1218
1219 tmpres |= ft.dwHighDateTime;
1220 tmpres <<= 32;
1221 tmpres |= ft.dwLowDateTime;
1222
1223 /*converting file time to unix epoch*/
1224 tmpres /= 10; /*convert into microseconds*/
1225 tmpres -= DELTA_EPOCH_IN_MICROSECS;
1226 tv->tv_sec = (long)(tmpres / 1000000UL);
1227 tv->tv_usec = (long)(tmpres % 1000000UL);
1228 }
1229 if (tz) {
1230 if (!tzflag) {
1231 _tzset();
1232 tzflag++;
1233 }
1234#ifdef _UCRT
1235 long itz = 0;
1236 _get_timezone(&itz);
1237 tz->tz_minuteswest = (int)(itz / 60);
1238 _get_daylight(&tz->tz_dsttime);
1239#else
1240 tz->tz_minuteswest = _timezone / 60;
1241 tz->tz_dsttime = _daylight;
1242#endif
1243 }
1244
1245 return 0;
1246}
1247#endif /* _WIN32 */
int FreeListInit(FreeList *free_list, size_t elementSize, int maxFreeListLength)
Initializes Free List.
Definition FreeList.cpp:50
int FreeListFree(FreeList *free_list, void *element)
Returns an item to the Free List.
Definition FreeList.cpp:83
void * FreeListAlloc(FreeList *free_list)
Allocates chunk of set size.
Definition FreeList.cpp:64
int FreeListDestroy(FreeList *free_list)
Releases the resources stored with the free list.
Definition FreeList.cpp:103
long ListSize(LinkedList *list)
Returns the size of the list.
int ListDestroy(LinkedList *list, int freeItem)
Removes all memory associated with list nodes. Does not free LinkedList *list.
void * ListDelNode(LinkedList *list, ListNode *dnode, int freeItem)
Removes a node from the list. The memory for the node is freed.
ListNode * ListHead(LinkedList *list)
Returns the head of the list.
ListNode * ListAddTail(LinkedList *list, void *item)
Adds a node to the tail of the list. Node gets added immediately before list.tail.
ListNode * ListFind(LinkedList *list, ListNode *start, void *item)
Finds the specified item in the list.
int ListInit(LinkedList *list, cmp_routine cmp_func, free_function free_func)
Initializes LinkedList. Must be called first and only once for List.
ListNode head
head, first item is stored at: head->next
long size
size of list
#define EOUTOFMEM
Error condition for "out of memory".
Linked list node. Stores generic item and pointers to next and prev.
int ThreadPoolInit(ThreadPool *tp, ThreadPoolAttr *attr)
Initializes and starts ThreadPool.
int ThreadPoolAdd(ThreadPool *tp, ThreadPoolJob *job, int *jobId)
Adds a job to the thread pool.
#define INVALID_POLICY
#define EMAXTHREADS
int TPJobSetFreeFunction(ThreadPoolJob *job, free_routine func)
Sets the jobs free function.
int ThreadPoolShutdown(ThreadPool *tp)
Shuts the thread pool down.
int ThreadPoolGetAttr(ThreadPool *tp, ThreadPoolAttr *out)
Gets the current set of attributes associated with the thread pool.
void ThreadPoolPrintStats(ThreadPoolStats *stats)
Prints various statistics about the thread pool to stderr.
int TPAttrSetMaxJobsTotal(ThreadPoolAttr *attr, int totalMaxJobs)
Sets the maximum number of jobs that can be queued in total.
int TPAttrSetStarvationTime(ThreadPoolAttr *attr, int starvationTime)
Sets the starvation time for the thread pool attributes.
int TPAttrSetMaxThreads(ThreadPoolAttr *attr, int maxThreads)
Sets the max threads for the thread pool attributes.
int TPJobSetPriority(ThreadPoolJob *job, ThreadPriority priority)
Sets the priority of the threadpool job.
int ThreadPoolSetAttr(ThreadPool *tp, ThreadPoolAttr *attr)
Sets the attributes for the thread pool.
int ThreadPoolRemove(ThreadPool *tp, int jobId, ThreadPoolJob *out)
Removes a job from the thread pool.
int ThreadPoolGetStats(ThreadPool *tp, ThreadPoolStats *stats)
Returns various statistics about the thread pool.
int TPAttrSetJobsPerThread(ThreadPoolAttr *attr, int jobsPerThread)
Sets the jobs per thread ratio.
constexpr int INFINITE_THREADS
int gettimeofday(struct timeval *tv, struct timezone *tz)
Get time of day.
int TPJobInit(ThreadPoolJob *job, UPnPsdk::start_routine func, void *arg)
Initializes thread pool job.
int TPAttrInit(ThreadPoolAttr *attr)
Initializes thread pool attributes.
void TPSetMaxJobsTotal(int mjt)
Sets the maximum number of jobs in the thread pool.
constexpr int JOBFREELISTSIZE
int TPAttrSetIdleTime(ThreadPoolAttr *attr, int idleTime)
Sets the idle time for the thread pool attributes.
int TPAttrSetMinThreads(ThreadPoolAttr *attr, int minThreads)
Sets the min threads for the thread pool attributes.
int TPAttrSetSchedPolicy(ThreadPoolAttr *attr, PolicyType schedPolicy)
Sets the scheduling policy for the thread pool attributes.
int maxJobsTotal
int TPAttrSetStackSize(ThreadPoolAttr *attr, size_t stackSize)
Sets the stack size for the thread pool attributes.
int ThreadPoolAddPersistent(ThreadPool *tp, ThreadPoolJob *job, int *jobId)
Adds a persistent job to the thread pool.
Manage a threadpool (for internal use only).
int minThreads
ThreadPool will always maintain at least this many threads.
ThreadPoolStats stats
#define INVALID_JOB_ID
Invalid JOB Id.
constexpr free_routine DEFAULT_FREE_ROUTINE
FreeList jobFreeList
constexpr int DEFAULT_STACK_SIZE
constexpr int DEFAULT_JOBS_PER_THREAD
int tz_minuteswest
Minutes W of Greenwich.
constexpr int DEFAULT_STARVATION_TIME
constexpr ThreadPriority DEFAULT_PRIORITY
int maxIdleTime
This is the maximum time a thread will remain idle before dying (in milliseconds).
constexpr int DEFAULT_IDLE_TIME
constexpr int DEFAULT_MAX_JOBS_TOTAL
int maxJobsTotal
Maximum number of jobs that can be queued totally.
pthread_cond_t condition
LinkedList lowJobQ
constexpr int DEFAULT_MAX_THREADS
PolicyType schedPolicy
Scheduling policy to use.
#define DEFAULT_POLICY
Defines the default scheduling policy, chosen from the policies defined in <sched.h>.
ThreadPoolJob * persistentJob
int starvationTime
The time a low priority or med priority job waits before getting bumped up a priority (in milliseconds).
int tz_dsttime
Type of dst correction.
pthread_cond_t start_and_shutdown
int persistentThreads
int jobsPerThread
Jobs per thread to maintain.
LinkedList highJobQ
int PolicyType
Type of the thread policy.
ThreadPriority
Thread priority.
pthread_mutex_t mutex
ThreadPoolAttr attr
int pendingWorkerThreadStart
void(* free_routine)(void *arg)
size_t stackSize
This is the minimum stack size allocated for each thread.
LinkedList medJobQ
int maxThreads
ThreadPool will never have more than this number of threads.
constexpr int DEFAULT_MIN_THREADS
A thread pool.
Internal ThreadPool Job.
Structure to hold statistics.
Attributes for thread pool.
Timezone.
long DiffMillis(timeval *time1, timeval *time2)
Returns the difference in milliseconds between two timeval structures.
void CalcWaitTime(ThreadPool *tp, ThreadPriority p, ThreadPoolJob *job)
Calculates the time the job has been waiting at the specified priority.
void BumpPriority(ThreadPool *tp)
Determines whether any jobs need to be bumped to a higher priority Q and bumps them.
ThreadPoolJob * CreateThreadPoolJob(ThreadPoolJob *job, int id, ThreadPool *tp)
Creates a Thread Pool Job. (Dynamically allocated)
void SetSeed(void)
Sets seed for random number generator.
int CmpThreadPoolJob(void *jobA, void *jobB)
Compares thread pool jobs.
void StatsInit(ThreadPoolStats *stats)
Initializes the statistics structure.
time_t StatsTime(time_t *t)
StatsTime.
void StatsAccountMQ(ThreadPool *tp, long diffTime)
StatsAccountMQ.
void FreeThreadPoolJob(ThreadPool *tp, ThreadPoolJob *tpj)
Deallocates a dynamically allocated ThreadPoolJob.
void StatsAccountLQ(ThreadPool *tp, long diffTime)
StatsAccountLQ.
int SetPolicyType(PolicyType in)
Sets the scheduling policy of the current process.
int SetPriority(ThreadPriority priority)
Sets the priority of the currently running thread.
void SetRelTimeout(timespec *time, int relMillis)
Sets the fields of the passed in timespec to be relMillis milliseconds in the future.
int CreateWorker(ThreadPool *tp)
Creates a worker thread, if the thread pool does not already have max threads.
void * WorkerThread(void *arg)
Implements a thread pool worker.
void StatsAccountHQ(ThreadPool *tp, long diffTime)
StatsAccountHQ.
void AddWorker(ThreadPool *tp)
Determines whether or not a thread should be added based on the jobsPerThread ratio.
Define macro for synced logging to the console for detailed info and debug.