00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00026 #include "Joins.hh"
00027
00028
00029
00030 #include <algorithm>
00031 #include <boost/bind.hpp>
00032 #include <boost/lambda/lambda.hpp>
00033 #include <boost/lambda/bind.hpp>
00034
00035
00036 #define prefix_
00037
00038
00039
00040
00041
00042 prefix_ senf::ppi::module::PassiveJoin::PassiveJoin()
00043 {
00044 noroute(output);
00045 output.onThrottle(&PassiveJoin::onThrottle);
00046 output.onUnthrottle(&PassiveJoin::onUnthrottle);
00047 }
00048
00049
00050
00051
00052 prefix_ void senf::ppi::module::PassiveJoin::connectorSetup(connector::PassiveInput<> & conn)
00053 {
00054 noroute(conn);
00055 conn.onRequest(boost::bind(&PassiveJoin::request,this,boost::ref(conn)));
00056 conn.qdisc( QueueingDiscipline::NONE);
00057 }
00058
00059 prefix_ void senf::ppi::module::PassiveJoin::onThrottle()
00060 {
00061 using boost::lambda::_1;
00062 using boost::lambda::bind;
00063 std::for_each(connectors().begin(), connectors().end(),
00064 bind(&connector::GenericPassiveInput::throttle, _1));
00065 }
00066
00067 prefix_ void senf::ppi::module::PassiveJoin::onUnthrottle()
00068 {
00069 using boost::lambda::_1;
00070 using boost::lambda::bind;
00071 std::for_each(connectors().begin(), connectors().end(),
00072 bind(&connector::GenericPassiveInput::unthrottle, _1));
00073 }
00074
00075
00076
00077
00078 prefix_ senf::ppi::module::PriorityJoin::PriorityJoin()
00079 {
00080 noroute(output);
00081 output.onRequest(&PriorityJoin::request);
00082 }
00083
00084
00085
00086
00087 prefix_ void
00088 senf::ppi::module::PriorityJoin::connectorSetup(PriorityJoin::ConnectorType & conn,
00089 int priority)
00090 {
00091 noroute(conn);
00092 conn.onThrottle(&PriorityJoin::onThrottle);
00093 conn.onUnthrottle(&PriorityJoin::onUnthrottle);
00094
00095 if (priority < 0) {
00096 priority = connectors().size() + priority;
00097 if (priority < 0)
00098 priority = 0;
00099 }
00100 if (priority >= int(connectors().size())-1)
00101 return;
00102
00103 connectors().insert(connectors().begin()+priority, connectors().pop_back().release());
00104 }
00105
00106 prefix_ void senf::ppi::module::PriorityJoin::request()
00107 {
00108 using boost::lambda::_1;
00109 using boost::lambda::bind;
00110 PriorityJoin::ContainerType::iterator i (
00111 std::find_if(connectors().begin(), connectors().end(),
00112 ! bind(&connector::GenericActiveInput::throttled, _1)));
00113 if (i != connectors().end())
00114 output((*i)());
00115 }
00116
00117 prefix_ void senf::ppi::module::PriorityJoin::onThrottle()
00118 {
00119 if (std::find_if(connectors().begin(), connectors().end(),
00120 ! bind(&connector::GenericActiveInput::throttled, _1)) == connectors().end())
00121 output.throttle();
00122 }
00123
00124 prefix_ void senf::ppi::module::PriorityJoin::onUnthrottle()
00125 {
00126 output.unthrottle();
00127 }
00128
00129
00130 #undef prefix_
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142