Data chunking for humans, particularly for data engineers. Makes batched data processing less painful and a little bit more joyful.
datachunks requires Python 3.8 or newer. Install it from PyPI:
$ pip install datachunks
datachunks implements two chunking strategy:
- "Pull" strategy. Wrap your source stream with
chunks
generator and consume a chunked data stream. - "Push" strategy. Create a special "feeder" object that will send data chunks to a specified consumer function.
First strategy is simple an sutable for most of applications, but second gives more flexibility in building non-trivial in-memory processing pipelines.
These functions implement the "pull" chunking strategy resectively for synchronous and async/await apllications.
from datachunks import chunks
for chunk in chunks(range(12), 5):
print(chunk)
Expected output:
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11]
Asynchronous version example:
import asyncio
from datachunks import achunks
async def arange(*args, **kwargs):
for i in range(*args, **kwargs):
yield i
async def achunks_demo():
async for chunk in achunks(arange(12), 5):
print(chunk)
asyncio.run(achunks_demo())
Expected output:
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[10, 11]
"Push" strategy is implemented in ChunkingFeeder
and AsyncChunkingFeeder
objects.
Consider the situation we decided to process odd and even numbers separately. For this purpose we create two feeders
and put
into them odd and even values.
from datachunks import ChunkingFeeder
with ChunkingFeeder(lambda c: print(f'evens: {c}'), 5) as print_evens_feeder, \
ChunkingFeeder(lambda c: print(f'odds: {c}'), 5) as print_odds_feeder:
for i in range(25):
if i % 2 == 0:
print_evens_feeder.put(i)
else:
print_odds_feeder.put(i)
Expected output:
evens: [0, 2, 4, 6, 8]
odds: [1, 3, 5, 7, 9]
evens: [10, 12, 14, 16, 18]
odds: [11, 13, 15, 17, 19]
odds: [21, 23]
evens: [20, 22, 24]
Additional features:
- It is guarandeed that all data is delivered to the callback functions after the context exit.
- It is possible to produce additional items in callback function. It allows to build flexible and even recursive data processing, but of course it is your responsibility to avoid infinite recursion.
- By default
ChunkingFeeder
calls its chunk consumer synchronously. To use multithreading specify theworkers_num
parameter. - To use multiprocessing set the
multiprocessing
parameter toTrue
in addition toworkers_num
parameter. - The
AsyncChunkingFeeder
also supports theworkers_num
parameter, but does not supportmultiprocessing
.
Consider a simple ETL task: we have an orders.jsonl
file that we need to upload to some Mongo database. Sending objects one-by-one is too slow, and file is too big to opload it in one big batch. So we are going to split this data to chunks of reasonable size.
Function read_jsonl
reads the file and yields objects one-by-one:
import json
def read_jsonl():
with open('orders.jsonl', 'r', encoding='utf-8') as jsonl:
for jsoned_obj in jsonl:
if jsoned_obj:
yield json.loads(jsoned_obj)
The following function pulls objects through chunks generator and send objects to some MongoDB:
from datachunks import chunks
def transfer_orders(db_connection):
for chunk in chunks(read_jsonl(), chunk_size=200):
db_connection.orders.insert_many(chunk)
After a while we decided to store purchase and sales orders into different MongoDB collections, so let's use two chunking feeders:
from datachunks import ChunkingFeeder
class TransferOrders():
def __init__(self, db_connection):
self.db_connection = db_connection
def store_purchase_orders(self, chunk):
self.db_connection.purchase_orders.insert_many(chunk)
def store_sales_orders(self, chunk):
self.db_connection.sales_orders.insert_many(chunk)
def do_transfer(self):
with ChunkingFeeder(self.store_purchase_orders, 100, workers_num=1) as purchase_feeder, \
ChunkingFeeder(self.store_purchase_orders, 500, workers_num=1) as sales_feeder:
for order in read_jsonl():
if order.get('order_type') == 'purchase':
purchase_feeder.put(order)
elif order.get('order_type') == 'sales':
sales_feeder.put(order)
def transfer_orders(db_connection):
TransferOrders(db_connection).do_transfer()
The pymongo
library is thread-safe, so it makes sense to speed up our process by storing data in separate threads.