RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

netty推送消息接口及實(shí)現(xiàn)

jf_ro2CN3Fa ? 來(lái)源:芋道源碼 ? 作者:芋道源碼 ? 2022-11-02 16:14 ? 次閱讀

netty服務(wù)器

Netty配置

管道配置

自定義handler

推送消息接口及實(shí)現(xiàn)類

測(cè)試

學(xué)過(guò) Netty 的都知道,Netty 對(duì) NIO 進(jìn)行了很好的封裝,簡(jiǎn)單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛?;诖吮疚姆窒硪幌禄A(chǔ)的 netty 使用。實(shí)戰(zhàn)制作一個(gè) Netty + websocket 的消息推送小栗子。

netty服務(wù)器

@Component
publicclassNettyServer{

staticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class);

/**
*端口號(hào)
*/
@Value("${webSocket.netty.port:8888}")
intport;

EventLoopGroupbossGroup;
EventLoopGroupworkGroup;

@Autowired
ProjectInitializernettyInitializer;

@PostConstruct
publicvoidstart()throwsInterruptedException{
newThread(()->{
bossGroup=newNioEventLoopGroup();
workGroup=newNioEventLoopGroup();
ServerBootstrapbootstrap=newServerBootstrap();
//bossGroup輔助客戶端的tcp連接請(qǐng)求,workGroup負(fù)責(zé)與客戶端之前的讀寫操作
bootstrap.group(bossGroup,workGroup);
//設(shè)置NIO類型的channel
bootstrap.channel(NioServerSocketChannel.class);
//設(shè)置監(jiān)聽端口
bootstrap.localAddress(newInetSocketAddress(port));
//設(shè)置管道
bootstrap.childHandler(nettyInitializer);

//配置完成,開始綁定server,通過(guò)調(diào)用sync同步方法阻塞直到綁定成功
ChannelFuturechannelFuture=null;
try{
channelFuture=bootstrap.bind().sync();
log.info("Serverstartedandlistenon:{}",channelFuture.channel().localAddress());
//對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
channelFuture.channel().closeFuture().sync();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}

/**
*釋放資源
*/
@PreDestroy
publicvoiddestroy()throwsInterruptedException{
if(bossGroup!=null){
bossGroup.shutdownGracefully().sync();
}
if(workGroup!=null){
workGroup.shutdownGracefully().sync();
}
}
}

基于 Spring Boot + MyBatis Plus + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能

項(xiàng)目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro

視頻教程:https://doc.iocoder.cn/video/

Netty配置

管理全局Channel以及用戶對(duì)應(yīng)的channel(推送消息)

publicclassNettyConfig{

/**
*定義全局單利channel組管理所有channel
*/
privatestaticvolatileChannelGroupchannelGroup=null;

/**
*存放請(qǐng)求ID與channel的對(duì)應(yīng)關(guān)系
*/
privatestaticvolatileConcurrentHashMapchannelMap=null;

/**
*定義兩把鎖
*/
privatestaticfinalObjectlock1=newObject();
privatestaticfinalObjectlock2=newObject();


publicstaticChannelGroupgetChannelGroup(){
if(null==channelGroup){
synchronized(lock1){
if(null==channelGroup){
channelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
}
}
returnchannelGroup;
}

publicstaticConcurrentHashMapgetChannelMap(){
if(null==channelMap){
synchronized(lock2){
if(null==channelMap){
channelMap=newConcurrentHashMap<>();
}
}
}
returnchannelMap;
}

publicstaticChannelgetChannel(StringuserId){
if(null==channelMap){
returngetChannelMap().get(userId);
}
returnchannelMap.get(userId);
}
}

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實(shí)現(xiàn)的后臺(tái)管理系統(tǒng) + 用戶小程序,支持 RBAC 動(dòng)態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能

項(xiàng)目地址:https://gitee.com/zhijiantianya/yudao-cloud

視頻教程:https://doc.iocoder.cn/video/

管道配置

@Component
publicclassProjectInitializerextendsChannelInitializer{

/**
*webSocket協(xié)議名
*/
staticfinalStringWEBSOCKET_PROTOCOL="WebSocket";

/**
*webSocket路徑
*/
@Value("${webSocket.netty.path:/webSocket}")
StringwebSocketPath;
@Autowired
WebSocketHandlerwebSocketHandler;

@Override
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
//設(shè)置管道
ChannelPipelinepipeline=socketChannel.pipeline();
//流水線管理通道中的處理程序(Handler),用來(lái)處理業(yè)務(wù)
//webSocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http編解碼器
pipeline.addLast(newHttpServerCodec());
pipeline.addLast(newObjectEncoder());
//以塊的方式來(lái)寫的處理器
pipeline.addLast(newChunkedWriteHandler());
pipeline.addLast(newHttpObjectAggregator(8192));
pipeline.addLast(newWebSocketServerProtocolHandler(webSocketPath,WEBSOCKET_PROTOCOL,true,65536*10));
//自定義的handler,處理業(yè)務(wù)邏輯
pipeline.addLast(webSocketHandler);
}
}

自定義handler

@Component
@ChannelHandler.Sharable
publicclassWebSocketHandlerextendsSimpleChannelInboundHandler{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class);

/**
*一旦連接,第一個(gè)被執(zhí)行
*/
@Override
publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{
log.info("有新的客戶端鏈接:[{}]",ctx.channel().id().asLongText());
//添加到channelGroup通道組
NettyConfig.getChannelGroup().add(ctx.channel());
}

/**
*讀取數(shù)據(jù)
*/
@Override
protectedvoidchannelRead0(ChannelHandlerContextctx,TextWebSocketFramemsg)throwsException{
log.info("服務(wù)器收到消息:{}",msg.text());

//獲取用戶ID,關(guān)聯(lián)channel
JSONObjectjsonObject=JSONUtil.parseObj(msg.text());
Stringuid=jsonObject.getStr("uid");
NettyConfig.getChannelMap().put(uid,ctx.channel());

//將用戶ID作為自定義屬性加入到channel中,方便隨時(shí)channel中獲取用戶ID
AttributeKeykey=AttributeKey.valueOf("userId");
ctx.channel().attr(key).setIfAbsent(uid);

//回復(fù)消息
ctx.channel().writeAndFlush(newTextWebSocketFrame("服務(wù)器收到消息啦"));
}

@Override
publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{
log.info("用戶下線了:{}",ctx.channel().id().asLongText());
//刪除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
}

@Override
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
log.info("異常:{}",cause.getMessage());
//刪除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
ctx.close();
}

/**
*刪除用戶與channel的對(duì)應(yīng)關(guān)系
*/
privatevoidremoveUserId(ChannelHandlerContextctx){
AttributeKeykey=AttributeKey.valueOf("userId");
StringuserId=ctx.channel().attr(key).get();
NettyConfig.getChannelMap().remove(userId);
}
}

推送消息接口及實(shí)現(xiàn)類

publicinterfacePushMsgService{

/**
*推送給指定用戶
*/
voidpushMsgToOne(StringuserId,Stringmsg);

/**
*推送給所有用戶
*/
voidpushMsgToAll(Stringmsg);

}
@Service
publicclassPushMsgServiceImplimplementsPushMsgService{

@Override
publicvoidpushMsgToOne(StringuserId,Stringmsg){
Channelchannel=NettyConfig.getChannel(userId);
if(Objects.isNull(channel)){
thrownewRuntimeException("未連接socket服務(wù)器");
}

channel.writeAndFlush(newTextWebSocketFrame(msg));
}

@Override
publicvoidpushMsgToAll(Stringmsg){
NettyConfig.getChannelGroup().writeAndFlush(newTextWebSocketFrame(msg));
}
}

測(cè)試

7c253b2c-573d-11ed-a3b6-dac502259ad0.png

鏈接服務(wù)器

7c607976-573d-11ed-a3b6-dac502259ad0.png7c8095ee-573d-11ed-a3b6-dac502259ad0.png

發(fā)送消息

7cb7271c-573d-11ed-a3b6-dac502259ad0.png7dd7ed98-573d-11ed-a3b6-dac502259ad0.png

調(diào)用接口,往前端推送消息!

7e0179ba-573d-11ed-a3b6-dac502259ad0.png7e1e56de-573d-11ed-a3b6-dac502259ad0.png

OK!

一個(gè)簡(jiǎn)單的 netty 小栗子就完成了。

審核編輯:彭靜
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 接口
    +關(guān)注

    關(guān)注

    33

    文章

    8575

    瀏覽量

    151014
  • 封裝
    +關(guān)注

    關(guān)注

    126

    文章

    7873

    瀏覽量

    142893
  • 服務(wù)器
    +關(guān)注

    關(guān)注

    12

    文章

    9123

    瀏覽量

    85322

原文標(biāo)題:Spring Boot+Netty+Websocket實(shí)現(xiàn)后臺(tái)向前端推送信息

文章出處:【微信號(hào):芋道源碼,微信公眾號(hào):芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    基于多路復(fù)用模型的Netty框架

    Netty version: 4.1.55.Final 傳統(tǒng)的IO模型的web容器,比如老版本的Tomcat,為了增加系統(tǒng)的吞吐量,需要不斷增加系統(tǒng)核心線程數(shù)量,或者通過(guò)水平擴(kuò)展服務(wù)器數(shù)量,來(lái)增加
    的頭像 發(fā)表于 09-30 11:30 ?822次閱讀

    基于阿里云移動(dòng)推送的移動(dòng)應(yīng)用推送模式最佳實(shí)踐

    推送系統(tǒng)自動(dòng)分配,通過(guò)接口獲取2.2 賬號(hào)與deviceID一一對(duì)應(yīng),對(duì)于同一設(shè)備切換賬號(hào)的場(chǎng)景,通過(guò)重新綁定賬號(hào)實(shí)現(xiàn)2.3 別名一個(gè)deviceID可以對(duì)應(yīng)多個(gè)別名別名是用戶粒度的概念,建議用于單推
    發(fā)表于 03-02 11:48

    如何實(shí)現(xiàn)服務(wù)器自動(dòng)推送消息?

    有個(gè)想法,想和大家探討一下如何實(shí)現(xiàn)。功能:自己在本地寫個(gè)日志,第二天自動(dòng)推送到手機(jī)上想法:想法不成熟,因?yàn)橹R(shí)面太少了,目前想的是自己在本地電腦寫個(gè)日志,上傳給服務(wù)器,服務(wù)器第二天定時(shí)推送到指定郵箱上。想知道如何
    發(fā)表于 03-16 11:34

    怎么去理解netty

    導(dǎo)讀原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明出處。本文源碼地址:netty-source-code-analysis兩篇開胃小菜過(guò)后,我已經(jīng)有一些粉絲了,還有一些粉絲加了我的好友,有粉絲通過(guò)微信對(duì)我的文章表示了肯定
    發(fā)表于 08-31 06:42

    怎樣使用springboot整合netty來(lái)開發(fā)一套高性能的通信系統(tǒng)呢

    怎樣使用springboot整合netty來(lái)開發(fā)一套高性能的通信系統(tǒng)呢?為什么要用這兩個(gè)框架來(lái)實(shí)現(xiàn)通信服務(wù)呢?如何去實(shí)現(xiàn)呢?
    發(fā)表于 02-22 06:09

    網(wǎng)絡(luò)編程框架netty io介紹

    深入理解網(wǎng)絡(luò)編程框架netty io歡迎大家下載學(xué)習(xí)
    發(fā)表于 09-28 07:36

    如何采用mqtt協(xié)議實(shí)現(xiàn)物聯(lián)網(wǎng)模塊消息推送

    如何采用mqtt協(xié)議實(shí)現(xiàn)物聯(lián)網(wǎng)模塊消息推送
    發(fā)表于 11-03 06:55

    單片機(jī)MQTT如何實(shí)現(xiàn)推送的簡(jiǎn)單使用

    本文檔的主要內(nèi)容詳細(xì)介紹的是單片機(jī)MQTT如何實(shí)現(xiàn)推送的簡(jiǎn)單使用。
    發(fā)表于 07-19 17:37 ?9次下載
    單片機(jī)MQTT如何<b class='flag-5'>實(shí)現(xiàn)</b><b class='flag-5'>推送</b>的簡(jiǎn)單使用

    Springboot整合netty框架實(shí)現(xiàn)終端、通訊板子(單片機(jī))TCP/UDP通信案例

    如何springboot和netty案例的源代碼一個(gè)springboot整合netty框架的開發(fā)小案例,實(shí)現(xiàn)服務(wù)端與單片機(jī)終端實(shí)時(shí)通信的通訊架構(gòu)案例。物聯(lián)網(wǎng)通信給板子下發(fā)指令案例附帶源碼及整合流程步驟
    發(fā)表于 12-29 18:55 ?20次下載
    Springboot整合<b class='flag-5'>netty</b>框架<b class='flag-5'>實(shí)現(xiàn)</b>終端、通訊板子(單片機(jī))TCP/UDP通信案例

    詳解Netty高性能異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)框架

    大家好,今天我們來(lái)聊聊Netty的那些事兒,我們都知道Netty是一個(gè)高性能異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)框架。
    的頭像 發(fā)表于 03-16 10:57 ?1841次閱讀

    Netty如何實(shí)現(xiàn)消息推送

    Netty 是一個(gè)利用 Java 的高級(jí)網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個(gè)易于使用的 API 的客戶端/服務(wù)器框架。
    的頭像 發(fā)表于 08-30 09:42 ?1465次閱讀

    Netty如何做到單機(jī)百萬(wàn)并發(fā)?

    說(shuō)起 Netty 的異步模型,我相信大多數(shù)人,只要是寫過(guò)服務(wù)端的話,都是耳熟能詳?shù)?,bossGroup 和 workerGroup 被 ServerBootstrap 所驅(qū)動(dòng),用起來(lái)簡(jiǎn)直是如虎添翼。
    的頭像 發(fā)表于 09-07 10:51 ?1066次閱讀

    一步步解決長(zhǎng)連接Netty服務(wù)內(nèi)存泄漏

    線上應(yīng)用長(zhǎng)連接 Netty 服務(wù)出現(xiàn)內(nèi)存泄漏了!真讓人頭大
    的頭像 發(fā)表于 04-27 14:06 ?1123次閱讀
    一步步解決長(zhǎng)連接<b class='flag-5'>Netty</b>服務(wù)內(nèi)存泄漏

    聊聊Netty那些事兒之從內(nèi)核角度看IO模型

    從今天開始我們來(lái)聊聊Netty的那些事兒,我們都知道Netty是一個(gè)高性能異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)框架。
    的頭像 發(fā)表于 05-23 10:27 ?1402次閱讀
    聊聊<b class='flag-5'>Netty</b>那些事兒之從內(nèi)核角度看IO模型

    jdk17下netty導(dǎo)致堆內(nèi)存瘋漲原因排查

    天網(wǎng)風(fēng)控靈璣系統(tǒng)是基于內(nèi)存計(jì)算實(shí)現(xiàn)的高吞吐低延遲在線計(jì)算服務(wù),提供滑動(dòng)或滾動(dòng)窗口內(nèi)的 count、distinctCout、max、min、avg、sum、std 及區(qū)間分布類的在線統(tǒng)計(jì)計(jì)算服務(wù)
    的頭像 發(fā)表于 09-12 11:22 ?796次閱讀
    jdk17下<b class='flag-5'>netty</b>導(dǎo)致堆內(nèi)存瘋漲原因排查
    RM新时代网站-首页