00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00026
00027
00028
00029
00030 #include <senf/Socket/Protocols/INet.hh>
00031 #include <senf/PPI.hh>
00032
00033
00034 #define prefix_
00035
00036
00037 namespace module = senf::ppi::module;
00038 namespace connector = senf::ppi::connector;
00039 namespace ppi = senf::ppi;
00040
00041
00042
00043
00044 class RateFilter
00045 : public module::Module
00046 {
00047 SENF_PPI_MODULE(RateFilter);
00048 public:
00049
00050 connector::ActiveInput<> input;
00051 connector::ActiveOutput<> output;
00052
00053 RateFilter(senf::ClockService::clock_type interval);
00054
00055 private:
00056 void timeout();
00057
00058 ppi::IntervalTimer timer;
00059 };
00060
00061 RateFilter::RateFilter(senf::ClockService::clock_type interval)
00062 : timer(interval)
00063 {
00064 route(input,timer);
00065 route(timer,output);
00066 registerEvent(timer, &RateFilter::timeout);
00067 }
00068
00069 void RateFilter::timeout()
00070 {
00071 output(input());
00072 }
00073
00074
00075
00076 class RateStuffer
00077 {
00078 module::ThrottleBarrier barrier;
00079 module::PassiveQueue queue;
00080 module::CloneSource generator;
00081 module::PriorityJoin join;
00082 RateFilter rateFilter;
00083
00084 public:
00085 connector::PassiveInput<> & input;
00086 connector::ActiveOutput<> & output;
00087
00088 RateStuffer(senf::ClockService::clock_type interval,
00089 senf::Packet packet,
00090 unsigned high = 1,
00091 unsigned low = 0)
00092 : barrier (),
00093 queue (),
00094 generator ( packet ),
00095 join (),
00096 rateFilter ( interval ),
00097 input ( barrier.input ),
00098 output ( rateFilter.output )
00099 {
00100 ppi::connect( barrier, queue );
00101 ppi::connect( queue, join );
00102 ppi::connect( generator, join );
00103 ppi::connect( join, rateFilter );
00104
00105 queue.qdisc(ppi::ThresholdQueueing(high,low));
00106 }
00107 };
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126 int main(int argc, char * argv[])
00127 {
00128 senf::UDPv4ClientSocketHandle inputSocket(
00129 senf::INet4SocketAddress("0.0.0.0:44344"));
00130
00131 senf::ConnectedUDPv4ClientSocketHandle outputSocket(
00132 senf::INet4SocketAddress("localhost:44345"));
00133
00134 module::ActiveSocketSource<> udpSource ( inputSocket );
00135 RateStuffer stuffer ( 1000000000ul,
00136 senf::DataPacket::create(std::string("<idle>\n")),
00137 2u, 1u );
00138 module::PassiveSocketSink<> udpSink ( outputSocket );
00139
00140 ppi::connect( udpSource, stuffer );
00141 ppi::connect( stuffer, udpSink );
00142
00143 ppi::run();
00144
00145 return 0;
00146 }
00147
00148
00149 #undef prefix_
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161