commit 58585b24950a62a9c25f38b8607c5eec3cb56992 Author: melode11 Date: Thu Mar 26 21:46:07 2015 +0800 initial push diff --git a/README.md b/README.md new file mode 100755 index 0000000..7baec07 --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +# SIO Client +This SIO client is depends on [websocket++](https://github.com/zaphoyd/websocketpp) and [rapidjson](https://github.com/miloyip/rapidjson), it provides another C++ client implementation for [Socket.IO](https://github.com/Automattic/socket.io). +This library is able to connect to a Socket.IO server 1.0, and its inspired by the [socket.io-clientpp](https://github.com/ebshimizu/socket.io-clientpp) project but I've rewrite 90% of the code. + +## Socket.IO 1.0+ protocol has been implemented! +The code is compatible with 1.0+ protocol only, not with prior protocols. + +## C++11 only for now +C++11 saves much time for me, so this is C++11 only for the first version. +I'll do further compatibility efforts on demand. + +## Supported features +1. Internal thread manangement. +2. Sends plain text messages. +3. Sends binary messages. +4. Sends structured messages with text and binary all together. +5. Sends messages with an ack and its corresponding callback. +6. Receives messages and automatically sends customable ack if need. +7. Automatically ping/pong messages and timeout management. +8. Reconnection. + +## Usage +1. Make sure you have the boost libararies installed. +2. Include websocket++, rapidjson and `sio_client.cpp`,`sio_packet.cpp` in your project. +3. Include `sio_client.h` where you want to use it. +4. Use `message` and its derived classes to compose complex text/binary messages. + +sio client specific source is released under the BSD license. \ No newline at end of file diff --git a/license.txt b/license.txt new file mode 100755 index 0000000..c18c759 --- /dev/null +++ b/license.txt @@ -0,0 +1,24 @@ +Copyright (c) 2015, Melo Yao +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the socket.io-client++ project nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL EVAN SHIMIZU BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/src/sio_client.cpp b/src/sio_client.cpp new file mode 100755 index 0000000..06aa7e7 --- /dev/null +++ b/src/sio_client.cpp @@ -0,0 +1,918 @@ +// +// sio_client.h +// +// Created by Melo Yao on 3/25/15. +// + +#include "sio_client.h" + +#ifndef _WEBSOCKETPP_CPP11_STL_ +#define _WEBSOCKETPP_CPP11_STL_ 1 +#endif + +#include +#if _DEBUG || DEBUG +#include +typedef websocketpp::config::debug_asio client_config; +#else +#include +typedef websocketpp::config::asio_client client_config; +#endif + +#include +#include +#include +#include +#include +#include +#include +#include "sio_packet.h" +// Comment this out to disable handshake logging to stdout +#if DEBUG || _DEBUG +#define LOG(x) std::cout << x +#else +#define LOG(x) +#endif + +using namespace rapidjson; +using namespace websocketpp; +using boost::posix_time::milliseconds; +using std::stringstream; + +namespace sio +{ + typedef client client_type; + + class client::impl { + protected: + impl(); + + ~impl(); + + //set listeners and event bindings. +#define SYNTHESIS_SETTER(__TYPE__,__FIELD__) \ +void set_##__FIELD__(__TYPE__ const& l) \ +{ m_##__FIELD__ = l;} + + SYNTHESIS_SETTER(con_listener,open_listener) + + SYNTHESIS_SETTER(con_listener,fail_listener) + + SYNTHESIS_SETTER(con_listener,connect_listener) + + SYNTHESIS_SETTER(close_listener,close_listener) + + SYNTHESIS_SETTER(event_listener, default_event_listener) + + SYNTHESIS_SETTER(error_listener, error_listener) //socket io errors + + void bind_event(std::string const& event_name,event_listener const& func) + { + m_event_binding[event_name] = func; + } + + void unbind_event(std::string const& event_name) + { + auto it = m_event_binding.find(event_name); + if(it!=m_event_binding.end()) + { + m_event_binding.erase(it); + } + } + + void clear_socketio_listeners() + { + m_default_event_listener = nullptr; + m_error_listener = nullptr; + } + + void clear_con_listeners() + { + m_open_listener = nullptr; + m_close_listener = nullptr; + m_fail_listener = nullptr; + m_connect_listener = nullptr; + } + + void clear_event_bindings() + { + m_event_binding.clear(); + } + // Client Functions - such as send, etc. + + //event emit functions, for plain message,json and binary + void emit(std::string const& name, std::string const& message); + + void emit(std::string const& name, std::string const& message, std::function const& ack); + + void emit(std::string const& name, message::ptr const& args); + + void emit(std::string const& name, message::ptr const& args, std::function const& ack); + + void emit(std::string const& name, std::shared_ptr const& binary_ptr); + + void emit(std::string const& name, std::shared_ptr const& binary_ptr, std::function const& ack); + + + void connect(const std::string& uri); + + void reconnect(const std::string& uri); + + // Closes the connection + void close(); + + void sync_close(); + + std::string const& get_sessionid() const { return m_sid; } + + bool connected() const { return m_connected; } + + friend class client; + private: + void send(std::shared_ptr const& payload_ptr,frame::opcode::value opcode); + + void __close(close::status::value const& code,std::string const& reason); + + void __connect(const std::string& uri); + + void __send(std::shared_ptr const& payload_ptr,frame::opcode::value opcode); + + void __ack(int msgId,string const& name,message::ptr const& ack_message); + + void __ping(const boost::system::error_code& ec); + + void __timeout_pong(const boost::system::error_code& ec); + + void __timeout_connection(const boost::system::error_code& ec); + + void run_loop(); + + void on_decode(packet const& pack); + void on_encode(bool isBinary,shared_ptr const& payload); + + // Callbacks + void on_fail(connection_hdl con); + void on_open(connection_hdl con); + void on_close(connection_hdl con); + void on_message(connection_hdl con, client_type::message_ptr msg); + void on_handshake(message::ptr const& message); + void on_connected(); + void on_pong(); + + void on_pong_timeout(); + + // Message Parsing callbacks. + void on_socketio_event(int msgId,const std::string& name, message::ptr const& message); + void on_socketio_ack(int msgId, message::ptr const& message); + + void on_socketio_error(message::ptr const& err_message); + + void reset_states(); + + void clear_timers(); + + // Connection pointer for client functions. + connection_hdl m_con; + client_type m_client; + // Socket.IO server settings + std::string m_sid; + unsigned int m_ping_interval; + unsigned int m_ping_timeout; + + bool m_connected; + std::string m_nsp; + + // Currently we assume websocket as the transport, though you can find others in this string + std::map > m_acks; + + std::map m_event_binding; + + static unsigned int s_global_event_id; + + std::unique_ptr m_network_thread; + + struct message_queue_element + { + frame::opcode::value opcode; + std::shared_ptr payload_ptr; + }; + + packet_manager m_packet_mgr; + + std::queue m_message_queue; + + std::unique_ptr m_ping_timer; + + std::unique_ptr m_ping_timeout_timer; + + std::unique_ptr m_connection_timer; + + con_listener m_open_listener; + con_listener m_connect_listener; + con_listener m_fail_listener; + close_listener m_close_listener; + + event_listener m_default_event_listener; + error_listener m_error_listener; + + }; + + client::client(): + m_impl(new impl()) + { + } + + client::~client() + { + delete m_impl; + } + + void client::set_open_listener(con_listener const& l) + { + m_impl->set_open_listener(l); + } + + void client::set_fail_listener(con_listener const& l) + { + m_impl->set_fail_listener(l); + } + + void client::set_connect_listener(con_listener const& l) + { + m_impl->set_connect_listener(l); + } + + void client::set_close_listener(close_listener const& l) + { + m_impl->set_close_listener(l); + } + + void client::set_default_event_listener(event_listener const& l) + { + m_impl->set_default_event_listener(l); + } + + void client::set_error_listener(error_listener const& l) + { + m_impl->set_error_listener(l); + } + + void client::bind_event(std::string const& event_name,event_listener const& func) + { + m_impl->bind_event(event_name,func); + } + + void client::unbind_event(std::string const& event_name) + { + m_impl->unbind_event(event_name); + } + + void client::clear_socketio_listeners() + { + m_impl->clear_socketio_listeners(); + } + + void client::clear_con_listeners() + { + m_impl->clear_con_listeners(); + } + + void client::clear_event_bindings() + { + m_impl->clear_event_bindings(); + } + // Client Functions - such as send, etc. + + //event emit functions, for plain message,json and binary + void client::emit(std::string const& name, std::string const& message) + { + m_impl->emit(name,message); + } + + void client::emit(std::string const& name, std::string const& message, std::function const& ack) + { + m_impl->emit(name,message,ack); + } + + void client::emit(std::string const& name, message::ptr const& args) + { + m_impl->emit(name,args); + } + + void client::emit(std::string const& name, message::ptr const& args, std::function const& ack) + { + m_impl->emit(name,args,ack); + } + + void client::emit(std::string const& name, std::shared_ptr const& binary_ptr) + { + m_impl->emit(name,binary_ptr); + } + + void client::emit(std::string const& name, std::shared_ptr const& binary_ptr, std::function const& ack) + { + m_impl->emit(name,binary_ptr,ack); + } + + void client::connect(const std::string& uri) + { + m_impl->connect(uri); + } + + void client::reconnect(const std::string& uri) + { + m_impl->reconnect(uri); + } + + // Closes the connection + void client::close() + { + m_impl->close(); + } + + void client::sync_close() + { + m_impl->sync_close(); + } + + std::string const& client::get_sessionid() const + { + return m_impl->get_sessionid(); + } + + bool client::connected() const + { + return m_impl->connected(); + } + + + client::impl::impl() : + m_connected(false), + m_ping_interval(0), + m_ping_timeout(0), + m_network_thread() + { + using websocketpp::log::alevel; +#ifndef DEBUG + m_client.clear_access_channels(alevel::all); + m_client.set_access_channels(alevel::connect|alevel::disconnect|alevel::app); +#endif + // Initialize the Asio transport policy + m_client.init_asio(); + + // Bind the clients we are using + using websocketpp::lib::placeholders::_1; + using websocketpp::lib::bind; + m_client.set_open_client(bind(&client::impl::on_open,this,::_1)); + m_client.set_close_client(bind(&client::impl::on_close,this,::_1)); + m_client.set_fail_client(bind(&client::impl::on_fail,this,::_1)); + m_client.set_message_client(bind(&client::impl::on_message,this,::_1,::_2)); + + m_packet_mgr.set_decode_callback(bind(&client::impl::on_decode,this,::_1)); + + m_packet_mgr.set_encode_callback(bind(&client::impl::on_encode,this,::_1,::_2)); + } + + client::impl::~impl() + { + sync_close(); + } + + // Websocket++ client client + void client::impl::on_fail(connection_hdl con) + { + m_con.reset(); + m_connected = false; + + LOG("Connection failed." << std::endl); + if(m_fail_listener)m_fail_listener(); + } + + void client::impl::on_connected() + { + LOG("On Connected." << std::endl); + m_connected = true; + if(m_connection_timer) + { + boost::system::error_code ec; + m_connection_timer->cancel(ec); + m_connection_timer.reset(); + } + if(m_connect_listener) + { + m_connect_listener(); + } + } + + void client::impl::on_pong() + { + if(m_ping_timeout_timer) + { + m_ping_timeout_timer->cancel(); + m_ping_timeout_timer.reset(); + } + } + + void client::impl::on_handshake(message::ptr const& message) + { + if(message && message->get_flag() == message::flag_object) + { + const object_message* obj_ptr =static_cast(message.get()); + const map* values = &(obj_ptr->get_map()); + auto it = values->find("sid"); + if (it!= values->end()) { + m_sid = std::static_pointer_cast(it->second)->get_string(); + } + else + { + goto failed; + } + it = values->find("pingInterval"); + if (it!= values->end()&&it->second->get_flag() == message::flag_integer) { + m_ping_interval = (unsigned)std::static_pointer_cast(it->second)->get_int(); + } + else + { + m_ping_interval = 25000; + } + it = values->find("pingTimeout"); + + if (it!=values->end()&&it->second->get_flag() == message::flag_integer) { + m_ping_timeout = (unsigned) std::static_pointer_cast(it->second)->get_int(); + } + else + { + m_ping_timeout = 60000; + } + + m_ping_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); + boost::system::error_code ec; + m_ping_timer->expires_from_now(milliseconds(m_ping_interval), ec); + m_ping_timer->async_wait(lib::bind(&client::impl::__ping,this,lib::placeholders::_1)); + LOG("On handshake,sid:"<expires_from_now(milliseconds(60000), ec); + m_connection_timer->async_wait(lib::bind(&client::impl::__timeout_connection,this,lib::placeholders::_1)); + + if(m_open_listener)m_open_listener(); + } + + void client::impl::on_close(connection_hdl con) + { + LOG("Client Disconnected." << std::endl); + m_connected = false; + lib::error_code ec; + close::status::value code = close::status::normal; + client_type::connection_ptr conn_ptr = m_client.get_con_from_hdl(con, ec); + if (ec) { + LOG("OnClose get conn failed"<get_local_close_code(); + } + + m_con.reset(); + this->clear_timers(); + if(m_close_listener) + { + close_reason reason; + if(code == close::status::normal) + { + reason = close_reason_normal; + } + else + { + reason = close_reason_drop; + } + m_close_listener(reason); + } + } + + void client::impl::on_message(connection_hdl con, client_type::message_ptr msg) + { + if (m_ping_timeout_timer) { + boost::system::error_code ec; + m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout),ec); + } + // Parse the incoming message according to socket.IO rules + m_packet_mgr.put_payload(msg->get_payload()); + } + + void client::impl::on_pong_timeout() + { + LOG("Pong timeout"<on_connected(); + break; + } + case packet::type_disconnect: + { + LOG("Received Message type (Disconnect)"<get_flag() == message::flag_array) + { + const array_message* array_ptr = static_cast(ptr.get()); + if(array_ptr->get_vector().size() >= 1&&array_ptr->get_vector()[0]->get_flag() == message::flag_string) + { + const string_message* name_ptr = static_cast(array_ptr->get_vector()[0].get()); + message::ptr value_ptr; + if(array_ptr->get_vector().size()>1) + { + value_ptr = array_ptr->get_vector()[1]; + } + this->on_socketio_event(p.get_pack_id(),name_ptr->get_string(), value_ptr); + } + } + + break; + } + // Ack + case packet::type_ack: + case packet::type_binary_ack: + { + LOG("Received Message type (ACK)"<get_flag() == message::flag_array) + { + const array_message* array_ptr = static_cast(ptr.get()); + if(array_ptr->get_vector().size() >= 1&&array_ptr->get_vector()[0]->get_flag() == message::flag_string) + { + message::ptr value_ptr; + if(array_ptr->get_vector().size()>1) + { + value_ptr = array_ptr->get_vector()[1]; + } + this->on_socketio_ack(p.get_pack_id(), value_ptr); + break; + } + } + this->on_socketio_ack(p.get_pack_id(),ptr); + break; + } + // Error + case packet::type_error: + { + LOG("Received Message type (ERROR)"<on_socketio_error(p.get_message()); + break; + } + default: + break; + } + break; + } + case packet::frame_open: + this->on_handshake(p.get_message()); + break; + case packet::frame_close: + //FIXME how to deal? + break; + case packet::frame_pong: + this->on_pong(); + break; + + default: + break; + } + } + + void client::impl::on_encode(bool isBinary,shared_ptr const& payload) + { + LOG("encoded payload length:"<length()< const& ack) + { + message::ptr msg_ptr = make_message(name, message); + packet p(m_nsp, msg_ptr,s_global_event_id); + m_acks[s_global_event_id++] = ack; + m_packet_mgr.encode(p); + } + + void client::impl::emit(std::string const& name, message::ptr const& args) + { + message::ptr msg_ptr = make_message(name, args); + packet p(m_nsp, msg_ptr); + m_packet_mgr.encode(p); + } + + void client::impl::emit(std::string const& name, message::ptr const& args, std::function const& ack) + { + message::ptr msg_ptr = make_message(name, args); + packet p(m_nsp, msg_ptr,s_global_event_id); + m_acks[s_global_event_id++] = ack; + m_packet_mgr.encode(p); + } + + void client::impl::emit(std::string const& name, std::shared_ptr const& binary_ptr) + { + message::ptr msg_ptr = make_message(name, binary_ptr); + packet p(m_nsp, msg_ptr); + m_packet_mgr.encode(p); + } + + void client::impl::emit(std::string const& name, std::shared_ptr const& binary_ptr, std::function const& ack) + { + message::ptr msg_ptr = make_message(name, binary_ptr); + packet p(m_nsp, msg_ptr,s_global_event_id); + m_acks[s_global_event_id++] = ack; + m_packet_mgr.encode(p); + } + + void client::impl::__ping(const boost::system::error_code& ec) + { + if(ec || m_con.expired()) + { + return; + } + std::string ping_msg; + packet p(packet::frame_ping); + m_packet_mgr.encode(p, + [&](bool isBin,shared_ptr payload) + { + lib::error_code ec; + this->m_client.send(this->m_con, *payload, frame::opcode::text, ec); + }); + if(m_ping_timer) + { + boost::system::error_code e_code; + m_ping_timer->expires_from_now(milliseconds(m_ping_interval), e_code); + m_ping_timer->async_wait(lib::bind(&client::impl::__ping,this,lib::placeholders::_1)); + } + if(!m_ping_timeout_timer) + { + m_ping_timeout_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); + boost::system::error_code timeout_ec; + m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout), timeout_ec); + m_ping_timeout_timer->async_wait(lib::bind(&client::impl::__timeout_pong, this,lib::placeholders::_1)); + } + } + + void client::impl::__timeout_pong(const boost::system::error_code &ec) + { + if(ec) + { + return; + } + this->on_pong_timeout(); + } + + void client::impl::__timeout_connection(const boost::system::error_code &ec) + { + if(ec) + { + return; + } + m_connection_timer.reset(); + LOG("Connection timeout"<__close(close::status::policy_violation,"Connection timeout"); + } + + void client::impl::__close(close::status::value const& code,std::string const& reason) + { + LOG("Close by reason:"< payload) + { + lib::error_code ec; + this->m_client.send(this->m_con, *payload, frame::opcode::text, ec); + }); + lib::error_code ec; + m_client.close(m_con, code, reason, ec); + } + } + + void client::impl::close() + { + m_client.get_io_service().dispatch(lib::bind(&client::impl::__close, this,close::status::normal,"End by user")); + } + + void client::impl::sync_close() + { + m_client.get_io_service().dispatch(lib::bind(&client::impl::__close, this,close::status::normal,"End by user")); + if(m_network_thread) + { + m_network_thread->join(); + m_network_thread.reset(); + } + } + + + void client::impl::__send(std::shared_ptr const& payload_ptr,frame::opcode::value opcode) + { + if(connected()) + { + //delay the ping, since we already have message to send. + boost::system::error_code timeout_ec; + m_ping_timer->expires_from_now(milliseconds(m_ping_interval),timeout_ec); + while(m_message_queue.size()>0) + { + message_queue_element element = m_message_queue.front(); + m_message_queue.pop(); + m_client.send(m_con,*(element.payload_ptr),element.opcode); + } + m_client.send(m_con,*payload_ptr,opcode); + } + else + { + message_queue_element element = (message_queue_element){.opcode = opcode,.payload_ptr = payload_ptr}; + m_message_queue.push(element); + } + } + + void client::impl::__ack(int msgId,string const& name,message::ptr const& ack_message) + { + packet pack(m_nsp,make_message(name, ack_message),msgId,true); + m_packet_mgr.encode(pack); + } + + void client::impl::__connect(const std::string& uri) + { + + do{ + websocketpp::uri uo(uri); + std::ostringstream ss; + if (m_sid.size()==0) { + ss<<"ws://"<cancel(ec); + m_ping_timeout_timer.reset(); + } + if(m_ping_timer) + { + m_ping_timer->cancel(ec); + m_ping_timer.reset(); + } + if(m_connection_timer) + { + m_connection_timer->cancel(ec); + m_connection_timer.reset(); + } + } + + void client::impl::reset_states() + { + m_connected = false; + m_acks.clear(); + m_client.reset(); + m_sid.clear(); + m_packet_mgr.reset(); + //clear all queued messages. + while(!m_message_queue.empty()) + { + m_message_queue.pop(); + } + } + + void client::impl::connect(const std::string& uri) + { + if(m_network_thread) + { + m_network_thread->join(); + } + this->reset_states(); + m_client.get_io_service().dispatch(lib::bind(&client::impl::__connect,this,uri)); + m_network_thread.reset(new std::thread(lib::bind(&client::impl::run_loop,this)));//uri lifecycle? + } + + void client::impl::reconnect(const std::string& uri) + { + if(m_network_thread) + { + m_network_thread->join(); + } + + m_client.get_io_service().dispatch(lib::bind(&client::impl::__connect,this,uri)); + m_network_thread.reset(new std::thread(lib::bind(&client::impl::run_loop,this)));//uri + } + + void client::impl::run_loop() + { + + m_client.run(); + m_client.reset(); + m_client.get_alog().write(websocketpp::log::alevel::devel, + "run loop end"); + + } + + // This is where you'd add in behavior to handle events. + // By default, nothing is done with the endpoint or ID params. + void client::impl::on_socketio_event(int msgId,const std::string& name, message::ptr const& message) + { + event_listener *functor_ptr = &(m_default_event_listener); + auto it = m_event_binding.find(name); + if(it!=m_event_binding.end()) + { + functor_ptr = &(it->second); + } + bool needAck = msgId >= 0; + + message::ptr ack_message; + if (*functor_ptr) { + (*functor_ptr)(name, message,needAck, ack_message); + } + if(needAck) + { + this->__ack(msgId, name, ack_message); + } + } + + // This is where you'd add in behavior to handle ack + void client::impl::on_socketio_ack(int msgId, message::ptr const& message) + { + auto it = m_acks.find(msgId); + if(it!=m_acks.end()) + { + (it->second)(message); + m_acks.erase(it); + } + } + + // This is where you'd add in behavior to handle errors + void client::impl::on_socketio_error(message::ptr const& err_message) + { + if(m_error_listener)m_error_listener(err_message); + } +} \ No newline at end of file diff --git a/src/sio_client.h b/src/sio_client.h new file mode 100755 index 0000000..e98e799 --- /dev/null +++ b/src/sio_client.h @@ -0,0 +1,93 @@ +// +// sio_client.h +// +// Created by Melo Yao on 3/25/15. +// + +#ifndef __SIO_CLIENT__H__ +#define __SIO_CLIENT__H__ +#include +#include +#include "sio_message.h" + +namespace sio { + + class client { + public: + enum close_reason + { + close_reason_normal, + close_reason_drop + }; + + typedef std::function con_listener; + + typedef std::function close_listener; + + typedef std::function event_listener; + + typedef std::function error_listener; + + + client(); + ~client(); + + //set listeners and event bindings. + void set_open_listener(con_listener const& l); + + void set_fail_listener(con_listener const& l); + + void set_connect_listener(con_listener const& l); + + void set_close_listener(close_listener const& l); + + void set_default_event_listener(event_listener const& l); + + void set_error_listener(error_listener const& l); //socket io errors + + void bind_event(std::string const& event_name,event_listener const& func); + + void unbind_event(std::string const& event_name); + + void clear_socketio_listeners(); + + void clear_con_listeners(); + + void clear_event_bindings(); + // Client Functions - such as send, etc. + + //event emit functions, for plain message,json and binary + void emit(std::string const& name, std::string const& message); + + void emit(std::string const& name, std::string const& message, std::function const& ack); + + void emit(std::string const& name, message::ptr const& args); + + void emit(std::string const& name, message::ptr const& args, std::function const& ack); + + void emit(std::string const& name, std::shared_ptr const& binary_ptr); + + void emit(std::string const& name, std::shared_ptr const& binary_ptr, std::function const& ack); + + + void connect(const std::string& uri); + + void reconnect(const std::string& uri); + + // Closes the connection + void close(); + + void sync_close(); + + std::string const& get_sessionid() const; + + bool connected() const; + + private: + class impl; + impl* m_impl; + }; +} + + +#endif // __SIO_CLIENT__H__ \ No newline at end of file diff --git a/src/sio_message.h b/src/sio_message.h new file mode 100755 index 0000000..211725e --- /dev/null +++ b/src/sio_message.h @@ -0,0 +1,248 @@ +// +// sio_message.h +// +// Created by Melo Yao on 3/25/15. +// + +#ifndef __SIO_MESSAGE_H__ +#define __SIO_MESSAGE_H__ +#include +#include +#include +#include +#include +namespace sio +{ + using namespace std; + + class message + { + public: + enum flag + { + flag_integer, + flag_double, + flag_string, + flag_binary, + flag_array, + flag_object + }; + + flag get_flag() const + { + return _flag; + } + + typedef shared_ptr ptr; + + virtual int64_t get_int() const + { + assert(false); + } + + virtual double get_double() const + { + assert(false); + } + + virtual string const& get_string() const + { + assert(false); + } + + virtual shared_ptr const& get_binary() const + { + assert(false); + } + + virtual const vector& get_vector() const + { + assert(false); + } + + virtual vector& get_vector() + { + assert(false); + } + + virtual const map& get_map() const + { + assert(false); + } + + virtual map& get_map() + { + assert(false); + } + private: + flag _flag; + + protected: + message(flag f):_flag(f){} + }; + + + class int_message : public message + { + int64_t _v; + protected: + int_message(int64_t v) + :message(flag_integer),_v(v) + { + } + + public: + static message::ptr create(int64_t v) + { + return ptr(new int_message(v)); + } + + int64_t get_int() const + { + return _v; + } + }; + + class double_message : public message + { + double _v; + double_message(double v) + :message(flag_double),_v(v) + { + } + + public: + static message::ptr create(double v) + { + return ptr(new double_message(v)); + } + + double get_double() const + { + return _v; + } + }; + + class string_message : public message + { + string _v; + string_message(string const& v) + :message(flag_string),_v(v) + { + } + public: + static message::ptr create(string const& v) + { + return ptr(new string_message(v)); + } + + string const& get_string() const + { + return _v; + } + }; + + class binary_message : public message + { + shared_ptr _v; + binary_message(shared_ptr const& v) + :message(flag_binary),_v(v) + { + } + public: + static message::ptr create(shared_ptr const& v) + { + return ptr(new binary_message(v)); + } + + shared_ptr const& get_binary() const + { + return _v; + } + }; + + class array_message : public message + { + vector _v; + array_message():message(flag_array) + { + } + + public: + static message::ptr create() + { + return ptr(new array_message()); + } + + vector& get_vector() + { + return _v; + } + + const vector& get_vector() const + { + return _v; + } + }; + + class object_message : public message + { + map _v; + object_message() : message(flag_object) + { + } + public: + static message::ptr create() + { + return ptr(new object_message()); + } + + map& get_map() + { + return _v; + } + + const map& get_map() const + { + return _v; + } + }; + + inline + message::ptr make_message(string const& event_name, message::ptr const& event_args) + { + message::ptr msg_ptr = array_message::create(); + array_message* ptr = static_cast(msg_ptr.get()); + ptr->get_vector().push_back(string_message::create(event_name)); + if(event_args) + { + ptr->get_vector().push_back(event_args); + } + return msg_ptr; + } + + inline + message::ptr make_message(string const& event_name, string const& single_message) + { + message::ptr msg_ptr = array_message::create(); + array_message* ptr = static_cast(msg_ptr.get()); + ptr->get_vector().push_back(string_message::create(event_name)); + ptr->get_vector().push_back(string_message::create(single_message)); + return msg_ptr; + } + + inline + message::ptr make_message(string const& event_name, shared_ptr const& single_binary) + { + message::ptr msg_ptr = array_message::create(); + array_message* ptr = static_cast(msg_ptr.get()); + ptr->get_vector().push_back(string_message::create(event_name)); + if(single_binary) + { + ptr->get_vector().push_back(binary_message::create(single_binary)); + } + return msg_ptr; + } +} + +#endif diff --git a/src/sio_packet.cpp b/src/sio_packet.cpp new file mode 100755 index 0000000..6eaaf22 --- /dev/null +++ b/src/sio_packet.cpp @@ -0,0 +1,459 @@ +// +// sio_packet.cpp +// +// Created by Melo Yao on 3/22/15. +// + +#include "sio_packet.h" +#include +#include +#include +namespace sio +{ + using namespace rapidjson; + using namespace std; + void accept_message(message const& msg,Value& val, Document& doc,vector >& buffers); + + void accept_int_message(int_message const& msg, Value& val) + { + val.SetInt64(msg.get_int()); + } + + void accept_double_message(double_message const& msg, Value& val) + { + val.SetDouble(msg.get_double()); + } + + void accept_string_message(string_message const& msg, Value& val) + { + val.SetString(msg.get_string().data(),(SizeType) msg.get_string().length()); + } + + + void accept_binary_message(binary_message const& msg,Value& val,Document& doc,vector >& buffers) + { + val.SetObject(); + Value boolVal; + boolVal.SetBool(true); + val.AddMember("_placeholder", boolVal, doc.GetAllocator()); + Value numVal; + numVal.SetInt((int)buffers.size()); + val.AddMember("num", numVal, doc.GetAllocator()); + //FIXME can not avoid binary copy here. + shared_ptr write_buffer = make_shared(); + write_buffer->reserve(msg.get_binary()->size()+1); + char frame_char = packet::frame_message; + write_buffer->append(&frame_char,1); + write_buffer->append(*(msg.get_binary())); + buffers.push_back(write_buffer); + } + + void accept_array_message(array_message const& msg,Value& val,Document& doc,vector >& buffers) + { + val.SetArray(); + for (vector::const_iterator it = msg.get_vector().begin(); it!=msg.get_vector().end(); ++it) { + Value child; + accept_message(*(*it), child, doc,buffers); + val.PushBack(child, doc.GetAllocator()); + } + } + + void accept_object_message(object_message const& msg,Value& val,Document& doc,vector >& buffers) + { + val.SetObject(); + for (map::const_iterator it = msg.get_map().begin(); it!= msg.get_map().end(); ++it) { + Value nameVal; + nameVal.SetString(it->first.data(), (SizeType)it->first.length(), doc.GetAllocator()); + Value valueVal; + accept_message(*(it->second), valueVal, doc,buffers); + val.AddMember(nameVal, valueVal, doc.GetAllocator()); + } + } + + void accept_message(message const& msg,Value& val, Document& doc,vector >& buffers) + { + const message* msg_ptr = &msg; + switch(msg.get_flag()) + { + case message::flag_integer: + { + accept_int_message(*(static_cast(msg_ptr)), val); + break; + } + case message::flag_double: + { + accept_double_message(*(static_cast(msg_ptr)), val); + break; + } + case message::flag_string: + { + accept_string_message(*(static_cast(msg_ptr)), val); + break; + } + case message::flag_binary: + { + accept_binary_message(*(static_cast(msg_ptr)), val,doc,buffers); + break; + } + case message::flag_array: + { + accept_array_message(*(static_cast(msg_ptr)), val,doc,buffers); + break; + } + case message::flag_object: + { + accept_object_message(*(static_cast(msg_ptr)), val,doc,buffers); + break; + } + default: + break; + } + } + + message::ptr from_json(Value const& value, vector > const& buffers) + { + if(value.IsInt64()) + { + return int_message::create(value.GetInt64()); + } + else if(value.IsDouble()) + { + return double_message::create(value.GetDouble()); + } + else if(value.IsString()) + { + string str(value.GetString(),value.GetStringLength()); + return string_message::create(str); + } + else if(value.IsArray()) + { + message::ptr ptr = array_message::create(); + for (SizeType i = 0; i< value.Size(); ++i) { + static_cast(ptr.get())->get_vector().push_back(from_json(value[i],buffers)); + } + return ptr; + } + else if(value.IsObject()) + { + //binary placeholder + if (value.HasMember("_placeholder") && value["_placeholder"].GetBool()) { + + int num = value["num"].GetInt(); + if(num > 0 && num < buffers.size()) + { + return binary_message::create(buffers[num]); + } + return message::ptr(); + } + //real object message. + message::ptr ptr = object_message::create(); + for (auto it = value.MemberBegin();it!=value.MemberEnd();++it) + { + if(it->name.IsString()) + { + string key(it->name.GetString(),it->name.GetStringLength()); + static_cast(ptr.get())->get_map()[key] = from_json(it->value,buffers); + } + } + return ptr; + } + return message::ptr(); + } + + packet::packet(string const& nsp,message::ptr const& msg,int pack_id, bool isAck): + _frame(frame_message), + _type((isAck?type_ack : type_event) | type_undetermined), + _nsp(nsp), + _message(msg), + _pack_id(pack_id), + _pending_buffers(0) + { + assert((!isAck + || (isAck&&pack_id>=0))); + } + + packet::packet(type type,message::ptr const& msg): + _frame(frame_message), + _type(type), + _message(msg), + _pack_id(-1), + _pending_buffers(0) + { + + } + + packet::packet(packet::frame_type frame): + _frame(frame), + _type(type_undetermined), + _pack_id(-1), + _pending_buffers(0) + { + + } + + packet::packet(): + _type(type_undetermined), + _pack_id(-1), + _pending_buffers(0) + { + + } + + + bool packet::is_binary_message(string const& payload_ptr) + { + return payload_ptr.size()>0 && payload_ptr[0] == frame_message; + } + + bool packet::is_text_message(string const& payload_ptr) + { + return payload_ptr.size()>0 && payload_ptr[0] == (frame_message + '0'); + } + + bool packet::is_message(string const& payload_ptr) + { + return is_binary_message(payload_ptr) || is_text_message(payload_ptr); + } + + bool packet::parse_buffer(const string &buf_payload) + { + if (_pending_buffers > 0) { + assert(is_binary_message(buf_payload));//this is ensured by outside. + _buffers.push_back(std::make_shared(buf_payload.data()+1,buf_payload.size()-1)); + _pending_buffers--; + if (_pending_buffers == 0) { + + Document doc; + doc.Parse<0>(_buffers.front()->data()); + _buffers.erase(_buffers.begin()); + _message = from_json(doc, _buffers); + _buffers.clear(); + return false; + } + return true; + } + return false; + } + + bool packet::parse(const string& payload_ptr) + { + assert(!is_binary_message(payload_ptr)); //this is ensured by outside + _frame = (packet::frame_type) (payload_ptr[0] - '0'); + + size_t pos = 1; + if (_frame == frame_message) { + _type = (packet::type)(payload_ptr[pos] - '0'); + if(_type < type_min || _type > type_max) + { + return false; + } + pos++; + if (_type == type_binary_event || _type == type_binary_ack) { + size_t score_pos = payload_ptr.find('-'); + _pending_buffers = stoi(payload_ptr.substr(pos,score_pos)); + pos = score_pos+1; + } + } + + size_t comma_pos = payload_ptr.find_first_of("{[,"); + if( comma_pos!= string::npos && payload_ptr[comma_pos] == ',') + { + _nsp = payload_ptr.substr(pos,comma_pos - pos); + pos = comma_pos+1; + } + if (pos >= payload_ptr.length()) { + //message only have type, maybe with namespace. + return false; + } + size_t data_pos = payload_ptr.find_first_of("[{", pos, 2); + if (data_pos == string::npos) { + //we have pack id, no message. + _pack_id = stoi(payload_ptr.substr(pos)); + return false; + } + else if(data_pos>pos) + { + //we have pack id and messages. + _pack_id = stoi(payload_ptr.substr(pos,data_pos - pos)); + } + if (_frame == frame_message && (_type == type_binary_event || _type == type_binary_ack)) { + //parse later when all buffers are arrived. + _buffers.push_back(make_shared(payload_ptr.data() + data_pos, payload_ptr.length() - data_pos)); + return true; + } + else + { + Document doc; + doc.Parse<0>(payload_ptr.substr(data_pos).data()); + _message = from_json(doc, vector >()); + return false; + } + + } + + bool packet::accept(string& payload_ptr, vector >&buffers) + { + char frame_char = _frame+'0'; + payload_ptr.append(&frame_char,1); + if (_frame!=frame_message) { + return false; + } + bool hasMessage = false; + Document doc; + if (_message) { + accept_message(*_message, doc, doc, buffers); + hasMessage = true; + } + bool hasBinary = _buffers.size()>0; + _type = _type&(~type_undetermined); + if(_type == type_event) + { + _type = hasBinary?type_binary_event:type_event; + } + else if(_type == type_ack) + { + _type = hasBinary? type_binary_ack : type_ack; + } + ostringstream ss; + ss.precision(8); + ss<<_type; + if (hasBinary) { + ss<<_buffers.size()<<"-"; + } + if(_nsp.size()>0 && _nsp!="/") + { + ss<<_nsp; + if (hasMessage || _pack_id>=0) { + ss<<","; + } + } + + if(_pack_id>=0) + { + ss<<_pack_id; + } + if (hasMessage) { + StreamWriter writer(ss); + doc.Accept(writer); + } + payload_ptr.append(ss.str()); + return hasBinary; + } + + packet::frame_type packet::get_frame() const + { + return _frame; + } + + packet::type packet::get_type() const + { + assert((_type & type_undetermined) == 0); + return (type)_type; + } + + string const& packet::get_nsp() const + { + return _nsp; + } + + message::ptr const& packet::get_message() const + { + return _message; + } + + unsigned packet::get_pack_id() const + { + return _pack_id; + } + + + void packet_manager::set_decode_callback(function const& decode_callback) + { + m_decode_callback = decode_callback; + } + + void packet_manager::set_encode_callback(function const&)> const& encode_callback) + { + m_encode_callback = encode_callback; + } + + void packet_manager::reset() + { + m_partial_packet.reset(); + } + + void packet_manager::encode(packet& pack,encode_callback_function const& override_encode_callback) const + { + shared_ptr ptr = make_shared(); + vector > buffers; + const encode_callback_function *cb_ptr = &m_encode_callback; + if(override_encode_callback) + { + cb_ptr = &override_encode_callback; + } + if(pack.accept(*ptr,buffers)) + { + if((*cb_ptr)) + { + (*cb_ptr)(false,ptr); + } + for(auto it = buffers.begin();it!=buffers.end();++it) + { + if((*cb_ptr)) + { + (*cb_ptr)(true,*it); + } + } + } + else + { + if((*cb_ptr)) + { + (*cb_ptr)(false,ptr); + } + } + } + + void packet_manager::put_payload(string const& payload) + { + unique_ptr p; + do + { + if(packet::is_text_message(payload)) + { + p.reset(new packet()); + if(p->parse(payload)) + { + m_partial_packet = std::move(p); + } + else + { + break; + } + } + else if(packet::is_binary_message(payload)) + { + if(m_partial_packet) + { + if(!m_partial_packet->parse_buffer(payload)) + { + p = std::move(m_partial_packet); + break; + } + } + } + else + { + p.reset(new packet()); + p->parse(payload); + break; + } + return; + }while(0); + + if(m_decode_callback) + { + m_decode_callback(*p); + } + } +} \ No newline at end of file diff --git a/src/sio_packet.h b/src/sio_packet.h new file mode 100755 index 0000000..d6b5652 --- /dev/null +++ b/src/sio_packet.h @@ -0,0 +1,106 @@ +// +// sio_packet.h +// +// Created by Melo Yao on 3/19/15. +// + +#ifndef __SIO_PACKET_H__ +#define __SIO_PACKET_H__ +#include +#include "sio_message.h" +#include + +namespace sio +{ + using namespace std; + + class packet + { + public: + enum frame_type + { + frame_open = 0, + frame_close = 1, + frame_ping = 2, + frame_pong = 3, + frame_message = 4, + frame_upgrade = 5, + frame_noop = 6 + }; + + enum type + { + type_min = 0, + type_connect = 0, + type_disconnect = 1, + type_event = 2, + type_ack = 3, + type_error = 4, + type_binary_event = 5, + type_binary_ack = 6, + type_max = 6, + type_undetermined = 0x10 //undetermined mask bit + }; + private: + frame_type _frame; + int _type; + string _nsp; + int _pack_id; + message::ptr _message; + unsigned _pending_buffers; + vector > _buffers; + public: + packet(string const& nsp,message::ptr const& msg,int pack_id = -1,bool isAck = false);//message type constructor. + + packet(frame_type frame); + + packet(type type,message::ptr const& msg = message::ptr());//other message types constructor. + //empty constructor for parse. + packet(); + + frame_type get_frame() const; + + type get_type() const; + + bool parse(string const& payload_ptr);//return true if need to parse buffer. + + bool parse_buffer(string const& buf_payload); + + bool accept(string& payload_ptr, vector >&buffers); //return true if has binary buffers. + + string const& get_nsp() const; + + message::ptr const& get_message() const; + + unsigned get_pack_id() const; + + static bool is_message(string const& payload_ptr); + static bool is_text_message(string const& payload_ptr); + static bool is_binary_message(string const& payload_ptr); + }; + + class packet_manager + { + public: + typedef function const&)> encode_callback_function; + typedef function decode_callback_function; + + void set_decode_callback(decode_callback_function const& decode_callback); + + void set_encode_callback(encode_callback_function const& encode_callback); + + void encode(packet& pack,encode_callback_function const& override_encode_callback = encode_callback_function()) const; + + void put_payload(string const& payload); + + void reset(); + + private: + decode_callback_function m_decode_callback; + + encode_callback_function m_encode_callback; + + std::unique_ptr m_partial_packet; + }; +} +#endif diff --git a/test/sio_test_sample.cpp b/test/sio_test_sample.cpp new file mode 100755 index 0000000..9141b67 --- /dev/null +++ b/test/sio_test_sample.cpp @@ -0,0 +1,121 @@ +// +// sio_test_sample.cpp +// +// Created by Melo Yao on 3/24/15. +// + +#include "sio_client.h" + +#include +#include +#include + +using namespace sio; + +class connection_listener +{ + sio::client &handler; + +public: + + connection_listener(sio::client& h): + handler(h) + { + } + + + void on_connected() + { + handler.emit("test_text", "test payload"); + std::shared_ptr binary = std::make_shared(); + char test[100]; + memset(test, 0, sizeof(char)*100); + binary->append(test, 100); + handler.emit("test_binary", binary); + + message::ptr obj = object_message::create(); + message::ptr b_p = binary_message::create(binary); + + obj->get_map()["bin"] = b_p; + obj->get_map()["path"] = string_message::create("./test.bin"); + handler.emit("test ack", obj,[](message::ptr const& ack_data){ + std::cout<<"Listener1:test ack:"<get_flag() == message::flag_string) { + std::cout<(ack_data.get())->get_string()< binary(new std::string()); + char test[100]; + memset(test, 0, sizeof(char)*100); + binary->append(test, 100); + message::ptr obj = object_message::create(); + message::ptr b_p = binary_message::create(binary); + object_message* o = static_cast(obj.get()); + o->get_map()["bin"] = b_p; + o->get_map()["path"] = string_message::create("./test.bin"); + handler.emit("test ack", obj,[](message::ptr const& ack_data){ + std::cout<<"Listener2:test ack"<get_flag() == message::flag_string) { + std::cout<(ack_data.get())->get_string()<