Fawkes API  Fawkes Development Version
server_client_thread.cpp
1 
2 /***************************************************************************
3  * server_client_thread.cpp - Thread handling Fawkes network client
4  *
5  * Created: Fri Nov 17 17:23:24 2006
6  * Copyright 2006-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/system.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/wait_condition.h>
27 #include <netcomm/fawkes/message_queue.h>
28 #include <netcomm/fawkes/server_client_thread.h>
29 #include <netcomm/fawkes/server_thread.h>
30 #include <netcomm/fawkes/transceiver.h>
31 #include <netcomm/socket/stream.h>
32 #include <netcomm/utils/exceptions.h>
33 
34 #include <unistd.h>
35 
36 namespace fawkes {
37 
38 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h>
39  * Sending thread for a Fawkes client connected to the server.
40  * This thread is spawned for each client connected to the server to handle the
41  * server-side sending
42  * @ingroup NetComm
43  * @author Tim Niemueller
44  */
45 
47 {
48 public:
49  /** Constructor.
50  * @param s client stream socket
51  * @param parent parent FawkesNetworkServerClientThread instance
52  */
54  : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
55  {
56  s_ = s;
57  parent_ = parent;
58  outbound_mutex_ = new Mutex();
59  outbound_msgqs_[0] = new FawkesNetworkMessageQueue();
60  outbound_msgqs_[1] = new FawkesNetworkMessageQueue();
61  outbound_active_ = 0;
62  outbound_msgq_ = outbound_msgqs_[0];
63  }
64 
65  /** Destructor. */
67  {
68  for (unsigned int i = 0; i < 2; ++i) {
69  while (!outbound_msgqs_[i]->empty()) {
70  FawkesNetworkMessage *m = outbound_msgqs_[i]->front();
71  m->unref();
72  outbound_msgqs_[i]->pop();
73  }
74  }
75  delete outbound_msgqs_[0];
76  delete outbound_msgqs_[1];
77  delete outbound_mutex_;
78  }
79 
/** Process the pending outbound messages.
 * Returns immediately if the parent reports that the connection is no longer
 * alive. Otherwise, for as long as outbound_havemore_ is set, the flag is
 * cleared and the active outbound queue is exchanged for the other of the two
 * queues while outbound_mutex_ is held; the queue that was active before the
 * exchange is then handled outside the lock. If a ConnectionDiedException is
 * caught, the parent is notified via connection_died() and exit() is called.
 */
80  virtual void
81  loop()
82  {
83  if (!parent_->alive())
84  return;
85 
// enqueue() sets the flag again whenever it adds a message, so the condition
// is re-evaluated after every pass.
// NOTE(review): the flag is read here without holding outbound_mutex_ — confirm
// that this is intended.
86  while (outbound_havemore_) {
87  outbound_mutex_->lock();
88  outbound_havemore_ = false;
// Remember the queue filled so far, then redirect new messages to the other one.
89  FawkesNetworkMessageQueue *q = outbound_msgq_;
90  outbound_active_ = 1 - outbound_active_;
91  outbound_msgq_ = outbound_msgqs_[outbound_active_];
92  outbound_mutex_->unlock();
93 
94  if (!q->empty()) {
// NOTE(review): no statement is visible inside this try block; presumably the
// messages in q are sent over s_ here — confirm against the original file.
95  try {
97  } catch (ConnectionDiedException &e) {
98  parent_->connection_died();
99  exit();
100  }
101  }
102  }
103  }
104 
105  /** Enqueue message to outbound queue.
106  * This enqueues the given message to the outbound queue. The message will
107  * be sent in the next loop iteration. This method takes ownership of the
108  * transmitted message. If you want to use the message after enqueuing you
109  * must reference it explicitly.
110  * @param msg message to enqueue
111  */
112  void
114  {
115  outbound_mutex_->lock();
116  outbound_msgq_->push(msg);
117  outbound_havemore_ = true;
118  outbound_mutex_->unlock();
119  wakeup();
120  }
121 
122  /** Wait until all data has been sent. */
123  void
125  {
126  loop_mutex->lock();
127  loop_mutex->unlock();
128  }
129 
130  /** Stub to see name in backtrace for easier debugging. @see Thread::run() */
131 protected:
// Forwards to Thread::run() and adds no behavior of its own.
132  virtual void
133  run()
134  {
135  Thread::run();
136  }
137 
138 private:
139  StreamSocket * s_;
141 
142  Mutex * outbound_mutex_;
143  unsigned int outbound_active_;
144  bool outbound_havemore_;
145  FawkesNetworkMessageQueue *outbound_msgq_;
146  FawkesNetworkMessageQueue *outbound_msgqs_[2];
147 };
148 
149 /** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h
150  * Fawkes Network Client Thread for server.
151  * The FawkesNetworkServerThread spawns an instance of this class for every incoming
152  * connection. It is then used to handle the client.
153  * The thread will start another thread, an instance of
154  * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing
155  * traffic.
156  *
157  * @ingroup NetComm
158  * @author Tim Niemueller
159  */
160 
161 /** Constructor.
162  * @param s socket to client
163  * @param parent parent network thread
164  */
167 : Thread("FawkesNetworkServerClientThread")
168 {
169  _s = s;
170  _parent = parent;
171  _alive = true;
172  _clid = 0;
173  _inbound_queue = new FawkesNetworkMessageQueue();
174 
175  _send_slave = new FawkesNetworkServerClientSendThread(_s, this);
176 
177  set_prepfin_conc_loop(true);
178 }
179 
180 /** Destructor. */
182 {
183  _send_slave->cancel();
184  _send_slave->join();
185  delete _send_slave;
186  delete _s;
187  delete _inbound_queue;
188 }
189 
190 /** Get client ID.
191  * The client ID can be used to send replies.
192  * @return client ID
193  */
194 unsigned int
196 {
197  return _clid;
198 }
199 
200 /** Set client ID.
201  * @param client_id new client ID
202  */
203 void
205 {
206  _clid = client_id;
207 }
208 
209 /** Receive data.
210  * Receives data from the network if there is any and then dispatches all
211  * inbound messages via the parent FawkesNetworkThread::dispatch()
212  */
213 void
214 FawkesNetworkServerClientThread::recv()
215 {
216  try {
217  FawkesNetworkTransceiver::recv(_s, _inbound_queue);
218 
219  _inbound_queue->lock();
220  while (!_inbound_queue->empty()) {
221  FawkesNetworkMessage *m = _inbound_queue->front();
222  m->set_client_id(_clid);
223  _parent->dispatch(m);
224  m->unref();
225  _inbound_queue->pop();
226  }
227  _parent->wakeup();
228  _inbound_queue->unlock();
229 
230  } catch (ConnectionDiedException &e) {
231  _alive = false;
232  _s->close();
233  _parent->wakeup();
234  }
235 }
236 
237 void
239 {
240  _send_slave->start();
241 }
242 
243 /** Thread loop.
244  * The client thread loop polls on the socket for 10 ms (wait for events
245  * on the socket like closed connection or data that can be read). If any
246  * event occurs it is processed. If the connection died or any other
247  * error occurred the thread is cancelled and the parent FawkesNetworkThread
248  * is woken up to carry out any action that is needed when a client dies.
249  * If data is available for reading the data is received and dispatched
250  * via recv().
251  * Afterwards the outbound message queue is processed and all messages are
252  * sent. This is also done if the operation could block (POLL_OUT is not
253  * honored).
254  */
255 void
257 {
258  if (!_alive) {
259  usleep(1000000);
260  return;
261  }
262 
263  short p = 0;
264  try {
265  p = _s->poll(); // block until we got a message
266  } catch (InterruptedException &e) {
267  // we just ignore this and try it again
268  return;
269  }
270 
271  if ((p & Socket::POLL_ERR) || (p & Socket::POLL_HUP) || (p & Socket::POLL_RDHUP)) {
272  _alive = false;
273  _parent->wakeup();
274  } else if (p & Socket::POLL_IN) {
275  // Data can be read
276  recv();
277  }
278 }
279 
280 /** Enqueue message to outbound queue.
281  * This enqueues the given message to the outbound queue. The message will be sent
282  * in the next loop iteration.
283  * @param msg message to enqueue
284  */
285 void
287 {
288  _send_slave->enqueue(msg);
289 }
290 
291 /** Check aliveness of connection.
292  * @return true if connection is still alive, false otherwise.
293  */
294 bool
296 {
297  return _alive;
298 }
299 
300 /** Force sending of all pending outbound messages.
301  * This is a blocking operation. The current poll will be interrupted by sending
302  * a signal to this thread (and ignoring it) and then wait for the sending to
303  * finish.
304  */
305 void
307 {
308  _send_slave->wait_for_all_sent();
309 }
310 
311 /** Connection died notification.
312  * To be called only by the send slave thread.
313  */
314 void
316 {
317  _alive = false;
318  _parent->wakeup();
319 }
320 
321 } // end namespace fawkes
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:87
fawkes::RefCount::unref
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
fawkes::Thread::set_prepfin_conc_loop
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
fawkes::Socket::POLL_HUP
static const short POLL_HUP
Hang up.
Definition: socket.h:71
fawkes::Mutex
Mutex mutual exclusion lock.
Definition: mutex.h:33
fawkes::FawkesNetworkServerClientSendThread
Sending thread for a Fawkes client connected to the server.
Definition: server_client_thread.cpp:47
fawkes::FawkesNetworkServerClientThread::connection_died
void connection_died()
Connection died notification.
Definition: server_client_thread.cpp:315
fawkes::FawkesNetworkServerClientThread::once
virtual void once()
Execute an action exactly once.
Definition: server_client_thread.cpp:238
fawkes::FawkesNetworkMessageQueue
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:33
fawkes::Thread::wakeup
void wakeup()
Wake up thread.
Definition: thread.cpp:995
fawkes::InterruptedException
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
fawkes::FawkesNetworkServerThread::dispatch
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
Definition: server_thread.cpp:378
fawkes::Socket::POLL_RDHUP
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
Definition: socket.h:69
fawkes::FawkesNetworkServerClientThread::force_send
void force_send()
Force sending of all pending outbound messages.
Definition: server_client_thread.cpp:306
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
fawkes::Socket::POLL_ERR
static const short POLL_ERR
Error condition.
Definition: socket.h:70
fawkes::Socket::close
virtual void close()
Close socket.
Definition: socket.cpp:311
fawkes::FawkesNetworkServerClientSendThread::wait_for_all_sent
void wait_for_all_sent()
Wait until all data has been sent.
Definition: server_client_thread.cpp:124
fawkes::FawkesNetworkServerClientSendThread::~FawkesNetworkServerClientSendThread
~FawkesNetworkServerClientSendThread()
Destructor.
Definition: server_client_thread.cpp:66
fawkes::Socket::poll
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
fawkes::FawkesNetworkTransceiver::recv
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Definition: transceiver.cpp:85
fawkes::FawkesNetworkTransceiver::send
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
Definition: transceiver.cpp:51
fawkes::Thread::exit
void exit()
Exit the thread.
Definition: thread.cpp:582
fawkes::FawkesNetworkServerClientThread
Fawkes Network Client Thread for server.
Definition: server_client_thread.h:42
fawkes
Fawkes library namespace.
fawkes::Thread::OPMODE_WAITFORWAKEUP
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
Definition: thread.h:58
fawkes::Socket::POLL_IN
static const short POLL_IN
Data can be read.
Definition: socket.h:66
fawkes::FawkesNetworkServerClientThread::~FawkesNetworkServerClientThread
~FawkesNetworkServerClientThread()
Destructor.
Definition: server_client_thread.cpp:181
fawkes::FawkesNetworkServerClientThread::loop
virtual void loop()
Thread loop.
Definition: server_client_thread.cpp:256
fawkes::FawkesNetworkServerClientSendThread::FawkesNetworkServerClientSendThread
FawkesNetworkServerClientSendThread(StreamSocket *s, FawkesNetworkServerClientThread *parent)
Constructor.
Definition: server_client_thread.cpp:53
fawkes::FawkesNetworkServerClientThread::FawkesNetworkServerClientThread
FawkesNetworkServerClientThread(StreamSocket *s, FawkesNetworkServerThread *parent)
Constructor.
Definition: server_client_thread.cpp:165
fawkes::FawkesNetworkServerClientThread::enqueue
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
Definition: server_client_thread.cpp:286
fawkes::FawkesNetworkServerClientThread::set_clid
void set_clid(unsigned int client_id)
Set client ID.
Definition: server_client_thread.cpp:204
fawkes::LockQueue::lock
void lock() const
Lock queue.
Definition: lock_queue.h:114
fawkes::Thread::run
virtual void run()
Code to execute in the thread.
Definition: thread.cpp:918
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
fawkes::FawkesNetworkServerClientThread::clid
unsigned int clid() const
Get client ID.
Definition: server_client_thread.cpp:195
fawkes::Thread::start
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
fawkes::FawkesNetworkMessage
Representation of a message that is sent over the network.
Definition: message.h:77
fawkes::FawkesNetworkServerThread
Fawkes Network Thread.
Definition: server_thread.h:49
fawkes::StreamSocket
TCP stream socket over IP.
Definition: stream.h:32
fawkes::Thread::loop_mutex
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
Definition: thread.h:152
fawkes::Thread::cancel
void cancel()
Cancel a thread.
Definition: thread.cpp:646
fawkes::FawkesNetworkMessage::set_client_id
void set_client_id(unsigned int clid)
Set client ID.
Definition: message.cpp:330
fawkes::FawkesNetworkServerClientThread::alive
bool alive() const
Check aliveness of connection.
Definition: server_client_thread.cpp:295
fawkes::FawkesNetworkServerClientSendThread::enqueue
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
Definition: server_client_thread.cpp:113
fawkes::Thread::join
void join()
Join the thread.
Definition: thread.cpp:597
fawkes::ConnectionDiedException
Thrown if the connection died during an operation.
Definition: exceptions.h:32
fawkes::FawkesNetworkServerClientSendThread::loop
virtual void loop()
Code to execute in the thread.
Definition: server_client_thread.cpp:81
fawkes::FawkesNetworkServerClientSendThread::run
virtual void run()
Stub to see name in backtrace for easier debugging.
Definition: server_client_thread.cpp:133
fawkes::LockQueue::unlock
void unlock() const
Unlock list.
Definition: lock_queue.h:128