Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Provide alternative in-memory queue for publishing events #763

Closed
Closed
Show file tree
Hide file tree
Changes from 70 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
4918353
add in memory queue option
Dec 7, 2022
7558da7
cleanup
Dec 7, 2022
be54be0
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Dec 7, 2022
b71fac2
lint, cleanup
Dec 7, 2022
7916749
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Dec 7, 2022
84c50c9
linting
Dec 7, 2022
8cd02e9
linting
Dec 7, 2022
719749a
linting :sob:
Dec 7, 2022
7e84998
linting
Dec 8, 2022
057f295
wip
Dec 14, 2022
3740cc6
wip
Dec 20, 2022
1b1d5cf
thrift updates
Dec 23, 2022
5fd17f5
working
Jan 3, 2023
1d9510b
cleanup, wipg
Jan 3, 2023
6628c1c
linting
Jan 3, 2023
6834daa
linting
Jan 4, 2023
efffbcb
linting, remove docker-compose change
Jan 4, 2023
afcbd9f
dockerfile
Jan 4, 2023
de2822d
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Jan 4, 2023
05a10a7
typo
Jan 4, 2023
6632221
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Jan 4, 2023
257c0bb
wip
Jan 9, 2023
c31eb31
updates
Jan 9, 2023
7b93a00
cleanup exceptions
Jan 9, 2023
f522fce
linting
Jan 10, 2023
11e498e
linting
Jan 10, 2023
27d4171
linting
Jan 10, 2023
78c3102
cleanup
Jan 10, 2023
2e364d1
updates
Jan 19, 2023
4c4c938
EventQueue takes n optionala queue
Jan 24, 2023
95a225e
clarify comments
Feb 13, 2023
018134e
linting
Feb 13, 2023
3ba5648
linting
Feb 13, 2023
e1345b0
linting
Feb 13, 2023
276e768
linting :sob:
Feb 13, 2023
50226db
wip need to test metrics
Feb 23, 2023
e28ed06
move timer start
Feb 24, 2023
fc95a77
Merge branch 'develop' into replace_posix_queue_event_publishing
sydjryan Feb 24, 2023
7f6538b
import order
Feb 24, 2023
72f5c1d
Merge branch 'replace_posix_queue_event_publishing' of github.com:syd…
Feb 24, 2023
406161f
linting
Feb 24, 2023
5a459a9
wip
Feb 27, 2023
3eee3d9
x
Mar 1, 2023
2f92bb0
remove thrift get
Mar 2, 2023
da30d5e
remove get
Mar 2, 2023
bf367e4
linting
Mar 2, 2023
2d17807
linting
Mar 2, 2023
3a62e2d
linting
Mar 2, 2023
eb71f4c
cleanup
Mar 2, 2023
1ccd7e2
lint
Mar 2, 2023
48a7624
comment
Mar 2, 2023
53a4df0
x
Mar 2, 2023
210cbb9
reduce pool timeout for pool test
Mar 2, 2023
654ad09
newline
Mar 2, 2023
272f80d
remove pool test
Mar 2, 2023
ffaf766
import
Mar 3, 2023
19f26cf
cleanup
Mar 6, 2023
d49b2d8
black
Mar 6, 2023
9e13d34
linting
Mar 6, 2023
6d8d967
lint
Mar 6, 2023
cc9714f
linting
Mar 6, 2023
c77c67f
remove create_queue
Mar 6, 2023
b8cfff8
l
Mar 6, 2023
442c20d
mypy
Mar 7, 2023
5ec4da3
black
Mar 7, 2023
ed0d328
cleanup and monkeypatch gevent
Mar 9, 2023
f171e10
exception
Mar 9, 2023
38464cf
lint
Mar 15, 2023
a456dc8
catch gevent errors
Mar 16, 2023
18242c7
lint
Mar 16, 2023
855de0f
remove from trace publisher and sidecar init
Mar 17, 2023
097d4d0
Merge branch 'develop' into replace_posix_queue_event_publishing
Mar 18, 2023
b2c2ff0
lint
Mar 20, 2023
8da60eb
lint
Mar 20, 2023
608a446
fix
Mar 20, 2023
adea15c
add scripts, remove signal handling for remote queue
Mar 20, 2023
14ecb2e
lint
Mar 20, 2023
86f3eca
typos
Mar 21, 2023
9ba9170
x
Mar 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ all: thrift
THRIFT=thrift
THRIFT_OPTS=-strict -gen py:slots
THRIFT_BUILDDIR=build/thrift
THRIFT_SOURCE=baseplate/thrift/baseplate.thrift tests/integration/test.thrift
THRIFT_SOURCE=baseplate/thrift/baseplate.thrift baseplate/thrift/message_queue/message_queue.thrift tests/integration/test.thrift
THRIFT_BUILDSTAMPS=$(patsubst %,$(THRIFT_BUILDDIR)/%_buildstamp,$(THRIFT_SOURCE))

thrift: $(THRIFT_BUILDSTAMPS)
Expand All @@ -22,6 +22,13 @@ $(THRIFT_BUILDDIR)/baseplate/thrift/baseplate.thrift_buildstamp: baseplate/thrif
rm -f baseplate/thrift/BaseplateServiceV2-remote
touch $@

$(THRIFT_BUILDDIR)/baseplate/thrift/message_queue/message_queue.thrift_buildstamp: baseplate/thrift/message_queue/message_queue.thrift
mkdir -p $(THRIFT_BUILDDIR)/$<
$(THRIFT) $(THRIFT_OPTS) -out $(THRIFT_BUILDDIR)/$< $<
cp -r $(THRIFT_BUILDDIR)/$</message_queue/thrift/ baseplate/thrift/message_queue
rm -f baseplate/thrift/message_queue/RemoteMessageQueueService-remote
touch $@

$(THRIFT_BUILDDIR)/tests/integration/test.thrift_buildstamp: tests/integration/test.thrift
mkdir -p $(THRIFT_BUILDDIR)/$<
$(THRIFT) $(THRIFT_OPTS) -out $(THRIFT_BUILDDIR)/$< $<
Expand Down
29 changes: 25 additions & 4 deletions baseplate/lib/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Any
from typing import Callable
from typing import Generic
from typing import Optional
from typing import TypeVar

from thrift import TSerialization
Expand All @@ -23,6 +24,7 @@
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib.message_queue import MessageQueue
from baseplate.lib.message_queue import PosixMessageQueue
from baseplate.lib.message_queue import TimedOutError


Expand Down Expand Up @@ -90,13 +92,23 @@ class EventQueue(ContextFactory, config.Parser, Generic[T]):
:param event_serializer: A callable that takes an event object
and returns serialized bytes ready to send on the wire. See below for
options.
:param queue: An optional MessageQueue that will be used for queueing and
publishing messages. If no queue is provided, a PosixMessageQueue will
be used.

"""

def __init__(self, name: str, event_serializer: Callable[[T], bytes]):
self.queue = MessageQueue(
"/events-" + name, max_messages=MAX_QUEUE_SIZE, max_message_size=MAX_EVENT_SIZE
)
def __init__(
self,
name: str,
event_serializer: Callable[[T], bytes],
queue: Optional[MessageQueue] = None,
):
if queue:
self.queue = queue
else:
self.queue = PosixMessageQueue("/events-" + name, MAX_QUEUE_SIZE, MAX_EVENT_SIZE)

self.serialize_event = event_serializer

def put(self, event: T) -> None:
Expand All @@ -122,6 +134,15 @@ def put(self, event: T) -> None:
except TimedOutError:
raise EventQueueFullError

def get(self) -> bytes:
"""Get an event from the queue.

:returns bytes: The next event in the queue.
:raises: :py:exc:`TimedOutError` There were no elements in the queue.

"""
return self.queue.get()

def make_object_for_context(self, name: str, span: Span) -> "EventQueue[T]":
return self

Expand Down
Loading