Preface
In the last article, we used Python to simplify and implement batch requests. Now it's JavaScript's turn.
Params
Looking at the Kafka Python client SDK, we can see two parameters related to batching: batch_size and linger_ms. Together they define the conditions under which a batch is flushed (sketched as a single check below):
- when the number of buffered messages reaches batch_size
- when linger_ms milliseconds have elapsed
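The same two conditions can be expressed as one combined check. This is only an illustration of the idea; queue, batchSize, lastFlushTime, and lingerMs are placeholder names, and the actual implementation below uses a timer flag instead of a timestamp.

// Sketch of the two batch-trigger conditions (placeholder variables only)
const shouldFlush =
    queue.length >= batchSize ||              // size threshold reached
    Date.now() - lastFlushTime >= lingerMs    // linger time elapsed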
To begin with, let's start by defining the JavaScript BatchProcessor class:
class BatchProcessor {
    constructor(batchSize = 1, lingerMs = 0) {
        this.batchSize = batchSize
        this.lingerMs = lingerMs
    }
}
The Initialization Process
Then we need a queue as the buffer for tasks/messages. Because Node.js runs JavaScript on a single-threaded event loop, no worker threads are involved and there are no thread-safety issues to worry about. As a result, a plain array is enough to serve as the queue buffer.
class BatchProcessor {
    constructor(batchSize = 1, lingerMs = 0) {
        this.batchSize = batchSize
        this.lingerMs = lingerMs
        this.queue = []
    }
}
We also need a flag to indicate whether the BatchProcessor should flush a batch or not.
class BatchProcessor {
    constructor(batchSize = 1, lingerMs = 0) {
        ...
        this.batchEventFlag = false
    }
}
Next, we start listening for two conditions: whether the elapsed time has exceeded this.lingerMs, and whether the buffer queue has reached this.batchSize.
Listening for Execution Time
Although the Python implementation uses a while loop for listening, in JavaScript, due to its single-threaded execution model, setInterval is used instead. It runs the listening process periodically, setting this.batchEventFlag to true every this.lingerMs milliseconds.
_batchTimer() {
    setInterval(() => {
        this.batchEventFlag = true
    }, this.lingerMs)
}
Listening to the Buffer Queue
Since the queue changes during execution, it is also necessary to check queue.length directly inside the execute function.
execute(task) {
    this.queue.push(task)
    if (this.queue.length >= this.batchSize) {
        this.batchEventFlag = true
    }
}
Process Batch
The main approach is to use setInterval to poll this.batchEventFlag at a very short interval. When the flag is set, the tasks/messages are moved from the queue buffer into a batch, which is then passed to this._processBatch(batch).
_batchProcessor() {
    const that = this
    setInterval(async () => {
        if (that.batchEventFlag) {
            const batch = []
            for (let i = 0; i < that.batchSize; i++) {
                if (that.queue.length) {
                    batch.push(that.queue.shift())
                }
            }
            if (batch.length) {
                await that._processBatch(batch)
            }
            that.batchEventFlag = false
        }
    }, 10)
}
Furthermore, the inner logic of _processBatch(batch), and whether it is async or sync, can be decided according to the business logic.
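For example, here is a minimal sketch of a _processBatch that forwards the whole batch to an HTTP endpoint. The URL is a made-up placeholder, and the sketch assumes a Node.js version with a global fetch (18+) and JSON-serializable queued items.

// Hypothetical variant: send the whole batch to a placeholder endpoint.
// Assumes Node.js 18+ (global fetch) and JSON-serializable tasks.
async _processBatch(batch) {
    const res = await fetch('https://example.com/bulk', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(batch)
    })
    if (!res.ok) {
        console.error(`batch failed with status ${res.status}`)
    }
}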
Call Both Listening Functions during BatchProcessor Initialization
Call the two functions, this._batchProcessor and this._batchTimer, directly during BatchProcessor initialization.
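Concretely, the constructor simply invokes both of them at the end, as the full source below shows:

constructor(batchSize = 1, lingerMs = 0) {
    ...
    this._batchProcessor()
    this._batchTimer()
}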
Full Source Code
class BatchProcessor {
    constructor(batchSize = 1, lingerMs = 0) {
        this.queue = []
        this.batchSize = batchSize
        this.lingerMs = lingerMs
        this.batchEventFlag = false
        this._batchProcessor()
        this._batchTimer()
    }

    // Push a task into the buffer; flush early once batchSize is reached
    execute(asyncTask) {
        this.queue.push(asyncTask)
        if (this.queue.length >= this.batchSize) {
            this.batchEventFlag = true
        }
    }

    // Time-based trigger: raise the flag every lingerMs milliseconds
    _batchTimer() {
        setInterval(() => {
            this.batchEventFlag = true
        }, this.lingerMs)
    }

    // Poll the flag, drain up to batchSize items from the queue, and process them
    _batchProcessor() {
        const that = this
        setInterval(async () => {
            if (that.batchEventFlag) {
                const batch = []
                for (let i = 0; i < that.batchSize; i++) {
                    if (that.queue.length) {
                        batch.push(that.queue.shift())
                    }
                }
                if (batch.length) {
                    await that._processBatch(batch)
                }
                that.batchEventFlag = false
            }
        }, 10)
    }

    async _processBatch(batch) {
        const data = batch
        console.log(`data: ${JSON.stringify(data)}`)
    }
}
async function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

async function main() {
    try {
        const processor = new BatchProcessor(2, 1500); // batchSize=2, lingerMs=1500
        for (let i = 0; i < 100; i++) {
            await processor.execute(`task ${i}`);
            await sleep(500)
        }
    } catch (err) {
        console.error(err)
    }
}

main().catch(console.error);
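One caveat about the example above: both setInterval timers keep the Node.js event loop alive, so the process will not exit on its own once main() finishes. A hypothetical stop() method (not part of the original class) could store the interval handles and clear them, for example:

// Hypothetical extension: keep the interval handles so they can be cleared.
// stop() is not part of the original design; this is only a sketch.
_batchTimer() {
    this._timerId = setInterval(() => {
        this.batchEventFlag = true
    }, this.lingerMs)
}

stop() {
    clearInterval(this._timerId)
    clearInterval(this._processorId) // handle saved the same way inside _batchProcessor()
}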
ChangeLog
- 20240706–init