Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
task_stream_extended.h
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef _TBB_task_stream_extended_H
22 #define _TBB_task_stream_extended_H
23 
30 
31 
32 #if _TBB_task_stream_H
33 #error Only one of task_stream.h and this file may be included at a time.
34 #endif
35 
36 #if !__TBB_CPF_BUILD
37 #error This code bears a preview status until it proves its usefulness/performance suitability.
38 #endif
39 
40 #include "tbb/tbb_stddef.h"
41 #include <deque>
42 #include <climits>
43 #include "tbb/atomic.h" // for __TBB_Atomic*
44 #include "tbb/spin_mutex.h"
45 #include "tbb/tbb_allocator.h"
46 #include "scheduler_common.h"
47 #include "tbb_misc.h" // for FastRandom
48 
49 namespace tbb {
50 namespace internal {
51 
52 //! Essentially, this is just a pair of a queue and a mutex to protect the queue.
53 
55 template< typename T, typename mutex_t >
56 struct queue_and_mutex {
57  typedef std::deque< T, tbb_allocator<T> > queue_base_t;
58 
59  queue_base_t my_queue;
60  mutex_t my_mutex;
61 
62  queue_and_mutex () : my_queue(), my_mutex() {}
63  ~queue_and_mutex () {}
64 };
65 
66 typedef uintptr_t population_t;
67 const population_t one = 1;
68 
69 inline void set_one_bit( population_t& dest, int pos ) {
70  __TBB_ASSERT( pos>=0, NULL );
71  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
72  __TBB_AtomicOR( &dest, one<<pos );
73 }
74 
75 inline void clear_one_bit( population_t& dest, int pos ) {
76  __TBB_ASSERT( pos>=0, NULL );
77  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
78  __TBB_AtomicAND( &dest, ~(one<<pos) );
79 }
80 
81 inline bool is_bit_set( population_t val, int pos ) {
82  __TBB_ASSERT( pos>=0, NULL );
83  __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), NULL );
84  return (val & (one<<pos)) != 0;
85 }
86 
87 struct random_lane_selector :
88 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
89  no_assign
90 #else
91  no_copy
92 #endif
93 {
94  random_lane_selector( FastRandom& random ) : my_random( random ) {}
95  unsigned operator()( unsigned out_of ) const {
96  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
97  return my_random.get() & (out_of-1);
98  }
99 private:
100  FastRandom& my_random;
101 };
102 
103 struct lane_selector_base :
104 #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
105  no_assign
106 #else
107  no_copy
108 #endif
109 {
110  unsigned& my_previous;
111  lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
112 };
113 
114 struct subsequent_lane_selector : lane_selector_base {
115  subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
116  unsigned operator()( unsigned out_of ) const {
117  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
118  return (++my_previous &= out_of-1);
119  }
120 };
121 
122 struct preceding_lane_selector : lane_selector_base {
123  preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
124  unsigned operator()( unsigned out_of ) const {
125  __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
126  return (--my_previous &= (out_of-1));
127  }
128 };
129 
130 class task_stream_base : no_copy {
131 protected:
132  typedef queue_and_mutex <task*, spin_mutex> lane_t;
133 };
134 
135 enum task_stream_accessor_type { front_accessor = 0, back_nonnull_accessor };
136 
139 template<task_stream_accessor_type accessor>
140 class task_stream_accessor : public task_stream_base {
141 protected:
142  using task_stream_base::lane_t;
143  task* get_item( lane_t::queue_base_t& queue ) {
144  task* result = queue.front();
145  queue.pop_front();
146  return result;
147  }
148 };
149 
150 template<>
151 class task_stream_accessor< back_nonnull_accessor > : public task_stream_base {
152 protected:
153  task* get_item( lane_t::queue_base_t& queue ) {
154  task* result = NULL;
155  do {
156  result = queue.back();
157  queue.pop_back();
158  } while( !result && !queue.empty() );
159  return result;
160  }
161 };
162 
163 //! The container for "fairness-oriented" aka "enqueued" tasks.
164 template<int Levels, task_stream_accessor_type accessor>
165 class task_stream : public task_stream_accessor< accessor > {
166  typedef typename task_stream_accessor<accessor>::lane_t lane_t;
167  population_t population[Levels];
168  padded<lane_t>* lanes[Levels];
169  unsigned N;
170 
171 public:
172  task_stream() : N() {
173  for(int level = 0; level < Levels; level++) {
174  population[level] = 0;
175  lanes[level] = NULL;
176  }
177  }
178 
179  void initialize( unsigned n_lanes ) {
180  const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
181 
182  N = n_lanes>=max_lanes ? max_lanes : n_lanes>2 ? 1<<(__TBB_Log2(n_lanes-1)+1) : 2;
183  __TBB_ASSERT( N==max_lanes || N>=n_lanes && ((N-1)&N)==0, "number of lanes miscalculated");
184  __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, NULL );
185  for(int level = 0; level < Levels; level++) {
186  lanes[level] = new padded<lane_t>[N];
187  __TBB_ASSERT( !population[level], NULL );
188  }
189  }
190 
191  ~task_stream() {
192  for(int level = 0; level < Levels; level++)
192  for(int level = 0; level < Levels; level++)
193  if (lanes[level]) delete[] lanes[level];
194  }
195 
196  //! Returns true on successful push, otherwise - false.
197  bool try_push( task* source, int level, unsigned lane_idx ) {
198  __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
199  spin_mutex::scoped_lock lock;
200  if( lock.try_acquire( lanes[level][lane_idx].my_mutex ) ) {
201  lanes[level][lane_idx].my_queue.push_back( source );
202  set_one_bit( population[level], lane_idx ); // TODO: avoid atomic op if the bit is already set
203  return true;
204  }
205  return false;
206  }
207 
208  //! Push a task into a lane. Lane selection is performed by passed functor.
209  template<typename lane_selector_t>
210  void push( task* source, int level, const lane_selector_t& next_lane ) {
211  bool succeed = false;
212  unsigned lane = 0;
213  do {
214  lane = next_lane( /*out_of=*/N );
215  __TBB_ASSERT( lane < N, "Incorrect lane index." );
216  } while( ! (succeed = try_push( source, level, lane )) );
217  }
218 
219  //! Returns pointer to task on successful pop, otherwise - NULL.
220  task* try_pop( int level, unsigned lane_idx ) {
221  __TBB_ASSERT( 0 <= level && level < Levels, "Incorrect lane level specified." );
222  if( !is_bit_set( population[level], lane_idx ) )
223  return NULL;
224  task* result = NULL;
225  lane_t& lane = lanes[level][lane_idx];
226  spin_mutex::scoped_lock lock;
227  if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
228  result = this->get_item( lane.my_queue );
229  if( lane.my_queue.empty() )
230  clear_one_bit( population[level], lane_idx );
231  }
232  return result;
233  }
234 
237  template<typename lane_selector_t>
238  task* pop( int level, const lane_selector_t& next_lane ) {
239  task* popped = NULL;
240  unsigned lane = 0;
241  do {
242  lane = next_lane( /*out_of=*/N );
243  __TBB_ASSERT( lane < N, "Incorrect lane index." );
244  } while( !empty( level ) && !(popped = try_pop( level, lane )) );
245  return popped;
246  }
247 
248  // TODO: unify '*_specific' logic with 'pop' methods above
249  task* look_specific( __TBB_ISOLATION_ARG( typename lane_t::queue_base_t& queue, isolation_tag isolation ) ) {
250  __TBB_ASSERT( !queue.empty(), NULL );
251  // TODO: add a worst-case performance test and consider an alternative container with better
252  // performance for isolation search.
253  typename lane_t::queue_base_t::iterator curr = queue.end();
254  do {
255  // TODO: consider logic from get_task to simplify the code.
256  task* result = *--curr;
257  if( result __TBB_ISOLATION_EXPR( && result->prefix().isolation == isolation ) ) {
258  if( queue.end() - curr == 1 )
259  queue.pop_back(); // a little of housekeeping along the way
260  else
261  *curr = 0; // grabbing task with the same isolation
262  // TODO: move one of the container's ends instead if the task has been found there
263  return result;
264  }
265  } while( curr != queue.begin() );
266  return NULL;
267  }
268 
269  //! Try finding and popping a related task.
270  task* pop_specific( int level, __TBB_ISOLATION_ARG(unsigned& last_used_lane, isolation_tag isolation) ) {
271  task* result = NULL;
272  // Lane selection is round-robin in backward direction.
273  unsigned idx = last_used_lane & (N-1);
274  do {
275  if( is_bit_set( population[level], idx ) ) {
276  lane_t& lane = lanes[level][idx];
277  spin_mutex::scoped_lock lock;
278  if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
279  result = look_specific( __TBB_ISOLATION_ARG(lane.my_queue, isolation) );
280  if( lane.my_queue.empty() )
281  clear_one_bit( population[level], idx );
282  if( result )
283  break;
284  }
285  }
286  idx=(idx-1)&(N-1);
287  } while( !empty(level) && idx != last_used_lane );
288  last_used_lane = idx;
289  return result;
290  }
291 
292  //! Checks existence of a task.
293  bool empty(int level) {
294  return !population[level];
295  }
296 
297  //! Destroys all remaining tasks in every lane. Returns the number of destroyed tasks.
298 
300  intptr_t drain() {
301  intptr_t result = 0;
302  for(int level = 0; level < Levels; level++)
303  for(unsigned i=0; i<N; ++i) {
304  lane_t& lane = lanes[level][i];
305  spin_mutex::scoped_lock lock(lane.my_mutex);
306  for(typename lane_t::queue_base_t::iterator it=lane.my_queue.begin();
307  it!=lane.my_queue.end(); ++it, ++result)
308  {
309  __TBB_ASSERT( is_bit_set( population[level], i ), NULL );
310  task* t = *it;
311  tbb::task::destroy(*t);
312  }
313  lane.my_queue.clear();
314  clear_one_bit( population[level], i );
315  }
316  return result;
317  }
318 }; // task_stream
319 
320 } // namespace internal
321 } // namespace tbb
322 
323 #endif /* _TBB_task_stream_extended_H */
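For readers new to this container, the following standalone sketch illustrates the core technique used above: a power-of-two number of lanes, each a mutex-protected deque; a population bitmask whose bit i is set while lane i may hold items; and a round-robin selector that retries lanes until a try-push or try-pop succeeds. It is only an illustration, not part of TBB: it stores int instead of task*, uses std::mutex and std::atomic in place of the internal spin_mutex and __TBB_Atomic* primitives, and the names LaneStream and Lane are invented for the example.

#include <atomic>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <vector>

// Illustrative stand-in for one "lane": a queue guarded by a mutex.
struct Lane {
    std::mutex mutex;
    std::deque<int> queue;
};

class LaneStream {
    std::atomic<std::uintptr_t> population{0}; // bit i set <=> lane i may be non-empty
    std::vector<Lane> lanes;
public:
    explicit LaneStream(unsigned n_lanes) : lanes(n_lanes) {} // n_lanes must be a power of two

    // Mirrors try_push: give up immediately if another thread holds the lane's lock.
    bool try_push(int item, unsigned lane_idx) {
        std::unique_lock<std::mutex> lock(lanes[lane_idx].mutex, std::try_to_lock);
        if (!lock) return false;
        lanes[lane_idx].queue.push_back(item);
        population.fetch_or(std::uintptr_t(1) << lane_idx); // analogous to set_one_bit
        return true;
    }

    // Mirrors try_pop: skip lanes whose population bit is clear.
    bool try_pop(int& item, unsigned lane_idx) {
        if (!(population.load() & (std::uintptr_t(1) << lane_idx))) return false;
        std::unique_lock<std::mutex> lock(lanes[lane_idx].mutex, std::try_to_lock);
        if (!lock || lanes[lane_idx].queue.empty()) return false;
        item = lanes[lane_idx].queue.front();
        lanes[lane_idx].queue.pop_front();
        if (lanes[lane_idx].queue.empty())
            population.fetch_and(~(std::uintptr_t(1) << lane_idx)); // analogous to clear_one_bit
        return true;
    }

    // Mirrors push with a subsequent_lane_selector: advance round-robin until a push succeeds.
    void push(int item, unsigned& previous) {
        while (!try_push(item, (++previous) & (lanes.size() - 1))) {}
    }
};

int main() {
    LaneStream stream(4); // 4 lanes, power of two
    unsigned hint = 0;
    for (int i = 0; i < 8; ++i) stream.push(i, hint);
    int item;
    for (unsigned lane = 0; lane < 4; ++lane)
        while (stream.try_pop(item, lane)) std::cout << item << ' ';
    std::cout << '\n';
}

The real task_stream additionally keeps a separate lane array and population word per level, supports isolation-aware pop_specific, pads each lane to a cache line to avoid false sharing, and drains leftover tasks on shutdown; none of that is modeled in this sketch.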
