From b79a6856813d73f111b9989c3b53cbd45a741f3a Mon Sep 17 00:00:00 2001 From: Alexander B Date: Tue, 7 May 2024 11:49:52 +0300 Subject: [PATCH 1/3] Motivated by "The load balancing issue in Poco::ActiveThreadPool #4544" The optimization allows tasks to be redistributed to idle threads --- Foundation/include/Poco/ActiveThreadPool.h | 8 ++- Foundation/src/ActiveThreadPool.cpp | 58 +++++++++++++++--- .../testsuite/src/ActiveThreadPoolTest.cpp | 60 +++++++++++++++++++ .../testsuite/src/ActiveThreadPoolTest.h | 1 + 4 files changed, 117 insertions(+), 10 deletions(-) diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h index 5c04a8ff02..cbc363a1cf 100644 --- a/Foundation/include/Poco/ActiveThreadPool.h +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -41,16 +41,19 @@ class Foundation_API ActiveThreadPool /// The thread pool always keeps fixed number of threads running. /// Use case for this pool is running many (more than os-max-thread-count) short live tasks /// Round-robin model allow efficiently utilize cpu cores + /// Using the redistributeTasks option optimizes the reuse of idle threads { public: ActiveThreadPool(int capacity = static_cast(Environment::processorCount()) + 1, - int stackSize = POCO_THREAD_STACK_SIZE); + int stackSize = POCO_THREAD_STACK_SIZE, + bool redistributeTasks = false); /// Creates a thread pool with fixed capacity threads. /// Threads are created with given stack size. ActiveThreadPool(std::string name, int capacity = static_cast(Environment::processorCount()) + 1, - int stackSize = POCO_THREAD_STACK_SIZE); + int stackSize = POCO_THREAD_STACK_SIZE, + bool redistributeTasks = false); /// Creates a thread pool with the given name and fixed capacity threads. /// Threads are created with given stack size. 
@@ -124,6 +127,7 @@ class Foundation_API ActiveThreadPool ThreadVec _threads; mutable FastMutex _mutex; std::atomic _lastThreadIndex{0}; + bool _redistributeTasks; }; diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 628c78d6da..0b023296e2 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -79,31 +79,39 @@ class NewActionNotification: public Notification class ActiveThread: public Runnable { public: - ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); + ActiveThread(const std::string& name, std::vector &threads, int stackSize = POCO_THREAD_STACK_SIZE, bool redistributeTasks = false); ~ActiveThread() override = default; void start(); void start(Thread::Priority priority, Runnable& target); void start(Thread::Priority priority, Runnable& target, const std::string& name); + void start(Notification::Ptr notification); void join(); + bool idle() const; + int id() const; void release(); void run() override; private: NotificationQueue _pTargetQueue; std::string _name; + std::vector &_threads; Thread _thread; Event _targetCompleted; FastMutex _mutex; const long JOIN_TIMEOUT = 10000; std::atomic _needToStop{false}; + std::atomic _idle{true}; + bool _redistributeTasks; }; -ActiveThread::ActiveThread(const std::string& name, int stackSize): +ActiveThread::ActiveThread(const std::string& name, std::vector &threads, int stackSize, bool redistributeTasks): _name(name), + _threads(threads), _thread(name), - _targetCompleted(false) + _targetCompleted(false), + _redistributeTasks(redistributeTasks) { poco_assert_dbg (stackSize >= 0); _thread.setStackSize(stackSize); @@ -122,6 +130,13 @@ void ActiveThread::start(Thread::Priority priority, Runnable& target) } +void ActiveThread::start(Notification::Ptr notification) +{ + if (!notification.isNull()){ + _pTargetQueue.enqueueNotification(std::move(notification)); + } +} + void ActiveThread::start(Thread::Priority priority, 
Runnable& target, const std::string& name) { _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, name)); @@ -134,9 +149,17 @@ void ActiveThread::join() { _targetCompleted.wait(); } +} +inline bool ActiveThread::idle() const +{ + return _idle; } +inline int ActiveThread::id() const +{ + return _thread.id(); +} void ActiveThread::release() { @@ -165,6 +188,22 @@ void ActiveThread::run() AutoPtr pN = _pTargetQueue.waitDequeueNotification(); while (pN) { + _idle = false; + if (_redistributeTasks) + { + for (const auto &thr : _threads) + { + if ((thr->id() != _thread.id()) && thr->idle()) + { + thr->start(std::move(pN)); + pN = _pTargetQueue.waitDequeueNotification(1000); + } + } + if (pN.isNull()) + { + break; + } + } NewActionNotification::Ptr pNAN = pN.cast(); Runnable& target = pNAN->runnable(); _thread.setPriority(pNAN->priority()); @@ -191,16 +230,18 @@ void ActiveThread::run() pN = _pTargetQueue.waitDequeueNotification(1000); } _targetCompleted.set(); + _idle = true; } while (_needToStop == false); } -ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): +ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize, bool redistributeTasks): _capacity(capacity), _serial(0), _stackSize(stackSize), - _lastThreadIndex(0) + _lastThreadIndex(0), + _redistributeTasks(redistributeTasks) { poco_assert (_capacity >= 1); @@ -215,12 +256,13 @@ ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): } -ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize): +ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize, bool redistributeTasks): _name(std::move(name)), _capacity(capacity), _serial(0), _stackSize(stackSize), - _lastThreadIndex(0) + _lastThreadIndex(0), + _redistributeTasks(redistributeTasks) { poco_assert (_capacity >= 1); @@ -323,7 +365,7 @@ ActiveThread* ActiveThreadPool::createThread() { std::ostringstream name; name << _name << "[#active-thread-" << ++_serial << "]"; - return 
new ActiveThread(name.str(), _stackSize); + return new ActiveThread(name.str(), _threads, _stackSize, _redistributeTasks); } diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 1bef49d0bb..654dcfb30e 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -76,6 +76,65 @@ void ActiveThreadPoolTest::testActiveThreadPool() } +void ActiveThreadPoolTest::testActiveThreadLoadBalancing() +{ + Poco::AtomicCounter lttCount; + Poco::AtomicCounter lttPerTIDCount; + class LongTimeTask : public Poco::Runnable + { + Poco::AtomicCounter &_counter; + Poco::AtomicCounter &_tidCounter; + public: + LongTimeTask(Poco::AtomicCounter &counter, Poco::AtomicCounter &tidCounter) : + _counter(counter), + _tidCounter(tidCounter) + {} + void run() override + { + _counter++; + if (_tidCounter >= 0) + { + _tidCounter = _tidCounter + (-1 * Poco::Thread::currentOsTid()); + } + else + { + _tidCounter = _tidCounter + Poco::Thread::currentOsTid(); + } + Poco::Thread::sleep(1 * 110); + } + }; + + Poco::AtomicCounter sttCount; + class ShortTimeTask : public Poco::Runnable + { + Poco::AtomicCounter &_counter; + public: + ShortTimeTask(Poco::AtomicCounter &counter) : _counter(counter) {} + void run() override + { + _counter++; + Poco::Thread::sleep(1); + } + }; + + const int capacity = 2; + const int taskCount = 100; + const bool redistributeTasks = true; + Poco::ActiveThreadPool pool(capacity, POCO_THREAD_STACK_SIZE, redistributeTasks); + + for (int i = 0; i < taskCount; i++) { + LongTimeTask ltt(lttCount, lttPerTIDCount); + pool.start(ltt); + ShortTimeTask stt(sttCount); + pool.start(stt); + } + + pool.joinAll(); + assertEqual(taskCount, lttCount.value()); + assertEqual(taskCount, sttCount.value()); + assertTrue(lttPerTIDCount != 0); // without the optimization all tasks run on a single thread and this counter equals 0, otherwise it does not +} + void 
ActiveThreadPoolTest::setUp() { _count = 0; @@ -98,6 +157,7 @@ CppUnit::Test* ActiveThreadPoolTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadLoadBalancing); return pSuite; } diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.h b/Foundation/testsuite/src/ActiveThreadPoolTest.h index 51df837355..44f33a751d 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.h +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.h @@ -27,6 +27,7 @@ class ActiveThreadPoolTest: public CppUnit::TestCase ~ActiveThreadPoolTest(); void testActiveThreadPool(); + void testActiveThreadLoadBalancing(); void setUp(); void tearDown(); From 57ba3661a865d6207bfccd6359532a757a70ade7 Mon Sep 17 00:00:00 2001 From: Alexander B Date: Wed, 8 May 2024 13:55:34 +0300 Subject: [PATCH 2/3] fix datarace problems from tsan --- Foundation/include/Poco/ActiveThreadPool.h | 4 +- Foundation/src/ActiveThreadPool.cpp | 86 ++++++++++--------- .../testsuite/src/ActiveThreadPoolTest.cpp | 38 ++++---- 3 files changed, 71 insertions(+), 57 deletions(-) diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h index cbc363a1cf..66db1822ae 100644 --- a/Foundation/include/Poco/ActiveThreadPool.h +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -110,11 +110,11 @@ class Foundation_API ActiveThreadPool /// Returns a reference to the default /// thread pool. 
-protected: +private: ActiveThread* getThread(); ActiveThread* createThread(); + void recreateThreads(); -private: ActiveThreadPool(const ActiveThreadPool& pool); ActiveThreadPool& operator = (const ActiveThreadPool& pool); diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 0b023296e2..d8180d675f 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -79,16 +79,18 @@ class NewActionNotification: public Notification class ActiveThread: public Runnable { public: - ActiveThread(const std::string& name, std::vector &threads, int stackSize = POCO_THREAD_STACK_SIZE, bool redistributeTasks = false); + ActiveThread(const std::string& name, std::vector &threads, int stackSize = POCO_THREAD_STACK_SIZE); ~ActiveThread() override = default; void start(); void start(Thread::Priority priority, Runnable& target); void start(Thread::Priority priority, Runnable& target, const std::string& name); void start(Notification::Ptr notification); + void setRedistributeOption(bool redistributeTask); void join(); bool idle() const; int id() const; + bool isRunning() const; void release(); void run() override; @@ -102,16 +104,14 @@ class ActiveThread: public Runnable const long JOIN_TIMEOUT = 10000; std::atomic _needToStop{false}; std::atomic _idle{true}; - bool _redistributeTasks; + bool _redistributeTasks{false}; }; - -ActiveThread::ActiveThread(const std::string& name, std::vector &threads, int stackSize, bool redistributeTasks): +ActiveThread::ActiveThread(const std::string& name, std::vector &threads, int stackSize): _name(name), _threads(threads), _thread(name), - _targetCompleted(false), - _redistributeTasks(redistributeTasks) + _targetCompleted(false) { poco_assert_dbg (stackSize >= 0); _thread.setStackSize(stackSize); @@ -123,13 +123,11 @@ void ActiveThread::start() _thread.start(*this); } - void ActiveThread::start(Thread::Priority priority, Runnable& target) { 
_pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, _name)); } - void ActiveThread::start(Notification::Ptr notification) { if (!notification.isNull()){ @@ -142,6 +140,11 @@ void ActiveThread::start(Thread::Priority priority, Runnable& target, const std: _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, name)); } +void ActiveThread::setRedistributeOption(bool redistributeTask) +{ + _redistributeTasks = redistributeTask; +} + void ActiveThread::join() { _pTargetQueue.wakeUpAll(); @@ -161,6 +164,11 @@ inline int ActiveThread::id() const return _thread.id(); } +inline bool ActiveThread::isRunning() const +{ + return _thread.isRunning(); +} + void ActiveThread::release() { // In case of a statically allocated thread pool (such @@ -180,7 +188,6 @@ void ActiveThread::release() } } - void ActiveThread::run() { do @@ -193,7 +200,7 @@ void ActiveThread::run() { for (const auto &thr : _threads) { - if ((thr->id() != _thread.id()) && thr->idle()) + if (thr && thr->isRunning() && (thr->id() != _thread.id()) && thr->idle()) { thr->start(std::move(pN)); pN = _pTargetQueue.waitDequeueNotification(1000); @@ -244,15 +251,8 @@ ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize, bool redistribut _redistributeTasks(redistributeTasks) { poco_assert (_capacity >= 1); - - _threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } + + recreateThreads(); } @@ -265,15 +265,8 @@ ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSiz _redistributeTasks(redistributeTasks) { poco_assert (_capacity >= 1); - - _threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } + + recreateThreads(); } @@ -340,22 +333,15 @@ void ActiveThreadPool::joinAll() { pThread->join(); } - - _threads.clear(); - 
_threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } + + recreateThreads(); } ActiveThread* ActiveThreadPool::getThread() { auto thrSize = _threads.size(); - auto i = (_lastThreadIndex++) % thrSize; + auto i = (_lastThreadIndex++); + i = i % thrSize; ActiveThread* pThread = _threads[i]; return pThread; } @@ -365,9 +351,29 @@ ActiveThread* ActiveThreadPool::createThread() { std::ostringstream name; name << _name << "[#active-thread-" << ++_serial << "]"; - return new ActiveThread(name.str(), _threads, _stackSize, _redistributeTasks); + return new ActiveThread(name.str(), _threads, _stackSize); } +void ActiveThreadPool::recreateThreads() +{ + _threads.clear(); + _threads.reserve(_capacity); + + for (int i = 0; i < _capacity; i++) + { + ActiveThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } + for (auto& thr : _threads) + { + while (!thr->isRunning()) + { + Poco::Thread::sleep(100); + } + thr->setRedistributeOption(_redistributeTasks); + } +} class ActiveThreadPoolSingletonHolder { diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 654dcfb30e..3425a40dd4 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -16,6 +16,7 @@ #include "Poco/Exception.h" #include "Poco/Thread.h" #include "Poco/Environment.h" +#include using Poco::ActiveThreadPool; @@ -79,26 +80,32 @@ void ActiveThreadPoolTest::testActiveThreadPool() void ActiveThreadPoolTest::testActiveThreadLoadBalancing() { Poco::AtomicCounter lttCount; - Poco::AtomicCounter lttPerTIDCount; + ptrdiff_t lttPerTIDCount = 0; + Poco::FastMutex mutex; class LongTimeTask : public Poco::Runnable { Poco::AtomicCounter &_counter; - Poco::AtomicCounter &_tidCounter; + ptrdiff_t &_tidCounter; + Poco::FastMutex &_mutex; public: - 
LongTimeTask(Poco::AtomicCounter &counter, Poco::AtomicCounter &tidCounter) : + LongTimeTask(Poco::AtomicCounter &counter, ptrdiff_t &tidCounter, Poco::FastMutex &mutex) : _counter(counter), - _tidCounter(tidCounter) + _tidCounter(tidCounter), + _mutex(mutex) {} void run() override { _counter++; - if (_tidCounter >= 0) { - _tidCounter = _tidCounter + (-1 * Poco::Thread::currentOsTid()); - } - else - { - _tidCounter = _tidCounter + Poco::Thread::currentOsTid(); + Poco::FastMutex::ScopedLock lock(_mutex); + if (_tidCounter >= 0) + { + _tidCounter -= (ptrdiff_t)Poco::Thread::currentTid(); + } + else + { + _tidCounter += (ptrdiff_t)Poco::Thread::currentTid(); + } } Poco::Thread::sleep(1 * 110); } @@ -121,12 +128,13 @@ void ActiveThreadPoolTest::testActiveThreadLoadBalancing() const int taskCount = 100; const bool redistributeTasks = true; Poco::ActiveThreadPool pool(capacity, POCO_THREAD_STACK_SIZE, redistributeTasks); - + std::vector> lttVec(taskCount); + std::vector> sttVec(taskCount); for (int i = 0; i < taskCount; i++) { - LongTimeTask ltt(lttCount, lttPerTIDCount); - pool.start(ltt); - ShortTimeTask stt(sttCount); - pool.start(stt); + lttVec[i] = std::make_unique(lttCount, lttPerTIDCount, mutex); + pool.start(*(lttVec[i])); + sttVec[i] = std::make_unique(sttCount); + pool.start(*(sttVec[i])); } pool.joinAll(); From 78a9b85f876da2e77085394107bd3ea454c9051a Mon Sep 17 00:00:00 2001 From: Alexander B Date: Wed, 8 May 2024 16:29:21 +0300 Subject: [PATCH 3/3] set optimization only if redistributeTasks is true try increase a number of tasks and remove sleep from short-task for tests --- Foundation/src/ActiveThreadPool.cpp | 11 +++++++---- Foundation/testsuite/src/ActiveThreadPoolTest.cpp | 3 +-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index d8180d675f..b867f8123d 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -365,13 +365,16 @@ 
void ActiveThreadPool::recreateThreads() _threads.push_back(pThread); pThread->start(); } - for (auto& thr : _threads) + if (_redistributeTasks) { - while (!thr->isRunning()) + for (auto& thr : _threads) { - Poco::Thread::sleep(100); + while (!thr->isRunning()) + { + Poco::Thread::sleep(100); + } + thr->setRedistributeOption(_redistributeTasks); } - thr->setRedistributeOption(_redistributeTasks); } } diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 3425a40dd4..e3643aedc0 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -120,12 +120,11 @@ void ActiveThreadPoolTest::testActiveThreadLoadBalancing() void run() override { _counter++; - Poco::Thread::sleep(1); } }; const int capacity = 2; - const int taskCount = 100; + const int taskCount = 200; const bool redistributeTasks = true; Poco::ActiveThreadPool pool(capacity, POCO_THREAD_STACK_SIZE, redistributeTasks); std::vector> lttVec(taskCount);