00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00026 #include "Connectors.hh"
00027 #include "Connectors.ih"
00028
00029
00030 #include "ModuleManager.hh"
00031 #include <senf/Utils/Console/ParsedCommand.hh>
00032
00033
00034 #define prefix_
00035
00036
00037
00038
00039
00040 prefix_ senf::ppi::connector::Connector::~Connector()
00041 {
00042 if (connected()) {
00043 Connector & peer (*peer_);
00044 peer_->peer_ = 0;
00045 if (! peer.initializationScheduled())
00046 peer.enqueueInitializable();
00047 peer.v_disconnected();
00048 }
00049 }
00050
00051 prefix_ void senf::ppi::connector::Connector::connect(Connector & target)
00052 {
00053
00054 SENF_ASSERT( module_,
00055 "senf::ppi::connector::Connector::connect(): (source) "
00056 "Missing route() or noroute()" );
00057
00058 SENF_ASSERT( ! peer_,
00059 "senf::ppi::connector::Connector::connect(): (source) "
00060 "duplicate connection" );
00061
00062 SENF_ASSERT( target.module_,
00063 "senf::ppi::connector::Connector::connect(): (target) "
00064 "Missing route() or noroute()" );
00065
00066 SENF_ASSERT( ! target.peer_,
00067 "senf::ppi::connector::Connector::connect(): (target) "
00068 "duplicate connection" );
00069
00070 if (! (packetTypeID() == typeid(void) ||
00071 target.packetTypeID() == typeid(void) ||
00072 packetTypeID() == target.packetTypeID()) )
00073 throw IncompatibleConnectorsException()
00074 << ": " << prettyName(packetTypeID())
00075 << " [in module " << prettyName(typeid(*module_)) << "] "
00076 << ", " << prettyName(target.packetTypeID())
00077 << " [in module " << prettyName(typeid(*target.module_)) << "]";
00078
00079 peer_ = & target;
00080 target.peer_ = this;
00081
00082 if (! initializationScheduled())
00083 enqueueInitializable();
00084 if (! peer().initializationScheduled())
00085 peer().enqueueInitializable();
00086
00087 v_connected();
00088 peer_->v_connected();
00089
00090 }
00091
00092 senf::ppi::connector::Connector::TraceState senf::ppi::connector::Connector::traceState_ (
00093 senf::ppi::connector::Connector::NO_TRACING);
00094
00095 prefix_ void senf::ppi::connector::Connector::trace(Packet const & p, char const * label)
00096 {
00097 if (traceState_ == NO_TRACING)
00098 return;
00099 SENF_LOG_BLOCK(({
00100 std::string type (prettyName(p.typeId().id()));
00101 log << "PPI packet trace: " << label << " 0x" << std::hex << p.id() << " "
00102 << type.substr(21, type.size()-22) << " on " << & module() << " "
00103 << prettyName(typeid(module())) << " connector 0x" << this << "\n";
00104 if (traceState_ == TRACE_CONTENTS)
00105 p.dump(log);
00106 }));
00107 }
00108
00109 prefix_ void senf::ppi::connector::Connector::throttleTrace(char const * label,
00110 char const * type)
00111 {
00112 if (traceState_ == NO_TRACING)
00113 return;
00114 SENF_LOG_BLOCK(({
00115 log << "PPI throttling trace: " << label << " " << type << " on " << & module()
00116 << " " << prettyName(typeid(module())) << " connector 0x" << this << "\n";
00117 }));
00118 }
00119
00120 namespace senf { namespace ppi { namespace connector {
00121
00122 SENF_CONSOLE_REGISTER_ENUM_MEMBER(
00123 Connector, TraceState, (NO_TRACING)(TRACE_IDS)(TRACE_CONTENTS) );
00124
00125 }}}
00126
00127 namespace {
00128
00129 struct ConsoleRegister
00130 {
00131 ConsoleRegister();
00132 };
00133
00134 ConsoleRegister::ConsoleRegister()
00135 {
00136 #ifndef SENF_PPI_NOTRACE
00137 senf::ppi::ModuleManager::instance().consoleDir()
00138 .add("tracing", senf::console::factory::Command(
00139 SENF_FNP(senf::ppi::connector::Connector::TraceState,
00140 senf::ppi::connector::Connector::tracing, ()))
00141 .doc("Log every packet sent or received by any module.\n"
00142 "There are three different tracing levels:\n"
00143 "\n"
00144 " NO_TRACING don't output any tracing information\n"
00145 " TRACE_IDS trace packet id's but do not show packet contents\n"
00146 " TRACE_CONTENTS trace complete packet contents\n"
00147 "\n"
00148 "A log message is generated whenever the packet traverses a connector. The\n"
00149 "TRACE_IDS log message has the following format:\n"
00150 "\n"
00151 " PPI packet trace: <direction> <packet-id> <packet-type>\n"
00152 " on <module-id> <module-type> connector <connector-id>\n"
00153 " PPI throttling trace: <direction> <throttle-msg>\n"
00154 " on <module-id> <module-type> connector <connector-id>\n"
00155 "\n"
00156 "The fields are:\n"
00157 "\n"
00158 " direction 'IN' for packets/throttle notifications entering the module,\n"
00159 " 'OUT' for packets/throttle notifications leaving it\n"
00160 " packet-id Numeric unique packet id. This value is unique for packets\n"
00161 " alive at the same time, packets at different times may (and\n"
00162 " will) share id's\n"
00163 " packet-type The type of the packet header\n"
00164 " module-id Unique module id\n"
00165 " module-type Type of the module the packet is sent to/from\n"
00166 " connector-id Unique connector id\n"
00167 " throttle-msg Type of throttling event\n")
00168 );
00169
00170 senf::ppi::ModuleManager::instance().consoleDir()
00171 .add("tracing", senf::console::factory::Command(
00172 SENF_FNP(void, senf::ppi::connector::Connector::tracing,
00173 (senf::ppi::connector::Connector::TraceState)))
00174 .arg("state", "new tracing state")
00175 );
00176 #endif
00177 }
00178
00179 ConsoleRegister consoleRegister;
00180
00181 }
00182
00183 prefix_ void senf::ppi::connector::Connector::disconnect()
00184 {
00185
00186 SENF_ASSERT( peer_,
00187 "senf::ppi::connector::Connector::disconnect(): Not connected" );
00188
00189 Connector & peer (*peer_);
00190 peer_ = 0;
00191 peer.peer_ = 0;
00192
00193 if (! initializationScheduled())
00194 enqueueInitializable();
00195 if (! peer.initializationScheduled())
00196 peer.enqueueInitializable();
00197
00198 v_disconnected();
00199 peer.v_disconnected();
00200 }
00201
00202 prefix_ std::type_info const & senf::ppi::connector::Connector::packetTypeID()
00203 {
00204 return typeid(void);
00205 }
00206
00207 prefix_ void senf::ppi::connector::Connector::unregisterConnector()
00208 {
00209 if (module_)
00210 module_->unregisterConnector(*this);
00211 }
00212
00213 prefix_ void senf::ppi::connector::Connector::setModule(module::Module & module)
00214 {
00215 module_ = &module;
00216 }
00217
00218 prefix_ void senf::ppi::connector::Connector::v_disconnected()
00219 {}
00220
00221 prefix_ void senf::ppi::connector::Connector::v_connected()
00222 {}
00223
00224
00225
00226
00227 prefix_ senf::ppi::connector::PassiveConnector::~PassiveConnector()
00228 {
00229
00230 unregisterConnector();
00231 }
00232
00233 prefix_ void senf::ppi::connector::PassiveConnector::v_disconnected()
00234 {
00235 Connector::v_disconnected();
00236 peer_ = 0;
00237 }
00238
00239 prefix_ void senf::ppi::connector::PassiveConnector::v_connected()
00240 {
00241 Connector::v_connected();
00242 peer_ = & dynamic_cast<ActiveConnector&>(Connector::peer());
00243 }
00244
00245
00246
00247
00248 prefix_ void senf::ppi::connector::PassiveConnector::v_init()
00249 {
00250 Routes::const_iterator i (routes_.begin());
00251 Routes::const_iterator const i_end (routes_.end());
00252 for (; i != i_end; ++i)
00253 if ((*i)->throttled())
00254 break;
00255 if (i == i_end)
00256 remoteThrottled_ = false;
00257 if (throttled())
00258 emitThrottle();
00259 else
00260 emitUnthrottle();
00261 }
00262
00263 prefix_ void senf::ppi::connector::PassiveConnector::registerRoute(ForwardingRoute & route)
00264 {
00265 routes_.push_back(&route);
00266 }
00267
00268 prefix_ void senf::ppi::connector::PassiveConnector::unregisterRoute(ForwardingRoute & route)
00269 {
00270 Routes::iterator i (std::find(routes_.begin(), routes_.end(), &route));
00271 if (i != routes_.end())
00272 routes_.erase(i);
00273 }
00274
00275 prefix_ void senf::ppi::connector::PassiveConnector::v_unthrottleEvent()
00276 {}
00277
00278 prefix_ void senf::ppi::connector::PassiveConnector::notifyUnthrottle()
00279 {
00280 if (std::find_if(routes_.begin(), routes_.end(),
00281 boost::bind(&ForwardingRoute::throttled, _1)) == routes_.end()) {
00282 remoteThrottled_ = false;
00283 if (!nativeThrottled_)
00284 emitUnthrottle();
00285 } else
00286 SENF_PPI_THROTTLE_TRACE("OUT", "not forwarding unthrottle event");
00287 }
00288
00289
00290
00291
00292 prefix_ senf::ppi::connector::ActiveConnector::~ActiveConnector()
00293 {
00294
00295 unregisterConnector();
00296 }
00297
00298 prefix_ void senf::ppi::connector::ActiveConnector::v_disconnected()
00299 {
00300 Connector::v_disconnected();
00301 peer_ = 0;
00302 }
00303
00304 prefix_ void senf::ppi::connector::ActiveConnector::v_connected()
00305 {
00306 Connector::v_connected();
00307 peer_ = & dynamic_cast<PassiveConnector&>(Connector::peer());
00308 }
00309
00310
00311
00312
00313 prefix_ void senf::ppi::connector::ActiveConnector::v_init()
00314 {
00315 if (! connected())
00316 notifyThrottle();
00317 }
00318
00319 prefix_ void senf::ppi::connector::ActiveConnector::notifyThrottle()
00320 {
00321 SENF_PPI_THROTTLE_TRACE("IN ", "throttle");
00322 if (! throttled_) {
00323 throttled_ = true;
00324 if (throttleCallback_)
00325 throttleCallback_();
00326 NotifyRoutes::const_iterator i (notifyRoutes_.begin());
00327 NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
00328 for (; i != i_end; ++i)
00329 (*i)->notifyThrottle();
00330 }
00331 }
00332
00333 prefix_ void senf::ppi::connector::ActiveConnector::notifyUnthrottle()
00334 {
00335 SENF_PPI_THROTTLE_TRACE("IN ", "unthrottle");
00336 if (throttled_) {
00337 throttled_ = false;
00338 if (unthrottleCallback_)
00339 unthrottleCallback_();
00340 NotifyRoutes::const_iterator i (notifyRoutes_.begin());
00341 NotifyRoutes::const_iterator const i_end (notifyRoutes_.end());
00342 for (; i != i_end; ++i)
00343 (*i)->notifyUnthrottle();
00344 }
00345 }
00346
00347 prefix_ void senf::ppi::connector::ActiveConnector::registerRoute(ForwardingRoute & route)
00348 {
00349 notifyRoutes_.push_back(&route);
00350 }
00351
00352 prefix_ void senf::ppi::connector::ActiveConnector::unregisterRoute(ForwardingRoute & route)
00353 {
00354 NotifyRoutes::iterator i (std::find(notifyRoutes_.begin(), notifyRoutes_.end(), &route));
00355 if (i != notifyRoutes_.end())
00356 notifyRoutes_.erase(i);
00357 }
00358
00359
00360
00361
00362 prefix_ senf::Packet senf::ppi::connector::InputConnector::operator()()
00363 {
00364 if (empty())
00365 v_requestEvent();
00366 if (! empty()) {
00367 Packet p ( queue_.back());
00368 queue_.pop_back();
00369 v_dequeueEvent();
00370 SENF_PPI_TRACE(p, "IN ");
00371 return p;
00372 } else {
00373 SENF_PPI_TRACE(Packet(), "IN ");
00374 return Packet();
00375 }
00376 }
00377
00378 prefix_ void senf::ppi::connector::InputConnector::v_disconnected()
00379 {
00380 Connector::v_disconnected();
00381 peer_ = 0;
00382 }
00383
00384 prefix_ void senf::ppi::connector::InputConnector::v_connected()
00385 {
00386 Connector::v_connected();
00387 peer_ = & dynamic_cast<OutputConnector&>(Connector::peer());
00388 }
00389
00390
00391
00392
00393 prefix_ void senf::ppi::connector::InputConnector::v_requestEvent()
00394 {}
00395
00396 prefix_ void senf::ppi::connector::InputConnector::v_enqueueEvent()
00397 {}
00398
00399 prefix_ void senf::ppi::connector::InputConnector::v_dequeueEvent()
00400 {}
00401
00402
00403
00404
00405 prefix_ void senf::ppi::connector::OutputConnector::v_disconnected()
00406 {
00407 Connector::v_disconnected();
00408 peer_ = 0;
00409 }
00410
00411 prefix_ void senf::ppi::connector::OutputConnector::v_connected()
00412 {
00413 Connector::v_connected();
00414 peer_ = & dynamic_cast<InputConnector&>(Connector::peer());
00415 }
00416
00417
00418
00419
00420
00421
00422
00423 prefix_ void senf::ppi::connector::GenericActiveInput::v_requestEvent()
00424 {
00425 request();
00426 }
00427
00428
00429
00430
00431 prefix_ void senf::ppi::connector::GenericPassiveInput::v_disconnected()
00432 {
00433 PassiveConnector::v_disconnected();
00434 InputConnector::v_disconnected();
00435 peer_ = 0;
00436 }
00437
00438 prefix_ void senf::ppi::connector::GenericPassiveInput::v_connected()
00439 {
00440 PassiveConnector::v_connected();
00441 InputConnector::v_connected();
00442 peer_ = & dynamic_cast<GenericActiveOutput&>(Connector::peer());
00443 }
00444
00445
00446
00447
00448 prefix_ void senf::ppi::connector::GenericPassiveInput::v_enqueueEvent()
00449 {
00450 emit();
00451 if (qdisc_)
00452 qdisc_->update(*this, QueueingDiscipline::ENQUEUE);
00453 }
00454
00455 prefix_ void senf::ppi::connector::GenericPassiveInput::v_dequeueEvent()
00456 {
00457 if (qdisc_)
00458 qdisc_->update(*this, QueueingDiscipline::DEQUEUE);
00459 }
00460
00461 prefix_ void senf::ppi::connector::GenericPassiveInput::qdisc(QueueingDiscipline::None_t)
00462 {
00463 qdisc_.reset( 0);
00464 }
00465
00466 prefix_ void senf::ppi::connector::GenericPassiveInput::v_unthrottleEvent()
00467 {
00468 size_type n (queueSize());
00469 while (n) {
00470 emit();
00471 size_type nn (queueSize());
00472 if (n == nn)
00473 break;
00474 n = nn;
00475 }
00476 }
00477
00478
00479
00480
00481 prefix_ void senf::ppi::connector::GenericPassiveOutput::v_disconnected()
00482 {
00483 PassiveConnector::v_disconnected();
00484 OutputConnector::v_disconnected();
00485 peer_ = 0;
00486 }
00487
00488 prefix_ void senf::ppi::connector::GenericPassiveOutput::v_connected()
00489 {
00490 PassiveConnector::v_connected();
00491 OutputConnector::v_connected();
00492 peer_ = & dynamic_cast<GenericActiveInput&>(Connector::peer());
00493 }
00494
00495
00496
00497
00498 prefix_ void senf::ppi::connector::GenericActiveInput::v_disconnected()
00499 {
00500 ActiveConnector::v_disconnected();
00501 InputConnector::v_disconnected();
00502 peer_ = 0;
00503 }
00504
00505 prefix_ void senf::ppi::connector::GenericActiveInput::v_connected()
00506 {
00507 ActiveConnector::v_connected();
00508 InputConnector::v_connected();
00509 peer_ = & dynamic_cast<GenericPassiveOutput&>(Connector::peer());
00510 }
00511
00512
00513
00514
00515 prefix_ void senf::ppi::connector::GenericActiveOutput::v_disconnected()
00516 {
00517 ActiveConnector::v_disconnected();
00518 OutputConnector::v_disconnected();
00519 peer_ = 0;
00520 }
00521
00522 prefix_ void senf::ppi::connector::GenericActiveOutput::v_connected()
00523 {
00524 ActiveConnector::v_connected();
00525 OutputConnector::v_connected();
00526 peer_ = & dynamic_cast<GenericPassiveInput&>(Connector::peer());
00527 }
00528
00529
00530
00531 #undef prefix_
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543