netty服務(wù)器
Netty配置
管道配置
自定義handler
推送消息接口及實(shí)現(xiàn)類
測(cè)試
學(xué)過(guò) Netty 的都知道,Netty 對(duì) NIO 進(jìn)行了很好的封裝,簡(jiǎn)單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛?;诖吮疚姆窒硪幌禄A(chǔ)的 netty 使用。實(shí)戰(zhàn)制作一個(gè) Netty + websocket 的消息推送小栗子。
netty服務(wù)器
@Component publicclassNettyServer{ staticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class); /** *端口號(hào) */ @Value("${webSocket.netty.port:8888}") intport; EventLoopGroupbossGroup; EventLoopGroupworkGroup; @Autowired ProjectInitializernettyInitializer; @PostConstruct publicvoidstart()throwsInterruptedException{ newThread(()->{ bossGroup=newNioEventLoopGroup(); workGroup=newNioEventLoopGroup(); ServerBootstrapbootstrap=newServerBootstrap(); //bossGroup輔助客戶端的tcp連接請(qǐng)求,workGroup負(fù)責(zé)與客戶端之前的讀寫操作 bootstrap.group(bossGroup,workGroup); //設(shè)置NIO類型的channel bootstrap.channel(NioServerSocketChannel.class); //設(shè)置監(jiān)聽端口 bootstrap.localAddress(newInetSocketAddress(port)); //設(shè)置管道 bootstrap.childHandler(nettyInitializer); //配置完成,開始綁定server,通過(guò)調(diào)用sync同步方法阻塞直到綁定成功 ChannelFuturechannelFuture=null; try{ channelFuture=bootstrap.bind().sync(); log.info("Serverstartedandlistenon:{}",channelFuture.channel().localAddress()); //對(duì)關(guān)閉通道進(jìn)行監(jiān)聽 channelFuture.channel().closeFuture().sync(); }catch(InterruptedExceptione){ e.printStackTrace(); } }).start(); } /** *釋放資源 */ @PreDestroy publicvoiddestroy()throwsInterruptedException{ if(bossGroup!=null){ bossGroup.shutdownGracefully().sync(); } if(workGroup!=null){ workGroup.shutdownGracefully().sync(); } } }
基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
視頻教程:https://doc.iocoder.cn/video/
Netty配置
管理全局Channel以及用戶對(duì)應(yīng)的channel(推送消息)
publicclassNettyConfig{ /** *定義全局單利channel組管理所有channel */ privatestaticvolatileChannelGroupchannelGroup=null; /** *存放請(qǐng)求ID與channel的對(duì)應(yīng)關(guān)系 */ privatestaticvolatileConcurrentHashMapchannelMap=null; /** *定義兩把鎖 */ privatestaticfinalObjectlock1=newObject(); privatestaticfinalObjectlock2=newObject(); publicstaticChannelGroupgetChannelGroup(){ if(null==channelGroup){ synchronized(lock1){ if(null==channelGroup){ channelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); } } } returnchannelGroup; } publicstaticConcurrentHashMap getChannelMap(){ if(null==channelMap){ synchronized(lock2){ if(null==channelMap){ channelMap=newConcurrentHashMap<>(); } } } returnchannelMap; } publicstaticChannelgetChannel(StringuserId){ if(null==channelMap){ returngetChannelMap().get(userId); } returnchannelMap.get(userId); } }
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
項(xiàng)目地址:https://gitee.com/zhijiantianya/yudao-cloud
視頻教程:https://doc.iocoder.cn/video/
管道配置
@Component publicclassProjectInitializerextendsChannelInitializer{ /** *webSocket協(xié)議名 */ staticfinalStringWEBSOCKET_PROTOCOL="WebSocket"; /** *webSocket路徑 */ @Value("${webSocket.netty.path:/webSocket}") StringwebSocketPath; @Autowired WebSocketHandlerwebSocketHandler; @Override protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{ //設(shè)置管道 ChannelPipelinepipeline=socketChannel.pipeline(); //流水線管理通道中的處理程序(Handler),用來(lái)處理業(yè)務(wù) //webSocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http編解碼器 pipeline.addLast(newHttpServerCodec()); pipeline.addLast(newObjectEncoder()); //以塊的方式來(lái)寫的處理器 pipeline.addLast(newChunkedWriteHandler()); pipeline.addLast(newHttpObjectAggregator(8192)); pipeline.addLast(newWebSocketServerProtocolHandler(webSocketPath,WEBSOCKET_PROTOCOL,true,65536*10)); //自定義的handler,處理業(yè)務(wù)邏輯 pipeline.addLast(webSocketHandler); } }
自定義handler
@Component @ChannelHandler.Sharable publicclassWebSocketHandlerextendsSimpleChannelInboundHandler{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class); /** *一旦連接,第一個(gè)被執(zhí)行 */ @Override publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{ log.info("有新的客戶端鏈接:[{}]",ctx.channel().id().asLongText()); //添加到channelGroup通道組 NettyConfig.getChannelGroup().add(ctx.channel()); } /** *讀取數(shù)據(jù) */ @Override protectedvoidchannelRead0(ChannelHandlerContextctx,TextWebSocketFramemsg)throwsException{ log.info("服務(wù)器收到消息:{}",msg.text()); //獲取用戶ID,關(guān)聯(lián)channel JSONObjectjsonObject=JSONUtil.parseObj(msg.text()); Stringuid=jsonObject.getStr("uid"); NettyConfig.getChannelMap().put(uid,ctx.channel()); //將用戶ID作為自定義屬性加入到channel中,方便隨時(shí)channel中獲取用戶ID AttributeKey key=AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(uid); //回復(fù)消息 ctx.channel().writeAndFlush(newTextWebSocketFrame("服務(wù)器收到消息啦")); } @Override publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{ log.info("用戶下線了:{}",ctx.channel().id().asLongText()); //刪除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ log.info("異常:{}",cause.getMessage()); //刪除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); ctx.close(); } /** *刪除用戶與channel的對(duì)應(yīng)關(guān)系 */ privatevoidremoveUserId(ChannelHandlerContextctx){ AttributeKey key=AttributeKey.valueOf("userId"); StringuserId=ctx.channel().attr(key).get(); NettyConfig.getChannelMap().remove(userId); } }
推送消息接口及實(shí)現(xiàn)類
publicinterfacePushMsgService{ /** *推送給指定用戶 */ voidpushMsgToOne(StringuserId,Stringmsg); /** *推送給所有用戶 */ voidpushMsgToAll(Stringmsg); } @Service publicclassPushMsgServiceImplimplementsPushMsgService{ @Override publicvoidpushMsgToOne(StringuserId,Stringmsg){ Channelchannel=NettyConfig.getChannel(userId); if(Objects.isNull(channel)){ thrownewRuntimeException("未連接socket服務(wù)器"); } channel.writeAndFlush(newTextWebSocketFrame(msg)); } @Override publicvoidpushMsgToAll(Stringmsg){ NettyConfig.getChannelGroup().writeAndFlush(newTextWebSocketFrame(msg)); } }
測(cè)試
鏈接服務(wù)器
發(fā)送消息
調(diào)用接口,往前端推送消息!
OK!
一個(gè)簡(jiǎn)單的 netty 小栗子就完成了。
-
接口
+關(guān)注
關(guān)注
33文章
8575瀏覽量
151014 -
封裝
+關(guān)注
關(guān)注
126文章
7873瀏覽量
142893 -
服務(wù)器
+關(guān)注
關(guān)注
12文章
9123瀏覽量
85322
原文標(biāo)題:Spring Boot+Netty+Websocket實(shí)現(xiàn)后臺(tái)向前端推送信息
文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論