package cn.com.taiji.beidou.server; import cn.com.taiji.beidou.handler.WebSocketHandler; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.net.InetSocketAddress; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author wudskq */ @Slf4j @Component public class NettyServer { /** * webSocket协议名 */ private static final String WEBSOCKET_PROTOCOL = "WebSocket"; /** * 端口号 */ @Value("${webSocket.netty.port}") private int port; /** * webSocket路径 */ @Value("${webSocket.netty.path}") private String webSocketPath; /** * 在Netty心跳检测中配置 - 读空闲超时时间设置 */ @Value("${webSocket.netty.readerIdleTime}") private long readerIdleTime; /** * 在Netty心跳检测中配置 - 写空闲超时时间设置 */ @Value("${webSocket.netty.writerIdleTime}") private long writerIdleTime; /** * 在Netty心跳检测中配置 - 读写空闲超时时间设置 */ @Value("${webSocket.netty.allIdleTime}") private long allIdleTime; @Autowired private WebSocketHandler webSocketHandler; private EventLoopGroup bossGroup; private EventLoopGroup workGroup; private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-ws1").build(); /** * 启动 * @throws InterruptedException */ private void start() throws InterruptedException { bossGroup = new NioEventLoopGroup(); workGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup); bootstrap.channel(NioServerSocketChannel.class); // 设置监听端口 bootstrap.localAddress(new InetSocketAddress(port)); bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES)); // webSocket协议本身是基于http协议的,使用http编解码器 ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ChunkedWriteHandler()); ch.pipeline().addLast(new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10)); // 自定义的handler,处理业务逻辑 ch.pipeline().addLast(webSocketHandler); } }); ChannelFuture channelFuture = bootstrap.bind().sync(); log.info("Server started and listen on:{}",channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } /** * 释放资源 * @throws InterruptedException */ @PreDestroy public void destroy() throws InterruptedException { if(bossGroup != null){ bossGroup.shutdownGracefully().sync(); } if(workGroup != null){ workGroup.shutdownGracefully().sync(); } } /** * 使用线程池初始化netty-websocket服务 */ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,2,5, TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy()); /** * 初始化(新线程开启) */ @PostConstruct() public void init() { threadPoolExecutor.execute(new Runnable() { @Override public void run() { try { start(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }