This example application implements a simple PPI application: it reads UDP packets from an input port and forwards them to another port at a fixed packet rate. If the input stream does not provide enough packets, stuffing packets are sent instead.
To try out the example we need to provide UDP packets, which we do using netcat. We open several shell windows and run the following commands, each in its own window.
The first command listens for incoming UDP packets on port 44345:
# nc -u -l -p 44345
The next command starts the ratestuffer:
# cd .../Examples/RateStuffer
# scons -u
# ./ratestuffer
We should now see '<idle>' messages arriving in the first window once per second. We can now start another netcat to send packets to the input port.
# nc -u localhost 44344
< type any text here >
Whenever we send out a packet with CR in the last window, we should see it appear in the first one. If we send packets faster than one packet per second, they will start to be discarded once more than two packets are in flight.
The image above depicts the module setup implementing the rate stuffer. A senf::ppi::module::ActiveSocketSource reads the incoming UDP packets and sends them into a senf::ppi::module::PassiveQueue (via a senf::ppi::module::ThrottleBarrier).
The queue feeds the packets into a senf::ppi::module::PriorityJoin. The CloneSource generator is fed as second input into the join to provide the stuffing packets.
The RateFilter rateFilter reads packets from its input at a fixed rate and writes them into the senf::ppi::module::PassiveSocketSink udpSink. The senf::ppi::module::PriorityJoin ensures that read requests of the RateFilter's input are always serviced, either from the queue or, if the queue output is throttled, from the generator.
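The interplay of join and rate filter can be modelled without the PPI. The following is a minimal sketch in plain C++, not SENF code: nextPacket() and runTicks() are hypothetical names, and an empty queue stands in for a throttled queue output.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for the join: serve the read request from the
// queue if it can deliver, otherwise fall back to the stuffing packet.
std::string nextPacket(std::deque<std::string> & queue, std::string const & stuffing)
{
    if (queue.empty())
        return stuffing;                // queue cannot deliver: use the generator
    std::string packet = queue.front();
    queue.pop_front();
    return packet;                      // queued data has priority over stuffing
}

// Hypothetical stand-in for the rate filter: one read request per timer tick.
std::vector<std::string> runTicks(std::deque<std::string> queue,
                                  std::string const & stuffing,
                                  unsigned ticks)
{
    std::vector<std::string> sent;
    for (unsigned i = 0; i < ticks; ++i)
        sent.push_back(nextPacket(queue, stuffing));
    assert(sent.size() == ticks);       // exactly one packet leaves per tick
    return sent;
}
```

With two queued packets and four ticks, this model sends the two data packets first and then two stuffing packets, so the output rate stays constant regardless of the input.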
The barrier is not strictly necessary. However, it makes the behavior of the RateStuffer predictable in the case where packets need to be dropped. Without the senf::ppi::module::ThrottleBarrier, the packets will be left in the kernel socket queue. Packets will only start to be dropped when that queue fills up. The size of this queue cannot be easily manipulated and its initial size is immense. So to stop the kernel queue from filling up with increasingly out-of-date packets, we add the barrier which will explicitly read and drop excess packets.
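The effect of the barrier can be illustrated with a small stand-alone model. This is only a sketch under assumptions, not the SENF implementation: BarrierModel is a hypothetical type, and we assume that the queue throttles once it holds at least high packets and unthrottles once it has drained to low packets or fewer, mirroring the two thresholds passed to the queue later in this example.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical model of barrier plus queue (not SENF code).
struct BarrierModel
{
    std::size_t high;                   // queue length at which the queue throttles
    std::size_t low;                    // queue length at which it unthrottles again
    bool throttled = false;
    std::deque<std::string> queue;
    std::size_t dropped = 0;

    BarrierModel(std::size_t high_, std::size_t low_) : high(high_), low(low_)
    {
        assert(low < high);             // assumed precondition of this model
    }

    // A packet arrives from the socket.
    void arrive(std::string const & packet)
    {
        if (throttled) {                // barrier: read the packet and discard it
            ++dropped;
            return;
        }
        queue.push_back(packet);
        if (queue.size() >= high)
            throttled = true;
    }

    // The rate filter takes one packet; returns false if the queue is empty.
    bool take(std::string & packet)
    {
        if (queue.empty())
            return false;
        packet = queue.front();
        queue.pop_front();
        if (queue.size() <= low)
            throttled = false;
        return true;
    }
};
```

In this model, excess packets are dropped immediately on arrival, so the queue never holds more than high packets and the packets that do get forwarded are recent ones.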
The code starts out by including the necessary header files:
#include <senf/Socket/Protocols/INet.hh>
#include <senf/PPI.hh>
We also define some namespace aliases:
namespace module = senf::ppi::module;
namespace connector = senf::ppi::connector;
namespace ppi = senf::ppi;
The RateStuffer application is based on one additional application module.
class RateFilter
    : public module::Module
{
    SENF_PPI_MODULE(RateFilter);
public:
    connector::ActiveInput<> input;
    connector::ActiveOutput<> output;

    RateFilter(senf::ClockService::clock_type interval);

private:
    void timeout();

    ppi::IntervalTimer timer;
};
Both connectors of the RateFilter module are active. The module is driven by a senf::ppi::IntervalTimer.
RateFilter::RateFilter(senf::ClockService::clock_type interval)
    : timer(interval)
{
    route(input,timer);
    route(timer,output);
    registerEvent(timer, &RateFilter::timeout);
}
The event is initialized to fire every interval nanoseconds. The traffic is routed 'across' the timer which controls the traffic. This routing lets the module automatically handle throttling events. The timer is registered to call RateFilter::timeout().
The event handler is quite simple: Every interval nanoseconds a packet is read from input and forwarded to output.
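The handler's behavior can be mirrored in a stand-alone model. The sketch below is not the SENF source: RateFilterModel is a hypothetical type in which std::function objects take the place of the PPI connectors, under the assumption that reading the input yields one packet and calling the output sends one.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical stand-in for the RateFilter (not SENF code).
struct RateFilterModel
{
    std::function<std::string()> input;                 // pulls one packet from upstream
    std::function<void(std::string const &)> output;    // pushes one packet downstream

    // Called once per timer interval: move exactly one packet from input to output.
    void timeout()
    {
        assert(input && output);        // both ends must be connected
        output(input());
    }
};
```

Since the handler does nothing but forward a single packet, the output rate is determined entirely by how often the timer fires.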
This is all there is to the RateFilter module. Due to the routing setup, the timer will automatically be disabled should either input or output become throttled. However, in this specific case this should never happen: The input is connected (via the join) to the senf::ppi::module::CloneSource, which will never throttle. The output is connected to a UDP socket which also never throttles.
class RateStuffer
{
    module::ThrottleBarrier barrier;
    module::PassiveQueue    queue;
    module::CloneSource     generator;
    module::PriorityJoin    join;
    RateFilter              rateFilter;
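First the needed modules are declared. We have the barrier to discard excess incoming packets, the queue to buffer incoming packets, the generator to create the stuffing packets, the join to combine the queue output with the stuffing packets, and the rateFilter to produce the fixed-rate output stream.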
public:
    connector::PassiveInput<> & input;
    connector::ActiveOutput<> & output;
Here we declare the external connectors. The subnetwork exports a single input and output connector. The external connectors are declared as references.
    RateStuffer(senf::ClockService::clock_type interval,
                senf::Packet packet,
                unsigned high = 1,
                unsigned low = 0)
        : barrier    (),
          queue      (),
          generator  ( packet ),
          join       (),
          rateFilter ( interval ),
          input      ( barrier.input ),
          output     ( rateFilter.output )
The constructor now initializes all the local objects. We pass the template packet to the generator and set the timing interval of the rateFilter.
The input and output connector references are bound to the corresponding connectors we want to expose: input to the barrier's input and output to the rateFilter's output.
    {
        ppi::connect( barrier, queue );
        ppi::connect( queue, join );
        ppi::connect( generator, join );
        ppi::connect( join, rateFilter );

        queue.qdisc(ppi::ThresholdQueueing(high,low));
    }
};
The constructor body sets up the connections within the subnetwork. Finally, we set the queueing discipline of the queue. This completes the RateStuffer. This subnetwork can now be used like a module.
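The technique of exporting inner connectors as reference members is plain C++ and can be shown in isolation. The following sketch uses hypothetical types (Stage and Composite, with int slots in place of connectors) and only illustrates the idiom.

```cpp
#include <cassert>

// Hypothetical inner stage with one input and one output slot.
struct Stage
{
    int input = 0;
    int output = 0;
};

// Hypothetical composite exposing selected inner members as its own interface.
class Composite
{
    Stage first;        // declared before the references that bind to it
    Stage last;

public:
    int & input;        // alias for first.input
    int & output;       // alias for last.output

    Composite() : first(), last(), input(first.input), output(last.output)
    {
        assert(&input == &first.input);     // the reference names the inner member itself
    }

    // A copy would keep references into the original object, so forbid copying.
    Composite(Composite const &) = delete;
    Composite & operator=(Composite const &) = delete;

    int firstInput() const { return first.input; }
    void produce(int value) { last.output = value; }
};
```

Members are initialized in declaration order, so the inner objects must be declared before the references that bind to them, which is the order used in RateStuffer as well.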
int main(int argc, char * argv[])
{
    senf::UDPv4ClientSocketHandle inputSocket(
        senf::INet4SocketAddress("0.0.0.0:44344"));

    senf::ConnectedUDPv4ClientSocketHandle outputSocket(
        senf::INet4SocketAddress("localhost:44345"));
The inputSocket is listening on port 44344 while the outputSocket will send packets to port 44345 on localhost. The outputSocket uses the senf::ConnectedUDPv4SocketProtocol, which is compatible with the senf::ppi::module::PassiveSocketSink module.
    module::ActiveSocketSource<> udpSource ( inputSocket );
    RateStuffer stuffer ( 1000000000ul,
                          senf::DataPacket::create(std::string("<idle>\n")),
                          2u, 1u );
    module::PassiveSocketSink<> udpSink ( outputSocket );
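Here we allocate the components: udpSource reads packets from inputSocket, stuffer forwards one packet per second (the interval is 1000000000 ns) using '<idle>' as the stuffing packet and queue thresholds of high = 2 and low = 1, and udpSink writes to outputSocket.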
    ppi::connect( udpSource, stuffer );
    ppi::connect( stuffer, udpSink );
The senf::ppi::connect() calls set up the necessary connections.
Once the module setup is complete, senf::ppi::run() is called to enter the event loop.
    ppi::run();
    return 0;
}