前言
在上一篇文章中,使用Python來簡化並實現batch request。這次,要使用Javascript來實現。
入參
查看 python kafka SDK,可以看到關於batch的主要是兩個入參:batch_size
和 linger_ms
,這兩個參數為指定batch要在什麼條件下執行:
- 當batch size大於某個數的時候
- 當publish到一段時間後
先初始化一個 Javascript BatchProcessor 物件:
class BatchProcessor{
constructor(batchSize=1, lingerMs=0){
this.batchSize = batchSize
this.lingerMs = lingerMs
}
初始化過程
再來,需要一個queue來當作 task/message 的buffer;同時,由於NodeJS的runtime是一個single-thread的事件循環,所以如果沒有work_thread 的參與,那麼就不太需要考慮 thread safty的問題,因此直接使用簡單的 array來存儲 queue buffer。
class BatchProcessor{
constructor(batchSize=1, lingerMs=0){
this.batchSize = batchSize
this.lingerMs = lingerMs
this.queue = []
}
然後需要一個flag來控制 BatchProcessor 的狀態:是要發布還是不要發布
class BatchProcessor{
constructor(batchSize=1, lingerMs=0){
...
this.batchEventFlag = false
}
然後就是啟動監聽 執行時間有沒有超過 this.lingerMs、以及buffer queue有沒有超過 this.batchSize。
監聽執行時間
雖然在Python的部分,是使用while循環來監聽;但在Javascript中,由於是single-thread的運行機制,所以要改成 setInterval
的方式來執行監聽—>在一段 this.lingerMs
時間後,就把 this.batchEventFlag
設為true
_batchTimer(){
setInterval(()=>{
this.batchEventFlag = true
}, this.lingerMs)
}
監聽batch size
由於在 execute的時候會對queue 產生變化,所以也是直接在 execute 函數中檢查 queue.length:
execute(task){
this.queue.push(task)
if(this.queue.length >= this.batchSize){
this.batchEventFlag = true
}
}
執行batch
主要就是在 setInterval
中在一個極短時間內輪詢監聽this.batchEventFlag
,然後把 queue buffer裡面的task/message 給灌到 batch 中,進而執行this._processBatch(batch)
_batchProcessor(){
const that = this
setInterval(async()=>{
if(that.batchEventFlag){
const batch = []
for(let i=0; i< that.batchSize; i++){
if(that.queue.length){
batch.push(that.queue.shift())
}
}
if(batch.length){
await that._processBatch(batch)
}
that.batchEventFlag = false
}
}, 10)
}
而這個 this. _processBatch(batch)
,可以看業務需求,要如何處理task/message,來決定其中的邏輯、或者要不要是異步函數。
把監聽函數執行在 BatchProcessor 初始化時
把兩個監聽函數this._batchProcessor
和 this._batchTimer
直接在BatchProcessor初始化的時候執行
全部程式碼
class BatchProcessor{
constructor(batchSize=1, lingerMs=0){
this.queue = []
this.batchSize = batchSize
this.lingerMs = lingerMs
this.batchEventFlag = false
this._batchProcessor();
this._batchTimer();
}
execute(asyncTask){
this.queue.push(asyncTask)
if(this.queue.length >= this.batchSize){
this.batchEventFlag = true
}
}
_batchTimer(){
setInterval(()=>{
this.batchEventFlag = true
}, this.lingerMs)
}
_batchProcessor(){
const that = this
setInterval(async()=>{
if(that.batchEventFlag){
const batch = []
for(let i=0; i< that.batchSize; i++){
if(that.queue.length){
batch.push(that.queue.shift())
}
}
if(batch.length){
await that._processBatch(batch)
}
that.batchEventFlag = false
}
}, 10)
}
async _processBatch(batch){
const data = batch
console.log(`data: ${JSON.stringify(data)}`)
}
}
async function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async function main() {
try{
const processor = new BatchProcessor(2, 1500); // batch_size=2, linger_ms=5000
for (let i = 0; i < 100; i++) {
await processor.execute(`task ${i}`);
await sleep(500)
}
}catch(err){
console.error(err)
}
}
main().catch(console.error);
ChangeLog
- 20240706–初稿