Preface
While using and studying Kafka, I noticed that the producer optimizes publishing by sending messages to the Kafka broker in batches rather than one at a time. This approach reduces the number of I/O operations and improves processing efficiency.
Therefore, this article aims to simplify and abstract this batching process.
Params
Looking at the Kafka Python client SDK, we can see two parameters related to batching: batch_size
and linger_ms
. Together they define the conditions that trigger a batch:
- when the number of buffered messages reaches batch_size
- when linger_ms milliseconds have elapsed
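The two conditions above can be sketched as a single predicate (a minimal illustration; `should_flush` and its argument names are chosen here and are not part of any SDK):

```python
def should_flush(queued, elapsed_ms, batch_size, linger_ms):
    """Flush when either threshold is crossed."""
    return queued >= batch_size or elapsed_ms >= linger_ms

# size threshold reached
print(should_flush(queued=10, elapsed_ms=1, batch_size=10, linger_ms=100))   # → True
# neither threshold reached
print(should_flush(queued=3, elapsed_ms=50, batch_size=10, linger_ms=100))   # → False
```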
To begin with, let's start by initializing the Python BatchProcessor instance:
class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
Initialization
Then we need a queue as a buffer for tasks/messages. asyncio.Queue is safe to share between coroutines on the same event loop (though not across threads), so we avoid race conditions between the push and pop paths.
class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.queue = asyncio.Queue()  # safe to share between coroutines
We also need a flag that signals the BatchProcessor when it should produce a batch.
class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        # ...
        self._batch_event_flag = asyncio.Event()
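As a quick refresher (a standalone sketch, independent of BatchProcessor), asyncio.Event lets one coroutine block on wait() until another coroutine calls set(), and clear() resets it for the next round:

```python
import asyncio

async def demo():
    event = asyncio.Event()
    results = []

    async def waiter():
        await event.wait()       # blocks until set() is called
        results.append("woken")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)       # let waiter start and block on wait()
    event.set()                  # wake the waiter
    await task
    event.clear()                # reset for reuse
    return results, event.is_set()

print(asyncio.run(demo()))  # → (['woken'], False)
```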
Next, we start two listeners: one that fires after self.linger_ms has elapsed, and one that fires when the buffer queue reaches self.batch_size.
Listening for Elapsed Time
This part is quite simple: sleep for self.linger_ms
inside an endless loop (converting milliseconds to the seconds that asyncio.sleep expects), then call self._batch_event_flag.set()
to trigger the batch process.
async def _batch_timer(self):
    while True:
        await asyncio.sleep(self.linger_ms / 1000)  # linger_ms is in milliseconds
        self._batch_event_flag.set()
Listening to the Buffer Queue
Since the queue changes during execution, we also check queue.qsize()
directly inside the execute function.
async def execute(self, task):
    await self.queue.put(task)
    if self.queue.qsize() >= self.batch_size:
        self._batch_event_flag.set()
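To see the size trigger in isolation (a minimal sketch outside the class), filling the queue up to the threshold sets the event:

```python
import asyncio

async def demo():
    queue = asyncio.Queue()
    event = asyncio.Event()
    batch_size = 3
    for i in range(batch_size):
        await queue.put(f"task {i}")
        if queue.qsize() >= batch_size:  # same check as execute()
            event.set()
    return queue.qsize(), event.is_set()

print(asyncio.run(demo()))  # → (3, True)
```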
Processing the Batch
The main loop waits on self._batch_event_flag
, drains the queued tasks/messages into a batch, and then executes self._process_batch(batch)
async def _batch_processor(self):
    while True:
        await self._batch_event_flag.wait()
        batch = []
        while not self.queue.empty() and len(batch) < self.batch_size:
            batch.append(await self.queue.get())
        if batch:
            self._process_batch(batch)
        # reset
        self._batch_event_flag.clear()
The logic inside self._process_batch(batch)
can be tailored to business requirements and to how tasks/messages should be processed, including whether it should be an asynchronous function.
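For example, if each batch is sent over the network, _process_batch could itself be a coroutine (a hypothetical variant, not the version used in the final code below; AsyncSinkBatchProcessor is a name introduced here, and the I/O is simulated):

```python
import asyncio

class AsyncSinkBatchProcessor:
    """Sketch of an asynchronous _process_batch; the network call is simulated."""

    async def _process_batch(self, batch):
        await asyncio.sleep(0)  # stand-in for real async I/O (e.g. an HTTP POST)
        return [f"sent:{item}" for item in batch]

print(asyncio.run(AsyncSinkBatchProcessor()._process_batch(["a", "b"])))
# → ['sent:a', 'sent:b']
```

With this variant, the loop in _batch_processor would call `await self._process_batch(batch)` instead of the plain call.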
Starting Both Listeners during BatchProcessor Initialization
Launch the two coroutines, self._batch_processor
and self._batch_timer
, with asyncio.create_task
during BatchProcessor initialization. Note that asyncio.create_task requires a running event loop, so the instance must be created inside an async context.
Whole Source Code
import asyncio

class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.queue = asyncio.Queue()  # safe to share between coroutines
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self._batch_event_flag = asyncio.Event()
        asyncio.create_task(self._batch_processor())
        asyncio.create_task(self._batch_timer())

    async def execute(self, task):
        await self.queue.put(task)
        if self.queue.qsize() >= self.batch_size:
            self._batch_event_flag.set()

    async def _batch_timer(self):
        while True:
            await asyncio.sleep(self.linger_ms / 1000)  # linger_ms is in milliseconds
            self._batch_event_flag.set()

    async def _batch_processor(self):
        while True:
            await self._batch_event_flag.wait()
            batch = []
            while not self.queue.empty() and len(batch) < self.batch_size:
                batch.append(await self.queue.get())
            if batch:
                self._process_batch(batch)
            # reset
            self._batch_event_flag.clear()

    def _process_batch(self, batch):
        print(f"data:{batch}")

async def main():
    processor = BatchProcessor(batch_size=2, linger_ms=5000)
    for i in range(100):
        await asyncio.sleep(1)
        await processor.execute(f"task {i}")

if __name__ == "__main__":
    asyncio.run(main())
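To check the end-to-end behavior without waiting on the long-running demo above, the snippet below repeats the class and subclasses it to collect batches into a list instead of printing them (CollectingProcessor and run_demo are names introduced here purely for illustration):

```python
import asyncio

class BatchProcessor:
    def __init__(self, batch_size=0, linger_ms=0):
        self.queue = asyncio.Queue()
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self._batch_event_flag = asyncio.Event()
        asyncio.create_task(self._batch_processor())
        asyncio.create_task(self._batch_timer())

    async def execute(self, task):
        await self.queue.put(task)
        if self.queue.qsize() >= self.batch_size:
            self._batch_event_flag.set()

    async def _batch_timer(self):
        while True:
            await asyncio.sleep(self.linger_ms / 1000)
            self._batch_event_flag.set()

    async def _batch_processor(self):
        while True:
            await self._batch_event_flag.wait()
            batch = []
            while not self.queue.empty() and len(batch) < self.batch_size:
                batch.append(await self.queue.get())
            if batch:
                self._process_batch(batch)
            self._batch_event_flag.clear()

    def _process_batch(self, batch):
        print(f"data:{batch}")

class CollectingProcessor(BatchProcessor):
    """Overrides _process_batch to record batches for inspection."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.batches = []

    def _process_batch(self, batch):
        self.batches.append(batch)

async def run_demo():
    # long linger so only the size trigger fires during this short run
    processor = CollectingProcessor(batch_size=2, linger_ms=10_000)
    for i in range(4):
        await processor.execute(f"task {i}")
        await asyncio.sleep(0.01)  # let _batch_processor drain the queue
    return processor.batches

print(asyncio.run(run_demo()))  # → [['task 0', 'task 1'], ['task 2', 'task 3']]
```

Each pair of submissions crosses the batch_size threshold, so the tasks arrive in batches of two while the timer stays silent.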
ChangeLog
- 20240706–init