// reductionvector.h — HILA special reduction class for arrays
// (Doxygen page navigation chrome removed from extracted source)
1#ifndef ReductionVector_H_
2#define ReductionVector_H_
3
4#include "hila.h"
5
6
7//////////////////////////////////////////////////////////////////////////////////
8/// Special reduction class for arrays: declare a reduction array as
9/// ReductionVector<T> a(size);
10///
11/// This can be used within site loops as
12/// onsites(ALL ) {
13/// int i = ...
14/// a[i] += ...
15/// }
16///
17/// or outside site loops as usual.
18/// Reductions: += *= sum or product reduction
19/// NOTE: size of the reduction vector must be same on all nodes!
20/// By default reduction is "allreduce", i.e. all nodes get the same result.
21///
22/// Reduction can be customized by using methods:
23/// a.size() : return the size of the array
24/// a.resize(new_size) : change size of array (all nodes must use same size!)
25/// a[i] : get/set element i
26///
27/// a.allreduce(bool) : turn allreduce on/off (default=true)
28/// a.nonblocking(bool) : turn non-blocking reduction on/off
29/// a.delayed(bool) : turn delayed reduction on/off
30///
31/// a.wait() : for non-blocking reduction, wait() has to be called
32/// after the loop to complete the reduction
33/// a.reduce() : for delayed reduction starts/completes the reduction
34///
35/// a.is_allreduce() : queries the reduction status
36/// is_nonblocking()
37/// is_delayed()
38///
39/// a.push_back(element) : add one element to
40/// The same reduction variable can be used again
41///
42
43template <typename T>
45
46 private:
47 std::vector<T> val;
48
49 /// comm_is_on is true if MPI communications are under way.
50 bool comm_is_on = false;
51
52 /// status variables of reduction
53 bool is_allreduce_ = true;
54 bool is_nonblocking_ = false;
55 bool is_delayed_ = false;
56
57 bool delay_is_on = false; // status of the delayed reduction
58 bool is_delayed_sum = true; // sum/product
59
60 MPI_Request request;
61
62 void reduce_operation(MPI_Op operation) {
63
64 // if for some reason reduction is going on unfinished, wait.
65 wait();
66
67 if (is_nonblocking_)
68 comm_is_on = true;
69
70 MPI_Datatype dtype;
71 dtype = get_MPI_number_type<T>();
72
73 if (dtype == MPI_BYTE) {
74 assert(sizeof(T) < 0 && "Unknown number_type in vector reduction");
75 }
76
77 reduction_timer.start();
78 if (is_allreduce_) {
79 if (is_nonblocking_) {
80 MPI_Iallreduce(MPI_IN_PLACE, (void *)val.data(),
81 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
82 dtype, operation, lattice.mpi_comm_lat, &request);
83 } else {
84 MPI_Allreduce(MPI_IN_PLACE, (void *)val.data(),
85 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
86 dtype, operation, lattice.mpi_comm_lat);
87 }
88 } else {
89 if (hila::myrank() == 0) {
90 if (is_nonblocking_) {
91 MPI_Ireduce(MPI_IN_PLACE, (void *)val.data(),
92 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
93 dtype, operation, 0, lattice.mpi_comm_lat, &request);
94 } else {
95 MPI_Reduce(MPI_IN_PLACE, (void *)val.data(),
96 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
97 dtype, operation, 0, lattice.mpi_comm_lat);
98 }
99 } else {
100 if (is_nonblocking_) {
101 MPI_Ireduce((void *)val.data(), (void *)val.data(),
102 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
103 dtype, operation, 0, lattice.mpi_comm_lat, &request);
104 } else {
105 MPI_Reduce((void *)val.data(), (void *)val.data(),
106 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
107 dtype, operation, 0, lattice.mpi_comm_lat);
108 }
109 }
110 }
111 reduction_timer.stop();
112 }
113
114 public:
115 // Define iterators using std::vector iterators
116 using iterator = typename std::vector<T>::iterator;
117 using const_iterator = typename std::vector<T>::const_iterator;
118
119 iterator begin() {
120 return val.begin();
121 }
122 iterator end() {
123 return val.end();
124 }
125 const_iterator begin() const {
126 return val.begin();
127 }
128 const_iterator end() const {
129 return val.end();
130 }
131
132 /// Initialize to zero by default (? exception to other variables)
133 /// allreduce = true by default
134 explicit ReductionVector() = default;
135 explicit ReductionVector(int size) : val(size, (T)0) {}
136 explicit ReductionVector(int size, const T &v) : val(size, v) {}
137
138 /// Destructor cleans up communications if they are in progress
140 if (comm_is_on) {
141 MPI_Cancel(&request);
142 }
143 }
144
145 /// And access operators - these do in practice everything already!
146 T &operator[](const int i) {
147 return val[i];
148 }
149
150 T operator[](const int i) const {
151 return val[i];
152 }
153
154 /// allreduce(bool) turns allreduce on or off. By default on.
155 ReductionVector &allreduce(bool b = true) {
156 is_allreduce_ = b;
157 return *this;
158 }
159 bool is_allreduce() {
160 return is_allreduce_;
161 }
162
163 /// nonblocking(bool) turns allreduce on or off. By default on.
164 ReductionVector &nonblocking(bool b = true) {
165 is_nonblocking_ = b;
166 return *this;
167 }
168 bool is_nonblocking() {
169 return is_nonblocking_;
170 }
171
172 /// deferred(bool) turns deferred on or off. By default turns on.
173 ReductionVector &delayed(bool b = true) {
174 is_delayed_ = b;
175 return *this;
176 }
177 bool is_delayed() {
178 return is_delayed_;
179 }
180
181 /// Assignment is used only outside site loops - wait for comms if needed
182 /// Make this return void, hard to imagine it is used for anything useful
183 template <typename S, std::enable_if_t<std::is_assignable<T &, S>::value, int> = 0>
184 void operator=(const S &rhs) {
185 for (auto &vp : val)
186 vp = rhs;
187 }
188
189 /// Assignment from 0
190 void operator=(std::nullptr_t np) {
191 for (auto &vp : val)
192 vp = 0;
193 }
194
195 // Don't even implement compound assignments
196
197 /// Init is to be called before every site loop
198 void init_sum() {
199 // if something is happening wait
200 wait();
201 if (hila::myrank() != 0 && !delay_is_on) {
202 for (auto &vp : val)
203 vp = 0;
204 }
205 }
206 /// Init is to be called before every site loop
208 wait();
209 if (hila::myrank() != 0 && !delay_is_on) {
210 for (auto &vp : val)
211 vp = 1;
212 }
213 }
214
215 /// Start sum reduction -- works only if the type T addition == element-wise
216 /// addition. This is true for all hila predefined data types
217 void reduce_sum() {
218
219 if (is_delayed_) {
220 if (delay_is_on && is_delayed_sum == false) {
221 assert(0 && "Cannot mix sum and product reductions!");
222 }
223 delay_is_on = true;
224 } else {
225 reduce_operation(MPI_SUM);
226 }
227 }
228
229 /// Product reduction -- currently works only for scalar data types.
230 /// For Complex, Matrix and Vector data product is not element-wise.
231 /// TODO: Array or std::array ?
232 /// TODO: implement using custom MPI ops (if needed)
234
235 static_assert(std::is_same<T, int>::value || std::is_same<T, long>::value ||
236 std::is_same<T, float>::value ||
237 std::is_same<T, double>::value ||
238 std::is_same<T, long double>::value,
239 "Type not implemented for product reduction");
240
241 if (is_delayed_) {
242 if (delay_is_on && is_delayed_sum == true) {
243 assert(0 && "Cannot mix sum and product reductions!");
244 }
245 delay_is_on = true;
246 } else {
247 reduce_operation(MPI_PROD);
248 }
249 }
250
251 /// Wait for MPI to complete, if it is currently going on
252 void wait() {
253
254 if (comm_is_on) {
255 reduction_wait_timer.start();
256 MPI_Status status;
257 MPI_Wait(&request, &status);
258 reduction_wait_timer.stop();
259 comm_is_on = false;
260 }
261 }
262
263 /// For delayed reduction, reduce starts or completes the reduction operation
265 if (delay_is_on) {
266 delay_is_on = false;
267
268 if (is_delayed_sum)
269 reduce_operation(MPI_SUM);
270 else
271 reduce_operation(MPI_PROD);
272 }
273 }
274
275 /// Complete non-blocking or delayed reduction
276 void reduce() {
277 start_reduce();
278 wait();
279 }
280
281 /// data() returns ptr to the raw storage
282 T *data() {
283 return val.data();
284 }
285
286 std::vector<T> vector() {
287 return val;
288 }
289
290 /// methods from std::vector:
291
292 size_t size() const {
293 return val.size();
294 }
295
296 void resize(size_t count) {
297 val.resize(count);
298 }
299 void resize(size_t count, const T &v) {
300 val.resize(count, v);
301 }
302
303 void clear() {
304 val.clear();
305 }
306
307 void push_back(const T &v) {
308 val.push_back(v);
309 }
310 void pop_back() {
311 val.pop_back();
312 }
313
314 T &front() {
315 return val.front();
316 }
317 T &back() {
318 return val.back();
319 }
320};
321
322
323#endif
// (end of header; extracted Doxygen member-index residue removed)