Fawkes API  Fawkes Development Version
oprs_protobuf.h
1 
2 /***************************************************************************
3  * oprs_protobuf.h - protobuf network communication for OpenPRS
4  *
5  * Created: Tue Sep 02 16:34:09 2014 (based on CLIPS version)
6  * Copyright 2013-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * - Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  * - Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  * - Neither the name of the authors nor the names of its contributors
20  * may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
34  * OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #ifndef _OPENPRS_AGENT_OPRS_PROTOBUF_H_
38 #define _OPENPRS_AGENT_OPRS_PROTOBUF_H_
39 
40 #include <core/threading/mutex.h>
41 #include <core/utils/lock_queue.h>
42 #include <protobuf_comm/server.h>
43 
44 #include <list>
45 #include <map>
46 #include <memory>
47 #include <oprs-type-pub.h>
48 #include <oprs-type_f-pub.h>
49 
50 namespace protobuf_comm {
51 class ProtobufStreamClient;
52 class ProtobufBroadcastPeer;
53 } // namespace protobuf_comm
54 
55 namespace oprs_protobuf {
56 
58 {
59 public:
60  OpenPRSProtobuf(std::vector<std::string> &proto_path);
62 
63  /** Get Protobuf server.
64  * @return protobuf server */
66  server() const
67  {
68  return server_;
69  }
70 
71  /** Get protobuf_comm peers.
72  * @return protobuf_comm peer */
73  const std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> &
74  peers() const
75  {
76  return peers_;
77  }
78 
79  /** Get the communicator's message register.
80  * @return message register */
83  {
84  return *message_register_;
85  }
86 
87  /** Signal invoked for a message that has been sent to a server client.
88  * @return signal
89  */
90  boost::signals2::signal<void(protobuf_comm::ProtobufStreamServer::ClientID,
91  std::shared_ptr<google::protobuf::Message>)> &
93  {
94  return sig_server_sent_;
95  }
96 
97  /** Signal invoked for a message that has been sent to a client.
98  * @return signal
99  */
100  boost::signals2::signal<
101  void(std::string, unsigned short, std::shared_ptr<google::protobuf::Message>)> &
103  {
104  return sig_client_sent_;
105  }
106 
107  /** Signal invoked for a message that has been sent via broadcast.
108  * @return signal
109  */
110  boost::signals2::signal<void(long int, std::shared_ptr<google::protobuf::Message>)> &
112  {
113  return sig_peer_sent_;
114  }
115 
116  bool oprs_pb_register_type(std::string full_name);
117  Term *oprs_pb_field_names(void *msgptr);
118  bool oprs_pb_has_field(void *msgptr, std::string field_name);
119  Term *oprs_pb_field_value(void *msgptr, std::string field_name);
120  Term *oprs_pb_field_type(void *msgptr, std::string field_name);
121  Term *oprs_pb_field_label(void *msgptr, std::string field_name);
122  Term *oprs_pb_field_list(void *msgptr, std::string field_name);
123  bool oprs_pb_field_is_list(void *msgptr, std::string field_name);
124  std::shared_ptr<google::protobuf::Message> *oprs_create_msg(std::string full_name);
125  Term * oprs_pb_ref(void *msgptr);
126  Term * oprs_pb_destroy(void *msgptr);
127  void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value);
128  void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value);
129  void oprs_pb_send(long int client_id, void *msgptr);
130  Term *oprs_pb_client_connect(std::string host, int port);
131  void oprs_pb_disconnect(long int client_id);
132  void oprs_pb_broadcast(long int peer_id, void *msgptr);
133  void oprs_pb_enable_server(int port);
134  void oprs_pb_disable_server();
135 
136  Term *oprs_pb_peer_create(const std::string &host, int port);
137  Term *oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port);
138  Term *oprs_pb_peer_create_crypto(const std::string &host,
139  int port,
140  const std::string &crypto_key = "",
141  const std::string &cipher = "");
142  Term *oprs_pb_peer_create_local_crypto(const std::string &host,
143  int send_port,
144  int recv_port,
145  const std::string &crypto_key = "",
146  const std::string &cipher = "");
147  void oprs_pb_peer_destroy(long int peer_id);
148  void oprs_pb_peer_setup_crypto(long int peer_id,
149  const std::string &crypto_key,
150  const std::string &cipher);
151 
152  bool oprs_pb_events_pending();
153  void oprs_pb_process();
154 
155 private:
156  typedef enum { CT_SERVER, CT_CLIENT, CT_PEER } ClientType;
157  void clips_assert_message(std::pair<std::string, unsigned short> & endpoint,
158  uint16_t comp_id,
159  uint16_t msg_type,
160  std::shared_ptr<google::protobuf::Message> &msg,
161  ClientType ct,
162  unsigned int client_id = 0);
163  void handle_server_client_connected(protobuf_comm::ProtobufStreamServer::ClientID client,
164  boost::asio::ip::tcp::endpoint & endpoint);
165  void handle_server_client_disconnected(protobuf_comm::ProtobufStreamServer::ClientID client,
166  const boost::system::error_code & error);
167 
168  void handle_server_client_msg(protobuf_comm::ProtobufStreamServer::ClientID client,
169  uint16_t component_id,
170  uint16_t msg_type,
171  std::shared_ptr<google::protobuf::Message> msg);
172 
173  void handle_server_client_fail(protobuf_comm::ProtobufStreamServer::ClientID client,
174  uint16_t component_id,
175  uint16_t msg_type,
176  std::string msg);
177 
178  void handle_peer_msg(long int peer_id,
179  boost::asio::ip::udp::endpoint & endpoint,
180  uint16_t component_id,
181  uint16_t msg_type,
182  std::shared_ptr<google::protobuf::Message> msg);
183  void handle_peer_recv_error(long int peer_id,
184  boost::asio::ip::udp::endpoint &endpoint,
185  std::string msg);
186  void handle_peer_send_error(long int peer_id, const std::string &msg);
187 
188  void handle_client_connected(long int client_id);
189  void handle_client_disconnected(long int client_id, const boost::system::error_code &error);
190  void handle_client_msg(long int client_id,
191  uint16_t comp_id,
192  uint16_t msg_type,
193  std::shared_ptr<google::protobuf::Message> msg);
194  void handle_client_receive_fail(long int client_id,
195  uint16_t comp_id,
196  uint16_t msg_type,
197  const std::string &msg);
198  void oprs_assert_message(std::string & endpoint_host,
199  unsigned short endpoint_port,
200  uint16_t comp_id,
201  uint16_t msg_type,
202  std::shared_ptr<google::protobuf::Message> &msg,
203  OpenPRSProtobuf::ClientType ct,
204  unsigned int client_id);
205  void oprs_assert_server_client_event(long int client_id,
206  std::string & host,
207  unsigned short port,
208  bool connect);
209  void oprs_assert_client_event(long int client_id, bool connect);
210 
211 private:
212  std::shared_ptr<protobuf_comm::MessageRegister> message_register_;
214 
215  boost::signals2::signal<void(protobuf_comm::ProtobufStreamServer::ClientID,
216  std::shared_ptr<google::protobuf::Message>)>
217  sig_server_sent_;
218  boost::signals2::signal<
219  void(std::string, unsigned short, std::shared_ptr<google::protobuf::Message>)>
220  sig_client_sent_;
221  boost::signals2::signal<void(long int, std::shared_ptr<google::protobuf::Message>)>
222  sig_peer_sent_;
223 
224  fawkes::Mutex map_mutex_;
225  long int next_client_id_;
226 
227  std::map<long int, protobuf_comm::ProtobufStreamServer::ClientID> server_clients_;
228  typedef std::map<protobuf_comm::ProtobufStreamServer::ClientID, long int> RevServerClientMap;
229  RevServerClientMap rev_server_clients_;
230  std::map<long int, protobuf_comm::ProtobufStreamClient *> clients_;
231  std::map<long int, protobuf_comm::ProtobufBroadcastPeer *> peers_;
232 
233  std::map<long int, std::pair<std::string, unsigned short>> client_endpoints_;
234 
235  fawkes::LockQueue<std::tuple<std::string,
236  unsigned short,
237  uint16_t,
238  uint16_t,
239  std::shared_ptr<google::protobuf::Message>,
240  ClientType,
241  unsigned int>>
242  q_msgs_;
245 };
246 
247 } // namespace oprs_protobuf
248 
249 #endif
oprs_protobuf::OpenPRSProtobuf::oprs_create_msg
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
Definition: oprs_protobuf.cpp:241
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create_local_crypto
Term * oprs_pb_peer_create_local_crypto(const std::string &host, int send_port, int recv_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Definition: oprs_protobuf.cpp:120
oprs_protobuf::OpenPRSProtobuf
OpenPRS protobuf integration class.
Definition: oprs_protobuf.h:58
protobuf_comm::MessageRegister
Register to map msg type numbers to Protobuf messages.
Definition: message_register.h:66
oprs_protobuf::OpenPRSProtobuf::oprs_pb_client_connect
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
Definition: oprs_protobuf.cpp:758
fawkes::Mutex
Mutex mutual exclusion lock.
Definition: mutex.h:33
oprs_protobuf::OpenPRSProtobuf::server
protobuf_comm::ProtobufStreamServer * server() const
Get Protobuf server.
Definition: oprs_protobuf.h:66
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_list
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.
Definition: oprs_protobuf.cpp:893
protobuf_comm::ProtobufStreamServer
Stream server for protobuf message transmission.
Definition: server.h:62
oprs_protobuf::OpenPRSProtobuf::oprs_pb_events_pending
bool oprs_pb_events_pending()
Check if there are pending events.
Definition: oprs_protobuf.cpp:1037
oprs_protobuf::OpenPRSProtobuf::oprs_pb_set_field
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
Definition: oprs_protobuf.cpp:454
protobuf_comm::ProtobufStreamServer::ClientID
unsigned int ClientID
ID to identify connected clients.
Definition: server.h:65
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_destroy
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.
Definition: oprs_protobuf.cpp:196
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create_crypto
Term * oprs_pb_peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Definition: oprs_protobuf.cpp:161
oprs_protobuf::OpenPRSProtobuf::oprs_pb_process
void oprs_pb_process()
Process all pending events.
Definition: oprs_protobuf.cpp:994
oprs_protobuf::OpenPRSProtobuf::oprs_pb_has_field
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
Definition: oprs_protobuf.cpp:350
oprs_protobuf::OpenPRSProtobuf::oprs_pb_register_type
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
Definition: oprs_protobuf.cpp:224
oprs_protobuf::OpenPRSProtobuf::oprs_pb_ref
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
Definition: oprs_protobuf.cpp:252
oprs_protobuf::OpenPRSProtobuf::oprs_pb_disable_server
void oprs_pb_disable_server()
Disable protobuf stream server.
Definition: oprs_protobuf.cpp:105
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_setup_crypto
void oprs_pb_peer_setup_crypto(long int peer_id, const std::string &crypto_key, const std::string &cipher)
Setup crypto for peer.
Definition: oprs_protobuf.cpp:210
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_value
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
Definition: oprs_protobuf.cpp:402
oprs_protobuf::OpenPRSProtobuf::peers
const std::map< long int, protobuf_comm::ProtobufBroadcastPeer * > & peers() const
Get protobuf_comm peers.
Definition: oprs_protobuf.h:74
oprs_protobuf::OpenPRSProtobuf::oprs_pb_add_list
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
Definition: oprs_protobuf.cpp:616
oprs_protobuf::OpenPRSProtobuf::signal_peer_sent
boost::signals2::signal< void(long int, std::shared_ptr< google::protobuf::Message >)> & signal_peer_sent()
Signal invoked for a message that has been sent via broadcast.
Definition: oprs_protobuf.h:111
oprs_protobuf::OpenPRSProtobuf::signal_client_sent
boost::signals2::signal< void(std::string, unsigned short, std::shared_ptr< google::protobuf::Message >)> & signal_client_sent()
Signal invoked for a message that has been sent to a client.
Definition: oprs_protobuf.h:102
oprs_protobuf::OpenPRSProtobuf::OpenPRSProtobuf
OpenPRSProtobuf(std::vector< std::string > &proto_path)
Constructor.
Definition: oprs_protobuf.cpp:66
oprs_protobuf::OpenPRSProtobuf::message_register
protobuf_comm::MessageRegister & message_register()
Get the communicator's message register.
Definition: oprs_protobuf.h:82
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_is_list
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
Definition: oprs_protobuf.cpp:975
fawkes::LockQueue
Queue with a lock.
Definition: lock_queue.h:45
oprs_protobuf::OpenPRSProtobuf::oprs_pb_broadcast
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
Definition: oprs_protobuf.cpp:836
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_type
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type if a specific field.
Definition: oprs_protobuf.cpp:309
oprs_protobuf::OpenPRSProtobuf::oprs_pb_destroy
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
Definition: oprs_protobuf.cpp:270
oprs_protobuf::OpenPRSProtobuf::oprs_pb_send
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
Definition: oprs_protobuf.cpp:793
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create_local
Term * oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port)
Enable protobuf peer.
Definition: oprs_protobuf.cpp:187
oprs_protobuf::OpenPRSProtobuf::oprs_pb_disconnect
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Definition: oprs_protobuf.cpp:865
oprs_protobuf::OpenPRSProtobuf::~OpenPRSProtobuf
~OpenPRSProtobuf()
Destructor.
Definition: oprs_protobuf.cpp:72
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_names
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
Definition: oprs_protobuf.cpp:286
oprs_protobuf::OpenPRSProtobuf::oprs_pb_peer_create
Term * oprs_pb_peer_create(const std::string &host, int port)
Enable protobuf peer.
Definition: oprs_protobuf.cpp:175
oprs_protobuf::OpenPRSProtobuf::oprs_pb_field_label
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a fields label.
Definition: oprs_protobuf.cpp:377
oprs_protobuf::OpenPRSProtobuf::oprs_pb_enable_server
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Definition: oprs_protobuf.cpp:87
oprs_protobuf::OpenPRSProtobuf::signal_server_sent
boost::signals2::signal< void(protobuf_comm::ProtobufStreamServer::ClientID, std::shared_ptr< google::protobuf::Message >)> & signal_server_sent()
Signal invoked for a message that has been sent to a server client.
Definition: oprs_protobuf.h:92