EHS Embedded HTTP Server  1.5.1.0
wsendpoint.h
1 /*
2  * This file has been partially derived from the WebSockets++ project at
3  * https://github.com/zaphoyd/websocketpp which is licensed under a BSD-license.
4  */
5 
6 #ifndef WSENDPOINT_H
7 #define WSENDPOINT_H
8 
9 #include <vector>
10 #include <sstream>
11 #include <iostream>
12 #include <string>
13 #include <boost/thread.hpp>
14 #include "wsframe.h"
15 
16 #ifndef HAVE_BOOST_LOCK_GUARD
17 #include <pthread.h>
/**
 * Automatically unlocks a mutex if destroyed.
 *
 * The helper stores a pointer to a pthread mutex that it does not own and
 * remembers whether it locked that mutex itself. Instances must be given a
 * name when used as a scoped lock: an unnamed temporary is destroyed at the
 * end of the statement that creates it and therefore protects nothing.
 */
class MutexHelper {
    public:
        /**
         * Constructs a new instance.
         * @param mutex The mutex to operate on. Only the pointer is stored.
         * @param locknow If true (the default), the mutex is locked
         *   immediately; if false, it stays untouched until Lock() is called.
         */
        MutexHelper(pthread_mutex_t *mutex, bool locknow = true) :
            m_pMutex(mutex), m_bLocked(false)
        {
            if (locknow)
                Lock();
        }

        /**
         * Unlocks the associated mutex, but only if this helper still
         * records it as locked.
         */
        ~MutexHelper()
        {
            if (m_bLocked)
                pthread_mutex_unlock(m_pMutex);
        }

        /**
         * Locks the associated mutex and records that it is held.
         */
        void Lock()
        {
            pthread_mutex_lock(m_pMutex);
            m_bLocked = true;
        }

        /**
         * Unlocks the associated mutex.
         *
         * The flag is cleared first so that the destructor does not unlock
         * a second time.
         */
        void Unlock()
        {
            m_bLocked = false;
            pthread_mutex_unlock(m_pMutex);
        }

    private:
        pthread_mutex_t *m_pMutex; ///< The mutex being managed (not owned).
        bool m_bLocked;            ///< True while this helper holds the lock.

        // Non-copyable: declared but intentionally never defined.
        MutexHelper(const MutexHelper &);
        MutexHelper & operator=(const MutexHelper &);
};
69 #endif
70 
71 namespace wspp {
72  class wsendpoint;
73 
83  class wshandler {
84  public:
89  void send_text(const std::string & data) {
90  send(data, frame::opcode::TEXT);
91  }
92 
97  void send_binary(const std::string & data) {
98  send(data, frame::opcode::BINARY);
99  }
100 
102  wshandler() : m_endpoint(0) {}
103 
105  virtual ~wshandler() {}
106 
107  private:
108  virtual void on_message(std::string header, std::string data) = 0;
109  virtual void on_close() = 0;
110  virtual bool on_ping(const std::string & data) = 0;
111  virtual void on_pong(const std::string & data) = 0;
112  virtual void do_response(const std::string & data) = 0;
113 
114  void send(const std::string& payload, frame::opcode::value op);
115 
116  // Non-copyable
117  wshandler(const wshandler&);
118  wshandler& operator=(const wshandler&);
119 
120  wsendpoint *m_endpoint;
121  friend class wsendpoint;
122  };
123 
127  class wsendpoint {
128  private:
129  // Non-copyable
130  wsendpoint(const wsendpoint&);
131  wsendpoint& operator=(const wsendpoint&);
132 
133  public:
139  : m_rng(simple_rng())
140  , m_parser(frame::parser<simple_rng>(m_rng))
141  , m_state(session::state::OPEN)
142  , m_lock()
143  , m_handler(h)
144  {
145 #ifndef HAVE_BOOST_LOCK_GUARD
146  pthread_mutexattr_t mattr;
147  pthread_mutexattr_init(&mattr);
148  pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE);
149  pthread_mutex_init(&m_lock, &mattr);
150  pthread_mutexattr_destroy(&mattr);
151 #endif
152  m_handler->m_endpoint = this;
153  }
154 
155 #ifndef HAVE_BOOST_LOCK_GUARD
156  ~wsendpoint() { pthread_mutex_destroy(&m_lock); }
157 #endif
158 
170  void AddRxData(std::string data)
171  {
172  std::istringstream s(data);
173  while (m_state != session::state::CLOSED && s.rdbuf()->in_avail()) {
174  try {
175  m_parser.consume(s);
176  if (m_parser.ready()) {
177  if (m_parser.is_control()) {
178  process_control();
179  } else {
180  process_data();
181  }
182  m_parser.reset();
183  }
184  } catch (const tracing::wserror & e) {
185  if (m_parser.ready()) {
186  m_parser.reset();
187  }
188  switch(e.code()) {
189  case tracing::wserror::PROTOCOL_VIOLATION:
190  send_close(close::status::PROTOCOL_ERROR,e.what());
191  break;
192  case tracing::wserror::PAYLOAD_VIOLATION:
193  send_close(close::status::INVALID_PAYLOAD,e.what());
194  break;
195  case tracing::wserror::INTERNAL_ENDPOINT_ERROR:
196  send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.what());
197  break;
198  case tracing::wserror::SOFT_ERROR:
199  continue;
200  case tracing::wserror::MESSAGE_TOO_BIG:
201  send_close(close::status::MESSAGE_TOO_BIG,e.what());
202  break;
203  case tracing::wserror::OUT_OF_MESSAGES:
204  // we need to wait for a message to be returned by the
205  // client. We exit the read loop. handle_read_frame
206  // will be restarted by recycle()
207  //m_read_state = WAITING;
208  //m_endpoint.wait(type::shared_from_this());
209  return;
210  default:
211  // Fatal error, forcibly end connection immediately.
212  std::cerr
213  << "Dropping TCP due to unrecoverable exception: " << e.code()
214  << " (" << e.what() << ")" << std::endl;
215  shutdown();
216  }
217  break;
218  }
219  }
220  }
221 
229  void send(const std::string& payload, frame::opcode::value op) {
230  frame::parser<simple_rng> control(m_rng);
231  control.set_opcode(op);
232  control.set_fin(true);
233  control.set_masked(false);
234  control.set_payload(payload);
235 
236  std::string tmp(control.get_header_str());
237  tmp.append(control.get_payload_str());
238  m_handler->do_response(tmp);
239  }
240 
241  private:
242  void process_data() {
243  m_handler->on_message(m_parser.get_header_str(), m_parser.get_payload_str());
244  }
245 
247 
257  void shutdown() {
258 #ifdef HAVE_BOOST_LOCK_GUARD
259  boost::lock_guard<boost::recursive_mutex> lock(m_lock);
260 #else
261  // cppcheck-suppress unusedScopedObject
262  MutexHelper((pthread_mutex_t *)&m_lock);
263 #endif
264 
265  if (m_state == session::state::CLOSED) {return;}
266 
267  m_state = session::state::CLOSED;
268  m_handler->on_close();
269  }
270 
283  void pong(const std::vector<unsigned char> & payload) {
284 #ifdef HAVE_BOOST_LOCK_GUARD
285  boost::lock_guard<boost::recursive_mutex> lock(m_lock);
286 #else
287  // cppcheck-suppress unusedScopedObject
288  MutexHelper((pthread_mutex_t *)&m_lock);
289 #endif
290 
291  if (m_state != session::state::OPEN) {return;}
292  // if (m_detached) {return;}
293 
294  // TODO: optimize control messages and handle case where
295  // endpoint is out of messages
296  frame::parser<simple_rng> control(m_rng);
297  control.set_opcode(frame::opcode::PONG);
298  control.set_fin(true);
299  control.set_masked(false);
300  control.set_payload(payload);
301 
302  std::string tmp(control.get_header_str());
303  tmp.append(control.get_payload_str());
304  m_handler->do_response(tmp);
305  }
306 
308 
319  void send_close(close::status::value code, const std::string& reason) {
320 #ifdef HAVE_BOOST_LOCK_GUARD
321  boost::lock_guard<boost::recursive_mutex> lock(m_lock);
322 #else
323  // cppcheck-suppress unusedScopedObject
324  MutexHelper((pthread_mutex_t *)&m_lock);
325 #endif
326 
327  // if (m_detached) {return;}
328 
329  if (m_state != session::state::OPEN) {
330  std::cerr << "Tried to disconnect a session that wasn't open" << std::endl;
331  return;
332  }
333 
334  if (close::status::invalid(code)) {
335  std::cerr << "Tried to close a connection with invalid close code: "
336  << code << std::endl;
337  return;
338  } else if (close::status::reserved(code)) {
339  std::cerr << "Tried to close a connection with reserved close code: "
340  << code << std::endl;
341  return;
342  }
343 
344  m_state = session::state::CLOSING;
345 
346  frame::parser<simple_rng> control(m_rng);
347  control.set_opcode(frame::opcode::CLOSE);
348  control.set_fin(true);
349  control.set_masked(false);
350  if (code != close::status::NO_STATUS) {
351  const uint16_t payload = htons(code);
352  std::string pl(reinterpret_cast<const char*>(&payload), 2);
353  pl.append(reason);
354  control.set_payload(pl);
355  }
356 
357  std::string tmp(control.get_header_str());
358  tmp.append(control.get_payload_str());
359  m_handler->do_response(tmp);
360  }
361 
363 
368  void send_close_ack(close::status::value remote_close_code, std::string remote_close_reason) {
369  close::status::value local_close_code;
370  std::string local_close_reason;
371  // echo close value unless there is a good reason not to.
372  if (remote_close_code == close::status::NO_STATUS) {
373  local_close_code = close::status::NORMAL;
374  local_close_reason = "";
375  } else if (remote_close_code == close::status::ABNORMAL_CLOSE) {
376  // TODO: can we possibly get here? This means send_close_ack was
377  // called after a connection ended without getting a close
378  // frame
379  throw "shouldn't be here";
380  } else if (close::status::invalid(remote_close_code)) {
381  // TODO: shouldn't be able to get here now either
382  local_close_code = close::status::PROTOCOL_ERROR;
383  local_close_reason = "Status code is invalid";
384  } else if (close::status::reserved(remote_close_code)) {
385  // TODO: shouldn't be able to get here now either
386  local_close_code = close::status::PROTOCOL_ERROR;
387  local_close_reason = "Status code is reserved";
388  } else {
389  local_close_code = remote_close_code;
390  local_close_reason = remote_close_reason;
391  }
392 
393  // TODO: check whether we should cancel the current in flight write.
394  // if not canceled the close message will be sent as soon as the
395  // current write completes.
396 
397 
398  frame::parser<simple_rng> control(m_rng);
399  control.set_opcode(frame::opcode::CLOSE);
400  control.set_fin(true);
401  control.set_masked(false);
402  if (local_close_code != close::status::NO_STATUS) {
403  const uint16_t payload = htons(local_close_code);
404  std::string pl(reinterpret_cast<const char*>(&payload), 2);
405  pl.append(local_close_reason);
406  control.set_payload(pl);
407  }
408 
409  std::string tmp(control.get_header_str());
410  tmp.append(control.get_payload_str());
411  m_handler->do_response(tmp);
412  shutdown();
413  }
414 
415  void process_control() {
416  switch (m_parser.get_opcode()) {
417  case frame::opcode::PING:
418  if (m_handler->on_ping(m_parser.get_payload_str())) {
419  pong(m_parser.get_payload());
420  }
421  break;
422  case frame::opcode::PONG:
423  m_handler->on_pong(m_parser.get_payload_str());
424  break;
425  case frame::opcode::CLOSE:
426  // check that the codes we got over the wire are valid
427  if (m_state == session::state::OPEN) {
428  // other end is initiating
429  std::cerr << "sending close ack" << std::endl;
430 
431  // TODO:
432  send_close_ack(m_parser.get_close_code(), m_parser.get_close_reason());
433  } else if (m_state == session::state::CLOSING) {
434  // ack of our close
435  std::cerr << "got close ack" << std::endl;
436  shutdown();
437  }
438  break;
439  default:
440  throw tracing::wserror("Invalid Opcode",
441  tracing::wserror::PROTOCOL_VIOLATION);
442  break;
443  }
444  }
445 
446  private:
447  simple_rng m_rng;
448  frame::parser<simple_rng> m_parser;
449  session::state::value m_state;
450 #ifdef HAVE_BOOST_LOCK_GUARD
451  mutable boost::recursive_mutex m_lock;
452 #else
453  mutable pthread_mutex_t m_lock;
454 #endif
455  wshandler *m_handler;
456  };
457 
458  void wshandler::send(const std::string& payload, frame::opcode::value op)
459  {
460  if (m_endpoint) {
461  m_endpoint->send(payload, op);
462  }
463  }
464 
465 }
466 
467 #endif
Automatically unlocks a mutex if destroyed.
Definition: wsendpoint.h:21
~MutexHelper()
Unlocks the associated mutex.
Definition: wsendpoint.h:39
void Lock()
Locks the associated mutex.
Definition: wsendpoint.h:48
void Unlock()
Unlocks the associated mutex.
Definition: wsendpoint.h:57
MutexHelper(pthread_mutex_t *mutex, bool locknow=true)
Constructs a new instance.
Definition: wsendpoint.h:29
This class represents any errors in our WebSockets implementation.
Definition: wsframe.h:71
virtual int code() const
Retrieve error code.
Definition: wsframe.h:102
virtual const char * what() const
Retrieve error message.
Definition: wsframe.h:96
The main parser/codec for our WebSockets implementation.
Definition: wsframe.h:160
std::string get_payload_str() const
Retrieves payload.
Definition: wsframe.h:321
void set_masked(bool masked)
Set the MASKED bit of the current message.
Definition: wsframe.h:386
void set_payload(const std::string &source)
Set the payload of the current message.
Definition: wsframe.h:400
void set_opcode(opcode::value op)
Set the opcode of the current message.
Definition: wsframe.h:365
void set_fin(bool fin)
Set the FIN bit of the current message.
Definition: wsframe.h:345
std::string get_header_str()
Retrieves header.
Definition: wsframe.h:313
A VERY simplistic random generator.
Definition: wsframe.h:115
This class implements a server-side WebSockets endpoint.
Definition: wsendpoint.h:127
wsendpoint(wshandler *h)
Constructor.
Definition: wsendpoint.h:138
void AddRxData(std::string data)
Processes incoming data from the client.
Definition: wsendpoint.h:170
void send(const std::string &payload, frame::opcode::value op)
Send a data message.
Definition: wsendpoint.h:229
Event handler interface for the server-side WebSockets endpoint.
Definition: wsendpoint.h:83
virtual ~wshandler()
Destructor.
Definition: wsendpoint.h:105
void send_binary(const std::string &data)
Send a binary message to the remote client.
Definition: wsendpoint.h:97
wshandler()
Constructor.
Definition: wsendpoint.h:102
void send_text(const std::string &data)
Send a text message to the remote client.
Definition: wsendpoint.h:89