[Fullstack Note] Implementing Kafka Producer batching in Python

Abstracting batch request implementation in Python

Posted by Jamie on Saturday, July 6, 2024

Preface

While using and studying Kafka, I discovered that the Producer optimizes publishing by sending messages to the Kafka broker in batches rather than one at a time. This approach reduces the number of I/O operations and improves processing efficiency.

Therefore, this article aims to simplify and abstract this batching process.

Params

While checking the Kafka Python client SDK, we can see two parameters related to batching: batch_size and linger_ms. Together they define the conditions that trigger a batch:

  • when the pending batch grows larger than batch_size, or
  • after linger_ms milliseconds have elapsed.
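
The two conditions can be sketched as a single predicate (should_flush is a hypothetical helper for illustration, not part of the SDK):

```python
import time

def should_flush(pending_count, last_flush, batch_size, linger_ms):
    """Flush when the batch is full OR the linger window has elapsed."""
    size_reached = pending_count >= batch_size
    elapsed_ms = (time.monotonic() - last_flush) * 1000
    return size_reached or elapsed_ms >= linger_ms

# a full batch flushes immediately, regardless of time
print(should_flush(16, time.monotonic(), batch_size=16, linger_ms=10_000))  # True
```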

To begin with, let's initialize the Python BatchProcessor class:

class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.batch_size = batch_size
        self.linger_ms = linger_ms

The Initialization Process

Then we need a queue as a buffer for tasks/messages, and we need to avoid race conditions between the push and pop operations. Since every push and pop happens on the same event loop, asyncio.Queue keeps these operations safe (note: it is coroutine-safe within one event loop, not thread-safe).

class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.queue = asyncio.Queue()  # coroutine-safe within one event loop

We also need a flag to signal whether the BatchProcessor should produce a batch.

class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        # ... 
        self._batch_event_flag = asyncio.Event()

Next, we start listening for two conditions: whether linger_ms has elapsed, and whether the buffer queue has reached self.batch_size.

Listening for Execution Time

This part is quite simple: inside an infinite loop, pause for linger_ms (converted to seconds), then call self._batch_event_flag.set() to start the batch process.

async def _batch_timer(self):
    while True:
        await asyncio.sleep(self.linger_ms / 1000)  # asyncio.sleep takes seconds
        self._batch_event_flag.set()
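
One detail to watch: linger_ms is in milliseconds, while asyncio.sleep takes seconds, so the interval must be divided by 1000. A minimal standalone sketch of this timer/event pattern (the names here are illustrative):

```python
import asyncio

async def demo():
    ev = asyncio.Event()

    async def timer(linger_ms):
        # asyncio.sleep takes seconds, so divide milliseconds by 1000
        await asyncio.sleep(linger_ms / 1000)
        ev.set()

    asyncio.create_task(timer(10))
    await ev.wait()  # returns once the timer fires
    return ev.is_set()

print(asyncio.run(demo()))  # True
```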

Listening to the Buffer Queue

Since the queue changes during execution, we also check queue.qsize() directly inside the execute function.

async def execute(self, task):
    await self.queue.put(task)
    if self.queue.qsize() >= self.batch_size:
        self._batch_event_flag.set()

Process Batch

The main approach is to wait on self._batch_event_flag in a while loop, load the tasks/messages from the queue buffer into a batch, and then call self._process_batch(batch).

async def _batch_processor(self):
    while True:
        await self._batch_event_flag.wait()
        batch = []
        
        while not self.queue.empty() and len(batch) < self.batch_size:
            batch.append(await self.queue.get())

        if batch:
            self._process_batch(batch)

        # reset
        self._batch_event_flag.clear()

The logic within `self._process_batch(batch)` can be adapted to business requirements and to how the tasks/messages should be processed, including whether it should be an asynchronous function.
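
For example, an asynchronous variant might await real I/O; the sketch below only simulates it (process_batch and its body are illustrative, not part of the article's class):

```python
import asyncio

async def process_batch(batch):
    # stand-in for real async work, e.g. sending the batch downstream
    await asyncio.sleep(0)
    return [str(item).upper() for item in batch]

print(asyncio.run(process_batch(["a", "b"])))  # ['A', 'B']
```

If _process_batch becomes a coroutine like this, the call site in _batch_processor must be awaited (`await self._process_batch(batch)`).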

Placing and Executing Both Listener Functions during BatchProcessor Initialization

Run the two functions, self._batch_processor and self._batch_timer, via asyncio.create_task during BatchProcessor initialization. Note that asyncio.create_task requires a running event loop, so the instance must be created from inside async code (here, inside main()).
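
Because asyncio.create_task only schedules work while an event loop is running, one common alternative is an async factory method. A minimal sketch under that assumption (the Processor/create names are illustrative, not from this article):

```python
import asyncio

class Processor:
    """Sketch of starting a background task via an async factory."""
    def __init__(self):
        self.started = False
        self._task = None

    @classmethod
    async def create(cls):
        self = cls()
        # safe here: an event loop is guaranteed to be running inside a coroutine
        self._task = asyncio.create_task(self._run())
        return self

    async def _run(self):
        self.started = True

async def demo():
    p = await Processor.create()
    await asyncio.sleep(0)  # yield once so the background task gets to run
    return p.started

print(asyncio.run(demo()))  # True
```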

Whole Source Code

import asyncio

class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.queue = asyncio.Queue()  # coroutine-safe within one event loop
        self.batch_size = batch_size
        self.linger_ms = linger_ms

        self._batch_event_flag = asyncio.Event()
        self._tasks = [  # keep references so the tasks are not garbage-collected
            asyncio.create_task(self._batch_processor()),
            asyncio.create_task(self._batch_timer()),
        ]

    async def execute(self, task):
        await self.queue.put(task)

        if self.queue.qsize() >= self.batch_size:
            self._batch_event_flag.set()
    
    async def _batch_timer(self):
        while True:
            await asyncio.sleep(self.linger_ms / 1000)  # asyncio.sleep takes seconds
            self._batch_event_flag.set()


    async def _batch_processor(self):
        while True:
            await self._batch_event_flag.wait()
            batch = []
            
            while not self.queue.empty() and len(batch) < self.batch_size:
                batch.append(await self.queue.get())

            if batch:
                self._process_batch(batch)

            # reset 
            self._batch_event_flag.clear()

    def _process_batch(self, batch):
        print(f"data:{batch}")


async def main():
    processor = BatchProcessor(batch_size=2, linger_ms=5000)  # 5000 ms = 5 s
    for i in range(100):
        await asyncio.sleep(1)
        await processor.execute(f"task {i}")

if __name__ == "__main__":
    asyncio.run(main())
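
As a quick sanity check, the drain step (wait on the event, pull up to batch_size items, reset) can be exercised in isolation; this standalone sketch mirrors the inner loop rather than reusing the full class:

```python
import asyncio

async def drain(queue, event, batch_size):
    # mirrors one round of _batch_processor: wait, pull up to batch_size, reset
    await event.wait()
    batch = []
    while not queue.empty() and len(batch) < batch_size:
        batch.append(await queue.get())
    event.clear()
    return batch

async def demo():
    q = asyncio.Queue()
    ev = asyncio.Event()
    for i in range(3):
        await q.put(i)
    ev.set()  # pretend the size/time condition fired
    return await drain(q, ev, batch_size=2)

print(asyncio.run(demo()))  # [0, 1] -- the third item stays queued
```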

ChangeLog

  • 20240706 – init