2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
15 \brief QueueSocketSourceSink non-inline template implementation */
17 #include "QueueSocketSourceSink.ih"
20 #include <senf/Utils/membind.hh>
21 #include <senf/Packets/PacketInfo.hh>
22 #include <senf/Utils/senflikely.hh>
23 #include "SocketSource.hh"
24 #include <senf/Socket/QueueReadWritePolicy.ih>
25 #include <senf/Socket/Protocols/Raw/MMapPacketSocketHandle.hh>
28 //-/////////////////////////////////////////////////////////////////////////////////////////////////
30 //-/////////////////////////////////////////////////////////////////////////////////////////////////
31 // senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>
// Constructor taking only a burst value (no socket handle is passed in).
// Installs redFilterDrop as the initial RED filter callback, clears flushPending_,
// registers read() as the handler of event_ and routes event_ to output.
// \param burst presumably the per-read() burst limit stored in maxBurst_ -- TODO confirm
33 template <class Packet, class Connector>
35 senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::ActiveQueueSocketSource(unsigned burst)
38 , redFilterCallback_(redFilterDrop)
41 , flushPending_(false)
// read() is invoked whenever event_ fires.
47 registerEvent(event_, &ActiveQueueSocketSource::read);
48 route(event_, output);
// Constructor taking a socket handle and a burst value.
// event_ is set up as a read event (IOEvent::Read) on handle_, redFilterDrop is installed
// as the initial RED filter callback and flushPending_ is cleared. read() is registered as
// the handler of event_.
// \param handle socket handle; presumably stored in handle_ before event_ is built -- confirm
// \param burst  presumably the per-read() burst limit stored in maxBurst_ -- TODO confirm
51 template <class Packet, class Connector>
52 prefix_ senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::
53 ActiveQueueSocketSource(Handle const & handle, unsigned burst)
55 , event_ (handle_, IOEvent::Read)
56 , redFilterCallback_(redFilterDrop)
59 , flushPending_(false)
65 registerEvent(event_, &ActiveQueueSocketSource::read);
// Unlike the burst-only constructor, routing goes through the Connector-specific
// detail::RouteConnector helper.
67 senf::ppi::detail::RouteConnector<Connector>::route(*this, event_, output);
// Handle accessor: returns a Handle by value.
// Presumably this is the currently assigned handle_ -- confirm against the function body.
70 template <class Packet, class Connector>
71 prefix_ typename senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::Handle
72 senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::handle()
// Handle setter: clears flushPending_ and re-arms event_ as a read event on handle_.
// \param handle the new socket handle
// NOTE(review): event_.set() uses the member handle_, not the parameter; this relies on
// handle_ having been assigned from the parameter beforehand -- confirm.
78 template <class Packet, class Connector>
79 prefix_ void senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::handle(Handle const & handle)
82 flushPending_ = false;
85 event_.set(handle_, IOEvent::Read);
// Burst statistics accessor: takes a snapshot of burstMax_ into a local.
// Presumably the snapshot is returned (and the counter possibly reset) -- confirm.
// Note that burstMax_ is a different member than the burst limit maxBurst_ used in read().
92 template <class Packet, class Connector>
93 prefix_ unsigned senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::burstMax()
95 unsigned res (burstMax_);
// Replaces the RED filter callback consulted by read().
// \param cb callback invoked with the packet and the RX queue usage in percent; read()
//           takes its RED branch when the callback returns false.
102 template <class Packet, class Connector>
103 prefix_ void senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::setREDFilterCallback(REDFilterCallback const & cb)
105 redFilterCallback_ = cb;
// Event handler of event_ (registered in the constructors): dequeues buffers from handle_
// in a burst and wraps each dequeued buffer in a Packet.
// - The loop runs while handle_ is valid, burst_ <= maxBurst_ and flushPending_ is unset.
// - Each packet is annotated with a pointer to its queue buffer (QueueBufferAnnotation).
// - A RED check based on the RX queue usage may divert packets (see the comment at the check).
108 template <class Packet, class Connector>
109 prefix_ void senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::read()
111 // this should not be required, but since we are bursting below, it's not that 'expensive'
112 if (SENF_UNLIKELY(!handle_))
// QueueInfo of the handle, obtained via QueuePolicyBase::qinfo; used below for usageRx()
// and the rxStats counters.
115 senf::detail::QueueInfo & qi (* senf::detail::QueuePolicyBase::qinfo(handle_));
// Clear the flush flag before the burst starts; the loop condition re-checks it (and the
// validity of handle_) on every iteration.
117 flushPending_ = false;
118 for (burst_ = 1; SENF_LIKELY(handle_ and (burst_ <= maxBurst_) and !flushPending_); burst_++) {
119 boost::optional<senf::QueueReadPolicy::Buffer> buf (handle_.dequeue());
// An empty optional means dequeue() delivered no buffer.
120 if (SENF_UNLIKELY(!buf))
// Build the packet via Packet::create and tag it with the address of the dequeued buffer.
125 Packet const & pk (Packet::create(
130 pk.template annotation<senf::ppi::QueueBufferAnnotation>().value = &(*buf);
131 #ifdef SENF_PPI_READ_TIMESTAMP
// Optional: stamp the packet with the current ClockService time.
132 pk.template annotation<senf::ppi::ReadTimestamp>().value = senf::ClockService::now();
134 ifo = PacketInfo(pk);
// RED check: once usageRx() exceeds a quarter of NUM_SAMPLES, a random draw in
// [0, NUM_SAMPLES) that falls below the current usage consults redFilterCallback_ with the
// usage expressed in percent. The branch is taken when the callback returns false
// (presumably the packet is then dropped and counted in red_ -- confirm).
136 unsigned usage (qi.usageRx());
137 if (SENF_UNLIKELY( (usage > (senf::detail::QueueInfo::NUM_SAMPLES/4)) and ((std::uint32_t(rand()) % senf::detail::QueueInfo::NUM_SAMPLES) < (usage))
138 and !redFilterCallback_(pk, (usage * 100) / senf::detail::QueueInfo::NUM_SAMPLES))) {
// If ifo reports is_shared() while handle_ is still valid, count the event in
// rxStats.extMemory and call releaseExternalMemory() on the packet info.
147 if (SENF_UNLIKELY(ifo.is_shared() and handle_)) {
148 qi.rxStats.extMemory++;
149 ifo.releaseExternalMemory();
// NOTE(review): burstMax_ is a different member than the loop limit maxBurst_; it looks
// like a high-water mark of the observed burst size -- confirm.
154 if (burst_ > burstMax_)
157 // we were triggered, but no valid packet was available
158 if (SENF_UNLIKELY(burst_ == 1)) {
163 // this tells flush() that we are done with the for() loop
166 if (SENF_LIKELY(handle_)) {
167 if (SENF_UNLIKELY(flushPending_))
// Drop statistics: builds a pair of (red_, RX queue drop count reported by the socket protocol).
// The local drop count defaults to 0.
174 template <class Packet, class Connector>
175 prefix_ std::pair<unsigned,unsigned> senf::ppi::module::ActiveQueueSocketSource<Packet,Connector>::dropped()
177 unsigned dropped (0);
// NOTE(review): the reinterpret_cast assumes handle_ really is a
// ConnectedMMapPacketSocketHandle; nothing visible here checks that -- confirm.
180 auto & h (reinterpret_cast<ConnectedMMapPacketSocketHandle &>(handle_));
181 dropped = h.protocol().rxQueueDropped();
// First element: red_ counter, second element: drop count read above.
184 std::pair<unsigned,unsigned> tmp (std::make_pair(red_, dropped));
190 //-/////////////////////////////////////////////////////////////////////////////////////////////////
191 // senf::ppi::module::PassiveQueueSocketSink<Connector>
// Default constructor (no socket handle is passed in).
// Sets up eventHook_ as a scheduler EventHook of type POST that calls send(), installs
// redFilterDrop as the initial RED filter callback, makes write() the request handler of
// input, disables throttling on input and registers send() as the handler of writeEvent_.
193 template <class Connector>
194 prefix_ senf::ppi::module::PassiveQueueSocketSink<Connector>::PassiveQueueSocketSink()
// The trailing false is presumably the initially-enabled flag of the hook -- confirm.
196 , eventHook_ ("PassiveQueueSocketSink", senf::membind(&PassiveQueueSocketSink::send, this),
197 senf::scheduler::EventHook::POST, false)
198 , redFilterCallback_(redFilterDrop)
// write() is called for every packet request on input.
207 input.onRequest(&PassiveQueueSocketSink::write);
208 input.throttlingDisc( ThrottlingDiscipline::NONE);
// send() serves both as the eventHook_ callback and as the writeEvent_ handler.
210 registerEvent(writeEvent_, &PassiveQueueSocketSink::send);
// Constructor taking a socket handle.
// Same setup as the default constructor; additionally writeEvent_ is set up as a write
// event (IOEvent::Write) on handle_.
// \param handle socket handle; presumably stored in handle_ before writeEvent_ is built -- confirm
213 template <class Connector>
214 prefix_ senf::ppi::module::PassiveQueueSocketSink<Connector>::PassiveQueueSocketSink(Handle const & handle)
// The trailing false is presumably the initially-enabled flag of the hook -- confirm.
216 , eventHook_ ("PassiveQueueSocketSink", senf::membind(&PassiveQueueSocketSink::send, this),
217 senf::scheduler::EventHook::POST, false)
218 , writeEvent_(handle_, IOEvent::Write)
219 , redFilterCallback_(redFilterDrop)
// write() is called for every packet request on input; throttling is disabled.
228 input.onRequest(&PassiveQueueSocketSink::write);
229 input.throttlingDisc( ThrottlingDiscipline::NONE);
231 registerEvent(writeEvent_, &PassiveQueueSocketSink::send);
// Handle accessor: returns a Handle by value.
// Presumably this is the currently assigned handle_ -- confirm against the function body.
234 template <class Connector>
235 prefix_ typename senf::ppi::module::PassiveQueueSocketSink<Connector>::Handle
236 senf::ppi::module::PassiveQueueSocketSink<Connector>::handle()
// Handle setter: disables eventHook_, rebinds writeEvent_ as a write event on handle_ and
// leaves writeEvent_ disabled.
// \param handle the new socket handle
// NOTE(review): writeEvent_.set() uses the member handle_, not the parameter; this relies
// on handle_ having been assigned from the parameter beforehand -- confirm.
242 template <class Connector>
243 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::handle(Handle const & handle)
246 eventHook_.disable();
247 writeEvent_.set(handle_, IOEvent::Write);
// The write event stays off until send() turns it on again.
248 writeEvent_.enabled(false);
// Burst statistics accessor: takes a snapshot of burstMax_ into a local.
// Presumably the snapshot is returned (and the counter possibly reset) -- confirm.
253 template <class Connector>
254 prefix_ unsigned senf::ppi::module::PassiveQueueSocketSink<Connector>::burstMax()
256 unsigned res (burstMax_);
// Replaces the RED filter callback consulted by write().
// \param cb callback invoked with the packet and the TX queue usage in percent; write()
//           takes its RED branch when the callback returns false.
262 template <class Connector>
263 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::setREDFilterCallback(REDFilterCallback const & cb)
265 redFilterCallback_ = cb;
// Callback of eventHook_ and handler of writeEvent_ (both wired up in the constructors):
// triggers handle_.send() and updates writeEvent_ and eventHook_ afterwards.
268 template <class Connector>
269 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::send()
// Nothing is sent without a valid handle.
271 if (SENF_LIKELY(handle_)) {
// A true result of handle_.send() enables writeEvent_ so that send() runs again on the
// next write event.
272 if (SENF_LIKELY(handle_.send())) {
273 // we should try again once the socket is writable again
274 writeEvent_.enabled(true);
// Presumably the alternative branch of the send() check: stop watching for writability -- confirm.
276 writeEvent_.enabled(false);
// The hook is switched off again here.
278 eventHook_.disable();
// Compares the current burst counter against the recorded burstMax_ value; presumably
// updates the high-water mark -- confirm.
282 if (burst_ > burstMax_)
// Request handler of input (installed via input.onRequest in the constructors): fetches
// the next packet from input, applies the RED check against the TX queue usage and, when
// handle_.enqueue() delivers a buffer, copies the packet into it via writePacket().
288 template <class Connector>
289 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::write()
// The packet is fetched from input before the handle check.
291 Packet const & packet (input());
293 if (SENF_UNLIKELY(!handle_)) {
298 // simple RED starting at > 25% usage (the threshold below is NUM_SAMPLES/4)
299 senf::detail::QueueInfo & qi (* senf::detail::QueuePolicyBase::qinfo(handle_));
300 unsigned usage (qi.usageTx());
// Three nested checks: usage above the threshold, a random draw in [0, NUM_SAMPLES) below
// the current usage, and redFilterCallback_ (given the usage in percent) returning false.
// Presumably the packet is then dropped and counted in red_ -- confirm.
302 if (SENF_UNLIKELY(usage > (senf::detail::QueueInfo::NUM_SAMPLES/4))) {
303 if ((std::uint32_t(rand()) % senf::detail::QueueInfo::NUM_SAMPLES) < (usage)) {
304 if (!redFilterCallback_(packet, (usage * 100) / senf::detail::QueueInfo::NUM_SAMPLES)) {
// Ask the handle for a TX buffer; an empty optional means none was delivered
// (presumably counted in dropped_ -- confirm).
312 boost::optional<QueueWritePolicy::Buffer> buf (handle_.enqueue());
313 if (SENF_UNLIKELY(!buf)) {
318 writePacket(packet, *buf);
// Copies the bytes of a packet into a queue buffer.
// \param packet packet to send; must not be empty (asserted below)
// \param buf    destination buffer; resized to the packet size, capped at buf.frameSize()
321 template <class Connector>
322 prefix_ void senf::ppi::module::PassiveQueueSocketSink<Connector>::writePacket(senf::Packet const & packet, QueueReadPolicy::Buffer & buf)
324 SENF_ASSERT( packet.data().size(), "cannot send empty packet" );
325 buf.resize(std::min(packet.data().size(), buf.frameSize()));
// NOTE(review): the resize above clamps to frameSize(), but the copy below still covers
// the whole packet. If packet.data().size() > buf.frameSize() this looks like a write past
// the resized buffer; the copy should probably stop after buf.size() bytes -- confirm.
326 std::copy(packet.data().begin(), packet.data().end(), buf.begin());
// Drop statistics: builds a tuple of (red_, dropped_, TX wrong-format count reported by
// the socket protocol). The local wrong-format count defaults to 0.
333 template <class Connector>
334 prefix_ std::tuple<unsigned,unsigned,unsigned> senf::ppi::module::PassiveQueueSocketSink<Connector>::dropped()
336 unsigned wrongFormat (0);
// NOTE(review): the reinterpret_cast assumes handle_ really is a
// ConnectedMMapPacketSocketHandle; nothing visible here checks that -- confirm.
339 auto & h (reinterpret_cast<ConnectedMMapPacketSocketHandle &>(handle_));
340 wrongFormat = h.protocol().txWrongFormat();
// Element order: red_ counter, dropped_ counter, wrong-format count read above.
343 std::tuple<unsigned,unsigned,unsigned> tmp (std::make_tuple(red_,dropped_,wrongFormat));
349 //-/////////////////////////////////////////////////////////////////////////////////////////////////
356 // comment-column: 40
357 // c-file-style: "senf"
358 // indent-tabs-mode: nil
359 // ispell-local-dictionary: "american"
360 // compile-command: "scons -u test"