2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
15 \brief QueueReadWritePolicy inline non-template implementation */
17 #include "QueueReadWritePolicy.ih"
20 #include <sys/types.h>
21 #include <sys/socket.h>
22 #include <senf/Utils/Logger.hh>
23 #include <senf/Utils/senflikely.hh>
24 #include <senf/Scheduler/Scheduler.hh>
26 #define prefix_ inline
27 //-/////////////////////////////////////////////////////////////////////////////////////////////////
// Look up the QueueInfo attached to a queue-enabled file handle.
// Asserts that extraPtr(handle) is non-null, then returns that pointer cast to
// detail::QueueInfo *.
29 prefix_ senf::detail::QueueInfo * senf::detail::QueuePolicyBase::qinfo(FileHandle & handle)
31 SENF_ASSERT( extraPtr(handle), "invalid queue file handle (no QueueInfo found)");
32 return static_cast<detail::QueueInfo *>(extraPtr(handle));
// Set up the RX and TX ring bookkeeping for rings of rxqlen / txqlen frames.
// The begin/end/head/tail pointers of both rings are computed from `map` and `frameSize`.
35 prefix_ void senf::detail::QueueInfo::init(unsigned rxqlen, unsigned txqlen)
// RX ring: starts at `map`; head and tail both start on the first frame.
38 rx.begin = rx.head = rx.tail = map;
39 rx.end = rx.begin + frameSize * rxqlen;
// TX ring: placed directly behind the RX ring, or at `map` when rxqlen is 0.
45 tx.begin = tx.head = tx.tail = ((rxqlen == 0 ) ? map : rx.end);
46 tx.end = tx.begin + frameSize * txqlen;
// Remember the address of the tp_status word of NUM_SAMPLES frame headers, spaced
// `slice` frames apart; usageRx() reads these pointers.
// NOTE(review): assuming rx.qlen and NUM_SAMPLES are integral, slice is 0 when
// rx.qlen < NUM_SAMPLES and every sample then refers to the first frame -- confirm intended.
51 unsigned slice (rx.qlen / NUM_SAMPLES);
52 for (unsigned n = 0; n < NUM_SAMPLES; n++)
53 rx.samples[n] = &(reinterpret_cast<struct ::tpacket2_hdr *>(rx.begin + (n * slice * frameSize))->tp_status);
// Same sampling for the TX ring; usageTx() reads these pointers.
55 slice = tx.qlen / NUM_SAMPLES;
56 for (unsigned n = 0; n < NUM_SAMPLES; n++)
57 tx.samples[n] = &(reinterpret_cast<struct ::tpacket2_hdr *>(tx.begin + (n * slice * frameSize))->tp_status);
// Write the RX counters (received, red, ignored, extMemory, noop) to \a os as a
// single comma-separated line fragment terminated by ".".
60 prefix_ void senf::detail::QueueInfo::RxStats::dump(std::ostream & os)
63 os << "received " << received << ", "
64 << "red " << red << ", "
65 << "ignored " << ignored << ", "
66 << "extMemory " << extMemory << ", "
67 << "noop " << noop << ".";
// Write the TX counters (sent, wrongFormat, red, overrun, dropped) to \a os as a
// single comma-separated line fragment. The output ends with ". " (including a
// trailing blank).
70 prefix_ void senf::detail::QueueInfo::TxStats::dump(std::ostream & os)
73 os << "sent " << sent << ", "
74 << "wrongFormat " << wrongFormat << ", "
75 << "red " << red << ", "
76 << "overrun " << overrun << ", "
77 << "dropped " << dropped << ". ";
// Estimate RX ring usage from the sampled frame status words.
// Adds 1 to `sum` for every one of the NUM_SAMPLES sampled tp_status words that is
// not TP_STATUS_KERNEL.
// NOTE(review): presumably the return value is derived from `sum` -- confirm scaling.
80 prefix_ unsigned senf::detail::QueueInfo::usageRx()
84 for (unsigned n = 0; n < NUM_SAMPLES; n++)
85 sum += (*(rx.samples[n]) != TP_STATUS_KERNEL);
// Estimate TX ring usage from the sampled frame status words.
// Adds 1 to `sum` for every one of the NUM_SAMPLES sampled tp_status words that is
// not TP_STATUS_AVAILABLE.
// NOTE(review): presumably the return value is derived from `sum` -- confirm scaling.
90 prefix_ unsigned senf::detail::QueueInfo::usageTx()
94 for (unsigned n = 0; n < NUM_SAMPLES; n++)
95 sum += (*(tx.samples[n]) != TP_STATUS_AVAILABLE);
// Step the frame pointer \a ptr (passed by reference, modified in place) within ring \a q.
// Reaching q.end is treated as the rare case (SENF_UNLIKELY).
// NOTE(review): presumably ptr advances by one frame and wraps to q.begin at q.end -- confirm.
100 prefix_ void senf::detail::QueueInfo::inc(unsigned char * & ptr, Queue const & q)
103 if (SENF_UNLIKELY(ptr == q.end))
// Heuristic check for a non-responding interface.
// Returns true only if nothing has been received and nothing has been sent, while at
// least one of the TX error counters (overrun, red, dropped) is non-zero.
107 prefix_ bool senf::detail::QueueInfo::interfaceDead()
110 // a simple check, if the underlying interface is 'dead' or not responding (i.e. claims to have a 'link' but is not sending/receiving - ath9k bug ?)
111 return (rxStats.received == 0) and (txStats.sent == 0 and (txStats.overrun > 0 or txStats.red > 0 or txStats.dropped > 0));
114 ///////////////////////////////////////////////////////////////////////////
115 // senf::SocketQueueBuffer
// Default constructor: initializes b_, e_ and hdrlen_ to 0.
117 prefix_ senf::SocketQueueBuffer::SocketQueueBuffer()
118 : b_ (0), e_ (0), hdrlen_ (0)
// Construct a buffer from pointers \a b and \a e and the header length \a hdrlen;
// the values are stored unchanged in b_, e_ and hdrlen_.
121 prefix_ senf::SocketQueueBuffer::SocketQueueBuffer(unsigned char * b, unsigned char * e,
123 : b_ (b), e_ (e), hdrlen_ (hdrlen)
// Access the frame header: reinterprets the memory at b_ as a struct tpacket2_hdr
// and returns a mutable reference to it.
126 prefix_ struct ::tpacket2_hdr & senf::SocketQueueBuffer::hdr()
128 return * reinterpret_cast<struct ::tpacket2_hdr *>(b_);
// Read-only access to the frame header: reinterprets the memory at b_ as a
// struct tpacket2_hdr and returns a const reference to it.
131 prefix_ struct ::tpacket2_hdr const & senf::SocketQueueBuffer::hdr()
134 return * reinterpret_cast<struct ::tpacket2_hdr const *>(b_);
// Iterator to the start of the packet data: b_ plus the tp_mac offset from the frame header.
137 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::begin()
139 return b_ + hdr().tp_mac;
// Iterator one past the packet data: begin() plus the tp_len value from the frame header.
142 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::end()
144 return begin() + hdr().tp_len;
// const_iterator variant of begin(): b_ plus the tp_mac offset from the frame header.
147 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::begin()
150 return b_ + hdr().tp_mac;
// const_iterator variant of end(): begin() plus the tp_len value from the frame header.
153 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::end()
156 return begin() + hdr().tp_len;
// Size of the buffer contents, returned as size_type.
// NOTE(review): presumably the length of [begin(), end()), i.e. tp_len -- confirm.
159 prefix_ senf::SocketQueueBuffer::size_type senf::SocketQueueBuffer::size()
// True if the frame header reports a packet length (tp_len) of 0.
165 prefix_ bool senf::SocketQueueBuffer::empty()
168 return hdr().tp_len == 0;
// Iterator to the start of the frame's usable area.
// NOTE(review): presumably b_ + hdrlen_ -- confirm.
171 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::frameBegin()
// Iterator to the end of the frame's usable area.
// NOTE(review): presumably e_ -- confirm.
176 prefix_ senf::SocketQueueBuffer::iterator senf::SocketQueueBuffer::frameEnd()
// Capacity of the frame: the distance between frameBegin() and frameEnd().
181 prefix_ senf::SocketQueueBuffer::size_type senf::SocketQueueBuffer::frameSize()
184 return frameEnd() - frameBegin();
// const_iterator variant of frameBegin().
// NOTE(review): presumably b_ + hdrlen_ -- confirm.
187 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::frameBegin()
// const_iterator variant of frameEnd().
// NOTE(review): presumably e_ -- confirm.
193 prefix_ senf::SocketQueueBuffer::const_iterator senf::SocketQueueBuffer::frameEnd()
// Offset of the packet data within the frame: begin() minus frameBegin().
199 prefix_ unsigned senf::SocketQueueBuffer::dataOffset()
202 return begin() - frameBegin();
// Frame timestamp as a ClockService::clock_type: the header's tp_sec converted with
// ClockService::from_time_t() plus the header's tp_nsec as nanoseconds.
205 prefix_ senf::ClockService::clock_type senf::SocketQueueBuffer::timestamp()
208 return ClockService::from_time_t(hdr().tp_sec) + ClockService::nanoseconds(hdr().tp_nsec);
// Pointer to the tp_sec field of the frame header (points into the frame itself,
// no copy is made).
211 prefix_ std::uint32_t const * senf::SocketQueueBuffer::timestampPtr()
214 return &(hdr().tp_sec);
// Return the link-layer address stored with the frame as an LLSocketAddress.
217 prefix_ senf::LLSocketAddress senf::SocketQueueBuffer::address()
// The packet data must start behind hdrlen_, otherwise there is no room for an
// address field in the frame.
220 SENF_ASSERT( hdr().tp_mac > hdrlen_,
221 "frame has no address field" );
// Copy sizeof(struct sockaddr_ll) bytes from the frame into the result's sockaddr storage.
// NOTE(review): the source is presumably located at b_ + hdrlen_ -- confirm.
222 senf::LLSocketAddress res;
223 ::memcpy(res.sockaddr_p(),
224 reinterpret_cast<struct ::sockaddr_ll *>(
226 sizeof(struct ::sockaddr_ll));
// VLAN TCI of the frame, if any: returns tp_vlan_tci when TP_STATUS_VLAN_VALID is set
// in the header's tp_status, boost::none otherwise.
230 prefix_ boost::optional<unsigned> senf::SocketQueueBuffer::vlan()
233 return hdr().tp_status & TP_STATUS_VLAN_VALID
234 ? boost::optional<unsigned>(hdr().tp_vlan_tci) : boost::none;
// VLAN TPID of the frame: returns tp_vlan_tpid when TP_STATUS_VLAN_TPID_VALID is set
// in the header's tp_status, and falls back to 0x8100 (the IEEE 802.1Q TPID) otherwise.
237 prefix_ unsigned senf::SocketQueueBuffer::tpid()
240 return hdr().tp_status & TP_STATUS_VLAN_TPID_VALID ? hdr().tp_vlan_tpid : 0x8100;
// Set the packet size of this frame to \a sz.
// The signature and the way `offset` is obtained depend on SENF_ENABLE_TPACKET_OFFSET;
// without it, the offset is always the current dataOffset().
// NOTE(review): presumably the header's length fields are also set to sz -- confirm.
244 prefix_ void senf::SocketQueueBuffer::resize(size_type sz
245 #ifdef SENF_ENABLE_TPACKET_OFFSET
250 #ifdef SENF_ENABLE_TPACKET_OFFSET
252 offset = dataOffset();
254 unsigned offset (dataOffset());
// The data (offset + sz bytes from frameBegin()) has to fit into the frame.
256 SENF_ASSERT( frameBegin() + offset + sz <= frameEnd(), "frame size exceeded" );
// Store the data start, relative to b_, in both tp_mac and tp_net.
257 hdr().tp_mac = hdr().tp_net = frameBegin() + offset - b_;
261 ///////////////////////////////////////////////////////////////////////////
262 // senf::QueueReadPolicy
// Fetch the next received frame from the RX ring of \a handle.
// Returns an optional Buffer.
// NOTE(review): presumably boost::none when no frame is available -- confirm.
264 prefix_ boost::optional<senf::QueueReadPolicy::Buffer>
265 senf::QueueReadPolicy::dequeue(FileHandle & handle)
267 detail::QueueInfo & qi (* qinfo(handle));
// Bounded scan: at most 2 * rx.qlen iterations.
268 for (unsigned count (0); count < 2*qi.rx.qlen; ++count) {
269 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr *>(qi.rx.head));
// A frame is taken if the ring is not completely held by us (idle, or head != tail)
// and the frame at rx.head is no longer marked TP_STATUS_KERNEL.
270 if (SENF_LIKELY((qi.rx.idle || qi.rx.head != qi.rx.tail) && pk.tp_status != TP_STATUS_KERNEL)) {
271 qi.rxStats.received++;
272 Buffer bf (qi.rx.head, qi.rx.head + qi.frameSize, qi.hdrlen);
// The sockaddr_ll of the frame is read at offset qi.hdrlen from the frame start.
273 struct ::sockaddr_ll & sa (* reinterpret_cast<struct ::sockaddr_ll *>(qi.rx.head + qi.hdrlen));
274 qi.inc(qi.rx.head, qi.rx);
// Frames with sll_pkttype >= PACKET_OUTGOING, frames whose data would end behind
// frameEnd(), and empty frames are counted as ignored and handed back to the kernel.
275 if (SENF_UNLIKELY(sa.sll_pkttype >= PACKET_OUTGOING || bf.end() > bf.frameEnd() || bf.empty())) {
276 qi.rxStats.ignored++;
277 pk.tp_status = TP_STATUS_KERNEL;
279 qi.rx.tail = qi.rx.head;
// Hand the frames between rx.tail and rx.head back to the kernel: each frame's
// tp_status is set to TP_STATUS_KERNEL and rx.tail is advanced with QueueInfo::inc().
289 prefix_ void senf::QueueReadPolicy::release(FileHandle & handle)
291 detail::QueueInfo & qi (* qinfo(handle));
292 while (SENF_LIKELY(!qi.rx.idle || qi.rx.tail != qi.rx.head)) { // we assume bursts => likely
293 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.rx.tail));
294 pk.tp_status = TP_STATUS_KERNEL;
295 qi.inc(qi.rx.tail, qi.rx);
296 // We set qi.rx.idle prematurely but this is safe: when the while loop is done,
297 // the queue is indeed idle. After the first loop iteration, it is impossible for
298 // the queue to be still completely filled (that is head == tail) so the while
299 // condition above is now correct with idle = true
// Read one frame from the RX ring of \a handle into \a buffer (at most \a size bytes are copied).
// Must not be mixed with pending dequeue() calls: the RX ring has to be idle on entry.
// NOTE(review): presumably returns the number of bytes copied -- confirm.
304 prefix_ unsigned senf::QueueReadPolicy::read(FileHandle & handle, char * buffer, unsigned size)
// An invalid handle is treated as the rare case.
306 if (SENF_UNLIKELY(!handle))
309 SENF_ASSERT( qinfo(handle)->rx.idle,
310 "Inconsistent mixing of read() and dequeue() on mmap socket" );
311 bool blocking (false);
313 boost::optional<Buffer> buf (dequeue(handle));
// On a blocking handle (or when `blocking` is set), wait until the handle is readable.
315 if (blocking || handle.blocking()) {
317 handle.waitReadable();
// Copy the packet data starting at buf->begin(); the size check guards against
// frames shorter than the caller's buffer.
322 if (unsigned(buf->size()) < size)
324 ::memcpy(buffer, buf->begin(), size);
330 ///////////////////////////////////////////////////////////////////////////
331 // senf::QueueWritePolicy
// Reserve the next free frame in the TX ring of \a handle.
// Returns an optional Buffer.
// NOTE(review): presumably boost::none when the ring is full -- confirm.
333 prefix_ boost::optional<senf::QueueWritePolicy::Buffer>
334 senf::QueueWritePolicy::enqueue(FileHandle & handle)
336 detail::QueueInfo & qi (* qinfo(handle));
337 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.head));
// The frame at tx.head is usable if the ring is not completely filled by us
// (idle, or head != tail) and the frame is marked TP_STATUS_AVAILABLE.
338 if (SENF_LIKELY((qi.tx.idle || qi.tx.head != qi.tx.tail) && (pk.tp_status == TP_STATUS_AVAILABLE))) {
// Clear the first qi.hdrlen bytes of the frame, then point tp_mac/tp_net at the
// data start (qi.hdrlen, plus a further term when SENF_ENABLE_TPACKET_OFFSET is defined).
339 ::memset(&pk, 0, qi.hdrlen);
340 pk.tp_mac = pk.tp_net = qi.hdrlen
341 #ifdef SENF_ENABLE_TPACKET_OFFSET
345 Buffer bf (qi.tx.head, qi.tx.head + qi.frameSize, qi.hdrlen);
346 qi.inc(qi.tx.head, qi.tx);
351 // our TX MMAP ring is full
352 qi.txStats.overrun++;
354 // let's trigger a send() to push out the packets
355 // this should be handled via the writable event, but it can't hurt...
356 if (handle.writeable())
// Discard all frames queued in the TX ring of \a handle that have not been sent yet.
362 prefix_ void senf::QueueWritePolicy::flush(FileHandle & handle)
// An invalid handle is treated as the rare case.
364 if (SENF_UNLIKELY(!handle))
367 detail::QueueInfo & qi (* qinfo(handle));
370 // just forget about any frames in the ring which have not yet been pushed into the kernel
// Frames between tx.tail and tx.head are marked TP_STATUS_AVAILABLE again.
372 while (SENF_LIKELY(!qi.tx.idle || qi.tx.tail != qi.tx.head)) { // we assume bursts >> likely
373 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.tail));
374 pk.tp_status = TP_STATUS_AVAILABLE;
375 qi.inc(qi.tx.tail, qi.tx);
376 // We set qi.tx.idle prematurely but this is safe: when the while loop is done,
377 // the queue is indeed idle. After the first loop iteration, it is impossible for
378 // the queue to be still completely filled (that is head == tail) so the while
379 // condition above is now correct with idle = true
383 // check if there are any more SEND_REQUEST marked frames that we can just discard/flush
// Walk the whole TX ring frame by frame, independent of head/tail.
384 for (unsigned char * buf = qi.tx.begin; buf < qi.tx.end; buf += qi.frameSize) {
385 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(buf));
386 if (pk.tp_status == TP_STATUS_SEND_REQUEST) {
387 pk.tp_status = TP_STATUS_AVAILABLE;
// Push the frames queued in the TX ring of \a handle to the kernel.
// All frames between tx.tail and tx.head are marked TP_STATUS_SEND_REQUEST and a
// zero-length, non-blocking ::send() is issued; a negative result enters the error
// handling below.
// NOTE(review): presumably the return value reflects `tryAgain` -- confirm.
392 prefix_ bool senf::QueueWritePolicy::send(FileHandle & handle)
394 detail::QueueInfo & qi (* qinfo(handle));
// Remember where the burst started; the error handling re-scans from here.
395 unsigned char * last_tail (qi.tx.tail);
396 while (SENF_LIKELY(!qi.tx.idle || qi.tx.tail != qi.tx.head)) { // we assume bursts >> likely
397 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.tail));
// A frame with tp_len == 0 was never sized by the caller.
398 SENF_ASSERT(pk.tp_len, "missing call to Buffer::resize()");
399 pk.tp_status = TP_STATUS_SEND_REQUEST;
400 qi.inc(qi.tx.tail, qi.tx);
402 // We set qi.tx.idle prematurely but this is safe: when the while loop is done,
403 // the queue is indeed idle. After the first loop iteration, it is impossible for
404 // the queue to be still completely filled (that is head == tail) so the while
405 // condition above is now correct with idle = true
409 bool tryAgain (false);
// Trigger the transmission; a negative return value is the (unlikely) error case.
413 if (SENF_UNLIKELY((sent = ::send(handle.fd(), NULL, 0, MSG_DONTWAIT)) < 0)) {
416 // We used to break here as 3.16.x seems to throw tons of those and 'continue' caused a busy-loop
417 // This should be fixed by our patch which made it into Linux 4.1
419 break; // mtk 01-jul-2015: let's break here again for older kernels
424 // Note: As of 1-Apr-2019 we use the PACKET_LOSS option and should not see this error anymore...
425 // the triggering frame(s) will be marked by the kernel as WRONG_FORMAT
// Scan the whole TX ring for frames flagged TP_STATUS_WRONG_FORMAT and re-queue them
// with a fixed length of 64 bytes; each one is counted as wrongFormat and dropped.
426 for (unsigned char * buf = qi.tx.begin; buf < qi.tx.end; buf += qi.frameSize) {
427 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(buf));
428 if (pk.tp_status & TP_STATUS_WRONG_FORMAT) {
429 pk.tp_status = TP_STATUS_SEND_REQUEST;
430 // This does not seem to work properly...
431 pk.tp_len = 64; // set the length of this frame 64 bytes - effectively dropping it without having to rearrange the mmap queue.
432 qi.txStats.wrongFormat++;
433 qi.txStats.dropped++;
439 // Just break'ing here is ok for normal sockets. But may cause problems when QDISC_BYPASS is turned on.
440 // In this case, the kernel seems to report ENOBUFS instead of ENOTCON or ENETDOWN and
441 // the sndBuf seems to be full, but the socket is still writable...
446 // Some frames might have been sent, hence we first have to reset and then advance tx.tail accordingly
// Starting from the remembered tail, skip frames that are no longer in
// TP_STATUS_SEND_REQUEST state.
447 qi.tx.tail = last_tail;
448 while (qi.tx.tail != qi.tx.head) {
449 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(qi.tx.tail));
450 if (pk.tp_status != TP_STATUS_SEND_REQUEST)
451 qi.inc(qi.tx.tail, qi.tx);
455 // We flush our queue, to avoid a busy loop, effectively dropping all frames !!!
// Every frame in the ring still marked TP_STATUS_SEND_REQUEST becomes
// TP_STATUS_AVAILABLE again and is counted as dropped.
456 for (unsigned char * buf = qi.tx.begin; buf < qi.tx.end; buf += qi.frameSize) {
457 struct ::tpacket2_hdr & pk (* reinterpret_cast<struct ::tpacket2_hdr*>(buf));
458 if (pk.tp_status == TP_STATUS_SEND_REQUEST) {
459 pk.tp_status = TP_STATUS_AVAILABLE;
460 qi.txStats.dropped++;
463 // reset tx.head effectively dropping all unsent frames
464 qi.tx.head = qi.tx.tail;
// Remaining error case: reported to the caller as a system exception.
467 SENF_THROW_SYSTEM_EXCEPTION("::send(MMAP)");
471 // we might have more data to send...
// Write \a buffer as a single frame into the TX ring of \a handle.
// Must not be mixed with pending enqueue() calls: the TX ring has to be idle on entry.
// NOTE(review): presumably returns the number of bytes actually queued -- confirm.
481 prefix_ unsigned senf::QueueWritePolicy::write(FileHandle & handle, char const * buffer,
// An invalid handle is treated as the rare case.
484 if (SENF_UNLIKELY(!handle))
487 SENF_ASSERT( qinfo(handle)->tx.idle,
488 "Inconsistent mixing of write() and enqueue() on mmap socket" );
// `repeat` is true only during the first pass of the loop.
489 for (bool repeat (true);; repeat = false) {
490 boost::optional<Buffer> buf (enqueue(handle));
// On a blocking handle, wait once (first pass only) until the handle is writeable.
492 if (repeat && handle.blocking()) {
493 handle.waitWriteable();
// Clamp the amount of data to the frame capacity before copying it to buf->begin().
498 if (unsigned(buf->frameSize()) < size)
499 size = buf->frameSize();
501 ::memcpy(buf->begin(), buffer, size);
507 //-/////////////////////////////////////////////////////////////////////////////////////////////////
514 // comment-column: 40
515 // c-file-style: "senf"
516 // indent-tabs-mode: nil
517 // ispell-local-dictionary: "american"
518 // compile-command: "scons -u test"