NettyServer.java 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package cn.com.taiji.beidou.server;
  2. import cn.com.taiji.beidou.handler.WebSocketHandler;
  3. import com.google.common.util.concurrent.ThreadFactoryBuilder;
  4. import io.netty.bootstrap.ServerBootstrap;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.SocketChannel;
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;
  11. import io.netty.handler.codec.http.HttpObjectAggregator;
  12. import io.netty.handler.codec.http.HttpServerCodec;
  13. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  14. import io.netty.handler.codec.serialization.ObjectEncoder;
  15. import io.netty.handler.stream.ChunkedWriteHandler;
  16. import io.netty.handler.timeout.IdleStateHandler;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.beans.factory.annotation.Value;
  20. import org.springframework.stereotype.Component;
  21. import javax.annotation.PostConstruct;
  22. import javax.annotation.PreDestroy;
  23. import java.net.InetSocketAddress;
  24. import java.util.concurrent.ArrayBlockingQueue;
  25. import java.util.concurrent.ThreadFactory;
  26. import java.util.concurrent.ThreadPoolExecutor;
  27. import java.util.concurrent.TimeUnit;
  28. /**
  29. * @author wudskq
  30. */
  31. @Slf4j
  32. @Component
  33. public class NettyServer {
  34. /**
  35. * webSocket协议名
  36. */
  37. private static final String WEBSOCKET_PROTOCOL = "WebSocket";
  38. /**
  39. * 端口号
  40. */
  41. @Value("${webSocket.netty.port}")
  42. private int port;
  43. /**
  44. * webSocket路径
  45. */
  46. @Value("${webSocket.netty.path}")
  47. private String webSocketPath;
  48. /**
  49. * 在Netty心跳检测中配置 - 读空闲超时时间设置
  50. */
  51. @Value("${webSocket.netty.readerIdleTime}")
  52. private long readerIdleTime;
  53. /**
  54. * 在Netty心跳检测中配置 - 写空闲超时时间设置
  55. */
  56. @Value("${webSocket.netty.writerIdleTime}")
  57. private long writerIdleTime;
  58. /**
  59. * 在Netty心跳检测中配置 - 读写空闲超时时间设置
  60. */
  61. @Value("${webSocket.netty.allIdleTime}")
  62. private long allIdleTime;
  63. @Autowired
  64. private WebSocketHandler webSocketHandler;
  65. private EventLoopGroup bossGroup;
  66. private EventLoopGroup workGroup;
  67. private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-ws1").build();
  68. /**
  69. * 启动
  70. * @throws InterruptedException
  71. */
  72. private void start() throws InterruptedException {
  73. bossGroup = new NioEventLoopGroup();
  74. workGroup = new NioEventLoopGroup();
  75. ServerBootstrap bootstrap = new ServerBootstrap();
  76. bootstrap.group(bossGroup,workGroup);
  77. bootstrap.channel(NioServerSocketChannel.class);
  78. // 设置监听端口
  79. bootstrap.localAddress(new InetSocketAddress(port));
  80. bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  81. @Override
  82. protected void initChannel(SocketChannel ch) throws Exception {
  83. ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
  84. // webSocket协议本身是基于http协议的,使用http编解码器
  85. ch.pipeline().addLast(new HttpServerCodec());
  86. ch.pipeline().addLast(new ObjectEncoder());
  87. ch.pipeline().addLast(new ChunkedWriteHandler());
  88. ch.pipeline().addLast(new HttpObjectAggregator(8192));
  89. ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
  90. // 自定义的handler,处理业务逻辑
  91. ch.pipeline().addLast(webSocketHandler);
  92. }
  93. });
  94. ChannelFuture channelFuture = bootstrap.bind().sync();
  95. log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
  96. channelFuture.channel().closeFuture().sync();
  97. }
  98. /**
  99. * 释放资源
  100. * @throws InterruptedException
  101. */
  102. @PreDestroy
  103. public void destroy() throws InterruptedException {
  104. if(bossGroup != null){
  105. bossGroup.shutdownGracefully().sync();
  106. }
  107. if(workGroup != null){
  108. workGroup.shutdownGracefully().sync();
  109. }
  110. }
  111. /**
  112. * 使用线程池初始化netty-websocket服务
  113. */
  114. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,5,
  115. TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
  116. /**
  117. * 初始化(新线程开启)
  118. */
  119. @PostConstruct()
  120. public void init() {
  121. threadPoolExecutor.execute(new Runnable() {
  122. @Override
  123. public void run() {
  124. try {
  125. start();
  126. } catch (InterruptedException e) {
  127. e.printStackTrace();
  128. }
  129. }
  130. });
  131. }
  132. }