QueueReadWritePolicy.cci
Go to the documentation of this file.
1 //
2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
6 //
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
11 //
12 
13 
14 /** \file
15  \brief QueueReadWritePolicy inline non-template implementation */
16 
17 #include "QueueReadWritePolicy.ih"
18 
19 // Custom includes
20 #include <sys/types.h>
21 #include <sys/socket.h>
22 #include <senf/Utils/Logger.hh>
23 #include <senf/Utils/senflikely.hh>
24 #include <senf/Scheduler/Scheduler.hh>
25 
26 #define prefix_ inline
27 //-/////////////////////////////////////////////////////////////////////////////////////////////////
28 
29 prefix_ senf::detail::QueueInfo * senf::detail::QueuePolicyBase::qinfo(FileHandle & handle)
30 {
31  SENF_ASSERT( extraPtr(handle), "invalid queue file handle (no QueueInfo found)");
32  return static_cast<detail::QueueInfo *>(extraPtr(handle));
33 }
34 
35 prefix_ void senf::detail::QueueInfo::init(unsigned rxqlen, unsigned txqlen)
36 {
37  if (rxqlen > 0) {
38  rx.begin = rx.head = rx.tail = map;
39  rx.end = rx.begin + frameSize * rxqlen;
40  rx.idle = true;
41  rx.qlen = rxqlen;
42  }
43 
44  if (txqlen > 0) {
45  tx.begin = tx.head = tx.tail = ((rxqlen == 0 ) ? map : rx.end);
46  tx.end = tx.begin + frameSize * txqlen;
47  tx.idle = true;
48  tx.qlen = txqlen;
49  }
50 
51  unsigned slice (rx.qlen / NUM_SAMPLES);
52  for (unsigned n = 0; n < NUM_SAMPLES; n++)
53  rx.samples[n] = &(reinterpret_cast<struct ::tpacket2_hdr *>(rx.begin + (n * slice * frameSize))->tp_status);
54 
55  slice = tx.qlen / NUM_SAMPLES;
56  for (unsigned n = 0; n < NUM_SAMPLES; n++)
57  tx.samples[n] = &(reinterpret_cast<struct ::tpacket2_hdr *>(tx.begin + (n * slice * frameSize))->tp_status);
58 }
59 
60 prefix_ void senf::detail::QueueInfo::RxStats::dump(std::ostream & os)
61  const
62 {
63  os << "received " << received << ", "
64  << "red " << red << ", "
65  << "ignored " << ignored << ", "
66  << "extMemory " << extMemory << ", "
67  << "noop " << noop << ".";
68 }
69 
70 prefix_ void senf::detail::QueueInfo::TxStats::dump(std::ostream & os)
71  const
72 {
73  os << "sent " << sent << ", "
74  << "wrongFormat " << wrongFormat << ", "
75  << "red " << red << ", "
76  << "overrun " << overrun << ", "
77  << "dropped " << dropped << ". ";
78 }
79 
80 prefix_ unsigned senf::detail::QueueInfo::usageRx()
81  const
82 {
83  unsigned sum (0);
84  for (unsigned n = 0; n < NUM_SAMPLES; n++)
85  sum += (*(rx.samples[n]) != TP_STATUS_KERNEL);
86 
87  return sum;
88 }
89 
90 prefix_ unsigned senf::detail::QueueInfo::usageTx()
91  const
92 {
93  unsigned sum (0);
94  for (unsigned n = 0; n < NUM_SAMPLES; n++)
95  sum += (*(tx.samples[n]) != TP_STATUS_AVAILABLE);
96 
97  return sum;
98 }
99 
100 prefix_ void senf::detail::QueueInfo::inc(unsigned char * & ptr, Queue const & q)
101 {
102  ptr += frameSize;
103  if (SENF_UNLIKELY(ptr == q.end))
104  ptr = q.begin;
105 }
106 
107 prefix_ bool senf::detail::QueueInfo::interfaceDead()
108  const
109 {
110  // a simple check, if the underlying interface is 'dead' or not responding (i.e. claims to have a 'link' but is not sending/receiving - ath9k bug ?)
111  return (rxStats.received == 0) and (txStats.sent == 0 and (txStats.overrun > 0 or txStats.red > 0 or txStats.dropped > 0));
112 }
113 
114 ///////////////////////////////////////////////////////////////////////////
115 // senf::SocketQueueBuffer
116 
117 prefix_ senf::SocketQueueBuffer::SocketQueueBuffer()
118  : b_ (0), e_ (0), hdrlen_ (0)
119 {}
120 
121 prefix_ senf::SocketQueueBuffer::SocketQueueBuffer(unsigned char * b, unsigned char * e,
122  unsigned hdrlen)
123  : b_ (b), e_ (e), hdrlen_ (hdrlen)
124 {}
125 
126 prefix_ struct ::tpacket2_hdr & senf::SocketQueueBuffer::hdr()
127 {
128  return * reinterpret_cast<struct ::tpacket2_hdr *>(b_);
129 }
130 
131 prefix_ struct ::tpacket2_hdr const & senf::SocketQueueBuffer::hdr()
132  const
133 {
134  return * reinterpret_cast<struct ::tpacket2_hdr const *>(b_);
135 }
136 
137 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::begin()
138 {
139  return b_ + hdr().tp_mac;
140 }
141 
142 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::end()
143 {
144  return begin() + hdr().tp_len;
145 }
146 
147 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::begin()
148  const
149 {
150  return b_ + hdr().tp_mac;
151 }
152 
153 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::end()
154  const
155 {
156  return begin() + hdr().tp_len;
157 }
158 
159 prefix_ senf::SocketQueueBuffer::size_type senf::SocketQueueBuffer::size()
160  const
161 {
162  return hdr().tp_len;
163 }
164 
165 prefix_ bool senf::SocketQueueBuffer::empty()
166  const
167 {
168  return hdr().tp_len == 0;
169 }
170 
171 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::frameBegin()
172 {
173  return b_ + hdrlen_;
174 }
175 
176 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::frameEnd()
177 {
178  return e_;
179 }
180 
181 prefix_ senf::SocketQueueBuffer::size_type senf::SocketQueueBuffer::frameSize()
182  const
183 {
184  return frameEnd() - frameBegin();
185 }
186 
187 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::frameBegin()
188  const
189 {
190  return b_ + hdrlen_;
191 }
192 
193 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::frameEnd()
194  const
195 {
196  return e_;
197 }
198 
199 prefix_ unsigned senf::SocketQueueBuffer::dataOffset()
200  const
201 {
202  return begin() - frameBegin();
203 }
204 
205 prefix_ senf::ClockService::clock_type senf::SocketQueueBuffer::timestamp()
206  const
207 {
208  return ClockService::from_time_t(hdr().tp_sec) + ClockService::nanoseconds(hdr().tp_nsec);
209 }
210 
211 prefix_ std::uint32_t const * senf::SocketQueueBuffer::timestampPtr()
212  const
213 {
214  return &(hdr().tp_sec);
215 }
216 
217 prefix_ senf::LLSocketAddress senf::SocketQueueBuffer::address()
218  const
219 {
220  SENF_ASSERT( hdr().tp_mac > hdrlen_,
221  "frame has no address field" );
222  senf::LLSocketAddress res;
223  ::memcpy(res.sockaddr_p(),
224  reinterpret_cast<struct ::sockaddr_ll *>(
225  b_ + hdrlen_),
226  sizeof(struct ::sockaddr_ll));
227  return res;
228 }
229 
230 prefix_ boost::optional<unsigned> senf::SocketQueueBuffer::vlan()
231  const
232 {
233  return hdr().tp_status & TP_STATUS_VLAN_VALID
234  ? boost::optional<unsigned>(hdr().tp_vlan_tci) : boost::none;
235 }
236 
237 prefix_ unsigned senf::SocketQueueBuffer::tpid()
238  const
239 {
240  return hdr().tp_status & TP_STATUS_VLAN_TPID_VALID ? hdr().tp_vlan_tpid : 0x8100;
241 }
242 
243 
244 prefix_ void senf::SocketQueueBuffer::resize(size_type sz
245 #ifdef SENF_ENABLE_TPACKET_OFFSET
246  , int offset
247 #endif
248 )
249 {
250 #ifdef SENF_ENABLE_TPACKET_OFFSET
251  if (offset == -1)
252  offset = dataOffset();
253 #else
254  unsigned offset (dataOffset());
255 #endif
256  SENF_ASSERT( frameBegin() + offset + sz <= frameEnd(), "frame size exceeded" );
257  hdr().tp_mac = hdr().tp_net = frameBegin() + offset - b_;
258  hdr().tp_len = sz;
259 }
260 
261 ///////////////////////////////////////////////////////////////////////////
262 // senf::QueueReadPolicy
263 
264 prefix_ boost::optional<senf::QueueReadPolicy::Buffer>
265 senf::QueueReadPolicy::dequeue(FileHandle & handle)
266 {
267  detail::QueueInfo & qi (* qinfo(handle));
268  for (unsigned count (0); count < 2*qi.rx.qlen; ++count) {
269  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr *>(qi.rx.head));
270  if (SENF_LIKELY((qi.rx.idle || qi.rx.head != qi.rx.tail) && pk.tp_status != TP_STATUS_KERNEL)) {
271  qi.rxStats.received++;
272  Buffer bf (qi.rx.head, qi.rx.head + qi.frameSize, qi.hdrlen);
273  struct ::sockaddr_ll & sa (* reinterpret_cast<struct ::sockaddr_ll *>(qi.rx.head + qi.hdrlen));
274  qi.inc(qi.rx.head, qi.rx);
275  if (SENF_UNLIKELY(sa.sll_pkttype >= PACKET_OUTGOING || bf.end() > bf.frameEnd() || bf.empty())) {
276  qi.rxStats.ignored++;
277  pk.tp_status = TP_STATUS_KERNEL;
278  if (qi.rx.idle)
279  qi.rx.tail = qi.rx.head;
280  } else {
281  qi.rx.idle = false;
282  return bf;
283  }
284  }
285  }
286  return boost::none;
287 }
288 
289 prefix_ void senf::QueueReadPolicy::release(FileHandle & handle)
290 {
291  detail::QueueInfo & qi (* qinfo(handle));
292  while (SENF_LIKELY(!qi.rx.idle || qi.rx.tail != qi.rx.head)) { // we assume bursts => likely
293  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.rx.tail));
294  pk.tp_status = TP_STATUS_KERNEL;
295  qi.inc(qi.rx.tail, qi.rx);
296  // We set qi.rx.idle prematurely but this is safe: when the while loop is done,
297  // the queue is indeed idle. After the first loop iteration, it is impossible for
298  // the queue to be still completely filled (that is head == tail) so the while
299  // condition above is now correct with idle = true
300  qi.rx.idle = true;
301  }
302 }
303 
304 prefix_ unsigned senf::QueueReadPolicy::read(FileHandle & handle, char * buffer, unsigned size)
305 {
306  if (SENF_UNLIKELY(!handle))
307  return 0;
308 
309  SENF_ASSERT( qinfo(handle)->rx.idle,
310  "Inconsistent mixing of read() and dequeue() on mmap socket" );
311  bool blocking (false);
312  for (;;) {
313  boost::optional<Buffer> buf (dequeue(handle));
314  if (! buf) {
315  if (blocking || handle.blocking()) {
316  blocking = true;
317  handle.waitReadable();
318  continue;
319  }
320  return 0;
321  }
322  if (unsigned(buf->size()) < size)
323  size = buf->size();
324  ::memcpy(buffer, buf->begin(), size);
325  release(handle);
326  return size;
327  }
328 }
329 
330 ///////////////////////////////////////////////////////////////////////////
331 // senf::QueueWritePolicy
332 
333 prefix_ boost::optional<senf::QueueWritePolicy::Buffer>
334 senf::QueueWritePolicy::enqueue(FileHandle & handle)
335 {
336  detail::QueueInfo & qi (* qinfo(handle));
337  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.head));
338  if (SENF_LIKELY((qi.tx.idle || qi.tx.head != qi.tx.tail) && (pk.tp_status == TP_STATUS_AVAILABLE))) {
339  ::memset(&pk, 0, qi.hdrlen);
340  pk.tp_mac = pk.tp_net = qi.hdrlen
341 #ifdef SENF_ENABLE_TPACKET_OFFSET
342  + qi.reserve
343 #endif
344  ;
345  Buffer bf (qi.tx.head, qi.tx.head + qi.frameSize, qi.hdrlen);
346  qi.inc(qi.tx.head, qi.tx);
347  qi.tx.idle = false;
348  return bf;
349  }
350 
351  // our TX MMAP ring is full
352  qi.txStats.overrun++;
353 
354  // let's trigger a send() to push out the packets
355  // this should be handled via the writable event, but it can't hurt...
356  if (handle.writeable())
357  send(handle);
358 
359  return boost::none;
360 }
361 
362 prefix_ void senf::QueueWritePolicy::flush(FileHandle & handle)
363 {
364  if (SENF_UNLIKELY(!handle))
365  return;
366 
367  detail::QueueInfo & qi (* qinfo(handle));
368 
369  //
370  // just forget about any frames in the ring which have not yet been pushed into the kernel
371  //
372  while (SENF_LIKELY(!qi.tx.idle || qi.tx.tail != qi.tx.head)) { // we assume bursts >> likely
373  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.tail));
374  pk.tp_status = TP_STATUS_AVAILABLE;
375  qi.inc(qi.tx.tail, qi.tx);
376  // We set qi.tx.idle prematurely but this is safe: when the while loop is done,
377  // the queue is indeed idle. After the first loop iteration, it is impossible for
378  // the queue to be still completely filled (that is head == tail) so the while
379  // condition above is now correct with idle = true
380  qi.tx.idle = true;
381  }
382 
383  // check if there are any more SEND_REQUEST marked frames that we can just discard/flush
384  for (unsigned char * buf = qi.tx.begin; buf < qi.tx.end; buf += qi.frameSize) {
385  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(buf));
386  if (pk.tp_status == TP_STATUS_SEND_REQUEST) {
387  pk.tp_status = TP_STATUS_AVAILABLE;
388  }
389  }
390 }
391 
392 prefix_ bool senf::QueueWritePolicy::send(FileHandle & handle)
393 {
394  detail::QueueInfo & qi (* qinfo(handle));
395  unsigned char * last_tail (qi.tx.tail);
396  while (SENF_LIKELY(!qi.tx.idle || qi.tx.tail != qi.tx.head)) { // we assume bursts >> likely
397  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.tail));
398  SENF_ASSERT(pk.tp_len, "missing call to Buffer::resize()");
399  pk.tp_status = TP_STATUS_SEND_REQUEST;
400  qi.inc(qi.tx.tail, qi.tx);
401  qi.txStats.sent++;
402  // We set qi.tx.idle prematurely but this is safe: when the while loop is done,
403  // the queue is indeed idle. After the first loop iteration, it is impossible for
404  // the queue to be still completely filled (that is head == tail) so the while
405  // condition above is now correct with idle = true
406  qi.tx.idle = true;
407  }
408 
409  bool tryAgain (false);
410 
411  ssize_t sent;
412  for (;;) {
413  if (SENF_UNLIKELY((sent = ::send(handle.fd(), NULL, 0, MSG_DONTWAIT)) < 0)) {
414  switch (errno) {
415  case EINTR:
416  // We used to break here as 3.16.x seems to throw tons of those and 'continue' caused a busy-loop
417  // This should be fixed by our patch which made it into Linux 4.1
418  tryAgain = true;
419  break; // mtk 01-jul-2015: let's break here again for older kernels
420  case EAGAIN:
421  tryAgain = true;
422  break;
423  case EMSGSIZE:
424  // Note: As of 1-Apr-2019 we use the PACKET_LOSS option and should not see this error anymore...
425  // the trigerring frame(s) will be marked by the kernel as WRONG_FORMAT
426  for (unsigned char * buf = qi.tx.begin; buf < qi.tx.end; buf += qi.frameSize) {
427  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(buf));
428  if (pk.tp_status & TP_STATUS_WRONG_FORMAT) {
429  pk.tp_status = TP_STATUS_SEND_REQUEST;
430  // This does not seem to work properly...
431  pk.tp_len = 64; // set the length of this frame 64 bytes - effectively dropping it without having to rearrange the mmap queue.
432  qi.txStats.wrongFormat++;
433  qi.txStats.dropped++;
434  break;
435  }
436  }
437  continue;
438  case ENOBUFS:
439  // Just break'ing here is ok for normal sockets. But may cause problems when QDISC_BYPASS is turned on.
440  // In this case, the kernel seems to report ENOBUFS instead of ENOTCON or ENETDOWN and
441  // the sndBuf seems to be full, but the socket is still writable...
442  break;
443  case ENOTCONN:
444  case ENETDOWN:
445  case ENXIO:
446  // Some frames might have been sent, hence we first have to reset and then advance tx.tail accordingly
447  qi.tx.tail = last_tail;
448  while (qi.tx.tail != qi.tx.head) {
449  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.tail));
450  if (pk.tp_status != TP_STATUS_SEND_REQUEST)
451  qi.inc(qi.tx.tail, qi.tx);
452  else
453  break;
454  }
455  // We flush our queue, to avoid a busy loop, effectively dropping all frames !!!
456  for (unsigned char * buf = qi.tx.begin; buf < qi.tx.end; buf += qi.frameSize) {
457  struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(buf));
458  if (pk.tp_status == TP_STATUS_SEND_REQUEST) {
459  pk.tp_status = TP_STATUS_AVAILABLE;
460  qi.txStats.dropped++;
461  }
462  }
463  // reset tx.head effectily dropping all unsent frames
464  qi.tx.head = qi.tx.tail;
465  break;
466  default:
467  SENF_THROW_SYSTEM_EXCEPTION("::send(MMAP)");
468  }
469  } else {
470  if (sent > 0) {
471  // we might have more data to send...
472  tryAgain = true;
473  }
474  }
475  break;
476  }
477 
478  return tryAgain;
479 }
480 
481 prefix_ unsigned senf::QueueWritePolicy::write(FileHandle & handle, char const * buffer,
482  unsigned size)
483 {
484  if (SENF_UNLIKELY(!handle))
485  return 0;
486 
487  SENF_ASSERT( qinfo(handle)->tx.idle,
488  "Inconsistent mixing of write() and enqueue() on mmap socket" );
489  for (bool repeat (true);; repeat = false) {
490  boost::optional<Buffer> buf (enqueue(handle));
491  if (! buf) {
492  if (repeat && handle.blocking()) {
493  handle.waitWriteable();
494  continue;
495  }
496  return 0;
497  }
498  if (unsigned(buf->frameSize()) < size)
499  size = buf->frameSize();
500  buf->resize(size);
501  ::memcpy(buf->begin(), buffer, size);
502  send(handle);
503  return size;
504  }
505 }
506 
507 //-/////////////////////////////////////////////////////////////////////////////////////////////////
508 #undef prefix_
509 
510 
511 // Local Variables:
512 // mode: c++
513 // fill-column: 100
514 // comment-column: 40
515 // c-file-style: "senf"
516 // indent-tabs-mode: nil
517 // ispell-local-dictionary: "american"
518 // compile-command: "scons -u test"
519 // End: