您的当前位置:首页正文

Springboot使用netty应用tcp协议

2024-11-26 来源:个人技术集锦

介绍:
    Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性

搭建环境: Mac下、Gradle7.2、 Springboot 2.6、Netty 4.1.70、Java8、MagicSocketDebugger(Mac 下免费tcp测试工具)

前景条件:一般java都是http协议,由于项目需要对接一些硬件采用tcp协议,根据厂商的整体模式需要搭建一套服务端用来处理和数据相应,其实不用netty也能做,但是从项目的角度和场景上讲,自己实现这种涉及到几个难点,数据的存储、并发以及高可用性、数据处理等,这些东西如果自己去实现会相当的复杂,每个人观点不同,后来看了一些文档使用了netty,下面介绍一下直接使用的方式。
我们构建一套Springboot环境。

package com.example.tcpservice;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

@Component
public class NettyServer {

    //
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 线程组用于数据处理
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    /**
     * 启动Netty Server
     *
     * @throws InterruptedException
     */
    //@PostConstruct (后续修改成异步)
    public void start(InetSocketAddress inetSocketAddress) {
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work)
                    // 指定Channel
                    .channel(NioServerSocketChannel.class)
                    //使用指定的端口设置套接字地址
                    .localAddress(inetSocketAddress)
                    // 日志处理 info级别
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.AUTO_READ, true)
                    // 添加自定义的初始化器
                    .childHandler(new ServerInitializer());
            ;

            System.out.println("--------------------自定义配置成功---------------------");
            ChannelFuture future = bootstrap.bind(inetSocketAddress).sync();
            System.out.println(" tcp服务器开始监听端口:" + inetSocketAddress.getPort());
            if (future.isSuccess()) {
                System.out.println("--------------------启动成功---------------------");
            }
        } catch (Exception e) {
            e.printStackTrace();
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

    //@PreDestroy (后续修改成异步)
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        System.out.println("----关闭Netty-----");
    }

}

上述代码的描述,具体已经详细写了配置,不做过多的讲解,主要说一下这两个现场boss和work。
为什么要创建两个,其目的是为了解决并发的这个痛点,描述下举一个例子,Netty是一家公司,里面有一个Boss开了一家公司(开启一个服务端口)对外提供服务业务,Boss的公司下有很多员工work,Boss在接受业务的同时会将每个客服分配给其他的员工work。如何公司的业务达到一定的水平非常繁忙,一个work可能会对多个客户进行服务。这就是boss和work的关系。其实最简单的理解就是一个是服务端用来公开的,一个是内部用来处理工作的。

服务端初始化器代码:

package com.example.tcpservice;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * @author Leo
 * @desc 设置初始化器,主要是给pipeLine添加Handler
 * @date 2021/12/14  上午10:12
 */
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 也可以选择将处理器加到pipeLine的那个位置
        //一些限定和编码解码器
        //pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        //pipeline.addLast(new ChunkedWriteHandler());
        //pipeline.addLast(new ChunkedWriteHandler());
        //pipeline.addLast(new HttpObjectAggregator(1024 * 64));
        //pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        //pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        //pipeline.addLast(new LengthFieldPrepender(4));
        //pipeline.addLast(new CustomHandlerText());
        //pipeline.addLast("decoder", new MyDecoder());
        //pipeline.addLast("encoder", new MyDecoder());

        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new CustomHandlerString());
        System.out.println("来自" + socketChannel.remoteAddress() + "的新连接接入");
    }

}

客户端初始化(注释掉很多写法),为什么要做这样的一个操作,ChannelInitializer的作用主要用来进行设置出站解码器和入站编码器,实际就是你接受方式的格式和返回方式的格式,用哪些发放接收。

服务端业务处理:
 

package com.example.tcpservice;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.tcpservice.dto.DevRegisterRequestDTO;
import com.example.tcpservice.json.JsonUtils;
import com.example.tcpservice.md5.MD5Utils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Leo
 * @desc 业务处理器
 * @date 2021/11/23  下午4:41
 */
@Component
public class CustomHandlerString extends SimpleChannelInboundHandler<String> {

    // 管理一个全局map,保存连接进服务端的通道数量
    public static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();

    //只有一个客户端连接可以应用这套
    //private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * @author Leo
     * @desc 获取数据
     * @date 2021/12/20  下午3:09
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //String hex= ByteBufUtil.hexDump(((String) msg).getBytes());
        //校验
        if (ctx == null) {
            System.out.println("通道【" + ctx.channel().id() + "】不存在");
            return;
        }
        if (msg == null && msg == "") {
            System.out.println("服务端响应空的消息");
            return;
        }
        System.out.println("---channelRead 客户端---" + ctx.channel().remoteAddress());
        System.out.println("【" + ctx.channel().id() + "】");
        System.out.println("接收到的数据:" + msg);
        //保存
        putChannnelMap(ctx);
        //响应客户端
        this.channelWrite(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("-------------------------------------------------------");
        putChannnelMap(ctx);
    }

    /**
     * @author Leo
     * @desc 写入
     * @date 2021/12/14  下午4:50
     */
    public void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception {

        JSONObject jsonObject = JSON.parseObject((String) msg);
        String device = jsonObject.getString("Command");
        if (StringUtils.isEmpty(device)) {
            return;
        }

        //todo: 编写业务逻辑  业务主要应用
        if (device.equals("REGISTER")) {
            System.out.println(" --发送--" + JsonUtils.beanToJson(JsonUtils.beanToJson("OK") + "\r\n"));
            //回复数据 写入推送
            ctx.channel().writeAndFlush(JsonUtils.beanToJson("OK") + "\r\n");
            return;
        }

        //将客户端的信息直接返回写入ctx
        //ctx.write(msg);
        //刷新缓存区
        ctx.flush();

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //clients.add(ctx.channel());
        putChannnelMap(ctx);
    }


    // 客户端连接之后获取客户端channel并放入group中管理
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("---客户端---" + ctx.channel().remoteAddress());
        //clients.add(ctx.channel());
    }

    // 移除对应客户端的channel
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("---客户端---" + ctx.channel().remoteAddress() + "关闭");
        // 长短id
        System.out.println(ctx.channel().id().asLongText());
        System.out.println(ctx.channel().id().asShortText());
        //删除
        removeChannnelMap(ctx);
        //
        //clients.remove(ctx.channel());
    }

    /**
     * @author Leo
     * @desc 异常处理
     * @date 2021/12/23  下午3:57
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("------服务异常关闭----------");
        removeChannnelMap(ctx);
        //有异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * @author Leo
     * @desc 新增 todo:这里我们保留的是IP地址, 这个可以保存为业务参数 确定客户端的。
     * @date 2021/12/20  下午3:14
     */
    private void putChannnelMap(ChannelHandlerContext ctx) {
        System.out.println("----写入-----" + ctx.channel().remoteAddress().toString());
        map.put(ctx.channel().remoteAddress().toString(), ctx);
    }
    
    /**
     * @author Leo
     * @desc 删除
     * @date 2021/12/20  下午2:43
     */
    private void removeChannnelMap(ChannelHandlerContext ctx) {
        for (String key : map.keySet()) {
            if (map.get(key) != null && map.get(key).equals(ctx.channel().remoteAddress().toString())) {
                map.remove(key);
            }
        }
    }

}

工具模拟客户端请求服务端:

package com.example.tcpservice.json;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * @author leo
 * @desc Json工具.
 */
public final class JsonUtils {

    private JsonUtils() {
    }

    /**
     * 对象转换为json
     *
     * @param object
     * @return
     */
    public static String beanToJson(Object object) {
        return JSON.toJSONString(object);
    }


}
CustomHandlerString 配置
package com.example.tcpservice.config;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Leo
 * @desc 业务处理器
 * @date 2021/11/23  下午4:41
 */
@Component
public class CustomHandlerString extends SimpleChannelInboundHandler<String> {

    // 管理一个全局map,保存连接进服务端的通道数量
    public static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();

    //只有一个客户端连接可以应用这套
    //private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * @author Leo
     * @desc 获取数据
     * @date 2021/12/20  下午3:09
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //String hex= ByteBufUtil.hexDump(((String) msg).getBytes());
        //校验
        if (ctx == null) {
            System.out.println("通道【" + ctx.channel().id() + "】不存在");
            return;
        }
        if (msg == null && msg == "") {
            System.out.println("服务端响应空的消息");
            return;
        }
        System.out.println("---channelRead 客户端---" + ctx.channel().remoteAddress());
        System.out.println("【" + ctx.channel().id() + "】");
        System.out.println("接收到的数据:" + msg);
        //保存
        putChannnelMap(ctx);
        //响应客户端
        this.channelWrite(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("-------------------------------------------------------");
        putChannnelMap(ctx);
    }

    /**
     * @author Leo
     * @desc 写入
     * @date 2021/12/14  下午4:50
     */
    public void channelWrite(ChannelHandlerContext ctx, Object msg) throws Exception {

        JSONObject jsonObject = JSON.parseObject((String) msg);
        String device = jsonObject.getString("command");
        if (StringUtils.isEmpty(device)) {
            return;
        }


        //将客户端的信息直接返回写入ctx
        //ctx.write(msg);
        //刷新缓存区
        ctx.flush();

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //clients.add(ctx.channel());
        putChannnelMap(ctx);
    }


    // 客户端连接之后获取客户端channel并放入group中管理
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("---客户端---" + ctx.channel().remoteAddress());
        //clients.add(ctx.channel());
    }

    // 移除对应客户端的channel
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("---客户端---" + ctx.channel().remoteAddress() + "关闭");
        // 长短id
        System.out.println(ctx.channel().id().asLongText());
        System.out.println(ctx.channel().id().asShortText());
        //删除
        removeChannnelMap(ctx);
        //
        //clients.remove(ctx.channel());
    }

    /**
     * @author Leo
     * @desc 异常处理
     * @date 2021/12/23  下午3:57
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("------服务异常关闭----------");
        removeChannnelMap(ctx);
        //有异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * @author Leo
     * @desc 新增 todo:这里我们保留的是IP地址, 这个可以保存为业务参数 确定客户端的。
     * @date 2021/12/20  下午3:14
     */
    private void putChannnelMap(ChannelHandlerContext ctx) {
        System.out.println("----写入-----" + ctx.channel().remoteAddress().toString());
        map.put(ctx.channel().remoteAddress().toString(), ctx);
    }
    
    /**
     * @author Leo
     * @desc 删除
     * @date 2021/12/20  下午2:43
     */
    private void removeChannnelMap(ChannelHandlerContext ctx) {
        for (String key : map.keySet()) {
            if (map.get(key) != null && map.get(key).equals(ctx.channel().remoteAddress().toString())) {
                map.remove(key);
            }
        }
    }

}

NettyServer服务端配置

package com.example.tcpservice.config;

import com.example.tcpservice.config.ServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

@Component
public class NettyServer {

    //
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 线程组用于数据处理
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    /**
     * 启动Netty Server
     *
     * @throws InterruptedException
     */
    //@PostConstruct
    public void start(InetSocketAddress inetSocketAddress) {
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work)
                    // 指定Channel
                    .channel(NioServerSocketChannel.class)
                    //使用指定的端口设置套接字地址
                    .localAddress(inetSocketAddress)
                    // 日志处理 info级别
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.AUTO_READ, true)
                    // 添加自定义的初始化器
                    .childHandler(new ServerInitializer());
            ;

            System.out.println("--------------------自定义配置成功---------------------");
            ChannelFuture future = bootstrap.bind(inetSocketAddress).sync();
            System.out.println(" tcp服务器开始监听端口:" + inetSocketAddress.getPort());
            if (future.isSuccess()) {
                System.out.println("--------------------启动成功---------------------");
            }
        } catch (Exception e) {
            e.printStackTrace();
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

    //@PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        System.out.println("----关闭Netty-----");
    }

}
ServerInitializer 初始化配置 
package com.example.tcpservice.config;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * @author Leo
 * @desc 设置初始化器,主要是给pipeLine添加Handler
 * @date 2021/12/14  上午10:12
 */
public class ServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 也可以选择将处理器加到pipeLine的那个位置
        //一些限定和编码解码器
        //pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        //pipeline.addLast(new ChunkedWriteHandler());
        //pipeline.addLast(new ChunkedWriteHandler());
        //pipeline.addLast(new HttpObjectAggregator(1024 * 64));
        //pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        //pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
        //pipeline.addLast(new LengthFieldPrepender(4));
        //pipeline.addLast(new CustomHandlerText());
        //pipeline.addLast("decoder", new MyDecoder());
        //pipeline.addLast("encoder", new MyDecoder());

        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new CustomHandlerString());
        System.out.println("来自" + socketChannel.remoteAddress() + "的新连接接入");
    }

}

Controller 控制层 测试代码

package com.example.tcpservice.rest;

import com.example.tcpservice.config.CustomHandlerString;
import com.example.tcpservice.common.json.JsonUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/wr")
@Validated
public class Controller {
    /**
     * @author Leo
     * @desc 测试
     * @date 2021/12/20  上午10:49
     */
    @GetMapping(value = "/test")
    public String test() {
        if (CustomHandlerString.map.isEmpty()) {
            return "no";
        }
        CustomHandlerString.map.forEach((k, v) -> {
            System.out.println("key:value = " + k + ":" + v);
            System.out.println("---客户端---" + v.channel().localAddress());
        });
        CustomHandlerString.map.values().stream().
                findFirst().get().channel().
                writeAndFlush(JsonUtils.beanToJson("回复收到"));
        return "ok";
    }

}

启动  TcpServiceApplication

package com.example.tcpservice;

import com.example.tcpservice.config.NettyServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;
import java.net.InetSocketAddress;

/**
 * @author Leo
 * @desc 启动类
 * @date 2021/12/23  下午2:59
 */
@SpringBootApplication
public class TcpServiceApplication implements CommandLineRunner {

    @Resource
    NettyServer nettyServer;

    public static void main(String[] args) {
        SpringApplication.run(TcpServiceApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        InetSocketAddress tcpAddress = new InetSocketAddress("localhost",  //后续更改为配置文件
                8080);    //后续更改为配置文件
        nettyServer.start(tcpAddress);
    }
}
显示全文