Analyzer.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
6 //
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
11 //
12 
13 
14 #include <linux/net_tstamp.h>
18 #include <senf/Utils/membind.hh>
19 #include <senf/Utils/hexdump.hh>
23 #include <senf/PPI/PPI.hh>
25 
27 
28 #include "Configuration.hh"
30 
31 #define prefix_
32 //-/////////////////////////////////////////////////////////////////////////////////////////////////
33 
34 
36 {
38 
39 public:
43  unsigned numPkts_;
44  unsigned finPkts_;
45 
// Builds the template test frame once and arms the timer that starts the burst.
//   mac    - written into the Ethernet source field of the template frame.
//   config - supplies destination, pktSize, numPackets and sessionId (and interface for the log line).
// NOTE(review): `testPkt` is declared on listing line 57, which this rendering omits.
46  Generator(senf::MACAddress const & mac, Configuration const & config) :
47  timer("trigger", boost::bind(&Generator::sendPkts, this)),
48  eth(senf::EthernetPacket::create()),
49  numPkts_(config.numPackets), // NOTE(review): the member index on this page lists numPackets as std::uint64_t; if that is Configuration::numPackets this narrows to unsigned - confirm
50  finPkts_(0)
51  {
52  noroute(output);
53 
54  eth->source() << mac;
55  eth->destination() << config.destination;
56  auto ouiExt (senf::EthOUIExtensionPacket::createAfter(eth));
58  senf::DataPacket data(senf::DataPacket::createAfter(testPkt, config.pktSize - eth.size())); // payload size = pktSize minus eth.size() at this point; NOTE(review): no guard against pktSize < eth.size() is visible here
59 
60  testPkt->sessionId() << config.sessionId;
61  testPkt->numPkts() << numPkts_;
62  testPkt->magic() << 0xaffe;
63 
64  eth.finalizeAll();
65 
66  SENF_LOG((senf::log::IMPORTANT) ("Starting generator on iface " << config.interface << " with MAC " << mac
67  << ", destination " << config.destination
68  << ", pktSize " << config.pktSize
69  << ", numPackets " << config.numPackets
70  << ", sessionId " << config.sessionId));
71 
72  // trigger sendPkts() in 1s
73  timer.timeout(senf::scheduler::now() + senf::ClockService::seconds(1));
74  }
75 
// Timer callback (installed by the constructor) that emits the measurement burst.
// Writes the current scheduler time, converted to nanoseconds, into the test packet once,
// then outputs numPkts_ clones of the template frame carrying seqNo 0 .. numPkts_-1.
// Finally re-targets the timer at FIN() and schedules it 10 ms from now.
76  void sendPkts() {
77  // push packets into MMAP buffer
78  auto testPkt (eth.find<emu::InternalThroughputTestPacket>());
79  testPkt->timestamp() << senf::ClockService::in_nanoseconds(senf::scheduler::now()); // set once before the loop, so every packet of the burst carries the same timestamp
80  for (unsigned n = 0; n < numPkts_; n++) {
81  testPkt->seqNo() << n;
82  output(eth.clone()); // a clone is sent; the template frame itself is only edited in place
83  }
84 
85  // FIN sequence
86  std::cout << "FIN Sequence..." << std::endl;
87  timer.action(boost::bind(&Generator::FIN, this));
88  timer.timeout(senf::scheduler::now() + senf::ClockService::milliseconds(10));
89  // flush/send will be triggered as soon as we exit this method
90  }
91 
// Timer callback that sends the end-of-burst marker.
// For the first 10 invocations it outputs a clone with seqNo == numPkts_ and re-arms itself
// 10 ms later. On the following invocation it sends nothing, re-targets the timer at
// terminate() and schedules it 1 s from now.
// Analyzer::request() in this file runs its analysis when seqNo is not below numPkts.
92  void FIN() {
93  if (finPkts_ < 10) {
94  auto testPkt (eth.find<emu::InternalThroughputTestPacket>());
95  testPkt->seqNo() << numPkts_; // one past the last data seqNo (data packets use 0 .. numPkts_-1)
96  output(eth.clone());
97  timer.timeout(senf::scheduler::now() + senf::ClockService::milliseconds(10));
98  finPkts_++;
99  }
100  else {
101  // terminate us in 1s
102  timer.action(boost::bind(&Generator::terminate, this));
103  timer.timeout(senf::scheduler::now() + senf::ClockService::seconds(1));
104  }
105  }
106 
// Last timer callback, installed by FIN() once all FIN markers have been sent.
// NOTE(review): the body (listing line 108) is omitted by this rendering; presumably it
// stops the scheduler so senf::ppi::run() in main() returns - confirm against the real file.
107  void terminate() {
109  }
110 };
111 
113 {
115 
116 public:
119  std::map<std::uint32_t,std::int64_t> tstamps;
121  unsigned sessionId_;
122  bool verbose_;
123  unsigned OLd_numPkts=50;
124 
// Sets up the receiving side.
//   mac    - stored in ourMAC_; request() only processes frames whose destination equals it.
//   config - provides the verbose flag (and interface for the log line).
// sessionId_ starts at 0, which request() treats as "no session latched yet".
// NOTE(review): listing line 131 is omitted by this rendering.
125  Analyzer(senf::MACAddress const & mac, Configuration const & config) :
126  ourMAC_(mac),
127  sessionId_(0),
128  verbose_(config.verbose)
129  {
130  noroute(input);
132  input.onRequest( &Analyzer::request); // request() is registered as the handler of the passive input
133 
134  SENF_LOG((senf::log::IMPORTANT) ("Starting analyzer on iface " << config.interface << " with MAC " << mac) );
135  }
136 
// Input handler: processes one received Ethernet frame.
// Frames not addressed to ourMAC_, or not containing an InternalThroughputTestPacket, are ignored.
// Data packets (seqNo < numPkts) have their rx timestamp stored in tstamps keyed by seqNo.
// Any packet of the current session with seqNo >= numPkts triggers analyzeData() and resets the state.
// NOTE(review): `rxTstamp` is declared on listing line 156, which this rendering omits.
137  void request() {
138  senf::EthernetPacket const & eth (input());
139 
140  // fetch the rx timestamp from the meta data
141  eth.annotation<senf::emu::annotations::Timestamp>().fromQueueBuffer(*(eth.annotation<senf::ppi::QueueBufferAnnotation>().value));
142 
143  if (eth->destination() == ourMAC_) {
144  auto testPkt (eth.find<emu::InternalThroughputTestPacket>(senf::nothrow)); // nothrow: result is tested below instead of throwing when absent
145  if (testPkt) {
146  if (sessionId_ == 0 and testPkt->seqNo() < testPkt->numPkts()) // only a data packet may latch a new session; a FIN marker never does
147  sessionId_ = testPkt->sessionId();
148  if (sessionId_ != testPkt->sessionId()) {
149  if (testPkt->seqNo() != testPkt->numPkts()) { // FIN markers of a foreign/finished session are dropped without logging
150  SENF_LOG( (senf::log::IMPORTANT) ("Packet #" << testPkt->seqNo() << " ignored due to mismatching sessionId") );
151  }
152  return;
153  }
154 
155  // we have a valid packet
157 
158  if (verbose_) {
159  SENF_LOG( (senf::log::IMPORTANT) ("Packet #" << testPkt->seqNo() << "/" << testPkt->numPkts() << " received with length " << eth.size()
160  << ", timestamp " << rxTstamp) );
161  }
162 
163  if (testPkt->seqNo() < testPkt->numPkts()) {
164  tstamps.insert(std::make_pair(testPkt->seqNo(), rxTstamp)); // std::map::insert keeps the first entry if this seqNo was already stored
165  } else {
166  // Analysis of tstamp vector here
167  analyzeData(testPkt->numPkts(), eth.size());
168  // we are done
169  tstamps.clear();
170  sessionId_ = 0; // back to "no session"; later FIN repeats then fail the sessionId check above (unless the sender's sessionId is itself 0)
171  }
172  }
173  }
174  }
175 
176  //
177  // Your code here !!!
178  //
179 
// Evaluates the rx timestamps collected in tstamps for one burst and prints the results.
//   numPkts - packet count announced by the sender (request() passes testPkt->numPkts()).
//   pktSize - frame size used for the capacity estimate (request() passes eth.size()).
// For every pair of stored packets with consecutive seqNo the timestamp difference is printed,
// accumulated into txTime and counted in a histogram keyed by diff/1000; gaps are reported as loss.
// The per-burst estimate Link_Capacity = pktSize*8 / avg(diff) is accumulated in ThroughputDiffs,
// and an aggregate line is printed whenever numPkts differs from OLd_numPkts.
// NOTE(review): `txTime` is declared on listing line 186, which this rendering omits.
// NOTE(review): the "us" and "Gbps" labels are only correct if the timestamps are in nanoseconds - confirm.
180  void analyzeData(unsigned numPkts, unsigned pktSize) {
181  if (tstamps.size() < 2) {
182  std::cerr << "Insufficient data for evaluation" << std::endl;
183  return;
184  }
185  std::map<std::int32_t,std::uint32_t> distribution;
187 
188  auto start (tstamps.begin());
189  for (auto it (std::next(start)); it != tstamps.end(); it++, start++) { // walks adjacent entries (start, it) of the seqNo-ordered map
190 
191  if (it->first - start->first == 1) {
192  std::cout << "(" << start->first << " " << it->second - start->second << ")";
193  txTime.accumulate(it->second - start->second);
194  distribution[(it->second - start->second)/1000]++;
195  }
196  else
197  std::cout << "(loss after " << start->first << ", diff=" << it->first - start->first << ")";
198  }
199 
200  std::cout << std::endl;
201 
202  std::cout << "Sample distribution in us: ";
203  std::uint64_t avg (0), count (0);
204 
205 
206  for (auto const & d : distribution) {
207  std::cout << "(" << d.first << " => " << d.second << ")" << std::endl;
208  avg += d.first * d.second; // NOTE(review): int32 * uint32 is evaluated unsigned, so a negative bucket key would wrap
209  count += d.second;
210  }
211 
212  //Accumulating Link Capacity for each flow
213  float Link_Capacity;
214  float Throughput_avg,Throughput_deviation; // NOTE(review): only assigned inside the print expression below, never read
215 
216  Link_Capacity = (float(pktSize * 8) / txTime.data().avg); // NOTE(review): if no consecutive pair was seen, txTime received no samples in this call
217 
218  if(OLd_numPkts==numPkts){
219  }
220  else{
221 
222  auto data (ThroughputDiffs.data());
223  std::cout << "Results="<< " " << OLd_numPkts << " " << (Throughput_avg=data.avg) << " " << (Throughput_deviation=data.stddev) << std::endl;
224  //cout << "max=" << (max=data.max) << endl ;
225  //cout << "min=" << (min=data.min) << endl ;
226  //cout << "Total_avg=" << (avg=data.avg) << endl ;
227  //cout << "st_deviation=" << (deviation=data.stddev) << endl ;
228  //cout << "count=" << (data_count=data.cnt) << endl ;
229  //cout << "Total_Packets=" << OLd_numPkts << endl;
230 
231  ThroughputDiffs.clear();
232  OLd_numPkts+=10; // NOTE(review): appears to assume runs step numPackets by 10 starting from the initial 50 - TODO confirm
233 
234  }
235  ThroughputDiffs.accumulate(Link_Capacity);
236 
237  std::cout << ", avg " << (avg / count) << std::endl; // NOTE(review): count is 0 when distribution is empty (every adjacent pair had a gap) -> integer division by zero
238  std::cout << "stats " << txTime.data() << ", Link_Capacity " << Link_Capacity << " Gbps" << std::endl;
239 
240 
241 
242 
243  }
244 };
245 
246 
// Entry point. Parses the configuration, brings the interface up and reads its MAC address.
// If configuration.destination is set the process runs as Generator, otherwise as Analyzer.
// Exits with status 1 when configuration parsing fails.
// NOTE(review): `sink` (listing line 265) and `source` (listing line 275) are declared on
// lines this rendering omits.
247 int main(int argc, char const * argv[])
248 {
249  Configuration configuration;
250 
251  if (!configuration.parse( argc, argv)) {
252  exit(1);
253  }
254 
255  // determine MAC of outgoing interface
256  senf::NetdeviceController netdevCtrl (configuration.interface);
257  netdevCtrl.up();
258  senf::MACAddress macAddr(netdevCtrl.hardwareAddress());
259 
260  if (configuration.destination) {
261  // we are supposed to run as a generator
262  senf::ConnectedMMapPacketSocketHandle socket (configuration.interface, 1024, 4096);
263  socket.protocol().sndbuf(configuration.sendBuffer);
264  std::cout << "*** Using sndBuf size of " << socket.protocol().sndbuf() << " bytes. Max burstLength is " << socket.protocol().sndbuf() / configuration.pktSize << std::endl;
266  Generator generator(macAddr, configuration);
267  senf::ppi::connect( generator, sink);
268  try {
269  senf::ppi::run();
270  } catch(...) {}; // NOTE(review): swallows every exception silently so the tx statistics below are still printed
271  auto ts (socket.protocol().txStats());
272  ts.dump(std::cout); std::cout << std::endl;
273  } else {
274  senf::ConnectedMMapPacketSocketHandle socket (configuration.interface, 1024, 4096);
276  try {
277  socket.protocol().timestamping(SOF_TIMESTAMPING_RX_HARDWARE | SOF_TIMESTAMPING_RAW_HARDWARE);
278  netdevCtrl.timestamping(HWTSTAMP_TX_OFF, HWTSTAMP_FILTER_ALL);
279  std::cerr << "Switching to RX_HARDWARE timestamping" << std::endl; // only reached when both calls above succeeded
280  } catch(senf::Exception & ex) {
281  std::cerr << "Can not enable hw rx timestamping to due " << ex.what() << std::endl; // NOTE(review): message reads "to due"; likely meant "due to"
282  std::cerr << "Switching to RX_SOFTWARE timestamping" << std::endl;
283  try {
284  socket.protocol().timestamping(SOF_TIMESTAMPING_RX_SOFTWARE);
285  } catch(senf::Exception & ex) {
286  std::cerr << "Can not enable software rx timestamping to due " << ex.what() << std::endl; // NOTE(review): same "to due" wording as above
287  std::cerr << "Continueing with kernel defaults" << std::endl; // NOTE(review): "Continueing" is a typo for "Continuing"
288  }
289  }
290  source.maxBurst(configuration.numPackets);
291  Analyzer analyzer(macAddr, configuration);
292  senf::ppi::connect( source, analyzer);
293  senf::ppi::run(); // unlike the generator branch, exceptions from run() are not caught here
294  }
295 }
296 
297 
298 //-/////////////////////////////////////////////////////////////////////////////////////////////////
299 #undef prefix_
300 //#include "MCSniffer.mpp"
301 
302 
303 // Local Variables:
304 // mode: c++
305 // fill-column: 100
306 // c-file-style: "senf"
307 // indent-tabs-mode: nil
308 // ispell-local-dictionary: "american"
309 // compile-command: "scons -u"
310 // comment-column: 40
311 // End:
senf::INet4SocketAddress destination
Analyzer(senf::MACAddress const &mac, Configuration const &config)
Definition: Analyzer.cc:125
void timestamping(int txType, int rxFilter)
std::uint8_t mac[6]
void data(StatisticsData &data_) const
boost::uint32_t pktSize
bool verbose_
Definition: Analyzer.cc:122
senf::MACAddress ourMAC_
Definition: Analyzer.cc:118
virtual void terminate() const
void accumulate(T const &value)
senf::StatisticAccumulator< float > ThroughputDiffs
Definition: Analyzer.cc:120
QueueReadPolicy::Buffer const * value
u8 data[SPECTRAL_HT20_NUM_BINS]
std::string interface
static SENF_CLOCKSERVICE_CONSTEXPR clock_type seconds(int64_type const &v)
unsigned finPkts_
Definition: Analyzer.cc:44
Annotations public header.
void noroute(connector::Connector &connector)
senf::EthernetPacket eth
Definition: Analyzer.cc:42
senf::ppi::connector::PassiveInput< senf::EthernetPacket > input
Definition: Analyzer.cc:114
senf::scheduler::TimerEvent timer
Definition: Analyzer.cc:41
void terminate()
Definition: Analyzer.cc:107
Generator(senf::MACAddress const &mac, Configuration const &config)
Definition: Analyzer.cc:46
std::uint32_t sendBuffer
static SENF_CLOCKSERVICE_CONSTEXPR clock_type milliseconds(int64_type const &v)
static SENF_CLOCKSERVICE_CONSTEXPR int64_type in_nanoseconds(clock_type const &v)
void timeout(ClockService::clock_type const &timeout, bool initiallyEnabled=true)
int run(int argc, char const *argv[])
Definition: dfstest.cc:83
std::uint64_t numPackets
Incoming packet timestamp.
Definition: Annotations.hh:86
senf::ClockService::clock_type as_clock_type() const
unsigned numPkts_
Definition: Analyzer.cc:43
void action(Callback const &cb)
void analyzeData(unsigned numPkts, unsigned pktSize)
Definition: Analyzer.cc:180
#define SENF_PPI_MODULE(name)
bool parse(int argc, char const *argv[])
nothrow
void sendPkts()
Definition: Analyzer.cc:76
MACAddress hardwareAddress() const
std::uint32_t count
virtual char const * what() const
std::map< std::uint32_t, std::int64_t > tstamps
Definition: Analyzer.cc:119
ProtocolClientSocketHandle< ConnectedMMapPacketSocketProtocol< QueueReadPolicy, QueueWritePolicy > > ConnectedMMapPacketSocketHandle
void FIN()
Definition: Analyzer.cc:92
senf::ppi::connector::ActiveOutput< senf::EthernetPacket > output
Definition: Analyzer.cc:37
unsigned sessionId
ConcretePacket< EthernetPacketType > EthernetPacket
#define SENF_LOG(args)
static ConcretePacket createAfter(Packet const &packet)
int main(int argc, char const *argv[])
Definition: Analyzer.cc:247
void request()
Definition: Analyzer.cc:137
void throttlingDisc(ThrottlingDisc const &disc)
unsigned sessionId_
Definition: Analyzer.cc:121