feat: increase message and batch sizes #108

Merged · 6 commits · Dec 4, 2023
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -62,4 +62,4 @@ jobs:

      - name: Run posthog tests
        run: |
-          python setup.py test
+          pytest --verbose --timeout=30
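Note: the --timeout=30 flag comes from the pytest-timeout plugin, which this PR adds to the test extras in setup.py further down. For illustration only, the same budget can also be applied per test with a marker; the test name and body below are hypothetical:

import time

import pytest


@pytest.mark.timeout(30)  # per-test override of the global --timeout budget
def test_flush_completes_quickly():
    # hypothetical test body: pytest-timeout fails the test if it runs longer than 30s
    time.sleep(0.1)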
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+## 3.1.0 - 2023-12-04
+
+1. Increase maximum event size and batch size
+
## 3.0.2 - 2023-08-17

1. Returns the current flag property with $feature_flag_called events, to make it easier to use in experiments
11 changes: 6 additions & 5 deletions posthog/consumer.py
@@ -12,11 +12,12 @@
except ImportError:
    from Queue import Empty

-MAX_MSG_SIZE = 32 << 10

-# Our servers only accept batches less than 500KB. Here limit is set slightly
-# lower to leave space for extra data that will be added later, eg. "sentAt".
-BATCH_SIZE_LIMIT = 475000
+MAX_MSG_SIZE = 900 * 1024  # 900KiB per event

+# The maximum request body size is currently 20MiB, let's be conservative
+# in case we want to lower it in the future.
+BATCH_SIZE_LIMIT = 5 * 1024 * 1024


class Consumer(Thread):
@@ -104,7 +105,7 @@ def next(self):
                item = queue.get(block=True, timeout=self.flush_interval - elapsed)
                item_size = len(json.dumps(item, cls=DatetimeSerializer).encode())
                if item_size > MAX_MSG_SIZE:
-                   self.log.error("Item exceeds 32kb limit, dropping. (%s)", str(item))
+                   self.log.error("Item exceeds 900KiB limit, dropping. (%s)", str(item))
                    continue
                items.append(item)
                total_size += item_size
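For readers skimming the diff, here is a minimal, self-contained sketch of the size checks these constants drive. It is not the library's code: the plain json.dumps call (the real consumer uses its DatetimeSerializer), the logger setup, and the add_to_batch helper are simplified stand-ins for illustration.

import json
import logging

MAX_MSG_SIZE = 900 * 1024           # 900KiB per serialized event
BATCH_SIZE_LIMIT = 5 * 1024 * 1024  # stay well under the 20MiB request body cap

log = logging.getLogger("posthog")


def add_to_batch(item, items, total_size):
    """Drop events over MAX_MSG_SIZE; report whether the batch should close."""
    item_size = len(json.dumps(item).encode())
    if item_size > MAX_MSG_SIZE:
        log.error("Item exceeds 900KiB limit, dropping. (%s)", str(item))
        return total_size, False
    items.append(item)
    total_size += item_size
    # The batch closes once the running total crosses the limit, which is why a
    # single request can end up slightly larger than BATCH_SIZE_LIMIT itself.
    return total_size, total_size >= BATCH_SIZE_LIMIT


items = []
total, batch_full = add_to_batch({"event": "python event", "distinct_id": "user-1"}, items, 0)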
13 changes: 9 additions & 4 deletions posthog/test/test_consumer.py
@@ -145,15 +145,20 @@ def test_pause(self):
    def test_max_batch_size(self):
        q = Queue()
        consumer = Consumer(q, TEST_API_KEY, flush_at=100000, flush_interval=3)
-       track = {"type": "track", "event": "python event", "distinct_id": "distinct_id"}
+       properties = {}
+       for n in range(0, 500):
+           properties[str(n)] = "one_long_property_value_to_build_a_big_event"
+       track = {"type": "track", "event": "python event", "distinct_id": "distinct_id", "properties": properties}
        msg_size = len(json.dumps(track).encode())
-       # number of messages in a maximum-size batch
-       n_msgs = int(475000 / msg_size)
+       # Let's capture 8MB of data to trigger two batches
+       n_msgs = int(8_000_000 / msg_size)

        def mock_post_fn(_, data, **kwargs):
            res = mock.Mock()
            res.status_code = 200
-           self.assertTrue(len(data.encode()) < 500000, "batch size (%d) exceeds 500KB limit" % len(data.encode()))
+           request_size = len(data.encode())
+           # Batches close after the first message that pushes them over BATCH_SIZE_LIMIT, so allow a 10% margin
+           self.assertTrue(request_size < (5 * 1024 * 1024) * 1.1, "batch size (%d) higher than limit" % request_size)
            return res

        with mock.patch("posthog.request._session.post", side_effect=mock_post_fn) as mock_post:
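As a sanity check on the comments above, the test's sizes can be reproduced with a few lines of arithmetic. This is a rough sketch, not part of the test suite; exact numbers vary slightly with JSON overhead, and it assumes each synthetic event serializes to roughly 27KB:

import json
import math

# Rebuild the oversized event the test constructs.
properties = {str(n): "one_long_property_value_to_build_a_big_event" for n in range(500)}
track = {"type": "track", "event": "python event", "distinct_id": "distinct_id", "properties": properties}

msg_size = len(json.dumps(track).encode())                   # roughly 27KB per event
n_msgs = int(8_000_000 / msg_size)                           # enough events to queue about 8MB of data
batches = math.ceil(n_msgs * msg_size / (5 * 1024 * 1024))   # 8MB against a 5MiB limit -> 2 requests

print(msg_size, n_msgs, batches)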
2 changes: 1 addition & 1 deletion posthog/version.py
@@ -1,4 +1,4 @@
VERSION = "3.0.2"
VERSION = "3.1.0"

if __name__ == "__main__":
print(VERSION, end="") # noqa: T201
2 changes: 1 addition & 1 deletion setup.py
@@ -24,7 +24,7 @@
"flake8-print",
"pre-commit",
],
"test": ["mock>=2.0.0", "freezegun==0.3.15", "pylint", "flake8", "coverage", "pytest"],
"test": ["mock>=2.0.0", "freezegun==0.3.15", "pylint", "flake8", "coverage", "pytest", "pytest-timeout"],
"sentry": ["sentry-sdk", "django"],
}
