您的当前位置:首页正文

微信小程序与Netty实现的WebSocket聊天程序

2024-11-30 来源:个人技术集锦

一、微信小程序实现WebSocket客户端程序

1. 界面实现

<input name="url" value="{{url}}" bindinput ="urlInput"/>
<button size='mini' type="warn">断开连接</button>
<button size='mini' type="primary" bindtap="connectSocket">开启连接</button>
<textarea placeholder="输入发送内容" bindinput ="msgInput"></textarea>
<button size='mini' type="primary" bindtap="sendMsg">发送</button>
<view wx:for="{{msgs}}">{{index}}: {{item}}</view>

界面效果:

2. WXS部分

另外还定义了三个函数:
connectSocket:提供了连接WebSocket服务的功能;
msgInput:获取用户输入的内容;
sendMsg:实现了向远程WebSocket发送消息的功能;

Page({
  data: {
    url: 'ws://localhost:8888/ws',
    msgs: [],
    msg: '',
  }
  // 连接WebSocket服务  
  connectSocket() {    
    let _this = this;    
    // 连接websocket服务    
    let task = wx.connectSocket({      
      url: _this.data.url    
    });    
    // 监听websocket消息,并将接收到的消息添加到消息数组msgs中   
    task.onMessage(function(res) {       
      _this.setData({        
        msgs: [..._this.data.msgs, "接收到消息 -> " + res.data]      
      });    
    });    
    // 保存websocket实例     
    _this.setData({       
      socketTask: task,       
      msgs: [..._this.data.msgs,"连接成功!"]    
    });  
  },    
  
  // 获取输入内容,并临时保存在msg中  
  msgInput(e) {    
    this.setData({       
      msg: e.detail.value    
    });  
  },    
  
  // 发送消息  
  sendMsg() {    
    // 1.获取输入内容    
    let msg = this.data.msg;    
    // 2.发送消息到WebSocket服务端    
    this.data.socketTask.send({      
      data: msg    
    });  
  }
})

二、Netty实现WebSocket服务端程序

在从HTTP或HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此我们的WebSocket服务端程序将始终以HTTP作为开始,然后再执行升级。其约定为:如果被请求的URL以/ws结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP。当连接升级完毕后,所有数据都将会使用WebSocket进行传输(如下图)。

1. 新建一个Maven工程,并引入Netty依赖

  • 项目目录结构:

  • 引入Netty依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.netty</groupId>
    <artifactId>NettyWebSocket</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.48.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2. 自定义处理器

1)定义一个专门处理Http协议的处理器,当浏览器第一次连接时候会读取首页的html文件,并将html文件内容返回给浏览器展示。

package io.netty.websocket;

import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;

// 处理Http协议的Handler,该Handler只会在第一次客户端连接时候有用。
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain()
                .getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // 如果被请求的 URL 以/ws 结尾,那么我们将会把该协议升级为 WebSocket。
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            // 将请求传递给下一个ChannelHandler,即WebSocketServerProtocolHandler处理
            // request.retain()会增加引用计数器,以防止资源被释放
            ctx.fireChannelRead(request.retain());
            return;
        }
        handleHttpRequest(ctx, request);
    }

    /**
     * 该方法读取首页html文件内容,然后将内容返回给客户端展示
     * @param ctx
     * @param request
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // HTTP1.1协议允许客户端先判定服务器是否愿意接受客户端发来的消息主体,以减少由于服务器拒绝请求所带来的额外资源开销
        if (HttpHeaders.is100ContinueExpected(request)) {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
            ctx.writeAndFlush(response);
        }
        // 从resources目录读取index.html文件
        RandomAccessFile file = new RandomAccessFile(INDEX, "r");
        // 准备响应头信息
        HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
        boolean keepAlive = HttpHeaders.isKeepAlive(request);
        if (keepAlive) {
            response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
            response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        }
        ctx.write(response);
        // 输出html文件内容
        ctx.write(new ChunkedNioFile(file.getChannel()));
        // 最后发送一个LastHttpContent来标记响应的结束
        ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        // 如果不是长链接,则在写操作完成后关闭Channel
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2)定义一个处理器,负责处理所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,则所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不需要的ChannelHandler会被移除掉。

  • WebSocket升级前的ChannelPipeline:

  • WebSocket升级后的ChannelPipeline:

WebSocket升级完成后,WebSocketServerProtocolHandler会把HttpRequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,WebSocketServerProtocolHandler会移除任何不再被WebSocket连接所需要的ChannelHandler,其中包括 HttpObjectAggregator 和 HttpRequestHandler。

实现代码:

package io.netty.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

// 处理WebSocket协议的Handler
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private final ChannelGroup channelGroup;

    public TextWebSocketFrameHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    // 用户事件监听,每次客户端连接时候自动触发
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        String content = "Client " + ctx.channel().remoteAddress().toString().substring(1) + " joined";
        System.out.println(content);
        // 如果是握手完成事件,则从Pipeline中删除HttpRequestHandler,并将当前channel添加到ChannelGroup中
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
            // 从Pipeline中删除HttpRequestHandler
            ctx.pipeline().remove(HttpRequestHandler.class);
            // 通知所有已连接的WebSocket客户端,新的客户端已经连接上了
            TextWebSocketFrame msg = new TextWebSocketFrame(content);
            channelGroup.writeAndFlush(msg);
            // 将WebSocket Channel添加到ChannelGroup中,以便可以它接收所有消息
            channelGroup.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    // 每次客户端发送消息时执行
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {
        System.out.println("读取到的消息:" + msg.retain());
        // 将读取到的消息写到ChannelGroup中所有已经连接的客户端
        channelGroup.writeAndFlush(msg.retain());
    }
}

上面userEventTriggered方法监听用户事件。当有客户端连接时候,会自动执行该方法。而channelRead0方法负责读取客户端发送过来的消息,然后通过channelGroup将消息输出到所有已连接的客户端。

3. 定义初始化器

定义一个ChannelInitializer的子类,其主要目的是在某个 Channel 注册到 EventLoop 后,对这个 Channel 执行一些初始化操作。

package io.netty.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChatServerInitializer extends ChannelInitializer<Channel> {
    private final ChannelGroup channelGroup;

    public ChatServerInitializer(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // 安装编解码器,以实现对HttpRequest、 HttpContent、LastHttp-Content与字节之间的编解码
        pipeline.addLast(new HttpServerCodec());
        // 专门处理写文件的Handler
        pipeline.addLast(new ChunkedWriteHandler());
        // Http聚合器,可以让pipeline中下一个Channel收到完整的HTTP信息
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        // 处理Http协议的ChannelHandler,只会在客户端第一次连接时候有用
        pipeline.addLast(new HttpRequestHandler("/ws"));
        // 升级Websocket后,使用该 ChannelHandler 处理Websocket请求
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // 安装专门处理 Websocket TextWebSocketFrame 帧的处理器
        pipeline.addLast(new TextWebSocketFrameHandler(channelGroup));
    }
}

4. 创建启动类

package io.netty.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;

import java.net.InetSocketAddress;

public class ChatServer {

    public void start() {
        ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChatServerInitializer(channelGroup));
            ChannelFuture future = bootstrap.bind(new InetSocketAddress(8888)).syncUninterruptibly();
            System.out.println("Starting ChatServer on port 8888 ...");
            future.channel().closeFuture().syncUninterruptibly();
        } finally {
            channelGroup.close();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new ChatServer().start();
    }
}

5. 编写一个html文件

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket Chat</title>
</head>
<body>
<form οnsubmit="return false;">
    <h3>WebSocket 聊天室:</h3>
    <textarea id="responseText" style="width: 500px; height: 300px;"></textarea><br/>
    <input type="text" name="message"  style="width: 300px" value="Hello Netty"/>
    <input type="button" value="发送消息" onclick="send(this.form.message.value)"/>
    <input type="button" value="清空聊天记录" onclick="clearScreen()"/>
</form>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8888/ws");
        // 注意:使用tls协议通信时候,协议名为wss
        // socket = new WebSocket("wss://localhost:8443/ws");
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "连接开启!";
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + "连接被关闭!";
        };
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
    } else {
        alert("你的浏览器不支持 WebSocket!");
    }

    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接没有开启.");
        }
    }

    function clearScreen() {
        document.getElementById('responseText').value = "";
    }
</script>
</body>
</html>

界面效果:

显示全文