// NOTE(review): extraction fragment — the default-constructor signature
// SchedulerParallel(Size max_nb_threads = 0, double max_megabyte_memory = 0.0)
// precedes this chunk and is not visible here; only the member-initializer
// list and the body remain. Both the Scheduler base class and the embedded
// sequential scheduler receive the same thread/memory limits.
52#ifndef DOXYGEN_SHOULD_SKIP_THIS
63 Scheduler(max_nb_threads, max_megabyte_memory),
64 _sequential_scheduler_(max_nb_threads, max_megabyte_memory) {
66 GUM_CONSTRUCTOR(SchedulerParallel); // aGrUM construction-bookkeeping macro
/// Copy constructor: copies both the Scheduler base part and the embedded
/// sequential scheduler of `from`.
/// (closing brace truncated by the extraction)
70 SchedulerParallel::SchedulerParallel(
const SchedulerParallel& from) :
71 Scheduler(from), _sequential_scheduler_(from._sequential_scheduler_) {
73 GUM_CONS_CPY(SchedulerParallel); // aGrUM copy-construction bookkeeping macro
/// Move constructor: moves the Scheduler base part and the embedded
/// sequential scheduler out of `from` (`from` is left valid but unspecified).
/// (closing brace truncated by the extraction)
77 SchedulerParallel::SchedulerParallel(SchedulerParallel&& from) :
78 Scheduler(
std::move(from)), _sequential_scheduler_(
std::move(from._sequential_scheduler_)) {
80 GUM_CONS_MOV(SchedulerParallel); // aGrUM move-construction bookkeeping macro
84 SchedulerParallel* SchedulerParallel::clone()
const {
return new SchedulerParallel(*
this); }
/// Destructor. (closing brace truncated by the extraction)
87 SchedulerParallel::~SchedulerParallel() {
89 GUM_DESTRUCTOR(SchedulerParallel); // aGrUM destruction-bookkeeping macro
// NOTE(review): lossy extraction fragment — the numbers fused at the start of
// each line are the ORIGINAL file's line numbers; the jumps between them show
// that many statements and most closing braces were dropped by the extraction.
// Code is kept byte-identical below; only comments were added.
//
// Executes every operation of `schedule` reachable from the currently
// available ones, in parallel: a pool of worker threads repeatedly executes
// an operation, then releases its children whose parents have all completed.
93 void SchedulerParallel::execute(Schedule& schedule) {
// Refuse to run a schedule whose estimated peak memory exceeds the budget.
// NOTE(review): `seq_scheduler` is presumably the sequential-scheduler
// member; its declaration is not visible in this fragment — confirm.
97 if (this->_max_memory != 0.0) {
99 auto memory_usage = seq_scheduler.memoryUsage(schedule);
100 if (memory_usage.first > this->_max_memory) {
throw std::bad_alloc(); }
// Seed the worklist with the operations executable right now.
104 List< NodeId > available_nodes;
105 const DAG& dag = schedule.dag();
106 NodeSet nodes_to_execute(dag.sizeNodes());
108 for (
const auto node: schedule.availableOperations()) {
109 available_nodes.insert(node);
// Walk the DAG from the available nodes to collect every operation that
// will have to be executed (the statement pushing children onto `nodes`
// was dropped by the extraction).
112 List< NodeId > nodes = available_nodes;
113 while (!nodes.empty()) {
114 const NodeId node = nodes.front();
116 if (!nodes_to_execute.exists(node)) {
117 nodes_to_execute.insert(node);
118 for (
const auto child: dag.children(node))
// Nothing to execute: done.
123 if (nodes_to_execute.empty())
return;
// Per-node countdown of still-unexecuted parents; an operation becomes
// available when its counter reaches zero.
126 std::atomic< Size > nb_remaining_operations(nodes_to_execute.size());
127 NodeProperty< std::atomic< Size > > nb_parents_to_execute(nb_remaining_operations);
128 for (
const auto node: nodes_to_execute) {
129 Size nb_unexecuted_parents = dag.parents(node).size();
130 for (
const auto parent: dag.parents(node))
131 if (schedule.operation(parent).isExecuted()) --nb_unexecuted_parents;
132 nb_parents_to_execute.emplace(node, nb_unexecuted_parents);
134 nodes_to_execute.clear();
// Use at most as many threads as there are operations left to execute.
139 const auto desired_nb_threads = this->isGumNumberOfThreadsOverriden()
142 const Size nb_threads = nb_remaining_operations.load() < desired_nb_threads
143 ? nb_remaining_operations.load()
144 : desired_nb_threads;
// Per-thread state: a "has work" flag and the node currently assigned.
// NOTE(review): both vectors are sized by nb_remaining_operations rather
// than nb_threads — verify against the full source whether this is
// intentional (over-allocation) or an extraction artifact.
147 std::vector< std::atomic< bool > > active_threads(nb_remaining_operations.load());
148 for (
auto& thread: active_threads)
152 std::vector< std::atomic< NodeId > > thread2node(nb_remaining_operations.load());
153 for (
auto& node: thread2node)
// Hand the first batch of available nodes to the threads, one node each.
156 Idx thread_index = Idx(0);
157 while (!available_nodes.empty()) {
158 const NodeId node = available_nodes.front();
159 available_nodes.popFront();
160 thread2node[thread_index] = node;
161 active_threads[thread_index] =
true;
163 if (thread_index >= nb_threads)
break;
// One mutex + condition variable per thread (used to hand it new work),
// plus a global mutex protecting the shared available_nodes list.
173 std::vector< std::mutex > thread2mutex(nb_threads);
174 std::vector< std::condition_variable > thread2not_empty(nb_threads);
175 std::mutex overall_mutex;
// Worker body: wait for an assigned node, execute it, then dispatch the
// children it released to this thread, idle threads, or the shared list.
// (capture list truncated by the extraction)
182 auto opExecute = [&schedule,
183 &nb_parents_to_execute,
191 &nb_remaining_operations](
const std::size_t this_thread,
192 const std::size_t nb_threads) ->
void {
193 const DAG& dag = schedule.dag();
196 auto& this_mutex = thread2mutex[this_thread];
197 auto& this_not_empty = thread2not_empty[this_thread];
198 auto& this_active_thread = active_threads[this_thread];
201 auto& node_to_execute = thread2node[this_thread];
// Predicate for the wait: wake when a node was assigned to this thread
// or when every operation has been executed. The 2 ms timed wait means a
// missed notification cannot block the thread forever.
205 const auto duration = std::chrono::milliseconds(2);
206 auto has_node_to_process = [&node_to_execute, &nb_remaining_operations, &nb_remaining] {
207 nb_remaining = nb_remaining_operations.load();
208 return (node_to_execute != NodeId(0)) || (nb_remaining == Size(0));
216 std::unique_lock< std::mutex > lock(this_mutex);
218 }
while (!this_not_empty.wait_for(lock, duration, has_node_to_process));
// All operations done: notify the next thread so termination propagates
// around the ring of workers.
221 if (nb_remaining == Size(0)) {
222 thread2not_empty[(this_thread + 1) % nb_threads].notify_one();
226 this_active_thread =
true;
// Execute the assigned operation. const_cast: schedule.operation() here
// returns a const reference while execute() mutates the operator.
232 const_cast< ScheduleOperator&
>(schedule.operation(node_to_execute)).execute();
233 nb_remaining_operations.fetch_sub(1);
// Children whose last unexecuted parent was this node are now available.
236 std::vector< NodeId > new_available_nodes;
237 new_available_nodes.reserve(nb_remaining);
238 for (
const auto child_node: dag.children(node_to_execute)) {
243 if (--nb_parents_to_execute[child_node] == Size(0)) {
244 new_available_nodes.push_back(child_node);
// Keep one freed node for this thread; otherwise it goes inactive.
249 bool this_thread_is_inactive =
false;
250 if (!new_available_nodes.empty()) {
253 node_to_execute = new_available_nodes.back();
254 new_available_nodes.pop_back();
257 this_thread_is_inactive =
true;
// Snapshot the currently idle threads we may hand work to.
265 std::vector< Idx > inactive_threads;
266 inactive_threads.reserve(nb_threads);
267 for (Idx i = 0; i < nb_threads; ++i) {
268 if (!active_threads[i]) inactive_threads.push_back(i);
274 if (this_thread_is_inactive) {
275 inactive_threads.push_back(this_thread);
277 this_active_thread =
false;
// No idle thread: park the surplus nodes in the shared list.
284 if (inactive_threads.empty()) {
285 if (!new_available_nodes.empty()) {
287 std::lock_guard< std::mutex > overall_lock(overall_mutex);
288 for (
const auto node: new_available_nodes)
289 available_nodes.insert(node);
// Otherwise assign the freed nodes to idle threads, scanning the
// inactive list from its back.
301 bool no_more_inactive_threads =
false;
303 Idx i = inactive_threads.size() - 1;
305 while (!new_available_nodes.empty()) {
306 while (!no_more_inactive_threads) {
307 thread = inactive_threads[i];
309 std::lock_guard< std::mutex > lock(thread2mutex[thread]);
310 if (!active_threads[thread]) {
311 active_threads[thread] =
true;
312 thread2node[thread] = new_available_nodes.back();
313 new_available_nodes.pop_back();
314 thread2not_empty[thread].notify_one();
316 if (i != Idx(0)) --i;
317 else no_more_inactive_threads =
true;
321 if (i != Idx(0)) --i;
322 else no_more_inactive_threads =
true;
325 if (no_more_inactive_threads)
break;
330 if (no_more_inactive_threads) {
331 inactive_threads.clear();
333 inactive_threads.resize(i + 1);
// Also drain the shared available_nodes list onto remaining idle
// threads, under the global lock.
343 std::lock_guard< std::mutex > overall_lock(overall_mutex);
345 for (
const auto node: new_available_nodes)
346 available_nodes.insert(node);
348 Idx i = inactive_threads.size() - 1;
350 while (!available_nodes.empty()) {
351 while (!no_more_inactive_threads) {
352 thread = inactive_threads[i];
354 std::lock_guard< std::mutex > lock(thread2mutex[thread]);
355 if (!active_threads[thread]) {
356 active_threads[thread] =
true;
357 thread2node[thread] = available_nodes.front();
358 available_nodes.popFront();
359 thread2not_empty[thread].notify_one();
360 if (i != Idx(0)) --i;
361 else no_more_inactive_threads =
true;
366 if (i != Idx(0)) --i;
367 else no_more_inactive_threads =
true;
369 if (no_more_inactive_threads)
break;
// Run the worker on nb_threads threads via the project's ThreadExecutor.
376 ThreadExecutor::execute(nb_threads, opExecute);
SchedulerParallel(Size max_nb_threads=0, double max_megabyte_memory=0.0)
default constructor
The common interface of all the schedulers.
std::size_t Size
In aGrUM, hashed values are unsigned long int.
Set< NodeId > NodeSet
Some typedefs and defines for shortcuts ...
unsigned int getNumberOfThreads()
returns the max number of threads used by default when entering the next parallel region
gum is the global namespace for all aGrUM entities
A scheduler that executes available operators in parallel.
a parallel scheduler that executes available operations
A wrapper that enables storing data in a way that prevents false cacheline sharing.