00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00026
00027
00028
00029 #include <senf/Utils/Console/ParsedCommand.hh>
00030
00031 #define prefix_
00032
00033
00034
00035
00036
00037 template <class QAlgorithm>
00038 prefix_ void senf::ppi::QueueingAlgorithmRegistry::registerQAlgorithm(std::string key)
00039 {
00040 if (qAlgoMap_.find( key) == qAlgoMap_.end() )
00041 qAlgoMap_.insert(key, new detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>() );
00042 else
00043 throw Exception("Duplicated QAlgorithm Registration ") << key;
00044 }
00045
00046
00047
00048
00049 template <class QAlgorithm>
00050 prefix_ senf::ppi::QueueingAlgorithm::ptr senf::ppi::detail::QueueingAlgorithmRegistry_Entry<QAlgorithm>::create()
00051 const
00052 {
00053 return QAlgorithm::create();
00054 }
00055
00056
00057
00058
00059 template <class Writer>
00060 prefix_ senf::ppi::module::PassiveQueueingSocketSink<Writer>::PassiveQueueingSocketSink(Handle const & handle, QueueingAlgorithm::ptr qAlgorithm)
00061 : dir( this),
00062 handle_( handle), writer_( ),
00063 qAlgo_( qAlgorithm),
00064 event_( handle_, IOEvent::Write)
00065 {
00066 namespace fty = console::factory;
00067 dir.add( "active", qAlgo_->consoleDir());
00068 dir.add( "set", fty::Command(
00069 &PassiveQueueingSocketSink<Writer>::setQAlgorithm, this) );
00070 dir.add( "list", fty::Command(
00071 &QueueingAlgorithmRegistry::dump, &QueueingAlgorithmRegistry::instance()));
00072 registerEvent( event_, &PassiveQueueingSocketSink::writable );
00073 event_.enabled( false);
00074 noroute(input);
00075 input.onRequest( &PassiveQueueingSocketSink::write);
00076 input.qdisc( QueueingDiscipline::NONE);
00077 checkThrottle();
00078 }
00079
00080 template <class Writer>
00081 prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::handle(Handle const & handle)
00082 {
00083 handle_ = handle;
00084 event_.set( handle_, IOEvent::Write);
00085 qAlgo_->clear();
00086 checkThrottle();
00087 }
00088
00089 template <class Writer>
00090 prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::write()
00091 {
00092 PacketType p ( input());
00093 if (qAlgo_->size() > 0) {
00094 qAlgo_->enqueue( p);
00095 return;
00096 }
00097 if (! writer_( handle_, p)) {
00098 if (qAlgo_->enqueue( p) && !event_.enabled()) {
00099 event_.enabled( true);
00100 }
00101 }
00102 }
00103
00104 template <class Writer>
00105 prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::writable()
00106 {
00107 PacketType p (qAlgo_->dequeue());
00108 if (p)
00109 writer_( handle_, p);
00110 if (qAlgo_->size() == 0) {
00111 event_.enabled( false);
00112 }
00113 }
00114
00115 template <class Writer>
00116 prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::checkThrottle()
00117 {
00118 if (handle_.valid())
00119 input.unthrottle();
00120 else
00121 input.throttle();
00122 }
00123
00124 template <class Writer>
00125 prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::qAlgorithm(QueueingAlgorithm::ptr qAlgorithm)
00126 {
00127
00128 qAlgo_.reset( qAlgorithm);
00129 dir.add( "active", qAlgo_->consoleDir());
00130 if (event_.enabled())
00131 event_.enabled( false);
00132 }
00133
00134 template <class Writer>
00135 prefix_ void senf::ppi::module::PassiveQueueingSocketSink<Writer>::setQAlgorithm(std::string const & key)
00136 {
00137 qAlgorithm( QueueingAlgorithmRegistry::instance().createQAlgorithm( key));
00138 }
00139
00140
00141 #undef prefix_
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152