本项目,利用moduo网络库,利用js进行网络传输,njinx进行负载均衡,redis实现消息订阅。
项目功能如下:
1、客户端新用户注册
2. 客户端用户登录
3. 添加好友和添加群组
4. 好友聊天
5. 离线消息
6. nginx配置tcp负载均衡
7. 集群聊天系统支持客户端跨服务器通
使用的技术栈如下:
Json序列化和反序列化
muduo网络库开发
nginx源码编译安装和环境部署
nginx的tcp负载均衡器配置
redis缓存服务器编程实践
基于发布-订阅的服务器中间件redis消息队列编程实践
MySQL数据库编程
CMake构建编译环境
Github托管项目
因为Json是一种轻量级的数据交换格式(也叫数据序列化方式)。Json采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得 Json 成为理想的数据交换语言。 易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率。
JSON for Modern C++ 是一个由德国大牛 nlohmann 编写的在 C++ 下使用的 JSON 库。
具有以下特点:使用 C++ 11 标准编写,使用 json 像使用 STL 容器一样,STL 和 json 容器之间可以相互转换
首先在文件夹中包含json头文件
然后导入
#include "json.hpp"
using json = nlohmann::json;
json 对象 js 相当于装入key-value的形式 的容器 都打包
json js;
// 添加数组
js["id"] = {
1, 2, 3, 4, 5};
// 添加key-value
js["name"] = "zhang san";
// 添加对象
js["msg"]["zhang san"] = "hello world";
js["msg"]["liu shuo"] = "hello china";
// 上面等同于下面这句一次性添加数组对象
js["msg"] = {
{
"zhang san", "hello world"}, {
"liu shuo", "hello china"}};
cout << js << endl;
// 输出为:
// {"id":[1,2,3,4,5],"msg":{"liu shuo":"hello china","zhang san":"hello world"},"name":"zhang san"}
vector<int> vec;
vec.push_back(1);
vec.push_back(2);
vec.push_back(5);
js["list"] = vec;
// 直接序列化一个map容器
map<int, string> m;
m.insert({
1, "黄山"});
m.insert({
2, "华山"});
m.insert({
3, "泰山"});
js["path"] = m;
cout << js << endl;
//{"list":[1,2,5],"path":[[1,"黄山"],[2,"华山"],[3,"泰山"]]}
可以转成字符串以及char数组 网络传输用
// json 转成字符串 并且转成char 数组
string s=js.dump();
cout<<s<<endl;
cout<<s.c_str()<<endl;
反序列化 利用json::parse(s) 反序列化字符串 可以反序列化嵌套的js
// 序列化层字符串或者char数组
string s = js.dump();
// 模拟从网络接收到json字符串,通过json::parse函数把json字符串专程json对象
json js2 = json::parse(s);
// 直接取key-value
string name = js2["name"];
cout << "name:" << name << endl;
// 直接反序列化vector容器
vector<int> v = js2["list"];
for (int val : v) {
cout << val << " ";
}
cout << endl;
// 直接反序列化map容器
map<int, string> m2 = js2["path"];
for (auto p : m2) {
cout << p.first << " " << p.second << endl;
}
cout << endl;
// 可以总结打印
cout << js2["id"] << endl;
auto list = js2["id"];
cout << list[0] << endl;
// 嵌套反序列化 json 嵌套json
// js["msg"]["zhang san"] = "hello world";
// js["msg"]["liu shuo"] = "hello china";
// // 上面等同于下面这句一次性添加数组对象
// js["msg"] = {
{"zhang san", "hello world"}, {"liu shuo", "hellochina"}};
auto magjs = js2["msg"];
cout << magjs["zhang san"] << endl;
使用nginx 的tcp负载均衡模块
配制nginx文件,发的请求都在8000端口,然后负载均衡分发到不同的服务器上,并且配制好机器的端口以及权重这些。max_fails=3 fail_timeout=30s;表示30s发送一次心跳,如果3次检测失败就判定服务器挂掉了。(如果发现客户端闪退,把server下面的连个超时时间去掉)
服务器 气动的端口要要与配置文件一直一致客户端连接8000端口
为了保持通信,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
不需要每个服务器建立连接,只需要与消息队列连接就行。集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,
订阅某个消息之后有人发布那种消息,队列就会通知转发给你
线程一直循环监听订阅的上下文,然后如果有就会调用预定的回调函数handler。
解决该项目中的问题 :在同一个server的客户端都可以通过该server的在线用户map找到对应的conn连接,然后进行客户端与客户端的通讯。但是由于负载均衡,所以客户端可能存在于不同的server中,不同的server并不共享存储的在线用户map,所以客户端与客户端的通讯就要通过服务器订阅的方式,比如说这个client1在server1中登录了,那么自由server1才存有client1的conn TCP连接,那么该服务器订阅该client1 的id的通道,每当这个通道有消息(别人客户端发送消息给client1时候就会发不到这个client1 的iid的通道中),server1j就会被推送该消息,然后server1就会对该消息进行处理。
定义redis类,要实现redis消息订阅以及消息发布,要有分别负责publish消息和责subscribe消息的同步上下文对象,并且实现发布消息函数以及订阅和取消订阅消息函数。他们都是通过通道channel来实现的。要实现回调操作,收到订阅的消息之后进行的函数操作。_notify_message_handler。在独立线程中接收订阅通道中的消息的函数。
class Redis {
public:
Redis();
~Redis();
// 连接redis服务器
bool connect();
// 向redis指定的通道channel发布消息
bool publish(int channel, string message);
// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);
// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);
// 在独立线程中接收订阅通道中的消息
void observer_channel_message();
// 初始化向业务层上报通道消息的回调对象 检测到消息 进行回调处理。
void init_notify_handler(function<void(int, string)> fn);
private:
// hiredis同步上下文对象,负责publish消息
redisContext* _publish_context;
// hiredis同步上下文对象,负责subscribe消息
redisContext* _subcribe_context;
// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
};
实现类
Redis::Redis() : _publish_context(nullptr), _subcribe_context(nullptr) {
}
Redis::~Redis() {
if (_publish_context != nullptr) {
redisFree(_publish_context);
}
if (_subcribe_context != nullptr) {
redisFree(_subcribe_context);
}
}
bool Redis::connect() {
// 负责publish发布消息的上下文连接
_publish_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _publish_context) {
cerr << "connect redis failed!" << endl;
return false;
}
// 负责subscribe订阅消息的上下文连接
_subcribe_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _subcribe_context) {
cerr << "connect redis failed!" << endl;
return false;
}
// 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
thread t([&]() {
observer_channel_message(); });
t.detach();
cout << "connect redis-server success!" << endl;
return true;
}
// 向redis指定的通道channel发布消息
bool Redis::publish(int channel, string message) {
redisReply* reply = (redisReply*)redisCommand(
_publish_context, "PUBLISH %d %s", channel, message.