Intel® RealSense™ Cross Platform API
Intel Realsense Cross-platform API
sync.h
Go to the documentation of this file.
1 // License: Apache 2.0. See LICENSE file in root directory.
2 // Copyright(c) 2015 Intel Corporation. All Rights Reserved.
3 
4 #pragma once
5 
6 #include "types.h"
7 #include "archive.h"
8 
9 #include <stdint.h>
10 #include <vector>
11 #include <mutex>
12 #include <memory>
13 
14 namespace librealsense
15 {
16 
17  typedef int stream_id;
18 
19  class sync_lock
20  {
21  public:
22  sync_lock(std::mutex& mutex) : _mutex(mutex)
23  {
24  mutex.lock();
25  }
26 
28  {
29  // NOTE: The Sync_Lock is itself a single-threaded object
30  // It maintains a state, and does not protect its state.
31  // That is acceptable for our use case,
32  // because we use it to communicate within a single thread
33  if (!_is_locked) return;
34  _mutex.unlock();
35  _is_locked = false;
36 
37  }
38 
40  {
41  if (_is_locked)
42  {
43  _mutex.unlock();
44 
45  }
46  }
47 
48  private:
49  bool _is_locked = true;
50 
51  std::mutex& _mutex;
52  };
53  //sync_lock::ref = 0;
54 
55  class synthetic_source_interface;
56 
58  {
60  //sync_lock& lock_ref;
62  };
63 
64  typedef int stream_id;
65  typedef std::function<void(frame_holder, syncronization_environment)> sync_callback;
66 
68  {
69  public:
70  virtual void dispatch(frame_holder f, syncronization_environment env) = 0;
71  virtual void sync(frame_holder f, syncronization_environment env) = 0;
72  virtual void set_callback(sync_callback f) = 0;
73  virtual const std::vector<stream_id>& get_streams() const = 0;
74  virtual const std::vector<rs2_stream>& get_streams_types() const = 0;
75  virtual std::string get_name() const = 0;
76  };
77 
78  class matcher: public matcher_interface
79  {
80  public:
81  matcher(std::vector<stream_id> streams_id = {});
82  virtual void sync(frame_holder f, syncronization_environment env);
83  virtual void set_callback(sync_callback f);
84  const std::vector<stream_id>& get_streams() const override;
85  const std::vector<rs2_stream>& get_streams_types() const override;
86 
88  virtual ~matcher();
89 
90  virtual std::string get_name() const;
91  bool get_active() const;
92  void set_active(const bool active);
93 
94  protected:
95  std::vector<stream_id> _streams_id;
96  std::vector<rs2_stream> _streams_type;
99  std::string _name;
100  bool _active = true;
101  };
102 
103  class identity_matcher : public matcher
104  {
105  public:
107 
108  void dispatch(frame_holder f, syncronization_environment env) override;
109 
110  };
111 
112  class composite_matcher : public matcher
113  {
114  public:
115  composite_matcher(std::vector<std::shared_ptr<matcher>> matchers, std::string name);
116 
117 
118  virtual bool are_equivalent(frame_holder& a, frame_holder& b) = 0;
119  virtual bool is_smaller_than(frame_holder& a, frame_holder& b) = 0;
120  virtual bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) = 0;
121  virtual void clean_inactive_streams(frame_holder& f) = 0;
122  virtual void update_last_arrived(frame_holder& f, matcher* m) = 0;
123 
124  void dispatch(frame_holder f, syncronization_environment env) override;
125  std::string frames_to_string(std::vector<librealsense::matcher*> matchers);
126  void sync(frame_holder f, syncronization_environment env) override;
127  std::shared_ptr<matcher> find_matcher(const frame_holder& f);
128 
129  protected:
130  virtual void update_next_expected(const frame_holder& f) = 0;
131 
132  std::map<matcher*, single_consumer_queue<frame_holder>> _frames_queue;
133  std::map<stream_id, std::shared_ptr<matcher>> _matchers;
134  std::map<matcher*, double> _next_expected;
135  std::map<matcher*, rs2_timestamp_domain> _next_expected_domain;
136  };
137 
139  {
140  public:
141  frame_number_composite_matcher(std::vector<std::shared_ptr<matcher>> matchers);
142  virtual void update_last_arrived(frame_holder& f, matcher* m) override;
143  bool are_equivalent(frame_holder& a, frame_holder& b) override;
144  bool is_smaller_than(frame_holder& a, frame_holder& b) override;
145  bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) override;
146  void clean_inactive_streams(frame_holder& f) override;
147  void update_next_expected(const frame_holder& f) override;
148 
149  private:
150  std::map<matcher*,unsigned long long> _last_arrived;
151  };
152 
154  {
155  public:
156  timestamp_composite_matcher(std::vector<std::shared_ptr<matcher>> matchers);
157  bool are_equivalent(frame_holder& a, frame_holder& b) override;
158  bool is_smaller_than(frame_holder& a, frame_holder& b) override;
159  virtual void update_last_arrived(frame_holder& f, matcher* m) override;
160  void clean_inactive_streams(frame_holder& f) override;
161  bool skip_missing_stream(std::vector<matcher*> synced, matcher* missing) override;
162  void update_next_expected(const frame_holder & f) override;
163 
164  private:
165  unsigned int get_fps(const frame_holder & f);
166  bool are_equivalent(double a, double b, int fps);
167  std::map<matcher*, double> _last_arrived;
168  std::map<matcher*, unsigned int> _fps;
169 
170  };
171 }
timestamp_composite_matcher(std::vector< std::shared_ptr< matcher >> matchers)
Definition: sync.h:78
std::map< matcher *, double > _next_expected
Definition: sync.h:134
Definition: backend.h:347
void clean_inactive_streams(frame_holder &f) override
virtual std::string get_name() const =0
sync_callback _callback
Definition: sync.h:97
void sync(frame_holder f, syncronization_environment env) override
std::function< void(frame_holder, syncronization_environment)> sync_callback
Definition: sync.h:65
virtual void set_callback(sync_callback f)
virtual void update_last_arrived(frame_holder &f, matcher *m) override
bool is_smaller_than(frame_holder &a, frame_holder &b) override
virtual void set_callback(sync_callback f)=0
sync_lock(std::mutex &mutex)
Definition: sync.h:22
bool skip_missing_stream(std::vector< matcher *> synced, matcher *missing) override
virtual void update_last_arrived(frame_holder &f, matcher *m) override
single_consumer_queue< frame_holder > & matches
Definition: sync.h:61
std::vector< rs2_stream > _streams_type
Definition: sync.h:96
identity_matcher(stream_id stream, rs2_stream streams_type)
matcher(std::vector< stream_id > streams_id={})
bool _active
Definition: sync.h:100
Definition: algo.h:16
callback_invocation_holder begin_callback()
virtual void sync(frame_holder f, syncronization_environment env)=0
std::map< stream_id, std::shared_ptr< matcher > > _matchers
Definition: sync.h:133
callbacks_heap _callback_inflight
Definition: sync.h:98
virtual const std::vector< stream_id > & get_streams() const =0
virtual const std::vector< rs2_stream > & get_streams_types() const =0
~sync_lock()
Definition: sync.h:39
std::string frames_to_string(std::vector< librealsense::matcher *> matchers)
bool is_smaller_than(frame_holder &a, frame_holder &b) override
bool are_equivalent(frame_holder &a, frame_holder &b) override
void unlock_preemptively()
Definition: sync.h:27
virtual void update_last_arrived(frame_holder &f, matcher *m)=0
virtual void clean_inactive_streams(frame_holder &f)=0
std::string _name
Definition: sync.h:99
Definition: sync.h:112
virtual bool skip_missing_stream(std::vector< matcher *> synced, matcher *missing)=0
bool are_equivalent(frame_holder &a, frame_holder &b) override
synthetic_source_interface * source
Definition: sync.h:59
rs2_stream
Streams are different types of data provided by RealSense devices.
Definition: rs_sensor.h:36
void update_next_expected(const frame_holder &f) override
virtual void update_next_expected(const frame_holder &f)=0
virtual bool is_smaller_than(frame_holder &a, frame_holder &b)=0
void update_next_expected(const frame_holder &f) override
bool get_active() const
void set_active(const bool active)
std::map< matcher *, rs2_timestamp_domain > _next_expected_domain
Definition: sync.h:135
const std::vector< stream_id > & get_streams() const override
Definition: concurrency.h:15
std::shared_ptr< matcher > find_matcher(const frame_holder &f)
frame_number_composite_matcher(std::vector< std::shared_ptr< matcher >> matchers)
virtual std::string get_name() const
Definition: sync.h:103
int stream_id
Definition: sync.h:17
Definition: stream.h:14
Definition: types.h:587
virtual void dispatch(frame_holder f, syncronization_environment env)=0
const std::vector< rs2_stream > & get_streams_types() const override
bool skip_missing_stream(std::vector< matcher *> synced, matcher *missing) override
Definition: processing.h:19
composite_matcher(std::vector< std::shared_ptr< matcher >> matchers, std::string name)
Definition: sync.h:19
virtual bool are_equivalent(frame_holder &a, frame_holder &b)=0
void dispatch(frame_holder f, syncronization_environment env) override
std::map< matcher *, single_consumer_queue< frame_holder > > _frames_queue
Definition: sync.h:132
void dispatch(frame_holder f, syncronization_environment env) override
void clean_inactive_streams(frame_holder &f) override
virtual void sync(frame_holder f, syncronization_environment env)
Definition: sync.h:67
std::vector< stream_id > _streams_id
Definition: sync.h:95