forked from snakster/cpp.react
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTopoQueue.h
More file actions
328 lines (254 loc) · 8.74 KB
/
TopoQueue.h
File metadata and controls
328 lines (254 loc) · 8.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
// Copyright Sebastian Jeckel 2014.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef REACT_COMMON_TOPOQUEUE_H_INCLUDED
#define REACT_COMMON_TOPOQUEUE_H_INCLUDED
#pragma once
#include "react/detail/Defs.h"
#include <algorithm>
#include <array>
#include <iterator>
#include <limits>
#include <type_traits>
#include <utility>
#include <vector>
#include "tbb/enumerable_thread_specific.h"
#include "tbb/tbb_stddef.h"
/***************************************/ REACT_IMPL_BEGIN /**************************************/
///////////////////////////////////////////////////////////////////////////////////////////////////
/// TopoQueue - Sequential
///////////////////////////////////////////////////////////////////////////////////////////////////
/// Sequential topological queue.
/// Values are pushed together with the level returned by TLevelFunc; each call to
/// FetchNext() extracts every queued value that has the current minimum level into
/// NextValues() and removes it from the queue.
template <typename T, typename TLevelFunc>
class TopoQueue
{
private:
    struct Entry;

public:
    // Store the level as part of the entry for cheap comparisons
    using QueueDataT    = std::vector<Entry>;
    using NextDataT     = std::vector<T>;

    TopoQueue() = default;
    TopoQueue(const TopoQueue&) = default;
    TopoQueue(TopoQueue&&) = default;
    TopoQueue& operator=(const TopoQueue&) = default;
    TopoQueue& operator=(TopoQueue&&) = default;

    /// Constructs the queue with the given level function.
    /// Disabled when FIn decays to TopoQueue itself: an unconstrained forwarding
    /// constructor is a better match than the copy constructor for non-const lvalues
    /// and for rvalues, so copying/moving a queue would otherwise try to construct
    /// levelFunc_ from a TopoQueue.
    template
    <
        typename FIn,
        typename = typename std::enable_if<
            !std::is_same<typename std::decay<FIn>::type, TopoQueue>::value>::type
    >
    TopoQueue(FIn&& levelFunc) :
        levelFunc_( std::forward<FIn>(levelFunc) )
    {}

    /// Enqueues a copy of value, tagged with the level reported by the level function.
    void Push(const T& value)
    {
        queueData_.emplace_back(value, levelFunc_(value));
    }

    /// Moves all queued values with the lowest level into NextValues().
    /// The previous contents of NextValues() are discarded first.
    /// @return true if at least one value was extracted.
    bool FetchNext()
    {
        // Throw away previous values
        nextData_.clear();

        // Find min level of nodes in queue data
        minLevel_ = (std::numeric_limits<int>::max)();
        for (const auto& e : queueData_)
            if (minLevel_ > e.Level)
                minLevel_ = e.Level;

        // Swap entries with min level to the end (order within each part is unspecified)
        auto p = std::partition(
            queueData_.begin(),
            queueData_.end(),
            LevelCompFunctor{ minLevel_ });

        // Reserve once to avoid multiple re-allocations
        nextData_.reserve(std::distance(p, queueData_.end()));

        // Move min level values to next data
        for (auto it = p; it != queueData_.end(); ++it)
            nextData_.push_back(std::move(it->Value));

        // Drop the moved-from entries
        queueData_.erase(p, queueData_.end());

        return !nextData_.empty();
    }

    /// Values extracted by the most recent FetchNext() call.
    const NextDataT& NextValues() const { return nextData_; }

private:
    struct Entry
    {
        Entry() = default;

        Entry(const T& value, int level) : Value( value ), Level( level ) {}

        T       Value;
        int     Level;
    };

    // Partition predicate: true for entries NOT on the given level, so that
    // std::partition moves the entries on that level to the back.
    struct LevelCompFunctor
    {
        LevelCompFunctor(int level) : Level( level ) {}

        bool operator()(const Entry& e) const { return e.Level != Level; }

        const int Level;
    };

    NextDataT   nextData_;
    QueueDataT  queueData_;

    TLevelFunc  levelFunc_;

    int         minLevel_ = (std::numeric_limits<int>::max)();
};
///////////////////////////////////////////////////////////////////////////////////////////////////
/// WeightedRange - Implements tbb range concept
///////////////////////////////////////////////////////////////////////////////////////////////////
/// Range over [begin, end) of (value, weight) pairs carrying a precomputed total weight.
/// Models the tbb Range concept; it is divisible while its weight exceeds grain_size
/// and it holds more than one element.
template
<
    typename TIt,
    uint grain_size
>
class WeightedRange
{
public:
    using const_iterator = TIt;
    using ValueT = typename TIt::value_type;

    WeightedRange() = default;
    WeightedRange(const WeightedRange&) = default;

    WeightedRange(const TIt& a, const TIt& b, uint weight) :
        begin_( a ),
        end_( b ),
        weight_( weight )
    {}

    /// tbb splitting constructor.
    /// Walks source from the front, summing weights until at least grain_size is
    /// reached. [a, p) stays in source and [p, b) becomes this range.
    /// The walk never consumes the last element, so when source holds two or more
    /// elements (as is_divisible() requires) both halves are non-empty.
    WeightedRange(WeightedRange& source, tbb::split)
    {
        uint sum = 0;
        TIt p = source.begin_;

        // Keep at least one element for the new range. Otherwise a heavy last element
        // could leave this range empty while source keeps everything, and source
        // would remain exactly as divisible as before the split.
        while (source.end_ - p > 1)
        {
            // Note: assuming a pair with weight as second until more flexibility is needed
            sum += p->second;
            ++p;

            if (sum >= grain_size)
                break;
        }

        // New [p,b)
        begin_ = p;
        end_ = source.end_;
        weight_ = source.weight_ - sum;

        // Source [a,p)
        source.end_ = p;
        source.weight_ = sum;
    }

    // tbb range interface
    bool empty() const          { return !(Size() > 0); }
    bool is_divisible() const   { return weight_ > grain_size && Size() > 1; }

    // iteration interface
    const_iterator begin() const    { return begin_; }
    const_iterator end() const      { return end_; }

    size_t  Size() const    { return end_ - begin_; }
    uint    Weight() const  { return weight_; }

private:
    // Value-initialized so a default-constructed range is not built on singular iterators
    TIt     begin_{};
    TIt     end_{};
    uint    weight_ = 0;
};
///////////////////////////////////////////////////////////////////////////////////////////////////
/// ConcurrentTopoQueue
/// Usage is based on two phases:
/// 1. Multiple threads push nodes to the queue concurrently.
/// 2. FetchNext() prepares all nodes of the next level in NextRange().
///    The previous contents of NextRange() are automatically cleared.
///////////////////////////////////////////////////////////////////////////////////////////////////
template
< typename T,
uint grain_size,
typename TLevelFunc,
typename TWeightFunc
>
class ConcurrentTopoQueue
{
private:
struct Entry;
public:
using QueueDataT = std::vector<Entry>;
using NextDataT = std::vector<std::pair<T,uint>>;
using NextRangeT = WeightedRange<typename NextDataT::const_iterator, grain_size>;
ConcurrentTopoQueue() = default;
ConcurrentTopoQueue(const ConcurrentTopoQueue&) = default;
template <typename FIn1, typename FIn2>
ConcurrentTopoQueue(FIn1&& levelFunc, FIn2&& weightFunc) :
levelFunc_( std::forward<FIn1>(levelFunc) ),
weightFunc_( std::forward<FIn2>(weightFunc) )
{}
void Push(const T& value)
{
auto& t = collectBuffer_.local();
auto level = levelFunc_(value);
auto weight = weightFunc_(value);
t.Data.emplace_back(value,level,weight);
t.Weight += weight;
if (t.MinLevel > level)
t.MinLevel = level;
}
bool FetchNext()
{
nextData_.clear();
uint totalWeight = 0;
// Determine current min level
minLevel_ = (std::numeric_limits<int>::max)();
for (const auto& buf : collectBuffer_)
if (minLevel_ > buf.MinLevel)
minLevel_ = buf.MinLevel;
// For each thread local buffer...
for (auto& buf : collectBuffer_)
{
auto& v = buf.Data;
// Swap min level nodes to end of v
auto p = std::partition(
v.begin(),
v.end(),
LevelCompFunctor{ minLevel_ });
// Reserve once to avoid multiple re-allocations
nextData_.reserve(std::distance(p, v.end()));
// Move min level values to global next data
for (auto it = p; it != v.end(); ++it)
nextData_.emplace_back(std::move(it->Value), it->Weight);
// Truncate remaining
v.resize(std::distance(v.begin(), p));
// Calc new min level and weight for this buffer
buf.MinLevel = (std::numeric_limits<int>::max)();
int oldWeight = buf.Weight;
buf.Weight = 0;
for (const auto& x : v)
{
buf.Weight += x.Weight;
if (buf.MinLevel > x.Level)
buf.MinLevel = x.Level;
}
// Add diff to nodes_ weight
totalWeight += oldWeight - buf.Weight;
}
nextRange_ = NextRangeT{ nextData_.begin(), nextData_.end(), totalWeight };
// Found more nodes?
return !nextData_.empty();
}
const NextRangeT& NextRange() const
{
return nextRange_;
}
private:
struct Entry
{
Entry() = default;
Entry(const Entry&) = default;
Entry(const T& value, int level, uint weight) :
Value( value ),
Level( level ),
Weight( weight )
{}
T Value;
int Level;
uint Weight;
};
struct LevelCompFunctor
{
LevelCompFunctor(int level) : Level{ level } {}
bool operator()(const Entry& e) const { return e.Level != Level; }
const int Level;
};
struct ThreadLocalBuffer
{
QueueDataT Data;
int MinLevel = (std::numeric_limits<int>::max)();
uint Weight = 0;
};
int minLevel_ = (std::numeric_limits<int>::max)();
NextDataT nextData_;
NextRangeT nextRange_;
TLevelFunc levelFunc_;
TWeightFunc weightFunc_;
tbb::enumerable_thread_specific<ThreadLocalBuffer> collectBuffer_;
};
/****************************************/ REACT_IMPL_END /***************************************/
#endif // REACT_COMMON_TOPOQUEUE_H_INCLUDED