Example introducing the Packet Processing Infrastructure

This example application implements a simple PPI application: it reads UDP packets from an input port and forwards them to another port at a fixed packet rate. If the input stream does not provide enough packets, stuffing packets (here, UDP packets containing the text '<idle>') are sent instead.

Running the example

Running the example is slightly more involved because we need to supply UDP packets to see the application at work. We do this using netcat: open several shell windows and run the following commands, each in its own window.

The first command listens for incoming UDP packets on port 44345:

# nc -u -l -p 44345

The next commands build and start the ratestuffer:

# cd .../Examples/RateStuffer
# scons -u
# ./ratestuffer

We should now see '<idle>' messages arriving in the first window once per second. We can now start another netcat to send packets to the input port:

# nc -u localhost 44344
< type any text here >

Whenever we send a line (terminated with CR) from the last window, we should see it appear in the first one. If we send packets faster than one packet per second, they will start to be discarded once more than two packets are in flight.

[Image: screenshot.png]
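Bursts that are hard to type by hand can also be produced with a few lines of C++ instead of the netcat sender. The following is a minimal sketch using plain POSIX sockets rather than SENF; the helper names loopbackAddress and sendDatagram are hypothetical and not part of the example sources.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstring>
#include <string>

// Hypothetical helper: build an IPv4 loopback address for the given UDP port.
sockaddr_in loopbackAddress(unsigned short port)
{
    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);                    // network byte order
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);  // 127.0.0.1
    return addr;
}

// Hypothetical helper: send one datagram to localhost:port.
// Returns the number of bytes sent, or -1 on error.
long sendDatagram(unsigned short port, std::string const & payload)
{
    int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
        return -1;
    sockaddr_in addr = loopbackAddress(port);
    ssize_t sent = ::sendto(fd, payload.data(), payload.size(), 0,
                            reinterpret_cast<sockaddr const *>(&addr),
                            sizeof(addr));
    ::close(fd);
    return static_cast<long>(sent);
}
```

Calling sendDatagram(44344, "hello\n") several times within one second should, with the queue settings used in this example, cause some of the datagrams to be discarded.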

Module setup

[Figure: ratestuffer module setup. Modules shown: senf::ppi::module::ActiveSocketSource, senf::ppi::module::ThrottleBarrier, senf::ppi::module::PassiveQueue, senf::ppi::module::CloneSource, senf::ppi::module::PriorityJoin, RateFilter, senf::ppi::module::PassiveSocketSink]

The image above depicts the module setup implementing the rate stuffer. A senf::ppi::module::ActiveSocketSource reads the incoming UDP packets and sends them into a senf::ppi::module::PassiveQueue (via a senf::ppi::module::ThrottleBarrier).

The queue feeds the packets into a senf::ppi::module::PriorityJoin. The CloneSource generator is fed as second input into the join to provide the stuffing packets.

The RateFilter rateFilter reads packets from its input at a fixed rate and writes them into the senf::ppi::module::PassiveSocketSink udpSink. The senf::ppi::module::PriorityJoin ensures that read requests from the RateFilter's input are always serviced, either from the queue or, if the queue output is throttled, from the generator.
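The selection rule of the join can be sketched in a few lines. This is a standalone model, not SENF code: the function name nextPacket is hypothetical, packets are represented as plain strings, and an empty queue stands in for a throttled queue output.

```cpp
#include <deque>
#include <string>

// Model of a two-input priority join: the queue has priority, and the
// idle template is used only when the queue has nothing to offer.
std::string nextPacket(std::deque<std::string> & queue,
                       std::string const & idleTemplate)
{
    if (!queue.empty()) {
        std::string packet = queue.front();
        queue.pop_front();
        return packet;
    }
    // Return a copy of the template, similar in spirit to a cloned packet.
    return idleTemplate;
}
```

Because the second source can always deliver, every read request is answered, which is the property the rate filter relies on.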

The barrier is not strictly necessary. However, it makes the behavior of the RateStuffer predictable in the case where packets need to be dropped. Without the senf::ppi::module::ThrottleBarrier, the packets will be left in the kernel socket queue. Packets will only start to be dropped when that queue fills up. The size of this queue cannot be easily manipulated and its initial size is immense. So to stop the kernel queue from filling up with increasingly out-of-date packets, we add the barrier which will explicitly read and drop excess packets.
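The effect of the barrier can be illustrated with a small standalone model. This is a sketch under simplifying assumptions, not the SENF implementation: BarrierModel is a hypothetical type, packets are plain strings, and the throttled flag is set by hand where the real module would react to throttling notifications from the queue.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Model of a barrier that keeps reading while downstream is throttled,
// but discards what it reads instead of letting it pile up upstream.
struct BarrierModel {
    bool throttled = false;             // downstream state, set externally here
    std::size_t dropped = 0;            // number of discarded packets
    std::deque<std::string> forwarded;  // packets handed to the downstream queue

    void receive(std::string const & packet)
    {
        if (throttled) {
            ++dropped;                  // excess packet: read and drop
            return;
        }
        forwarded.push_back(packet);
    }
};
```

Packets arriving during a throttled phase are counted and thrown away immediately, so nothing stale is delivered once the throttling ends.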

Example code

The code starts out by including the necessary header files:

#include <senf/Socket/Protocols/INet.hh>
#include <senf/PPI.hh>

We also define some namespace aliases:

namespace module = senf::ppi::module;
namespace connector = senf::ppi::connector;
namespace ppi = senf::ppi;

The RateStuffer application is based on one additional application module.

The RateFilter module

The RateFilter module simply forwards packets at a fixed rate.

class RateFilter
    : public module::Module
{
    SENF_PPI_MODULE(RateFilter);
public:

    connector::ActiveInput<> input;
    connector::ActiveOutput<> output;

    RateFilter(senf::ClockService::clock_type interval);

private:
    void timeout();

    ppi::IntervalTimer timer;
};

Both connectors of the RateFilter module are active. The module is driven by a senf::ppi::IntervalTimer.

RateFilter::RateFilter(senf::ClockService::clock_type interval)
    : timer(interval)
{
    route(input,timer);
    route(timer,output);
    registerEvent(timer, &RateFilter::timeout);
}

The event is initialized to fire every interval nanoseconds. The traffic is routed 'across' the timer which controls the traffic. This routing lets the module automatically handle throttling events. The timer is registered to call RateFilter::timeout().
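Since the interval is given in nanoseconds, a packet rate has to be converted before it is passed to the constructor. A minimal sketch using std::chrono follows; the function name intervalNanoseconds is hypothetical, and it assumes, as the text states, that the interval is a plain nanosecond count.

```cpp
#include <chrono>
#include <cstdint>

// Interval between two packets, in nanoseconds, for a given rate in
// packets per second. The rate must be greater than zero.
std::uint64_t intervalNanoseconds(std::uint64_t packetsPerSecond)
{
    // seconds converts to nanoseconds implicitly because it is lossless
    std::chrono::nanoseconds const oneSecond(std::chrono::seconds(1));
    return static_cast<std::uint64_t>(oneSecond.count()) / packetsPerSecond;
}
```

A rate of one packet per second yields 1000000000, a rate of four packets per second yields 250000000.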

void RateFilter::timeout()
{
    output(input());
}

The event handler is quite simple: Every interval nanoseconds a packet is read from input and forwarded to output.

This is all there is to the RateFilter module. Due to the routing setup, the timer will automatically be disabled should either input or output become throttled. However, in this specific case this should never happen: The input is connected (via the join) to the senf::ppi::module::CloneSource, which will never throttle. The output is connected to a UDP socket which also never throttles.
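The interplay between the timer and throttling can be pictured with a standalone model. This is only a sketch, not SENF code: RateFilterModel is a hypothetical type, packets are plain strings, the two flags are set by hand, and tick() stands in for one firing of the interval timer.

```cpp
#include <deque>
#include <string>
#include <vector>

// Model of a rate filter whose timer is only effective while neither
// side is throttled.
struct RateFilterModel {
    bool inputThrottled = false;
    bool outputThrottled = false;
    std::deque<std::string> input;    // packets waiting upstream
    std::vector<std::string> output;  // packets already forwarded

    bool timerEnabled() const
    {
        return !inputThrottled && !outputThrottled;
    }

    // One timer period: forward exactly one packet if the timer is enabled.
    void tick()
    {
        if (!timerEnabled() || input.empty())
            return;                   // nothing is forwarded in this period
        output.push_back(input.front());
        input.pop_front();
    }
};
```

While either flag is set, ticks have no effect; forwarding resumes with the first tick after both flags are cleared.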

The RateStuffer subnet

We decide to implement the RateStuffer as a subnet or collection. This is a simple struct or class which contains all the modules necessary for a specific functionality. The modules are initialized and connected in the class's constructor. External connectors are exported as references to the corresponding module connectors:

class RateStuffer
{
    module::ThrottleBarrier barrier;
    module::PassiveQueue    queue;
    module::CloneSource     generator;
    module::PriorityJoin    join;
    RateFilter              rateFilter;

First the needed modules are declared. We have

  • the barrier to discard incoming packets sent too fast
  • the queue to receive incoming packets and create throttling notifications
  • the generator to create the stuffing packets
  • the join to combine the input stream from the queue with the stuffing packet stream
  • the rateFilter to generate the fixed rate output stream

public:
    connector::PassiveInput<> & input;
    connector::ActiveOutput<> & output;

Here we declare the external connectors. The subnetwork exports a single input and output connector. The external connectors are declared as references.

    RateStuffer(senf::ClockService::clock_type interval,
                senf::Packet packet,
                unsigned high = 1,
                unsigned low  = 0)
    :   barrier    (),
        queue      (),
        generator  ( packet ),
        join       (),
        rateFilter ( interval ),
        input      ( barrier.input ),
        output     ( rateFilter.output )

The constructor now initializes all the local objects. We pass the template packet to the generator and set the timing interval of the rateFilter.

The input and output connector references are bound to the corresponding connectors we want to expose: input to the barrier's input and output to the rateFilter's output.

    {
        ppi::connect( barrier,    queue      );
        ppi::connect( queue,      join       );
        ppi::connect( generator,  join       );
        ppi::connect( join,       rateFilter );

        queue.qdisc(ppi::ThresholdQueueing(high,low));
    }
};

The constructor body sets up the connections within the subnetwork. Finally, we set the queueing discipline of the queue. This completes the RateStuffer. The subnetwork can now be used like a module.
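The role of the two thresholds can be sketched with a standalone model. It assumes that the queue input is throttled once the queue length reaches high and released again once the length has fallen to low; ThresholdModel is a hypothetical type written for illustration and is not the SENF implementation.

```cpp
#include <cstddef>

// Model of a queue length with high/low threshold hysteresis.
class ThresholdModel
{
public:
    ThresholdModel(std::size_t high, std::size_t low)
        : high_(high), low_(low), size_(0), throttled_(false)
    {}

    // A packet enters the queue.
    void enqueue()
    {
        ++size_;
        if (size_ >= high_)
            throttled_ = true;    // stop accepting further packets
    }

    // A packet leaves the queue.
    void dequeue()
    {
        if (size_ > 0)
            --size_;
        if (size_ <= low_)
            throttled_ = false;   // accept packets again
    }

    bool throttled() const { return throttled_; }
    std::size_t size() const { return size_; }

private:
    std::size_t high_;
    std::size_t low_;
    std::size_t size_;
    bool throttled_;
};
```

With thresholds of 2 and 1, the second queued packet throttles the input in this model, and removing one packet releases it again.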

Application setup

The application's main() function starts out by initializing the socket handles:

int main(int argc, char * argv[])
{
    senf::UDPv4ClientSocketHandle inputSocket(
        senf::INet4SocketAddress("0.0.0.0:44344"));

    senf::ConnectedUDPv4ClientSocketHandle outputSocket(
        senf::INet4SocketAddress("localhost:44345"));

The inputSocket is listening on port 44344 while the outputSocket will send packets to port 44345 on localhost. The outputSocket uses the senf::ConnectedUDPv4SocketProtocol which is compatible with the senf::ppi::module::PassiveSocketSink module.

    module::ActiveSocketSource<>  udpSource ( inputSocket );
    RateStuffer                   stuffer   ( 1000000000ul,
                                              senf::DataPacket::create(std::string("<idle>\n")),
                                              2u, 1u );
    module::PassiveSocketSink<>   udpSink   ( outputSocket );

Here we allocate the components:

  • udpSource to read the packets from inputSocket
  • stuffer for the real work and
  • udpSink to send the packets to outputSocket

    ppi::connect( udpSource, stuffer   );
    ppi::connect( stuffer,   udpSink );

The senf::ppi::connect() calls set up the necessary connections.

Once the module setup is complete, senf::ppi::run() is called to enter the event loop.

    ppi::run();

    return 0;
}