FIFORunner.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2020 Fraunhofer Institute for Applied Information Technology (FIT)
3 // Network Research Group (NET)
4 // Schloss Birlinghoven, 53754 Sankt Augustin, GERMANY
5 // Contact: support@wiback.org
6 //
7 // This file is part of the SENF code tree.
8 // It is licensed under the 3-clause BSD License (aka New BSD License).
9 // See LICENSE.txt in the top level directory for details or visit
10 // https://opensource.org/licenses/BSD-3-Clause
11 //
12 
13 
17 #include "FIFORunner.hh"
18 //#include "FIFORunner.ih"
19 
20 // Custom includes
21 #include <signal.h>
22 #include <time.h>
23 #include <cassert>
24 #include <senf/config.hh>
25 #ifdef SENF_BACKTRACE
26  #include <execinfo.h>
27 #endif
28 #include <stdint.h>
29 #include <stdio.h>
30 #include <senf/Utils/Exception.hh>
34 #include <senf/Utils/senflikely.hh>
35 #include "ConsoleDir.hh"
36 
37 //#include "FIFORunner.mpp"
38 #define prefix_
39 //-/////////////////////////////////////////////////////////////////////////////////////////////////
40 
41 
42 //
43 // Our default watchdog hanging output handler (write to stdout)
44 //
45 static void watchdogDefaultOutput(std::string const & taskName, std::string const & checkPointName, std::string const & backTrace,
46  unsigned periodInMs, unsigned numPeriods)
47 {
48  senf::IGNORE( write(1, "*** Scheduler task hanging (pid ", 32) );
49  static char pid[7];
50  ::snprintf(pid, 7, "%6d", ::getpid());
51  pid[6] = 0;
52  senf::IGNORE( write(1, pid, 6) );
53  senf::IGNORE( write(1, "): ", 3) );
54  senf::IGNORE( write(1, taskName.c_str(), taskName.size()) );
55  if (!checkPointName.empty()) {
56  senf::IGNORE( write(1, " Last Checkpoint: ", 19));
57  senf::IGNORE( write(1, checkPointName.c_str(), checkPointName.size()) );
58  }
59  senf::IGNORE( write(1, "\n", 1) );
60 
61  if (!backTrace.empty()) {
62  senf::IGNORE( write(1, "Task was initialized at\n", 24) );
63  senf::IGNORE( write(1, backTrace.c_str(), backTrace.size()) );
64  senf::IGNORE( write(1, "\n", 1) );
65  }
66 }
67 
68 
69 //
70 // FIFO Runner
71 //
72 prefix_ senf::scheduler::detail::FIFORunner::FIFORunner()
73  : tasks_ (), next_ (tasks_.end()), watchdogRunning_ (false), watchdogMs_ (1000),
74  watchdogAbort_ (false), watchdogCheckpoint_(nullptr), runningTask_(nullptr),
75  watchdogCallback_(::watchdogDefaultOutput),
76  watchdogCount_(0), hangCount_ (0), yield_ (false)
77 {
78  struct sigevent ev;
79  ::memset(&ev, 0, sizeof(ev));
80  ev.sigev_notify = SIGEV_SIGNAL;
81  ev.sigev_signo = SIGURG;
82  ev.sigev_value.sival_ptr = this;
83  if (timer_create(CLOCK_MONOTONIC, &ev, &watchdogId_) < 0)
84  SENF_THROW_SYSTEM_EXCEPTION("timer_create()");
85 
86  struct sigaction sa;
87  ::memset(&sa, 0, sizeof(sa));
88  sa.sa_sigaction = &watchdog;
89  sa.sa_flags = SA_SIGINFO;
90  if (sigaction(SIGURG, &sa, 0) < 0)
91  SENF_THROW_SYSTEM_EXCEPTION("sigaction()");
92 
93  sigset_t mask;
94  sigemptyset(&mask);
95  sigaddset(&mask, SIGURG);
96  if (sigprocmask(SIG_UNBLOCK, &mask, 0) < 0)
97  SENF_THROW_SYSTEM_EXCEPTION("sigprocmask()");
98 
99  tasks_.push_back(highPriorityEnd_);
100  tasks_.push_back(normalPriorityEnd_);
101 
102 #ifndef SENF_DISABLE_CONSOLE
103  namespace fty = console::factory;
104  consoleDir().add("abortOnWatchdocTimeout", fty::Command(
105  SENF_MEMBINDFNP( bool, FIFORunner, abortOnTimeout, () const ))
106  .doc("Get current watchdog abort on event status.") );
107  consoleDir().add("abortOnWatchdocTimeout", fty::Command(
108  SENF_MEMBINDFNP( void, FIFORunner, abortOnTimeout, (bool) ))
109  .doc("Enable/disable abort on watchdog event.") );
110  consoleDir().add("watchdogTimeout", fty::Command(
111  SENF_MEMBINDFNP( unsigned, FIFORunner, taskTimeout, () const ))
112  .doc("Get current watchdog timeout in milliseconds") );
113  consoleDir().add("watchdogTimeout", fty::Command(
114  SENF_MEMBINDFNP( void, FIFORunner, taskTimeout, (unsigned) ))
115  .doc("Set watchdog timeout to in milliseconds\n"
116  "Setting the watchdog timeout to 0 will disable the watchdog.") );
117  consoleDir().add("watchdogEvents", fty::Command(membind( &FIFORunner::hangCount, this))
118  .doc("Get number of occurred watchdog events.\n"
119  "Calling this method will reset the counter to 0") );
120 #endif
121 }
122 
123 prefix_ senf::scheduler::detail::FIFORunner::~FIFORunner()
124 {
125  timer_delete(watchdogId_);
126  signal(SIGURG, SIG_DFL);
127 
128 #ifndef SENF_DISABLE_CONSOLE
129  consoleDir().remove("abortOnWatchdocTimeout");
130  consoleDir().remove("watchdogTimeout");
131  consoleDir().remove("watchdogEvents");
132 #endif
133 }
134 
// NOTE(review): the signature line of this definition (listing line 135) is missing from this
// extract. The body presumably belongs to FIFORunner::startWatchdog() -- confirm against the
// original source.
//
// (Re-)arm the watchdog timer with the currently configured period watchdogMs_. A period of 0
// disarms the timer instead. Throws a system exception if timer_settime() fails.
136 {
137  if (watchdogMs_ > 0) {
138  struct itimerspec timer;
139  ::memset(&timer, 0, sizeof(timer));
140 
// Split the millisecond period into the seconds / nanoseconds pair itimerspec expects
141  timer.it_interval.tv_sec = watchdogMs_ / 1000;
142  timer.it_interval.tv_nsec = (watchdogMs_ % 1000) * 1000000ul;
// First expiry after one full period, then repeat with the same interval
143  timer.it_value.tv_sec = timer.it_interval.tv_sec;
144  timer.it_value.tv_nsec = timer.it_interval.tv_nsec;
145 
146  if (timer_settime(watchdogId_, 0, &timer, 0) < 0)
147  SENF_THROW_SYSTEM_EXCEPTION("timer_settime()");
148 
149  watchdogRunning_ = true;
150  }
151  else
// Period 0: the watchdog is disabled
152  stopWatchdog();
153 }
154 
// NOTE(review): the signature line of this definition (listing line 155) is missing from this
// extract. The body presumably belongs to FIFORunner::stopWatchdog() -- confirm against the
// original source.
//
// Disarm the watchdog timer and clear watchdogRunning_. Throws a system exception if
// timer_settime() fails.
156 {
157  struct itimerspec timer;
// An all-zero itimerspec (it_value == 0) disarms the timer
158  ::memset(&timer, 0, sizeof(timer));
159 
160  if (timer_settime(watchdogId_, 0, &timer, 0) < 0)
161  SENF_THROW_SYSTEM_EXCEPTION("timer_settime()");
162 
163  watchdogRunning_ = false;
164 }
165 
166 // At the moment, the FIFORunner is not very efficient with many non-runnable tasks since the
167 // complete list of tasks is traversed on each run().
168 //
169 // To optimize this, we would need a way to find the relative ordering of two tasks in O(1) (at the
170 // moment, this is an O(N) operation by traversing the list).
171 //
172 // One idea is to give each task an 'order' value. Whenever a task is added at the end, its order
173 // value is set to the order value of the last task + 1. Whenever a value assigned this way exceeds
174 // some threshold (e.g. 2^31 - 1 or some such), the task list is traversed from beginning to end to
175 // assign new consecutive order values. This O(N) operation is so rare that it is amortized over
176 // a very long time.
177 //
178 // With this value at hand, we can do several optimizations: One idea would be the following: The
179 // runnable set always has two types of tasks: There are tasks, which are heavily active and are
180 // signaled constantly and other tasks which lie dormant most of the time. Those dormant tasks will
181 // end up at the beginning of the task queue.
182 //
183 // With the above defined 'ordering' field available, we can manage an iterator pointing to the
184 // first and the last runnable task. This will often help a lot since the group of runnable tasks
185 // will mostly be localized to the end of the queue. Only occasionally one of the dormant tasks will
186 // be runnable. This additional traversal time will be amortized over a larger time.
187 
// NOTE(review): the signature line of this definition (listing line 188) is missing from this
// extract. The body takes a task pointer named 'task' and presumably belongs to
// FIFORunner::dequeue() -- confirm against the original source.
//
// Remove 'task' from the task list while keeping the runner's iteration state consistent.
189 {
190  TaskList::iterator i (TaskList::s_iterator_to(*task));
// If the run loop's cursor points at the task being removed, step past it first so the cursor
// never refers to an erased node
191  if (next_ == i)
192  ++next_;
// The task being removed is the one currently recorded as running: drop the pointer but keep a
// copy of its name (and backtrace), which watchdogError() falls back to when runningTask_ is null
193  if (runningTask_ == task) {
194  runningTask_ = nullptr;
195  runningName_ = task->name();
196 #ifdef SENF_BACKTRACE
197  runningBacktrace_ = task->backtrace_;
198 #endif
199  }
200  tasks_.erase(i);
201 }
202 
// NOTE(review): the signature line of this definition (listing line 203) is missing from this
// extract. The body presumably belongs to the argument-less FIFORunner::run() -- confirm
// against the original source.
//
// Process the task list in three consecutive ranges, delimited by the highPriorityEnd_ and
// normalPriorityEnd_ marker nodes. Whenever yield_ is set after a range has been processed, the
// flag is cleared and processing restarts with the first range.
204 {
205  for (;;) {
// Range 1: [begin, highPriorityEnd_)
206  TaskList::iterator f (tasks_.begin());
207  TaskList::iterator l (TaskList::s_iterator_to(highPriorityEnd_));
208  run(f, l);
209  if (SENF_UNLIKELY(yield_)) {
210  yield_ = false;
211  continue;
212  }
213 
// Range 2: (highPriorityEnd_, normalPriorityEnd_)
214  f = l; ++f;
215  l = TaskList::s_iterator_to(normalPriorityEnd_);
216  run(f, l);
217  if (SENF_UNLIKELY(yield_)) {
218  yield_ = false;
219  continue;
220  }
221 
// Range 3: (normalPriorityEnd_, end)
222  f = l; ++f;
223  l = tasks_.end();
224  run(f, l);
225  if (SENF_UNLIKELY(yield_)) {
226  yield_ = false;
227  continue;
228  }
// All three ranges completed without a yield request
229  break;
230  }
231 }
232 
// Run every runnable task in the range [f, l) once.
//
// \param f  first node of the range to process
// \param l  end of the range (not processed itself)
//
// Each task that is run has its runnable_ flag cleared and is moved to the position directly in
// front of l before its run() member is called. If a task sets yield_, processing stops right
// after that task. Exceptions thrown by a task are re-thrown after the temporary end marker has
// been removed and the watchdog state has been reset.
233 prefix_ void senf::scheduler::detail::FIFORunner::run(TaskList::iterator f, TaskList::iterator l)
234 {
235  if (SENF_UNLIKELY(f == l))
236  // We'll have problems inserting NullTask between f and l below, so just explicitly bail out
237  return;
238 
239  // This algorithm is carefully adjusted to make it work even when arbitrary tasks are removed
240  // from the queue
241  // - Before we begin, we add a NullTask to the queue. The only purpose of this node is to mark
242  // the current end of the queue. The iterator to this node becomes the end iterator of the
243  // range to process
244  // - We update the TaskInfo and move it to the next queue Element before calling the callback so
245  // we don't access the TaskInfo if it is removed while the callback is running
246  // - We keep the next to-be-processed node in a class variable which is checked and updated
247  // whenever a node is removed.
248 
// queueEnd_ is inserted directly in front of l; tasks spliced in front of l below therefore end
// up behind this marker and are not visited again in this pass
249  TaskList::iterator end (tasks_.insert(l, queueEnd_));
250  next_ = f;
251 
252  // Would prefer to use ScopeExit+boost::lambda here instead of try but profiling has shown that
253  // to be too costly here
254 
255  try {
256  while (SENF_LIKELY(next_ != end)) {
257  runningTask_ = &(*next_);
258  if (runningTask_->runnable_) {
259  runningTask_->runnable_ = false;
// Advance the cursor and requeue the task *before* calling it, so neither is touched afterwards
260  TaskList::iterator i (next_);
261  ++ next_;
262  tasks_.splice(l, tasks_, i);
// A non-zero count tells the watchdog signal handler that a task is executing
263  watchdogCount_ = 1;
264  yield_ = false;
265  runningTask_->run();
266  if (SENF_UNLIKELY(yield_)) {
// NOTE(review): this early exit removes the marker but, unlike the other exits, does not reset
// watchdogCount_ or next_ -- confirm that this is intended
267  tasks_.erase(end);
268  return;
269  }
270  }
271  else
272  ++ next_;
273  }
// Regular completion: no task executing any more, drop the temporary end marker
274  watchdogCount_ = 0;
275  next_ = l;
276  tasks_.erase(end);
277  }
278  catch (...) {
// Same cleanup as on regular completion, then propagate the task's exception
279  watchdogCount_ = 0;
280  next_ = l;
281  tasks_.erase(end);
282  throw;
283  }
284 }
285 
// Map a task priority to an iterator into the task list: depending on 'p' this is tasks_.end(),
// the normalPriorityEnd_ marker or the highPriorityEnd_ marker. tasks_.begin() is returned if
// the switch does not return.
//
// NOTE(review): the 'case' label lines of the switch (listing lines 290, 292 and 294) are
// missing from this extract, so which TaskInfo::Priority value selects which return cannot be
// read off here -- restore them from the original source.
286 prefix_ senf::scheduler::detail::FIFORunner::TaskList::iterator
287 senf::scheduler::detail::FIFORunner::priorityEnd(TaskInfo::Priority p)
288 {
289  switch (p) {
291  return tasks_.end();
293  return TaskList::s_iterator_to(normalPriorityEnd_);
295  return TaskList::s_iterator_to(highPriorityEnd_);
296  }
297  return tasks_.begin();
298 }
299 
300 prefix_ void senf::scheduler::detail::FIFORunner::watchdog(int, siginfo_t * si, void *)
301 {
302  FIFORunner & runner (*static_cast<FIFORunner *>(si->si_value.sival_ptr));
303  if (runner.watchdogCount_ > 0) {
304  ++ runner.watchdogCount_;
305  if (runner.watchdogCount_ > 2) {
306  ++ runner.hangCount_;
307  runner.watchdogError();
308  }
309  }
310 }
311 
312 prefix_ void senf::scheduler::detail::FIFORunner::watchdogError()
313 {
314  std::string taskName (runningTask_ ? runningTask_->name() : runningName_);
315  std::string checkpointName (watchdogCheckpoint_ ? std::string(watchdogCheckpoint_) : std::string(""));
316 #ifdef SENF_BACKTRACE
317  std::string backTrace (runningTask_ ? runningTask_->backtrace_ : runningBacktrace_);
318 #else
319  std::string backTrace ("");
320 #endif
321 
322  watchdogCallback_(taskName, checkpointName, backTrace, watchdogMs_, watchdogCount_ -1);
323 
324  if (watchdogAbort_)
325  assert(false);
326 }
327 
328 //-/////////////////////////////////////////////////////////////////////////////////////////////////
329 #undef prefix_
330 //#include "FIFORunner.mpp"
331 
332 
333 // Local Variables:
334 // mode: c++
335 // fill-column: 100
336 // comment-column: 40
337 // c-file-style: "senf"
338 // indent-tabs-mode: nil
339 // ispell-local-dictionary: "american"
340 // compile-command: "scons -u test"
341 // End:
#define prefix_
Definition: FIFORunner.cc:38
#define SENF_MEMBINDFNP(ret, cls, fn, args)
#define SENF_THROW_SYSTEM_EXCEPTION(desc)
FIFORunner public header.
#define SENF_LIKELY(x)
__u32 mask
boost::function< R(Args)> membind(R(T::*fn)(Args), T *ob)
StatsDataCollectorKernel signal
GenericNode::ptr remove(std::string const &name)
console::ScopedDirectory & consoleDir()
Definition: ConsoleDir.cc:27
std::string const & name() const
Get event name.
Scheduler ConsoleDir public header.
#define SENF_UNLIKELY(x)
NodeType & add(std::string const &name, boost::shared_ptr< NodeType > node)