您的当前位置:首页正文

netty实现websocket推送消息

2020-01-13 来源:个人技术集锦
netty实现websocket推送消息

前⾔

由于http协议为应答模式的连接,⽆法保持长连接于是引⼊了websocket套接字长连接概念,能够保持数据持久性的交互;本篇⽂章将告知读者如何使⽤netty实现简单的消息推送功能websocket请求头GET / HTTP/1.1

Host: 127.0.0.1:8096Connection: UpgradePragma: no-cache

Cache-Control: no-cache

User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.82 Safari/537.36Upgrade: websocket

Origin: http://localhost:8056Sec-WebSocket-Version: 13

websocket请求头 会有 Connection 升级为 Upgrade, 并且Upgrade 属性值为 websocket引⼊依赖

引⼊netty和 引擎模板依赖

io.netty netty-all 4.1.55.Final

org.springframework.boot

spring-boot-starter-thymeleaf 创建WebSocketServer

创建Nio线程组,并在辅助启动器中中注⼊ ⾃定义处理器;定义套接字端⼝为8096;/**

* @author lsc *

*/

@Slf4j

public class WebSocketServer {

public void init(){

NioEventLoopGroup boss=new NioEventLoopGroup(); NioEventLoopGroup work=new NioEventLoopGroup(); try {

ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,work);

bootstrap.channel(NioServerSocketChannel.class); // ⾃定义处理器

bootstrap.childHandler(new SocketChannelInitializer()); Channel channel = bootstrap.bind(8096).sync().channel();

log.info(\"------------webSocket服务器启动成功-----------:\"+channel); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace();

log.info(\"---------运⾏出错----------:\"+e); }finally {

boss.shutdownGracefully(); work.shutdownGracefully();

log.info(\"------------websocket服务器已关闭----------------\"); } }}

SocketChannelInitializer

SocketChannelInitializer 中定义了聚合器 HttpObjectAggregator 将多个http⽚段消息聚合成完整的http消息,并且指定⼤⼩为65536;最后注⼊⾃定义的WebSocketHandler;/**

* @author lsc *

*/

public class SocketChannelInitializer extends ChannelInitializer { @Override

protected void initChannel(SocketChannel ch) { //设置log监听器

ch.pipeline().addLast(\"logging\ //设置解码器

ch.pipeline().addLast(\"http-codec\ //聚合器

ch.pipeline().addLast(\"aggregator\ //⽤于⼤数据的分区传输

ch.pipeline().addLast(\"http-chunked\ //⾃定义业务handler

ch.pipeline().addLast(\"handler\ }}

WebSocketHandler

WebSocketHandler 中对接收的消息进⾏判定,如果是websocket 消息 则将消息⼴播给所有通道;/**

* @author lsc *

*/

@Slf4j

public class WebSocketHandler extends SimpleChannelInboundHandler {

// 存放已经连接的通道

private static ConcurrentMap ChannelMap=new ConcurrentHashMap(); @Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest){

System.out.println(\"------------收到http消息--------------\"+msg); handleHttpRequest(ctx,(FullHttpRequest)msg); }else if (msg instanceof WebSocketFrame){ //处理websocket客户端的消息

String message = ((TextWebSocketFrame) msg).text();

System.out.println(\"------------收到消息--------------\"+message);

// ctx.channel().writeAndFlush(new TextWebSocketFrame(message)); // 将消息回复给所有连接

Collection values = ChannelMap.values(); for (Channel channel: values){

channel.writeAndFlush(new TextWebSocketFrame(message)); } } }

/**

* @author lsc

*

处理http请求升级

*/

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {

// 该请求是不是websocket upgrade请求 if (isWebSocketUpgrade(req)) {

String ws = \"ws://127.0.0.1:8096\";

WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(ws, null, false); WebSocketServerHandshaker handshaker = factory.newHandshaker(req);

if (handshaker == null) {// 请求头不合法, 导致handshaker没创建成功

WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else {

// 响应该请求

handshaker.handshake(ctx.channel(), req); }

return; } }

//n1.GET? 2.Upgrade头 包含websocket字符串?

private boolean isWebSocketUpgrade(FullHttpRequest req) { HttpHeaders headers = req.headers();

return req.method().equals(HttpMethod.GET)

&& headers.get(HttpHeaderNames.UPGRADE).equals(\"websocket\"); }

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加连接

log.debug(\"客户端加⼊连接:\"+ctx.channel()); Channel channel = ctx.channel();

ChannelMap.put(channel.id().asShortText(),channel); }

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception { //断开连接

log.debug(\"客户端断开连接:\"+ctx.channel()); Channel channel = ctx.channel();

ChannelMap.remove(channel.id().asShortText()); }}

最后将WebSocketServer 注⼊spring监听器,在服务启动的时候运⾏;@Slf4j

@Component

public class ApplicationConfig implements ApplicationListener {

@Override

public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { WebSocketServer webSocketServer = new WebSocketServer(); webSocketServer.init(); }}

视图转发

编写 IndexController 对视图进⾏转发/**

* @author lsc *

*/

@Controller

public class IndexController {

@GetMapping(\"index\")

public ModelAndView index(){

ModelAndView modelAndView = new ModelAndView(\"socket\"); return modelAndView; }}html

⽤户页⾯

附上配置⽂件server:

port: 8056

spring:

# 引擎模板配置 thymeleaf:

cache: false # 关闭缓存

mode: html # 去除htm5严格校验

prefix: classpath:/templates/ # 指定 thymeleaf 模板路径

encoding: UTF-8 # 指定字符集编码 suffix: .html运⾏服务后

前端页⾯显⽰消息服务端打印消息

源码获取:知识追寻者公众号回复:netty配套教程

因篇幅问题不能全部显示,请点此查看更多更全内容