Embedded Template Library 1.0
Loading...
Searching...
No Matches
message_broker.h
1/******************************************************************************
2The MIT License(MIT)
3
4Embedded Template Library.
5https://github.com/ETLCPP/etl
6https://www.etlcpp.com
7
8Copyright(c) 2017 John Wellbelove
9
10Permission is hereby granted, free of charge, to any person obtaining a copy
11of this software and associated documentation files(the "Software"), to deal
12in the Software without restriction, including without limitation the rights
13to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
14copies of the Software, and to permit persons to whom the Software is
15furnished to do so, subject to the following conditions :
16
17The above copyright notice and this permission notice shall be included in all
18copies or substantial portions of the Software.
19
20THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
23AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26SOFTWARE.
27******************************************************************************/
28
29#ifndef ETL_MESSAGE_BROKER_INCLUDED
30#define ETL_MESSAGE_BROKER_INCLUDED
31
32#include "platform.h"
33#include "message.h"
34#include "message_router.h"
35#include "message_types.h"
36#include "nullptr.h"
37#include "span.h"
38
39#include <stdint.h>
40
41namespace etl
42{
43 //***************************************************************************
45 //***************************************************************************
47 {
48 private:
49
50 //*******************************************
51 class subscription_node
52 {
53 friend class message_broker;
54
55 protected:
56
57 //*******************************
58 subscription_node()
59 : p_next(ETL_NULLPTR)
60 {
61 }
62
63 //*******************************
64 void set_next(subscription_node* sub)
65 {
66 p_next = sub;
67 }
68
69 //*******************************
70 subscription_node* get_next() const
71 {
72 return p_next;
73 }
74
75 //*******************************
76 void terminate()
77 {
78 set_next(ETL_NULLPTR);
79 }
80
81 //*******************************
82 void append(subscription_node* sub)
83 {
84 if (sub != ETL_NULLPTR)
85 {
86 sub->set_next(get_next());
87 }
88 set_next(sub);
89 }
90
91 subscription_node* p_next;
92 };
93
94 public:
95
96 typedef etl::span<const etl::message_id_t> message_id_span_t;
97
98 //*******************************************
99 class subscription : public subscription_node
100 {
101 public:
102
103 friend class message_broker;
104
105 //*******************************
106 subscription(etl::imessage_router& router_)
107 : p_router(&router_)
108 {
109 }
110
111 private:
112
113 //*******************************
114 virtual message_id_span_t message_id_list() const = 0;
115
116 //*******************************
117 etl::imessage_router* get_router() const
118 {
119 return p_router;
120 }
121
122 //*******************************
123 subscription* next_subscription() const
124 {
125 return static_cast<subscription*>(get_next());
126 }
127
128 etl::imessage_router* const p_router;
129 };
130
131 using etl::imessage_router::receive;
132
133 //*******************************************
135 //*******************************************
137 : imessage_router(etl::imessage_router::MESSAGE_BROKER)
138 , head()
139 {
140 }
141
142 //*******************************************
144 //*******************************************
146 : imessage_router(etl::imessage_router::MESSAGE_BROKER, successor_)
147 , head()
148 {
149 }
150
151 //*******************************************
153 //*******************************************
154 message_broker(etl::message_router_id_t id_)
155 : imessage_router(id_)
156 , head()
157 {
158 ETL_ASSERT((id_ <= etl::imessage_router::MAX_MESSAGE_ROUTER) || (id_ == etl::imessage_router::MESSAGE_BROKER),
160 }
161
162 //*******************************************
164 //*******************************************
165 message_broker(etl::message_router_id_t id_, etl::imessage_router& successor_)
166 : imessage_router(id_, successor_)
167 , head()
168 {
169 ETL_ASSERT((id_ <= etl::imessage_router::MAX_MESSAGE_ROUTER) || (id_ == etl::imessage_router::MESSAGE_BROKER),
171 }
172
173 //*******************************************
175 //*******************************************
177 {
178 initialise_insertion_point(new_sub.get_router(), &new_sub);
179 }
180
181 //*******************************************
182 void unsubscribe(etl::imessage_router& router)
183 {
184 initialise_insertion_point(&router, ETL_NULLPTR);
185 }
186
187 //*******************************************
188 virtual void receive(const etl::imessage& msg) ETL_OVERRIDE
189 {
190 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, msg);
191 }
192
193 virtual void receive(etl::shared_message shared_msg) ETL_OVERRIDE
194 {
195 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, shared_msg);
196 }
197
198 //*******************************************
199 virtual void receive(etl::message_router_id_t destination_router_id, const etl::imessage& msg) ETL_OVERRIDE
200 {
201 const etl::message_id_t id = msg.get_message_id();
202
203 if (!empty())
204 {
205 // Scan the subscription lists.
206 subscription* sub = static_cast<subscription*>(head.get_next());
207
208 while (sub != ETL_NULLPTR)
209 {
210 message_id_span_t message_ids = sub->message_id_list();
211
212 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
213
214 if (itr != message_ids.end())
215 {
216 etl::imessage_router* router = sub->get_router();
217
218 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS || destination_router_id == router->get_message_router_id())
219 {
220 router->receive(msg);
221 }
222 }
223
224 sub = sub->next_subscription();
225 }
226 }
227
228 // Always pass the message on to the successor.
229 if (has_successor())
230 {
231 get_successor().receive(destination_router_id, msg);
232 }
233 }
234
235 //*******************************************
236 virtual void receive(etl::message_router_id_t destination_router_id, etl::shared_message shared_msg) ETL_OVERRIDE
237 {
238 const etl::message_id_t id = shared_msg.get_message().get_message_id();
239
240 if (!empty())
241 {
242 // Scan the subscription lists.
243 subscription* sub = static_cast<subscription*>(head.get_next());
244
245 while (sub != ETL_NULLPTR)
246 {
247 message_id_span_t message_ids = sub->message_id_list();
248
249 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
250
251 if (itr != message_ids.end())
252 {
253 etl::imessage_router* router = sub->get_router();
254
255 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS || destination_router_id == router->get_message_router_id())
256 {
257 router->receive(shared_msg);
258 }
259 }
260
261 sub = sub->next_subscription();
262 }
263 }
264
265 // Always pass the message on to a successor.
266 if (has_successor())
267 {
268 get_successor().receive(destination_router_id, shared_msg);
269 }
270 }
271
272 using imessage_router::accepts;
273
274 //*******************************************
277 //*******************************************
278 virtual bool accepts(etl::message_id_t id) const ETL_OVERRIDE
279 {
280 if (!empty())
281 {
282 // Scan the subscription lists.
283 subscription* sub = static_cast<subscription*>(head.get_next());
284
285 while (sub != ETL_NULLPTR)
286 {
287 message_id_span_t message_ids = sub->message_id_list();
288
289 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(), id);
290
291 if (itr != message_ids.end())
292 {
293 etl::imessage_router* router = sub->get_router();
294
295 if (router->accepts(id))
296 {
297 return true;
298 }
299 }
300
301 sub = sub->next_subscription();
302 }
303 }
304
305 // Check any successor.
306 if (has_successor())
307 {
308 if (get_successor().accepts(id))
309 {
310 return true;
311 }
312 }
313
314 return false;
315
316 // return true;
317 }
318
319 //*******************************************
320 void clear()
321 {
322 head.terminate();
323 }
324
325 //********************************************
326 ETL_DEPRECATED
327 virtual bool is_null_router() const ETL_OVERRIDE
328 {
329 return false;
330 }
331
332 //********************************************
333 virtual bool is_producer() const ETL_OVERRIDE
334 {
335 return true;
336 }
337
338 //********************************************
339 virtual bool is_consumer() const ETL_OVERRIDE
340 {
341 return true;
342 }
343
344 //********************************************
345 bool empty() const
346 {
347 return head.get_next() == ETL_NULLPTR;
348 }
349
350 private:
351
352 //*******************************************
353 void initialise_insertion_point(const etl::imessage_router* p_router, etl::message_broker::subscription* p_new_sub)
354 {
355 const etl::imessage_router* p_target_router = p_router;
356
357 subscription_node* p_sub = head.get_next();
358 subscription_node* p_sub_previous = &head;
359
360 while (p_sub != ETL_NULLPTR)
361 {
362 // Do we already have a subscription for the router?
363 if (static_cast<subscription*>(p_sub)->get_router() == p_target_router)
364 {
365 // Then unlink it.
366 p_sub_previous->set_next(p_sub->get_next()); // Jump over the subscription.
367 p_sub->terminate(); // Terminate the unlinked subscription.
368
369 // We're done now.
370 break;
371 }
372
373 // Move on up the list.
374 p_sub = p_sub->get_next();
375 p_sub_previous = p_sub_previous->get_next();
376 }
377
378 if (p_new_sub != ETL_NULLPTR)
379 {
380 // Link in the new subscription.
381 p_sub_previous->append(p_new_sub);
382 }
383 }
384
385 subscription_node head;
386 };
387} // namespace etl
388
389#endif
This is the base of all message routers.
Definition message_router.h:138
Definition message.h:75
Definition message_broker.h:100
void subscribe(etl::message_broker::subscription &new_sub)
Subscribe to the broker.
Definition message_broker.h:176
message_broker()
Constructor.
Definition message_broker.h:136
message_broker(etl::message_router_id_t id_)
Constructor.
Definition message_broker.h:154
virtual bool accepts(etl::message_id_t id) const ETL_OVERRIDE
Definition message_broker.h:278
message_broker(etl::message_router_id_t id_, etl::imessage_router &successor_)
Constructor.
Definition message_broker.h:165
message_broker(etl::imessage_router &successor_)
Constructor.
Definition message_broker.h:145
Router id is out of the legal range.
Definition message_router.h:69
Span - Fixed Extent.
Definition span.h:208
ETL_NODISCARD ETL_CONSTEXPR iterator begin() const ETL_NOEXCEPT
Returns an iterator to the beginning of the span.
Definition span.h:484
ETL_NODISCARD ETL_CONSTEXPR iterator end() const ETL_NOEXCEPT
Returns an iterator to the end of the span.
Definition span.h:508
bool has_successor() const
Definition successor.h:184
successor_type & get_successor() const
Definition successor.h:174
#define ETL_ASSERT(b, e)
Definition error_handler.h:511
bitset_ext
Definition absolute.h:40
uint_least8_t message_id_t
Allow alternative type for message id.
Definition message_types.h:40