aGrUM 2.3.2
a C++ library for (probabilistic) graphical models
threadExecutorSTL_tpl.h
Go to the documentation of this file.
1/****************************************************************************
2 * This file is part of the aGrUM/pyAgrum library. *
3 * *
4 * Copyright (c) 2005-2025 by *
5 * - Pierre-Henri WUILLEMIN(_at_LIP6) *
6 * - Christophe GONZALES(_at_AMU) *
7 * *
8 * The aGrUM/pyAgrum library is free software; you can redistribute it *
9 * and/or modify it under the terms of either : *
10 * *
11 * - the GNU Lesser General Public License as published by *
12 * the Free Software Foundation, either version 3 of the License, *
13 * or (at your option) any later version, *
14 * - the MIT license (MIT), *
15 * - or both in dual license, as here. *
16 * *
17 * (see https://agrum.gitlab.io/articles/dual-licenses-lgplv3mit.html) *
18 * *
19 * This aGrUM/pyAgrum library is distributed in the hope that it will be *
20 * useful, but WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, *
21 * INCLUDING BUT NOT LIMITED TO THE WARRANTIES MERCHANTABILITY or FITNESS *
22 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE *
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, *
25 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR *
26 * OTHER DEALINGS IN THE SOFTWARE. *
27 * *
28 * See LICENCES for more details. *
29 * *
30 * SPDX-FileCopyrightText: Copyright 2005-2025 *
31 * - Pierre-Henri WUILLEMIN(_at_LIP6) *
32 * - Christophe GONZALES(_at_AMU) *
33 * SPDX-License-Identifier: LGPL-3.0-or-later OR MIT *
34 * *
35 * Contact : info_at_agrum_dot_org *
36 * homepage : http://agrum.gitlab.io *
37 * gitlab : https://gitlab.com/agrumery/agrum *
38 * *
39 ****************************************************************************/
40#pragma once
41
42
48#include <algorithm>
49#include <exception>
50#include <functional>
51#include <thread>
52#include <tuple>
53#include <vector>
54
56
57#ifndef DOXYGEN_SHOULD_SKIP_THIS
58
59
60namespace gum {
61
62 namespace threadsSTL {
63
65 template < typename FUNCTION, typename... ARGS >
66 void ThreadExecutor::execute(std::size_t nb_threads, FUNCTION exec_func, ARGS&&... func_args) {
67 if (nb_threads <= 1) {
68 exec_func(0, 1, std::forward< ARGS >(func_args)...);
69 } else {
70 // indicate that we start a new threadExecutor
72
73 // here, we shall create the threads, but also one std::exception_ptr
74 // for each thread. This will allow us to catch the exception raised
75 // by the threads
76 std::vector< std::thread > threads;
77 threads.reserve(nb_threads);
78 std::vector< std::exception_ptr > func_exceptions(nb_threads, nullptr);
79
80 // create a lambda that will execute exec_func while catching its exceptions
81 auto real_exec_func = [&exec_func, nb_threads](std::size_t this_thread,
82 std::exception_ptr& exc,
83 ARGS&... args) -> void {
84 try {
85 exec_func(this_thread, nb_threads, std::forward< ARGS >(args)...);
86 } catch (...) { exc = std::current_exception(); }
87 };
88
89 // launch the threads
90 for (std::size_t i = std::size_t(0); i < nb_threads; ++i) {
91 threads.push_back(
92 std::thread(real_exec_func, i, std::ref(func_exceptions[i]), std::ref(func_args)...));
93 // std::ref(std::forward< ARGS >(func_args))...));
94 }
95
96 // wait for the threads to complete their executions
97 std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
98
99 // now, we have completed the execution of the ThreadExecutor
101
102 // now, check if one exception has been raised
103 for (const auto& exc: func_exceptions) {
104 if (exc != nullptr) { std::rethrow_exception(exc); }
105 }
106 }
107 }
108
110 template < typename FUNC1, typename FUNC2, typename... ARGS >
111 void ThreadExecutor::executeOrUndo(std::size_t nb_threads,
112 FUNC1 exec_func,
113 FUNC2 undo_func,
114 ARGS&&... func_args) {
115 if (nb_threads <= 1) {
116 try {
117 exec_func(0, 1, std::forward< ARGS >(func_args)...);
118 } catch (...) {
119 undo_func(0, 1, std::forward< ARGS >(func_args)...);
120 throw;
121 }
122 } else {
123 // indicate that we start a new threadExecutor
125
126 // here, we shall create the threads, but also one std::exception_ptr
127 // for each thread. This will allow us to catch the exception raised
128 // by the threads
129 std::vector< std::thread > threads;
130 threads.reserve(nb_threads);
131 std::vector< std::exception_ptr > func_exceptions(nb_threads, nullptr);
132
133 // create a lambda that will execute exec_func while catching its exceptions
134 auto real_exec_func = [&exec_func, nb_threads](std::size_t this_thread,
135 std::exception_ptr& exc,
136 ARGS&... args) -> void {
137 try {
138 exec_func(this_thread, nb_threads, std::forward< ARGS >(args)...);
139 } catch (...) { exc = std::current_exception(); }
140 };
141
142
143 // launch the threads
144 for (std::size_t i = std::size_t(0); i < nb_threads; ++i) {
145 threads.push_back(
146 std::thread(real_exec_func, i, std::ref(func_exceptions[i]), std::ref(func_args)...));
147 // std::ref(std::forward< ARGS >(func_args))...));
148 }
149
150 // wait for the threads to complete their executions
151 std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
152
153 // now, check if one exception has been raised
154 bool exception_raised = false;
155 for (const auto& exc: func_exceptions) {
156 if (exc != nullptr) {
157 exception_raised = true;
158 break;
159 }
160 }
161
162
163 if (exception_raised) {
164 // create a lambda that will execute undo_func while catching
165 // its exceptions
166 auto real_undo_func = [&undo_func, nb_threads](std::size_t this_thread,
167 std::exception_ptr& exc,
168 ARGS&... args) -> void {
169 try {
170 undo_func(this_thread, nb_threads, std::forward< ARGS >(args)...);
171 } catch (...) { exc = std::current_exception(); }
172 };
173
174 // launch the repair threads
175 threads.clear();
176 std::vector< std::exception_ptr > undo_func_exceptions(nb_threads, nullptr);
177 for (std::size_t i = std::size_t(0); i < nb_threads; ++i) {
178 // we just need to repair the threads that did not raise exceptions
179 if (func_exceptions[i] == nullptr)
180 threads.push_back(std::thread(real_undo_func,
181 i,
182 std::ref(undo_func_exceptions[i]),
183 std::ref(func_args)...));
184 // std::ref(std::forward< ARGS >(func_args))...));
185 }
186
187 // wait for the threads to complete their executions
188 std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
189
190 // now, we have completed the execution of the ThreadExecutor
192
193 // rethrow the exception
194 for (const auto& exc: func_exceptions) {
195 if (exc != nullptr) { std::rethrow_exception(exc); }
196 }
197 } else {
198 // now, we have completed the execution of the ThreadExecutor
200 }
201 }
202 }
203
204
205 } /* namespace threadsSTL */
206
207} /* namespace gum */
208
209#endif /* DOXYGEN_SHOULD_SKIP_THIS */
static std::atomic< int > nbRunningThreadsExecutors_
the number of currently running ThreadExecutors
gum is the global namespace for all aGrUM entities
Definition agrum.h:46
static void execute(std::size_t nb_threads, FUNCTION exec_func, ARGS &&... func_args)
executes a function using several threads
static void executeOrUndo(std::size_t nb_threads, FUNC1 exec_func, FUNC2 undo_func, ARGS &&... func_args)
executes in parallel a function and undoes it if exceptions are raised
A class to execute several threads by exploiting std::thread.