Connectors.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
6 //
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
11 //
12 
13 
17 #include "Connectors.hh"
18 #include "Connectors.ih"
19 
20 // Custom includes
22 #include "ModuleManager.hh"
23 #include "Module.hh"
24 
25 #define prefix_
26 //-/////////////////////////////////////////////////////////////////////////////////////////////////
27 
28 //-/////////////////////////////////////////////////////////////////////////////////////////////////
29 // senf::ppi::connector::Connector
30 
32  : peer_(nullptr), module_(nullptr)
33 # ifndef SENF_PPI_NOTRACE
34  , traceState_(NO_TRACING)
35 # endif
36 {
37  namespace fty = console::factory;
38  consoleDir().add("connected", fty::Command( &Connector::connected, this));
39 #ifndef SENF_PPI_NOTRACE
40  consoleDir().add("tracing", fty::Command(
41  SENF_MEMFNP(void, Connector, tracingState, (TraceState)), this));
42  consoleDir().add("tracing", fty::Command(
43  SENF_MEMFNP(TraceState, Connector, tracingState, () const), this));
44 #endif
45 }
46 
48 {
49  if (connected()) {
50  Connector & peer (*peer_);
51  peer_->peer_ = 0;
52  if (! peer.initializationScheduled())
53  peer.enqueueInitializable();
54  peer.v_disconnected();
55  }
56 }
57 
59 {
60  // The connector is not registered -> route() or noroute() statement missing
61  SENF_ASSERT( module_,
62  "senf::ppi::connector::Connector::connect(): (source) "
63  "Missing route() or noroute()" );
64  // The connector is already connected
65  if (peer_) {
66  module::Module & peerModule (peer_->module());
68  << " in module: " << prettyName(typeid(*module_))
69  << ", source module: " << prettyName(typeid(peerModule));
70  }
71  // The target connector is not registered -> route() or noroute() statement missing
72  SENF_ASSERT( target.module_,
73  "senf::ppi::connector::Connector::connect(): (target) "
74  "Missing route() or noroute()" );
75  // The target connector is already connected
76  if (target.peer_) {
77  module::Module & targetPeerModule (target.peer_->module());
79  << " in module: " << prettyName(typeid(*module_))
80  << ", target module: " << prettyName(typeid(targetPeerModule));
81  }
82  if (! (v_packetTypeId() == typeid(void) ||
83  target.v_packetTypeId() == typeid(void) ||
84  v_packetTypeId() == target.v_packetTypeId()) ) {
85  module::Module & targetModule (target.module());
87  << ": " << prettyName(v_packetTypeId())
88  << " [in module " << prettyName(typeid(*module_)) << "] "
89  << ", " << prettyName(target.v_packetTypeId())
90  << " [in module " << prettyName(typeid(targetModule)) << "]";
91  }
92 
93  peer_ = &target;
94  target.peer_ = this;
95 
98  if (! peer().initializationScheduled())
100 
101  v_connected();
102  peer_->v_connected();
103 }
104 
105 #ifndef SENF_PPI_NOTRACE
106 senf::ppi::connector::Connector::TraceState senf::ppi::connector::Connector::staticTraceState_ (
108 std::string senf::ppi::connector::Connector::traceFilter_;
109 
110 prefix_ void senf::ppi::connector::Connector::trace(Packet const & p, char const * label)
111 {
112  if (tracingState() == NO_TRACING)
113  return;
114 
115  std::string type (prettyName(p.typeId().id()));
116  if (! traceFilter_.empty()) {
117  std::stringstream ss;
118  p.dump(ss);
119  std::string packetDump (ss.str());
120  if (! (boost::algorithm::ifind_first(type, traceFilter_) || boost::algorithm::ifind_first(packetDump, traceFilter_)))
121  return;
122  }
123 
124  SENF_LOG_BLOCK(({
125  module::Module & mod (module());
126  log << "PPI packet trace: " << label << " 0x" << std::hex << p.id() << " "
127  << type.substr(21, type.size()-22) << " on " << & module() << " "
128  << prettyName(typeid(mod)) << " connector 0x" << this << "\n";
129  if (tracingState() == TRACE_CONTENTS)
130  p.dump(log);
131  }));
132 }
133 
135  char const * type)
136 {
137  if (tracingState() == NO_TRACING)
138  return;
139  SENF_LOG_BLOCK(({
140  module::Module & mod (module());
141  log << "PPI throttling trace: " << label << " " << type << " on " << & module()
142  << " " << prettyName(typeid(mod)) << " connector 0x" << this << "\n";
143  }));
144 }
145 
146 namespace senf { namespace ppi { namespace connector {
147 
150 
151 }}}
152 #endif
153 
154 namespace {
155 
156  struct ConsoleRegister
157  {
158  ConsoleRegister();
159  };
160 
161  ConsoleRegister::ConsoleRegister()
162  {
163 #ifndef SENF_PPI_NOTRACE
168  .arg("state", "new tracing state")
169  .doc("Log every packet sent or received by any module.\n"
170  "There are three different tracing levels:\n"
171  "\n"
172  " NO_TRACING don't output any tracing information\n"
173  " TRACE_IDS trace packet id's but do not show packet contents\n"
174  " TRACE_CONTENTS trace complete packet contents\n"
175  "\n"
176  "A log message is generated whenever the packet traverses a connector. The\n"
177  "TRACE_IDS log message has the following format:\n"
178  "\n"
179  " PPI packet trace: <direction> <packet-id> <packet-type>\n"
180  " on <module-id> <module-type> connector <connector-id>\n"
181  " PPI throttling trace: <direction> <throttle-msg>\n"
182  " on <module-id> <module-type> connector <connector-id>\n"
183  "\n"
184  "The fields are:\n"
185  "\n"
186  " direction 'IN' for packets/throttle notifications entering the module,\n"
187  " 'OUT' for packets/throttle notifications leaving it\n"
188  " packet-id Numeric unique packet id. This value is unique for packets\n"
189  " alive at the same time, packets at different times may (and\n"
190  " will) share id's\n"
191  " packet-type The type of the packet header\n"
192  " module-id Unique module id\n"
193  " module-type Type of the module the packet is sent to/from\n"
194  " connector-id Unique connector id\n"
195  " throttle-msg Type of throttling event\n")
196  );
198  .add("tracingFilter", senf::console::factory::Command(
200 #endif
201  }
202 
203  ConsoleRegister consoleRegister;
204 }
205 
207 {
208  // Cannot disconnected a non-connected connector
209  SENF_ASSERT( peer_, "senf::ppi::connector::Connector::disconnect(): Not connected" );
210 
211  Connector & peer (*peer_);
212  peer_ = 0;
213  peer.peer_ = 0;
214 
215  if (! initializationScheduled())
217  if (! peer.initializationScheduled())
218  peer.enqueueInitializable();
219 
220  v_disconnected();
221  peer.v_disconnected();
222 }
223 
224 prefix_ std::type_info const & senf::ppi::connector::Connector::v_packetTypeId()
225 {
226  return typeid(void);
227 }
228 
230 {
231  if (module_)
232  module_->unregisterConnector(*this);
233 }
234 
235 prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module)
236 {
237  module_ = &module;
238  consoleDir().remove( "module", senf::nothrow);
239  consoleDir().add( "module", console::factory::Link(module.sysConsoleDir()));
240 }
241 
243  const
244 {
245  // The connector is not registered in the module -> probably a route() or noroute() statement is
246  // missing.
247  SENF_ASSERT(module_, "Connector not registered: Missing route() or noroute()");
248  return *module_;
249 }
250 
252 {
253  consoleDir().remove( "peer", senf::nothrow);
254 }
255 
257 {
258  if (! consoleDir().hasChild("peer"))
259  consoleDir().add( "peer", console::factory::Link(peer_->consoleDir()));
260 }
261 
262 //-/////////////////////////////////////////////////////////////////////////////////////////////////
263 // senf::ppi::connector::PassiveConnector
264 
266  : peer_(nullptr), remoteThrottled_(false), nativeThrottled_(false)
267 {
268  namespace fty = console::factory;
269  consoleDir().add("throttled", fty::Command( &PassiveConnector::throttled, this));
270 }
271 
273 {
274  // Must be here and NOT in base so it is called before destructing the routes_ member
276 }
277 
279 {
281  peer_ = 0;
282 }
283 
285 {
287  peer_ = & dynamic_cast<ActiveConnector&>(Connector::peer());
288 }
289 
290 //-/////////////////////////////////////////////////////////////////////////////////////////////////
291 // private members
292 
293 prefix_ void senf::ppi::connector::PassiveConnector::v_init()
294 {
295  Routes::const_iterator i (routes_.begin());
296  Routes::const_iterator const i_end (routes_.end());
297  for (; i != i_end; ++i)
298  if ((*i)->throttled())
299  break;
300  if (i == i_end)
301  remoteThrottled_ = false;
302  if (throttled())
303  emitThrottle();
304  else
305  emitUnthrottle();
306 }
307 
308 prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
309 {
310  routes_.push_back(&route);
311 }
312 
313 prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
314 {
315  Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
316  if (i != routes_.end())
317  routes_.erase(i);
318 }
319 
320 prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
321 {}
322 
323 prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
324 {
325  if (std::find_if(routes_.begin(), routes_.end(),
326  boost::bind(&ForwardingRoute::throttled, _1)) == routes_.end()) {
327  remoteThrottled_ = false;
328  if (!nativeThrottled_)
329  emitUnthrottle();
330  } else {
331  SENF_PPI_THROTTLE_TRACE("OUT", "not forwarding unthrottle event");
332  }
333 }
334 
335 //-/////////////////////////////////////////////////////////////////////////////////////////////////
336 // senf::ppi::connector::ActiveConnector
337 
339 {
340  // Must be here and NOT in base so it is called before destructing the routes_ member
342 }
343 
345 {
347  peer_ = 0;
348 }
349 
351 {
353  peer_ = & dynamic_cast<PassiveConnector&>(Connector::peer());
354 }
355 
356 //-/////////////////////////////////////////////////////////////////////////////////////////////////
357 // private members
358 
359 prefix_ void senf::ppi::connector::ActiveConnector::v_init()
360 {
361  if (! connected())
362  notifyThrottle();
363 }
364 
365 prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
366 {
367  SENF_PPI_THROTTLE_TRACE("IN ", "throttle");
368  if (! throttled_) {
369  throttled_ = true;
370  if (throttleCallback_)
371  throttleCallback_();
372  NotifyRoutes::const_iterator i (notifyRoutes_.begin());
373  NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
374  for (; i != i_end; ++i)
375  (*i)->notifyThrottle();
376  }
377 }
378 
379 prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
380 {
381  SENF_PPI_THROTTLE_TRACE("IN ", "unthrottle");
382  if (throttled_) {
383  throttled_ = false;
384  if (unthrottleCallback_)
385  unthrottleCallback_();
386  NotifyRoutes::const_iterator i (notifyRoutes_.begin());
387  NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
388  for (; i != i_end; ++i)
389  (*i)->notifyUnthrottle();
390  }
391 }
392 
393 prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
394 {
395  notifyRoutes_.push_back(&route);
396 }
397 
398 prefix_ void senf::ppi::connector::ActiveConnector::unregisterRoute(ForwardingRoute & route)
399 {
400  NotifyRoutes::iterator i (std::find(notifyRoutes_.begin(), notifyRoutes_.end(), &route));
401  if (i != notifyRoutes_.end())
402  notifyRoutes_.erase(i);
403 }
404 
405 //-/////////////////////////////////////////////////////////////////////////////////////////////////
406 // senf::ppi::connector::InputConnector
407 
409 {
410  static Packet nullPacket;
411 
412  if (empty())
413  v_requestEvent();
414  if (fastPacket_) {
415  Packet const * p = fastPacket_;
416  fastPacket_ = nullptr;
417  v_dequeueEvent();
418  SENF_PPI_TRACE(*p, "IN ");
419  return *p;
420  }
421  if (! queue_.empty()) {
422  slowPacket_ = queue_.back();
423  queue_.pop_back();
424  v_dequeueEvent();
425  SENF_PPI_TRACE(slowPacket_, "IN ");
426  return slowPacket_;
427  } else {
428  SENF_PPI_TRACE(nullPacket, "IN ");
429  return nullPacket;
430  }
431 }
432 
434 {
436  peer_ = 0;
437 }
438 
440 {
442  peer_ = & dynamic_cast<OutputConnector&>(Connector::peer());
443 }
444 
445 //-/////////////////////////////////////////////////////////////////////////////////////////////////
446 // private members
447 
448 prefix_ void senf::ppi::connector::InputConnector::v_requestEvent()
449 {}
450 
451 prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent()
452 {}
453 
454 prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
455 {}
456 
457 //-/////////////////////////////////////////////////////////////////////////////////////////////////
458 // senf::ppi::connector::OutputConnector
459 
461 {
463  peer_ = 0;
464 }
465 
467 {
469  peer_ = & dynamic_cast<InputConnector&>(Connector::peer());
470 }
471 
472 //-/////////////////////////////////////////////////////////////////////////////////////////////////
473 // senf::ppi::connector::GenericActiveInput
474 
475 //-/////////////////////////////////////////////////////////////////////////////////////////////////
476 // private members
477 
478 prefix_ void senf::ppi::connector::GenericActiveInput::v_requestEvent()
479 {
480  request();
481 }
482 
483 //-/////////////////////////////////////////////////////////////////////////////////////////////////
484 // senf::ppi::connector::GenericPassiveInput
485 
487 {
490  peer_ = 0;
491 }
492 
494 {
497  peer_ = & dynamic_cast<GenericActiveOutput&>(Connector::peer());
498 }
499 
500 //-/////////////////////////////////////////////////////////////////////////////////////////////////
501 // private members
502 
503 prefix_ void senf::ppi::connector::GenericPassiveInput::v_enqueueEvent()
504 {
505  emit();
506  if (SENF_UNLIKELY(throttlingDisc_))
507  throttlingDisc_->update(*this, ThrottlingDiscipline::ENQUEUE);
508 }
509 
510 prefix_ void senf::ppi::connector::GenericPassiveInput::v_dequeueEvent()
511 {
512  if (SENF_UNLIKELY(throttlingDisc_))
513  throttlingDisc_->update(*this, ThrottlingDiscipline::DEQUEUE);
514 }
515 
517 {
518  throttlingDisc_.reset();
519 }
520 
521 prefix_ void senf::ppi::connector::GenericPassiveInput::v_unthrottleEvent()
522 {
523  size_type n (queueSize());
524  while (n) {
525  emit();
526  size_type nn (queueSize());
527  if (n == nn)
528  break;
529  n = nn;
530  }
531 }
532 
533 //-/////////////////////////////////////////////////////////////////////////////////////////////////
534 // senf::ppi::connector::GenericPassiveOutput
535 
537 {
540  peer_ = 0;
541 }
542 
544 {
547  peer_ = & dynamic_cast<GenericActiveInput&>(Connector::peer());
548 }
549 
550 //-/////////////////////////////////////////////////////////////////////////////////////////////////
551 // senf::ppi::connector::GenericActiveInput
552 
554 {
557  peer_ = 0;
558 }
559 
561 {
564  peer_ = & dynamic_cast<GenericPassiveOutput&>(Connector::peer());
565 }
566 
567 //-/////////////////////////////////////////////////////////////////////////////////////////////////
568 // senf::ppi::connector::GenericActiveOutput
569 
571 {
574  peer_ = 0;
575 }
576 
578 {
581  peer_ = & dynamic_cast<GenericPassiveInput&>(Connector::peer());
582 }
583 
584 
585 //-/////////////////////////////////////////////////////////////////////////////////////////////////
586 #undef prefix_
587 //#include "Connectors.mpp"
588 
589 
590 // Local Variables:
591 // mode: c++
592 // fill-column: 100
593 // comment-column: 40
594 // c-file-style: "senf"
595 // indent-tabs-mode: nil
596 // ispell-local-dictionary: "american"
597 // compile-command: "scons -u test"
598 // End:
#define SENF_LOG_BLOCK(args)
#define prefix_
Definition: Connectors.cc:25
Packet const & operator()()
Get a packet.
Definition: Connectors.cc:408
Module base-class.
Definition: Module.hh:169
Connectors public header.
bool empty()
#define SENF_FNP(ret, fn, args)
Module public header.
u8 type
static ModuleManager & instance()
#define SENF_MEMFNP(ret, cls, fn, args)
Combination of PassiveConnector and OutputConnector.
Definition: Connectors.hh:513
Passive connector base-class.
Definition: Connectors.hh:231
std::type_info const & id() const
Combination of ActiveConnector and InputConnector.
Definition: Connectors.hh:538
Active connector base-class.
Definition: Connectors.hh:309
Input connector base-class.
Definition: Connectors.hh:397
Queue::size_type size_type
Unsigned type for counting queue elements.
Definition: Connectors.hh:403
ModuleManager public header.
Output connector base-class.
Definition: Connectors.hh:449
Incompatible connectors connected.
Definition: Connectors.hh:143
Combination of ActiveConnector and OutputConnector.
Definition: Connectors.hh:563
void throttleTrace(char const *label, char const *type)
Definition: Connectors.cc:134
NodeType & add(std::string const &name, boost::shared_ptr< NodeType > node)
static void tracingFilter(std::string const &traceFilter)
std::string prettyName(std::type_info const &type)
Connector & peer() const
Get peer connected to this connector.
console::DirectoryNode & consoleDir() const
bool throttled() const
Get accumulative throttling state.
void connect(Connector &target)
Definition: Connectors.cc:58
nothrow
bool connected() const
true, if connector connected, false otherwise
void dump(std::ostream &os, DumpPacketAnnotations_t dumpAnnotationsSwitch=dumpAnnotations) const
Connector base-class.
Definition: Connectors.hh:156
#define SENF_ASSERT(x, comment)
unsigned long id() const
Forwarding route base class.
Definition: Route.hh:69
bool throttled() const
true, if the route is throttled
void disconnect()
Disconnect connector from peer.
Definition: Connectors.cc:206
Combination of PassiveConnector and InputConnector.
Definition: Connectors.hh:478
static void staticTracingState(TraceState state)
GenericNode::ptr remove(std::string const &name)
TypeIdValue typeId() const
module::Module & module() const
Get this connectors containing module.
Definition: Connectors.cc:242
TraceState tracingState() const
console::DirectoryNode & consoleDir() const
void trace(Packet const &p, char const *label)
Definition: Connectors.cc:110
#define SENF_UNLIKELY(x)
console::DirectoryNode & sysConsoleDir() const
SENF_CONSOLE_REGISTER_ENUM_MEMBER(Connector, TraceState,(NO_TRACING)(TRACE_IDS)(TRACE_CONTENTS))
void throttlingDisc(ThrottlingDisc const &disc)
Change the throttling discipline.