前言
在上一篇文章中,使用Python來簡化並實現batch request。這次,要使用Javascript來實現。
入參
查看 python kafka SDK,可以看到關於batch的主要是兩個入參:batch_size 和 linger_ms,這兩個參數為指定batch要在什麼條件下執行:
- 當batch size大於某個數的時候
 - 當publish到一段時間後
 
先初始化一個 Javascript BatchProcessor 物件:
class BatchProcessor{    
    constructor(batchSize=1, lingerMs=0){
        this.batchSize = batchSize
        this.lingerMs = lingerMs       
    }
初始化過程
再來,需要一個queue來當作 task/message 的buffer;同時,由於NodeJS的runtime是一個single-thread的事件循環,所以如果沒有work_thread 的參與,那麼就不太需要考慮 thread safty的問題,因此直接使用簡單的 array來存儲 queue buffer。
class BatchProcessor{    
    constructor(batchSize=1, lingerMs=0){
        this.batchSize = batchSize
        this.lingerMs = lingerMs
        this.queue = []       
    }
然後需要一個flag來控制 BatchProcessor 的狀態:是要發布還是不要發布
class BatchProcessor{    
    constructor(batchSize=1, lingerMs=0){
        ...
        this.batchEventFlag = false    
    }
    
然後就是啟動監聽 執行時間有沒有超過 this.lingerMs、以及buffer queue有沒有超過 this.batchSize。
監聽執行時間
雖然在Python的部分,是使用while循環來監聽;但在Javascript中,由於是single-thread的運行機制,所以要改成 setInterval的方式來執行監聽—>在一段 this.lingerMs時間後,就把 this.batchEventFlag設為true
_batchTimer(){
    setInterval(()=>{
        this.batchEventFlag = true
    }, this.lingerMs)
}
監聽batch size
由於在 execute的時候會對queue 產生變化,所以也是直接在 execute 函數中檢查 queue.length:
execute(task){
    this.queue.push(task)
    if(this.queue.length >= this.batchSize){
        this.batchEventFlag = true
    }
}
執行batch
主要就是在 setInterval中在一個極短時間內輪詢監聽this.batchEventFlag,然後把 queue buffer裡面的task/message 給灌到 batch 中,進而執行this._processBatch(batch)
_batchProcessor(){
    const that = this
    setInterval(async()=>{
        if(that.batchEventFlag){
        
            const batch = []
            for(let i=0; i< that.batchSize; i++){
                if(that.queue.length){
                    batch.push(that.queue.shift())
                }
            }
            if(batch.length){
                await that._processBatch(batch)
            }
            that.batchEventFlag = false
        }
    }, 10)
}
而這個 this. _processBatch(batch),可以看業務需求,要如何處理task/message,來決定其中的邏輯、或者要不要是異步函數。
把監聽函數執行在 BatchProcessor 初始化時
把兩個監聽函數this._batchProcessor 和 this._batchTimer直接在BatchProcessor初始化的時候執行
全部程式碼
class BatchProcessor{
    constructor(batchSize=1, lingerMs=0){
        this.queue = []
        this.batchSize = batchSize
        this.lingerMs = lingerMs
        this.batchEventFlag = false
        
        this._batchProcessor();
        this._batchTimer();
       
    }
    execute(asyncTask){
        this.queue.push(asyncTask)
        if(this.queue.length >= this.batchSize){
            this.batchEventFlag = true
        }
    }
    _batchTimer(){
        setInterval(()=>{
            this.batchEventFlag = true
        }, this.lingerMs)
    }
    _batchProcessor(){
        const that = this
        setInterval(async()=>{
            if(that.batchEventFlag){
            
                const batch = []
                for(let i=0; i< that.batchSize; i++){
                    if(that.queue.length){
                        batch.push(that.queue.shift())
                    }
                }
                if(batch.length){
                    await that._processBatch(batch)
                }
                that.batchEventFlag = false
            }
        }, 10)
    }
    async _processBatch(batch){
        const data = batch
        console.log(`data: ${JSON.stringify(data)}`)
    }
}
async function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}
async function main() {
    try{
        const processor = new BatchProcessor(2, 1500); // batch_size=2, linger_ms=5000
        for (let i = 0; i < 100; i++) {
            await processor.execute(`task ${i}`);
            await sleep(500)
        }
    }catch(err){
        console.error(err)
    }
    
}
main().catch(console.error);
ChangeLog
- 20240706–初稿