Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_join_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB__flow_graph_join_impl_H
22 #define __TBB__flow_graph_join_impl_H
23 
24 #ifndef __TBB_flow_graph_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 namespace internal {
29 
31  forwarding_base(graph &g) : graph_ref(g) {}
32  virtual ~forwarding_base() {}
33  // decrement_port_count may create a forwarding task. If we cannot handle the task
34  // ourselves, ask decrement_port_count to deal with it.
35  virtual task * decrement_port_count(bool handle_task) = 0;
36  virtual void increment_port_count() = 0;
37  // moved here so input ports can queue tasks
38  graph& graph_ref;
39  };
40 
41  // specialization that lets us keep a copy of the current_key for building results.
42  // KeyType can be a reference type.
43  template<typename KeyType>
47  virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
48  current_key_type current_key; // so ports can refer to FE's desired items
49  };
50 
51  template< int N >
52  struct join_helper {
53 
54  template< typename TupleType, typename PortType >
55  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
56  tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
58  }
59  template< typename TupleType >
60  static inline void consume_reservations( TupleType &my_input ) {
61  tbb::flow::get<N-1>( my_input ).consume();
63  }
64 
65  template< typename TupleType >
66  static inline void release_my_reservation( TupleType &my_input ) {
67  tbb::flow::get<N-1>( my_input ).release();
68  }
69 
70  template <typename TupleType>
71  static inline void release_reservations( TupleType &my_input) {
73  release_my_reservation(my_input);
74  }
75 
76  template< typename InputTuple, typename OutputTuple >
77  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
78  if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
79  if ( !join_helper<N-1>::reserve( my_input, out ) ) {
80  release_my_reservation( my_input );
81  return false;
82  }
83  return true;
84  }
85 
86  template<typename InputTuple, typename OutputTuple>
87  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
88  bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
89  return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
90  }
91 
92  template<typename InputTuple, typename OutputTuple>
93  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
94  return get_my_item(my_input, out);
95  }
96 
97  template<typename InputTuple>
98  static inline void reset_my_port(InputTuple &my_input) {
100  tbb::flow::get<N-1>(my_input).reset_port();
101  }
102 
103  template<typename InputTuple>
104  static inline void reset_ports(InputTuple& my_input) {
105  reset_my_port(my_input);
106  }
107 
108  template<typename InputTuple, typename KeyFuncTuple>
109  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
110  tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
111  tbb::flow::get<N-1>(my_key_funcs) = NULL;
112  join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
113  }
114 
115  template< typename KeyFuncTuple>
116  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
117  if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
118  tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
119  }
120  join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
121  }
122 
123  template<typename InputTuple>
124  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
125  join_helper<N-1>::reset_inputs(my_input, f);
126  tbb::flow::get<N-1>(my_input).reset_receiver(f);
127  }
128 
129 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
130  template<typename InputTuple>
131  static inline void extract_inputs(InputTuple &my_input) {
133  tbb::flow::get<N-1>(my_input).extract_receiver();
134  }
135 #endif
136  }; // join_helper<N>
137 
138  template< >
139  struct join_helper<1> {
140 
141  template< typename TupleType, typename PortType >
142  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
143  tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
144  }
145 
146  template< typename TupleType >
147  static inline void consume_reservations( TupleType &my_input ) {
148  tbb::flow::get<0>( my_input ).consume();
149  }
150 
151  template< typename TupleType >
152  static inline void release_my_reservation( TupleType &my_input ) {
153  tbb::flow::get<0>( my_input ).release();
154  }
155 
156  template<typename TupleType>
157  static inline void release_reservations( TupleType &my_input) {
158  release_my_reservation(my_input);
159  }
160 
161  template< typename InputTuple, typename OutputTuple >
162  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
163  return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
164  }
165 
166  template<typename InputTuple, typename OutputTuple>
167  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
168  return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
169  }
170 
171  template<typename InputTuple, typename OutputTuple>
172  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
173  return get_my_item(my_input, out);
174  }
175 
176  template<typename InputTuple>
177  static inline void reset_my_port(InputTuple &my_input) {
178  tbb::flow::get<0>(my_input).reset_port();
179  }
180 
181  template<typename InputTuple>
182  static inline void reset_ports(InputTuple& my_input) {
183  reset_my_port(my_input);
184  }
185 
186  template<typename InputTuple, typename KeyFuncTuple>
187  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
188  tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
189  tbb::flow::get<0>(my_key_funcs) = NULL;
190  }
191 
192  template< typename KeyFuncTuple>
193  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
194  if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
195  tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
196  }
197  }
198  template<typename InputTuple>
199  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
200  tbb::flow::get<0>(my_input).reset_receiver(f);
201  }
202 
203 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
204  template<typename InputTuple>
205  static inline void extract_inputs(InputTuple &my_input) {
206  tbb::flow::get<0>(my_input).extract_receiver();
207  }
208 #endif
209  }; // join_helper<1>
210 
212  template< typename T >
213  class reserving_port : public receiver<T> {
214  public:
215  typedef T input_type;
216  typedef typename receiver<input_type>::predecessor_type predecessor_type;
217 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
218  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
219  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
220 #endif
221  private:
222  // ----------- Aggregator ------------
224 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
225  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
226 #endif
227  };
230 
231  class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
232  public:
233  char type;
234  union {
235  T *my_arg;
237 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
238  size_t cnt_val;
239  predecessor_list_type *plist;
240 #endif
241  };
243  type(char(t)), my_arg(const_cast<T*>(&e)) {}
245  my_pred(const_cast<predecessor_type *>(&s)) {}
247  };
248 
249  typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
250  friend class internal::aggregating_functor<class_type, reserving_port_operation>;
251  aggregator<handler_type, reserving_port_operation> my_aggregator;
252 
254  reserving_port_operation *current;
255  bool no_predecessors;
256  while(op_list) {
257  current = op_list;
258  op_list = op_list->next;
259  switch(current->type) {
260  case reg_pred:
261  no_predecessors = my_predecessors.empty();
262  my_predecessors.add(*(current->my_pred));
263  if ( no_predecessors ) {
264  (void) my_join->decrement_port_count(true); // may try to forward
265  }
266  __TBB_store_with_release(current->status, SUCCEEDED);
267  break;
268  case rem_pred:
269  my_predecessors.remove(*(current->my_pred));
271  __TBB_store_with_release(current->status, SUCCEEDED);
272  break;
273  case res_item:
274  if ( reserved ) {
275  __TBB_store_with_release(current->status, FAILED);
276  }
277  else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
278  reserved = true;
279  __TBB_store_with_release(current->status, SUCCEEDED);
280  } else {
281  if ( my_predecessors.empty() ) {
283  }
284  __TBB_store_with_release(current->status, FAILED);
285  }
286  break;
287  case rel_res:
288  reserved = false;
290  __TBB_store_with_release(current->status, SUCCEEDED);
291  break;
292  case con_res:
293  reserved = false;
295  __TBB_store_with_release(current->status, SUCCEEDED);
296  break;
297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
298  case add_blt_pred:
299  my_predecessors.internal_add_built_predecessor(*(current->my_pred));
300  __TBB_store_with_release(current->status, SUCCEEDED);
301  break;
302  case del_blt_pred:
303  my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
304  __TBB_store_with_release(current->status, SUCCEEDED);
305  break;
306  case blt_pred_cnt:
307  current->cnt_val = my_predecessors.predecessor_count();
308  __TBB_store_with_release(current->status, SUCCEEDED);
309  break;
310  case blt_pred_cpy:
311  my_predecessors.copy_predecessors(*(current->plist));
312  __TBB_store_with_release(current->status, SUCCEEDED);
313  break;
314 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
315  }
316  }
317  }
318 
319  protected:
320  template< typename R, typename B > friend class run_and_put_task;
321  template<typename X, typename Y> friend class internal::broadcast_cache;
322  template<typename X, typename Y> friend class internal::round_robin_cache;
324  return NULL;
325  }
326 
328  return my_join->graph_ref;
329  }
330 
331  public:
332 
335  my_join = NULL;
336  my_predecessors.set_owner( this );
337  my_aggregator.initialize_handler(handler_type(this));
338  }
339 
340  // copy constructor
341  reserving_port(const reserving_port& /* other */) : receiver<T>() {
342  reserved = false;
343  my_join = NULL;
344  my_predecessors.set_owner( this );
345  my_aggregator.initialize_handler(handler_type(this));
346  }
347 
349  my_join = join;
350  }
351 
354  reserving_port_operation op_data(src, reg_pred);
355  my_aggregator.execute(&op_data);
356  return op_data.status == SUCCEEDED;
357  }
358 
361  reserving_port_operation op_data(src, rem_pred);
362  my_aggregator.execute(&op_data);
363  return op_data.status == SUCCEEDED;
364  }
365 
367  bool reserve( T &v ) {
369  my_aggregator.execute(&op_data);
370  return op_data.status == SUCCEEDED;
371  }
372 
374  void release( ) {
376  my_aggregator.execute(&op_data);
377  }
378 
380  void consume( ) {
382  my_aggregator.execute(&op_data);
383  }
384 
385 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
386  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
387  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
388  reserving_port_operation op_data(src, add_blt_pred);
389  my_aggregator.execute(&op_data);
390  }
391 
392  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
393  reserving_port_operation op_data(src, del_blt_pred);
394  my_aggregator.execute(&op_data);
395  }
396 
397  size_t predecessor_count() __TBB_override {
398  reserving_port_operation op_data(blt_pred_cnt);
399  my_aggregator.execute(&op_data);
400  return op_data.cnt_val;
401  }
402 
403  void copy_predecessors(predecessor_list_type &l) __TBB_override {
404  reserving_port_operation op_data(blt_pred_cpy);
405  op_data.plist = &l;
406  my_aggregator.execute(&op_data);
407  }
408 
409  void extract_receiver() {
410  my_predecessors.built_predecessors().receiver_extract(*this);
411  }
412 
413 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
414 
417  else
419  reserved = false;
420  __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
421  }
422 
423  private:
426  bool reserved;
427  }; // reserving_port
428 
430  template<typename T>
431  class queueing_port : public receiver<T>, public item_buffer<T> {
432  public:
433  typedef T input_type;
434  typedef typename receiver<input_type>::predecessor_type predecessor_type;
436 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
437  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
438  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
439 #endif
440 
441  // ----------- Aggregator ------------
442  private:
444 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
445  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
446 #endif
447  };
449 
450  class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
451  public:
452  char type;
454  T *my_arg;
455 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
456  predecessor_type *pred;
457  size_t cnt_val;
458  predecessor_list_type *plist;
459 #endif
461  // constructor for value parameter
463  type(char(t)), my_val(e)
464  , bypass_t(NULL)
465  {}
466  // constructor for pointer parameter
468  type(char(t)), my_arg(const_cast<T*>(p))
469  , bypass_t(NULL)
470  {}
471  // constructor with no parameter
473  , bypass_t(NULL)
474  {}
475  };
476 
477  typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
478  friend class internal::aggregating_functor<class_type, queueing_port_operation>;
479  aggregator<handler_type, queueing_port_operation> my_aggregator;
480 
482  queueing_port_operation *current;
483  bool was_empty;
484  while(op_list) {
485  current = op_list;
486  op_list = op_list->next;
487  switch(current->type) {
488  case try__put_task: {
489  task *rtask = NULL;
490  was_empty = this->buffer_empty();
491  this->push_back(current->my_val);
492  if (was_empty) rtask = my_join->decrement_port_count(false);
493  else
494  rtask = SUCCESSFULLY_ENQUEUED;
495  current->bypass_t = rtask;
496  __TBB_store_with_release(current->status, SUCCEEDED);
497  }
498  break;
499  case get__item:
500  if(!this->buffer_empty()) {
501  *(current->my_arg) = this->front();
502  __TBB_store_with_release(current->status, SUCCEEDED);
503  }
504  else {
505  __TBB_store_with_release(current->status, FAILED);
506  }
507  break;
508  case res_port:
509  __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
510  this->destroy_front();
511  if(this->my_item_valid(this->my_head)) {
513  }
514  __TBB_store_with_release(current->status, SUCCEEDED);
515  break;
516 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
517  case add_blt_pred:
518  my_built_predecessors.add_edge(*(current->pred));
519  __TBB_store_with_release(current->status, SUCCEEDED);
520  break;
521  case del_blt_pred:
522  my_built_predecessors.delete_edge(*(current->pred));
523  __TBB_store_with_release(current->status, SUCCEEDED);
524  break;
525  case blt_pred_cnt:
526  current->cnt_val = my_built_predecessors.edge_count();
527  __TBB_store_with_release(current->status, SUCCEEDED);
528  break;
529  case blt_pred_cpy:
530  my_built_predecessors.copy_edges(*(current->plist));
531  __TBB_store_with_release(current->status, SUCCEEDED);
532  break;
533 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
534  }
535  }
536  }
537  // ------------ End Aggregator ---------------
538 
539  protected:
540  template< typename R, typename B > friend class run_and_put_task;
541  template<typename X, typename Y> friend class internal::broadcast_cache;
542  template<typename X, typename Y> friend class internal::round_robin_cache;
545  my_aggregator.execute(&op_data);
546  __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
547  if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
548  return op_data.bypass_t;
549  }
550 
552  return my_join->graph_ref;
553  }
554 
555  public:
556 
559  my_join = NULL;
560  my_aggregator.initialize_handler(handler_type(this));
561  }
562 
564  queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
565  my_join = NULL;
566  my_aggregator.initialize_handler(handler_type(this));
567  }
568 
571  my_join = join;
572  }
573 
574  bool get_item( T &v ) {
575  queueing_port_operation op_data(&v, get__item);
576  my_aggregator.execute(&op_data);
577  return op_data.status == SUCCEEDED;
578  }
579 
580  // reset_port is called when item is accepted by successor, but
581  // is initiated by join_node.
582  void reset_port() {
584  my_aggregator.execute(&op_data);
585  return;
586  }
587 
588 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
589  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
590 
591  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
592  queueing_port_operation op_data(add_blt_pred);
593  op_data.pred = &p;
594  my_aggregator.execute(&op_data);
595  }
596 
597  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
598  queueing_port_operation op_data(del_blt_pred);
599  op_data.pred = &p;
600  my_aggregator.execute(&op_data);
601  }
602 
603  size_t predecessor_count() __TBB_override {
604  queueing_port_operation op_data(blt_pred_cnt);
605  my_aggregator.execute(&op_data);
606  return op_data.cnt_val;
607  }
608 
609  void copy_predecessors(predecessor_list_type &l) __TBB_override {
610  queueing_port_operation op_data(blt_pred_cpy);
611  op_data.plist = &l;
612  my_aggregator.execute(&op_data);
613  }
614 
615  void extract_receiver() {
617  my_built_predecessors.receiver_extract(*this);
618  }
619 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
620 
624 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
625  if (f & rf_clear_edges)
626  my_built_predecessors.clear();
627 #endif
628  }
629 
630  private:
632 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
633  edge_container<predecessor_type> my_built_predecessors;
634 #endif
635  }; // queueing_port
636 
638 
639  template<typename K>
640  struct count_element {
642  size_t my_value;
643  };
644 
645  // method to access the key in the counting table
646  // the ref has already been removed from K
647  template< typename K >
650  const K& operator()(const table_item_type& v) { return v.my_key; }
651  };
652 
653  // the ports can have only one template parameter. We wrap the types needed in
654  // a traits type
655  template< class TraitsType >
657  public receiver<typename TraitsType::T>,
658  public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
659  typename TraitsType::KHash > {
660  public:
661  typedef TraitsType traits;
663  typedef typename TraitsType::T input_type;
664  typedef typename TraitsType::K key_type;
666  typedef typename receiver<input_type>::predecessor_type predecessor_type;
667  typedef typename TraitsType::TtoK type_to_key_func_type;
668  typedef typename TraitsType::KHash hash_compare_type;
670 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
671  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
672  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
673 #endif
674  private:
675 // ----------- Aggregator ------------
676  private:
678 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
679  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
680 #endif
681  };
683 
684  class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
685  public:
686  char type;
689 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
690  predecessor_type *pred;
691  size_t cnt_val;
692  predecessor_list_type *plist;
693 #endif
694  // constructor for value parameter
696  type(char(t)), my_val(e) {}
697  // constructor for pointer parameter
699  type(char(t)), my_arg(const_cast<input_type*>(p)) {}
700  // constructor with no parameter
702  };
703 
704  typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
705  friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
706  aggregator<handler_type, key_matching_port_operation> my_aggregator;
707 
710  while(op_list) {
711  current = op_list;
712  op_list = op_list->next;
713  switch(current->type) {
714  case try__put: {
715  bool was_inserted = this->insert_with_key(current->my_val);
716  // return failure if a duplicate insertion occurs
717  __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
718  }
719  break;
720  case get__item:
721  // use current_key from FE for item
722  if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
723  __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
724  }
725  __TBB_store_with_release(current->status, SUCCEEDED);
726  break;
727  case res_port:
728  // use current_key from FE for item
730  __TBB_store_with_release(current->status, SUCCEEDED);
731  break;
732 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
733  case add_blt_pred:
734  my_built_predecessors.add_edge(*(current->pred));
735  __TBB_store_with_release(current->status, SUCCEEDED);
736  break;
737  case del_blt_pred:
738  my_built_predecessors.delete_edge(*(current->pred));
739  __TBB_store_with_release(current->status, SUCCEEDED);
740  break;
741  case blt_pred_cnt:
742  current->cnt_val = my_built_predecessors.edge_count();
743  __TBB_store_with_release(current->status, SUCCEEDED);
744  break;
745  case blt_pred_cpy:
746  my_built_predecessors.copy_edges(*(current->plist));
747  __TBB_store_with_release(current->status, SUCCEEDED);
748  break;
749 #endif
750  }
751  }
752  }
753 // ------------ End Aggregator ---------------
754  protected:
755  template< typename R, typename B > friend class run_and_put_task;
756  template<typename X, typename Y> friend class internal::broadcast_cache;
757  template<typename X, typename Y> friend class internal::round_robin_cache;
760  task *rtask = NULL;
761  my_aggregator.execute(&op_data);
762  if(op_data.status == SUCCEEDED) {
763  rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false); // may spawn
764  // rtask has to reflect the return status of the try_put
765  if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
766  }
767  return rtask;
768  }
769 
771  return my_join->graph_ref;
772  }
773 
774  public:
775 
777  my_join = NULL;
778  my_aggregator.initialize_handler(handler_type(this));
779  }
780 
781  // copy constructor
782  key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
783  my_join = NULL;
784  my_aggregator.initialize_handler(handler_type(this));
785  }
786 
788 
790  my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
791  }
792 
794 
796 
797  bool get_item( input_type &v ) {
798  // aggregator uses current_key from FE for Key
800  my_aggregator.execute(&op_data);
801  return op_data.status == SUCCEEDED;
802  }
803 
804 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
805  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
806 
807  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
808  key_matching_port_operation op_data(add_blt_pred);
809  op_data.pred = &p;
810  my_aggregator.execute(&op_data);
811  }
812 
813  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
814  key_matching_port_operation op_data(del_blt_pred);
815  op_data.pred = &p;
816  my_aggregator.execute(&op_data);
817  }
818 
819  size_t predecessor_count() __TBB_override {
820  key_matching_port_operation op_data(blt_pred_cnt);
821  my_aggregator.execute(&op_data);
822  return op_data.cnt_val;
823  }
824 
825  void copy_predecessors(predecessor_list_type &l) __TBB_override {
826  key_matching_port_operation op_data(blt_pred_cpy);
827  op_data.plist = &l;
828  my_aggregator.execute(&op_data);
829  }
830 #endif
831 
832  // reset_port is called when item is accepted by successor, but
833  // is initiated by join_node.
834  void reset_port() {
836  my_aggregator.execute(&op_data);
837  return;
838  }
839 
840 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
841  void extract_receiver() {
843  my_built_predecessors.receiver_extract(*this);
844  }
845 #endif
849 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
850  if (f & rf_clear_edges)
851  my_built_predecessors.clear();
852 #endif
853  }
854 
855  private:
856  // my_join forwarding base used to count number of inputs that
857  // received key.
859 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
860  edge_container<predecessor_type> my_built_predecessors;
861 #endif
862  }; // key_matching_port
863 
864  using namespace graph_policy_namespace;
865 
866  template<typename JP, typename InputTuple, typename OutputTuple>
868 
870  template<typename JP, typename InputTuple, typename OutputTuple>
872 
873  template<typename InputTuple, typename OutputTuple>
874  class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
875  public:
877  typedef OutputTuple output_type;
878  typedef InputTuple input_type;
880 
881  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
882  ports_with_no_inputs = N;
883  join_helper<N>::set_join_node_pointer(my_inputs, this);
884  }
885 
886  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
887  ports_with_no_inputs = N;
888  join_helper<N>::set_join_node_pointer(my_inputs, this);
889  }
890 
891  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
892 
894  ++ports_with_no_inputs;
895  }
896 
897  // if all input_ports have predecessors, spawn forward to try and consume tuples
899  if(ports_with_no_inputs.fetch_and_decrement() == 1) {
900  if(internal::is_graph_active(this->graph_ref)) {
901  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
903  if(!handle_task) return rtask;
904  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
905  }
906  }
907  return NULL;
908  }
909 
910  input_type &input_ports() { return my_inputs; }
911 
912  protected:
913 
914  void reset( reset_flags f) {
915  // called outside of parallel contexts
916  ports_with_no_inputs = N;
917  join_helper<N>::reset_inputs(my_inputs, f);
918  }
919 
920 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
921  void extract( ) {
922  // called outside of parallel contexts
923  ports_with_no_inputs = N;
925  }
926 #endif
927 
928  // all methods on input ports should be called under mutual exclusion from join_node_base.
929 
931  return !ports_with_no_inputs;
932  }
933 
935  if(ports_with_no_inputs) return false;
936  return join_helper<N>::reserve(my_inputs, out);
937  }
938 
939  void tuple_accepted() {
941  }
942  void tuple_rejected() {
944  }
945 
948  atomic<size_t> ports_with_no_inputs;
949  }; // join_node_FE<reserving, ... >
950 
951  template<typename InputTuple, typename OutputTuple>
952  class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
953  public:
955  typedef OutputTuple output_type;
956  typedef InputTuple input_type;
958 
959  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
960  ports_with_no_items = N;
961  join_helper<N>::set_join_node_pointer(my_inputs, this);
962  }
963 
964  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
965  ports_with_no_items = N;
966  join_helper<N>::set_join_node_pointer(my_inputs, this);
967  }
968 
969  // needed for forwarding
970  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
971 
973  ports_with_no_items = N;
974  }
975 
976  // if all input_ports have items, spawn forward to try and consume tuples
978  {
979  if(ports_with_no_items.fetch_and_decrement() == 1) {
980  if(internal::is_graph_active(this->graph_ref)) {
981  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
983  if(!handle_task) return rtask;
984  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
985  }
986  }
987  return NULL;
988  }
989 
990  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
991 
992  input_type &input_ports() { return my_inputs; }
993 
994  protected:
995 
996  void reset( reset_flags f) {
997  reset_port_count();
998  join_helper<N>::reset_inputs(my_inputs, f );
999  }
1000 
1001 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1002  void extract() {
1003  reset_port_count();
1004  join_helper<N>::extract_inputs(my_inputs);
1005  }
1006 #endif
1007  // all methods on input ports should be called under mutual exclusion from join_node_base.
1008 
1010  return !ports_with_no_items;
1011  }
1012 
1014  if(ports_with_no_items) return false;
1015  return join_helper<N>::get_items(my_inputs, out);
1016  }
1017 
1019  reset_port_count();
1020  join_helper<N>::reset_ports(my_inputs);
1021  }
1023  // nothing to do.
1024  }
1025 
1028  atomic<size_t> ports_with_no_items;
1029  }; // join_node_FE<queueing, ...>
1030 
1031  // key_matching join front-end.
// Front end of a key-matching join node: the partial specialization of
// join_node_FE for the key_matching<K,KHash> policy. It inherits from
// matching_forwarding_base<K>, from a hash_buffer of count_element entries
// (the per-key arrival counts) and from an item_buffer<OutputTuple> (the queue
// of completed output tuples). Every access to those two buffers is funnelled
// through my_aggregator and executed in handle_operations().
// NOTE(review): this is a numbered listing with gaps (e.g. listing lines 1045,
// 1049, 1052, 1054-1056, 1059-1061, 1063, 1070-1071, 1076-1079 are absent).
// Names used below but not declared in the visible text -- N, class_type,
// unref_key_type, count_element_type, key_to_count_buffer_type,
// output_buffer_type, forwarding_base_type, my_node, cfb -- are presumably
// declared on those lines; confirm against the real header.
1032  template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
1033  class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1034  // buffer of key value counts
1035  public hash_buffer< // typedefed below to key_to_count_buffer_type
1036  typename tbb::internal::strip<K>::type&, // force ref type on K
1037  count_element<typename tbb::internal::strip<K>::type>,
1038  internal::type_to_key_function_body<
1039  count_element<typename tbb::internal::strip<K>::type>,
1040  typename tbb::internal::strip<K>::type& >,
1041  KHash >,
1042  // buffer of output items
1043  public item_buffer<OutputTuple> {
1044  public:
1046  typedef OutputTuple output_type;
1047  typedef InputTuple input_type;
1048  typedef K key_type;
1050  typedef KHash key_hash_compare;
1051  // must use K without ref.
1053  // method that lets us refer to the key of this type.
1057  // this is the type of the special table that keeps track of the number of discrete
1058  // elements corresponding to each key that we've seen.
1062  typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1064 
1065 // ----------- Aggregator ------------
1066  // the aggregator is only needed to serialize the access to the hash table.
1067  // and the output_buffer_type base class
1068  private:
// Operation codes dispatched in handle_operations():
//   res_count   - drop the front output tuple (issued by reset_port_count()),
//   inc_count   - one more port has an item for a key (increment_key_count()),
//   may_succeed - is the output buffer non-empty? (tuple_build_may_succeed()),
//   try_make    - copy the front output tuple to the caller.
1069  enum op_type { res_count, inc_count, may_succeed, try_make };
1072 
// Request record passed to my_aggregator. Only the 'type' member is declared
// in the visible text; the constructors also initialize my_val, my_output,
// bypass_t and enqueue_task, whose declarations are not shown in this listing.
1073  class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1074  public:
1075  char type;
1080  // constructor for value parameter
1081  key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1082  my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
// constructor for an output-tuple destination; enqueue_task defaults to true
1083  key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1084  enqueue_task(true) {}
1085  // constructor with no parameter
1086  key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1087  };
1088 
1089  typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1090  friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1091  aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1092 
1093  // called from aggregator, so serialized
1094  // returns the forwarding task when one was created and handle_task is false
1095  // (it is then left un-spawned for the caller). Otherwise returns NULL.
// Builds the output tuple for key t: records t as current_key, removes its
// count entry, pulls the matching items from the input ports and appends the
// tuple to the output buffer.
//   should_enqueue - when false, no forwarding task is ever created.
//   handle_task    - when true, a created task is spawned here and NULL is
//                    returned instead of the task.
1096  task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1097  output_type l_out;
1098  task *rtask = NULL;
// buffer_empty() is sampled before push_back(), so a forwarding task is only
// created when the output buffer held nothing before this tuple.
1099  bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
1100  this->current_key = t;
1101  this->delete_with_key(this->current_key); // remove the key
1102  if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
1103  this->push_back(l_out);
1104  if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
// NOTE(review): listing line 1106 (the object constructed by this placement
// new) is absent from the listing.
1105  rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1107  if(handle_task) {
1108  internal::spawn_in_graph_arena(this->graph_ref, *rtask);
1109  rtask = NULL;
1110  }
1111  do_fwd = false;
1112  }
1113  // retire the input values
1114  join_helper<N>::reset_ports(my_inputs); // <== call back
1115  }
1116  else {
1117  __TBB_ASSERT(false, "should have had something to push");
1118  }
1119  return rtask;
1120  }
1121 
// Aggregator handler: walks the linked list of pending operations and executes
// each one. The 'next' pointer is read before the operation's status is
// published with __TBB_store_with_release.
1122  void handle_operations(key_matching_FE_operation* op_list) {
1123  key_matching_FE_operation *current;
1124  while(op_list) {
1125  current = op_list;
1126  op_list = op_list->next;
1127  switch(current->type) {
1128  case res_count: // called from BE
1129  {
// discard the tuple at the front of the output buffer
1130  this->destroy_front();
1131  __TBB_store_with_release(current->status, SUCCEEDED);
1132  }
1133  break;
1134  case inc_count: { // called from input ports
1135  count_element_type *p = 0;
1136  unref_key_type &t = current->my_val;
1137  bool do_enqueue = current->enqueue_task;
// first arrival for this key: insert a zero count, then look it up again so
// that p refers to the stored element
1138  if(!(this->find_ref_with_key(t,p))) {
1139  count_element_type ev;
1140  ev.my_key = t;
1141  ev.my_value = 0;
1142  this->insert_with_key(ev);
1143  if(!(this->find_ref_with_key(t,p))) {
1144  __TBB_ASSERT(false, "should find key after inserting it");
1145  }
1146  }
// once the count for this key reaches N, build the output tuple; the
// operation's enqueue_task flag is passed as handle_task
1147  if(++(p->my_value) == size_t(N)) {
1148  task *rtask = fill_output_buffer(t, true, do_enqueue);
1149  __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1150  current->bypass_t = rtask;
1151  }
1152  }
1153  __TBB_store_with_release(current->status, SUCCEEDED);
1154  break;
1155  case may_succeed: // called from BE
1156  __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1157  break;
1158  case try_make: // called from BE
1159  if(this->buffer_empty()) {
1160  __TBB_store_with_release(current->status, FAILED);
1161  }
1162  else {
// copy (not remove) the front tuple; removal is the separate res_count op
1163  *(current->my_output) = this->front();
1164  __TBB_store_with_release(current->status, SUCCEEDED);
1165  }
1166  break;
1167  }
1168  }
1169  }
1170 // ------------ End Aggregator ---------------
1171 
1172  public:
// Constructs the front end for graph g. Points every input port back at this
// object, installs the per-port key functors from TtoK_funcs and binds the
// aggregator to this object.
// NOTE(review): listing line 1178, which presumably creates 'cfb', is absent.
1173  template<typename FunctionTuple>
1174  join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1175  join_helper<N>::set_join_node_pointer(my_inputs, this);
1176  join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1177  my_aggregator.initialize_handler(handler_type(this));
1179  this->set_key_func(cfb);
1180  }
1181 
// Tail of a constructor taking 'other' (listing line 1182 with its header is
// absent; presumably the copy constructor). It copies the key functors from
// other.my_inputs, while my_node starts as NULL and the aggregator is bound to
// the new object.
// NOTE(review): listing line 1188, which presumably creates 'cfb', is absent.
1183  output_buffer_type() {
1184  my_node = NULL;
1185  join_helper<N>::set_join_node_pointer(my_inputs, this);
1186  join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1187  my_aggregator.initialize_handler(handler_type(this));
1189  this->set_key_func(cfb);
1190  }
1191 
1192  // needed for forwarding
1193  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1194 
// Submits a res_count operation, which removes the front output tuple.
1195  void reset_port_count() { // called from BE
1196  key_matching_FE_operation op_data(res_count);
1197  my_aggregator.execute(&op_data);
1198  return;
1199  }
1200 
1201  // if all input_ports have items, spawn forward to try and consume tuples
1202  // return a task if we are asked and did create one.
// handle_task is stored as the operation's enqueue_task flag; the result is
// whatever the handler left in bypass_t (NULL unless a task was created and
// not spawned).
1203  task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override { // called from input_ports
1204  key_matching_FE_operation op_data(t, handle_task, inc_count);
1205  my_aggregator.execute(&op_data);
1206  return op_data.bypass_t;
1207  }
1208 
// Not used by this policy: asserts if called.
1209  task *decrement_port_count(bool /*handle_task*/) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1210 
1211  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
1212 
// Accessor for the tuple of input ports.
1213  input_type &input_ports() { return my_inputs; }
1214 
1215  protected:
1216 
// Resets the input ports with flags f, then both inherited buffers.
1217  void reset( reset_flags f ) {
1218  // called outside of parallel contexts
1219  join_helper<N>::reset_inputs(my_inputs, f);
1220 
1221  key_to_count_buffer_type::reset();
1222  output_buffer_type::reset();
1223  }
1224 
1225 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1226  void extract() {
1227  // called outside of parallel contexts
1228  join_helper<N>::extract_inputs(my_inputs);
1229  key_to_count_buffer_type::reset(); // have to reset the tag counts
1230  output_buffer_type::reset(); // also the queue of outputs
1231  // my_node->current_tag = NO_TAG;
1232  }
1233 #endif
1234  // all methods on input ports should be called under mutual exclusion from join_node_base.
1235 
// True when the output buffer currently holds at least one tuple.
1236  bool tuple_build_may_succeed() { // called from back-end
1237  key_matching_FE_operation op_data(may_succeed);
1238  my_aggregator.execute(&op_data);
1239  return op_data.status == SUCCEEDED;
1240  }
1241 
1242  // cannot lock while calling back to input_ports. current_key will only be set
1243  // and reset under the aggregator, so it will remain consistent.
// NOTE(review): the function header (listing line 1244) is absent; the body
// issues try_make with &out, so this is presumably try_to_make_tuple(out).
1245  key_matching_FE_operation op_data(&out,try_make);
1246  my_aggregator.execute(&op_data);
1247  return op_data.status == SUCCEEDED;
1248  }
1249 
// NOTE(review): header (listing line 1250) absent; presumably tuple_accepted().
1251  reset_port_count(); // reset current_key after ports reset.
1252  }
1253 
// NOTE(review): header (listing line 1254) absent; presumably tuple_rejected().
1255  // nothing to do.
1256  }
1257 
1258  input_type my_inputs; // input ports
1260  }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1261 
// Back end shared by the join policies: a graph_node that is also a
// sender<OutputTuple> and inherits the policy-specific front end
// join_node_FE<JP, InputTuple, OutputTuple>. Successor registration/removal,
// pulls and forwarding are serialized through my_aggregator.
// NOTE(review): this is a numbered listing with gaps (e.g. listing lines 1272,
// 1289-1290, 1296-1297, 1303-1304, 1306, 1313, 1316, 1326-1327, 1402, 1417,
// 1423, 1429, 1470, 1476, 1479 are absent). Names used but not declared in the
// visible text -- input_ports_type, class_type, my_successors, forwarder_busy,
// my_arg, my_succ, bypass_t -- are presumably declared on those lines.
1263  template<typename JP, typename InputTuple, typename OutputTuple>
1264  class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1265  public sender<OutputTuple> {
1266  protected:
1267  using graph_node::my_graph;
1268  public:
1269  typedef OutputTuple output_type;
1270 
1271  typedef typename sender<output_type>::successor_type successor_type;
// front-end hooks used by the aggregator handler below
1273  using input_ports_type::tuple_build_may_succeed;
1274  using input_ports_type::try_to_make_tuple;
1275  using input_ports_type::tuple_accepted;
1276  using input_ports_type::tuple_rejected;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1278  typedef typename sender<output_type>::built_successors_type built_successors_type;
1279  typedef typename sender<output_type>::successor_list_type successor_list_type;
1280 #endif
1281 
1282  private:
1283  // ----------- Aggregator ------------
// Operation codes for the aggregator. Note that do_fwrd is declared here but
// has no case in the visible switch below.
1284  enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1285 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1286  , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1287 #endif
1288  };
1291 
// Request record passed to my_aggregator.
// NOTE(review): the first union members (listing lines 1296-1297) and the
// first line of each of the two constructors below (1304, 1306) are absent;
// from the initializers they take an output_type and a successor_type
// reference respectively.
1292  class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1293  public:
1294  char type;
1295  union {
1298 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1299  size_t cnt_val;
1300  successor_list_type *slist;
1301 #endif
1302  };
1305  my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1307  my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1308  join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1309  };
1310 
1311  typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1312  friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1314  aggregator<handler_type, join_node_base_operation> my_aggregator;
1315 
// Aggregator handler body: executes each pending operation in list order.
// NOTE(review): the function header (listing line 1316) is absent; presumably
// handle_operations(join_node_base_operation* op_list).
1317  join_node_base_operation *current;
1318  while(op_list) {
1319  current = op_list;
1320  op_list = op_list->next;
1321  switch(current->type) {
1322  case reg_succ: {
1323  my_successors.register_successor(*(current->my_succ));
// if a tuple is already available and no forwarder is pending, start one
1324  if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
// NOTE(review): listing lines 1326-1327 (the object constructed by this
// placement new) are absent from the listing.
1325  task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1328  internal::spawn_in_graph_arena(my_graph, *rtask);
1329  forwarder_busy = true;
1330  }
1331  __TBB_store_with_release(current->status, SUCCEEDED);
1332  }
1333  break;
1334  case rem_succ:
1335  my_successors.remove_successor(*(current->my_succ));
1336  __TBB_store_with_release(current->status, SUCCEEDED);
1337  break;
1338  case try__get:
// pull request: succeeds only if the front end can build a tuple into
// *my_arg; the tuple is then marked accepted
1339  if(tuple_build_may_succeed()) {
1340  if(try_to_make_tuple(*(current->my_arg))) {
1341  tuple_accepted();
1342  __TBB_store_with_release(current->status, SUCCEEDED);
1343  }
1344  else __TBB_store_with_release(current->status, FAILED);
1345  }
1346  else __TBB_store_with_release(current->status, FAILED);
1347  break;
1348  case do_fwrd_bypass: {
// build_succeeded is only read inside the do/while, after it was assigned
1349  bool build_succeeded;
1350  task *last_task = NULL;
1351  output_type out;
1352  if(tuple_build_may_succeed()) { // checks output queue of FE
// keep building and offering tuples until either no tuple can be built or
// the successors return no task for one (treated as a rejection)
1353  do {
1354  build_succeeded = try_to_make_tuple(out); // fetch front_end of queue
1355  if(build_succeeded) {
1356  task *new_task = my_successors.try_put_task(out);
1357  last_task = combine_tasks(my_graph, last_task, new_task);
1358  if(new_task) {
1359  tuple_accepted();
1360  }
1361  else {
1362  tuple_rejected();
1363  build_succeeded = false;
1364  }
1365  }
1366  } while(build_succeeded);
1367  }
// hand the combined task back to the caller and allow a new forwarder
1368  current->bypass_t = last_task;
1369  __TBB_store_with_release(current->status, SUCCEEDED);
1370  forwarder_busy = false;
1371  }
1372  break;
1373 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1374  case add_blt_succ:
1375  my_successors.internal_add_built_successor(*(current->my_succ));
1376  __TBB_store_with_release(current->status, SUCCEEDED);
1377  break;
1378  case del_blt_succ:
1379  my_successors.internal_delete_built_successor(*(current->my_succ));
1380  __TBB_store_with_release(current->status, SUCCEEDED);
1381  break;
1382  case blt_succ_cnt:
1383  current->cnt_val = my_successors.successor_count();
1384  __TBB_store_with_release(current->status, SUCCEEDED);
1385  break;
1386  case blt_succ_cpy:
1387  my_successors.copy_successors(*(current->slist));
1388  __TBB_store_with_release(current->status, SUCCEEDED);
1389  break;
1390 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1391  }
1392  }
1393  }
1394  // ---------- end aggregator -----------
1395  public:
// Constructs the node in graph g; registers this object as owner of the
// successor set, as the front end's node, and as the aggregator's handler.
1396  join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1397  my_successors.set_owner(this);
1398  input_ports_type::set_my_node(this);
1399  my_aggregator.initialize_handler(handler_type(this));
1400  }
1401 
// Constructor from 'other' (header on listing line 1402 is absent; presumably
// the copy constructor). Uses other's graph and front end but starts with an
// empty successor set and forwarder_busy == false.
1403  graph_node(other.graph_node::my_graph), input_ports_type(other),
1404  sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1405  my_successors.set_owner(this);
1406  input_ports_type::set_my_node(this);
1407  my_aggregator.initialize_handler(handler_type(this));
1408  }
1409 
// Same as the graph-only constructor, but forwards the tuple f to the front
// end's constructor.
1410  template<typename FunctionTuple>
1411  join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1412  my_successors.set_owner(this);
1413  input_ports_type::set_my_node(this);
1414  my_aggregator.initialize_handler(handler_type(this));
1415  }
1416 
// NOTE(review): header (listing line 1417) absent; issues reg_succ for r, so
// presumably register_successor(successor_type &r).
1418  join_node_base_operation op_data(r, reg_succ);
1419  my_aggregator.execute(&op_data);
1420  return op_data.status == SUCCEEDED;
1421  }
1422 
// NOTE(review): header (listing line 1423) absent; issues rem_succ for r, so
// presumably remove_successor(successor_type &r).
1424  join_node_base_operation op_data(r, rem_succ);
1425  my_aggregator.execute(&op_data);
1426  return op_data.status == SUCCEEDED;
1427  }
1428 
// NOTE(review): header (listing line 1429) absent; issues try__get for v, so
// presumably try_get(output_type &v).
1430  join_node_base_operation op_data(v, try__get);
1431  my_aggregator.execute(&op_data);
1432  return op_data.status == SUCCEEDED;
1433  }
1434 
1435 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
// Direct accessor; unlike the methods below it does not go through the
// aggregator.
1436  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1437 
1438  void internal_add_built_successor( successor_type &r) __TBB_override {
1439  join_node_base_operation op_data(r, add_blt_succ);
1440  my_aggregator.execute(&op_data);
1441  }
1442 
1443  void internal_delete_built_successor( successor_type &r) __TBB_override {
1444  join_node_base_operation op_data(r, del_blt_succ);
1445  my_aggregator.execute(&op_data);
1446  }
1447 
// Returns the count written into cnt_val by the blt_succ_cnt handler.
1448  size_t successor_count() __TBB_override {
1449  join_node_base_operation op_data(blt_succ_cnt);
1450  my_aggregator.execute(&op_data);
1451  return op_data.cnt_val;
1452  }
1453 
// slist is set after construction because no constructor takes a list.
1454  void copy_successors(successor_list_type &l) __TBB_override {
1455  join_node_base_operation op_data(blt_succ_cpy);
1456  op_data.slist = &l;
1457  my_aggregator.execute(&op_data);
1458  }
1459 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1460 
1461 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1462  void extract() __TBB_override {
1463  input_ports_type::extract();
1464  my_successors.built_successors().sender_extract(*this);
1465  }
1466 #endif
1467 
1468  protected:
1469 
// NOTE(review): header (listing line 1470) absent; takes reset flags f.
// Resets the front end, and clears the successors when rf_clear_edges is set.
1471  input_ports_type::reset(f);
1472  if(f & rf_clear_edges) my_successors.clear();
1473  }
1474 
1475  private:
1477 
1478  friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
// NOTE(review): header (listing line 1479) absent. Runs the do_fwrd_bypass
// operation and returns the task the handler stored in bypass_t.
1480  join_node_base_operation op_data(do_fwrd_bypass);
1481  my_aggregator.execute(&op_data);
1482  return op_data.bypass_t;
1483  }
1484 
1485  }; // join_node_base
1486 
1487  // join base class type generator
// Primary template of the type generator that selects the base class of an
// unfolded_join_node from the port count N, port template PT, output tuple
// and join policy JP.
// NOTE(review): the member on listing line 1490 (presumably the 'type'
// typedef) is absent from this listing.
1488  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1489  struct join_base {
1491  };
1492 
// Specialization of join_base for key_matching_port with the
// key_matching<K,KHash> policy. 'type' is a join_node_base instantiated with
// key_traits_type, a ports type and OutputTuple.
// NOTE(review): listing lines 1495 (presumably the key_traits_type typedef)
// and 1500 (the ports-type template argument) are absent from this listing.
1493  template<int N, typename OutputTuple, typename K, typename KHash>
1494  struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1496  typedef K key_type;
1497  typedef KHash key_hash_compare;
1498  typedef typename internal::join_node_base< key_traits_type,
1499  // ports type
1501  OutputTuple > type;
1502  };
1503 
1505  // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1506  // and should match the typename.
1507 
// Generic unfolded_join_node: derives from the base class chosen by
// join_base<N,PT,OutputTuple,JP>::type and forwards the graph to it.
// NOTE(review): listing lines 1511, 1514 (presumably the input_ports_type and
// base_type typedefs) and 1517 (presumably another constructor) are absent
// from this listing.
1508  template<int N, template<class> class PT, typename OutputTuple, typename JP>
1509  class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1510  public:
1512  typedef OutputTuple output_type;
1513  private:
1515  public:
1516  unfolded_join_node(graph &g) : base_type(g) {}
1518  };
1519 
1520 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Function object that extracts a key of type K from a message of type T by
// calling key_from_message<K>(t). Used as the default key body by the
// graph-only unfolded_join_node constructors.
// NOTE(review): listing line 1524 (first statement of operator()) is absent
// from this listing.
1521  template <typename K, typename T>
1522  struct key_from_message_body {
1523  K operator()(const T& t) const {
1525  return key_from_message<K>(t);
1526  }
1527  };
1528  // Adds const to reference type
// Specialization for reference key types: when the key type is K&, the key is
// obtained via key_from_message<const K&>(t) and returned as const K&.
// NOTE(review): listing line 1532 (first statement of operator()) is absent
// from this listing.
1529  template <typename K, typename T>
1530  struct key_from_message_body<K&,T> {
1531  const K& operator()(const T& t) const {
1533  return key_from_message<const K&>(t);
1534  }
1535  };
1536 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1537  // key_matching unfolded_join_node. This must be a separate specialization because the constructors
1538  // differ.
1539 
// Two-port key-matching unfolded_join_node. Each constructor allocates one
// type_to_key_function_body_leaf per port with 'new' and passes them, together
// with the graph, to base_type.
// NOTE(review): listing lines 1543-1544, 1546, 1549-1551 (presumably the
// T0/T1, input_ports_type, base_type and f0_p/f1_p typedefs), 1556 and 1564
// (presumably the func_initializer_type( opener matching the extra ')') and
// 1570 are absent from this listing. Ownership of the 'new'-ed bodies is not
// visible here.
1540  template<typename OutputTuple, typename K, typename KHash>
1541  class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1542  join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1545  public:
1547  typedef OutputTuple output_type;
1548  private:
1552  typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1553  public:
1554 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1555  unfolded_join_node(graph &g) : base_type(g,
1557  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1558  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1559  ) ) {
1560  }
1561 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 2 elements
1562  template<typename Body0, typename Body1>
1563  unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1565  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1566  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1567  ) ) {
1568  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1569  }
1571  };
1572 
// Three-port key-matching unfolded_join_node. Each constructor allocates one
// type_to_key_function_body_leaf per port with 'new' and passes them, together
// with the graph, to base_type.
// NOTE(review): listing lines 1576-1578, 1580, 1583-1586 (presumably the
// T0..T2, input_ports_type, base_type and f0_p..f2_p typedefs), 1591 and 1600
// (presumably the func_initializer_type( opener matching the extra ')') and
// 1607 are absent from this listing. Ownership of the 'new'-ed bodies is not
// visible here.
1573  template<typename OutputTuple, typename K, typename KHash>
1574  class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1575  join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1579  public:
1581  typedef OutputTuple output_type;
1582  private:
1587  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1588  public:
1589 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1590  unfolded_join_node(graph &g) : base_type(g,
1592  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1593  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1594  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1595  ) ) {
1596  }
1597 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 3 elements
1598  template<typename Body0, typename Body1, typename Body2>
1599  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1601  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1602  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1603  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1604  ) ) {
1605  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1606  }
1608  };
1609 
// Four-port key-matching unfolded_join_node. Each constructor allocates one
// type_to_key_function_body_leaf per port with 'new' and passes them, together
// with the graph, to base_type.
// NOTE(review): listing lines 1613-1616, 1618, 1621-1625 (presumably the
// T0..T3, input_ports_type, base_type and f0_p..f3_p typedefs), 1630 and 1640
// (presumably the func_initializer_type( opener matching the extra ')') and
// 1648 are absent from this listing. Ownership of the 'new'-ed bodies is not
// visible here.
1610  template<typename OutputTuple, typename K, typename KHash>
1611  class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1612  join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1617  public:
1619  typedef OutputTuple output_type;
1620  private:
1626  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1627  public:
1628 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1629  unfolded_join_node(graph &g) : base_type(g,
1631  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1632  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1633  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1634  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1635  ) ) {
1636  }
1637 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 4 elements
1638  template<typename Body0, typename Body1, typename Body2, typename Body3>
1639  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1641  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1642  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1643  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1644  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1645  ) ) {
1646  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1647  }
1649  };
1650 
// Five-port key-matching unfolded_join_node. Each constructor allocates one
// type_to_key_function_body_leaf per port with 'new' and passes them, together
// with the graph, to base_type.
// NOTE(review): listing lines 1654-1658, 1660, 1663-1668 (presumably the
// T0..T4, input_ports_type, base_type and f0_p..f4_p typedefs), 1673 and 1684
// (presumably the func_initializer_type( opener matching the extra ')') and
// 1693 are absent from this listing. Ownership of the 'new'-ed bodies is not
// visible here.
1651  template<typename OutputTuple, typename K, typename KHash>
1652  class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1653  join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1659  public:
1661  typedef OutputTuple output_type;
1662  private:
1669  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1670  public:
1671 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1672  unfolded_join_node(graph &g) : base_type(g,
1674  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1675  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1676  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1677  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1678  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1679  ) ) {
1680  }
1681 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 5 elements
1682  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1683  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1685  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1686  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1687  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1688  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1689  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1690  ) ) {
1691  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1692  }
1694  };
1695 
1696 #if __TBB_VARIADIC_MAX >= 6
// Six-port key-matching unfolded_join_node (compiled only when
// __TBB_VARIADIC_MAX >= 6). Each constructor builds a func_initializer_type
// tuple of 'new'-allocated type_to_key_function_body_leaf objects, one per
// port, and passes it with the graph to base_type.
// NOTE(review): listing lines 1700-1705 (presumably the T0..T5 typedefs) are
// absent from this listing. Ownership of the 'new'-ed bodies is not visible
// here.
1697  template<typename OutputTuple, typename K, typename KHash>
1698  class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1699  join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1706  public:
1707  typedef typename wrap_key_tuple_elements<6,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1708  typedef OutputTuple output_type;
1709  private:
1710  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the abstract key-extraction body for port N
1711  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1712  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1713  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1714  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1715  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1716  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1717  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1718  public:
1719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1720  unfolded_join_node(graph &g) : base_type(g,
1721  func_initializer_type(
1722  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1723  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1724  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1725  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1726  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1727  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1728  ) ) {
1729  }
1730 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 6 elements
1731  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1732  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1733  : base_type(g, func_initializer_type(
1734  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1735  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1736  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1737  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1738  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1739  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5)
1740  ) ) {
1741  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1742  }
// copy constructor: delegates entirely to base_type's copy constructor
1743  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1744  };
1745 #endif
1746 
1747 #if __TBB_VARIADIC_MAX >= 7
// Seven-port key-matching unfolded_join_node (compiled only when
// __TBB_VARIADIC_MAX >= 7). Each constructor builds a func_initializer_type
// tuple of 'new'-allocated type_to_key_function_body_leaf objects, one per
// port, and passes it with the graph to base_type.
// NOTE(review): listing lines 1751-1757 (presumably the T0..T6 typedefs) are
// absent from this listing. Ownership of the 'new'-ed bodies is not visible
// here.
1748  template<typename OutputTuple, typename K, typename KHash>
1749  class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1750  join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1758  public:
1759  typedef typename wrap_key_tuple_elements<7,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1760  typedef OutputTuple output_type;
1761  private:
1762  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the abstract key-extraction body for port N
1763  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1764  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1765  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1766  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1767  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1768  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1769  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1770  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1771  public:
1772 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1773  unfolded_join_node(graph &g) : base_type(g,
1774  func_initializer_type(
1775  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1776  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1777  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1778  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1779  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1780  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1781  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1782  ) ) {
1783  }
1784 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 7 elements
1785  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1786  typename Body5, typename Body6>
1787  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1788  Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1789  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1790  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1791  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1792  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1793  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1794  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1795  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6)
1796  ) ) {
1797  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1798  }
// copy constructor: delegates entirely to base_type's copy constructor
1799  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1800  };
1801 #endif
1802 
1803 #if __TBB_VARIADIC_MAX >= 8
// Eight-port key-matching unfolded_join_node (compiled only when
// __TBB_VARIADIC_MAX >= 8). Each constructor builds a func_initializer_type
// tuple of 'new'-allocated type_to_key_function_body_leaf objects, one per
// port, and passes it with the graph to base_type.
// NOTE(review): listing lines 1807-1814 (presumably the T0..T7 typedefs) are
// absent from this listing. Ownership of the 'new'-ed bodies is not visible
// here.
1804  template<typename OutputTuple, typename K, typename KHash>
1805  class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1806  join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1815  public:
1816  typedef typename wrap_key_tuple_elements<8,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1817  typedef OutputTuple output_type;
1818  private:
1819  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the abstract key-extraction body for port N
1820  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1821  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1822  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1823  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1824  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1825  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1826  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1827  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1828  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1829  public:
1830 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// graph-only constructor: every port uses key_from_message_body as key body
1831  unfolded_join_node(graph &g) : base_type(g,
1832  func_initializer_type(
1833  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1834  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1835  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1836  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1837  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1838  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1839  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1840  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1841  ) ) {
1842  }
1843 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// user-supplied key bodies, one per port; the static assert checks that
// OutputTuple really has 8 elements
1844  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1845  typename Body5, typename Body6, typename Body7>
1846  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1847  Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1848  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1849  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1850  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1851  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1852  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1853  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1854  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1855  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7)
1856  ) ) {
1857  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1858  }
// copy constructor: delegates entirely to base_type's copy constructor
1859  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1860  };
1861 #endif
1862 
1863 #if __TBB_VARIADIC_MAX >= 9
1864  template<typename OutputTuple, typename K, typename KHash>
1865  class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1866  join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1876  public:
1877  typedef typename wrap_key_tuple_elements<9,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1878  typedef OutputTuple output_type;
1879  private:
1880  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1881  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1882  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1883  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1884  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1885  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1886  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1887  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1888  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1889  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1890  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1891  public:
1892 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1893  unfolded_join_node(graph &g) : base_type(g,
1894  func_initializer_type(
1895  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1896  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1897  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1898  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1899  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1900  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1901  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1902  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1903  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1904  ) ) {
1905  }
1906 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1907  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1908  typename Body5, typename Body6, typename Body7, typename Body8>
1909  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1910  Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1911  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1912  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1913  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1914  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1915  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1916  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1917  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1918  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1919  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8)
1920  ) ) {
1921  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1922  }
1923  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1924  };
1925 #endif
1926 
1927 #if __TBB_VARIADIC_MAX >= 10
1928  template<typename OutputTuple, typename K, typename KHash>
1929  class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1930  join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1941  public:
1942  typedef typename wrap_key_tuple_elements<10,key_matching_port,key_matching<K,KHash>,OutputTuple>::type input_ports_type;
1943  typedef OutputTuple output_type;
1944  private:
1945  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1946  typedef typename internal::type_to_key_function_body<T0, K> *f0_p;
1947  typedef typename internal::type_to_key_function_body<T1, K> *f1_p;
1948  typedef typename internal::type_to_key_function_body<T2, K> *f2_p;
1949  typedef typename internal::type_to_key_function_body<T3, K> *f3_p;
1950  typedef typename internal::type_to_key_function_body<T4, K> *f4_p;
1951  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1952  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1953  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1954  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1955  typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
1956  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1957  public:
1958 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1959  unfolded_join_node(graph &g) : base_type(g,
1960  func_initializer_type(
1961  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1962  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1963  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1964  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1965  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1966  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1967  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1968  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1969  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1970  new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1971  ) ) {
1972  }
1973 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1974  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1975  typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1976  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1977  Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1978  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1979  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1980  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1981  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1982  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4),
1983  new internal::type_to_key_function_body_leaf<T5, K, Body5>(body5),
1984  new internal::type_to_key_function_body_leaf<T6, K, Body6>(body6),
1985  new internal::type_to_key_function_body_leaf<T7, K, Body7>(body7),
1986  new internal::type_to_key_function_body_leaf<T8, K, Body8>(body8),
1987  new internal::type_to_key_function_body_leaf<T9, K, Body9>(body9)
1988  ) ) {
1989  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1990  }
1991  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1992  };
1993 #endif
1994 
1996  template<size_t N, typename JNT>
1998  return tbb::flow::get<N>(jn.input_ports());
1999  }
2000 
2001 }
2002 #endif // __TBB__flow_graph_join_impl_H
2003 
virtual void increment_port_count()=0
static tbb::task *const SUCCESSFULLY_ENQUEUED
#define __TBB_override
Definition: tbb_stddef.h:244
A task that calls a node's forward_task function.
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
type_to_key_func_type * get_my_key_func()
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:536
join_node_base(graph &g, FunctionTuple f)
void set_owner(successor_type *owner)
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
static bool reserve(InputTuple &my_input, OutputTuple &out)
static void reset_my_port(InputTuple &my_input)
graph & graph_reference() __TBB_override
aggregator< handler_type, reserving_port_operation > my_aggregator
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
field of type K being used for matching.
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
graph & graph_reference() __TBB_override
A cache of successors that are put in a round-robin fashion.
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
reservable_predecessor_cache< T, null_mutex > my_predecessors
bool my_item_valid(size_type i) const
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
Definition: tbb_stddef.h:381
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
receiver< input_type >::predecessor_type predecessor_type
void reset_receiver(reset_flags f) __TBB_override
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
static void reset_inputs(InputTuple &my_input, reset_flags f)
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
reserving_port< T > class_type
graph & graph_reference() __TBB_override
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
K key_from_message(const T &t)
Definition: flow_graph.h:695
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
void reset_receiver(reset_flags f) __TBB_override
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
static void release_reservations(TupleType &my_input)
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
void set_join_node_pointer(forwarding_base *join)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
queueing_port(const queueing_port &)
copy constructor
reserving_port_operation(const predecessor_type &s, op_type t)
aggregator< handler_type, join_node_base_operation > my_aggregator
receiver< input_type >::predecessor_type predecessor_type
void consume()
Complete use of the port.
void const char const char int ITT_FORMAT __itt_group_sync p
receiver< input_type >::predecessor_type predecessor_type
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:171
void handle_operations(reserving_port_operation *op_list)
task * decrement_port_count(bool handle_task) __TBB_override
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
reserving_port(const reserving_port &)
static void release_my_reservation(TupleType &my_input)
matching_forwarding_base< key_type > * my_join
static void reset_ports(InputTuple &my_input)
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
static void set_join_node_pointer(TupleType &my_input, PortType *port)
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
static void reset_ports(InputTuple &my_input)
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void reset_node(reset_flags f) __TBB_override
join_node_base_operation(const successor_type &s, op_type t)
void release()
Release the port.
join_node_FE : implements input port policy
static void reset_my_port(InputTuple &my_input)
aggregator< handler_type, key_matching_FE_operation > my_aggregator
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
void set_join_node_pointer(forwarding_base *join)
virtual task * increment_key_count(current_key_type const &, bool)=0
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
virtual task * decrement_port_count(bool handle_task)=0
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
bool try_get(output_type &v) __TBB_override
Request an item from the sender.
void set_my_key_func(type_to_key_func_type *f)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
static void consume_reservations(TupleType &my_input)
static void set_join_node_pointer(TupleType &my_input, PortType *port)
void reset_receiver(reset_flags f) __TBB_override
bool reserve(T &v)
Reserve an item from the port.
static bool get_items(InputTuple &my_input, OutputTuple &out)
const item_type & front() const
const K & operator()(const table_item_type &v)
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
join_node_base< JP, input_ports_type, output_type > base_type
broadcast_cache< output_type, null_rw_mutex > my_successors
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
void handle_operations(queueing_port_operation *op_list)
task * try_put_task(const T &v) __TBB_override
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
void handle_operations(key_matching_port_operation *op_list)
static void release_reservations(TupleType &my_input)
join_node_base(const join_node_base &other)
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
unfolded_join_node(const unfolded_join_node &other)
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
tbb::internal::strip< KeyType >::type current_key_type
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
void const char const char int ITT_FORMAT __itt_group_sync s
void handle_operations(join_node_base_operation *op_list)
static void reset_inputs(InputTuple &my_input, reset_flags f)
key_matching_port(const key_matching_port &)
A cache of successors that are broadcast to.
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
tbb::internal::strip< key_type >::type noref_key_type
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
sender< output_type >::successor_type successor_type
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
aggregator< handler_type, queueing_port_operation > my_aggregator
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
task * try_put_task(const T &) __TBB_override
The two-phase join port.
join_node_base< JP, InputTuple, OutputTuple > class_type
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
static void release_my_reservation(TupleType &my_input)
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
static bool get_items(InputTuple &my_input, OutputTuple &out)
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
Release.
Definition: atomic.h:49
key_matching_port< traits > class_type
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:717
static void consume_reservations(TupleType &my_input)
static bool reserve(InputTuple &my_input, OutputTuple &out)
aggregator< handler_type, key_matching_port_operation > my_aggregator

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.