以下文章來源于君哥聊技術(shù),作者朱晉君
Kafka 是一款性能非常優(yōu)秀的消息隊列,每秒處理的消息體量可以達(dá)到千萬級別。
今天來聊一聊 Kafka 高性能背后的技術(shù)原理,也是面試常問的一個知識考點。
1 批量發(fā)送
Kafka 收發(fā)消息都是批量進(jìn)行處理的。我們看一下 Kafka 生產(chǎn)者發(fā)送消息的代碼:
privateFuturedoSend(ProducerRecord record,Callbackcallback){ TopicPartitiontp=null; try{ //省略前面代碼 CallbackinterceptCallback=newInterceptorCallback<>(callback,this.interceptors,tp); //把消息追加到之前緩存的這一批消息上 RecordAccumulator.RecordAppendResultresult=accumulator.append(tp,timestamp,serializedKey, serializedValue,headers,interceptCallback,remainingWaitMs); //積累到設(shè)置的緩存大小,則發(fā)送出去 if(result.batchIsFull||result.newBatchCreated){ log.trace("Wakingupthesendersincetopic{}partition{}iseitherfullorgettinganewbatch",record.topic(),partition); this.sender.wakeup(); } returnresult.future; //handlingexceptionsandrecordtheerrors; //forAPIexceptionsreturntheminthefuture, //forotherexceptionsthrowdirectly }catch/**省略catch代碼*/ }
從代碼中可以看到,生產(chǎn)者調(diào)用 doSend 方法后,并不會直接把消息發(fā)送出去,而是把消息緩存起來,緩存消息量達(dá)到配置的批量大小后,才會發(fā)送出去。
注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。
Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進(jìn)行落盤,同時也會把批量消息直接同步給其他副本。
消費者拉取消息,也不會按照單條進(jìn)行拉取,而是按照批量進(jìn)行拉取,拉取到一批消息后,再解析成單條消息進(jìn)行消費。
使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。
2 消息壓縮
如果消息體比較大,Kafka 消息吞吐量要達(dá)到千萬級別,網(wǎng)卡支持的網(wǎng)絡(luò)傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時,如果增加參數(shù) compression.type,就可以開啟消息壓縮:
publicstaticvoidmain(String[]args){ Propertiesprops=newProperties(); props.put("bootstrap.servers","localhost:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //開啟消息壓縮 props.put("compression.type","gzip"); Producerproducer=newKafkaProducer<>(props); ProducerRecord record=newProducerRecord<>("my_topic","key1","value1"); producer.send(record,newCallback(){ @Override publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){ if(exception!=null){ logger.error("sendingmessage error:", e); }else{ logger.info("sendingmessage successful, Offset:", metadata.offset()); } } }); producer.close(); }
如果 compression.type 的值設(shè)置為 none,則不開啟壓縮。那消息是在什么時候進(jìn)行壓縮呢?前面提到過,生產(chǎn)者緩存一批消息后才會發(fā)送,在發(fā)送這批消息之前就會進(jìn)行壓縮,代碼如下:
publicRecordAppendResultappend(TopicPartitiontp, longtimestamp, byte[]key, byte[]value, Header[]headers, Callbackcallback, longmaxTimeToBlock)throwsInterruptedException{ //... try{ //... buffer=free.allocate(size,maxTimeToBlock); synchronized(dq){ //... RecordAppendResultappendResult=tryAppend(timestamp,key,value,headers,callback,dq); if(appendResult!=null){ //Somebodyelsefoundusabatch,returntheonewewaitedfor!Hopefullythisdoesn'thappenoften... returnappendResult; } //這批消息緩存已滿,這里進(jìn)行壓縮 MemoryRecordsBuilderrecordsBuilder=recordsBuilder(buffer,maxUsableMagic); ProducerBatchbatch=newProducerBatch(tp,recordsBuilder,time.milliseconds()); FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,headers,callback,time.milliseconds())); dq.addLast(batch); incomplete.add(batch); //Don'tdeallocatethisbufferinthefinallyblockasit'sbeingusedintherecordbatch buffer=null; returnnewRecordAppendResult(future,dq.size()>1||batch.isFull(),true); } }finally{ if(buffer!=null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } }
上面的 recordsBuilder 方法最終調(diào)用了下面 MemoryRecordsBuilder 的構(gòu)造方法。
publicMemoryRecordsBuilder(ByteBufferOutputStreambufferStream, bytemagic, CompressionTypecompressionType, TimestampTypetimestampType, longbaseOffset, longlogAppendTime, longproducerId, shortproducerEpoch, intbaseSequence, booleanisTransactional, booleanisControlBatch, intpartitionLeaderEpoch, intwriteLimit){ //省略其他代碼 this.appendStream=newDataOutputStream(compressionType.wrapForOutput(this.bufferStream,magic)); }
上面的 wrapForOutput 方法會根據(jù)配置的壓縮算法進(jìn)行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。
在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進(jìn)行解壓,然后進(jìn)行消息處理。
因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產(chǎn)者和消費者的 CPU 資源情況。
有了消息批量收集和壓縮,kafka 生產(chǎn)者發(fā)送消息的過程如下圖:
3 磁盤順序讀寫
順序讀寫省去了尋址的時間,只要一次尋址,就可以連續(xù)讀寫。
在固態(tài)硬盤上,順序讀寫的性能是隨機讀寫的好幾倍。而在機械硬盤上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬盤的順序讀寫性能是隨機讀寫的幾十倍。
Kafka 的 Broker 在寫消息數(shù)據(jù)時,首先為每個 Partition 創(chuàng)建一個文件,然后把數(shù)據(jù)順序地追加到該文件對應(yīng)的磁盤空間中,如果這個文件寫滿了,就再創(chuàng)建一個新文件繼續(xù)追加寫。這樣大大減少了尋址時間,提高了讀寫性能。
4 PageCache
在 Linux 系統(tǒng)中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內(nèi)存中建立的緩存。當(dāng)應(yīng)用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。
應(yīng)用程序?qū)懳募r,都先會把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:
而應(yīng)用程序在讀取文件數(shù)據(jù)時,首先會判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。
Kafka 充分利用了 PageCache 的優(yōu)勢,當(dāng)生產(chǎn)者生產(chǎn)消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。
5 零拷貝
Kafka Broker 將消息發(fā)送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復(fù)制到應(yīng)用程序的內(nèi)存空間,然后從應(yīng)用程序的內(nèi)存空間復(fù)制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:
Kafka 采用了零拷貝技術(shù)把數(shù)據(jù)直接從 PageCache 復(fù)制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復(fù)制到用戶態(tài)的內(nèi)存空間,同時 DMA 控制器直接完成數(shù)據(jù)復(fù)制,不需要 CPU 參與。如下圖:
Java 零拷貝技術(shù)采用 FileChannel.transferTo() 方法,底層調(diào)用了 sendfile 方法。
6 mmap
Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內(nèi)存映射,將索引文件映射到進(jìn)程的內(nèi)存空間,這樣讀取索引文件就不需要從磁盤進(jìn)行讀取。如下圖:
7 總結(jié)
本文介紹了 Kafka 實現(xiàn)高性能用到的關(guān)鍵技術(shù),這些技術(shù)可以為我們學(xué)習(xí)和工作提供參考。
-
代碼
+關(guān)注
關(guān)注
30文章
4779瀏覽量
68521 -
消息隊列
+關(guān)注
關(guān)注
0文章
33瀏覽量
2972 -
kafka
+關(guān)注
關(guān)注
0文章
51瀏覽量
5214
原文標(biāo)題:面試官:你說說 Kafka 為什么是高性能的?
文章出處:【微信號:小林coding,微信公眾號:小林coding】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論