Fawkes API  Fawkes Development Version
fuse_client.cpp
1 
2 /***************************************************************************
3  * fuse_client.cpp - FUSE network transport client
4  *
5  * Created: Thu Mar 29 00:47:24 2007
6  * Copyright 2005-2007 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/software.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/wait_condition.h>
27 #include <fvutils/net/fuse_client.h>
28 #include <fvutils/net/fuse_client_handler.h>
29 #include <fvutils/net/fuse_message.h>
30 #include <fvutils/net/fuse_message_queue.h>
31 #include <fvutils/net/fuse_transceiver.h>
32 #include <netcomm/socket/stream.h>
33 #include <netcomm/utils/exceptions.h>
34 #include <netinet/in.h>
35 
36 #include <cstdlib>
37 #include <cstring>
38 #include <unistd.h>
39 
40 using namespace fawkes;
41 
42 namespace firevision {
43 
44 /** @class FuseClient <fvutils/net/fuse_client.h>
45  * FUSE client.
46  * FUSE is the FireVision protocol to retrieve information, images and lookup
47  * tables from vision processes and to send control commands to these systems.
48  * The client is used in the retrieving or controlling process.
49  * @ingroup FUSE
50  * @ingroup FireVision
51  * @author Tim Niemueller
52  */
53 
54 /** Constructor.
55  * @param hostname host to connect to
56  * @param port port to connect to
57  * @param handler client handler to handle incoming data
58  */
59 FuseClient::FuseClient(const char *hostname, unsigned short int port, FuseClientHandler *handler)
60 : Thread("FuseClient")
61 {
62  hostname_ = strdup(hostname);
63  port_ = port;
64  handler_ = handler;
65 
66  wait_timeout_ = 10;
67 
68  inbound_msgq_ = new FuseNetworkMessageQueue();
69  outbound_msgq_ = new FuseNetworkMessageQueue();
70 
71  mutex_ = new Mutex();
72  recv_mutex_ = new Mutex();
73  recv_waitcond_ = new WaitCondition(recv_mutex_);
74  socket_ = new StreamSocket();
75  greeting_mutex_ = new Mutex();
76  greeting_waitcond_ = new WaitCondition(greeting_mutex_);
77 
78  alive_ = true;
79  greeting_received_ = false;
80 }
81 
82 /** Destructor. */
84 {
85  free(hostname_);
86 
87  while (!inbound_msgq_->empty()) {
88  FuseNetworkMessage *m = inbound_msgq_->front();
89  m->unref();
90  inbound_msgq_->pop();
91  }
92  delete inbound_msgq_;
93 
94  while (!outbound_msgq_->empty()) {
95  FuseNetworkMessage *m = outbound_msgq_->front();
96  m->unref();
97  outbound_msgq_->pop();
98  }
99  delete outbound_msgq_;
100 
101  delete mutex_;
102  delete recv_mutex_;
103  delete recv_waitcond_;
104  delete socket_;
105  delete greeting_mutex_;
106  delete greeting_waitcond_;
107 }
108 
109 /** Connect. */
110 void
112 {
113  socket_->connect(hostname_, port_);
114 
115  FUSE_greeting_message_t *greetmsg =
117  greetmsg->version = htonl(FUSE_CURRENT_VERSION);
118  outbound_msgq_->push(
119  new FuseNetworkMessage(FUSE_MT_GREETING, greetmsg, sizeof(FUSE_greeting_message_t)));
120 }
121 
122 /** Disconnect. */
123 void
125 {
126  mutex_->lock();
127  delete socket_;
128  socket_ = new StreamSocket();
129  alive_ = false;
130  mutex_->unlock();
131 }
132 
133 /** Send queued messages. */
134 void
135 FuseClient::send()
136 {
137  try {
138  FuseNetworkTransceiver::send(socket_, outbound_msgq_);
139  } catch (ConnectionDiedException &e) {
140  e.print_trace();
141  socket_->close();
142  alive_ = false;
143  handler_->fuse_connection_died();
144  recv_waitcond_->wake_all();
145  }
146 }
147 
148 /** Receive messages. */
149 void
150 FuseClient::recv()
151 {
152  recv_mutex_->lock();
153  try {
154  while (socket_->available()) {
155  FuseNetworkTransceiver::recv(socket_, inbound_msgq_);
156  }
157  } catch (ConnectionDiedException &e) {
158  e.print_trace();
159  socket_->close();
160  alive_ = false;
161  handler_->fuse_connection_died();
162  recv_waitcond_->wake_all();
163  }
164  recv_mutex_->unlock();
165 }
166 
167 /** Enqueue message.
168  * This method takes ownership of the passed message. You must explicitly
169  * reference it before enqueing if you want to use it afterwards.
170  * @param m message to enqueue
171  */
172 void
174 {
175  outbound_msgq_->push_locked(m);
176 }
177 
178 /** Enqueue message.
179  * @param type type of message
180  * @param payload payload of message
181  * @param payload_size size of payload
182  */
183 void
184 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
185 {
186  FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
187  outbound_msgq_->push_locked(m);
188 }
189 
190 /** Enqueue message without payload.
191  * @param type type of message
192  */
193 void
194 FuseClient::enqueue(FUSE_message_type_t type)
195 {
196  FuseNetworkMessage *m = new FuseNetworkMessage(type);
197  outbound_msgq_->push_locked(m);
198 }
199 
200 /** Enqueue message and wait for reply.
201  * The wait happens atomically, use this to avoid race conditions. This method
202  * takes ownership of the passed message. You must explicitly reference it
203  * before enqueing if you want to use it afterwards.
204  * @param m message to enqueue
205  */
206 void
208 {
209  recv_mutex_->lock();
210  outbound_msgq_->push_locked(m);
211  recv_waitcond_->wait();
212  recv_mutex_->unlock();
213 }
214 
215 /** Enqueue message and wait for reply.
216  * The wait happens atomically, use this to avoid race conditions.
217  * @param type type of message
218  * @param payload payload of message
219  * @param payload_size size of payload
220  */
221 void
222 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
223 {
224  FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
225  recv_mutex_->lock();
226  outbound_msgq_->push_locked(m);
227  recv_waitcond_->wait();
228  recv_mutex_->unlock();
229 }
230 
231 /** Enqueue message without payload and wait for reply.
232  * The wait happens atomically, use this to avoid race conditions.
233  * @param type type of message
234  */
235 void
236 FuseClient::enqueue_and_wait(FUSE_message_type_t type)
237 {
238  FuseNetworkMessage *m = new FuseNetworkMessage(type);
239  recv_mutex_->lock();
240  outbound_msgq_->push_locked(m);
241  recv_waitcond_->wait();
242  recv_mutex_->unlock();
243 }
244 
245 /** Sleep for some time.
246  * Wait until inbound messages have been receive, the connection dies or the
247  * timeout has been reached, whatever comes first. So you sleep at most timeout ms,
248  * but short under some circumstances (incoming data or lost connection).
249  */
250 void
251 FuseClient::sleep()
252 {
253  try {
254  socket_->poll(wait_timeout_ /* ms timeout */, Socket::POLL_IN);
255  } catch (Exception &e) {
256  }
257 }
258 
259 /** Thread loop.
260  * Sends enqueued messages and reads incoming messages off the network.
261  */
262 void
264 {
265  mutex_->lock();
266 
267  if (!alive_) {
268  mutex_->unlock();
269  usleep(10000);
270  return;
271  }
272 
273  bool wake = false;
274 
275  send();
276  sleep();
277  recv();
278 
279  //process_inbound();
280 
281  inbound_msgq_->lock();
282  while (!inbound_msgq_->empty()) {
283  FuseNetworkMessage *m = inbound_msgq_->front();
284 
285  if (m->type() == FUSE_MT_GREETING) {
287  if (ntohl(gm->version) != FUSE_CURRENT_VERSION) {
288  handler_->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
289  alive_ = false;
290  } else {
291  greeting_mutex_->lock();
292  greeting_received_ = true;
293  greeting_waitcond_->wake_all();
294  greeting_mutex_->unlock();
295  handler_->fuse_connection_established();
296  }
297  } else {
298  handler_->fuse_inbound_received(m);
299  wake = true;
300  }
301 
302  m->unref();
303  inbound_msgq_->pop();
304  }
305  inbound_msgq_->unlock();
306 
307  if (wake) {
308  recv_waitcond_->wake_all();
309  }
310  mutex_->unlock();
311 }
312 
313 /** Wait for messages.
314  * This will wait for messages to arrive. The calling
315  * thread is blocked until messages are available.
316  */
317 void
319 {
320  recv_mutex_->lock();
321  recv_waitcond_->wait();
322  recv_mutex_->unlock();
323 }
324 
325 /** Wait for greeting message.
326  * This method will wait for the greeting message to arrive. Make sure that you called
327  * connect() before waiting or call it concurrently in another thread. The calling thread
328  * will be blocked until the message has been received. If the message has already been
329  * received this method will return immediately. Thus it is safe to call this at any time
330  * without risking a race condition.
331  */
332 void
334 {
335  greeting_mutex_->lock();
336  while (!greeting_received_) {
337  greeting_waitcond_->wait();
338  }
339  greeting_mutex_->unlock();
340 }
341 
342 } // end namespace firevision
fawkes::Mutex::lock
void lock()
Lock this mutex.
Definition: mutex.cpp:87
fawkes::RefCount::unref
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
fawkes::LockQueue::push_locked
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:135
firevision::FuseNetworkMessageQueue
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
Definition: fuse_message_queue.h:38
fawkes::Mutex
Mutex mutual exclusion lock.
Definition: mutex.h:33
firevision::FuseNetworkTransceiver::send
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
Definition: fuse_transceiver.cpp:54
fawkes::WaitCondition
Wait until a given condition holds.
Definition: wait_condition.h:37
firevision::FuseClient::disconnect
void disconnect()
Disconnect.
Definition: fuse_client.cpp:124
fawkes::Socket::connect
virtual void connect(const char *hostname, const unsigned short int port)
Connect socket.
Definition: socket.cpp:376
fawkes::Mutex::unlock
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
fawkes::Socket::close
virtual void close()
Close socket.
Definition: socket.cpp:311
fawkes::WaitCondition::wait
void wait()
Wait for the condition forever.
Definition: wait_condition.cpp:139
fawkes::Socket::poll
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
firevision::FUSE_greeting_message_t::version
uint32_t version
version from FUSE_version_t
Definition: fuse.h:99
firevision::FuseClient::wait
void wait()
Wait for messages.
Definition: fuse_client.cpp:318
firevision::FuseClientHandler::fuse_inbound_received
virtual void fuse_inbound_received(FuseNetworkMessage *m)=0
Message received.
firevision::FuseNetworkTransceiver::recv
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Definition: fuse_transceiver.cpp:88
fawkes
Fawkes library namespace.
firevision::FuseClient::loop
virtual void loop()
Thread loop.
Definition: fuse_client.cpp:263
firevision::FuseClient::~FuseClient
virtual ~FuseClient()
Destructor.
Definition: fuse_client.cpp:83
firevision::FuseClient::enqueue_and_wait
void enqueue_and_wait(FuseNetworkMessage *message)
Enqueue message and wait for reply.
Definition: fuse_client.cpp:207
fawkes::Exception::print_trace
void print_trace()
Prints trace to stderr.
Definition: exception.cpp:601
firevision::FuseNetworkMessage::type
uint32_t type() const
Get message type.
Definition: fuse_message.cpp:129
fawkes::Socket::available
virtual bool available()
Check if data is available.
Definition: socket.cpp:644
fawkes::WaitCondition::wake_all
void wake_all()
Wake up all waiting threads.
Definition: wait_condition.cpp:287
fawkes::LockQueue::lock
void lock() const
Lock queue.
Definition: lock_queue.h:114
firevision::FuseClientHandler::fuse_connection_died
virtual void fuse_connection_died()=0
Connection died.
firevision::FuseNetworkMessage::msg
MT * msg() const
Get correctly casted payload.
Definition: fuse_message.h:67
fawkes::Thread
Thread class encapsulation of pthreads.
Definition: thread.h:46
firevision::FuseClientHandler::fuse_invalid_server_version
virtual void fuse_invalid_server_version(uint32_t local_version, uint32_t remote_version)=0
Invalid version string received.
firevision::FuseClientHandler
FUSE client handler.
Definition: fuse_client_handler.h:34
firevision::FUSE_greeting_message_t
version packet, bi-directional
Definition: fuse.h:98
fawkes::StreamSocket
TCP stream socket over IP.
Definition: stream.h:32
firevision::FuseClient::enqueue
void enqueue(FuseNetworkMessage *m)
Enqueue message.
Definition: fuse_client.cpp:173
firevision::FuseClient::wait_greeting
void wait_greeting()
Wait for greeting message.
Definition: fuse_client.cpp:333
firevision::FuseClientHandler::fuse_connection_established
virtual void fuse_connection_established()=0
Connection has been established.
firevision::FuseNetworkMessage
FUSE Network Message.
Definition: fuse_message.h:40
fawkes::ConnectionDiedException
Thrown if the connection died during an operation.
Definition: exceptions.h:32
fawkes::LockQueue::unlock
void unlock() const
Unlock list.
Definition: lock_queue.h:128
fawkes::Exception
Base class for exceptions in Fawkes.
Definition: exception.h:36
firevision::FuseClient::connect
void connect()
Connect.
Definition: fuse_client.cpp:111