中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

網(wǎng)站搜索防止攻擊廣告代運營

網(wǎng)站搜索防止攻擊,廣告代運營,沈陽網(wǎng)頁設計制作,折再返怎么 做網(wǎng)站文章目錄 即時通訊重點概括展開聊聊單聊消息流轉流程展開聊聊群聊消息流轉流程群成員管理數(shù)據(jù)庫MySQL連接池設計redis連接池設計文件傳輸原理實時性并發(fā)能力 db_proxy_server reactor響應處理流程單聊消息消息如何封裝?如何保證對端完整解析一幀消息?協(xié)議…

文章目錄

  • 即時通訊重點概括
    • 展開聊聊單聊消息流轉流程
    • 展開聊聊群聊消息流轉流程
    • 群成員管理
    • 數(shù)據(jù)庫
    • MySQL連接池設計
    • redis連接池設計
    • 文件傳輸原理
    • 實時性
    • 并發(fā)能力
  • db_proxy_server reactor響應處理流程
  • 單聊消息
    • 消息如何封裝?如何保證對端完整解析一幀消息?協(xié)議格式?
    • 消息序號(msg_id )為什么使用redis生成?
    • 展開聊聊單聊消息流轉流程
  • 展開聊聊群聊消息流轉流程
    • 群聊消息流轉
    • 怎么保證數(shù)據(jù)的不丟失以及重復包?
    • 消息未讀計數(shù)是怎么實現(xiàn)的?
  • 群成員管理

即時通訊重點概括

展開聊聊單聊消息流轉流程

  • 消息如何封裝
  1. 怎么解決半包、粘包問題?
  2. 消息流轉流程介紹下?
  • 消息序號(msg_id )在哪里生成以及生成方式
  • 怎么保證數(shù)據(jù)的不丟失以及重復包?
  1. 接收端收到數(shù)據(jù)后(收到消息區(qū)別于閱讀消息)如何應答?
  2. 消息發(fā)送后服務器怎么應答?
  3. 消息發(fā)送時的seq有什么作用(業(yè)務層的ack機制)

展開聊聊群聊消息流轉流程

  • 如何推送群聊
  • 群消息計數(shù)器(msg_id )
  • 群會話如何更新(每有一個人發(fā)送消息,則其他人都需要更新會話消息)

群成員管理

  • 如何創(chuàng)建群
  • 如何刪除群
  • 怎么使用redis管理群成員
    消息未讀計數(shù)是怎么實現(xiàn)的?
  • 服務器怎么保留消息未讀計數(shù)(redis 單聊和群聊機制不同)
  • 客戶端的未讀消息計數(shù)從何而來
  • 客戶端未讀消息計數(shù)清0時向服務器發(fā)送了什么?服務器又是怎么清除未讀消息計數(shù)(單聊和群聊機制不同)

數(shù)據(jù)庫

  • 數(shù)據(jù)庫表設計(表達籠統(tǒng))
  • 密碼存儲方式
  • 未讀消息如何體現(xiàn)
  • 聊天消息分表問題(單聊和群聊消息表)
  • 最近會話表

MySQL連接池設計

  • 為什么使用連接池
  • 連接池設置多大合適?

redis連接池設計

  • 為什么使用連接池
  • 連接池設置多大合適?

文件傳輸原理

  • 在線傳輸和離線傳輸有什么區(qū)別

實時性

  • Http(登錄、圖片服務)
  • Socket
  • websocket
    展開聊聊登錄流程

并發(fā)能力

  • 如何做到百萬并發(fā)
  • 如何做到千萬并發(fā)

db_proxy_server reactor響應處理流程

  1. 數(shù)據(jù)入口 reactor CProxyConn:: HandlePduBuf
  2. 怎么初始化epoll+線程池
  3. 任務封裝
  4. 把任務放入線程池
  5. 執(zhí)行任務
  6. 把要回應的數(shù)據(jù)放入回復列表CProxyConn::SendResponsePdulist
  7. epoll所在線程讀取回復列表的數(shù)據(jù)發(fā)給請求端

單聊消息

消息如何封裝?如何保證對端完整解析一幀消息?協(xié)議格式?

  1. 答:消息封裝采用包頭(Header)+包體(Body)的格式。包頭自定義格式如下代碼所示,包體采用protobuf序列化。
typedef struct {uint32_t    length;        // the whole pdu lengthuint16_t    version;       // pdu version numberuint16_t    flag;          // not useduint16_t    service_id;    //uint16_t    command_id;    //uint16_t    seq_num;       // 包序號uint16_t    reversed;      // 保留
} PduHeader_t;
  1. 答:
  • 采用tcp保證數(shù)據(jù)傳輸可靠性
  • 通過包頭的 length 字段標記一幀消息的長度
  • 通過service id 和 command id區(qū)分不同的命令(比如登錄、退出等)
  • 解決數(shù)據(jù)TCP粘包(包頭長度字段)、半包(放入網(wǎng)絡庫的緩沖區(qū))問題
void CImConn::OnRead()
{for (;;){uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();if (free_buf_len < READ_BUF_SIZE)m_in_buf.Extend(READ_BUF_SIZE);int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);if (ret <= 0)break;m_recv_bytes += ret;m_in_buf.IncWriteOffset(ret);m_last_recv_tick = get_tick_count();}CImPdu* pPdu = NULL;try{while ( ( pPdu = CImPdu::ReadPdu(m_in_buf.GetBuffer(), m_in_buf.GetWriteOffset()) ) ){uint32_t pdu_len = pPdu->GetLength();HandlePdu(pPdu);m_in_buf.Read(NULL, pdu_len);delete pPdu;pPdu = NULL;
//                        ++g_recv_pkt_cnt;}} catch (CPduException& ex) {log("!!!catch exception, sid=%u, cid=%u, err_code=%u, err_msg=%s, close the connection ",ex.GetServiceId(), ex.GetCommandId(), ex.GetErrorCode(), ex.GetErrorMsg());if (pPdu) {delete pPdu;pPdu = NULL;}OnClose();}
}
message IMMsgData{//cmd id:                0x0301required uint32 from_user_id = 1;    //消息發(fā)送方required uint32 to_session_id = 2;    //消息接受方required uint32 msg_id = 3;required uint32 create_time = 4; required IM.BaseDefine.MsgType msg_type = 5;required bytes msg_data = 6;optional bytes attach_data = 20;
}message IMMsgDataAck{//cmd id:                0x0302required uint32 user_id = 1;    //發(fā)送此信令的用戶idrequired uint32 session_id = 2;                                required uint32 msg_id = 3;required IM.BaseDefine.SessionType session_type = 4;
}

消息序號(msg_id )為什么使用redis生成?

  1. 消息ID(msg_id )的作用是防止消息亂序。
  2. 消息ID(msg_id )為什么這么設計?
    答:msg_id 存儲在 unread 連接池所在的redis數(shù)據(jù)庫。單聊 msg_id 的 key涉及到 nRelateId。nRelateId 從關系表(IMRelationShip :兩個用戶id的映射關系)中獲取。
/***  獲取會話關系ID*  對于群組,必須把nUserBId設置為群ID**  @param nUserAId  <#nUserAId description#>*  @param nUserBId  <#nUserBId description#>*  @param bAdd      <#bAdd description#>*  @param nStatus 0 獲取未被刪除會話,1獲取所有。*/
uint32_t CRelationModel::getRelationId(uint32_t nUserAId, uint32_t nUserBId, bool bAdd)
{uint32_t nRelationId = INVALID_VALUE;if (nUserAId == 0 || nUserBId == 0) {log("invalied user id:%u->%u", nUserAId, nUserBId);return nRelationId;}CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){uint32_t nBigId = nUserAId > nUserBId ? nUserAId : nUserBId;uint32_t nSmallId = nUserAId > nUserBId ? nUserBId : nUserAId;string strSql = "select id from IMRelationShip where smallId=" + int2string(nSmallId) + " and bigId="+ int2string(nBigId) + " and status = 0";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nRelationId = pResultSet->GetInt("id");}delete pResultSet;}else{log("there is no result for sql:%s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);if (nRelationId == INVALID_VALUE && bAdd){nRelationId = addRelation(nSmallId, nBigId);}}else{log("no db connection for teamtalk_slave");}return nRelationId;
}
  1. 群聊和單聊msg_id 的區(qū)別:key設置不同。
uint32_t CMessageModel::getMsgId(uint32_t nRelateId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){string strKey = "msg_id_" + int2string(nRelateId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}return nMsgId;
}/***  獲取一個群組的msgId,自增,通過redis控制*  @param nGroupId 群Id*  @return 返回msgId*/
uint32_t CGroupMessageModel::getMsgId(uint32_t nGroupId)
{uint32_t nMsgId = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if(pCacheConn){// TODOstring strKey = "group_msg_id_" + int2string(nGroupId);nMsgId = pCacheConn->incrBy(strKey, 1);pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return nMsgId;
}

展開聊聊單聊消息流轉流程

答:兩個用戶A給B發(fā)消息,用戶A把聊天消息封裝好以后發(fā)送給MsgServer;同時把消息進行持久化,將聊天消息發(fā)給這個 DBProxy(數(shù)據(jù)庫代理服務),存儲消息成功后,DBProxyServer組包應答MsgServer,MsgServer收到回復后組包應答Client A。如果 A 和 B 兩個用戶不在同一個 MsgServer 上,那么會通過這個 RouteServer 去中轉Pdu包數(shù)據(jù)(廣播給所有的MsgServer,MsgServer再廣播給Client B),B收到消息后應答MsgServer,至此,流程結束。然后如果是一些熱點數(shù)據(jù),我們同時也會寫Redis。
群聊消息

展開聊聊群聊消息流轉流程

群聊消息流轉

void CGroupChat::HandleGroupMessage(CImPdu* pPdu)
{IM::Message::IMMsgData msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t from_user_id = msg.from_user_id();uint32_t to_group_id = msg.to_session_id();string msg_data = msg.msg_data();uint32_t msg_id = msg.msg_id();if (msg_id == 0) {log("HandleGroupMsg, write db failed, %u->%u. ", from_user_id, to_group_id);return;}uint8_t msg_type = msg.msg_type();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());log("HandleGroupMsg, %u->%u, msg id=%u. ", from_user_id, to_group_id, msg_id);CMsgConn* pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(from_user_id,attach_data.GetHandle());if (pFromConn){//接收反饋IM::Message::IMMsgDataAck msg2;msg2.set_user_id(from_user_id);msg2.set_session_id(to_group_id);msg2.set_msg_id(msg_id);msg2.set_session_type(::IM::BaseDefine::SESSION_TYPE_GROUP);CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA_ACK);pdu.SetSeqNum(pPdu->GetSeqNum());pFromConn->SendPdu(&pdu);}CRouteServConn* pRouteConn = get_route_serv_conn();if (pRouteConn){pRouteConn->SendPdu(pPdu);}// 服務器沒有群的信息,向DB服務器請求群信息,并帶上消息作為附件,返回時在發(fā)送該消息給其他群成員//IM::BaseDefine::GroupVersionInfo group_version_info;CPduAttachData pduAttachData(ATTACH_TYPE_HANDLE_AND_PDU, attach_data.GetHandle(), pPdu->GetBodyLength(), pPdu->GetBodyData());IM::Group::IMGroupInfoListReq msg3;msg3.set_user_id(from_user_id);IM::BaseDefine::GroupVersionInfo* group_version_info = msg3.add_group_version_list();group_version_info->set_group_id(to_group_id);group_version_info->set_version(0);msg3.set_attach_data(pduAttachData.GetBuffer(), pduAttachData.GetLength());CImPdu pdu;pdu.SetPBMsg(&msg3);pdu.SetServiceId(SID_GROUP);pdu.SetCommandId(CID_GROUP_INFO_REQUEST);CDBServConn* pDbConn = get_db_serv_conn();if(pDbConn){pDbConn->SendPdu(&pdu);}
}
void CGroupChat::HandleGroupInfoResponse(CImPdu* pPdu)
{IM::Group::IMGroupInfoListRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t group_cnt = msg.group_info_list_size();CPduAttachData pduAttachData((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());log("HandleGroupInfoResponse, user_id=%u, group_cnt=%u. ", user_id, group_cnt);//此處是查詢成員時使用,主要用于群消息從數(shù)據(jù)庫獲得msg_id后進行發(fā)送,一般此時group_cnt = 1if (pduAttachData.GetPduLength() > 0 && group_cnt > 0){IM::BaseDefine::GroupInfo group_info = msg.group_info_list(0);uint32_t group_id = group_info.group_id();log("GroupInfoRequest is send by server, group_id=%u ", group_id);std::set<uint32_t> group_member_set;for (uint32_t i = 0; i < group_info.group_member_list_size(); i++){uint32_t member_user_id = group_info.group_member_list(i);group_member_set.insert(member_user_id);}if (group_member_set.find(user_id) == group_member_set.end()){log("user_id=%u is not in group, group_id=%u. ", user_id, group_id);return;}IM::Message::IMMsgData msg2;CHECK_PB_PARSE_MSG(msg2.ParseFromArray(pduAttachData.GetPdu(), pduAttachData.GetPduLength()));CImPdu pdu;pdu.SetPBMsg(&msg2);pdu.SetServiceId(SID_MSG);pdu.SetCommandId(CID_MSG_DATA);//Push相關IM::Server::IMGroupGetShieldReq msg3;msg3.set_group_id(group_id);msg3.set_attach_data(pdu.GetBodyData(), pdu.GetBodyLength());for (uint32_t i = 0; i < group_info.group_member_list_size(); i++){uint32_t member_user_id = group_info.group_member_list(i);msg3.add_user_id(member_user_id);CImUser* pToImUser = CImUserManager::GetInstance()->GetImUserById(member_user_id);if (pToImUser){CMsgConn* pFromConn = NULL;if( member_user_id == user_id ){uint32_t reqHandle = pduAttachData.GetHandle();if(reqHandle != 0)pFromConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, reqHandle);}pToImUser->BroadcastData(pdu.GetBuffer(), pdu.GetLength(), pFromConn);}}CImPdu pdu2;pdu2.SetPBMsg(&msg3);pdu2.SetServiceId(SID_OTHER);pdu2.SetCommandId(CID_OTHER_GET_SHIELD_REQ);CDBServConn* pDbConn = get_db_serv_conn();if (pDbConn){pDbConn->SendPdu(&pdu2);}}else if (pduAttachData.GetPduLength() == 0){//正常獲取群信息的返回CMsgConn* pConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, pduAttachData.GetHandle());if (pConn){msg.clear_attach_data();pPdu->SetPBMsg(&msg);pConn->SendPdu(pPdu);}}
}

同步群組聊天信息:群會話如何更新(每有一個人發(fā)送消息,則其他人都需要更新會話消息)
分析:
如果和單聊類似,實時更新會有大量操作數(shù)據(jù)庫的成本。
某個群成員發(fā)消息時,存儲消息成功后,會更新群的最新發(fā)言時間。
后續(xù)優(yōu)化,將session放在redis中查詢不方便???所以要分庫分表???

void CGroupModel::updateGroupChat(uint32_t nGroupId)
{CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if(pDBConn){uint32_t nNow = (uint32_t)time(NULL);string strSql = "update IMGroup set lastChated=" + int2string(nNow) + " where id=" + int2string(nGroupId);pDBConn->ExecuteUpdate(strSql.c_str());pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_master");}
}

CSyncCenter類,群會話的更新-獨立的線程

CSyncCenter :: doSyncGroupChat

  1. 根據(jù)時間節(jié)點將需要更新會話的群id和最近的聊天時間讀取出來放到map
  2. 保存當前時間到CSyncCenter
  3. 根據(jù)群id從redis讀取群成員,然后遍歷群成員更新會話信息。
/***  開啟內(nèi)網(wǎng)數(shù)據(jù)同步以及群組聊天記錄同步*/
void CSyncCenter::startSync()
{
#ifdef _WIN32(void)CreateThread(NULL, 0, doSyncGroupChat, NULL, 0, &m_nGroupChatThreadId);
#else(void)pthread_create(&m_nGroupChatThreadId, NULL, doSyncGroupChat, NULL);
#endif
}
//談取更新的時間大于之前更新的時間點,把對應的群id-nLastChat讀取出來放到mapChangedGroup存儲
/***  同步群組聊天信息**  @param arg NULL**  @return NULL*/
void* CSyncCenter::doSyncGroupChat(void* arg)
{m_bSyncGroupChatRuning = true;CDBManager* pDBManager = CDBManager::getInstance();map<uint32_t, uint32_t> mapChangedGroup;do{mapChangedGroup.clear();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if(pDBConn){string strSql = "select id, lastChated from IMGroup where status=0 and lastChated >=" + int2string(m_pInstance->getLastUpdateGroup());CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());if(pResult){while (pResult->Next()) {uint32_t nGroupId = pResult->GetInt("id");uint32_t nLastChat = pResult->GetInt("lastChated");if(nLastChat != 0){mapChangedGroup[nGroupId] = nLastChat;}}delete pResult;}pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_slave");}m_pInstance->updateLastUpdateGroup(time(NULL));for (auto it=mapChangedGroup.begin(); it!=mapChangedGroup.end(); ++it){uint32_t nGroupId =it->first;list<uint32_t> lsUsers;uint32_t nUpdate = it->second;// 讀取該群的群成員CGroupModel::getInstance()->getGroupUser(nGroupId, lsUsers);//遍歷群成員,更新Sessionfor (auto it1=lsUsers.begin(); it1!=lsUsers.end(); ++it1){uint32_t nUserId = *it1;uint32_t nSessionId = INVALID_VALUE;nSessionId = CSessionModel::getInstance()->getSessionId(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP, true);if(nSessionId != INVALID_VALUE){CSessionModel::getInstance()->updateSession(nSessionId, nUpdate);}else{CSessionModel::getInstance()->addSession(nUserId, nGroupId, IM::BaseDefine::SESSION_TYPE_GROUP);}}}
//    } while (!m_pInstance->m_pCondSync->waitTime(5*1000));} while (m_pInstance->m_bSyncGroupChatWaitting && !(m_pInstance->m_pCondGroupChat->waitTime(5*1000)));
//    } while(m_pInstance->m_bSyncGroupChatWaitting);m_bSyncGroupChatRuning = false;return NULL;
}

怎么保證數(shù)據(jù)的不丟失以及重復包?

  1. 包頭的seq_num字段(包序號),未回復消息列表
  2. 業(yè)務層的ack機制,收到數(shù)據(jù)會回復ack

消息未讀計數(shù)是怎么實現(xiàn)的?

服務器怎么保留消息未讀計數(shù)(單聊和群聊)
7. 單聊和群聊消息未讀計數(shù)機制為什么不同?
答:加入該群很大,有1000人,999個人的未讀消息計數(shù)都+1,效率低下。
8. 單聊消息未讀計數(shù)機制

  • key設計:“unread_” + int2string(nToId);field:int2string(nFromId),value:自增1
  • 發(fā)送消息時,將消息寫入 mysql 消息表成功后,更新redis。
void CMessageModel::incMsgCount(uint32_t nFromId, uint32_t nToId)
{CacheManager* pCacheManager = CacheManager::getInstance();// increase message countCacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn) {pCacheConn->hincrBy("unread_" + int2string(nToId), int2string(nFromId), 1);pCacheManager->RelCacheConn(pCacheConn);} else {log("no cache connection to increase unread count: %d->%d", nFromId, nToId);}
}
9. 群聊消息未讀計數(shù)機制
- 群總的消息計數(shù)key設計:int2string(nGroupId) + _im_group_msg;
- field:count
- 群內(nèi)某個成員已經(jīng)讀取的消息計數(shù)key設計:
int2string(nUserId) + "_" + int2string(nGroupId) + _im_user_group
- field:count
所以:群內(nèi)某個成員未讀消息計數(shù) = 群總消息數(shù)量 - 該成員已經(jīng)讀取的消息數(shù)量#define     GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX    "_im_group_msg"
#define     GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX     "_im_user_group"
#define     GROUP_COUNTER_SUBKEY_COUNTER_FIELD          "count"
增加群消息計數(shù)
/***  增加群消息計數(shù)*  @param nUserId  用戶Id*  @param nGroupId 群組Id*  @return 成功返回true,失敗返回false*/
bool CGroupMessageModel::incMessageCount(uint32_t nUserId, uint32_t nGroupId)
{bool bRet = false;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;pCacheConn->hincrBy(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD, 1);map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(!strReply.empty()){bRet = true;}else{log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetAll %s failed!", strGroupKey.c_str());}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}return bRet;
}
獲取用戶群未讀消息計數(shù)
/***  獲取用戶群未讀消息計數(shù)*  @param nUserId       用戶Id*  @param nTotalCnt     總條數(shù)*  @param lsUnreadCount 每個會話的未讀信息包含了條數(shù),最后一個消息的Id,最后一個消息的類型,最后一個消息的類容*/
void CGroupMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{list<uint32_t> lsGroupId;CGroupModel::getInstance()->getUserGroupIds(nUserId, lsGroupId, 0);uint32_t nCount = 0;CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){for(auto it=lsGroupId.begin(); it!=lsGroupId.end(); ++it){uint32_t nGroupId = *it;string strGroupKey = int2string(nGroupId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;string strGroupCnt = pCacheConn->hget(strGroupKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);if(strGroupCnt.empty()){
//                log("hget %s : count failed !", strGroupKey.c_str());continue;}uint32_t nGroupCnt = (uint32_t)(atoi(strGroupCnt.c_str()));string strUserKey = int2string(nUserId) + "_" + int2string(nGroupId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strUserCnt = pCacheConn->hget(strUserKey, GROUP_COUNTER_SUBKEY_COUNTER_FIELD);uint32_t nUserCnt = ( strUserCnt.empty() ? 0 : ((uint32_t)atoi(strUserCnt.c_str())) );if(nGroupCnt >= nUserCnt) {nCount = nGroupCnt - nUserCnt;}if(nCount > 0){IM::BaseDefine::UnreadInfo cUnreadInfo;cUnreadInfo.set_session_id(nGroupId);cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_GROUP);cUnreadInfo.set_unread_cnt(nCount);nTotalCnt += nCount;string strMsgData;uint32_t nMsgId;IM::BaseDefine::MsgType nType;uint32_t nFromId;getLastMsg(nGroupId, nMsgId, strMsgData, nType, nFromId);if(IM::BaseDefine::MsgType_IsValid(nType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nType);cUnreadInfo.set_latest_msg_from_user_id(nFromId);lsUnreadCount.push_back(cUnreadInfo);}else{log("invalid msgType. userId=%u, groupId=%u, msgType=%u, msgId=%u", nUserId, nGroupId, nType, nMsgId);}}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}
}
清除未讀消息
單聊和群聊清除未讀消息都是調(diào)用如下函數(shù)
1. 單聊直接刪掉key
2. 群聊將該成員的已讀消息數(shù)量設置成群總消息數(shù)量。
m_handler_map.insert(make_pair(uint32_t(CID_MSG_READ_ACK), DB_PROXY::clearUnreadMsgCounter));
void CUserModel::clearUserCounter(uint32_t nUserId, uint32_t nPeerId, IM::BaseDefine::SessionType nSessionType)
{if(IM::BaseDefine::SessionType_IsValid(nSessionType)){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){// Clear P2P msg Counterif(nSessionType == IM::BaseDefine::SESSION_TYPE_SINGLE){int nRet = pCacheConn->hdel("unread_" + int2string(nUserId), int2string(nPeerId));if(!nRet){log("hdel failed %d->%d", nPeerId, nUserId);}}// Clear Group msg Counterelse if(nSessionType == IM::BaseDefine::SESSION_TYPE_GROUP){string strGroupKey = int2string(nPeerId) + GROUP_TOTAL_MSG_COUNTER_REDIS_KEY_SUFFIX;map<string, string> mapGroupCount;bool bRet = pCacheConn->hgetAll(strGroupKey, mapGroupCount);if(bRet){string strUserKey = int2string(nUserId) + "_" + int2string(nPeerId) + GROUP_USER_MSG_COUNTER_REDIS_KEY_SUFFIX;string strReply = pCacheConn->hmset(strUserKey, mapGroupCount);if(strReply.empty()) {log("hmset %s failed !", strUserKey.c_str());}}else{log("hgetall %s failed!", strGroupKey.c_str());}}pCacheManager->RelCacheConn(pCacheConn);}else{log("no cache connection for unread");}}else{log("invalid sessionType. userId=%u, fromId=%u, sessionType=%u", nUserId, nPeerId, nSessionType);}
}

群成員管理

  1. 為什么使用 redis 管理群成員?
    答:發(fā)群消息需要通知群成員,群成員很多從redis獲取,提高效率。
  2. 群成員管理使用redis 緩存設計,以hash為存儲結構,
  • key 為 “group_member_”+int2string(nGroupId);
  • field 為 userId(用戶id)
  • Value 為 創(chuàng)建時間:int2string(nCreated)
  1. 加入成員:insertNewMember,插入mysql數(shù)據(jù)庫的同時也插入redis緩存
    /***  修改群成員,增加或刪除**  @param pPdu      收到的packet包指針*  @param conn_uuid 該包過來的socket 描述符*/void modifyMember(CImPdu* pPdu, uint32_t conn_uuid){IM::Group::IMGroupChangeMemberReq msg;IM::Group::IMGroupChangeMemberRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){uint32_t nUserId = msg.user_id();uint32_t nGroupId = msg.group_id();IM::BaseDefine::GroupModifyType nType = msg.change_type();if (IM::BaseDefine::GroupModifyType_IsValid(nType) &&CGroupModel::getInstance()->isValidateGroupId(nGroupId)) {CImPdu* pPduRes = new CImPdu;uint32_t nCnt = msg.member_id_list_size();set<uint32_t> setUserId;for(uint32_t i=0; i<nCnt;++i){setUserId.insert(msg.member_id_list(i));}list<uint32_t> lsCurUserId;bool bRet = CGroupModel::getInstance()->modifyGroupMember(nUserId, nGroupId, nType, setUserId, lsCurUserId);msgResp.set_user_id(nUserId);msgResp.set_group_id(nGroupId);msgResp.set_change_type(nType);msgResp.set_result_code(bRet?0:1);if(bRet){for(auto it=setUserId.begin(); it!=setUserId.end(); ++it){msgResp.add_chg_user_id_list(*it);}for(auto it=lsCurUserId.begin(); it!=lsCurUserId.end(); ++it){msgResp.add_cur_user_id_list(*it);}}log("userId=%u, groupId=%u, result=%u, changeCount:%u, currentCount=%u",nUserId, nGroupId,  bRet?0:1, msgResp.chg_user_id_list_size(), msgResp.cur_user_id_list_size());msgResp.set_attach_data(msg.attach_data());pPduRes->SetPBMsg(&msgResp);pPduRes->SetSeqNum(pPdu->GetSeqNum());pPduRes->SetServiceId(IM::BaseDefine::SID_GROUP);pPduRes->SetCommandId(IM::BaseDefine::CID_GROUP_CHANGE_MEMBER_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduRes);}else{log("invalid groupModifyType or groupId. userId=%u, groupId=%u, groupModifyType=%u", nUserId, nGroupId, nType);}}else{log("parse pb failed");}}
bool CGroupModel::insertNewMember(uint32_t nGroupId, set<uint32_t>& setUsers)
{bool bRet = false;uint32_t nUserCnt = (uint32_t)setUsers.size();if(nGroupId != INVALID_VALUE &&  nUserCnt > 0){CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){uint32_t nCreated = (uint32_t)time(NULL);// 獲取 已經(jīng)存在群里的用戶string strClause;bool bFirst = true;for (auto it=setUsers.begin(); it!=setUsers.end(); ++it){if(bFirst){bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}string strSql = "select userId from IMGroupMember where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";CResultSet* pResult = pDBConn->ExecuteQuery(strSql.c_str());set<uint32_t> setHasUser;if(pResult){while (pResult->Next()) {setHasUser.insert(pResult->GetInt("userId"));}delete pResult;}else{log("no result for sql:%s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);pDBConn = pDBManager->GetDBConn("teamtalk_master");if (pDBConn){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");if (pCacheConn){// 設置已經(jīng)存在群中人的狀態(tài)if (!setHasUser.empty()){strClause.clear();bFirst = true;for (auto it=setHasUser.begin(); it!=setHasUser.end(); ++it) {if(bFirst){bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}strSql = "update IMGroupMember set status=0, updated="+int2string(nCreated)+" where groupId=" + int2string(nGroupId) + " and userId in (" + strClause + ")";pDBConn->ExecuteUpdate(strSql.c_str());}strSql = "insert into IMGroupMember(`groupId`, `userId`, `status`, `created`, `updated`) values\(?,?,?,?,?)";//插入新成員auto it = setUsers.begin();uint32_t nStatus = 0;uint32_t nIncMemberCnt = 0;for (;it != setUsers.end();){uint32_t nUserId = *it;if(setHasUser.find(nUserId) == setHasUser.end()){CPrepareStatement* pStmt = new CPrepareStatement();if (pStmt->Init(pDBConn->GetMysql(), strSql)){uint32_t index = 0;pStmt->SetParam(index++, nGroupId);pStmt->SetParam(index++, nUserId);pStmt->SetParam(index++, nStatus);pStmt->SetParam(index++, nCreated);pStmt->SetParam(index++, nCreated);pStmt->ExecuteUpdate();++nIncMemberCnt;delete pStmt;}else{setUsers.erase(it++);delete pStmt;continue;}}++it;}if(nIncMemberCnt != 0){strSql = "update IMGroup set userCnt=userCnt+" + int2string(nIncMemberCnt) + " where id="+int2string(nGroupId);pDBConn->ExecuteUpdate(strSql.c_str());}//更新一份到redis中string strKey = "group_member_"+int2string(nGroupId);for(auto it = setUsers.begin(); it!=setUsers.end(); ++it){pCacheConn->hset(strKey, int2string(*it), int2string(nCreated));}pCacheManager->RelCacheConn(pCacheConn);bRet = true;}else{log("no cache connection");}pDBManager->RelDBConn(pDBConn);}else{log("no db connection for teamtalk_master");}}else{log("no db connection for teamtalk_slave");}}return bRet;
}
  1. 刪除成員:removeMember;從mysql數(shù)據(jù)庫刪除的同時,也從redis緩存刪除
bool CGroupModel::removeMember(uint32_t nGroupId, set<uint32_t> &setUser, list<uint32_t>& lsCurUserId)
{if(setUser.size() <= 0){return true;}bool bRet = false;CDBManager* pDBManager = CDBManager::getInstance();CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_master");if(pDBConn){CacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("group_member");if (pCacheConn){string strClause ;bool bFirst = true;for(auto it= setUser.begin(); it!=setUser.end();++it){if (bFirst) {bFirst = false;strClause = int2string(*it);}else{strClause += ("," + int2string(*it));}}string strSql = "update IMGroupMember set status=1 where  groupId =" + int2string(nGroupId) + " and userId in(" + strClause + ")";pDBConn->ExecuteUpdate(strSql.c_str());//從redis中刪除成員string strKey = "group_member_"+ int2string(nGroupId);for (auto it=setUser.begin(); it!=setUser.end(); ++it) {string strField = int2string(*it);pCacheConn->hdel(strKey, strField);}pCacheManager->RelCacheConn(pCacheConn);bRet = true;}else{log("no cache connection");}pDBManager->RelDBConn(pDBConn);if (bRet){getGroupUser(nGroupId,lsCurUserId);}}else{log("no db connection for teamtalk_master");}return bRet;
}

redis連接池設計
6. 為什么使用連接池?
答:對象復用,減小頻繁創(chuàng)建鏈接釋放鏈接的開銷時間。
7. CacheInstances=unread,group_set,token,sync,group_member 5 個連接池
8. 為什么分開不同的db redis?
答:方便擴展。
9. pool_name的意義?
答:抽象,不必關注redis是否分布式。

class CacheManager {
public:virtual ~CacheManager();static CacheManager* getInstance();int Init();CacheConn* GetCacheConn(const char* pool_name);void RelCacheConn(CacheConn* pCacheConn);
private:CacheManager();private:static CacheManager*         s_cache_manager;map<string, CachePool*>        m_cache_pool_map;
};
int CacheManager::Init()
{CConfigFileReader config_file("dbproxyserver.conf");//CacheInstances=unread,group_set,token,sync,group_memberchar* cache_instances = config_file.GetConfigName("CacheInstances");if (!cache_instances) {log("not configure CacheIntance");return 1;}char host[64];char port[64];char db[64];char maxconncnt[64];CStrExplode instances_name(cache_instances, ',');for (uint32_t i = 0; i < instances_name.GetItemCnt(); i++) {char* pool_name = instances_name.GetItem(i);//printf("%s", pool_name);snprintf(host, 64, "%s_host", pool_name);snprintf(port, 64, "%s_port", pool_name);snprintf(db, 64, "%s_db", pool_name);snprintf(maxconncnt, 64, "%s_maxconncnt", pool_name);char* cache_host = config_file.GetConfigName(host);char* str_cache_port = config_file.GetConfigName(port);char* str_cache_db = config_file.GetConfigName(db);char* str_max_conn_cnt = config_file.GetConfigName(maxconncnt);if (!cache_host || !str_cache_port || !str_cache_db || !str_max_conn_cnt) {log("not configure cache instance: %s", pool_name);return 2;}CachePool* pCachePool = new CachePool(pool_name, cache_host, atoi(str_cache_port),atoi(str_cache_db), atoi(str_max_conn_cnt));if (pCachePool->Init()) {log("Init cache pool failed");return 3;}m_cache_pool_map.insert(make_pair(pool_name, pCachePool));}return 0;
}
http://www.risenshineclean.com/news/27042.html

相關文章:

  • 有沒有catia做幕墻的網(wǎng)站色目人
  • 有什么做節(jié)能報告的網(wǎng)站網(wǎng)絡營銷和網(wǎng)上銷售的區(qū)別
  • 石家莊視頻網(wǎng)站建設公司百度大數(shù)據(jù)平臺
  • 怎么用服務器ip做網(wǎng)站百家號優(yōu)化
  • 武漢房價深圳市seo網(wǎng)絡推廣哪家好
  • 大連網(wǎng)站建設設計上海牛巨微seo優(yōu)化
  • php做電影網(wǎng)站有哪些網(wǎng)絡營銷以什么為中心
  • 網(wǎng)站建設衤金手指下拉10搜索引擎優(yōu)化的具體措施
  • 臨沂哪里做網(wǎng)站比較好seo是什么專業(yè)的課程
  • 網(wǎng)站黨組織規(guī)范化建設開展情況網(wǎng)站怎么做出來的
  • 愛做奈官方網(wǎng)站優(yōu)化公司哪家好
  • 怎樣做百度網(wǎng)站推廣百度6大核心部門
  • visual studio做網(wǎng)站哪個公司做網(wǎng)站推廣最好
  • 網(wǎng)站關鍵詞可以添加嗎seo關鍵詞外包
  • 獵頭公司應堅持的原則有超級優(yōu)化空間
  • 東莞建站seo怎么收費的
  • 網(wǎng)站中flash怎么做的游戲推廣怎么做引流
  • 做網(wǎng)站什么商品好軟文網(wǎng)站模板
  • 河田鎮(zhèn)建設局網(wǎng)站百度知道app
  • 建設部網(wǎng)站四庫一平臺best網(wǎng)絡推廣平臺
  • 在公司做網(wǎng)站是什么職位網(wǎng)頁搜索
  • 二手房在哪個網(wǎng)站做合同互聯(lián)網(wǎng)營銷師培訓大綱
  • 敦煌網(wǎng)站銷售員怎么做濰坊今日頭條新聞
  • pb 做網(wǎng)站網(wǎng)站排名怎么優(yōu)化
  • 橙子建站驗證碼是詐騙嗎長沙靠譜seo優(yōu)化
  • 網(wǎng)站建設方案新聞鄭州網(wǎng)絡營銷公司
  • wordpress停用react如何做谷歌seo推廣
  • c 手機網(wǎng)站開發(fā)模板國家衛(wèi)生健康委
  • 信息系統(tǒng)軟件有哪些優(yōu)化的概念
  • 外貿(mào)公司網(wǎng)站模板免費阿里云搜索引擎網(wǎng)址