RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Kafka高性能背后的技術(shù)原理

小林coding ? 來源:君哥聊技術(shù) ? 2024-10-23 09:37 ? 次閱讀

以下文章來源于君哥聊技術(shù),作者朱晉君

Kafka 是一款性能非常優(yōu)秀的消息隊列,每秒處理的消息體量可以達(dá)到千萬級別。

今天來聊一聊 Kafka 高性能背后的技術(shù)原理,也是面試常問的一個知識考點。

1 批量發(fā)送

Kafka 收發(fā)消息都是批量進(jìn)行處理的。我們看一下 Kafka 生產(chǎn)者發(fā)送消息的代碼:

privateFuturedoSend(ProducerRecordrecord,Callbackcallback){
TopicPartitiontp=null;
try{
//省略前面代碼
CallbackinterceptCallback=newInterceptorCallback<>(callback,this.interceptors,tp);
//把消息追加到之前緩存的這一批消息上
RecordAccumulator.RecordAppendResultresult=accumulator.append(tp,timestamp,serializedKey,
serializedValue,headers,interceptCallback,remainingWaitMs);
//積累到設(shè)置的緩存大小,則發(fā)送出去
if(result.batchIsFull||result.newBatchCreated){
log.trace("Wakingupthesendersincetopic{}partition{}iseitherfullorgettinganewbatch",record.topic(),partition);
this.sender.wakeup();
}
returnresult.future;
//handlingexceptionsandrecordtheerrors;
//forAPIexceptionsreturntheminthefuture,
//forotherexceptionsthrowdirectly
}catch/**省略catch代碼*/
}

從代碼中可以看到,生產(chǎn)者調(diào)用 doSend 方法后,并不會直接把消息發(fā)送出去,而是把消息緩存起來,緩存消息量達(dá)到配置的批量大小后,才會發(fā)送出去。

注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。

Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進(jìn)行落盤,同時也會把批量消息直接同步給其他副本。

消費者拉取消息,也不會按照單條進(jìn)行拉取,而是按照批量進(jìn)行拉取,拉取到一批消息后,再解析成單條消息進(jìn)行消費。

使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。

2 消息壓縮

如果消息體比較大,Kafka 消息吞吐量要達(dá)到千萬級別,網(wǎng)卡支持的網(wǎng)絡(luò)傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時,如果增加參數(shù) compression.type,就可以開啟消息壓縮:

publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//開啟消息壓縮
props.put("compression.type","gzip");
Producerproducer=newKafkaProducer<>(props);

ProducerRecordrecord=newProducerRecord<>("my_topic","key1","value1");

producer.send(record,newCallback(){
@Override
publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
if(exception!=null){
logger.error("sendingmessage error:", e);
}else{
logger.info("sendingmessage successful, Offset:", metadata.offset());
}
}
});

producer.close();
}

如果 compression.type 的值設(shè)置為 none,則不開啟壓縮。那消息是在什么時候進(jìn)行壓縮呢?前面提到過,生產(chǎn)者緩存一批消息后才會發(fā)送,在發(fā)送這批消息之前就會進(jìn)行壓縮,代碼如下:

publicRecordAppendResultappend(TopicPartitiontp,
longtimestamp,
byte[]key,
byte[]value,
Header[]headers,
Callbackcallback,
longmaxTimeToBlock)throwsInterruptedException{
//...
try{
//...
buffer=free.allocate(size,maxTimeToBlock);
synchronized(dq){
//...
RecordAppendResultappendResult=tryAppend(timestamp,key,value,headers,callback,dq);
if(appendResult!=null){
//Somebodyelsefoundusabatch,returntheonewewaitedfor!Hopefullythisdoesn'thappenoften...
returnappendResult;
}
//這批消息緩存已滿,這里進(jìn)行壓縮
MemoryRecordsBuilderrecordsBuilder=recordsBuilder(buffer,maxUsableMagic);
ProducerBatchbatch=newProducerBatch(tp,recordsBuilder,time.milliseconds());
FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,headers,callback,time.milliseconds()));

dq.addLast(batch);
incomplete.add(batch);

//Don'tdeallocatethisbufferinthefinallyblockasit'sbeingusedintherecordbatch
buffer=null;

returnnewRecordAppendResult(future,dq.size()>1||batch.isFull(),true);
}
}finally{
if(buffer!=null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}

上面的 recordsBuilder 方法最終調(diào)用了下面 MemoryRecordsBuilder 的構(gòu)造方法。

publicMemoryRecordsBuilder(ByteBufferOutputStreambufferStream,
bytemagic,
CompressionTypecompressionType,
TimestampTypetimestampType,
longbaseOffset,
longlogAppendTime,
longproducerId,
shortproducerEpoch,
intbaseSequence,
booleanisTransactional,
booleanisControlBatch,
intpartitionLeaderEpoch,
intwriteLimit){
//省略其他代碼
this.appendStream=newDataOutputStream(compressionType.wrapForOutput(this.bufferStream,magic));
}

上面的 wrapForOutput 方法會根據(jù)配置的壓縮算法進(jìn)行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。

在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進(jìn)行解壓,然后進(jìn)行消息處理。

因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產(chǎn)者和消費者的 CPU 資源情況。

有了消息批量收集和壓縮,kafka 生產(chǎn)者發(fā)送消息的過程如下圖:

33f1d514-90dd-11ef-a511-92fbcf53809c.png

3 磁盤順序讀寫

順序讀寫省去了尋址的時間,只要一次尋址,就可以連續(xù)讀寫。

在固態(tài)硬盤上,順序讀寫的性能是隨機讀寫的好幾倍。而在機械硬盤上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬盤的順序讀寫性能是隨機讀寫的幾十倍。

Kafka 的 Broker 在寫消息數(shù)據(jù)時,首先為每個 Partition 創(chuàng)建一個文件,然后把數(shù)據(jù)順序地追加到該文件對應(yīng)的磁盤空間中,如果這個文件寫滿了,就再創(chuàng)建一個新文件繼續(xù)追加寫。這樣大大減少了尋址時間,提高了讀寫性能。

4 PageCache

Linux 系統(tǒng)中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內(nèi)存中建立的緩存。當(dāng)應(yīng)用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。

33fa2b24-90dd-11ef-a511-92fbcf53809c.png

應(yīng)用程序?qū)懳募r,都先會把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:

34021e6a-90dd-11ef-a511-92fbcf53809c.png

而應(yīng)用程序在讀取文件數(shù)據(jù)時,首先會判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。

3409b486-90dd-11ef-a511-92fbcf53809c.png

Kafka 充分利用了 PageCache 的優(yōu)勢,當(dāng)生產(chǎn)者生產(chǎn)消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。

5 零拷貝

Kafka Broker 將消息發(fā)送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復(fù)制到應(yīng)用程序的內(nèi)存空間,然后從應(yīng)用程序的內(nèi)存空間復(fù)制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:

341c0708-90dd-11ef-a511-92fbcf53809c.png

Kafka 采用了零拷貝技術(shù)把數(shù)據(jù)直接從 PageCache 復(fù)制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復(fù)制到用戶態(tài)的內(nèi)存空間,同時 DMA 控制器直接完成數(shù)據(jù)復(fù)制,不需要 CPU 參與。如下圖:

34240c00-90dd-11ef-a511-92fbcf53809c.png

Java 零拷貝技術(shù)采用 FileChannel.transferTo() 方法,底層調(diào)用了 sendfile 方法。

6 mmap

Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內(nèi)存映射,將索引文件映射到進(jìn)程的內(nèi)存空間,這樣讀取索引文件就不需要從磁盤進(jìn)行讀取。如下圖:

3440d9d4-90dd-11ef-a511-92fbcf53809c.png

7 總結(jié)

本文介紹了 Kafka 實現(xiàn)高性能用到的關(guān)鍵技術(shù),這些技術(shù)可以為我們學(xué)習(xí)和工作提供參考。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4779

    瀏覽量

    68521
  • 消息隊列
    +關(guān)注

    關(guān)注

    0

    文章

    33

    瀏覽量

    2972
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    51

    瀏覽量

    5214

原文標(biāo)題:面試官:你說說 Kafka 為什么是高性能的?

文章出處:【微信號:小林coding,微信公眾號:小林coding】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    泰克30+GHz高性能示波器的關(guān)鍵技術(shù)

    泰克公司最近宣布首款經(jīng)驗證采用 IBM 8HP 硅鍺 (SiGe) BiCMOS 特殊工藝技術(shù)設(shè)計的新型示波器平臺ASIC各項技術(shù)指標(biāo)優(yōu)于規(guī)定要求,實現(xiàn)了新型高性能示波器的設(shè)計目標(biāo),使多通道帶寬達(dá)
    發(fā)表于 07-24 07:47

    基于閃存存儲的Apache Kafka性能提升方法

    作者:Dennis Lattka我是美光科技的首席存儲解決方案工程師Dennis Lattka。這個頭銜的真正含義是,我要致力于確定如何利用閃存存儲改善工作負(fù)載應(yīng)用的性能和結(jié)果。為此,我決定對大數(shù)
    發(fā)表于 07-24 06:58

    淺析kafka

    kafka常見問題
    發(fā)表于 09-29 10:09

    基于發(fā)布與訂閱的消息系統(tǒng)Kafka

    Kafka權(quán)威指南》——初識 Kafka
    發(fā)表于 03-05 13:46

    Kafka基礎(chǔ)入門文檔

    kafka系統(tǒng)入門教程(原理、配置、集群搭建、Java應(yīng)用、Kafka-manager)
    發(fā)表于 03-12 07:22

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    Kafka文件存儲機制分析

    機制設(shè)計是衡量一個消息隊列服務(wù)技術(shù)水平和最關(guān)鍵指標(biāo)之一。 《br》 下面將從Kafka文件存儲機制和物理結(jié)
    發(fā)表于 09-28 15:40 ?0次下載

    大數(shù)據(jù)開發(fā)最火技術(shù)Kafka背后的“黑科技”

    、低延遲等方面有很突出的表現(xiàn)。這篇文章不同于其他介紹Kafka使用或?qū)崿F(xiàn)的文章,只是談?wù)?b class='flag-5'>Kafka用了什么“黑科技”使他在性能方面有這么突出的表現(xiàn)。消息順序?qū)懭氪疟P磁盤大多數(shù)都還是機械結(jié)構(gòu)(SSD不在
    的頭像 發(fā)表于 10-22 17:53 ?989次閱讀
    大數(shù)據(jù)開發(fā)最火<b class='flag-5'>技術(shù)</b><b class='flag-5'>Kafka</b><b class='flag-5'>背后</b>的“黑科技”

    Kafka的概念及Kafka的宕機

    問題要從一次Kafka的宕機開始說起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?2093次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機

    Kafka如何做到那么高的性能

    有人說:他曾在一臺配置較好的機子上對 Kafka 進(jìn)行性能壓測,壓測結(jié)果是 Kafka 單個節(jié)點的極限處理能力接近每秒 2000萬 條消息,吞吐量達(dá)到每秒 600MB。
    的頭像 發(fā)表于 09-14 17:03 ?1029次閱讀

    Kafka 的簡介

    ,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能 高吞吐率。即使在非常廉價的機器上也能做到單機支持每秒100K條消息的傳輸 支持Kafka Server間的消息分區(qū),及分布式消費,同時保證每個
    的頭像 發(fā)表于 07-03 11:10 ?604次閱讀
    <b class='flag-5'>Kafka</b> 的簡介

    物通博聯(lián)5G-kafka工業(yè)網(wǎng)關(guān)實現(xiàn)kafka協(xié)議對接到云平臺

    Kafka協(xié)議是一種基于TCP層的網(wǎng)絡(luò)協(xié)議,用于在分布式消息傳遞系統(tǒng)Apache Kafka中發(fā)送和接收消息。Kafka協(xié)議定義了客戶端和服務(wù)器之間的通信方式和數(shù)據(jù)格式,允許客戶端發(fā)送消息到K
    的頭像 發(fā)表于 07-11 10:44 ?501次閱讀

    Kafka中學(xué)習(xí)高性能系統(tǒng)如何設(shè)計

    相信各位小伙伴之前或多或少接觸過消息隊列,比較知名的包含 Rocket MQ 和 Kafka,在京東內(nèi)部使用的是自研的消息中間件 JMQ,從 JMQ2 升級到 JMQ4 的也是帶來了性能上的明顯提升,并且 JMQ4 的底層也是參考 Ka
    的頭像 發(fā)表于 07-17 11:25 ?571次閱讀
    從<b class='flag-5'>Kafka</b>中學(xué)習(xí)<b class='flag-5'>高性能</b>系統(tǒng)如何設(shè)計

    Kafka架構(gòu)技術(shù)Kafka的架構(gòu)和客戶端API設(shè)計

    Kafka 給自己的定位是事件流平臺(event stream platform)。因此在消息隊列中經(jīng)常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?2365次閱讀
    <b class='flag-5'>Kafka</b>架構(gòu)<b class='flag-5'>技術(shù)</b>:<b class='flag-5'>Kafka</b>的架構(gòu)和客戶端API設(shè)計

    golang中使用kafka的綜合指南

    kafka是一個比較流行的分布式、可拓展、高性能、可靠的流處理平臺。在處理kafka的數(shù)據(jù)時,這里有確保處理效率和可靠性的多種最佳實踐。本文將介紹這幾種實踐方式,并通過sarama實現(xiàn)他們。
    的頭像 發(fā)表于 11-30 11:18 ?592次閱讀
    RM新时代网站-首页