dbproxy.cpp 4.63 KB
#include "dbproxy.h"
#include "gateway.h"
#include <proto_header.h>
#include <db.pb.h>

dbproxy_t* g_dbproxy;

int dbproxy_t::send_msg( google::protobuf::Message* msg, USER_ID uid,
						CMD_ID cmd, user_t* handle_callback_user )
{
	proto_head_t proto_head;
	proto_head.cmd = cmd;
	proto_head.id = uid;

	auto it = this->msg_map.find(cmd);
	if(this->msg_map.end() != it){
		proto_head.seq = this->add_handle_callback_user(handle_callback_user);
	}

// 	if (NULL != handle_callback_user){
// 		proto_head.seq = this->add_handle_callback_user(handle_callback_user);
// 	}

	try{
		void* sb = g_gateway->send_msg_buf.send_data + proto_head_t::PROTO_HEAD_LEN;
		const int sb_len = sizeof(g_gateway->send_msg_buf.send_data) - proto_head_t::PROTO_HEAD_LEN;
		msg->SerializeToArray(sb, sb_len);

		uint32_t body_len = msg->ByteSize();

		TRACE_LOG("======>s2db msg [uid:%" PRIu64 ", cmd:0x%x, body_len:%u]", uid, cmd, body_len);

		g_gateway->send_msg_buf.add_body_len(body_len);
		g_gateway->send_msg_buf.set_head(&proto_head);
		
		int ret = this->send(g_gateway->send_msg_buf.send_data, body_len + proto_head_t::PROTO_HEAD_LEN);

		if (!msg->Utf8DebugString().empty()){
			TRACE_LOG("%s", msg->Utf8DebugString().c_str());
		}
		return ret;
	} catch (...){
		ERROR_LOG("[uid:%" PRIu64 ", cmd:%u, seq:%u]", uid, (uint32_t)cmd, proto_head.seq);
	}
	return 0;
}

int dbproxy_t::send( const void* data, uint32_t len )
{
	if (NULL == this->fd_info){
		std::string db_ip = g_bench_conf->get_strval("dbproxy", "ip");
		uint16_t db_port = ::atoi(g_bench_conf->get_strval("dbproxy", "port").c_str());	
		fd_info = el_async::connect(db_ip, db_port);
		if (fd_info == NULL){
			WARN_LOG("connet to db fail |%s|%u", db_ip.c_str(), db_port);
			return ERR;
		}
	}

//    TRACE_MSG_HEX_LOG(data, len);

	return el_async::s2peer(fd_info, data, len);
}

dbproxy_t::dbproxy_t()
{
	this->fd_info = NULL;

#undef  BIND_PROTO_CMD
#undef  BIND_PROTO_CMD_NO_CB

#define BIND_PROTO_CMD(cmd, fun_name, proto_name)\
	{\
		if (this->msg_map.end() != this->msg_map.find(cmd)){\
			WARN_LOG("cmd inster err![0x%X]", cmd);\
			exit(0);\
		} else {\
		this->msg_map[cmd] = MSG_HANDLE_DBPROXY_T(new db_msg::proto_name##_res(), &dbproxy_t::fun_name##_res, true);\
		}\
	}
#define BIND_PROTO_CMD_NO_CB(cmd, fun_name, proto_name)

#include <db_cmd.h>
#undef  BIND_PROTO_CMD
#undef  BIND_PROTO_CMD_NO_CB
}

dbproxy_t::~dbproxy_t()
{
	FOREACH(this->msg_map, it){
		SAFE_DELETE(it->second.prototype);
	}
	this->msg_map.clear();
}

int dbproxy_t::on_recv( el::lib_tcp_peer_info_t* peer_fd_info, CMD_ID cmd,
					   char* pdata, int len, USER_ID uid, uint32_t seq,
					   uint32_t ret )
{
	if (db_load_user_msg_cmd == cmd){
		INFO_LOG("statisitcs_login");
	}
	
	TRACE_LOG("<======recv db [uid:%" PRIu64 ", cmd:0x%x, body_len:%u, seq:%u]", 
		uid, cmd, len, seq);
    if (SUCC != ret){
        ERROR_LOG("db back msg error [ret:%#x]", ret);
    }

	auto it = this->msg_map.find(cmd);
	if(it == this->msg_map.end()){
		WARN_LOG("[cmd:%#x]", cmd);
		return ERR;
	}
// 	if (!it->second.is_callback){
// 		return SUCC;
// 	}

	try{
		if(!it->second.prototype->ParseFromArray(pdata, len)){
			WARN_LOG("[cmd:0x%#x]", cmd);
			return ERR;
		}
	}catch(...)
	{
		WARN_LOG("[cmd:0x%#x", cmd);
		return ERR;
	}

	user_t* user = NULL;
	if (db_msg::SYS_CMD_BEGIN < cmd && cmd < db_msg::SYS_CMD_END ){
	} else {
		user = pop_handle_callback_user(seq);
		if (NULL == user){
			WARN_LOG("");
			return ERR;
		}
	}

    if (!it->second.prototype->Utf8DebugString().empty()){
        TRACE_LOG("db back msg \n%s ", it->second.prototype->Utf8DebugString().c_str());
    }
	return (this->*it->second.func)(peer_fd_info, it->second.prototype, uid, seq, ret, user);
}

uint32_t dbproxy_t::add_handle_callback_user(user_t* user)
{
	static uint32_t seq = 0;
    if (!this->handle_callback_user_map.insert(std::make_pair(++seq, user)).second){
        WARN_LOG("[add db seq error seq:%u]", seq);
    }

    return seq;
}

user_t* dbproxy_t::pop_handle_callback_user(uint32_t seq)
{
    auto it = handle_callback_user_map.find(seq);
    if (handle_callback_user_map.end() == it){
		WARN_LOG("[pop wait db seq not exist seq:%u]", seq);
        return NULL;
    }

    user_t* user = it->second;
    handle_callback_user_map.erase(seq);

//    DEBUG_LOG("[pop wait db seq succ seq:%u]", seq);

    return user;
}

void dbproxy_t::clear_handle_callback_user(user_t* user)
{
    auto it = this->handle_callback_user_map.begin();
    for (; it != this->handle_callback_user_map.end(); ){
       if (it->second == user){
           this->handle_callback_user_map.erase(it++);
       } else {
           ++it;
       }
    }
}

void dbproxy_t::offline()
{
	this->fd_info = NULL;
	this->handle_callback_user_map.clear();
}