diff --git a/TODO.md b/TODO.md index 65b2ebe..71a673f 100644 --- a/TODO.md +++ b/TODO.md @@ -12,7 +12,7 @@ - [x] Non-locking `get()` interface - [x] Rename old interface to make it clearer it blocks - [x] Apply execution policies -- [ ] `idle()` interface +- [x] `idle()` interface - [x] Rename `running()` to remove ambiguity - [x] Use `std::invoke` wherever applicable - [x] Prohibit reference outputs (mutable lvalue reference parameters already invalid) diff --git a/examples/00_introduction.cpp b/examples/00_introduction.cpp index 4875b48..274735b 100644 --- a/examples/00_introduction.cpp +++ b/examples/00_introduction.cpp @@ -50,6 +50,11 @@ int main() { std::cout << "Pipeline is empty, can't get another result!\n"; } + // We can verify all threads on the pipeline are waiting for input with idle() + // Note: idle() may return false if any pipeline stage hasn't registered its idle state yet. + // To guarantee the pipeline is idle before exitting, see the next example. + std::cout << "pipeline.idle() = " << std::boolalpha << pipeline.idle() << std::endl; + // At the end, we don't need to do anything. // The pipeline stops all threads when its destructor is called. } \ No newline at end of file diff --git a/examples/01_consumer_threads.cpp b/examples/01_consumer_threads.cpp index 9dffd62..23e3eab 100644 --- a/examples/01_consumer_threads.cpp +++ b/examples/01_consumer_threads.cpp @@ -35,13 +35,11 @@ int main() { pipeline.input(2, 3.5); pipeline.input(7, 0.75); - // We can wait a little, do anything else, and the production will be done. - // Sometimes we want to guarantee the pipeline has finished running. - // For that, we'll use the idle() member function. - // TODO - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); - - // The capture we used from the lambda! + // We can wait a little, do anything else, and the processing will be done. + // Sometimes we want to guarantee the pipeline has finished running before exitting. + // For that, we'll use the wait_until_idle() member function. + pipeline.wait_until_idle(); + + // The capture we used from the lambda shows us we have processed everything! std::cout << "print() has been called " << count << " times.\n"; } \ No newline at end of file diff --git a/examples/07_reference_parameters.cpp b/examples/07_reference_parameters.cpp index a41dcf4..b9bbb6f 100644 --- a/examples/07_reference_parameters.cpp +++ b/examples/07_reference_parameters.cpp @@ -33,7 +33,7 @@ int main() { pipeline.input("!dlroW olleH"); pipeline.input("TACOCAT"); - // We insert a delay before destruction, so the pipeline finishes execution. - using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); + // We wait until execution is complete with wait_until_idle() + // This way, we free the processor from scheduling this thread until then, and don't need to check pipeline.idle() + pipeline.wait_until_idle(); } \ No newline at end of file diff --git a/include/tdp/pipeline.impl.hpp b/include/tdp/pipeline.impl.hpp index 1fb9db4..199cfa8 100644 --- a/include/tdp/pipeline.impl.hpp +++ b/include/tdp/pipeline.impl.hpp @@ -10,7 +10,9 @@ #include #include +#include #include +#include #include #include #include @@ -23,6 +25,16 @@ namespace tdp::detail { +//------------------------------------------------------------------------------------------------- +// Idle Callback: Each thread notifies its idling state +//------------------------------------------------------------------------------------------------- + +struct idle_callback { + virtual void set(std::size_t index) noexcept = 0; + virtual void reset(std::size_t index) noexcept = 0; + virtual ~idle_callback() = default; +}; + //------------------------------------------------------------------------------------------------- // Processing threads //------------------------------------------------------------------------------------------------- @@ -38,19 +50,31 @@ struct thread_worker, Callable, // using input_t = std::tuple; using output_t = std::invoke_result_t; + const std::size_t _id; + idle_callback& _idle; Callable _f; Queue& _input_queue; Queue& _output_queue; const std::atomic_bool& _stop; void operator()() noexcept { + _idle.reset(_id); while (!_stop) { - auto val = _input_queue.pop_unless([&] { return _stop.load(); }); - if (!val) - break; - auto&& res = std::apply(_f, std::move(*val)); - _output_queue.push(std::move(res)); + if (_input_queue.empty()) { + _idle.set(_id); + auto val = _input_queue.pop_unless([&] { return _stop.load(); }); + if (!val) + break; + _idle.reset(_id); + auto&& res = std::apply(_f, std::move(*val)); + _output_queue.push(std::move(res)); + } else { + auto&& val = _input_queue.pop(); + auto&& res = std::apply(_f, std::move(val)); + _output_queue.push(std::move(res)); + } } + _idle.set(_id); _output_queue.wake(); } }; @@ -60,16 +84,27 @@ template