Netty
yuankaiqiang Lv5

一、简介

一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持

  • 使用更高效的socket底层,对epoll空轮询引起的cpu占用飙升在内部进行了处理,避免了直接使用NIO的陷阱,简化了NIO的处理方式
  • 采用多种decoder/encoder 支持,对TCP粘包/分包进行自动化处理
  • 可使用接受/处理线程池,提高连接效率,对重连、心跳检测的简单支持
  • 可配置IO线程数、TCP参数, TCP接收和发送缓冲区使用直接内存代替堆内存,通过内存池的方式循环利用ByteBuf
  • 通过引用计数器及时申请释放不再引用的对象,降低了GC频率
  • 使用单线程串行化的方式,高效的Reactor线程模型
  • 大量使用了volitale、使用了CAS和原子类、线程安全类的使用、读写锁的使用

Netty的特点:

1、异步、NIO的网络通信框架

2、高性能

3、高扩展,高定制性

4、易用性

二、线程模型

2.1 Reactor线程模型

  • 单Reactor线程模型

    1. Reactor 通过 select 监听客户端请求事件,收到事件之后通过 dispatch 进行分发
    2. 如果事件是建立连接的请求事件,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理连接建立后的后续业务处理。
    3. 如果事件不是建立连接的请求事件,则由 Reactor 对象分发给连接对应的 Handler 处理。
    4. Handler 会完成 read–>业务处理–>send 的完整处理流程。

    img

    优点:模型简单,没有多线程、进程通信、竞争的问题,一个线程完成所有的事件响应和业务处理

    缺点:存在性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。

    使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度为 O(1)的情况

  • 多线程模型

    1. Reactor 对象通过 select 监听客户端请求事件,收到事件后通过 dispatch 进行分发。
    2. 如果事件是建立连接的请求事件,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理连接建立后的后续业务处理。
    3. 如果事件不是建立连接的请求事件,则由 Reactor 对象分发给连接对应的 Handler 处理。Handler 只负责响应事件,不做具体的业务处理,Handler 通过 read 读取到请求数据后,会分发给后面的 Worker 线程池来处理业务请求。
    4. Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据。

    img

    优点:充分的利用多核 cpu 的处理能力

    缺点:多线程数据共享和控制比较复杂,Reactor 处理所有的事件的监听和响应,在单线程中运行,面对高并发场景还是容易出现性能瓶颈。

  • 主从多线程模型:

    ​ 这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。

    1. Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过 Acceptor 处理客户端连接事件。
    2. 当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端==建立连接之后将连接交由=SubReactor 监听==后面的 IO 事件。)
    3. SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理。
    4. 当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理。
    5. Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理。
    6. Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据。
    7. 一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个 SubReactor 线程。

    优点:

    • MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理。
    • MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程,SubReactor 线程无需返回数据。
    • 多个 SubReactor 线程能够应对更高的并发请求。

2.2 Netty的线程模型(主要基于主从 Reactor 多线程模式,并做了一定的改进)

​ Netty通过Reactor模型基于多路复用器接收并处理用户请求,内部实现了两个线程池,boss线程池和work线程池,其中boss线程池的线程负责处理请求的accept事件,当接收到accept事件的请求时,把对应的socket封装到一个NioSocketChannel中,并交给work线程池,其中work线程池负责请求的read和write事件,由对应的Handler处理。

img
  1. Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是 NioEventLoopGroup。
  2. NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop。
  3. NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel)。
  4. NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop。
  5. 每个 BossNioEventLoop 中循环执行以下三个步骤:
    1. select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
    2. processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
    3. runAllTasks:再去以此循环处理任务队列中的其他任务
  6. 每个 WorkerNioEventLoop 中循环执行以下三个步骤:
    1. select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
    2. processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
    3. runAllTasks:再去以此循环处理任务队列中的其他任务
  7. 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。这里暂时不详细展开讲解 Pipeline。

三、TCP的粘包/拆包原因及其解决方法是什么?

为什么会发生TCP粘包、拆包? 发生TCP粘包、拆包主要是由于下面一些原因:

  1. 应用程序写入的数据大于套接字缓冲区大小,这将会发生拆包。
  2. 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包。
  3. 进行MSS(最大报文长度)大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包。
  4. 接收方法不及时读取套接字缓冲区数据,这将发生粘包。

粘包、拆包解决办法:

TCP本身是面向流的,作为网络服务器,如何从这源源不断涌来的数据流中拆分出或者合并出有意义的信息呢?通常会有以下一些常用的方法:

  1. 发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。
  2. 发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。
  3. 可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。

四、Netty的粘包/拆包是怎么处理的,有哪些实现?

  1. 客户端在发送数据包的时候,每个包都固定长度,比如1024个字节大小,如果客户端发送的数据长度不足1024个字节,则通过补充空格的方式补全到指定长度;Netty提供的FixedLengthFrameDecoder

    1
    socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
  2. 客户端在每个包的末尾使用固定的分隔符,例如rn,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的rn,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包;Netty提供LineBasedFrameDecoder与DelimiterBasedFrameDecoder

    LineBasedFrameDecoder的作用主要是通过换行符,即\n或者\r\n对数据进行处理

    1
    2

    socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
  3. 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息;Netyy提供了LengthFieldBasedFrameDecoder与LengthFieldPrepender

  4. 通过自定义协议进行粘包和拆包的处理。Netty提供了通过实现MessageToByteEncoder和ByteToMessageDecoder来实现

五、同步与异步、阻塞与非阻塞的区别?

简单点理解就是:

  1. 同步,就是我调用一个功能,该功能没有结束前,我死等结果。
  2. 异步,就是我调用一个功能,不需要知道该功能结果,该功能有结果后通知我(回调通知)
  3. 阻塞,就是调用我(函数),我(函数)没有接收完数据或者没有得到结果之前,我不会返回。
  4. 非阻塞,就是调用我(函数),我(函数)立即返回,通过select通知调用者

同步IO和异步IO的区别就在于:数据拷贝的时候进程是否阻塞

阻塞IO和非阻塞IO的区别就在于:应用程序的调用是否立即返回

image.png

六、BIO、NIO、AIO分别是什么?

BIO:同步并阻塞 ,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。

NIO:同步非阻塞 ,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

AIO:异步非阻塞 ,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理.AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

七、select、poll、epoll的机制及其区别?

  • 单个进程打开的文件描述符(fd文件句柄)不一致

select :有最大连接数限制数为1024,单个进程所能打开的最大连接数由FD_ZETSIZE宏定义。

poll:poll本质上与select没有区别,但是它没有最大连接数的限制,原因是它是基于链表来存储的。

epoll:虽然连接有上限,但是很大,1G内存的机器可以打开10万左右的连接,以此类推。

  • 监听Socket的方式不一致

select :轮询的方式,一个一个的socket检查过去,发现有socket活跃时才进行处理,当线性socket增多时,轮询的速度将会变得很慢,造成线性造成性能下降问题。

poll:对select稍微进行了优化,只是修改了文件描述符,但是监听socket的方式还是轮询。

expoll:epoll内核中实现是根据每个fd上的callback函数来实现的,只有活跃的socket才会主动调用callback,通知expoll来处理这个socket。(会将连接的socket注册到epoll中, 相当于socket的花名册, 如果有一个socket活跃了, 会回调一个函数, 通知epoll,赶紧过来处理)

  • 内存空间拷贝方式(消息传递方式)不一致

select:内核想将消息传递到用户态,需要将数据从内核态拷贝到用户态,这个过程非常的耗时

poll:同上

epoll:epoll的内核和用户空间共享一块内存,因此内存态数据和用户态数据是共享的select、poll、epoll时间复杂度分别是:O(n)、O(n)、O(1)

八、说说Netty的执行流程?

  1. 创建ServerBootStrap实例
  2. 设置并绑定Reactor线程池:EventLoopGroup,EventLoop就是处理所有注册到本线程的Selector上面的Channel
  3. 设置并绑定服务端的channel
  4. 创建处理网络事件的ChannelPipeline和handler,网络时间以流的形式在其中流转,handler完成多数的功能定制:比如编解码 SSl安全认证
  5. 绑定并启动监听端口
  6. 当轮训到准备就绪的channel后,由Reactor线程:NioEventLoop执行pipline中的方法,最终调度并执行channelHandler

image.png

九、Spring中使用

导入依赖

1
2
3
4
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

1、服务端

Netty启动网上很多示例,核心代码如下,主要是两个线程组bossGroupworkGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class NettyServerRun {

private static final Logger log = LoggerFactory.getLogger(NettyServerRun.class);

private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workGroup = new NioEventLoopGroup();
private static Channel channel;

public ChannelFuture start(InetSocketAddress socketAddress) throws Exception {

ChannelFuture future = null;

ServerBootstrap bootstrap = new ServerBootstrap()
//第1步 定义两个线程组,用来处理客户端通道的accept和读写事件
//bossGroup用来处理accept事件,workGroup用来处理通道的读写事件
//bossGroup获取客户端连接,连接接收到之后再将连接转发给workGroup去处理
.group(bossGroup, workGroup)
// 第2步 绑定服务端通道
.channel(NioServerSocketChannel.class)
// 第3步 处理读写事件,ChannelInitializer是给通道初始化
.childHandler(new MyServerCodec())
.localAddress(socketAddress)
/**
* 设置队列大小
* 用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。
* 用来初始化服务端可连接队列
* 服务端处理客户端连接请求是按顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
*/
.option(ChannelOption.SO_BACKLOG, 1024);
// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
// .childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
try {
future = bootstrap.bind(socketAddress).sync();

channel = future.channel();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (future != null && future.isSuccess()) {
log.info(">>>>>> Netty 服务端启动成功,IP:" + socketAddress.getAddress() + ",端口:" + socketAddress.getPort() + " ...");
} else {
log.error(">>>>> Netty 服务端启动失败!");
}
}
return future;
}

/**
* 停止服务
*/
public void destroy() {
if (channel != null) {
channel.close();
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
log.info("Netty 服务端关闭!");
}
}
}

ChannelHandlerd的生命周期^4

MyServerCodec里面主要是定义了一些编解码的方式心跳机制消息处理

image-20210628150830432

当没有设置心跳时,加上这个后默认为2小时自动检测一次

image-20210628151123553

当程序启动后,开启另外一个线程进行处理消息,因为这里阻塞了,因此在后面使用接口动态启动服务端与客户端的时候都是另外启动一个线程防止阻塞

image-20210628151338980

MyServerCodec(编解码的方式心跳机制消息处理等,**重点**)

1
2
3
4
5
6
public class MyServerCodec extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
xxx
}
}
  1. 心跳(心跳部分下面那行代码必须放在方法中的第一行,放入后面可能不起作用)

    dleStateHandler的readerIdleTime参数指定超过时间还没收到客户端的连接, 会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须实现userEventTriggered方法处理对应事件,

    三个参数:

    • 表示多久没有读(客户端没发送消息),会发送一个心跳检测包检测是否连接
    • 表示多久没有写(服务端没发送消息),会发送一个心跳检测包检测是否连接
    • 多长时间没有读写,会发送一个心跳检测包检测是否连接
    1
    socketChannel.pipeline().addLast(new IdleStateHandler(6, 0, 0, TimeUnit.SECONDS));

    设置上面的时间以后,在消息处理中将会每6秒钟调用下面的userEventTriggered方法,具体实现可以自己根据自己业务情况编写。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * 超过空闲时间调用,过了次数后关闭链接
    */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
    IdleStateEvent event = (IdleStateEvent) evt;

    String eventType;
    switch (event.state()) {
    case READER_IDLE:
    eventType = "读空闲"; // 如果客户端未发送信息到服务端,那么会触发channel的读空闲
    // 读空闲的计数加1
    readIdleTimes++;
    break;
    case WRITER_IDLE:
    eventType = "写空闲"; // 如果服务端未发送信息到客户端,那么会触发channel的写空闲
    // 不处理
    break;
    case ALL_IDLE:
    eventType = "读写空闲";
    // 不处理
    break;
    default:
    throw new IllegalStateException("非法状态!");
    }
    log.info(">>>>>> " + ctx.channel().remoteAddress() + " 超时事件:" + eventType);
    // 超过3次超时则断开连接
    if (readIdleTimes > 3) {
    log.info(">>>>> Netty 客户端 IP:" + ip + " 连接超时,关闭连接!");
    ctx.channel().close();
    }
    }
  2. 使用Netty自带的编解码

    Netty4自带编解码器详解^1

    Protobuf编解码^2

    1. 字符串编解码

      1
      2
      socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
      socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    2. Base64编解码 base64的使用需要在String的基础上,不然消息是无法直接传递

      1
      2
      socketChannel.pipeline().addLast("base64Decoder", new Base64Decoder());
      socketChannel.pipeline().addLast("base64Encoder", new Base64Encoder());
    3. Object编解码,图片文件传输可以使用这种方式,当文件传输时,客户端和服务端都为对象传输,开业直接传递File

      1
      2
      socketChannel.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
      socketChannel.pipeline().addLast("encoder", new ObjectEncoder());

      发送方式

      image-20210628152952809

      接收方式

      image-20210628153116512

    4. 字节编解码

      1
      2
      socketChannel.pipeline().addLast("decoder", new ByteArrayDecoder());
      socketChannel.pipeline().addLast("encoder", new ByteArrayEncoder());
    5. Protobuf编解码(暂未了解)

  3. 使用自定义的编解码方式

    1
    2
    socketChannel.pipeline().addLast("decoder", new MyByteToMessageDecoder());
    socketChannel.pipeline().addLast("encoder", new MyMessageToByteEncoder());
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * @ClassName: MyMessageToByteEncoder
    * @Description: 自定义编码器
    * @Author yuankaiqiang
    * @DateTime 2021-06-27 15:02:26
    */
    public class MyMessageToByteEncoder extends MessageToByteEncoder<ByteBuf>{

    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
    out.writeBytes(msg);
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * @ClassName: MyByteToMessageDecoder
    * @Description: 自定义解码器
    * @Author yuankaiqiang
    * @DateTime 2020-11-30 00:14:02
    */
    public class MyByteToMessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

    byte[] bytes = new byte[in.readableBytes()];
    // 复制内容到字节数组bytes
    in.readBytes(bytes);

    // 转换到对应的类型
    String ms = ConvertFactory.bytesToHexString(bytes);

    out.add(ms);
    }

    }
  4. 消息处理

    1
    socketChannel.pipeline().addLast(new NettyServerHandler());

    主要是继承ChannelInboundHandlerAdapter,具体实现看文章后面的代码链接

    image-20210628153603016

  5. 粘包拆包处理

    粘包拆包处理参考^3

    1. FixedLengthFrameDecoder

      固定长度的粘包和拆包场景,指定长度为20,长度为20时才会接收一次,也可以自定义的包处理,未满20的长度补空格

      1
      socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));

      发送第四次才成功

      image-20210628164217480

      image-20210628164240791

    2. LineBasedFrameDecoder

      通过分隔符进行粘包和拆包问题的处理,LineBasedFrameDecoder的作用主要是通过换行符,即\n或者\r\n对数据进行处理

      1
      socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));

      第一次请求发送消息,没有加换行的情况,消息没发送出去

      image-20210628164615657

      加了换行以后发送,可以进行发送,并且,上一次的数据也进行了发送,因此打印了两次,上一次的数据存在缓存中

      image-20210628164716493

      image-20210628164900037

    3. DelimiterBasedFrameDecoder

      将delimiter设置到DelimiterBasedFrameDecoder中,经过该解码一器进行处理之后,源数据将会被按照$_$进行分隔,这里1024指的是分隔的最大长度,即当读取到1024个字节的数据之后,若还是未读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的。

      1
      2
      socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(5,
      Unpooled.wrappedBuffer("$_$".getBytes())));

      当发送的数据中以$_$结尾才可以读取到数据,且数据包的内容是剔除调数据包的分隔符$_$

      image-20210628165149281

      image-20210628165342402

      若你设置的大小为20个且结尾为 $_$,当你每一次发送的数据大小小于20个时会存在缓存中等待你下次输入的是否是 $_$,若还不是,则丢弃掉这个包,断开连接(低版本抛出异常),若是则接收之前每次发送的数据包。

      抛出 TooLongFrameException 异常防止由于异常码流缺失分隔符导致内存溢出(亲测 Netty 4.1 版本,服务器并未抛出异常,而是客户端被强制断开连接了)

      image-20210628170317208

      image-20210628170329865

2、客户端

启动方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.yuankaiqiang.netty.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.yuankaiqiang.netty.client.filter.MyClientCodec;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
* @ClassName: NettyClientRun
* @Description: 客户端启动
* @Author yuankaiqiang
* @DateTime 2021-06-27 16:43:03
*/
@Component
public class NettyClientRun {

private static final Logger log = LoggerFactory.getLogger(NettyClientRun.class);

private static ChannelFuture future = null;
private EventLoopGroup group = new NioEventLoopGroup();

public ChannelFuture start(String ip, int port) {


Bootstrap bootstrap = new Bootstrap()
.group(group)
//该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new MyClientCodec());
try {
future = bootstrap.connect(ip, port).sync();
// 第一次连接发送消息
future.channel().writeAndFlush("第一次连接发送消息");

return future;
} catch (InterruptedException e) {
e.printStackTrace();
return future;
}
}

public static ChannelFuture getFuture() {
return future;
}

/**
* 停止服务
*/
public void destroy() {
if (future != null) {
future.channel().close();
group.shutdownGracefully();
log.info("Netty 客户端端关闭!");
}
}
}

MyClientCodec与服务端类似

参考文章链接:

 评论