// $Id: QueueingSocketSink.hh 1742 2010-11-04 14:51:56Z g0dil $
//
// Copyright (C) 2010
// Fraunhofer (FOKUS)
// Competence Center NETwork research (NET), St. Augustin, GERMANY
// Thorsten Horstmann <tho@berlios.de>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the
// Free Software Foundation, Inc.,
// 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
00022 00026 #ifndef HH_SENF_PPI_QueueingSocketSink_ 00027 #define HH_SENF_PPI_QueueingSocketSink_ 1 00028 00029 // Custom includes 00030 #include <queue> 00031 #include "SocketSink.hh" 00032 #include <senf/Utils/Console/ScopedDirectory.hh> 00033 00034 //#include "QueueingSocketSink.mpp" 00035 //-///////////////////////////////////////////////////////////////////////////////////////////////// 00036 00037 namespace senf { 00038 namespace ppi { 00039 00040 class QueueingAlgorithm 00041 : private boost::noncopyable 00042 { 00043 console::ScopedDirectory<QueueingAlgorithm> dir_; 00044 00045 public: 00046 typedef QueueingAlgorithm * ptr; 00047 00048 virtual ~QueueingAlgorithm() {}; 00049 00050 console::DirectoryNode & consoleDir(); 00051 Packet dequeue(); 00052 bool enqueue(Packet const & packet); 00053 unsigned size(); 00054 void clear(); 00055 00056 protected: 00057 QueueingAlgorithm(); 00058 00059 virtual Packet v_dequeue() = 0; 00060 virtual bool v_enqueue(Packet const & packet) = 0; 00061 virtual unsigned v_size() const = 0; 00062 virtual void v_clear() = 0; 00063 }; 00064 00065 00066 namespace detail { 00067 struct QueueingAlgorithmRegistry_EntryBase 00068 { 00069 virtual QueueingAlgorithm::ptr create() const = 0; 00070 }; 00071 00072 template <class QAlgorithm> 00073 struct QueueingAlgorithmRegistry_Entry : QueueingAlgorithmRegistry_EntryBase 00074 { 00075 virtual QueueingAlgorithm::ptr create() const; 00076 }; 00077 } 00078 00079 class QueueingAlgorithmRegistry 00080 : public senf::singleton<QueueingAlgorithmRegistry> 00081 { 00082 typedef boost::ptr_map<std::string, detail::QueueingAlgorithmRegistry_EntryBase> QAlgoMap; 00083 QAlgoMap qAlgoMap_; 00084 00085 QueueingAlgorithmRegistry() {}; 00086 public: 00087 using senf::singleton<QueueingAlgorithmRegistry>::instance; 00088 friend class senf::singleton<QueueingAlgorithmRegistry>; 00089 00090 struct Exception : public senf::Exception { 00091 Exception(std::string const & descr) : senf::Exception(descr) {} 00092 }; 
00093 00094 template <class QAlgorithm> 00095 struct RegistrationProxy { 00096 RegistrationProxy(std::string const & key); 00097 }; 00098 00099 template <class QAlgorithm> 00100 void registerQAlgorithm(std::string key); 00101 00102 QueueingAlgorithm::ptr createQAlgorithm(std::string const & key) const; 00103 void dump(std::ostream & os) const; 00104 }; 00105 00106 00107 # define SENF_PPI_REGISTER_QALGORITHM( key, QAlgorithm ) \ 00108 namespace { \ 00109 senf::ppi::QueueingAlgorithmRegistry::RegistrationProxy<QAlgorithm> \ 00110 BOOST_PP_CAT(qAlgorithmRegistration_, __LINE__)( key); \ 00111 } 00112 00113 00114 class FIFOQueueingAlgorithm : public QueueingAlgorithm 00115 { 00116 std::queue<Packet> queue_; 00117 unsigned max_size_; 00118 00119 FIFOQueueingAlgorithm(); 00120 00121 virtual Packet v_dequeue(); 00122 virtual bool v_enqueue(Packet const & packet); 00123 virtual unsigned v_size() const; 00124 virtual void v_clear(); 00125 00126 public: 00127 static QueueingAlgorithm::ptr create(); 00128 }; 00129 00130 00131 namespace module { 00132 00137 template <class Writer=ConnectedDgramWriter> 00138 class PassiveQueueingSocketSink : public Module 00139 { 00140 SENF_PPI_MODULE(PassiveQueueingSocketSink); 00141 00142 public: 00143 typedef typename Writer::Handle Handle; 00144 typedef typename Writer::PacketType PacketType; 00145 00146 connector::PassiveInput<PacketType> input; 00147 console::ScopedDirectory<PassiveQueueingSocketSink<Writer> > dir; 00148 00149 explicit PassiveQueueingSocketSink(Handle const & handle, QueueingAlgorithm::ptr qAlgorithm); 00150 00151 Writer & writer(); 00152 Handle & handle(); 00153 void handle(Handle const & handle); 00155 00157 QueueingAlgorithm & qAlgorithm(); 00158 void qAlgorithm(QueueingAlgorithm::ptr qAlgorithm); 00159 00160 private: 00161 void write(); 00162 void writable(); 00163 void checkThrottle(); 00164 void setQAlgorithm(std::string const & key); 00165 00166 Handle handle_; 00167 Writer writer_; 00168 
boost::scoped_ptr<QueueingAlgorithm> qAlgo_; 00169 IOEvent event_; 00170 }; 00171 00172 }}} 00173 00174 //-///////////////////////////////////////////////////////////////////////////////////////////////// 00175 #include "QueueingSocketSink.cci" 00176 #include "QueueingSocketSink.ct" 00177 #include "QueueingSocketSink.cti" 00178 #endif 00179 00180 00181 // Local Variables: 00182 // mode: c++ 00183 // fill-column: 100 00184 // c-file-style: "senf" 00185 // indent-tabs-mode: nil 00186 // ispell-local-dictionary: "american" 00187 // compile-command: "scons -u test" 00188 // comment-column: 40 00189 // End: