aGrUM 2.3.2
a C++ library for (probabilistic) graphical models
schedulerParallel.cpp
Go to the documentation of this file.
1/****************************************************************************
2 * This file is part of the aGrUM/pyAgrum library. *
3 * *
4 * Copyright (c) 2005-2025 by *
5 * - Pierre-Henri WUILLEMIN(_at_LIP6) *
6 * - Christophe GONZALES(_at_AMU) *
7 * *
8 * The aGrUM/pyAgrum library is free software; you can redistribute it *
9 * and/or modify it under the terms of either : *
10 * *
11 * - the GNU Lesser General Public License as published by *
12 * the Free Software Foundation, either version 3 of the License, *
13 * or (at your option) any later version, *
14 * - the MIT license (MIT), *
15 * - or both in dual license, as here. *
16 * *
17 * (see https://agrum.gitlab.io/articles/dual-licenses-lgplv3mit.html) *
18 * *
19 * This aGrUM/pyAgrum library is distributed in the hope that it will be *
20 * useful, but WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
21 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES MERCHANTABILITY or FITNESS *
22 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, *
25 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR *
26 * OTHER DEALINGS IN THE SOFTWARE. *
27 * *
28 * See LICENCES for more details. *
29 * *
30 * SPDX-FileCopyrightText: Copyright 2005-2025 *
31 * - Pierre-Henri WUILLEMIN(_at_LIP6) *
32 * - Christophe GONZALES(_at_AMU) *
33 * SPDX-License-Identifier: LGPL-3.0-or-later OR MIT *
34 * *
35 * Contact : info_at_agrum_dot_org *
36 * homepage : http://agrum.gitlab.io *
37 * gitlab : https://gitlab.com/agrumery/agrum *
38 * *
39 ****************************************************************************/
40
41
48
51
52#ifndef DOXYGEN_SHOULD_SKIP_THIS
53
55# ifdef GUM_NO_INLINE
57# endif /* GUM_NO_INLINE */
58
59namespace gum {
60
62 SchedulerParallel::SchedulerParallel(Size max_nb_threads, double max_megabyte_memory) :
63 Scheduler(max_nb_threads, max_megabyte_memory),
64 _sequential_scheduler_(max_nb_threads, max_megabyte_memory) {
65 // for debugging purposes
66 GUM_CONSTRUCTOR(SchedulerParallel);
67 }
68
70 SchedulerParallel::SchedulerParallel(const SchedulerParallel& from) :
71 Scheduler(from), _sequential_scheduler_(from._sequential_scheduler_) {
72 // for debugging purposes
73 GUM_CONS_CPY(SchedulerParallel);
74 }
75
77 SchedulerParallel::SchedulerParallel(SchedulerParallel&& from) :
78 Scheduler(std::move(from)), _sequential_scheduler_(std::move(from._sequential_scheduler_)) {
79 // for debugging purposes
80 GUM_CONS_MOV(SchedulerParallel);
81 }
82
84 SchedulerParallel* SchedulerParallel::clone() const { return new SchedulerParallel(*this); }
85
87 SchedulerParallel::~SchedulerParallel() {
88 // for debugging purposes
89 GUM_DESTRUCTOR(SchedulerParallel);
90 }
91
93 void SchedulerParallel::execute(Schedule& schedule) {
94 // check that, a priori, we have sufficient memory to execute everything.
95 // If this is not the case, then we should raise an exception before even
96 // trying to execute any operation of the schedule
97 if (this->_max_memory != 0.0) {
98 SchedulerSequential seq_scheduler(this->getNumberOfThreads(), this->maxMemory());
99 auto memory_usage = seq_scheduler.memoryUsage(schedule);
100 if (memory_usage.first > this->_max_memory) { throw std::bad_alloc(); }
101 }
102
103 // compute the set of operations to perform
104 List< NodeId > available_nodes;
105 const DAG& dag = schedule.dag();
106 NodeSet nodes_to_execute(dag.sizeNodes());
107 {
108 for (const auto node: schedule.availableOperations()) {
109 available_nodes.insert(node);
110 }
111
112 List< NodeId > nodes = available_nodes;
113 while (!nodes.empty()) {
114 const NodeId node = nodes.front();
115 nodes.popFront();
116 if (!nodes_to_execute.exists(node)) {
117 nodes_to_execute.insert(node);
118 for (const auto child: dag.children(node))
119 nodes.insert(child);
120 }
121 }
122 }
123 if (nodes_to_execute.empty()) return;
124
125 // indicate for each node, the number of parents remaining to execute
126 std::atomic< Size > nb_remaining_operations(nodes_to_execute.size());
127 NodeProperty< std::atomic< Size > > nb_parents_to_execute(nb_remaining_operations);
128 for (const auto node: nodes_to_execute) {
129 Size nb_unexecuted_parents = dag.parents(node).size();
130 for (const auto parent: dag.parents(node))
131 if (schedule.operation(parent).isExecuted()) --nb_unexecuted_parents;
132 nb_parents_to_execute.emplace(node, nb_unexecuted_parents);
133 }
134 nodes_to_execute.clear();
135
136 // compute the number of threads to execute. Desired_nb_threads equals either
137 // the number of threads asked by the user or, if the used did not ask for a
138 // particular number, the aGrUM's current max number of threads
139 const auto desired_nb_threads = this->isGumNumberOfThreadsOverriden()
140 ? this->getNumberOfThreads()
142 const Size nb_threads = nb_remaining_operations.load() < desired_nb_threads
143 ? nb_remaining_operations.load()
144 : desired_nb_threads;
145
146 // indicate which threads are active
147 std::vector< std::atomic< bool > > active_threads(nb_remaining_operations.load());
148 for (auto& thread: active_threads)
149 thread = false;
150
151 // assign the available nodes to the threads
152 std::vector< std::atomic< NodeId > > thread2node(nb_remaining_operations.load());
153 for (auto& node: thread2node)
154 node = 0;
155 {
156 Idx thread_index = Idx(0);
157 while (!available_nodes.empty()) {
158 const NodeId node = available_nodes.front();
159 available_nodes.popFront();
160 thread2node[thread_index] = node;
161 active_threads[thread_index] = true;
162 ++thread_index;
163 if (thread_index >= nb_threads) break;
164 }
165 }
166
167 // prepare keeping information about memory usage. This is useful if the user
168 // added constraints on memory usage. When operations cannot be performed
169 // due to memory shortage, they will be temporarily stored into ordered vectors
170 // double overall_memory_used = 0.0; // the current memory used by all the threads
171
172 // create the mutexes needed for threads synchronization
173 std::vector< std::mutex > thread2mutex(nb_threads);
174 std::vector< std::condition_variable > thread2not_empty(nb_threads);
175 std::mutex overall_mutex;
176
177 // std::vector< ThreadData< std::pair<int,double> > >
178 // stats(nb_threads, {std::pair<int,double>(0,0.0)});
179
180 // here, we create a lambda that will be executed by all the threads
181 // to execute the operations in a parallel manner
182 auto opExecute = [&schedule,
183 &nb_parents_to_execute,
184 &overall_mutex,
185 &thread2mutex,
186 &thread2not_empty,
187 &active_threads,
188 &thread2node,
189 &available_nodes,
190 // &stats,
191 &nb_remaining_operations](const std::size_t this_thread,
192 const std::size_t nb_threads) -> void {
193 const DAG& dag = schedule.dag();
194
195 // get the synchronization objects
196 auto& this_mutex = thread2mutex[this_thread];
197 auto& this_not_empty = thread2not_empty[this_thread];
198 auto& this_active_thread = active_threads[this_thread];
199
200 // get the operation to execute
201 auto& node_to_execute = thread2node[this_thread];
202 Size nb_remaining;
203
204 // sets the condition to wait for new nodes
205 const auto duration = std::chrono::milliseconds(2);
206 auto has_node_to_process = [&node_to_execute, &nb_remaining_operations, &nb_remaining] {
207 nb_remaining = nb_remaining_operations.load();
208 return (node_to_execute != NodeId(0)) || (nb_remaining == Size(0));
209 };
210
211 while (true) {
212 { // use brace for unique_lock's scope
213
214 // wait until some operation is available or all the operations have
215 // been executed
216 std::unique_lock< std::mutex > lock(this_mutex);
217 do {
218 } while (!this_not_empty.wait_for(lock, duration, has_node_to_process));
219
220 // if all the operations have been executed, stop the thread
221 if (nb_remaining == Size(0)) {
222 thread2not_empty[(this_thread + 1) % nb_threads].notify_one();
223 return;
224 }
225
226 this_active_thread = true;
227 // ++stats[this_thread].data.first;
228 // stats[this_thread].data.second += schedule.operation(node_to_execute).nbOperations();
229 }
230
231 // now, actually execute the operation
232 const_cast< ScheduleOperator& >(schedule.operation(node_to_execute)).execute();
233 nb_remaining_operations.fetch_sub(1);
234
235 // compute the next operations available to be executed
236 std::vector< NodeId > new_available_nodes;
237 new_available_nodes.reserve(nb_remaining);
238 for (const auto child_node: dag.children(node_to_execute)) {
239 // indicate that node has been executed and get the set of nodes
240 // without parents yet to execute. Note that nb_parents_to_execute[child_node]
241 // is an atomic<Size>. Hence, the --operation is atomic and only one thread
242 // can pass the if statement below
243 if (--nb_parents_to_execute[child_node] == Size(0)) {
244 new_available_nodes.push_back(child_node);
245 }
246 }
247
248 // if there are new nodes to process. Assign one node to the current thread
249 bool this_thread_is_inactive = false;
250 if (!new_available_nodes.empty()) {
251 // no need to lock with a mutex here since, the thread being still active,
252 // no other thread can assign it another node
253 node_to_execute = new_available_nodes.back();
254 new_available_nodes.pop_back();
255 } else {
256 // here, no new node was assigned, so the thread becomes inactive
257 this_thread_is_inactive = true;
258 }
259
260 // There may still exist nodes to process. We will try to assign them to the
261 // set of inactive threads. So, get a list of threads that seem inactive. As we
262 // do not guard the reads of these inactive_threads, some threads might seem to
263 // be currently inactive but can become active before we try to assign them some
264 // operations to perform
265 std::vector< Idx > inactive_threads;
266 inactive_threads.reserve(nb_threads);
267 for (Idx i = 0; i < nb_threads; ++i) {
268 if (!active_threads[i]) inactive_threads.push_back(i);
269 }
270
271 // If we did not assign a new node to the current thread, consider it now
272 // as inactive. Below, we may be able to assign it a node that was stored
273 // into the overall set of available nodes
274 if (this_thread_is_inactive) {
275 inactive_threads.push_back(this_thread);
276 this_mutex.lock();
277 this_active_thread = false;
278 node_to_execute = 0;
279 this_mutex.unlock();
280 }
281
282 // if no inactive thread was identified, we store the newly available nodes
283 // into the overall set of available nodes and we go back to the waiting step
284 if (inactive_threads.empty()) {
285 if (!new_available_nodes.empty()) {
286 // lock the next shared memory operations
287 std::lock_guard< std::mutex > overall_lock(overall_mutex);
288 for (const auto node: new_available_nodes)
289 available_nodes.insert(node);
290 }
291
292 // go to the waiting step
293 continue;
294 }
295
296 // here we know that there are some threads that are still inactive.
297
298 // try to assign the nodes in new_available_nodes to these inactive threads.
299 // Here, we guard what we do with mutexes in order to get really inactive
300 // threads performing operations that are guaranteed to be different.
301 bool no_more_inactive_threads = false;
302 {
303 Idx i = inactive_threads.size() - 1;
304 Idx thread;
305 while (!new_available_nodes.empty()) {
306 while (!no_more_inactive_threads) {
307 thread = inactive_threads[i];
308 { // use brace for lock_guard's scope
309 std::lock_guard< std::mutex > lock(thread2mutex[thread]);
310 if (!active_threads[thread]) {
311 active_threads[thread] = true;
312 thread2node[thread] = new_available_nodes.back();
313 new_available_nodes.pop_back();
314 thread2not_empty[thread].notify_one();
315
316 if (i != Idx(0)) --i;
317 else no_more_inactive_threads = true;
318 break;
319 }
320 }
321 if (i != Idx(0)) --i;
322 else no_more_inactive_threads = true;
323 }
324
325 if (no_more_inactive_threads) break;
326 }
327
328 // do not take anymore into account the threads we have assigned
329 // operations to
330 if (no_more_inactive_threads) {
331 inactive_threads.clear();
332 } else {
333 inactive_threads.resize(i + 1);
334 }
335 }
336
337 // add the remaining elements of new_available_nodes to available_nodes and
338 // try to assign the nodes in available_nodes to the remaining inactive threads.
339 // Here, we guard what we do with mutexes in order to get really inactive
340 // threads performing operations that are guaranteed to be different.
341 {
342 // lock the next shared memory operations
343 std::lock_guard< std::mutex > overall_lock(overall_mutex);
344
345 for (const auto node: new_available_nodes)
346 available_nodes.insert(node);
347
348 Idx i = inactive_threads.size() - 1;
349 Idx thread;
350 while (!available_nodes.empty()) {
351 while (!no_more_inactive_threads) {
352 thread = inactive_threads[i];
353 {
354 std::lock_guard< std::mutex > lock(thread2mutex[thread]);
355 if (!active_threads[thread]) {
356 active_threads[thread] = true;
357 thread2node[thread] = available_nodes.front();
358 available_nodes.popFront();
359 thread2not_empty[thread].notify_one();
360 if (i != Idx(0)) --i;
361 else no_more_inactive_threads = true;
362 break;
363 }
364 }
365
366 if (i != Idx(0)) --i;
367 else no_more_inactive_threads = true;
368 }
369 if (no_more_inactive_threads) break;
370 }
371 }
372 }
373 };
374
375 // launch the threads
376 ThreadExecutor::execute(nb_threads, opExecute);
377
378 /*
379 std::cout << "nb_ops =";
380 for (const auto& nb : stats)
381 std::cout << " " << nb.data.first << '(' << nb.data.second << ')';
382 std::cout << std::endl;
383 */
384 }
385
386} /* namespace gum */
387
388#endif /* DOXYGEN_SHOULD_SKIP_THIS */
SchedulerParallel(Size max_nb_threads=0, double max_megabyte_memory=0.0)
default constructor
The common interface of all the schedulers.
Definition scheduler.h:71
std::size_t Size
In aGrUM, hashed values are unsigned long int.
Definition types.h:74
Set< NodeId > NodeSet
Some typedefs and defines for shortcuts ...
unsigned int getNumberOfThreads()
returns the max number of threads used by default when entering the next parallel region
gum is the global namespace for all aGrUM entities
Definition agrum.h:46
STL namespace.
A scheduler that executes available operators in parallel.
a parallel scheduler that executes available operations
A wrapper that enables to store data in a way that prevents false cacheline sharing.