---恢复内容开始---
boost asio可算是一个简单易用、功能强大且可跨平台的C++通讯库,效率表现也不错,linux环境下基于epoll实现,windows环境下基于iocp实现。tcp通讯是项目当中经常用到的通讯方式之一,实现的方法各式各样,因此总结一套适用于自己项目的方法很有必要,很可能下一个项目直接套上去就可以用了。
二、实现思路
1.通讯包数据结构
Tag:检查数据包是否合法,具体会在下面讲解;
Length:描述Body的长度;
Command:表示数据包的类型,0表示心跳包(长连接需要心跳来检测连接是否正常),1表示注册包(客户端连接上服务器之后要将相关信息注册给服务器),2表示业务消息包;
business_type:业务消息包类型,服务器会根据业务消息包类型将数据路由到对应的客户端(客户端是有业务类型分类的);
app_id:客户端唯一标识符;
Data:消息数据;
2.连接对象
客户端连接上服务器之后,双方都会产生一个socket连接对象,通过这个对象可以收发数据,因此我定义为socket_session。
//socket_session.h
- #pragmaonce
- #include<iostream>
- #include<list>
- #include<hash_map>
- #include<boost/bind.hpp>
- #include<boost/asio.hpp>
- #include<boost/shared_ptr.hpp>
- #include<boost/make_shared.hpp>
- #include<boost/thread.hpp>
- #include<boost/thread/mutex.hpp>
- #include<boost/enable_shared_from_this.hpp>
- #include<firebird/log/logger_log4.hpp>
- #include<firebird/detail/config.hpp>
- #include<firebird/socket_utils/message_archive.hpp>
- usingboost::asio::ip::tcp;
- namespacefirebird{
- enumcommand{heartbeat=0,regist,normal};
- conststd::stringtag="KDS";
- classFIREBIRD_DECLsocket_session;
- typedefboost::shared_ptr<socket_session>socket_session_ptr;
- classFIREBIRD_DECLsocket_session:
- publicboost::enable_shared_from_this<socket_session>,
- privateboost::noncopyable
- {
- public:
- typedefboost::function<void(socket_session_ptr)>close_callback;
- typedefboost::function<void(
- constboost::system::error_code&,
- socket_session_ptr,message&)>read_data_callback;
- socket_session(boost::asio::io_service&io_service);
- ~socket_session(void);
- DWORDid(){returnm_id;}
- WORDget_business_type(){returnm_business_type;}
- voidset_business_type(WORDtype){m_business_type=type;}
- DWORDget_app_id(){returnm_app_id;}
- voidset_app_id(DWORDapp_id){m_app_id=app_id;}
- std::string&get_remote_addr(){returnm_name;}
- voidset_remote_addr(std::string&name){m_name=name;}
- tcp::socket&socket(){returnm_socket;}
- voidinstallCloseCallBack(close_callbackcb){close_cb=cb;}
- voidinstallReadDataCallBack(read_data_callbackcb){read_data_cb=cb;}
- voidstart();
- voidclose();
- voidasync_write(conststd::string&sMsg);
- voidasync_write(message&msg);
- boolis_timeout();
- voidset_op_time(){std::time(&m_last_op_time);}
- private:
- staticboost::detail::atomic_countm_last_id;
- DWORDm_id;
- WORDm_business_type;
- DWORDm_app_id;
- std::stringm_name;
- boost::array<char,7>sHeader;
- std::stringsBody;
- tcp::socketm_socket;
- boost::asio::io_service&m_io_service;
- std::time_tm_last_op_time;
- close_callbackclose_cb;
- read_data_callbackread_data_cb;
- //发送消息
- voidhandle_write(constboost::system::error_code&e,
- std::size_tbytes_transferred,std::string*pmsg);
- //读消息头
- voidhandle_read_header(constboost::system::error_code&error);
- //读消息体
- voidhandle_read_body(constboost::system::error_code&error);
- voidhandle_close();
- };
- }
这里需要注意的是,定义了一个tag="KDS",目的是检查收到的数据包是否有效:如果数据包的前3个字节不是“KDS”,就认为是非法的请求包。你也可以把tag定义为其它字符串,只要双方按协议发包即可。当然,这是比较简单的数据包检查方法,比较严谨的做法是双方使用哈希算法来校验,具体怎么做,这里先不做详解。
//socket_session.cpp
- #include"socket_session.h"
- namespacefirebird{
- boost::detail::atomic_countsocket_session::m_last_id(0);
- socket_session::socket_session(boost::asio::io_service&io_srv)
- :m_io_service(io_srv),m_socket(io_srv),
- m_business_type(0),m_app_id(0)
- {
- m_id=++socket_session::m_last_id;
- }
- socket_session::~socket_session(void)
- {
- m_socket.close();
- }
- voidsocket_session::start()
- {
- m_socket.set_option(boost::asio::ip::tcp::acceptor::linger(true,0));
- m_socket.set_option(boost::asio::socket_base::keep_alive(true));
- std::time(&m_last_op_time);
- constboost::system::error_codeerror;
- handle_read_header(error);
- }
- voidsocket_session::handle_close()
- {
- try{
- m_socket.close();
- close_cb(shared_from_this());
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:[未知异常]");
- }
- }
- voidsocket_session::close()
- {
- //由于回调中有加锁的情况,必须提交到另外一个线程去做,不然会出现死锁
- m_io_service.post(boost::bind(&socket_session::handle_close,shared_from_this()));
- }
- staticintconnection_timeout=60;
- boolsocket_session::is_timeout()
- {
- std::time_tnow;
- std::time(&now);
- returnnow-m_last_op_time>connection_timeout;
- }
- //读消息头
- voidsocket_session::handle_read_header(constboost::system::error_code&error)
- {
- LOG4CXX_DEBUG(firebird_log,KDS_CODE_INFO<<"enter.");
- try{
- if(error)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<error.message().c_str()<<"]");
- close();
- return;
- }
- std::stringdata;
- data.swap(sBody);
- boost::asio::async_read(m_socket,
- boost::asio::buffer(sHeader),
- boost::bind(&socket_session::handle_read_body,shared_from_this(),
- boost::asio::placeholders::error));
- if(data.length()>0&&data!="")
- {//读到数据回调注册的READ_DATA函数
- messagemsg;
- message_iarchive(msg,data);
- read_data_cb(error,shared_from_this(),msg);
- }
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<e.what()<<"]");
- close();
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:[未知异常]");
- close();
- }
- }
- //读消息体
- voidsocket_session::handle_read_body(constboost::system::error_code&error)
- {
- LOG4CXX_DEBUG(firebird_log,KDS_CODE_INFO<<"enter.");
- try{
- if(error)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<error.message().c_str()<<"]");
- close();
- return;
- }
- if(tag.compare(0,tag.length(),sHeader.data(),0,tag.length()))
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:[这是个非法连接!]");
- close();
- return;
- }
- DWORDdwLength=0;
- char*len=(char*)&dwLength;
- memcpy(len,&sHeader[tag.length()],sizeof(dwLength));
- sBody.resize(dwLength);
- char*pBody=&sBody[0];
- boost::asio::async_read(m_socket,
- boost::asio::buffer(pBody,dwLength),
- boost::bind(&socket_session::handle_read_header,shared_from_this(),
- boost::asio::placeholders::error));
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<e.what()<<"]");
- close();
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:[未知异常]");
- close();
- }
- }
- voidsocket_session::handle_write(constboost::system::error_code&error,
- std::size_tbytes_transferred,std::string*pmsg)
- {
- //数据发送成功就销毁
- if(pmsg!=NULL)
- {
- deletepmsg;
- }
- if(error)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<error.message().c_str()<<"]");
- close();
- return;
- }
- }
- voidsocket_session::async_write(conststd::string&sMsg)
- {
- LOG4CXX_DEBUG(firebird_log,KDS_CODE_INFO<<"enter.")
- try
- {
- DWORDdwLength=sMsg.size();
- char*pLen=(char*)&dwLength;
- //由于是异步发送,要保证数据发送完整时,才把数据销毁
- std::string*msg=newstd::string();
- msg->append(tag);
- msg->append(pLen,sizeof(dwLength));
- msg->append(sMsg);
- boost::asio::async_write(m_socket,boost::asio::buffer(*msg,msg->size()),
- boost::bind(&socket_session::handle_write,shared_from_this(),
- boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred,
- msg));
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:["<<e.what()<<"]");
- close();
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<get_remote_addr()<<"],socket异常:[未知异常]");
- close();
- }
- }
- voidsocket_session::async_write(message&msg)
- {
- std::stringdata;
- message_oarchive(data,msg);
- async_write(data);
- }
- }
接收数据时,socket_session会先读取7个字节的head,比较前3个字节是否为“KDS”,然后取得4个字节的Length,再读出Length长度的数据,最后将该数据传给read_data_cb回调函数处理,read_data_cb回调函数是在外部注册的。
3.连接管理器
对于服务器来说,它同时服务多个客户端,为了有效的管理,因此需要一个连接管理器,我定义为session_manager。session_manager主要是对socket_session的增删改查,和有效性检查。
//session_manager.h
- #pragmaonce
- #include"socket_session.h"
- #include"filter_container.h"
- #include<boost/date_time/posix_time/posix_time.hpp>
- #include<boost/multi_index_container.hpp>
- #include<boost/multi_index/member.hpp>
- #include<boost/multi_index/ordered_index.hpp>
- #include<boost/typeof/typeof.hpp>
- #include<boost/random.hpp>
- #include<boost/pool/detail/singleton.hpp>
- namespacefirebird{
- template<typenameT>
- classvar_gen_wraper
- {
- public:
- var_gen_wraper():gen(boost::mt19937((boost::int32_t)std::time(0)),
- boost::uniform_smallint<>(1,100)){}
- typenameT::result_typeoperator()(){returngen();}
- private:
- Tgen;
- };
- structsession_stu
- {
- DWORDid;
- WORDbusiness_type;
- std::stringaddress;
- DWORDapp_id;
- socket_session_ptrsession;
- };
- structsid{};
- structsbusiness_type{};
- structsaddress{};
- structsapp_id{};
- enumsession_idx_member{session_id=0,session_business_type,session_address,app_id};
- #defineCLIENT0
- #defineSERVER1
- typedefboost::multi_index::multi_index_container<
- session_stu,
- boost::multi_index::indexed_by<
- boost::multi_index::ordered_unique<
- boost::multi_index::tag<sid>,BOOST_MULTI_INDEX_MEMBER(session_stu,DWORD,id)>,
- boost::multi_index::ordered_non_unique<
- boost::multi_index::tag<sbusiness_type>,BOOST_MULTI_INDEX_MEMBER(session_stu,WORD,business_type)>,
- boost::multi_index::ordered_non_unique<
- boost::multi_index::tag<saddress>,BOOST_MULTI_INDEX_MEMBER(session_stu,std::string,address)>,
- boost::multi_index::ordered_non_unique<
- boost::multi_index::tag<sapp_id>,BOOST_MULTI_INDEX_MEMBER(session_stu,DWORD,app_id)>
- >
- >session_set;
- #defineMULTI_MEMBER_CON(Tag)boost::multi_index::index<session_set,Tag>::type&
- #defineMULTI_MEMBER_ITR(Tag)boost::multi_index::index<session_set,Tag>::type::iterator
- structis_business_type{
- is_business_type(WORDtype)
- :m_type(type)
- {
- }
- booloperator()(constsession_stu&s)
- {
- return(s.business_type==m_type);
- }
- WORDm_type;
- };
- classsession_manager
- {
- public:
- typedefboost::shared_lock<boost::shared_mutex>readLock;
- typedefboost::unique_lock<boost::shared_mutex>writeLock;
- session_manager(boost::asio::io_service&io_srv,inttype,intexpires_time);
- ~session_manager();
- voidadd_session(socket_session_ptrp);
- voidupdate_session(socket_session_ptrp);
- template<typenameTag,typenameMember>
- voiddel_session(Memberm)
- {
- writeLocklock(m_mutex);
- if(m_sessions.empty())
- {
- return;
- }
- MULTI_MEMBER_CON(Tag)idx=boost::multi_index::get<Tag>(m_sessions);
- //BOOST_AUTO(idx,boost::multi_index::get<Tag>(m_sessions));
- BOOST_AUTO(iter,idx.find(m));
- if(iter!=idx.end())
- {
- idx.erase(iter);
- }
- }
- //获取容器中的第一个session
- template<typenameTag,typenameMember>
- socket_session_ptrget_session(Memberm)
- {
- readLocklock(m_mutex);
- if(m_sessions.empty())
- {
- returnsocket_session_ptr();
- }
- MULTI_MEMBER_CON(Tag)idx=boost::multi_index::get<Tag>(m_sessions);
- BOOST_AUTO(iter,idx.find(m));
- returniter!=boost::end(idx)?iter->session:socket_session_ptr();
- }
- //随机获取容器中的session
- template<typenameTag>
- socket_session_ptrget_session_by_business_type(WORDm)
- {
- typedeffilter_container<is_business_type,MULTI_MEMBER_ITR(Tag)>FilterContainer;
- readLocklock(m_mutex);
- if(m_sessions.empty())
- {
- returnsocket_session_ptr();
- }
- MULTI_MEMBER_CON(Tag)idx=boost::multi_index::get<Tag>(m_sessions);
- //对容器的元素条件过滤
- is_business_typepredicate(m);
- FilterContainerfc(predicate,idx.begin(),idx.end());
- FilterContainer::FilterIteriter=fc.begin();
- if(fc.begin()==fc.end())
- {
- returnsocket_session_ptr();
- }
- //typedefboost::variate_generator<boost::mt19937,boost::uniform_smallint<>>var_gen;
- //typedefboost::details::pool::singleton_default<var_gen_wraper<var_gen>>s_var_gen;
- ////根据随机数产生session
- //s_var_gen::object_type&gen=s_var_gen::instance();
- //intstep=gen()%fc.szie();
- intstep=m_next_session%fc.szie();
- ++m_next_session;
- for(inti=0;i<step;++i)
- {
- iter++;
- }
- returniter!=fc.end()?iter->session:socket_session_ptr();
- }
- //根据类型和地址取session
- template<typenameTag>
- socket_session_ptrget_session_by_type_ip(WORDm,std::string&ip)
- {
- typedeffilter_container<is_business_type,MULTI_MEMBER_ITR(Tag)>FilterContainer;
- readLocklock(m_mutex);
- if(m_sessions.empty())
- {
- returnsocket_session_ptr();
- }
- MULTI_MEMBER_CON(Tag)idx=boost::multi_index::get<Tag>(m_sessions);
- //对容器的元素条件过滤
- is_business_typepredicate(m);
- FilterContainerfc(predicate,idx.begin(),idx.end());
- FilterContainer::FilterIteriter=fc.begin();
- if(fc.begin()==fc.end())
- {
- returnsocket_session_ptr();
- }
- while(iter!=fc.end())
- {
- if(iter->session->get_remote_addr().find(ip)!=std::string::npos)
- {
- break;
- }
- iter++;
- }
- returniter!=fc.end()?iter->session:socket_session_ptr();
- }
- //根据类型和app_id取session
- template<typenameTag>
- socket_session_ptrget_session_by_type_appid(WORDm,DWORDapp_id)
- {
- typedeffilter_container<is_business_type,MULTI_MEMBER_ITR(Tag)>FilterContainer;
- readLocklock(m_mutex);
- if(m_sessions.empty())
- {
- returnsocket_session_ptr();
- }
- MULTI_MEMBER_CON(Tag)idx=boost::multi_index::get<Tag>(m_sessions);
- //对容器的元素条件过滤
- is_business_typepredicate(m);
- FilterContainerfc(predicate,idx.begin(),idx.end());
- FilterContainer::FilterIteriter=fc.begin();
- if(fc.begin()==fc.end())
- {
- returnsocket_session_ptr();
- }
- while(iter!=fc.end())
- {
- if(iter->session->get_app_id()==app_id)
- {
- break;
- }
- iter++;
- }
- returniter!=fc.end()?iter->session:socket_session_ptr();
- }
- private:
- intm_type;
- intm_expires_time;
- boost::asio::io_service&m_io_srv;
- boost::asio::deadline_timerm_check_tick;
- boost::shared_mutexm_mutex;
- unsignedshortm_next_session;
- session_setm_sessions;
- voidcheck_connection();
- };
- }
这里主要用到了boost的multi_index容器,这是一个非常有用方便的容器,可实现容器的多列索引,具体的使用方法,在这里不多做详解。
//session_manager.cpp
- #include"session_manager.h"
- namespacefirebird{
- session_manager::session_manager(boost::asio::io_service&io_srv,inttype,intexpires_time)
- :m_io_srv(io_srv),m_check_tick(io_srv),m_type(type),m_expires_time(expires_time),m_next_session(0)
- {
- check_connection();
- }
- session_manager::~session_manager()
- {
- }
- //检查服务器所有session的连接状态
- voidsession_manager::check_connection()
- {
- try{
- writeLocklock(m_mutex);
- session_set::iteratoriter=m_sessions.begin();
- while(iter!=m_sessions.end())
- {
- LOG4CXX_DEBUG(firebird_log,"循环");
- if(CLIENT==m_type)//客户端的方式
- {
- if(!iter->session->socket().is_open())//已断开,删除已断开的连接
- {
- LOG4CXX_INFO(firebird_log,"重新连接["<<iter->address<<"]");
- iter->session->close();//通过关闭触发客户端重连
- }
- else{//连接中,发送心跳
- messagemsg;
- msg.command=heartbeat;
- msg.business_type=iter->session->get_business_type();
- msg.app_id=iter->session->get_app_id();
- msg.data()="H";
- iter->session->async_write(msg);
- iter->session->set_op_time();
- }
- }
- elseif(SERVER==m_type)//服务器的方式
- {
- if(!iter->session->socket().is_open())//已断开,删除已断开的连接
- {
- LOG4CXX_INFO(firebird_log,KDS_CODE_INFO<<"删除已关闭的session:["<<iter->session->get_remote_addr()<<"]");
- iter=m_sessions.erase(iter);
- continue;
- }
- else{//连接中,设定每30秒检查一次
- if(iter->session->is_timeout())//如果session已长时间没操作,则关闭
- {
- LOG4CXX_INFO(firebird_log,KDS_CODE_INFO<<"删除已超时的session:["<<iter->session->get_remote_addr()<<"]");
- iter->session->close();//通过关闭触发删除session
- }
- }
- iter->session->set_op_time();
- }
- else{
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"unknownmanager_type");
- }
- ++iter;
- }
- LOG4CXX_DEBUG(firebird_log,"定时检查");
- m_check_tick.expires_from_now(boost::posix_time::seconds(m_expires_time));
- m_check_tick.async_wait(boost::bind(&session_manager::check_connection,this));
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"unknownexception.");
- }
- }
- voidsession_manager::add_session(socket_session_ptrp)
- {
- writeLocklock(m_mutex);
- session_stustuSession;
- stuSession.id=p->id();
- stuSession.business_type=0;
- stuSession.address=p->get_remote_addr();
- stuSession.app_id=p->get_app_id();
- stuSession.session=p;
- m_sessions.insert(stuSession);
- }
- voidsession_manager::update_session(socket_session_ptrp)
- {
- writeLocklock(m_mutex);
- if(m_sessions.empty())
- {
- return;
- }
- MULTI_MEMBER_CON(sid)idx=boost::multi_index::get<sid>(m_sessions);
- BOOST_AUTO(iter,idx.find(p->id()));
- if(iter!=idx.end())
- {
- const_cast<session_stu&>(*iter).business_type=p->get_business_type();
- const_cast<session_stu&>(*iter).app_id=p->get_app_id();
- }
- }
- }
这个时候,我就可以使用id、business_type、address、app_id当做key来索引socket_session了,单使用map容器是做不到的。
还有索引时,需要的一个条件过滤器
//filter_container.h
- #pragmaonce
- #include<boost/iterator/filter_iterator.hpp>
- namespacefirebird{
- template<classPredicate,classIterator>
- classfilter_container
- {
- public:
- typedefboost::filter_iterator<Predicate,Iterator>FilterIter;
- filter_container(Predicatep,Iteratorbegin,Iteratorend)
- :m_begin(p,begin,end),
- m_end(p,end,end)
- {
- }
- ~filter_container(){}
- FilterIterbegin(){returnm_begin;}
- FilterIterend(){returnm_end;}
- intszie(){
- inti=0;
- FilterIterfi=m_begin;
- while(fi!=m_end)
- {
- ++i;
- ++fi;
- }
- returni;
- }
- private:
- FilterIterm_begin;
- FilterIterm_end;
- };
- }
4.服务器端的实现
服务器我定义为server_socket_utils,拥有一个session_manager,每当accept成功得到一个socket_session时,都会将其增加到session_manager去管理,注册相关回调函数。
read_data_callback 接收到数据的回调函数
收到数据之后,也就是数据包的body部分,反序列化出command、business_type、app_id和data(我使用到了thrift),如果command==normal正常的业务包,会调用handle_read_data传入data。
close_callback 关闭socket_session触发的回调函数
根据id将该连接从session_manager中删除掉
//server_socket_utils.h
- #pragmaonce
- #include"socket_session.h"
- #include"session_manager.h"
- #include<boost/format.hpp>
- #include<firebird/message/message.hpp>
- namespacefirebird{
- usingboost::asio::ip::tcp;
- classFIREBIRD_DECLserver_socket_utils
- {
- private:
- boost::asio::io_servicem_io_srv;
- boost::asio::io_service::workm_work;
- tcp::acceptorm_acceptor;
- voidhandle_accept(socket_session_ptrsession,constboost::system::error_code&error);
- voidclose_callback(socket_session_ptrsession);
- voidread_data_callback(constboost::system::error_code&e,
- socket_session_ptrsession,message&msg);
- protected:
- virtualvoidhandle_read_data(message&msg,socket_session_ptrpSession)=0;
- public:
- server_socket_utils(intport);
- ~server_socket_utils(void);
- voidstart();
- boost::asio::io_service&get_io_service(){returnm_io_srv;}
- session_managerm_manager;
- };
- }
//server_socket_utils.cpp
- #include"server_socket_utils.h"
- namespacefirebird{
- server_socket_utils::server_socket_utils(intport)
- :m_work(m_io_srv),
- m_acceptor(m_io_srv,tcp::endpoint(tcp::v4(),port)),
- m_manager(m_io_srv,SERVER,3)
- {
- //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
- ////关闭连接前留0秒给客户接收数据
- //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::linger(true,0));
- //m_acceptor.set_option(boost::asio::ip::tcp::no_delay(true));
- //m_acceptor.set_option(boost::asio::socket_base::keep_alive(true));
- //m_acceptor.set_option(boost::asio::socket_base::receive_buffer_size(16384));
- }
- server_socket_utils::~server_socket_utils(void)
- {
- }
- voidserver_socket_utils::start()
- {
- try{
- socket_session_ptrnew_session(newsocket_session(m_io_srv));
- m_acceptor.async_accept(new_session->socket(),
- boost::bind(&server_socket_utils::handle_accept,this,new_session,
- boost::asio::placeholders::error));
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:[未知异常]");
- }
- }
- voidserver_socket_utils::handle_accept(socket_session_ptrsession,constboost::system::error_code&error)
- {
- if(!error)
- {
- try{
- socket_session_ptrnew_session(newsocket_session(m_io_srv));
- m_acceptor.async_accept(new_session->socket(),
- boost::bind(&server_socket_utils::handle_accept,this,new_session,
- boost::asio::placeholders::error));
- if(session!=NULL)
- {
- //注册关闭回调函数
- session->installCloseCallBack(boost::bind(&server_socket_utils::close_callback,this,_1));
- //注册读到数据回调函数
- session->installReadDataCallBack(boost::bind(&server_socket_utils::read_data_callback,this,_1,_2,_3));
- boost::formatfmt("%1%:%2%");
- fmt%session->socket().remote_endpoint().address().to_string();
- fmt%session->socket().remote_endpoint().port();
- session->set_remote_addr(fmt.str());
- session->start();
- m_manager.add_session(session);
- }
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:[未知异常]");
- }
- }
- }
- voidserver_socket_utils::close_callback(socket_session_ptrsession)
- {
- LOG4CXX_DEBUG(firebird_log,"close_callback");
- try{
- m_manager.del_session<sid>(session->id());
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:[未知异常]");
- }
- }
- voidserver_socket_utils::read_data_callback(constboost::system::error_code&e,
- socket_session_ptrsession,message&msg)
- {
- try{
- LOG4CXX_DEBUG(firebird_log,"command=["<<msg.command<<"],["
- <<msg.business_type<<"],["<<msg.data()<<"]");
- if(msg.command==heartbeat)
- {//心跳
- session->async_write(msg);
- }
- elseif(msg.command==regist)
- {//注册
- session->set_business_type(msg.business_type);
- session->set_app_id(msg.app_id);
- m_manager.update_session(session);
- session->async_write(msg);
- LOG4CXX_FATAL(firebird_log,"远程地址:["<<session->get_remote_addr()<<"],服务器类型:["<<
- session->get_business_type()<<"],服务器ID:["<<session->get_app_id()<<"]注册成功!");
- }
- elseif(msg.command==normal)
- {//业务数据
- handle_read_data(msg,session);
- }
- else
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"收到非法消息包!");
- }
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"socket异常:[未知异常]");
- }
- }
- }
5.客户端
客户端与服务器的逻辑差不多,区别在于客户端通过connect得到socket_session,而服务器通过accept得到socket_session。
//client_socket_utils.h
- #pragmaonce
- #include"socket_session.h"
- #include"session_manager.h"
- #include<boost/algorithm/string.hpp>
- #include<firebird/message/message.hpp>
- namespacefirebird{
- classFIREBIRD_DECLclient_socket_utils
- {
- public:
- client_socket_utils();
- ~client_socket_utils();
- voidsession_connect(std::vector<socket_session_ptr>&vSession);
- voidsession_connect(socket_session_ptrpSession);
- //socket_session_ptrget_session(std::string&addr);
- boost::asio::io_service&get_io_service(){returnm_io_srv;}
- protected:
- virtualvoidhandle_read_data(message&msg,socket_session_ptrpSession)=0;
- private:
- boost::asio::io_servicem_io_srv;
- boost::asio::io_service::workm_work;
- session_managerm_manager;
- voidhandle_connect(constboost::system::error_code&error,
- tcp::resolver::iteratorendpoint_iterator,socket_session_ptrpSession);
- voidclose_callback(socket_session_ptrsession);
- voidread_data_callback(constboost::system::error_code&e,
- socket_session_ptrsession,message&msg);
- };
- }
//client_socket_utils.cpp
- #include"client_socket_utils.h"
- namespacefirebird{
- client_socket_utils::client_socket_utils()
- :m_work(m_io_srv),m_manager(m_io_srv,CLIENT,3)
- {
- }
- client_socket_utils::~client_socket_utils()
- {
- }
- voidclient_socket_utils::session_connect(std::vector<socket_session_ptr>&vSession)
- {
- for(inti=0;i<vSession.size();++i)
- {
- session_connect(vSession[i]);
- }
- }
- voidclient_socket_utils::session_connect(socket_session_ptrpSession)
- {
- std::string&addr=pSession->get_remote_addr();
- try{
- //注册关闭回调函数
- pSession->installCloseCallBack(boost::bind(&client_socket_utils::close_callback,this,_1));
- //注册读到数据回调函数
- pSession->installReadDataCallBack(boost::bind(&client_socket_utils::read_data_callback,this,_1,_2,_3));
- std::vector<std::string>ip_port;
- boost::split(ip_port,addr,boost::is_any_of(":"));
- if(ip_port.size()<2)
- {
- //throwstd::runtime_error("ip格式不正确!");
- LOG4CXX_ERROR(firebird_log,"["<<addr<<"]ip格式不正确!");
- return;
- }
- tcp::resolverresolver(pSession->socket().get_io_service());
- tcp::resolver::queryquery(ip_port[0],ip_port[1]);
- tcp::resolver::iteratorendpoint_iterator=resolver.resolve(query);
- //pSession->set_begin_endpoint(endpoint_iterator);//设置起始地址,以便重连
- //由于客户端是不断重连的,即使还未连接也要保存该session
- m_manager.add_session(pSession);
- tcp::endpointendpoint=*endpoint_iterator;
- pSession->socket().async_connect(endpoint,
- boost::bind(&client_socket_utils::handle_connect,this,
- boost::asio::placeholders::error,++endpoint_iterator,pSession));
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<addr<<"],socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<addr<<"],socket异常:[未知异常]");
- }
- }
- voidclient_socket_utils::handle_connect(constboost::system::error_code&error,
- tcp::resolver::iteratorendpoint_iterator,socket_session_ptrpSession)
- {
- LOG4CXX_DEBUG(firebird_log,KDS_CODE_INFO<<"enter.");
- std::stringsLog;
- try{
- if(!error)
- {
- LOG4CXX_FATAL(firebird_log,"服务器:["<<pSession->get_business_type()<<"],连接远程地址:["<<pSession->get_remote_addr().c_str()<<"]成功!");
- pSession->start();
- //向服务器注册服务类型
- messagemsg;
- msg.command=regist;
- msg.business_type=pSession->get_business_type();
- msg.app_id=pSession->get_app_id();
- msg.data()="R";
- pSession->async_write(msg);
- }
- elseif(endpoint_iterator!=tcp::resolver::iterator())
- {
- LOG4CXX_ERROR(firebird_log,"连接远程地址:["<<pSession->get_remote_addr().c_str()<<"]失败,试图重连下一个地址。");
- pSession->socket().close();//此处用socket的close,不应用session的close触发连接,不然会导致一直重连
- tcp::endpointendpoint=*endpoint_iterator;
- pSession->socket().async_connect(endpoint,
- boost::bind(&client_socket_utils::handle_connect,this,
- boost::asio::placeholders::error,++endpoint_iterator,pSession));
- }
- else
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<pSession->get_remote_addr().c_str()<<"]失败!");
- pSession->socket().close();//此处用socket的close,不应用session的close触发连接,不然会导致一直重连
- }
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<pSession->get_remote_addr().c_str()<<"],socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<pSession->get_remote_addr().c_str()<<"],socket异常:[未知异常]");
- }
- }
- voidclient_socket_utils::read_data_callback(constboost::system::error_code&e,
- socket_session_ptrsession,message&msg)
- {
- LOG4CXX_DEBUG(firebird_log,"command=["<<msg.command<<"],["
- <<msg.business_type<<"],["<<msg.data()<<"]");
- if(msg.command==heartbeat)
- {//心跳
- }
- elseif(msg.command==regist)
- {//注册
- LOG4CXX_FATAL(firebird_log,"服务器:["<<session->get_business_type()<<"]注册成功。");
- }
- elseif(msg.command==normal)
- {//业务数据
- handle_read_data(msg,session);
- }
- else
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"收到非法消息包!");
- }
- }
- //关闭session就会重连
- voidclient_socket_utils::close_callback(socket_session_ptrsession)
- {
- LOG4CXX_DEBUG(firebird_log,KDS_CODE_INFO<<"enter.");
- try{
- //tcp::resolver::iteratorendpoint_iterator=context.session->get_begin_endpoint();
- std::string&addr=session->get_remote_addr();
- std::vector<std::string>ip_port;
- boost::split(ip_port,addr,boost::is_any_of(":"));
- if(ip_port.size()<2)
- {
- LOG4CXX_ERROR(firebird_log,"["<<addr<<"]ip格式不正确!");
- return;
- }
- tcp::resolverresolver(session->socket().get_io_service());
- tcp::resolver::queryquery(ip_port[0],ip_port[1]);
- tcp::resolver::iteratorendpoint_iterator=resolver.resolve(query);
- tcp::endpointendpoint=*endpoint_iterator;
- session->socket().async_connect(endpoint,
- boost::bind(&client_socket_utils::handle_connect,this,
- boost::asio::placeholders::error,++endpoint_iterator,session));
- }
- catch(std::exception&e)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<session->get_remote_addr().c_str()<<"],socket异常:["<<e.what()<<"]");
- }
- catch(...)
- {
- LOG4CXX_ERROR(firebird_log,KDS_CODE_INFO<<"连接远程地址:["<<session->get_remote_addr().c_str()<<"],socket异常:[未知异常]");
- }
- }
- }
5.对象串行化
socket_session发送和接收数据包的时候使用到了对象串行化,我这里是通过thrift实现的,其实boost的serialization库也提供了这样的功能,使用起来更为方便,但我在测试过程中,thrift相比之下性能会高很多,因此就坚持使用thrift了,感兴趣的话可以看我之前写的《使用thrift串行化对象》和《轻量级序列化库boost serialization》。
5.1字符串与thrift对象的相互转换
- #pragmaonce
- #include<boost/shared_ptr.hpp>
- #include<transport/TBufferTransports.h>
- #include<protocol/TProtocol.h>
- #include<protocol/TBinaryProtocol.h>
- namespacefirebird{
- usingnamespaceapache::thrift;
- usingnamespaceapache::thrift::transport;
- usingnamespaceapache::thrift::protocol;
- template<typenameT>
- voidthrift_iserialize(T&stu,std::string&s)
- {
- boost::shared_ptr<TMemoryBuffer>trans(newTMemoryBuffer((uint8_t*)&s[0],s.size()));
- boost::shared_ptr<TProtocol>proto(newTBinaryProtocol(trans));
- stu.read(proto.get());
- }
- template<typenameT>
- voidthrift_oserialize(T&stu,std::string&s)
- {
- boost::shared_ptr<TMemoryBuffer>trans(newTMemoryBuffer());
- boost::shared_ptr<TProtocol>proto(newTBinaryProtocol(trans));
- stu.write(proto.get());
- s=trans->getBufferAsString();
- }
- }
5.2通过thrift对象,普通的对象与字符串的相互转换
- #pragmaonce
- #include"message_archive.hpp"
- #include<firebird/archive/thrift_archive.hpp>
- #include<firebird/message/TMessage_types.h>
- namespacefirebird
- {
- /***messagetoThriftMessage***/
- voidmsg_to_tmsg(TMessage&tmsg,message&msg)
- {
- //设置
- tmsg.command=msg.command;
- tmsg.business_type=msg.business_type;
- tmsg.app_id=msg.app_id;
- //设置context
- tmsg.context.cmdVersion=msg.context().cmdVersion;
- tmsg.context.cpid.swap(msg.context().cpid);
- tmsg.context.remote_ip.swap(msg.context().remote_ip);
- tmsg.context.wSerialNumber=msg.context().wSerialNumber;
- tmsg.context.session_id=msg.context().session_id;
- //设置source
- for(inti=0;i<msg.source().size();++i)
- {
- tmsg.source.push_back(msg.source()[i]);
- }
- //设置destination
- for(inti=0;i<msg.destination().size();++i)
- {
- tmsg.destination.push_back(msg.destination()[i]);
- }
- //设置data
- tmsg.data=msg.data();
- }
- /***ThriftMessagetomessage***/
- voidtmsg_to_msg(message&msg,TMessage&tmsg)
- {
- //设置
- msg.command=tmsg.command;
- msg.business_type=tmsg.business_type;
- msg.app_id=tmsg.app_id;
- //设置context
- msg.context().cmdVersion=tmsg.context.cmdVersion;
- msg.context().cpid=tmsg.context.cpid;
- msg.context().remote_ip=tmsg.context.remote_ip;
- msg.context().wSerialNumber=tmsg.context.wSerialNumber;
- msg.context().session_id=tmsg.context.session_id;
- //设置source
- for(inti=0;i<tmsg.source.size();++i)
- {
- msg.source()<<tmsg.source[i];
- }
- //设置destination
- for(inti=0;i<tmsg.destination.size();++i)
- {
- msg.destination()<<tmsg.destination[i];
- }
- //设置data
- msg.data()=tmsg.data;
- }
- voidmessage_iarchive(message&msg,std::string&s)
- {
- TMessagetmsg;
- thrift_iserialize(tmsg,s);
- tmsg_to_msg(msg,tmsg);
- }
- voidmessage_oarchive(std::string&s,message&msg)
- {
- TMessagetmsg;
- msg_to_tmsg(tmsg,msg);
- thrift_oserialize(tmsg,s);
- }
- }
---恢复内容结束---