[Fullstack Note] Implementing Kafka Producer Batching in JavaScript

Abstracting a batch request implementation in JavaScript

Posted by Jamie on Saturday, July 6, 2024

Preface

In the last article, we used Python to implement a simplified batch request mechanism. Now it's JavaScript's turn.

Params

Looking at the Kafka Python client SDK, we can see two parameters related to batching: batch_size and linger_ms. Together they define the conditions that trigger a batch:

  • when the batch size reaches a certain number of messages
  • after a certain period of time has elapsed
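The two trigger conditions can be sketched as a simple predicate (the function and parameter names below are illustrative, not part of the SDK):

```javascript
// Sketch: the two conditions that trigger a batch flush.
// `queueLength`, `batchSize`, `elapsedMs`, `lingerMs` are illustrative names.
function shouldFlush(queueLength, batchSize, elapsedMs, lingerMs) {
    const sizeReached = queueLength >= batchSize  // condition 1: batch is full
    const timeElapsed = elapsedMs >= lingerMs     // condition 2: linger period over
    return sizeReached || timeElapsed
}

console.log(shouldFlush(2, 2, 0, 1000))    // true: size condition met
console.log(shouldFlush(1, 2, 1500, 1000)) // true: time condition met
console.log(shouldFlush(1, 2, 100, 1000))  // false: neither condition met
```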

To begin with, let's start by initializing the JavaScript BatchProcessor class:

class BatchProcessor{
    constructor(batchSize=1, lingerMs=0){
        this.batchSize = batchSize
        this.lingerMs = lingerMs
    }
}

The Initialization Process

Then we need a queue as the buffer for tasks/messages. Since Node.js runs JavaScript on a single thread (the event loop), we don't need to worry about thread safety as long as no worker threads are involved. As a result, a plain array works fine as the queue buffer.

class BatchProcessor{
    constructor(batchSize=1, lingerMs=0){
        this.batchSize = batchSize
        this.lingerMs = lingerMs
        this.queue = []
    }
}
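As a quick illustration of why a plain array suffices here: push enqueues at the tail and shift dequeues from the head, giving FIFO order:

```javascript
// A plain array as a FIFO queue: push() enqueues, shift() dequeues.
const queue = []
queue.push('task 1')
queue.push('task 2')
console.log(queue.shift())  // 'task 1': first in, first out
console.log(queue.length)   // 1
```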

We also need a flag that signals whether the BatchProcessor should process a batch.

class BatchProcessor{
    constructor(batchSize=1, lingerMs=0){
        ...
        this.batchEventFlag = false
    }
}
    

Next, we start listening for two conditions: whether this.lingerMs has elapsed, and whether the buffer queue has reached this.batchSize.

Listening for Elapsed Time

While the Python implementation used a while loop for this, JavaScript's single-threaded execution model calls for setInterval instead. The interval runs the listening process, setting this.batchEventFlag to true every this.lingerMs milliseconds.

_batchTimer(){
    setInterval(()=>{
        this.batchEventFlag = true
    }, this.lingerMs)
}
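In isolation, the timer behaves as sketched below (a standalone example with illustrative top-level variables; note that Node.js clamps a setInterval delay below 1 ms up to 1 ms, so the default lingerMs = 0 effectively means roughly every millisecond):

```javascript
// Sketch: a flag flipped by setInterval after lingerMs, then the timer cleared.
let batchEventFlag = false
const lingerMs = 50

const handle = setInterval(() => {
    batchEventFlag = true
}, lingerMs)

setTimeout(() => {
    console.log(batchEventFlag)  // true: at least one linger period has elapsed
    clearInterval(handle)        // stop the timer so the process can exit
}, lingerMs * 3)
```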

Listening to the Buffer Queue

Since the queue changes during execution, it is also necessary to check queue.length directly within the execute function.

execute(task){
    this.queue.push(task)
    if(this.queue.length >= this.batchSize){
        this.batchEventFlag = true
    }
}
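In isolation, the size trigger behaves like this (a minimal standalone sketch with illustrative top-level variables):

```javascript
// Minimal sketch of the size trigger: the flag flips once the queue reaches batchSize.
const batchSize = 2
const queue = []
let batchEventFlag = false

function execute(task) {
    queue.push(task)
    if (queue.length >= batchSize) {
        batchEventFlag = true
    }
}

execute('task 1')
console.log(batchEventFlag)  // false: only one item queued
execute('task 2')
console.log(batchEventFlag)  // true: batchSize reached
```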

Process Batch

The main approach is to use setInterval to poll this.batchEventFlag at a very short interval; when the flag is set, tasks/messages are moved from the queue buffer into a batch, which is then passed to this._processBatch(batch).

_batchProcessor(){
    // poll the flag at a short interval; the arrow function keeps `this` bound
    setInterval(async()=>{
        if(this.batchEventFlag){
            // drain up to batchSize items from the queue
            const batch = []
            for(let i=0; i<this.batchSize; i++){
                if(this.queue.length){
                    batch.push(this.queue.shift())
                }
            }
            if(batch.length){
                await this._processBatch(batch)
            }
            this.batchEventFlag = false
        }
    }, 10)
}
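The drain loop above can also be written with Array.prototype.splice, which removes up to batchSize items in a single call; both approaches are behaviorally equivalent here:

```javascript
// Equivalent drain using splice: removes up to batchSize items from the head.
const queue = ['a', 'b', 'c', 'd', 'e']
const batchSize = 2
const batch = queue.splice(0, batchSize)
console.log(batch)  // ['a', 'b']
console.log(queue)  // ['c', 'd', 'e']
```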

Furthermore, the internal logic of _processBatch(batch), and whether it is synchronous or asynchronous, can be determined by the business requirements.
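For instance, an async _processBatch might forward the batch to some downstream sink. In the sketch below, sendToSink is a hypothetical placeholder for the real business logic (an HTTP call, a Kafka producer send, etc.):

```javascript
// Hypothetical async implementation: `sendToSink` stands in for real business logic.
async function sendToSink(messages) {
    // e.g. an HTTP request or a Kafka producer call in a real system
    return messages.length
}

async function _processBatch(batch) {
    const sent = await sendToSink(batch)
    console.log(`processed ${sent} message(s)`)
}

_processBatch(['m1', 'm2'])  // prints "processed 2 message(s)"
```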

Running Both Listening Functions during BatchProcessor Initialization

Both functions, this._batchProcessor and this._batchTimer, are invoked directly in the BatchProcessor constructor.

Full Source Code



class BatchProcessor{

    constructor(batchSize=1, lingerMs=0){
        this.queue = []              // buffer for pending tasks/messages
        this.batchSize = batchSize   // flush once the queue reaches this size
        this.lingerMs = lingerMs     // flush after this many milliseconds

        this.batchEventFlag = false  // true when a batch should be processed

        // start both listeners as soon as the processor is created
        this._batchProcessor()
        this._batchTimer()
    }

    execute(task){
        this.queue.push(task)
        // size trigger: flush once the queue reaches batchSize
        if(this.queue.length >= this.batchSize){
            this.batchEventFlag = true
        }
    }

    _batchTimer(){
        // time trigger: flush after every lingerMs period
        setInterval(()=>{
            this.batchEventFlag = true
        }, this.lingerMs)
    }

    _batchProcessor(){
        // poll the flag at a short interval; the arrow function keeps `this` bound
        setInterval(async()=>{
            if(this.batchEventFlag){
                // drain up to batchSize items from the queue
                const batch = []
                for(let i=0; i<this.batchSize; i++){
                    if(this.queue.length){
                        batch.push(this.queue.shift())
                    }
                }
                if(batch.length){
                    await this._processBatch(batch)
                }
                this.batchEventFlag = false
            }
        }, 10)
    }

    async _processBatch(batch){
        console.log(`data: ${JSON.stringify(batch)}`)
    }
}

async function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

async function main() {
    try{
        const processor = new BatchProcessor(2, 1500); // batchSize=2, lingerMs=1500
        for (let i = 0; i < 100; i++) {
            processor.execute(`task ${i}`);
            await sleep(500)
        }
    }catch(err){
        console.error(err)
    }
}

main().catch(console.error);
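One caveat: both setInterval timers keep the Node.js event loop alive, so the demo above never terminates on its own. A possible remedy, sketched below with an illustrative StoppableBatchProcessor (the stop() method and handle names are additions, not part of the class above), is to keep the interval handles and clear them when done:

```javascript
// Sketch: storing the interval handles so the processor can be stopped.
// `stop()` and the `_timerHandle`/`_processorHandle` names are illustrative.
class StoppableBatchProcessor {
    constructor(lingerMs = 50) {
        // illustrative: the real class would set the flag / drain the queue here
        this._timerHandle = setInterval(() => {}, lingerMs)
        this._processorHandle = setInterval(() => {}, 10)
    }
    stop() {
        clearInterval(this._timerHandle)
        clearInterval(this._processorHandle)
        this._timerHandle = null
        this._processorHandle = null
    }
}

const p = new StoppableBatchProcessor(100)
p.stop()  // both timers cleared; the event loop can now drain and exit
console.log(p._timerHandle)  // null
```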

ChangeLog

  • 20240706: init