The following example module declares three connectors: payload, stuffing and output. These connectors are defined as public data members so they can be accessed from the outside. This is important, as we will see below.
    class RateStuffer
        : public senf::ppi::module::Module
    {
        SENF_PPI_MODULE(RateStuffer);

        senf::ppi::IntervalTimer timer_;

    public:
        senf::ppi::connector::ActiveInput<> payload;
        senf::ppi::connector::ActiveInput<> stuffing;
        senf::ppi::connector::ActiveOutput<> output;

        RateStuffer(unsigned packetsPerSecond)
            : timer_(1000u, packetsPerSecond)
        {
            route(payload, output);
            route(stuffing, output);

            registerEvent( timer_, &RateStuffer::tick );
        }

    private:
        void tick()
        {
            if (payload)
                output(payload());
            else
                output(stuffing());
        }
    };
The constructor declares flow information using senf::ppi::module::Module::route(). The module then registers an interval timer which fires packetsPerSecond times every 1000 milliseconds.
The module processing is very simple: whenever a timer tick arrives, a packet is sent. If the payload input is ready (see Throttling), a payload packet is sent; otherwise a stuffing packet is sent. The module therefore provides a constant stream of packets at a fixed rate on output (see the RateStuffer example application for a slightly different approach).
An example module to generate the stuffing packets could be:
    class CopyPacketGenerator
        : public senf::ppi::module::Module
    {
        SENF_PPI_MODULE(CopyPacketGenerator);

    public:
        senf::ppi::connector::PassiveOutput<> output;

        // Keep a copy of the packet that is cloned on every request
        CopyPacketGenerator(senf::Packet templatePacket)
            : template_ (templatePacket)
        {
            noroute(output);
            output.onRequest(&CopyPacketGenerator::makePacket);
        }

    private:
        senf::Packet template_;

        void makePacket()
        {
            output(template_.clone());
        }
    };
This module just produces a copy of a given packet whenever output is requested.
A passive connector is signaled by the framework to fetch data from the module or to pass data into the module. The module must register a callback which is called whenever a packet is requested from the module or whenever a new packet is made available for the module to process.
To send or receive a packet (either actively or passively), the module just calls the connector. It is permissible to generate or process multiple packets in one iteration. However, if more than one packet is to be read, the module itself must ensure that enough packets are available. It is also permissible not to handle a packet at all, even if signaled to do so; the packet will automatically be queued.
To provide this flexibility, all input connectors incorporate a packet queue. This queue is exposed to the module and allows the module to optionally process packets in batches.
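The queue-backed input described above can be illustrated with a small stand-alone model. This is only a sketch and does not use the SENF API: QueuedInput, enqueue and drainBatch are hypothetical names chosen for illustration, and packets are modelled as plain strings.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an input connector with a built-in packet queue.
// Packets are modelled as strings; nothing here is part of the SENF API.
class QueuedInput {
public:
    // Called by the "framework" side when a packet arrives.
    void enqueue(std::string packet) { queue_.push_back(std::move(packet)); }

    // Number of packets currently waiting in the queue.
    std::size_t size() const { return queue_.size(); }

    // True if at least one packet can be read.
    explicit operator bool() const { return !queue_.empty(); }

    // Read one packet; the caller must check availability first.
    std::string operator()() {
        assert(!queue_.empty() && "no packet available");
        std::string p = std::move(queue_.front());
        queue_.pop_front();
        return p;
    }

private:
    std::deque<std::string> queue_;
};

// Process up to maxBatch packets in one go; anything left simply stays queued.
std::vector<std::string> drainBatch(QueuedInput& input, std::size_t maxBatch) {
    std::vector<std::string> batch;
    while (input && batch.size() < maxBatch)
        batch.push_back(input());
    return batch;
}
```

In this model a module that is signaled once may call drainBatch to handle several packets at a time, or ignore the signal entirely and let the packets accumulate in the queue.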
Connectors take an optional template argument which specifies the type of packet the connector sends or receives. This template argument defaults to senf::Packet.
[Figure: Simple RateStuffer]
To make use of the modules, they have to be instantiated and connections have to be created between their connectors. It is possible to connect any pair of input/output connectors as long as one of them is active and the other is passive.
Two active or two passive connectors can be connected with each other using a special adaptor module (senf::ppi::module::PassiveQueue or senf::ppi::module::ActiveFeeder, respectively).
Additionally, the connectors must be type-compatible: either one (or both) of the connectors must be untyped (they accept arbitrary senf::Packet instances; the optional template argument is empty), or both must accept the same type of packet. This check is performed at runtime.
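The two connection rules (one active and one passive end, plus compatible packet types) can be written down as a pair of predicates. The following is a minimal sketch, not the check SENF performs internally: ConnectorInfo, Mode and canConnect are hypothetical names, and a packet type is represented by a string, with the empty string standing for an untyped connector.

```cpp
#include <cassert>
#include <string>

enum class Mode { Active, Passive };

// Hypothetical description of one connector end. An empty packetType models
// an untyped connector, that is, one that accepts arbitrary packets.
struct ConnectorInfo {
    Mode mode;
    std::string packetType;
};

// Rule 1: one end must be active and the other passive.
bool modesCompatible(const ConnectorInfo& out, const ConnectorInfo& in) {
    return out.mode != in.mode;
}

// Rule 2: at least one end is untyped, or both name the same packet type.
bool typesCompatible(const ConnectorInfo& out, const ConnectorInfo& in) {
    if (out.packetType.empty() || in.packetType.empty())
        return true;
    return out.packetType == in.packetType;
}

// A connection is allowed only if both rules hold.
bool canConnect(const ConnectorInfo& out, const ConnectorInfo& in) {
    return modesCompatible(out, in) && typesCompatible(out, in);
}
```

Because the type rule is evaluated on values rather than in the type system, a mismatch in this model only shows up when canConnect is called, which mirrors the runtime nature of the check described above.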
To complete our simplified example, let's connect senf::ppi::module::ActiveSocketSource and senf::ppi::module::PassiveSocketSink to our example module:
    RateStuffer rateStuffer (10);

    senf::Packet stuffingPacket = senf::DataPacket::create(...);
    CopyPacketGenerator generator (stuffingPacket);

    senf::UDPv4ClientSocketHandle inputSocket (1111);
    senf::ppi::module::ActiveSocketSource<> udpInput (inputSocket);

    senf::UDPv4ClientSocketHandle outputSocket ("2.3.4.5:2222");
    senf::ppi::module::PassiveSocketSink<> udpOutput (outputSocket);

    senf::ppi::module::PassiveQueue adaptor;

    senf::ppi::connect(udpInput, adaptor);
    senf::ppi::connect(adaptor, rateStuffer.payload);
    adaptor.qdisc(ThresholdQueueing(10,5));
    senf::ppi::connect(generator, rateStuffer.stuffing);
    senf::ppi::connect(rateStuffer, udpOutput);

    senf::ppi::run();
This application reads UDP packets coming in on port 1111 and forwards them to port 2222 on host 2.3.4.5 at a fixed rate of 10 packets per second.
We start out by instantiating the necessary modules. Then the connections between these modules are set up by successively connecting each output connector to an input connector. As can be seen, the name of the connector can be left off if it is named output or input, respectively.
The buffering on the udpInput <-> rateStuffer adaptor is changed so the queue will begin to throttle only if more than 10 packets are in the queue. The connection will be unthrottled as soon as there are no more than 5 packets left in the queue (see Throttling).
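The two-threshold behaviour described here is a form of hysteresis, which a few lines of stand-alone code can make concrete. This sketch only models the behaviour as described in the text and is not the SENF ThresholdQueueing implementation; ThresholdModel and update are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of a two-threshold queueing discipline. It illustrates
// the described behaviour only and is not the SENF ThresholdQueueing class.
class ThresholdModel {
public:
    ThresholdModel(std::size_t high, std::size_t low)
        : high_(high), low_(low), throttled_(false)
    {
        assert(low <= high);
    }

    // Report the current queue length; returns the resulting throttle state.
    bool update(std::size_t queueLength) {
        if (!throttled_ && queueLength > high_)
            throttled_ = true;       // more than `high` packets are queued
        else if (throttled_ && queueLength <= low_)
            throttled_ = false;      // no more than `low` packets are left
        return throttled_;
    }

    bool throttled() const { return throttled_; }

private:
    std::size_t high_;
    std::size_t low_;
    bool throttled_;
};
```

With thresholds (10, 5), a queue length between 6 and 10 leaves the state unchanged, so the connection does not flip between throttled and unthrottled on every single packet.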
All this is handled using throttle notifications. We need throttle notifications so a passive connector can tell its connected peer that it cannot service further requests until an unthrottle notification is sent. From the point of view of someone implementing a module, this means that throttle notifications are always received on active connectors and sent on passive connectors.
It also means that the direction of control flow (the throttle notifications) is from passive to active connectors and does not depend on the direction of data flow (which runs from output to input connector). This makes sense: the module with the active connector is the one initiating the data processing (after all, it is the active part) and needs to be told not to request or send packets on its connector when the connected passive peer cannot handle the request.
So if a passive connector cannot handle requests, the connector must be throttled. Throttling the connector will forward a throttle notification to its peer. The peer then handles the throttling notification.
There are two ways in which throttle notifications can be handled: by automatic throttling or by registering callbacks. The default is automatic throttling.
Automatic throttling is based on the routing information available to the module. Every notification received is forwarded within the module along all known routes from active to passive connectors. (Routes which connect two active or two passive connectors are perfectly valid; they are just not forwarding routes and are ignored by throttle notifications.) Together with automatic event throttling (see Events), this is all that is normally needed to handle throttle notifications: by forwarding the notifications we ensure that a module's passive connectors are only signaled when the corresponding active connectors are not throttled (as defined by the routing information). The module is therefore not called until the connector(s) are unthrottled.
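The forwarding rule can be sketched with a heavily simplified stand-alone model. None of the types below are SENF classes: Connector, Route, ModuleModel and notify are hypothetical, and the model ignores details such as a passive connector being reachable over several routes at once.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, heavily simplified model; none of these types are SENF classes.
struct Connector {
    std::string name;
    bool active;       // true: active connector, false: passive connector
    bool throttled;
};

struct Route {
    std::size_t a;     // indices into ModuleModel::connectors
    std::size_t b;
};

struct ModuleModel {
    std::vector<Connector> connectors;
    std::vector<Route> routes;

    // A throttle (true) or unthrottle (false) notification arrives on an
    // active connector and is forwarded to every passive connector that is
    // routed to it. Routes between two active connectors are skipped.
    void notify(std::size_t activeIdx, bool throttle) {
        assert(connectors[activeIdx].active);
        connectors[activeIdx].throttled = throttle;
        for (const Route& r : routes) {
            if (r.a != activeIdx && r.b != activeIdx)
                continue;                           // route not attached here
            std::size_t peer = (r.a == activeIdx) ? r.b : r.a;
            if (!connectors[peer].active)           // forwarding route only
                connectors[peer].throttled = throttle;
        }
    }
};
```

In this model a notification on the active output reaches the passive input routed to it, while a second active connector routed to the same output is left alone, since that route is not a forwarding route.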
Throttle callbacks can optionally be registered (with automatic throttling enabled or disabled, see senf::ppi::connector::ActiveConnector) to be called when a throttle notification is received. The callback may then handle the notification however it sees fit, for example by manually throttling some passive connector (see senf::ppi::connector::PassiveConnector).
To enable or disable automatic throttling, use the return value of senf::ppi::module::Module::route(), which is a reference to a senf::ppi::Route instance. If this route is a forwarding route (that is, one of the connectors is passive and the other is active), the return value will be derived from senf::ppi::ForwardingRoute, which provides members to control the throttle notification forwarding.
All events are derived from senf::ppi::EventDescriptor. The base class allows the event to be enabled and disabled. Each type of event takes descriptor-specific constructor arguments to describe the event to be generated. Events are declared as (private) data members of the module and are then registered using senf::ppi::module::Module::registerEvent().
Each event, when signaled, is described by an instance of the descriptor-specific descriptorType::Event class. This instance holds the event-specific information (such as the scheduled time of the event, the file handle state and so on). This information is passed to the callback.
Additionally, events are valid routing targets. This feature allows events to be disabled and enabled by throttling notifications. For the sake of routing, an event may be used like an active input or output. It is active from the PPI's point of view since it is signaled from the outside and not by some module. It may be either input or output depending on the operation the event controls.
If we take event routing into account, we can extend the RateStuffer constructor accordingly:
    RateStuffer(unsigned packetsPerSecond)
        : timer_(1000u, packetsPerSecond)
    {
        route(payload, output);
        route(stuffing, output);
        route(timer_, output);   // (*)

        registerEvent( timer_, &RateStuffer::tick );
    }
We have added the marked route call. This way, timer_ will receive throttling notifications from the output: whenever the output is throttled, the event is disabled until the output is unthrottled again.
This works very well with automatic throttling. When no more data is available to be processed and no more data can be expected to arrive (for example, because the data has been read from a file which is now exhausted), all events are disabled automatically via throttle notifications, which signals that any processing should stop.
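The effect of routing an event to an output can be shown with a small stand-alone model: while the output is throttled the event is disabled, so its callback does not run. This is a sketch under simplifying assumptions and does not use the SENF API; TimerEventModel and OutputModel are hypothetical names.

```cpp
#include <cassert>

// Hypothetical model of a timer event; not the SENF IntervalTimer.
struct TimerEventModel {
    bool enabled;
    int fired;         // counts how often the module callback would have run

    TimerEventModel() : enabled(true), fired(0) {}

    // The scheduler would call this once per interval.
    void tick() {
        if (enabled)
            ++fired;
    }
};

// Hypothetical model of an output connector with one event routed to it.
struct OutputModel {
    TimerEventModel* routedEvent;

    explicit OutputModel(TimerEventModel* ev) : routedEvent(ev) {
        assert(ev != nullptr);
    }

    // Throttle notifications from the peer disable or re-enable the event.
    void throttle()   { routedEvent->enabled = false; }
    void unthrottle() { routedEvent->enabled = true; }
};
```

Ticks that arrive while the output is throttled are simply dropped in this model, so no packets are produced for a peer that cannot accept them.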
Within a module, the different flow levels are defined differently depending on the type of flow:
route statement as defining the 'conceptual data flow', since this is also how control messages should flow (sans the direction, which is defined by the connectors' active/passive property).