[全端筆記]使用Javascript來實現Kafka Producer 的batch

在Javascript中抽象化實現批量請求

Posted by 李定宇 on Saturday, July 6, 2024

前言

在上一篇文章中,使用Python來簡化並實現batch request。這次,要使用Javascript來實現。

入參

查看 python kafka SDK,可以看到關於batch的主要是兩個入參:batch_sizelinger_ms,這兩個參數為指定batch要在什麼條件下執行:

  • 當batch size大於某個數的時候
  • 當publish到一段時間後

先初始化一個 Javascript BatchProcessor 物件:

class BatchProcessor{    
    constructor(batchSize=1, lingerMs=0){
        this.batchSize = batchSize
        this.lingerMs = lingerMs       
    }

初始化過程

再來,需要一個queue來當作 task/message 的buffer;同時,由於NodeJS的runtime是一個single-thread的事件循環,所以如果沒有work_thread 的參與,那麼就不太需要考慮 thread safty的問題,因此直接使用簡單的 array來存儲 queue buffer。

class BatchProcessor{    
    constructor(batchSize=1, lingerMs=0){
        this.batchSize = batchSize
        this.lingerMs = lingerMs
        this.queue = []       
    }

然後需要一個flag來控制 BatchProcessor 的狀態:是要發布還是不要發布

class BatchProcessor{    
    constructor(batchSize=1, lingerMs=0){
        ...
        this.batchEventFlag = false    
    }
    

然後就是啟動監聽 執行時間有沒有超過 this.lingerMs、以及buffer queue有沒有超過 this.batchSize

監聽執行時間

雖然在Python的部分,是使用while循環來監聽;但在Javascript中,由於是single-thread的運行機制,所以要改成 setInterval的方式來執行監聽—>在一段 this.lingerMs時間後,就把 this.batchEventFlag設為true

_batchTimer(){
    setInterval(()=>{
        this.batchEventFlag = true
    }, this.lingerMs)
}

監聽batch size

由於在 execute的時候會對queue 產生變化,所以也是直接在 execute 函數中檢查 queue.length:

execute(task){
    this.queue.push(task)
    if(this.queue.length >= this.batchSize){
        this.batchEventFlag = true
    }
}

執行batch

主要就是在 setInterval中在一個極短時間內輪詢監聽this.batchEventFlag,然後把 queue buffer裡面的task/message 給灌到 batch 中,進而執行this._processBatch(batch)

_batchProcessor(){
    const that = this
    setInterval(async()=>{
        if(that.batchEventFlag){
        
            const batch = []
            for(let i=0; i< that.batchSize; i++){
                if(that.queue.length){
                    batch.push(that.queue.shift())
                }
            }
            if(batch.length){
                await that._processBatch(batch)
            }
            that.batchEventFlag = false
        }
    }, 10)
}

而這個 this. _processBatch(batch),可以看業務需求,要如何處理task/message,來決定其中的邏輯、或者要不要是異步函數。

把監聽函數執行在 BatchProcessor 初始化時

把兩個監聽函數this._batchProcessorthis._batchTimer直接在BatchProcessor初始化的時候執行

全部程式碼



class BatchProcessor{

    constructor(batchSize=1, lingerMs=0){
        this.queue = []
        this.batchSize = batchSize
        this.lingerMs = lingerMs

        this.batchEventFlag = false
        
        this._batchProcessor();
        this._batchTimer();
       
    }

    execute(asyncTask){
        this.queue.push(asyncTask)
        if(this.queue.length >= this.batchSize){
            this.batchEventFlag = true
        }
    }

    _batchTimer(){
        setInterval(()=>{
            this.batchEventFlag = true
        }, this.lingerMs)
    }

    _batchProcessor(){
        const that = this
        setInterval(async()=>{
            if(that.batchEventFlag){
            
                const batch = []
                for(let i=0; i< that.batchSize; i++){
                    if(that.queue.length){
                        batch.push(that.queue.shift())
                    }
                }
                if(batch.length){
                    await that._processBatch(batch)
                }
                that.batchEventFlag = false
            }
        }, 10)
    }

    async _processBatch(batch){
        const data = batch
        console.log(`data: ${JSON.stringify(data)}`)
    }


}

async function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

async function main() {
    try{
        const processor = new BatchProcessor(2, 1500); // batch_size=2, linger_ms=5000
        for (let i = 0; i < 100; i++) {
            await processor.execute(`task ${i}`);
            await sleep(500)
        }
    }catch(err){
        console.error(err)
    }
    
}

main().catch(console.error);

ChangeLog

  • 20240706–初稿