Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_flow_graph_impl_H
22 #define __TBB_flow_graph_impl_H
23 
24 #include "../tbb_stddef.h"
25 #include "../task.h"
26 #include "../task_arena.h"
27 #include "../flow_graph_abstractions.h"
28 
29 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
30 #include "../concurrent_priority_queue.h"
31 #endif
32 
33 #include <list>
34 
35 #if TBB_DEPRECATED_FLOW_ENQUEUE // FLOW_SPAWN(a) selects how task 'a' is handed to the scheduler
36 #define FLOW_SPAWN(a) tbb::task::enqueue((a)) // deprecated mode: forward to tbb::task::enqueue
37 #else
38 #define FLOW_SPAWN(a) tbb::task::spawn((a)) // default mode: forward to tbb::task::spawn
39 #endif // TBB_DEPRECATED_FLOW_ENQUEUE
40 
41 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES // helpers that keep priority-related code only when the preview feature is on
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr // feature on: expands to 'expr' unchanged
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority // feature on: appends ', priority' to an argument list
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority // feature on: passes both arguments through
45 #else
46 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) // feature off: expands to nothing
47 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) // feature off: drops the priority argument
48 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1 // feature off: keeps only arg1
49 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
50 
51 namespace tbb {
52 namespace flow {
53 
54 namespace internal {
55 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1; // sentinel pointer value; run_and_put_task compares try_put_task()'s result against it and substitutes NULL
56 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
57 typedef unsigned int node_priority_t;
59 #endif
60 }
61 
62 namespace interface10 {
63 
65 
66 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
70 struct graph_task : public task {
71  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
73 };
74 #else
75 typedef task graph_task;
76 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
77 
78 
79 class graph;
80 class graph_node;
81 
82 template <typename GraphContainerType, typename GraphNodeType>
84  friend class graph;
85  friend class graph_node;
86 public:
87  typedef size_t size_type;
88  typedef GraphNodeType value_type;
89  typedef GraphNodeType* pointer;
90  typedef GraphNodeType& reference;
91  typedef const GraphNodeType& const_reference;
92  typedef std::forward_iterator_tag iterator_category;
93 
95  graph_iterator() : my_graph(NULL), current_node(NULL) {}
96 
100  {}
101 
104  if (this != &other) {
105  my_graph = other.my_graph;
106  current_node = other.current_node;
107  }
108  return *this;
109  }
110 
112  reference operator*() const;
113 
115  pointer operator->() const;
116 
118  bool operator==(const graph_iterator& other) const { // equal only when both iterators agree on graph AND node
119  return ((my_graph == other.my_graph) && (current_node == other.current_node)); // pointer comparisons, not deep comparisons
120  }
121 
123  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); } // defined as the exact negation of operator==
124 
128  return *this;
129  }
130 
133  graph_iterator result = *this;
134  operator++();
135  return result;
136  }
137 
138 private:
139  // the graph over which we are iterating
140  GraphContainerType *my_graph;
141  // pointer into my_graph's my_nodes list
143 
145  graph_iterator(GraphContainerType *g, bool begin);
146  void internal_forward();
147 }; // class graph_iterator
148 
149 // flags to modify the behavior of the graph reset(). Can be combined.
152  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
153  rf_clear_edges = 1 << 1 // delete edges
154 };
155 
156 namespace internal {
157 
158 void activate_graph(graph& g);
159 void deactivate_graph(graph& g);
160 bool is_graph_active(graph& g);
161 void spawn_in_graph_arena(graph& g, tbb::task& arena_task);
163 template<typename F> void execute_in_graph_arena(graph& g, F& f);
164 
165 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
167  bool operator()(const graph_task* left, const graph_task* right) {
168  return left->priority < right->priority;
169  }
170 };
171 
173 
174 class priority_task_selector : public task {
175 public:
177  : my_priority_queue(priority_queue) {}
179  graph_task* t = NULL;
180  bool result = my_priority_queue.try_pop(t);
181  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
182  " in graph's priority queue mismatched" );
184  "Incorrect task submitted to graph priority queue" );
186  "Tasks from graph's priority queue must have priority" );
187  task* t_next = t->execute();
188  task::destroy(*t);
189  return t_next;
190  }
191 private:
193 };
194 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
195 
196 }
197 
199 
201  friend class graph_node;
202 
203  template< typename Body >
204  class run_task : public graph_task {
205  public:
206  run_task(Body& body
208  , node_priority_t node_priority = no_priority
209  ) : graph_task(node_priority),
210 #else
211  ) :
212 #endif
213  my_body(body) { }
215  my_body();
216  return NULL;
217  }
218  private:
219  Body my_body;
220  };
221 
222  template< typename Receiver, typename Body >
223  class run_and_put_task : public graph_task {
224  public:
225  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
227  tbb::task *res = my_receiver.try_put_task(my_body());
228  if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
229  return res;
230  }
231  private:
232  Receiver &my_receiver;
233  Body my_body;
234  };
235  typedef std::list<tbb::task *> task_list_type;
236 
237  class wait_functor {
239  public:
242  };
243 
247  public:
249  void operator()() const {
251  }
252  };
253 
254  void prepare_task_arena(bool reinit = false) {
255  if (reinit) {
256  __TBB_ASSERT(my_task_arena, "task arena is NULL");
259  }
260  else {
261  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
263  }
264  if (!my_task_arena->is_active()) // failed to attach
265  my_task_arena->initialize(); // create a new, default-initialized arena
266  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
267  }
268 
269 public:
271  graph();
272 
274  explicit graph(tbb::task_group_context& use_this_context);
275 
277 
278  ~graph();
279 
280 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
281  void set_name(const char *name);
282 #endif
283 
285  reserve_wait();
286  }
287 
289  release_wait();
290  }
291 
293 
296 
298 
301 
303 
305  template< typename Receiver, typename Body >
306  void run(Receiver &r, Body body) {
307  if (internal::is_graph_active(*this)) {
308  task* rtask = new (task::allocate_additional_child_of(*root_task()))
311  }
312  }
313 
315 
317  template< typename Body >
318  void run(Body body) {
319  if (internal::is_graph_active(*this)) {
320  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
322  }
323  }
324 
326 
327  void wait_for_all() {
328  cancelled = false;
329  caught_exception = false;
330  if (my_root_task) {
331 #if TBB_USE_EXCEPTIONS
332  try {
333 #endif
336 #if TBB_USE_EXCEPTIONS
337  }
338  catch (...) {
340  my_context->reset();
341  caught_exception = true;
342  cancelled = true;
343  throw;
344  }
345 #endif
346  // TODO: the "if" condition below is just a work-around to support the concurrent wait
347  // mode. The cancellation and exception mechanisms are still broken in this mode.
348  // Consider using task group not to re-implement the same functionality.
350  my_context->reset(); // consistent with behavior in catch()
352  }
353  }
354  }
355 
358  return my_root_task;
359  }
360 
361  // ITERATORS
362  template<typename C, typename N>
363  friend class graph_iterator;
364 
365  // Graph iterator typedefs
368 
369  // Graph iterator constructors
371  iterator begin();
373  iterator end();
375  const_iterator begin() const;
377  const_iterator end() const;
379  const_iterator cbegin() const;
381  const_iterator cend() const;
382 
384  bool is_cancelled() { return cancelled; } // returns the 'cancelled' flag; wait_for_all() resets it to false on entry and sets it to true when it catches an exception
386 
387  // thread-unsafe state reset.
389 
390 private:
394  bool cancelled;
398 
400 
402  void register_node(graph_node *n);
403  void remove_node(graph_node *n);
404 
406 
407 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
409 #endif
410 
411  friend void internal::activate_graph(graph& g);
412  friend void internal::deactivate_graph(graph& g);
413  friend bool internal::is_graph_active(graph& g);
414  friend void internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
416  template<typename F> friend void internal::execute_in_graph_arena(graph& g, F& f);
417 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
418  template<typename Input, typename Output, typename Policy, typename Allocator>
419  friend class async_node;
420 #endif
421 
423 
424 }; // class graph
425 
428  friend class graph;
429  template<typename C, typename N>
430  friend class graph_iterator;
431 protected:
434 public:
435  explicit graph_node(graph& g);
436 
437  virtual ~graph_node();
438 
439 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
440  virtual void set_name(const char *name) = 0;
441 #endif
442 
443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
444  virtual void extract() = 0;
445 #endif
446 
447 protected:
448  // performs the reset on an individual node.
449  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
450 }; // class graph_node
451 
452 namespace internal {
453 
454 inline void activate_graph(graph& g) { // marks graph g as active
455  g.my_is_active = true; // this flag is what is_graph_active() reports
456 }
457 
458 inline void deactivate_graph(graph& g) { // marks graph g as inactive
459  g.my_is_active = false; // while false, graph::run() and execute_in_graph_arena() skip their work
460 }
461 
462 inline bool is_graph_active(graph& g) { // reports whether graph g is currently marked active
463  return g.my_is_active; // plain read of the flag written by activate_graph()/deactivate_graph()
464 }
465 
467 template<typename F>
468 inline void execute_in_graph_arena(graph& g, F& f) {
469  if (is_graph_active(g)) {
471  g.my_task_arena->execute(f);
472  }
473 }
474 
476 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
477  task* task_to_spawn = &arena_task;
478 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
479  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
480  graph_task* t = static_cast<graph_task*>(&arena_task);
481  if( t->priority != no_priority ) {
486  task_to_spawn = new( t->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
487  tbb::internal::make_critical( *task_to_spawn );
488  g.my_priority_queue.push(t);
489  }
490 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
491  graph::spawn_functor s_fn(*task_to_spawn);
492  execute_in_graph_arena(g, s_fn);
493 }
494 
496  g.my_reset_task_list.push_back(tp);
497 }
498 
499 } // namespace internal
500 
501 } // namespace interface10
502 } // namespace flow
503 } // namespace tbb
504 
505 #endif // __TBB_flow_graph_impl_H
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:235
graph_task(node_priority_t node_priority=no_priority)
static tbb::task *const SUCCESSFULLY_ENQUEUED
#define __TBB_override
Definition: tbb_stddef.h:244
Base class for tasks generated by graph nodes.
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
bool operator==(const graph_iterator &other) const
Equality.
const_iterator cend() const
end const iterator
Definition: flow_graph.h:845
std::forward_iterator_tag iterator_category
pointer operator->() const
Dereference.
Definition: flow_graph.h:735
virtual task * execute()=0
Should be overridden by derived classes.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:826
graph_iterator operator++(int)
Post-increment.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
~graph()
Destroys the graph.
Definition: flow_graph.h:767
Used to form groups of tasks.
Definition: task.h:335
void remove_node(graph_node *n)
Definition: flow_graph.h:800
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:342
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
graph_iterator< const graph, const graph_node > const_iterator
void register_node(graph_node *n)
Definition: flow_graph.h:789
#define FLOW_SPAWN(a)
reference operator*() const
Dereference.
Definition: flow_graph.h:729
tbb::task_group_context * my_context
Base class for user-defined tasks.
Definition: task.h:592
void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:843
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert
Definition: tbb_stddef.h:171
tbb::task * root_task()
Returns the root task of the graph.
static const node_priority_t no_priority
unsigned int node_priority_t
The base of all graph nodes.
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed.
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
bool operator()(const graph_task *left, const graph_task *right)
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:792
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:649
graph_iterator< graph, graph_node > iterator
Pure virtual template classes that define interfaces for async communication.
iterator end()
end iterator
Definition: flow_graph.h:837
bool try_pop(reference elem)
Gets a reference to and removes highest priority element.
bool operator!=(const graph_iterator &other) const
Inequality.
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:782
run_task(Body &body, node_priority_t node_priority=no_priority)
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
The graph class.
std::list< tbb::task * > task_list_type
void reset(reset_flags f=rf_reset_protocol)
Definition: flow_graph.h:812
internal::graph_task_priority_queue_t my_priority_queue
graph_iterator & operator=(const graph_iterator &other)
Assignment.
task * execute() __TBB_override
Should be overridden by derived classes.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
bool __TBB_EXPORTED_METHOD is_group_execution_cancelled() const
Returns true if the context received cancellation request.
graph_iterator & operator++()
Pre-increment.
iterator begin()
start iterator
Definition: flow_graph.h:835
graph_iterator(const graph_iterator &other)
Copy constructor.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
bool is_cancelled()
Returns whether graph execution was cancelled.
void execute_in_graph_arena(graph &g, F &f)
Executes custom functor inside graph arena.
void set_ref_count(int count)
Set reference count.
Definition: task.h:734
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:745
void make_critical(task &t)
Definition: task.h:952
void prepare_task_arena(bool reinit=false)
void wait_for_all()
Wait until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls.
priority_task_selector(graph_task_priority_queue_t &priority_queue)
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:775
void initialize()
Forces allocation of the resources for the task_arena as specified in constructor arguments.
Definition: task_arena.h:247
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
Implements async node.
Definition: flow_graph.h:3493
void run(Body body)
Spawns a task that runs a function object.
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
uintptr_t traits() const
Returns the context's trait.
Definition: task.h:555

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.