HILA
Loading...
Searching...
No Matches
reductionvector.h
#ifndef ReductionVector_H_
#define ReductionVector_H_

#include "hila.h"

//////////////////////////////////////////////////////////////////////////////////
/// Special reduction class for arrays: declare a reduction array as
///    ReductionVector<T> a(size);
///
/// This can be used within site loops as
///    onsites(ALL) {
///        int i = ...
///        a[i] += ...
///    }
///
/// or outside site loops as usual.
/// Reductions:  +=  *=    sum or product reduction
/// NOTE: size of the reduction vector must be the same on all nodes!
/// By default reduction is "allreduce", i.e. all nodes get the same result.
///
/// Reduction can be customized by using methods:
///   a.size()              : return the size of the array
///   a.resize(new_size)    : change size of array (all nodes must use same size!)
///   a[i]                  : get/set element i
///
///   a.allreduce(bool)     : turn allreduce on/off  (default=true)
///   a.nonblocking(bool)   : turn non-blocking reduction on/off
///   a.delayed(bool)       : turn delayed reduction on/off
///
///   a.wait()              : for non-blocking reduction, wait() has to be called
///                           after the loop to complete the reduction
///   a.reduce()            : for delayed reduction starts/completes the reduction
///
///   a.is_allreduce()      : queries the reduction status
///     is_nonblocking()
///     is_delayed()
///
///   a.push_back(element)  : add one element to the end of the vector
///   The same reduction variable can be used again
///

43template <typename T>
45
46 private:
47 std::vector<T> val;
48
49 /// comm_is_on is true if MPI communications are under way.
50 bool comm_is_on = false;
51
52 /// status variables of reduction
53 bool is_allreduce_ = true;
54 bool is_nonblocking_ = false;
55 bool is_delayed_ = false;
56
57 bool delay_is_on = false; // status of the delayed reduction
58 bool is_delayed_sum = true; // sum/product
59
60 MPI_Request request;
61
62 void reduce_operation(MPI_Op operation) {
63
64 // if for some reason reduction is going on unfinished, wait.
65 wait();
66
67 if (is_nonblocking_)
68 comm_is_on = true;
69
70 MPI_Datatype dtype;
71 dtype = get_MPI_number_type<T>();
72
73 if (dtype == MPI_BYTE) {
74 assert(sizeof(T) < 0 && "Unknown number_type in vector reduction");
75 }
76
77 reduction_timer.start();
78 if (is_allreduce_) {
79 if (is_nonblocking_) {
80 MPI_Iallreduce(MPI_IN_PLACE, (void *)val.data(),
81 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
82 dtype, operation, lattice.mpi_comm_lat, &request);
83 } else {
84 MPI_Allreduce(MPI_IN_PLACE, (void *)val.data(),
85 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
86 dtype, operation, lattice.mpi_comm_lat);
87 }
88 } else {
89 if (hila::myrank() == 0) {
90 if (is_nonblocking_) {
91 MPI_Ireduce(MPI_IN_PLACE, (void *)val.data(),
92 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
93 dtype, operation, 0, lattice.mpi_comm_lat, &request);
94 } else {
95 MPI_Reduce(MPI_IN_PLACE, (void *)val.data(),
96 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
97 dtype, operation, 0, lattice.mpi_comm_lat);
98 }
99 } else {
100 if (is_nonblocking_) {
101 MPI_Ireduce((void *)val.data(), (void *)val.data(),
102 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
103 dtype, operation, 0, lattice.mpi_comm_lat, &request);
104 } else {
105 MPI_Reduce((void *)val.data(), (void *)val.data(),
106 sizeof(T) * val.size() / sizeof(hila::arithmetic_type<T>),
107 dtype, operation, 0, lattice.mpi_comm_lat);
108 }
109 }
110 }
111 reduction_timer.stop();
112 }
113
114 public:
115 // Define iterators using std::vector iterators
116 using iterator = typename std::vector<T>::iterator;
117 using const_iterator = typename std::vector<T>::const_iterator;
118
119 iterator begin() {
120 return val.begin();
121 }
122 iterator end() {
123 return val.end();
124 }
125 const_iterator begin() const {
126 return val.begin();
127 }
128 const_iterator end() const {
129 return val.end();
130 }
131
132 /// Initialize to zero by default (? exception to other variables)
133 /// allreduce = true by default
134 explicit ReductionVector() = default;
135 explicit ReductionVector(int size) : val(size, (T)0) {}
136 explicit ReductionVector(int size, const T &v) : val(size, v) {}
137
138 /// Destructor cleans up communications if they are in progress
140 wait();
141 }
142
143 /// And access operators - these do in practice everything already!
144 T &operator[](const int i) {
145 return val[i];
146 }
147
148 T operator[](const int i) const {
149 return val[i];
150 }
151
152 /// allreduce(bool) turns allreduce on or off. By default on.
153 ReductionVector &allreduce(bool b = true) {
154 is_allreduce_ = b;
155 return *this;
156 }
157 bool is_allreduce() {
158 return is_allreduce_;
159 }
160
161 /// nonblocking(bool) turns allreduce on or off. By default on.
162 ReductionVector &nonblocking(bool b = true) {
163 is_nonblocking_ = b;
164 return *this;
165 }
166 bool is_nonblocking() {
167 return is_nonblocking_;
168 }
169
170 /// deferred(bool) turns deferred on or off. By default turns on.
171 ReductionVector &delayed(bool b = true) {
172 is_delayed_ = b;
173 return *this;
174 }
175 bool is_delayed() {
176 return is_delayed_;
177 }
178
179 /// Assignment is used only outside site loops - wait for comms if needed
180 /// Make this return void, hard to imagine it is used for anything useful
181 template <typename S, std::enable_if_t<std::is_assignable<T &, S>::value, int> = 0>
182 void operator=(const S &rhs) {
183 for (auto &vp : val)
184 vp = rhs;
185 }
186
187 /// Assignment from 0
188 void operator=(std::nullptr_t np) {
189 for (auto &vp : val)
190 vp = 0;
191 }
192
193 // Don't even implement compound assignments
194
195 /// Init is to be called before every site loop
196 void init_sum() {
197 // if something is happening wait
198 wait();
199 if (hila::myrank() != 0 && !delay_is_on) {
200 for (auto &vp : val)
201 vp = 0;
202 }
203 }
204 /// Init is to be called before every site loop
206 wait();
207 if (hila::myrank() != 0 && !delay_is_on) {
208 for (auto &vp : val)
209 vp = 1;
210 }
211 }
212
213 /// Start sum reduction -- works only if the type T addition == element-wise
214 /// addition. This is true for all hila predefined data types
215 void reduce_sum() {
216
217 if (is_delayed_) {
218 if (delay_is_on && is_delayed_sum == false) {
219 assert(0 && "Cannot mix sum and product reductions!");
220 }
221 delay_is_on = true;
222 } else {
223 reduce_operation(MPI_SUM);
224 }
225 }
226
227 /// Product reduction -- currently works only for scalar data types.
228 /// For Complex, Matrix and Vector data product is not element-wise.
229 /// TODO: Array or std::array ?
230 /// TODO: implement using custom MPI ops (if needed)
232
233 static_assert(std::is_same<T, int>::value || std::is_same<T, long>::value ||
234 std::is_same<T, float>::value ||
235 std::is_same<T, double>::value ||
236 std::is_same<T, long double>::value,
237 "Type not implemented for product reduction");
238
239 if (is_delayed_) {
240 if (delay_is_on && is_delayed_sum == true) {
241 assert(0 && "Cannot mix sum and product reductions!");
242 }
243 delay_is_on = true;
244 } else {
245 reduce_operation(MPI_PROD);
246 }
247 }
248
249 /// Wait for MPI to complete, if it is currently going on
250 void wait() {
251
252 if (comm_is_on) {
253 reduction_wait_timer.start();
254 MPI_Status status;
255 MPI_Wait(&request, &status);
256 reduction_wait_timer.stop();
257 comm_is_on = false;
258 }
259 }
260
261 /// For delayed reduction, reduce starts or completes the reduction operation
263 if (delay_is_on) {
264 delay_is_on = false;
265
266 if (is_delayed_sum)
267 reduce_operation(MPI_SUM);
268 else
269 reduce_operation(MPI_PROD);
270 }
271 }
272
273 /// Complete non-blocking or delayed reduction
274 void reduce() {
275 start_reduce();
276 wait();
277 }
278
279 /// data() returns ptr to the raw storage
280 T *data() {
281 return val.data();
282 }
283
284 std::vector<T> vector() {
285 return val;
286 }
287
288 /// methods from std::vector:
289
290 size_t size() const {
291 return val.size();
292 }
293
294 void resize(size_t count) {
295 val.resize(count);
296 }
297 void resize(size_t count, const T &v) {
298 val.resize(count, v);
299 }
300
301 void clear() {
302 val.clear();
303 }
304
305 void push_back(const T &v) {
306 val.push_back(v);
307 }
308 void pop_back() {
309 val.pop_back();
310 }
311
312 T &front() {
313 return val.front();
314 }
315 T &back() {
316 return val.back();
317 }
318};


#endif
T & operator[](const int i)
And access operators - these do in practice everything already!
size_t size() const
methods from std::vector:
ReductionVector()=default
ReductionVector & delayed(bool b=true)
deferred(bool) turns deferred on or off. By default turns on.
void start_reduce()
For delayed reduction, reduce starts or completes the reduction operation.
void reduce()
Complete non-blocking or delayed reduction.
void init_product()
Init is to be called before every site loop.
~ReductionVector()
Destructor cleans up communications if they are in progress.
void operator=(const S &rhs)
ReductionVector & nonblocking(bool b=true)
nonblocking(bool) turns allreduce on or off. By default on.
void operator=(std::nullptr_t np)
Assignment from 0.
void init_sum()
Init is to be called before every site loop.
ReductionVector & allreduce(bool b=true)
allreduce(bool) turns allreduce on or off. By default on.
void wait()
Wait for MPI to complete, if it is currently going on.
T * data()
data() returns ptr to the raw storage
int myrank()
rank of this node
Definition com_mpi.cpp:234