From 29e82a64cb9a88e2d4f52990455c2fb89ec1624d Mon Sep 17 00:00:00 2001 From: "melo.yao" Date: Wed, 15 Apr 2015 22:19:16 +0800 Subject: [PATCH] Code style enhancement and code re-org --- src/internal/sio_client_impl.cpp | 654 +++++++++++++++---------------- src/internal/sio_client_impl.h | 44 +-- src/internal/sio_packet.h | 4 +- src/sio_client.h | 4 +- 4 files changed, 351 insertions(+), 355 deletions(-) diff --git a/src/internal/sio_client_impl.cpp b/src/internal/sio_client_impl.cpp index 028fcad..0743ad9 100644 --- a/src/internal/sio_client_impl.cpp +++ b/src/internal/sio_client_impl.cpp @@ -22,6 +22,7 @@ using boost::posix_time::milliseconds; namespace sio { +/*************************public:*************************/ client_impl::client_impl() : m_con_state(con_closed), m_ping_interval(0), @@ -59,52 +60,285 @@ namespace sio sync_close(); } - void client_impl::on_handshake(message::ptr const& message) + void client_impl::connect(const std::string& uri) { - if(message && message->get_flag() == message::flag_object) + if(m_reconn_timer) { - const object_message* obj_ptr =static_cast(message.get()); - const map* values = &(obj_ptr->get_map()); - auto it = values->find("sid"); - if (it!= values->end()) { - m_sid = std::static_pointer_cast(it->second)->get_string(); - } - else - { - goto failed; - } - it = values->find("pingInterval"); - if (it!= values->end()&&it->second->get_flag() == message::flag_integer) { - m_ping_interval = (unsigned)std::static_pointer_cast(it->second)->get_int(); - } - else - { - m_ping_interval = 25000; - } - it = values->find("pingTimeout"); - - if (it!=values->end()&&it->second->get_flag() == message::flag_integer) { - m_ping_timeout = (unsigned) std::static_pointer_cast(it->second)->get_int(); - } - else - { - m_ping_timeout = 60000; - } - - m_ping_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); - boost::system::error_code ec; - m_ping_timer->expires_from_now(milliseconds(m_ping_interval), ec); - if(ec)LOG("ec:"<async_wait(lib::bind(&client_impl::__ping,this,lib::placeholders::_1)); - LOG("On handshake,sid:"<cancel(); + m_reconn_timer.reset(); + } + if(m_network_thread) + { + if(m_con_state == con_closing||m_con_state == con_closed) + { + //if client is closing, join to wait. + //if client is closed, still need to join, + //but in closed case,join will return immediately. + m_network_thread->join(); + m_network_thread.reset();//defensive + } + else + { + //if we are connected, do nothing. + return; + } + } + m_con_state = con_opening; + m_base_url = uri; + m_reconn_made = 0; + this->reset_states(); + m_client.get_io_service().dispatch(lib::bind(&client_impl::connect_impl,this,uri)); + m_network_thread.reset(new std::thread(lib::bind(&client_impl::run_loop,this)));//uri lifecycle? + + } + + socket::ptr const& client_impl::socket(std::string const& nsp) + { + std::lock_guard guard(m_socket_mutex); + std::string aux; + if(nsp == "") + { + aux = "/"; + } + else if( nsp[0] != '/') + { + aux.append("/",1); + aux.append(nsp); + } + else + { + aux = nsp; + } + + auto it = m_sockets.find(aux); + if(it!= m_sockets.end()) + { + return it->second; + } + else + { + std::pair p(aux,std::shared_ptr(new sio::socket(this,aux))); + return (m_sockets.insert(p).first)->second; + } + } + + void client_impl::close() + { + m_con_state = con_closing; + this->sockets_invoke_void(&sio::socket::close); + m_client.get_io_service().dispatch(lib::bind(&client_impl::close_impl, this,close::status::normal,"End by user")); + } + + void client_impl::sync_close() + { + m_con_state = con_closing; + this->sockets_invoke_void(&sio::socket::close); + m_client.get_io_service().dispatch(lib::bind(&client_impl::close_impl, this,close::status::normal,"End by user")); + if(m_network_thread) + { + m_network_thread->join(); + m_network_thread.reset(); + } + } + +/*************************protected:*************************/ + void client_impl::send(packet& p) + { + m_packet_mgr.encode(p); + } + + void client_impl::remove_socket(std::string const& nsp) + { + std::lock_guard guard(m_socket_mutex); + auto it = m_sockets.find(nsp); + if(it!= m_sockets.end()) + { + m_sockets.erase(it); + } + } + + boost::asio::io_service& client_impl::get_io_service() + { + return m_client.get_io_service(); + } + + void client_impl::on_socket_closed(std::string const& nsp) + { + if(m_socket_close_listener)m_socket_close_listener(nsp); + } + + void client_impl::on_socket_opened(std::string const& nsp) + { + if(m_socket_open_listener)m_socket_open_listener(nsp); + } + +/*************************private:*************************/ + void client_impl::run_loop() + { + + m_client.run(); + m_client.reset(); + m_client.get_alog().write(websocketpp::log::alevel::devel, + "run loop end"); + } + + void client_impl::connect_impl(const std::string& uri) + { + do{ + websocketpp::uri uo(uri); + std::ostringstream ss; + if (m_sid.size()==0) { + ss<<"ws://"<cancel(); + m_reconn_timer.reset(); + } + if (m_con.expired()) + { + std::cerr << "Error: No active session" << std::endl; + } + else + { + lib::error_code ec; + m_client.close(m_con, code, reason, ec); + } + } + + void client_impl::send_impl(std::shared_ptr const& payload_ptr,frame::opcode::value opcode) + { + if(m_con_state == con_opened) + { + //delay the ping, since we already have message to send. + boost::system::error_code timeout_ec; + if(m_ping_timer) + { + m_ping_timer->expires_from_now(milliseconds(m_ping_interval),timeout_ec); + m_ping_timer->async_wait(lib::bind(&client_impl::ping,this,lib::placeholders::_1)); + } + lib::error_code ec; + m_client.send(m_con,*payload_ptr,opcode,ec); + if(ec) + { + std::cerr<<"Send failed,reason:"<< ec.message()< payload) + { + lib::error_code ec; + this->m_client.send(this->m_con, *payload, frame::opcode::text, ec); + }); + if(m_ping_timer) + { + boost::system::error_code e_code; + m_ping_timer->expires_from_now(milliseconds(m_ping_interval), e_code); + m_ping_timer->async_wait(lib::bind(&client_impl::ping,this,lib::placeholders::_1)); + } + if(!m_ping_timeout_timer) + { + m_ping_timeout_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); + boost::system::error_code timeout_ec; + m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout), timeout_ec); + m_ping_timeout_timer->async_wait(lib::bind(&client_impl::timeout_pong, this,lib::placeholders::_1)); + } + } + + void client_impl::timeout_pong(const boost::system::error_code &ec) + { + if(ec) + { + return; + } + LOG("Pong timeout"<reset_states(); + LOG("Reconnecting..."<(m_reconn_delay * pow(1.5,m_reconn_made),m_reconn_delay_max); + } + + socket::ptr client_impl::get_socket_locked(std::string const& nsp) + { + std::lock_guard guard(m_socket_mutex); + auto it = m_sockets.find(nsp); + if(it != m_sockets.end()) + { + return it->second; + } + else + { + return socket::ptr(); + } + } + + void client_impl::sockets_invoke_void(void (sio::socket::*fn)(void)) + { + std::map socks; + { + std::lock_guard guard(m_socket_mutex); + socks.insert(m_sockets.begin(),m_sockets.end()); + } + for (auto it = socks.begin(); it!=socks.end(); ++it) { + ((*(it->second)).*fn)(); } - failed: - //just close it. - m_client.get_io_service().dispatch(lib::bind(&client_impl::__close, this,close::status::policy_violation,"Handshake error")); } - // Websocket++ client client void client_impl::on_fail(connection_hdl con) { m_con.reset(); @@ -119,7 +353,7 @@ namespace sio m_reconn_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); boost::system::error_code ec; m_reconn_timer->expires_from_now(milliseconds(delay), ec); - m_reconn_timer->async_wait(lib::bind(&client_impl::reconnect,this,lib::placeholders::_1)); + m_reconn_timer->async_wait(lib::bind(&client_impl::timeout_reconnect,this,lib::placeholders::_1)); } else { @@ -171,7 +405,7 @@ namespace sio m_reconn_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); boost::system::error_code ec; m_reconn_timer->expires_from_now(milliseconds(delay), ec); - m_reconn_timer->async_wait(lib::bind(&client_impl::reconnect,this,lib::placeholders::_1)); + m_reconn_timer->async_wait(lib::bind(&client_impl::timeout_reconnect,this,lib::placeholders::_1)); return; } reason = client::close_reason_drop; @@ -188,12 +422,57 @@ namespace sio if (m_ping_timeout_timer) { boost::system::error_code ec; m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout),ec); - m_ping_timeout_timer->async_wait(lib::bind(&client_impl::__timeout_pong, this,lib::placeholders::_1)); + m_ping_timeout_timer->async_wait(lib::bind(&client_impl::timeout_pong, this,lib::placeholders::_1)); } // Parse the incoming message according to socket.IO rules m_packet_mgr.put_payload(msg->get_payload()); } + void client_impl::on_handshake(message::ptr const& message) + { + if(message && message->get_flag() == message::flag_object) + { + const object_message* obj_ptr =static_cast(message.get()); + const map* values = &(obj_ptr->get_map()); + auto it = values->find("sid"); + if (it!= values->end()) { + m_sid = std::static_pointer_cast(it->second)->get_string(); + } + else + { + goto failed; + } + it = values->find("pingInterval"); + if (it!= values->end()&&it->second->get_flag() == message::flag_integer) { + m_ping_interval = (unsigned)std::static_pointer_cast(it->second)->get_int(); + } + else + { + m_ping_interval = 25000; + } + it = values->find("pingTimeout"); + + if (it!=values->end()&&it->second->get_flag() == message::flag_integer) { + m_ping_timeout = (unsigned) std::static_pointer_cast(it->second)->get_int(); + } + else + { + m_ping_timeout = 60000; + } + + m_ping_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); + boost::system::error_code ec; + m_ping_timer->expires_from_now(milliseconds(m_ping_interval), ec); + if(ec)LOG("ec:"<async_wait(lib::bind(&client_impl::ping,this,lib::placeholders::_1)); + LOG("On handshake,sid:"<__close(close::status::abnormal_close, "End by server"); + this->close_impl(close::status::abnormal_close, "End by server"); break; case packet::frame_pong: this->on_pong(); @@ -238,170 +511,7 @@ namespace sio void client_impl::on_encode(bool isBinary,shared_ptr const& payload) { LOG("encoded payload length:"<length()< payload) - { - lib::error_code ec; - this->m_client.send(this->m_con, *payload, frame::opcode::text, ec); - }); - if(m_ping_timer) - { - boost::system::error_code e_code; - m_ping_timer->expires_from_now(milliseconds(m_ping_interval), e_code); - m_ping_timer->async_wait(lib::bind(&client_impl::__ping,this,lib::placeholders::_1)); - } - if(!m_ping_timeout_timer) - { - m_ping_timeout_timer.reset(new boost::asio::deadline_timer(m_client.get_io_service())); - boost::system::error_code timeout_ec; - m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_timeout), timeout_ec); - m_ping_timeout_timer->async_wait(lib::bind(&client_impl::__timeout_pong, this,lib::placeholders::_1)); - } - } - - void client_impl::__timeout_pong(const boost::system::error_code &ec) - { - if(ec) - { - return; - } - this->on_pong_timeout(); - } - - - void client_impl::__close(close::status::value const& code,std::string const& reason) - { - LOG("Close by reason:"<cancel(); - m_reconn_timer.reset(); - } - if (m_con.expired()) - { - std::cerr << "Error: No active session" << std::endl; - } - else - { - lib::error_code ec; - m_client.close(m_con, code, reason, ec); - } - } - - void client_impl::close() - { - m_con_state = con_closing; - this->sockets_invoke_void(&sio::socket::close); - m_client.get_io_service().dispatch(lib::bind(&client_impl::__close, this,close::status::normal,"End by user")); - } - - void client_impl::sync_close() - { - m_con_state = con_closing; - this->sockets_invoke_void(&sio::socket::close); - m_client.get_io_service().dispatch(lib::bind(&client_impl::__close, this,close::status::normal,"End by user")); - if(m_network_thread) - { - m_network_thread->join(); - m_network_thread.reset(); - } - } - - unsigned client_impl::next_delay() const - { - //no jitter, fixed power root. - return min(m_reconn_delay * pow(1.5,m_reconn_made),m_reconn_delay_max); - } - - void client_impl::send(packet& p) - { - m_packet_mgr.encode(p); - } - - void client_impl::remove_socket(std::string const& nsp) - { - std::lock_guard guard(m_socket_mutex); - auto it = m_sockets.find(nsp); - if(it!= m_sockets.end()) - { - m_sockets.erase(it); - } - } - - boost::asio::io_service& client_impl::get_io_service() - { - return m_client.get_io_service(); - } - - void client_impl::on_socket_closed(std::string const& nsp) - { - if(m_socket_close_listener)m_socket_close_listener(nsp); - } - - void client_impl::on_socket_opened(std::string const& nsp) - { - if(m_socket_open_listener)m_socket_open_listener(nsp); - } - - void client_impl::__send(std::shared_ptr const& payload_ptr,frame::opcode::value opcode) - { - if(m_con_state == con_opened) - { - //delay the ping, since we already have message to send. - boost::system::error_code timeout_ec; - if(m_ping_timer) - { - m_ping_timer->expires_from_now(milliseconds(m_ping_interval),timeout_ec); - m_ping_timer->async_wait(lib::bind(&client_impl::__ping,this,lib::placeholders::_1)); - } - lib::error_code ec; - m_client.send(m_con,*payload_ptr,opcode,ec); - if(ec) - { - std::cerr<<"Send failed,reason:"<< ec.message()<cancel(); - m_reconn_timer.reset(); - } - if(m_network_thread) - { - if(m_con_state == con_closing||m_con_state == con_closed) - { - //if client is closing, join to wait. - //if client is closed, still need to join, - //but in closed case,join will return immediately. - m_network_thread->join(); - m_network_thread.reset();//defensive - } - else - { - //if we are connected, do nothing. - return; - } - } - m_con_state = con_opening; - m_base_url = uri; - m_reconn_made = 0; - this->reset_states(); - m_client.get_io_service().dispatch(lib::bind(&client_impl::__connect,this,uri)); - m_network_thread.reset(new std::thread(lib::bind(&client_impl::run_loop,this)));//uri lifecycle? - - } - - void client_impl::reconnect(boost::system::error_code const& ec) - { - if(ec) - { - return; - } - if(m_con_state == con_closed) - { - m_con_state = con_opening; - m_reconn_made++; - this->reset_states(); - LOG("Reconnecting..."< guard(m_socket_mutex); - std::string aux; - if(nsp == "") - { - aux = "/"; - } - else if( nsp[0] != '/') - { - aux.append("/",1); - aux.append(nsp); - } - else - { - aux = nsp; - } - - auto it = m_sockets.find(aux); - if(it!= m_sockets.end()) - { - return it->second; - } - else - { - std::pair p(aux,std::shared_ptr(new sio::socket(this,aux))); - return (m_sockets.insert(p).first)->second; - } - } - - void client_impl::run_loop() - { - - m_client.run(); - m_client.reset(); - m_client.get_alog().write(websocketpp::log::alevel::devel, - "run loop end"); - } - - socket::ptr client_impl::get_socket_locked(std::string const& nsp) - { - std::lock_guard guard(m_socket_mutex); - auto it = m_sockets.find(nsp); - if(it != m_sockets.end()) - { - return it->second; - } - else - { - return socket::ptr(); - } - } - - void client_impl::sockets_invoke_void(void (sio::socket::*fn)(void)) - { - std::map socks; - { - std::lock_guard guard(m_socket_mutex); - socks.insert(m_sockets.begin(),m_sockets.end()); - } - for (auto it = socks.begin(); it!=socks.end(); ++it) { - ((*(it->second)).*fn)(); - } - } } diff --git a/src/internal/sio_client_impl.h b/src/internal/sio_client_impl.h index cc3996c..9aab11a 100644 --- a/src/internal/sio_client_impl.h +++ b/src/internal/sio_client_impl.h @@ -118,44 +118,45 @@ void set_##__FIELD__(__TYPE__ const& l) \ void on_socket_opened(std::string const& nsp); private: - void __close(close::status::value const& code,std::string const& reason); + void run_loop(); + + void connect_impl(const std::string& uri); + + void close_impl(close::status::value const& code,std::string const& reason); - void __connect(const std::string& uri); + void send_impl(std::shared_ptr const& payload_ptr,frame::opcode::value opcode); - void __send(std::shared_ptr const& payload_ptr,frame::opcode::value opcode); - - void __ping(const boost::system::error_code& ec); - - void __timeout_pong(const boost::system::error_code& ec); - - void __timeout_connection(const boost::system::error_code& ec); + void ping(const boost::system::error_code& ec); + void timeout_pong(const boost::system::error_code& ec); + + void timeout_reconnect(boost::system::error_code const& ec); + + unsigned next_delay() const; + socket::ptr get_socket_locked(std::string const& nsp); void sockets_invoke_void(void (sio::socket::*fn)(void)); - - void reconnect(boost::system::error_code const& ec); - - unsigned next_delay() const; - - void run_loop(); void on_decode(packet const& pack); void on_encode(bool isBinary,shared_ptr const& payload); - // Callbacks + //websocket callbacks void on_fail(connection_hdl con); + void on_open(connection_hdl con); + void on_close(connection_hdl con); + void on_message(connection_hdl con, client_type::message_ptr msg); + + //socketio callbacks void on_handshake(message::ptr const& message); - + void on_pong(); - - void on_pong_timeout(); - + void reset_states(); - + void clear_timers(); // Connection pointer for client functions. @@ -184,7 +185,6 @@ void set_##__FIELD__(__TYPE__ const& l) \ client::con_listener m_fail_listener; client::con_listener m_reconnecting_listener; client::reconnect_listener m_reconnect_listener; - client::close_listener m_close_listener; client::socket_listener m_socket_open_listener; diff --git a/src/internal/sio_packet.h b/src/internal/sio_packet.h index b2137f9..c43a0ae 100755 --- a/src/internal/sio_packet.h +++ b/src/internal/sio_packet.h @@ -4,8 +4,8 @@ // Created by Melo Yao on 3/19/15. // -#ifndef __SIO_PACKET_H__ -#define __SIO_PACKET_H__ +#ifndef SIO_PACKET_H +#define SIO_PACKET_H #include #include "../sio_message.h" #include diff --git a/src/sio_client.h b/src/sio_client.h index a04a929..87cd59e 100755 --- a/src/sio_client.h +++ b/src/sio_client.h @@ -4,8 +4,8 @@ // Created by Melo Yao on 3/25/15. // -#ifndef __SIO_CLIENT__H__ -#define __SIO_CLIENT__H__ +#ifndef SIO_CLIENT_H +#define SIO_CLIENT_H #include #include #include "sio_message.h"