您的当前位置:首页正文

c++集群聊天服务器

2024-11-23 来源:个人技术集锦


前言

1、项目介绍

本项目,利用moduo网络库,利用js进行网络传输,njinx进行负载均衡,redis实现消息订阅。

项目功能如下:
1、客户端新用户注册
2. 客户端用户登录
3. 添加好友和添加群组
4. 好友聊天
5. 离线消息
6. nginx配置tcp负载均衡
7. 集群聊天系统支持客户端跨服务器通
使用的技术栈如下:
Json序列化和反序列化
muduo网络库开发
nginx源码编译安装和环境部署
nginx的tcp负载均衡器配置
redis缓存服务器编程实践
基于发布-订阅的服务器中间件redis消息队列编程实践
MySQL数据库编程
CMake构建编译环境
Github托管项目

2、项目框架

一、JSON序列化

1、为什么需要json

因为Json是一种轻量级的数据交换格式(也叫数据序列化方式)。Json采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得 Json 成为理想的数据交换语言。 易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率。

2、c++ json库

JSON for Modern C++ 是一个由德国大牛 nlohmann 编写的在 C++ 下使用的 JSON 库。
具有以下特点:使用 C++ 11 标准编写,使用 json 像使用 STL 容器一样,STL 和 json 容器之间可以相互转换

3、序列与反序列化例子

首先在文件夹中包含json头文件
然后导入

#include "json.hpp"
using json = nlohmann::json;

json 对象 js 相当于装入key-value的形式 的容器 都打包

json js;
   // 添加数组
   js["id"] = {
   1, 2, 3, 4, 5};
   // 添加key-value
   js["name"] = "zhang san";
   // 添加对象
   js["msg"]["zhang san"] = "hello world";
   js["msg"]["liu shuo"] = "hello china";
   // 上面等同于下面这句一次性添加数组对象
   js["msg"] = {
   {
   "zhang san", "hello world"}, {
   "liu shuo", "hello china"}};
   cout << js << endl;
    // 输出为:
    // {"id":[1,2,3,4,5],"msg":{"liu shuo":"hello china","zhang san":"hello world"},"name":"zhang san"}

   vector<int> vec;
   vec.push_back(1);
   vec.push_back(2);
   vec.push_back(5);
   js["list"] = vec;
   // 直接序列化一个map容器
   map<int, string> m;
   m.insert({
   1, "黄山"});
   m.insert({
   2, "华山"});
   m.insert({
   3, "泰山"});
   js["path"] = m;
   cout << js << endl;
   //{"list":[1,2,5],"path":[[1,"黄山"],[2,"华山"],[3,"泰山"]]}

可以转成字符串以及char数组 网络传输用

    // json 转成字符串 并且转成char 数组
    string s=js.dump();
    cout<<s<<endl;
    cout<<s.c_str()<<endl;

反序列化 利用json::parse(s) 反序列化字符串 可以反序列化嵌套的js

   // 序列化层字符串或者char数组
   string s = js.dump();
   // 模拟从网络接收到json字符串,通过json::parse函数把json字符串专程json对象
   json js2 = json::parse(s);
   // 直接取key-value
   string name = js2["name"];
   cout << "name:" << name << endl;
   // 直接反序列化vector容器
   vector<int> v = js2["list"];
   for (int val : v) {
   
      cout << val << " ";
   }
   cout << endl;
   // 直接反序列化map容器
   map<int, string> m2 = js2["path"];
   for (auto p : m2) {
   
      cout << p.first << " " << p.second << endl;
   }
   cout << endl;
   //    可以总结打印
   cout << js2["id"] << endl;
   auto list = js2["id"];
   cout << list[0] << endl;
   //    嵌套反序列化  json 嵌套json
   //    js["msg"]["zhang san"] = "hello world";
   //    js["msg"]["liu shuo"] = "hello china";
   //    // 上面等同于下面这句一次性添加数组对象
   //    js["msg"] = {
   {"zhang san", "hello world"}, {"liu shuo", "hellochina"}};
   auto magjs = js2["msg"];
   cout << magjs["zhang san"] << endl;

二、nginx负载均衡

使用nginx 的tcp负载均衡模块

配制nginx文件,发的请求都在8000端口,然后负载均衡分发到不同的服务器上,并且配制好机器的端口以及权重这些。max_fails=3 fail_timeout=30s;表示30s发送一次心跳,如果3次检测失败就判定服务器挂掉了。(如果发现客户端闪退,把server下面的连个超时时间去掉)

服务器 气动的端口要要与配置文件一直一致客户端连接8000端口

三、基于发布-订阅的Redis

1、原理

为了保持通信,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。

不需要每个服务器建立连接,只需要与消息队列连接就行。集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,

订阅某个消息之后有人发布那种消息,队列就会通知转发给你


线程一直循环监听订阅的上下文,然后如果有就会调用预定的回调函数handler。
解决该项目中的问题 :在同一个server的客户端都可以通过该server的在线用户map找到对应的conn连接,然后进行客户端与客户端的通讯。但是由于负载均衡,所以客户端可能存在于不同的server中,不同的server并不共享存储的在线用户map,所以客户端与客户端的通讯就要通过服务器订阅的方式,比如说这个client1在server1中登录了,那么自由server1才存有client1的conn TCP连接,那么该服务器订阅该client1 的id的通道,每当这个通道有消息(别人客户端发送消息给client1时候就会发不到这个client1 的iid的通道中),server1j就会被推送该消息,然后server1就会对该消息进行处理。

2、实现

定义redis类,要实现redis消息订阅以及消息发布,要有分别负责publish消息和责subscribe消息的同步上下文对象,并且实现发布消息函数以及订阅和取消订阅消息函数。他们都是通过通道channel来实现的。要实现回调操作,收到订阅的消息之后进行的函数操作。_notify_message_handler。在独立线程中接收订阅通道中的消息的函数。

class Redis {
   
  public:
   Redis();
   ~Redis();
   // 连接redis服务器
   bool connect();
   // 向redis指定的通道channel发布消息
   bool publish(int channel, string message);
   // 向redis指定的通道subscribe订阅消息
   bool subscribe(int channel);
   // 向redis指定的通道unsubscribe取消订阅消息
   bool unsubscribe(int channel);
   // 在独立线程中接收订阅通道中的消息
   void observer_channel_message();
   // 初始化向业务层上报通道消息的回调对象   检测到消息 进行回调处理。
   void init_notify_handler(function<void(int, string)> fn);
  private:
   // hiredis同步上下文对象,负责publish消息
   redisContext* _publish_context;
   // hiredis同步上下文对象,负责subscribe消息
   redisContext* _subcribe_context;
   // 回调操作,收到订阅的消息,给service层上报
   function<void(int, string)> _notify_message_handler;
};

实现类

Redis::Redis() : _publish_context(nullptr), _subcribe_context(nullptr) {
   }
Redis::~Redis() {
   
   if (_publish_context != nullptr) {
   
      redisFree(_publish_context);
   }
   if (_subcribe_context != nullptr) {
   
      redisFree(_subcribe_context);
   }
}
bool Redis::connect() {
   
   // 负责publish发布消息的上下文连接
   _publish_context = redisConnect("127.0.0.1", 6379);
   if (nullptr == _publish_context) {
   
      cerr << "connect redis failed!" << endl;
      return false;
   }
   // 负责subscribe订阅消息的上下文连接
   _subcribe_context = redisConnect("127.0.0.1", 6379);
   if (nullptr == _subcribe_context) {
   
      cerr << "connect redis failed!" << endl;
      return false;
   }
   // 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
   thread t([&]() {
    observer_channel_message(); });
   t.detach();
   cout << "connect redis-server success!" << endl;
   return true;
}

// 向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message) {
   
   redisReply* reply = (redisReply*)redisCommand(
       _publish_context, "PUBLISH %d %s", channel, message.
显示全文