RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

一種異步延遲隊(duì)列的實(shí)現(xiàn)方式調(diào)研

OSC開源社區(qū) ? 來源:京東云開發(fā)者 ? 2023-03-31 10:10 ? 次閱讀

一、應(yīng)用場景

目前系統(tǒng)中有很多需要用到延時(shí)處理的功能:支付超時(shí)取消、排隊(duì)超時(shí)、短信、微信等提醒延遲發(fā)送、token刷新、會員卡過期等等。通過延時(shí)處理,極大地節(jié)省系統(tǒng)的資源,不必輪詢數(shù)據(jù)庫處理任務(wù)。

目前大部分功能通過定時(shí)任務(wù)完成,定時(shí)任務(wù)還分使用quartz及xxljob兩種類型輪詢時(shí)間短,每秒執(zhí)行一次,對數(shù)據(jù)庫造成一定的壓力,并且會有1秒的誤差。輪詢時(shí)間久,如30分鐘一次,03:01插入一條數(shù)據(jù),正常3:31執(zhí)行過期,但是3:30執(zhí)行輪詢時(shí),掃描330的數(shù)據(jù),是掃描不到3:31的數(shù)據(jù)的,需要4:00的時(shí)候才能掃描到,相當(dāng)于多延遲了29分鐘!

二、演示處理方式調(diào)研

1.DelayQueue

實(shí)現(xiàn)方式:

jvm提供的延遲阻塞隊(duì)列,通過優(yōu)先級隊(duì)列對不同延遲時(shí)間任務(wù)進(jìn)行排序,通過condition進(jìn)行阻塞、睡眠dealy時(shí)間 獲取延遲任務(wù)。

當(dāng)有新任務(wù)加入時(shí),會判斷新任務(wù)是否是第一個(gè)待執(zhí)行的任務(wù),若是,會解除隊(duì)列睡眠,防止新加入的元素時(shí)需要執(zhí)行的元素而不能正常被執(zhí)行線程獲取到。

存在的問題:

單機(jī)運(yùn)行,系統(tǒng)宕機(jī)后,無法進(jìn)行有效的重試

沒有執(zhí)行記錄和備份

沒有重試機(jī)制

系統(tǒng)重啟時(shí),會將任務(wù)清空!

不能分片消費(fèi)

優(yōu)勢: 實(shí)現(xiàn)簡單,無任務(wù)時(shí)阻塞,節(jié)省資源,執(zhí)行時(shí)間準(zhǔn)確

2.延遲隊(duì)列mq

實(shí)現(xiàn)方式:依賴mq,通過設(shè)置延遲消費(fèi)時(shí)間,達(dá)到延遲消費(fèi)功能。像rabbitMq、jmq都可以設(shè)置延遲消費(fèi)時(shí)間。RabbitMq通過將消息設(shè)置過期時(shí)間,放入私信隊(duì)列進(jìn)行消費(fèi)實(shí)現(xiàn)。

存在的問題:時(shí)間設(shè)置不靈活,每個(gè)queue是固定的到期時(shí)間,每次新創(chuàng)建延時(shí)隊(duì)列,需要?jiǎng)?chuàng)建新的消息隊(duì)列

優(yōu)點(diǎn):依靠jmq,可以有效的監(jiān)控、消費(fèi)記錄、重試,具備多機(jī)同時(shí)消費(fèi)能力,不懼怕宕機(jī)

3.定時(shí)任務(wù)

通過定時(shí)任務(wù)輪詢符合條件的數(shù)據(jù)

缺點(diǎn):

必須要讀業(yè)務(wù)數(shù)據(jù)庫,對數(shù)據(jù)庫造成一定的壓力,

存在延時(shí)

一次掃描數(shù)據(jù)量過大時(shí),占用過多的系統(tǒng)資源。

無法分片消費(fèi)

優(yōu)點(diǎn):

消費(fèi)失敗后,下次還能繼續(xù)消費(fèi),具備重試能力,

消費(fèi)能力穩(wěn)定

4.redis

任務(wù)存儲在redis中,使用redis的 zset隊(duì)列根據(jù)score進(jìn)行排序,程序通過線程不斷獲取隊(duì)列數(shù)據(jù)消費(fèi),實(shí)現(xiàn)延時(shí)隊(duì)列

優(yōu)點(diǎn):

查詢r(jià)edis相比較數(shù)據(jù)庫快,set隊(duì)列長度過大,會根據(jù)跳表結(jié)構(gòu)進(jìn)行查詢,效率高

redis可根據(jù)時(shí)間戳進(jìn)行排序,只需要查詢當(dāng)前時(shí)間戳內(nèi)的分?jǐn)?shù)的任務(wù)即可

無懼機(jī)器重啟

分布式消費(fèi)

缺點(diǎn):

受限于redis性能,并發(fā)10W

多個(gè)命令無法保證原子性,使用lua腳本會要求所有數(shù)據(jù)都在一個(gè)redis分片上。

5. 時(shí)間輪

通過時(shí)間輪實(shí)現(xiàn)的延遲任務(wù)執(zhí)行,也是基于jvm單機(jī)運(yùn)行,如kafka、netty都有實(shí)現(xiàn)時(shí)間輪,redisson的看門狗也是通過netty的時(shí)間輪實(shí)現(xiàn)的。

缺點(diǎn):不適合分布式服務(wù)的使用,宕機(jī)后,會丟失任務(wù)。

d0c5de70-cf06-11ed-bfe3-dac502259ad0.jpg

三、實(shí)現(xiàn)目標(biāo)

兼容目前在使用的異步事件組件,并提供更可靠,可重試、有記錄、可監(jiān)控報(bào)警、高性能的延遲組件。

消息傳輸可靠性:消息進(jìn)入到延遲隊(duì)列后,保證至少被消費(fèi)一次。

Client支持豐富:支持多重語言。

高可用性:支持多實(shí)例部署。掛掉一個(gè)實(shí)例后,還有后備實(shí)例繼續(xù)提供服務(wù)。

實(shí)時(shí)性:允許存在一定的時(shí)間誤差。

支持消息刪除:業(yè)務(wù)使用方,可以隨時(shí)刪除指定消息。

支持消費(fèi)查詢

支持手動(dòng)重試

對當(dāng)前異步事件的執(zhí)行增加監(jiān)控

四、架構(gòu)設(shè)計(jì)

d0db2f8c-cf06-11ed-bfe3-dac502259ad0.png

五、延遲組件實(shí)現(xiàn)方式

1.實(shí)現(xiàn)原理

目前選擇使用jimdb通過zset實(shí)現(xiàn)延時(shí)功能,將任務(wù)id和對應(yīng)的執(zhí)行時(shí)間作為score存在在zset隊(duì)列中,默認(rèn)會按照score排序,每次取0-當(dāng)前時(shí)間內(nèi)的score的任務(wù)id,

發(fā)送延遲任務(wù)時(shí),會根據(jù)時(shí)間戳+機(jī)器ip+queueName+sequence 生成唯一的id,構(gòu)造消息體,加密后放入zset隊(duì)列中。

通過搬運(yùn)線程,將達(dá)到執(zhí)行時(shí)間的任務(wù)移動(dòng)到發(fā)布隊(duì)列中,等待消費(fèi)者獲取。

監(jiān)控方通過集成ump

消費(fèi)記錄通過redis備份+數(shù)據(jù)庫持久化完成。

通過緩存實(shí)現(xiàn)的方式,只是實(shí)現(xiàn)的一種,可以通過參數(shù)控制使用哪一種實(shí)現(xiàn)方式,并可通過spi自由擴(kuò)展。

2.消息結(jié)構(gòu)

每個(gè)Job必須包含以下幾個(gè)屬性:

Topic:Job類型,即QueueName

Id:Job的唯一標(biāo)識。用來檢索和刪除指定的Job信息。

Delay:Job需要延遲的時(shí)間。單位:秒。(服務(wù)端會將其轉(zhuǎn)換為絕對時(shí)間)

Body:Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲。

traceId:發(fā)送線程的traceId,待后續(xù)pfinder支持設(shè)置traceId后,可與發(fā)送線程公用同一個(gè)traceiD,便于日志追蹤

具體結(jié)構(gòu)如下圖表示:

d113aa24-cf06-11ed-bfe3-dac502259ad0.png

TTR的設(shè)計(jì)目的是為了保證消息傳輸?shù)目煽啃浴?/p>

3.數(shù)據(jù)流轉(zhuǎn)及流程圖

d12e09be-cf06-11ed-bfe3-dac502259ad0.png

基于redis-disruptor方式進(jìn)行發(fā)布、消費(fèi),可以作為消息來進(jìn)行使用,消費(fèi)者采用原有異步事件的disruptor無鎖隊(duì)列消費(fèi),不同應(yīng)用、不同queue之間無鎖

1)支持應(yīng)用只發(fā)布,不消費(fèi),達(dá)到消息隊(duì)列的功能。

2)支持分桶,針對大key問題,若事件多,可以設(shè)置延遲隊(duì)列和任務(wù)隊(duì)列桶的數(shù)量,減小因大key造成的redis阻塞問題。

3)通過ducc配置,進(jìn)行性能的擴(kuò)展,目前只支持開啟消費(fèi)和關(guān)閉消費(fèi)。

4)支持設(shè)置超時(shí)時(shí)間配置,防止消費(fèi)線程執(zhí)行過久

?瓶頸:消費(fèi)速度慢,生產(chǎn)速度過快,會導(dǎo)致ringbuffer隊(duì)列占滿,當(dāng)前應(yīng)用既是生產(chǎn)者也是消費(fèi)者時(shí),生產(chǎn)者會休眠,性能取決于消費(fèi)速度,可通過水平擴(kuò)展機(jī)器,直接提升性能。監(jiān)控redis隊(duì)列的長度,若不斷增長,可考慮增加消費(fèi)者,直接提高性能。

可能出現(xiàn)的情況:因一個(gè)應(yīng)用公用一個(gè)disruptor,擁有64個(gè)消費(fèi)者線程,如果某一個(gè)事件消費(fèi)過慢,導(dǎo)致64個(gè)線程都在消費(fèi)這個(gè)事件,會導(dǎo)致其他事件無消費(fèi)線程消費(fèi),生產(chǎn)者線程也被阻塞,導(dǎo)致所有事件的消費(fèi)都被阻塞。

后期觀察是否有這個(gè)性能瓶頸,可給每一個(gè)queue一個(gè)消費(fèi)者線程池。

六、demo示例

增加配置文件

判斷是否開啟jd.event.enable:true

 com.jd.car
 senna-event
 1.0-SNAPSHOT 
配置
jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle
消費(fèi)代碼:

package com.jd.car.senna.admin.event;


import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;


/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {


@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費(fèi):{}", key);
}


@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費(fèi):{}", key);
}
}

注解形式:


package com.jd.car.senna.admin.event;


import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;


/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {


@Override
protected void onHandle(String key, String eventType) {
log.info("Handler開始消費(fèi):{}", key);
}


@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler開始消費(fèi):{}", key);
}
}

發(fā)送代碼:


package com.jd.car.senna.admin.controller;


import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;


import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;




/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {


@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;


@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("發(fā)送無延遲消息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}


@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("發(fā)送延遲5秒消息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}


@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("發(fā)送延遲到2022-04-02 00:00:00執(zhí)行的消息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 


}





審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報(bào)投訴
  • 看門狗
    +關(guān)注

    關(guān)注

    10

    文章

    560

    瀏覽量

    70789
  • SPI接口
    +關(guān)注

    關(guān)注

    0

    文章

    258

    瀏覽量

    34373
  • JVM
    JVM
    +關(guān)注

    關(guān)注

    0

    文章

    158

    瀏覽量

    12220
  • Redis
    +關(guān)注

    關(guān)注

    0

    文章

    374

    瀏覽量

    10871

原文標(biāo)題:一種異步延遲隊(duì)列的實(shí)現(xiàn)方式

文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    用FPGA芯片實(shí)現(xiàn)高速異步FIFO的一種方法

    現(xiàn)代集成電路芯片中,隨著設(shè)計(jì)規(guī)模的不斷擴(kuò)大。個(gè)系統(tǒng)中往往含有數(shù)個(gè)時(shí)鐘。多時(shí)鐘帶來的個(gè)問題就是,如何設(shè)計(jì)異步時(shí)鐘之間的接口電路。異步 FIFO(First In First Out)
    發(fā)表于 05-28 10:56 ?3832次閱讀

    延遲隊(duì)列實(shí)現(xiàn)方式

    由MQ中消費(fèi)到完整的數(shù)據(jù)則直接處理,否則進(jìn)入其他流程。 針對這種場景使用了延遲任務(wù)來實(shí)現(xiàn),以此為契機(jī)對延遲任務(wù)相關(guān)的技術(shù)做了個(gè)簡單了解... 簡介 延遲任務(wù)是
    的頭像 發(fā)表于 09-30 11:17 ?809次閱讀

    Spring Boot如何實(shí)現(xiàn)異步任務(wù)

    Spring Boot 提供了多種方式實(shí)現(xiàn)異步任務(wù),這里介紹三主要實(shí)現(xiàn)方式。 1、基于注解
    的頭像 發(fā)表于 09-30 10:32 ?1436次閱讀

    異步電路匹配延遲延遲線,如何設(shè)計(jì)能實(shí)現(xiàn)自適應(yīng)連續(xù)地調(diào)整延遲長度

    異步電路匹配延遲延遲線如何設(shè)計(jì)能實(shí)現(xiàn)自適應(yīng)連續(xù)地調(diào)整延遲長度。這是個(gè)新的想法,希望有大神能和
    發(fā)表于 03-19 17:19

    請問怎樣去設(shè)計(jì)一種異步FIFO?

    為什么要設(shè)計(jì)一種異步FIFO?異步FIFO的設(shè)計(jì)原理是什么?怎樣去設(shè)計(jì)一種異步FIFO?
    發(fā)表于 06-18 09:20

    怎樣去設(shè)計(jì)一種采用覆蓋機(jī)制的FIFO隊(duì)列模型呢

    FIFO隊(duì)列是什么?怎樣去設(shè)計(jì)一種采用覆蓋機(jī)制的FIFO隊(duì)列模型呢?
    發(fā)表于 12-08 06:07

    實(shí)現(xiàn)隊(duì)列環(huán)形緩沖的方法

    串口隊(duì)列環(huán)形緩沖區(qū)隊(duì)列串口環(huán)形緩沖的好處代碼實(shí)現(xiàn)隊(duì)列??要實(shí)現(xiàn)隊(duì)列環(huán)形緩沖,還需要
    發(fā)表于 02-21 07:11

    如何去實(shí)現(xiàn)一種隊(duì)列程序的設(shè)計(jì)呢

    隊(duì)列的原理是什么?隊(duì)列有何作用?如何去實(shí)現(xiàn)一種隊(duì)列程序的設(shè)計(jì)呢?
    發(fā)表于 02-25 07:50

    一種改進(jìn)的主動(dòng)隊(duì)列管理算法

    主動(dòng)隊(duì)列管理是實(shí)現(xiàn)網(wǎng)絡(luò)擁塞控制的重要技術(shù),但是多數(shù)主動(dòng)隊(duì)列管理算法如隨機(jī)早期檢(RED)都存在對參數(shù)依賴性強(qiáng)的問題。針對RED算法中平均隊(duì)列長度不能完全反映網(wǎng)絡(luò)擁塞狀況的
    發(fā)表于 04-13 09:08 ?14次下載

    一種高效的磁盤隊(duì)列I/O機(jī)制

    分析了傳統(tǒng)磁盤隊(duì)列的存儲管理開銷和讀寫性能,針對磁盤隊(duì)列I/O已成為影響消息服務(wù)器性能的首要瓶頸,提出了一種高效磁盤隊(duì)列I/O機(jī)制—FlashQ。FlashQ采用物理上連續(xù)的磁盤塊
    發(fā)表于 05-14 19:51 ?32次下載

    異步傳輸方式的HDLC協(xié)議的實(shí)現(xiàn)

    研究實(shí)現(xiàn)一種 HDLC (High Level Data Link Contr01)協(xié)議的改進(jìn)方法,該方法把HDLC協(xié)議傳統(tǒng)的同步傳榆方式改成了異步傳輸
    發(fā)表于 07-20 17:25 ?62次下載
    <b class='flag-5'>異步</b>傳輸<b class='flag-5'>方式</b>的HDLC協(xié)議的<b class='flag-5'>實(shí)現(xiàn)</b>

    一種基于信號延遲的光網(wǎng)絡(luò)攻擊方式

    針對光網(wǎng)絡(luò)攻擊易被發(fā)現(xiàn)的問題,提出一種基于信號延遲插入的光網(wǎng)絡(luò)攻擊方式。該方法在不改變鏈路光學(xué)性能的基礎(chǔ)上,利用信號延遲在系統(tǒng)中引起較高的串?dāng)_,極大的降低了系統(tǒng)的
    發(fā)表于 03-20 15:34 ?27次下載
    <b class='flag-5'>一種</b>基于信號<b class='flag-5'>延遲</b>的光網(wǎng)絡(luò)攻擊<b class='flag-5'>方式</b>

    TencentOS-tiny中環(huán)形隊(duì)列實(shí)現(xiàn)

    1. 什么是隊(duì)列隊(duì)列(queue)是一種只能在端插入元素、在另端刪除元素的數(shù)據(jù)結(jié)構(gòu),遵循「先入先出」(FIFO)的規(guī)則。 隊(duì)列中有兩個(gè)基
    的頭像 發(fā)表于 10-08 16:30 ?1380次閱讀

    如何用Redis實(shí)現(xiàn)延遲隊(duì)列呢?

    前段時(shí)間有個(gè)小項(xiàng)目需要使用延遲任務(wù),談到延遲任務(wù),我腦子第時(shí)間閃而過的就是使用消息隊(duì)列來做,比如RabbitMQ的死信
    的頭像 發(fā)表于 03-16 14:28 ?659次閱讀

    JavaWeb消息隊(duì)列使用指南

    在現(xiàn)代的JavaWeb應(yīng)用中,消息隊(duì)列(Message Queue)是一種常見的技術(shù),用于異步處理任務(wù)、解耦系統(tǒng)組件、提高系統(tǒng)性能和可靠性。 1. 消息隊(duì)列的基本概念 消息
    的頭像 發(fā)表于 11-25 09:27 ?139次閱讀
    RM新时代网站-首页