Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_concurrent_queue_impl.h
/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#include <new>
#include __TBB_STD_SWAP_HEADER
#include <iterator>
namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

#endif

//! For use in internal implementation of concurrent_queue.
namespace strict_ppl {

//! @cond INTERNAL
namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;
template<typename T> struct concurrent_queue_rep;

//! parts of concurrent_queue_rep that do not have references to micro_queue
/**
 * For internal use only.
 */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
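
// A page pointer can hold three kinds of values: NULL ("no page"), the
// sentinel address 1 that marks a micro_queue invalidated by a failed push
// (see micro_queue::invalidate_page_and_rethrow below), and real page
// addresses, which are always greater than 1. is_valid_page singles out the
// last case.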

//! Abstract class to define interface for page allocation/deallocation
/**
 * For internal use only.
 */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
public:
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
        item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = tbb::internal::move( from );
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
               item_constructor_t construct_item ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
                         item_constructor_t construct_item ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
                     size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    for( atomic_backoff b(true);;b.pause() ) {
        ticket c = counter;
        if( c==k ) return;
        else if( c&1 ) {
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    }
}
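
// Tickets passed to push and pop are rounded down to multiples of n_queue,
// so a healthy tail_counter is always even; invalidate_page_and_rethrow
// stores the odd value k+n_queue+1. The c&1 test above therefore detects a
// micro_queue broken by an earlier failure and raises bad_last_alloc instead
// of spinning forever.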

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
                           item_constructor_t construct_item )
{
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item, construct_item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
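
// push runs in three phases: allocate a new page up front when the ticket
// lands on a page boundary (index==0), spin until tail_counter reaches this
// ticket, then construct the item in place. Note that the catch clause still
// advances tail_counter before rethrowing: the failed slot is counted in
// n_invalid_entries and its mask bit stays clear, so consumers skip it
// instead of waiting on it forever.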

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page *p = head_page;
    __TBB_ASSERT( p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, *p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
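
// The mask test distinguishes a constructed item from an invalid slot left
// by a failed push: only successful pushes set their bit. An invalid slot is
// consumed silently (decrementing n_invalid_entries) and pop returns false
// so the caller can try the next ticket.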

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
    item_constructor_t construct_item )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
    const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
    ticket& g_index, item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}

template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
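
// The finalizer is the RAII half of pop: even if the item assignment throws,
// its destructor unlinks the page when pop drained the last slot on it,
// publishes the next ticket to head_counter with a release store, and only
// then, outside the page_mutex critical section, returns the page to the
// allocator.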

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;

//! representation of concurrent_queue_base
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};
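
// With phi==3 and n_queue==8, gcd(phi,n_queue)==1, so k*phi%n_queue permutes
// the residues modulo 8: consecutive tickets land in different micro_queues,
// spreading producer/consumer contention across the eight sub-queues.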

//! base class of concurrent_queue
/**
 * The class implements the interface defined by concurrent_queue_page_allocator
 * and has a pointer to an internal representation.
 */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
private:
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    typedef typename micro_queue<T>::item_constructor_t item_constructor_t;

    virtual page *allocate_page() __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! check if the queue is empty; thread safe
    bool internal_empty() const ;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! throw an exception
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! copy or move internal representation
    void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<= 8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ? 8 :
                             item_size<= 64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}
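
// Each rung of the items_per_page ladder keeps one page's payload at 256
// bytes or less for items up to 128 bytes (8*32 == 16*16 == 32*8 == 64*4 ==
// 128*2 == 256), amortizing an allocation over many items while keeping
// pages small; larger items get one item per page.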

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
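
// Two nested retry loops: the inner for(;;) claims ticket k by advancing
// head_counter with compare_and_swap (which returns the previously observed
// value, so k==tk signals success), rereading on every lost race; the outer
// do-while claims the next ticket whenever pop() reports that the claimed
// slot held an invalid entry rather than an item.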

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
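
// Rereading tail_counter makes this lock-free check meaningful: if the second
// read differs from the first, a push finished between the reads, so the
// queue was observably non-empty. With a stable tail, the queue is empty
// exactly when every outstanding ticket corresponds to an invalid entry,
// i.e. tc == hc + n_invalid_entries.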

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
    item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy or move micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}
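
// get_item's contract mirrors the push-side bookkeeping: true with a NULL
// item means end of queue, true with a non-NULL item means the slot's mask
// bit is set (construction completed), and false flags an invalid entry the
// iterator must step over.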

//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obtain iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
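
// When the current slot is the last one on its page, the iterator's cached
// page pointer for that micro_queue is bumped to the next page before the
// ticket advances. The tail-recursive call then repeats the step whenever
// get_item reports an invalid entry, so a dereferenceable iterator always
// points at a constructed item.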

//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace strict_ppl

//! @cond INTERNAL
namespace internal {

class concurrent_queue_rep;
struct micro_queue;
class micro_queue_pop_finalizer;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

//! For internal use only.
/** Type-independent portion of implementation of concurrent_queue
    @ingroup containers */
class concurrent_queue_base_v3: no_copy {
private:
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    enum copy_specifics { copy, move };

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort all pending queue operations
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_capacity, src.my_capacity );
        std::swap( items_per_page, src.items_per_page );
        std::swap( item_size, src.item_size );
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Enqueues item at tail of queue using specified operation (copy or move)
    void internal_insert_item( const void* src, copy_specifics op_type );

    //! Attempts to enqueue item onto queue using specified operation (copy or move)
    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );

    //! Assigns one queue to another using specified operation (copy or move)
    void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

//! For internal use only.
/** Backward compatible modification of concurrent_queue_base_v3
    @ingroup containers */
class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
protected:
    concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}

    //! move items
    void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;

    //! Attempt to enqueue item onto queue using move operation
    bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );

    //! Enqueue item at tail of queue using move operation
    void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
private:
    friend struct micro_queue;
    virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
    virtual void move_item( page& dst, size_t index, const void* src ) = 0;
};

//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obtain iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Obtain iterator pointing to head of queue with offset of data
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#else
public:
#endif

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_iterator,my_item))
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace tbb

#endif /* __TBB__concurrent_queue_impl_H */