123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- package cn.com.taiji.beidou.server;
- import cn.com.taiji.beidou.handler.WebSocketHandler;
- import com.google.common.util.concurrent.ThreadFactoryBuilder;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- import io.netty.handler.stream.ChunkedWriteHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.net.InetSocketAddress;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- /**
- * @author wudskq
- */
- @Slf4j
- @Component
- public class NettyServer {
- /**
- * webSocket协议名
- */
- private static final String WEBSOCKET_PROTOCOL = "WebSocket";
- /**
- * 端口号
- */
- @Value("${webSocket.netty.port}")
- private int port;
- /**
- * webSocket路径
- */
- @Value("${webSocket.netty.path}")
- private String webSocketPath;
- /**
- * 在Netty心跳检测中配置 - 读空闲超时时间设置
- */
- @Value("${webSocket.netty.readerIdleTime}")
- private long readerIdleTime;
- /**
- * 在Netty心跳检测中配置 - 写空闲超时时间设置
- */
- @Value("${webSocket.netty.writerIdleTime}")
- private long writerIdleTime;
- /**
- * 在Netty心跳检测中配置 - 读写空闲超时时间设置
- */
- @Value("${webSocket.netty.allIdleTime}")
- private long allIdleTime;
- @Autowired
- private WebSocketHandler webSocketHandler;
- private EventLoopGroup bossGroup;
- private EventLoopGroup workGroup;
- private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-ws1").build();
- /**
- * 启动
- * @throws InterruptedException
- */
- private void start() throws InterruptedException {
- bossGroup = new NioEventLoopGroup();
- workGroup = new NioEventLoopGroup();
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup,workGroup);
- bootstrap.channel(NioServerSocketChannel.class);
- // 设置监听端口
- bootstrap.localAddress(new InetSocketAddress(port));
- bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
- // webSocket协议本身是基于http协议的,使用http编解码器
- ch.pipeline().addLast(new HttpServerCodec());
- ch.pipeline().addLast(new ObjectEncoder());
- ch.pipeline().addLast(new ChunkedWriteHandler());
- ch.pipeline().addLast(new HttpObjectAggregator(8192));
- ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
- // 自定义的handler,处理业务逻辑
- ch.pipeline().addLast(webSocketHandler);
- }
- });
- ChannelFuture channelFuture = bootstrap.bind().sync();
- log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
- channelFuture.channel().closeFuture().sync();
- }
- /**
- * 释放资源
- * @throws InterruptedException
- */
- @PreDestroy
- public void destroy() throws InterruptedException {
- if(bossGroup != null){
- bossGroup.shutdownGracefully().sync();
- }
- if(workGroup != null){
- workGroup.shutdownGracefully().sync();
- }
- }
- /**
- * 使用线程池初始化netty-websocket服务
- */
- ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,5,
- TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
- /**
- * 初始化(新线程开启)
- */
- @PostConstruct()
- public void init() {
- threadPoolExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- start();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
|