QueueSocketSourceSink.ct
Go to the documentation of this file.
1 //
2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
6 //
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
11 //
12 
13 
14 /** \file
15  \brief QueueSocketSourceSink non-inline template implementation */
16 
17 #include "QueueSocketSourceSink.ih"
18 
19 // Custom includes
20 #include <senf/Utils/membind.hh>
21 #include <senf/Packets/PacketInfo.hh>
22 #include <senf/Utils/senflikely.hh>
23 #include "SocketSource.hh"
24 #include <senf/Socket/QueueReadWritePolicy.ih>
25 #include <senf/Socket/Protocols/Raw/MMapPacketSocketHandle.hh>
26 
27 #define prefix_
28 //-/////////////////////////////////////////////////////////////////////////////////////////////////
29 
30 //-/////////////////////////////////////////////////////////////////////////////////////////////////
31 // senf::ppi::module::ActiveQueueSocketSource<Packet>
32 
33 template <class Packet, class Connector>
34 prefix_
35 senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::ActiveQueueSocketSource(unsigned burst)
36  : handle_ ()
37  , event_ ()
38  , redFilterCallback_(redFilterDrop)
39  , maxBurst_ (burst)
40  , burst_(0)
41  , flushPending_(false)
42  , red_(0)
43 #ifdef SENF_DEBUG
44  , burstMax_ (0)
45 #endif
46 {
47  registerEvent(event_, &ActiveQueueSocketSource::read);
48  route(event_, output);
49 }
50 
51 template <class Packet, class Connector>
52 prefix_ senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::
53 ActiveQueueSocketSource(Handle const & handle, unsigned burst)
54  : handle_ (handle)
55  , event_ (handle_, IOEvent::Read)
56  , redFilterCallback_(redFilterDrop)
57  , maxBurst_ (burst)
58  , burst_(0)
59  , flushPending_(false)
60  , red_(0)
61 #ifdef SENF_DEBUG
62  , burstMax_ (0)
63 #endif
64 {
65  registerEvent(event_, &ActiveQueueSocketSource::read);
66  noroute(output);
67  senf::ppi::detail::RouteConnector<Connector>::route(*this, event_, output);
68 }
69 
70 template <class Packet, class Connector>
71 prefix_ typename senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::Handle
72 senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::handle()
73  const
74 {
75  return handle_;
76 }
77 
78 template <class Packet, class Connector>
79 prefix_ void senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::handle(Handle const & handle)
80 {
81  burst_ = 0;
82  flushPending_ = false;
83  red_ = 0;
84  handle_ = handle;
85  event_.set(handle_, IOEvent::Read);
86  if (handle)
87  event_.enabled(true);
88 }
89 
90 #ifdef SENF_DEBUG
91 
92 template <class Packet, class Connector>
93 prefix_ unsigned senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::burstMax()
94 {
95  unsigned res (burstMax_);
96  burstMax_ = 0;
97  return res;
98 }
99 
100 #endif
101 
102 template <class Packet, class Connector>
103 prefix_ void senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::setREDFilterCallback(REDFilterCallback const & cb)
104 {
105  redFilterCallback_ = cb;
106 }
107 
108 template <class Packet, class Connector>
109 prefix_ void senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::read()
110 {
111  // this should not be required, but since we are bursting below, it's not that 'expensive'
112  if (SENF_UNLIKELY(!handle_))
113  return;
114 
115  senf::detail::QueueInfo & qi (* senf::detail::QueuePolicyBase::qinfo(handle_));
116 
117  flushPending_ = false;
118  for (burst_ = 1; SENF_LIKELY(handle_ and (burst_ <= maxBurst_) and !flushPending_); burst_++) {
119  boost::optional<senf::QueueReadPolicy::Buffer> buf (handle_.dequeue());
120  if (SENF_UNLIKELY(!buf))
121  break;
122 
123  PacketInfo ifo;
124  {
125  Packet const & pk (Packet::create(
126  buf->frameBegin(),
127  buf->size(),
128  buf->frameSize(),
129  buf->dataOffset()));
130  pk.template annotation<senf::ppi::QueueBufferAnnotation>().value = &(*buf);
131 #ifdef SENF_PPI_READ_TIMESTAMP
132  pk.template annotation<senf::ppi::ReadTimestamp>().value = senf::ClockService::now();
133 #endif
134  ifo = PacketInfo(pk);
135 
136  unsigned usage (qi.usageRx());
137  if (SENF_UNLIKELY( (usage > (senf::detail::QueueInfo::NUM_SAMPLES/4)) and ((std::uint32_t(rand()) % senf::detail::QueueInfo::NUM_SAMPLES) < (usage))
138  and !redFilterCallback_(pk, (usage * 100) / senf::detail::QueueInfo::NUM_SAMPLES))) {
139  // drop frame
140  qi.rxStats.red++;
141  red_++;
142  } else {
143  // process frame
144  output(pk);
145  }
146  }
147  if (SENF_UNLIKELY(ifo.is_shared() and handle_)) {
148  qi.rxStats.extMemory++;
149  ifo.releaseExternalMemory();
150  }
151  }
152 
153 #ifdef SENF_DEBUG
154  if (burst_ > burstMax_)
155  burstMax_ = burst_;
156 
157  // we were triggered, but no valid packet was available
158  if (SENF_UNLIKELY(burst_ == 1)) {
159  qi.rxStats.noop++;
160  }
161 #endif
162 
163  // this tells flush() that we are done with the for() loop
164  burst_ = 0;
165 
166  if (SENF_LIKELY(handle_)) {
167  if (SENF_UNLIKELY(flushPending_))
168  flush();
169  else
170  handle_.release();
171  }
172 }
173 
174 template <class Packet, class Connector>
175 prefix_ std::pair<unsigned,unsigned> senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::dropped()
176 {
177  unsigned dropped (0);
178  if (handle_) {
179  try {
180  auto & h (reinterpret_cast<ConnectedMMapPacketSocketHandle &>(handle_));
181  dropped = h.protocol().rxQueueDropped();
182  } catch (...) {};
183  }
184  std::pair<unsigned,unsigned> tmp (std::make_pair(red_, dropped));
185  red_ = 0;
186  return tmp;
187 }
188 
189 
190 //-/////////////////////////////////////////////////////////////////////////////////////////////////
191 // senf::ppi::module::PassiveQueueSocketSink
192 
193 template <class Connector>
194 prefix_ senf::ppi::module::PassiveQueueSocketSink<Connector>::PassiveQueueSocketSink()
195  : handle_ ()
196  , eventHook_ ("PassiveQueueSocketSink", senf::membind(&PassiveQueueSocketSink::send, this),
197  senf::scheduler::EventHook::POST, false)
198  , redFilterCallback_(redFilterDrop)
199  , red_(0)
200  , dropped_(0)
201 #ifdef SENF_DEBUG
202  , burst_ (0)
203  , burstMax_ (0)
204 #endif
205 {
206  noroute(input);
207  input.onRequest(&PassiveQueueSocketSink::write);
208  input.throttlingDisc( ThrottlingDiscipline::NONE);
209 
210  registerEvent(writeEvent_, &PassiveQueueSocketSink::send);
211 }
212 
213 template <class Connector>
214 prefix_ senf::ppi::module::PassiveQueueSocketSink<Connector>::PassiveQueueSocketSink(Handle const & handle)
215  : handle_ (handle)
216  , eventHook_ ("PassiveQueueSocketSink", senf::membind(&PassiveQueueSocketSink::send, this),
217  senf::scheduler::EventHook::POST, false)
218  , writeEvent_(handle_, IOEvent::Write)
219  , redFilterCallback_(redFilterDrop)
220  , red_(0)
221  , dropped_(0)
222 #ifdef SENF_DEBUG
223  , burst_ (0)
224  , burstMax_ (0)
225 #endif
226 {
227  noroute(input);
228  input.onRequest(&PassiveQueueSocketSink::write);
229  input.throttlingDisc( ThrottlingDiscipline::NONE);
230 
231  registerEvent(writeEvent_, &PassiveQueueSocketSink::send);
232 }
233 
234 template <class Connector>
235 prefix_ typename senf::ppi::module::PassiveQueueSocketSink<Connector>::Handle
236 senf::ppi::module::PassiveQueueSocketSink<Connector>::handle()
237  const
238 {
239  return handle_;
240 }
241 
242 template <class Connector>
243 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::handle(Handle const & handle)
244 {
245  handle_ = handle;
246  eventHook_.disable();
247  writeEvent_.set(handle_, IOEvent::Write);
248  writeEvent_.enabled(false);
249  red_ = dropped_ = 0;
250 }
251 
252 #ifdef SENF_DEBUG
253 template <class Connector>
254 prefix_ unsigned senf::ppi::module::PassiveQueueSocketSink<Connector>::burstMax()
255 {
256  unsigned res (burstMax_);
257  burstMax_ = 0;
258  return res;
259 }
260 #endif
261 
262 template <class Connector>
263 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::setREDFilterCallback(REDFilterCallback const & cb)
264 {
265  redFilterCallback_ = cb;
266 }
267 
268 template <class Connector>
269 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::send()
270 {
271  if (SENF_LIKELY(handle_)) {
272  if (SENF_LIKELY(handle_.send())) {
273  // we should try again once the socket is writable again
274  writeEvent_.enabled(true);
275  } else {
276  writeEvent_.enabled(false);
277  }
278  eventHook_.disable();
279  }
280 
281 #ifdef SENF_DEBUG
282  if (burst_ > burstMax_)
283  burstMax_ = burst_;
284  burst_ = 0;
285 #endif
286 }
287 
288 template <class Connector>
289 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::write()
290 {
291  Packet const & packet (input());
292 
293  if (SENF_UNLIKELY(!handle_)) {
294  dropped_++;
295  return;
296  }
297 
298  // simple RED starting at > 33% usage
299  senf::detail::QueueInfo & qi (* senf::detail::QueuePolicyBase::qinfo(handle_));
300  unsigned usage (qi.usageTx());
301 
302  if (SENF_UNLIKELY(usage > (senf::detail::QueueInfo::NUM_SAMPLES/4))) {
303  if ((std::uint32_t(rand()) % senf::detail::QueueInfo::NUM_SAMPLES) < (usage)) {
304  if (!redFilterCallback_(packet, (usage * 100) / senf::detail::QueueInfo::NUM_SAMPLES)) {
305  qi.txStats.red++;
306  red_++;
307  return;
308  }
309  }
310  }
311 
312  boost::optional<QueueWritePolicy::Buffer> buf (handle_.enqueue());
313  if (SENF_UNLIKELY(!buf)) {
314  dropped_++;
315  return;
316  }
317 
318  writePacket(packet, *buf);
319 }
320 
321 template <class Connector>
322 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::writePacket(senf::Packet const & packet, QueueReadPolicy::Buffer & buf)
323 {
324  SENF_ASSERT( packet.data().size(), "cannot send empty packet" );
325  buf.resize(std::min(packet.data().size(), buf.frameSize()));
326  std::copy(packet.data().begin(), packet.data().end(), buf.begin());
327  eventHook_.enable();
328 #ifdef SENF_DEBUG
329  ++ burst_;
330 #endif
331 }
332 
333 template <class Connector>
334 prefix_ std::tuple<unsigned,unsigned,unsigned> senf::ppi::module::PassiveQueueSocketSink<Connector>::dropped()
335 {
336  unsigned wrongFormat (0);
337  if (handle_) {
338  try {
339  auto & h (reinterpret_cast<ConnectedMMapPacketSocketHandle &>(handle_));
340  wrongFormat = h.protocol().txWrongFormat();
341  } catch (...) {};
342  }
343  std::tuple<unsigned,unsigned,unsigned> tmp (std::make_tuple(red_,dropped_,wrongFormat));
344  red_ = dropped_ = 0;
345  return tmp;
346 }
347 
348 
349 //-/////////////////////////////////////////////////////////////////////////////////////////////////
350 #undef prefix_
351 
352 
353 // Local Variables:
354 // mode: c++
355 // fill-column: 100
356 // comment-column: 40
357 // c-file-style: "senf"
358 // indent-tabs-mode: nil
359 // ispell-local-dictionary: "american"
360 // compile-command: "scons -u test"
361 // End: