aGrUM 2.3.2
a C++ library for (probabilistic) graphical models
schedulerSequential.cpp
Go to the documentation of this file.
1/****************************************************************************
2 * This file is part of the aGrUM/pyAgrum library. *
3 * *
4 * Copyright (c) 2005-2025 by *
5 * - Pierre-Henri WUILLEMIN(_at_LIP6) *
6 * - Christophe GONZALES(_at_AMU) *
7 * *
8 * The aGrUM/pyAgrum library is free software; you can redistribute it *
9 * and/or modify it under the terms of either : *
10 * *
11 * - the GNU Lesser General Public License as published by *
12 * the Free Software Foundation, either version 3 of the License, *
13 * or (at your option) any later version, *
14 * - the MIT license (MIT), *
15 * - or both in dual license, as here. *
16 * *
17 * (see https://agrum.gitlab.io/articles/dual-licenses-lgplv3mit.html) *
18 * *
19 * This aGrUM/pyAgrum library is distributed in the hope that it will be *
20 * useful, but WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
21 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES MERCHANTABILITY or FITNESS *
22 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, *
25 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR *
26 * OTHER DEALINGS IN THE SOFTWARE. *
27 * *
28 * See LICENCES for more details. *
29 * *
30 * SPDX-FileCopyrightText: Copyright 2005-2025 *
31 * - Pierre-Henri WUILLEMIN(_at_LIP6) *
32 * - Christophe GONZALES(_at_AMU) *
33 * SPDX-License-Identifier: LGPL-3.0-or-later OR MIT *
34 * *
35 * Contact : info_at_agrum_dot_org *
36 * homepage : http://agrum.gitlab.io *
37 * gitlab : https://gitlab.com/agrumery/agrum *
38 * *
39 ****************************************************************************/
40
41
48
50
51#ifndef DOXYGEN_SHOULD_SKIP_THIS
52
54# ifdef GUM_NO_INLINE
56# endif /* GUM_NO_INLINE */
57
58namespace gum {
59
/// default constructor
/// @param max_nb_threads the maximum number of threads the scheduler may use
///        (forwarded to the Scheduler base class)
/// @param max_megabyte_memory the memory budget (in megabytes) allowed for
///        executing schedules; 0 means no limit (forwarded to Scheduler)
SchedulerSequential::SchedulerSequential(Size max_nb_threads, double max_megabyte_memory) :
    Scheduler(max_nb_threads, max_megabyte_memory) {
  // for debugging purposes
  GUM_CONSTRUCTOR(SchedulerSequential);
}
66
/// copy constructor
/// @param from the scheduler to be copied
SchedulerSequential::SchedulerSequential(const SchedulerSequential& from) : Scheduler(from) {
  // for debugging purposes
  GUM_CONS_CPY(SchedulerSequential);
}
72
/// move constructor
/// @param from the scheduler whose content is moved into this one
SchedulerSequential::SchedulerSequential(SchedulerSequential&& from) :
    Scheduler(std::move(from)) {
  // for debugging purposes
  GUM_CONS_MOV(SchedulerSequential);
}
79
81 SchedulerSequential* SchedulerSequential::clone() const { return new SchedulerSequential(*this); }
82
/// destructor
SchedulerSequential::~SchedulerSequential() {
  // for debugging purposes
  GUM_DESTRUCTOR(SchedulerSequential);
}
88
90 void SchedulerSequential::execute(Schedule& schedule) {
91 // if we do not have the right set of operations stored into operations,
92 // then recompute it
93 _setSchedule_(schedule);
94
95 // check that, a priori, we have sufficient memory to execute everything.
96 // If this is not the case, then we should raise an exception before even
97 // trying to execute any operation of the schedule
98 if ((this->_max_memory != 0.0) && _memory_usage_.first > this->_max_memory) {
99 throw std::bad_alloc();
100 }
101
102 // execute the schedule
103 for (const auto node: _operations_) {
104 // try to execute the operation. If this raises an exception, then
105 // we do not try to catch it
106 auto& op = const_cast< ScheduleOperator& >(schedule.operation(node));
107 op.execute();
108
109 // if we succeeded to execute the operation, indicate that the set of all
110 // the operations of the schedule is no more up to date
111 _operations_up_to_date_ = false;
112 }
113 }
114
117 double SchedulerSequential::nbOperations(const Schedule& schedule) {
118 double nb_ops = 0.0;
119
120 for (const auto node: schedule.dag()) {
121 nb_ops += schedule.operation(node).nbOperations();
122 }
123
124 return nb_ops;
125 }
126
128 bool SchedulerSequential::_cmp_(const UnexecutedOperation& a, const UnexecutedOperation& b) {
129 if (a.max_memory_usage < b.max_memory_usage) return true;
130 if (b.max_memory_usage < a.max_memory_usage) return false;
131 return (a.end_memory_usage < b.end_memory_usage);
132 }
133
/// returns the memory consumption induced by the execution of a schedule:
/// a pair {peak memory usage, memory still used once the schedule completes}
/// @param schedule the schedule whose memory usage is estimated
std::pair< double, double > SchedulerSequential::memoryUsage(const Schedule& schedule) {
  // if we do not have the right memory usage stored into _memory_usage_,
  // then recompute it (this also recomputes the execution order)
  _setSchedule_(schedule);

  return _memory_usage_;
}
142
144 Size SchedulerSequential::_addExecutableOps_(
145 std::vector< UnexecutedOperation >& unexecuted_deletions,
146 std::vector< UnexecutedOperation >& unexecuted_operations,
147 bool& unexecuted_deletions_sorted,
148 bool& unexecuted_operations_sorted,
149 double memory_used,
150 double max_memory,
151 List< NodeId >& available_nodes) const {
152 double added_mem = 0.0;
153
154 // sort the vectors by increasing memory usage
155 if (!unexecuted_deletions_sorted) {
156 std::sort(unexecuted_deletions.begin(), unexecuted_deletions.end(), _cmp_);
157 unexecuted_deletions_sorted = true;
158 }
159 if (!unexecuted_operations_sorted) {
160 std::sort(unexecuted_operations.begin(), unexecuted_operations.end(), _cmp_);
161 unexecuted_operations_sorted = true;
162 }
163
164
165 // get the index of the last executable deletion
166 std::size_t i_del = 0;
167 for (std::size_t end = unexecuted_deletions.size(); i_del < end; ++i_del) {
168 if (memory_used + added_mem + unexecuted_deletions[i_del].max_memory_usage > max_memory)
169 break;
170 added_mem += unexecuted_deletions[i_del].end_memory_usage;
171 }
172
173 // get the index of the last executable operation (that does not imply
174 // any deletion
175 std::size_t i_op = 0;
176 for (std::size_t end = unexecuted_operations.size(); i_op < end; ++i_op) {
177 if (memory_used + added_mem + unexecuted_operations[i_op].max_memory_usage > max_memory)
178 break;
179 added_mem += unexecuted_operations[i_op].end_memory_usage;
180 }
181
182
183 // insert the executable operations (not deletions) into available_nodes
184 if (i_op != 0) {
185 for (std::size_t j = i_op - 1, i = i_op; i > 0; --j, --i)
186 available_nodes.pushFront(unexecuted_operations[j].node);
187
188 unexecuted_operations.erase(unexecuted_operations.begin(),
189 unexecuted_operations.begin() + i_op);
190 }
191
192 // insert the executable deletions into available_nodes
193 if (i_del != 0) {
194 for (std::size_t j = i_del - 1, i = 0; i > 0; --j, --i)
195 available_nodes.pushFront(unexecuted_deletions[j].node);
196
197 unexecuted_deletions.erase(unexecuted_deletions.begin(),
198 unexecuted_deletions.begin() + i_del);
199 }
200
201 return i_op + i_del;
202 }
203
206 void SchedulerSequential::_simulateDAGUpdate_(DAG& dag,
207 const NodeId node,
208 std::vector< NodeId >& new_available_nodes) const {
209 new_available_nodes.clear();
210
211 for (const auto child_node: dag.children(node)) {
212 if (dag.parents(child_node).size() == 1) { // only node is a parent
213 new_available_nodes.push_back(child_node);
214 }
215 }
216
217 // remove node_exec
218 dag.eraseNode(node);
219 }
220
222 void SchedulerSequential::_simulateExecuteOneOperation_(
223 const NodeId node,
224 ScheduleOperator& op,
225 DAG& dag,
226 List< NodeId >& available_nodes,
227 std::vector< NodeId >& new_available_nodes) {
228 // save the fact that operation node/op is to be executed
229 _operations_.push_back(node);
230
231 // whenever an operation has been executed, update the schedule
232 _simulateDAGUpdate_(dag, node, new_available_nodes);
233
234 // add the new available_nodes nodes to List available_nodes: make it so
235 // that the operations that imply deletions are first (hence, they should
236 // decrease the stress on memory usage)
237 for (const auto new_node: new_available_nodes) {
238 if (!_schedule_->operation(new_node).implyDeletion()) available_nodes.pushFront(new_node);
239 }
240 for (const auto new_node: new_available_nodes) {
241 if (_schedule_->operation(new_node).implyDeletion()) available_nodes.pushFront(new_node);
242 }
243 }
244
246 void SchedulerSequential::_simulateExecution_() {
247 // get the DAG of the operations to perform
248 DAG dag = _schedule_->dag();
249
250 _operations_.clear();
251 _operations_.reserve(dag.sizeNodes());
252
253 // the list of available_nodes nodes to perform
254 List< NodeId > available_nodes;
255 std::vector< NodeId > new_available_nodes; // nodes available after an execution
256
257 // fill available_nodes by inserting at the front the deletions and at the
258 // back the other nodes. This will enable to free memory as soon as possible
259 for (const auto node: _schedule_->availableOperations()) {
260 if (_schedule_->operation(node).implyDeletion()) available_nodes.pushFront(node);
261 else available_nodes.pushBack(node);
262 }
263
264 // if there is nothing to do, don't do it
265 if (available_nodes.empty()) {
266 _memory_usage_ = {0.0, 0.0};
267 _operations_up_to_date_ = true;
268 return;
269 }
270
271
272 // prepare keeping information about memory usage. This is useful if the user
273 // added constraints on memory usage. When operations cannot be performed
274 // due to memory shortage, they will be temporarily stored into ordered vectors
275 double memory_used = 0; // the current memory used
276 double max_memory_used = 0; // max memory used so far
277 double max_memory = this->_max_memory; // max memory allowed
278 std::vector< UnexecutedOperation > unexecuted_deletions;
279 std::vector< UnexecutedOperation > unexecuted_operations;
280 bool unexecuted_deletions_sorted = true;
281 bool unexecuted_operations_sorted = true;
282
283 // perform the schedule
284 bool schedule_fully_performed = false;
285 do {
286 while (!available_nodes.empty()) {
287 // get the first operation available_nodes
288 const NodeId node = available_nodes.front();
289 available_nodes.popFront();
290 auto& op = const_cast< ScheduleOperator& >(_schedule_->operation(node));
291
292 // if scheduling under memory budget, check that we do not consume too
293 // much memory.
294 std::pair< double, double > op_mem = op.memoryUsage();
295 if (max_memory != 0.0) {
296 if (memory_used + op_mem.first > max_memory) {
297 // here, we cannot execute the operation due to memory shortage
298 // so we should put the node to the unexecuted operations vectors.
299 // Hopefully, we will be able to execute them later on.
300 if (op_mem.second > 0) {
301 unexecuted_operations.push_back({op_mem.first, op_mem.second, node});
302 unexecuted_operations_sorted = false;
303 } else {
304 unexecuted_deletions.push_back({op_mem.first, op_mem.second, node});
305 unexecuted_deletions_sorted = false;
306 }
307
308 continue;
309 }
310 }
311
312 // here, we remain under the memory limit, if any, so we should try to
313 // execute the operation
314 _simulateExecuteOneOperation_(node, op, dag, available_nodes, new_available_nodes);
315 if (memory_used + op_mem.first > max_memory_used)
316 max_memory_used = memory_used + op_mem.first;
317 memory_used += op_mem.second;
318 }
319
320 // if scheduling under memory limitations, try to see if we can execute
321 // some operations that were previously unexecutable
322 if (!unexecuted_operations.empty() || !unexecuted_deletions.empty()) {
323 const Size nb_new_ops = const_cast< SchedulerSequential* >(this)->_addExecutableOps_(
324 unexecuted_deletions,
325 unexecuted_operations,
326 unexecuted_deletions_sorted,
327 unexecuted_operations_sorted,
328 memory_used,
329 max_memory,
330 available_nodes);
331
332 if (nb_new_ops == 0) {
333 // here, there exists no more operation that can be executed given the
334 // current memory constraints. But there may remain some unexecuted
335 // operations. In this case, try to increment the memory budget.
336 if (!unexecuted_deletions.empty())
337 max_memory = memory_used + unexecuted_deletions[0].max_memory_usage;
338 else if (!unexecuted_operations.empty())
339 max_memory = memory_used + unexecuted_operations[0].max_memory_usage;
340
341 const_cast< SchedulerSequential* >(this)->_addExecutableOps_(unexecuted_deletions,
342 unexecuted_operations,
343 unexecuted_deletions_sorted,
344 unexecuted_operations_sorted,
345 memory_used,
346 max_memory,
347 available_nodes);
348 }
349 } else {
350 schedule_fully_performed = true;
351 }
352 } while (!schedule_fully_performed);
353
354 _memory_usage_ = {max_memory_used, memory_used};
355 _operations_up_to_date_ = true;
356 }
357
358} /* namespace gum */
359
360#endif /* DOXYGEN_SHOULD_SKIP_THIS */
SchedulerSequential(Size max_nb_threads=0, double max_megabyte_memory=0.0)
default constructor
The common interface of all the schedulers.
Definition scheduler.h:71
std::size_t Size
In aGrUM, hashed values are unsigned long int.
Definition types.h:74
gum is the global namespace for all aGrUM entities
Definition agrum.h:46
STL namespace.
a scheduler that executes any available operation (chosen arbitrarily)
a scheduler that executes any available operation (chosen arbitrarily)