From 0b127d48fdfd612b8738c6d53c2fcc9e48debfa2 Mon Sep 17 00:00:00 2001 From: Joel Filho Date: Sun, 31 May 2020 13:57:15 -0300 Subject: [PATCH 1/3] CMake fix: using EXCLUDE_FROM_ALL - preventing doctest target installation without relying on cache --- tests/CMakeLists.txt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 3bb76a9..ba4d40c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,6 +1,4 @@ -set(DOCTEST_NO_INSTALL ON CACHE BOOL "Do not install doctest with TDP by default") -set(DOCTEST_WITH_TESTS OFF CACHE BOOL "Do not install build doctest's tests and examples") -add_subdirectory(doctest) +add_subdirectory(doctest EXCLUDE_FROM_ALL) file(GLOB TEST_FILES CONFIGURE_DEPENDS "*.cpp") From cbd0eb83baa956a4e251f9a5a6fd5ad15b8b49a7 Mon Sep 17 00:00:00 2001 From: Joel Filho Date: Sun, 31 May 2020 14:05:47 -0300 Subject: [PATCH 2/3] Remove initialization from public interface --- include/tdp/pipeline.impl.hpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/include/tdp/pipeline.impl.hpp b/include/tdp/pipeline.impl.hpp index 1fb9db4..0791160 100644 --- a/include/tdp/pipeline.impl.hpp +++ b/include/tdp/pipeline.impl.hpp @@ -230,6 +230,7 @@ struct pipeline, Stages...> final ~pipeline() { stop_threads(); } + private: template void init_intermediary_threads(std::tuple& stages) { using inputs = util::result_list_t; @@ -332,11 +333,6 @@ struct pipeline, Stages...> final } } - private: - std::atomic_bool _stop = false; - tuple_t _queues; - std::array _threads; - void stop_threads() { // Set the "stop token" flag _stop = true; @@ -355,6 +351,11 @@ struct pipeline, Stages...> final if (thread.joinable()) thread.join(); } + + private: + std::atomic_bool _stop = false; + tuple_t _queues; + std::array _threads; }; //------------------------------------------------------------------------------------------------- From f1289a8d41b157fa3e22f330e1caca5ef897af71 Mon Sep 17 00:00:00 2001 From: Joel Filho Date: Sun, 7 Jun 2020 17:15:45 -0300 Subject: [PATCH 3/3] Pipeline idling interface (1) Initial tentative for the idle interface --- TODO.md | 2 +- examples/00_introduction.cpp | 5 + examples/01_consumer_threads.cpp | 14 +- examples/07_reference_parameters.cpp | 6 +- include/tdp/pipeline.impl.hpp | 189 ++++++++++++++++++++++++--- tests/test_consumers.cpp | 6 +- tests/test_policies.cpp | 6 +- tests/test_producers.cpp | 4 + 8 files changed, 197 insertions(+), 35 deletions(-) diff --git a/TODO.md b/TODO.md index 65b2ebe..71a673f 100644 --- a/TODO.md +++ b/TODO.md @@ -12,7 +12,7 @@ - [x] Non-locking `get()` interface - [x] Rename old interface to make it clearer it blocks - [x] Apply execution policies -- [ ] `idle()` interface +- [x] `idle()` interface - [x] Rename `running()` to remove ambiguity - [x] Use `std::invoke` wherever applicable - [x] Prohibit reference outputs (mutable lvalue reference parameters already invalid) diff --git a/examples/00_introduction.cpp b/examples/00_introduction.cpp index 4875b48..274735b 100644 --- a/examples/00_introduction.cpp +++ b/examples/00_introduction.cpp @@ -50,6 +50,11 @@ int main() { std::cout << "Pipeline is empty, can't get another result!\n"; } + // We can verify all threads on the pipeline are waiting for input with idle() + // Note: idle() may return false if any pipeline stage hasn't registered its idle state yet. + // To guarantee the pipeline is idle before exitting, see the next example. + std::cout << "pipeline.idle() = " << std::boolalpha << pipeline.idle() << std::endl; + // At the end, we don't need to do anything. // The pipeline stops all threads when its destructor is called. } \ No newline at end of file diff --git a/examples/01_consumer_threads.cpp b/examples/01_consumer_threads.cpp index 9dffd62..23e3eab 100644 --- a/examples/01_consumer_threads.cpp +++ b/examples/01_consumer_threads.cpp @@ -35,13 +35,11 @@ int main() { pipeline.input(2, 3.5); pipeline.input(7, 0.75); - // We can wait a little, do anything else, and the production will be done. - // Sometimes we want to guarantee the pipeline has finished running. - // For that, we'll use the idle() member function. - // TODO - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - - // The capture we used from the lambda! + // We can wait a little, do anything else, and the processing will be done. + // Sometimes we want to guarantee the pipeline has finished running before exitting. + // For that, we'll use the wait_until_idle() member function. + pipeline.wait_until_idle(); + + // The capture we used from the lambda shows us we have processed everything! std::cout << "print() has been called " << count << " times.\n"; } \ No newline at end of file diff --git a/examples/07_reference_parameters.cpp b/examples/07_reference_parameters.cpp index a41dcf4..b9bbb6f 100644 --- a/examples/07_reference_parameters.cpp +++ b/examples/07_reference_parameters.cpp @@ -33,7 +33,7 @@ int main() { pipeline.input("!dlroW olleH"); pipeline.input("TACOCAT"); - // We insert a delay before destruction, so the pipeline finishes execution. - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); + // We wait until execution is complete with wait_until_idle() + // This way, we free the processor from scheduling this thread until then, and don't need to check pipeline.idle() + pipeline.wait_until_idle(); } \ No newline at end of file diff --git a/include/tdp/pipeline.impl.hpp b/include/tdp/pipeline.impl.hpp index 0791160..199cfa8 100644 --- a/include/tdp/pipeline.impl.hpp +++ b/include/tdp/pipeline.impl.hpp @@ -10,7 +10,9 @@ #include #include +#include #include +#include #include #include #include @@ -23,6 +25,16 @@ namespace tdp::detail { +//------------------------------------------------------------------------------------------------- +// Idle Callback: Each thread notifies its idling state +//------------------------------------------------------------------------------------------------- + +struct idle_callback { + virtual void set(std::size_t index) noexcept = 0; + virtual void reset(std::size_t index) noexcept = 0; + virtual ~idle_callback() = default; +}; + //------------------------------------------------------------------------------------------------- // Processing threads //------------------------------------------------------------------------------------------------- @@ -38,19 +50,31 @@ struct thread_worker, Callable, // using input_t = std::tuple; using output_t = std::invoke_result_t; + const std::size_t _id; + idle_callback& _idle; Callable _f; Queue& _input_queue; Queue& _output_queue; const std::atomic_bool& _stop; void operator()() noexcept { + _idle.reset(_id); while (!_stop) { - auto val = _input_queue.pop_unless([&] { return _stop.load(); }); - if (!val) - break; - auto&& res = std::apply(_f, std::move(*val)); - _output_queue.push(std::move(res)); + if (_input_queue.empty()) { + _idle.set(_id); + auto val = _input_queue.pop_unless([&] { return _stop.load(); }); + if (!val) + break; + _idle.reset(_id); + auto&& res = std::apply(_f, std::move(*val)); + _output_queue.push(std::move(res)); + } else { + auto&& val = _input_queue.pop(); + auto&& res = std::apply(_f, std::move(val)); + _output_queue.push(std::move(res)); + } } + _idle.set(_id); _output_queue.wake(); } }; @@ -60,16 +84,27 @@ template