// Link node for a singly linked list: each node carries a pointer (p_next) to the node after it.
// NOTE(review): the leading numbers look like line numbers from the original file, and the lines
// in between are not visible in this excerpt; anything marked "presumably" needs confirming there.
51 class subscription_node
// Sets the node that follows this one to 'sub' (presumably by storing it in p_next).
64 void set_next(subscription_node* sub)
// Returns the node that follows this one (presumably p_next). Declared const.
70 subscription_node* get_next()
const
// Clears this node's forward link. The signature of the enclosing function is not visible here.
78 set_next(ETL_NULLPTR);
// Links 'sub' in after this node. Only the first half is visible: 'sub' inherits this node's
// successor. Presumably this node is then pointed at 'sub' on a line not shown.
82 void append(subscription_node* sub)
// A null 'sub' is tolerated; it is never dereferenced.
84 if (sub != ETL_NULLPTR)
// Point 'sub' at this node's current successor so the rest of the chain is kept.
86 sub->set_next(get_next());
// Pointer to the next node in the list.
91 subscription_node* p_next;
// A list node (publicly derives from subscription_node) that also reports a list of message ids.
// NOTE(review): only part of this class is visible in this excerpt.
99 class subscription :
public subscription_node
// Grants message_broker access to this class's non-public members.
103 friend class message_broker;
// Pure virtual: each concrete subscription returns its message ids as a message_id_span_t.
114 virtual message_id_span_t message_id_list()
const = 0;
// Returns the node that follows this one, typed as a subscription.
123 subscription* next_subscription()
const
// Unchecked downcast: assumes every node reachable through get_next() is a subscription.
125 return static_cast<subscription*
>(get_next());
// Makes the receive overloads declared in etl::imessage_router visible alongside those declared here.
// NOTE(review): the signatures these initialisers and statements belong to are not visible in this excerpt.
131 using etl::imessage_router::receive;
// Constructor initialiser: the base router is given the MESSAGE_BROKER id.
137 : imessage_router(
etl::imessage_router::MESSAGE_BROKER)
// Constructor initialiser: MESSAGE_BROKER id, with successor_ passed through to the base.
146 : imessage_router(
etl::imessage_router::MESSAGE_BROKER, successor_)
// Constructor initialiser: caller-supplied id.
155 : imessage_router(id_)
// The id must be at most MAX_MESSAGE_ROUTER, or be exactly MESSAGE_BROKER.
158 ETL_ASSERT((id_ <= etl::imessage_router::MAX_MESSAGE_ROUTER) || (id_ == etl::imessage_router::MESSAGE_BROKER),
// Constructor initialiser: caller-supplied id, with successor_ passed through to the base.
166 : imessage_router(id_, successor_)
// Same id check as for the id-only initialiser above.
169 ETL_ASSERT((id_ <= etl::imessage_router::MAX_MESSAGE_ROUTER) || (id_ == etl::imessage_router::MESSAGE_BROKER),
// Hands the new subscription and its router to initialise_insertion_point, which unlinks any
// existing subscription for that router before linking new_sub into the list.
178 initialise_insertion_point(new_sub.get_router(), &new_sub);
// Null subscription: initialise_insertion_point only unlinks an existing subscription for
// 'router'; nothing new is linked in.
184 initialise_insertion_point(&router, ETL_NULLPTR);
// Forwards 'msg' to the destination-aware overload with ALL_MESSAGE_ROUTERS as the destination.
// The signature of the enclosing function is not visible in this excerpt.
190 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, msg);
// Receives a shared message with no explicit destination.
193 virtual void receive(etl::shared_message shared_msg) ETL_OVERRIDE
// Forwards to the destination-aware overload with ALL_MESSAGE_ROUTERS as the destination.
195 receive(etl::imessage_router::ALL_MESSAGE_ROUTERS, shared_msg);
// Delivers 'msg' to the routers of subscriptions whose id list contains the message id,
// filtered by destination_router_id.
// NOTE(review): the lines that define 'id' and the initial 'sub' are not visible in this excerpt.
199 virtual void receive(etl::message_router_id_t destination_router_id,
const etl::imessage& msg) ETL_OVERRIDE
// Walk the subscription list until the end (null) is reached.
208 while (sub != ETL_NULLPTR)
// Ids this subscription reports.
210 message_id_span_t message_ids = sub->message_id_list();
// Linear search for 'id' among them.
212 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(),
id);
// Found: this subscription lists the id.
214 if (itr != message_ids.end())
216 etl::imessage_router* router = sub->get_router();
// Deliver when addressed to all routers, or when the destination matches this router's own id.
218 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS || destination_router_id == router->get_message_router_id())
220 router->receive(msg);
// Advance to the next subscription.
224 sub = sub->next_subscription();
// Shared-message counterpart: delivers 'shared_msg' to the routers of subscriptions whose id
// list contains the message id, filtered by destination_router_id.
// NOTE(review): the lines that define 'id' and the initial 'sub' are not visible in this excerpt.
236 virtual void receive(etl::message_router_id_t destination_router_id, etl::shared_message shared_msg) ETL_OVERRIDE
// Walk the subscription list until the end (null) is reached.
245 while (sub != ETL_NULLPTR)
// Ids this subscription reports.
247 message_id_span_t message_ids = sub->message_id_list();
// Linear search for 'id' among them.
249 message_id_span_t::iterator itr = etl::find(message_ids.begin(), message_ids.end(),
id);
// Found: this subscription lists the id.
251 if (itr != message_ids.end())
253 etl::imessage_router* router = sub->get_router();
// Deliver when addressed to all routers, or when the destination matches this router's own id.
255 if (destination_router_id == etl::imessage_router::ALL_MESSAGE_ROUTERS || destination_router_id == router->get_message_router_id())
257 router->receive(shared_msg);
// Advance to the next subscription.
261 sub = sub->next_subscription();
// Makes the accepts overloads declared in imessage_router visible alongside the one defined here.
// NOTE(review): the signature of the function below, the definition of 'router' and the
// return statements are not visible in this excerpt.
272 using imessage_router::accepts;
// Walk the subscription list until the end (null) is reached.
285 while (sub != ETL_NULLPTR)
// Ids this subscription reports.
287 message_id_span_t message_ids = sub->message_id_list();
// Linear search for 'id' among them.
289 message_id_span_t::iterator itr = etl::find(message_ids.
begin(), message_ids.
end(),
id);
// Found: this subscription lists the id.
291 if (itr != message_ids.
end())
// Ask the router itself whether it accepts the id (presumably the result is returned; not shown).
295 if (router->accepts(
id))
// Advance to the next subscription.
301 sub = sub->next_subscription();
// Overrides of three virtual, const router-classification queries.
// NOTE(review): their bodies are not visible in this excerpt, so the returned values are not documented.
327 virtual bool is_null_router() const ETL_OVERRIDE
333 virtual bool is_producer() const ETL_OVERRIDE
339 virtual bool is_consumer() const ETL_OVERRIDE
// True when nothing is linked after 'head', i.e. the subscription list is empty.
// The signature of the enclosing function is not visible here.
347 return head.get_next() == ETL_NULLPTR;
// Unlinks an existing subscription whose router is p_router, then, if p_new_sub is not null,
// links p_new_sub into the list.
//   p_router  : router whose current subscription (if any) is unlinked.
//   p_new_sub : subscription to link in, or ETL_NULLPTR to link nothing.
// NOTE(review): some lines of this function are not visible in this excerpt (between the unlink
// and the advance); presumably the loop stops after unlinking - confirm in the full file.
353 void initialise_insertion_point(
const etl::imessage_router* p_router, etl::message_broker::subscription* p_new_sub)
355 const etl::imessage_router* p_target_router = p_router;
// Start at the first node after 'head', keeping a trailing pointer one node behind.
357 subscription_node* p_sub = head.get_next();
358 subscription_node* p_sub_previous = &head;
360 while (p_sub != ETL_NULLPTR)
// Does this subscription belong to the target router? Nodes are downcast to subscription to ask.
363 if (
static_cast<subscription*
>(p_sub)->get_router() == p_target_router)
// Unlink it: the previous node now skips over p_sub.
366 p_sub_previous->set_next(p_sub->get_next());
// No match: advance both the cursor and the trailing pointer by one node.
374 p_sub = p_sub->get_next();
375 p_sub_previous = p_sub_previous->get_next();
// Only link when a subscription was supplied.
378 if (p_new_sub != ETL_NULLPTR)
// Link the new subscription in after the trailing node.
381 p_sub_previous->append(p_new_sub);
// Head node of the subscription list. It is a plain subscription_node, not a subscription;
// traversal starts at head.get_next().
385 subscription_node head;