Skip to content

Commit

Permalink
delta as enum, delta recovery, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 6, 2024
1 parent 94a318d commit 8fb4d82
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
uses: crazy-max/ghaction-setup-docker@v3

- name: Start Centrifugo
run: docker run -d -p 8000:8000 -e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=1 -e CENTRIFUGO_JOIN_LEAVE=true -e CENTRIFUGO_FORCE_PUSH_JOIN_LEAVE=true -e CENTRIFUGO_HISTORY_TTL=300s -e CENTRIFUGO_HISTORY_SIZE=100 -e CENTRIFUGO_FORCE_RECOVERY=true -e CENTRIFUGO_USER_SUBSCRIBE_TO_PERSONAL=true -e CENTRIFUGO_ALLOW_PUBLISH_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_PRESENCE_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_HISTORY_FOR_SUBSCRIBER=true centrifugo/centrifugo:v5 centrifugo
run: docker run -d -p 8000:8000 -e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=true -e CENTRIFUGO_JOIN_LEAVE=true -e CENTRIFUGO_FORCE_PUSH_JOIN_LEAVE=true -e CENTRIFUGO_HISTORY_TTL=300s -e CENTRIFUGO_HISTORY_SIZE=100 -e CENTRIFUGO_FORCE_RECOVERY=true -e CENTRIFUGO_USER_SUBSCRIBE_TO_PERSONAL=true -e CENTRIFUGO_ALLOW_PUBLISH_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_PRESENCE_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_HISTORY_FOR_SUBSCRIBER=true -e CENTRIFUGO_DELTA_PUBLISH=true -e CENTRIFUGO_ALLOWED_DELTA_TYPES="fossil" centrifugo/centrifugo:v5 centrifugo

- name: Install dependencies
run: |
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ When using Protobuf protocol:

Event callbacks are called by SDK using `await` internally, the websocket connection read loop is blocked for the time SDK waits for the callback to be executed. This means that if you need to perform long operations in callbacks consider moving the work to a separate coroutine/task to return fast and continue reading data from the websocket.

The fact WebSocket read is blocked for the time we execute callbacks means that you can not call awaitable SDK APIs from callback – because SDK does not have a chance to read the reply. You will get `OperationTimeoutError` exception. The rule is the same - do the work asynchronously, for example use `asyncio.ensure_future`.
The fact WebSocket read is blocked for the time we execute callbacks means that you can not call awaitable SDK APIs from callback – because SDK does not have a chance to read the reply. You will get `OperationTimeoutError` exception. The rule is the same - do the work asynchronously, for example use `asyncio.ensure_future`.

## Run example

Expand Down Expand Up @@ -85,9 +85,11 @@ To run tests, first start Centrifugo server:

```bash
docker pull centrifugo/centrifugo:v5
docker run -d -p 8000:8000 -e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=1 \
docker run -d -p 8000:8000 -e CENTRIFUGO_LOG_LEVEL=trace \
-e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=true \
-e CENTRIFUGO_JOIN_LEAVE=true -e CENTRIFUGO_FORCE_PUSH_JOIN_LEAVE=true \
-e CENTRIFUGO_HISTORY_TTL=300s -e CENTRIFUGO_HISTORY_SIZE=100 \
-e CENTRIFUGO_DELTA_PUBLISH=true -e CENTRIFUGO_ALLOWED_DELTA_TYPES="fossil" \
-e CENTRIFUGO_FORCE_RECOVERY=true -e CENTRIFUGO_USER_SUBSCRIBE_TO_PERSONAL=true \
-e CENTRIFUGO_ALLOW_PUBLISH_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_PRESENCE_FOR_SUBSCRIBER=true \
-e CENTRIFUGO_ALLOW_HISTORY_FOR_SUBSCRIBER=true centrifugo/centrifugo:v5 centrifugo
Expand Down
3 changes: 2 additions & 1 deletion centrifuge/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Main module of a Centrifuge Python client library."""

from .client import Client, ClientState, Subscription, SubscriptionState
from .client import Client, ClientState, Subscription, SubscriptionState, DeltaType
from .contexts import (
ConnectedContext,
ConnectingContext,
Expand Down Expand Up @@ -50,6 +50,7 @@
"ClientState",
"ConnectedContext",
"ConnectingContext",
"DeltaType",
"DisconnectedContext",
"DuplicateSubscriptionError",
"ErrorContext",
Expand Down
22 changes: 12 additions & 10 deletions centrifuge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
Union,
List,
Callable,
Literal,
)

import websockets
Expand Down Expand Up @@ -119,6 +118,13 @@ class _ServerSubscription:
recoverable: bool


class DeltaType(Enum):
FOSSIL = "fossil"

def __str__(self) -> str:
return self.value


class Client:
"""Client is a websocket client to Centrifuge/Centrifugo server."""

Expand Down Expand Up @@ -202,7 +208,7 @@ def new_subscription(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Literal["fossil", ""] = "",
delta: Optional[DeltaType] = None,
) -> "Subscription":
"""Creates new subscription to channel. If subscription already exists then
DuplicateSubscriptionError exception will be raised.
Expand Down Expand Up @@ -786,7 +792,7 @@ def _construct_subscribe_command(self, sub: "Subscription", cmd_id: int) -> Dict
subscribe["offset"] = sub._offset

if sub._delta:
subscribe["delta"] = sub._delta
subscribe["delta"] = sub._delta.value

command = {
"id": cmd_id,
Expand Down Expand Up @@ -1359,7 +1365,7 @@ def _initialize(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Literal["fossil", ""] = "",
delta: Optional[DeltaType] = None,
) -> None:
"""Initializes Subscription instance.
Note: use Client.new_subscription method to create new subscriptions in your app.
Expand All @@ -1386,7 +1392,7 @@ def _initialize(
self._epoch: str = ""
self._prev_data: Optional[Any] = None

if delta not in {"fossil", ""}:
if delta and delta not in {DeltaType.FOSSIL}:
raise CentrifugeError("unsupported delta format")
self._delta = delta
self._delta_negotiated: bool = False
Expand Down Expand Up @@ -1582,12 +1588,8 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None:

publications = subscribe.get("publications", [])
if publications:
on_publication_handler = self.events.on_publication
for pub in publications:
publication = self._client._publication_from_proto(pub)
await on_publication_handler(PublicationContext(pub=publication))
if publication.offset > 0:
self._offset = publication.offset
await self._process_publication(pub)

self._clear_subscribing_state()

Expand Down
8 changes: 4 additions & 4 deletions centrifuge/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ def decode_replies(data):
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data.encode("utf-8"))
new_data = prev_data.decode("utf-8")
new_data = json.loads(prev_data)
else:
prev_data = pub.data.encode("utf-8")
new_data = pub.data
new_data = json.loads(pub.data)
return new_data, prev_data


Expand Down Expand Up @@ -92,8 +92,8 @@ def decode_replies(data: bytes):
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data)
new_data = prev_data.decode("utf-8")
new_data = prev_data
else:
prev_data = pub.data
new_data = pub.data.decode("utf-8")
new_data = pub.data
return new_data, prev_data
85 changes: 65 additions & 20 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import hashlib
from typing import List

import centrifuge.client
from centrifuge import (
Client,
ClientState,
DeltaType,
SubscriptionState,
PublicationContext,
SubscribedContext,
Expand Down Expand Up @@ -131,34 +133,63 @@ async def test_subscription_operations(self) -> None:
class TestPubSub(unittest.IsolatedAsyncioTestCase):
async def test_pub_sub(self) -> None:
for use_protobuf in (False, True):
with self.subTest(use_protobuf=use_protobuf):
await self._test_pub_sub(use_protobuf=use_protobuf)
for delta in (None, centrifuge.DeltaType.FOSSIL):
with self.subTest(use_protobuf=use_protobuf, delta=delta):
await self._test_pub_sub(use_protobuf=use_protobuf, delta=delta)

async def _test_pub_sub(self, use_protobuf=False) -> None:
async def _test_pub_sub(self, use_protobuf=False, delta=None) -> None:
client = Client(
"ws://localhost:8000/connection/websocket",
use_protobuf=use_protobuf,
get_token=test_get_client_token,
)

future = asyncio.Future()
future1 = asyncio.Future()
future2 = asyncio.Future()
future3 = asyncio.Future()

async def on_publication(ctx: PublicationContext) -> None:
future.set_result(ctx.pub.data)
if not future1.done():
future1.set_result(ctx.pub.data)
elif not future2.done():
future2.set_result(ctx.pub.data)
else:
future3.set_result(ctx.pub.data)

sub = client.new_subscription(
"pub_sub_channel" + uuid.uuid4().hex, get_token=test_get_subscription_token
"pub_sub_channel" + uuid.uuid4().hex,
get_token=test_get_subscription_token,
delta=delta,
)
sub.events.on_publication = on_publication

await client.connect()
await sub.subscribe()
payload = {"input": "test"}
payload1 = {
"input": "test message which is long enough for fossil "
"delta to be applied on the server side."
}
if use_protobuf:
payload = json.dumps(payload).encode()
await sub.publish(data=payload)
result = await future
self.assertEqual(result, payload)
payload1 = json.dumps(payload1).encode()

await sub.publish(data=payload1)
result = await future1
self.assertEqual(result, payload1)

# let's test fossil delta publishing the same.
payload2 = payload1
await sub.publish(data=payload2)
result = await future2
self.assertEqual(result, payload2)

another_payload = {"input": "hello"}
if use_protobuf:
another_payload = json.dumps(another_payload).encode()

await sub.publish(data=another_payload)
result = await future3
self.assertEqual(result, another_payload)

await client.disconnect()


Expand Down Expand Up @@ -222,10 +253,11 @@ async def on_leave(ctx: LeaveContext) -> None:
class TestAutoRecovery(unittest.IsolatedAsyncioTestCase):
async def test_auto_recovery(self) -> None:
for use_protobuf in (False, True):
with self.subTest(use_protobuf=use_protobuf):
await self._test_auto_recovery(use_protobuf=use_protobuf)
for delta in (None, DeltaType.FOSSIL):
with self.subTest(use_protobuf=use_protobuf, delta=delta):
await self._test_auto_recovery(use_protobuf=use_protobuf, delta=delta)

async def _test_auto_recovery(self, use_protobuf=False) -> None:
async def _test_auto_recovery(self, use_protobuf=False, delta=None) -> None:
client1 = Client(
"ws://localhost:8000/connection/websocket",
use_protobuf=use_protobuf,
Expand All @@ -240,10 +272,16 @@ async def _test_auto_recovery(self, use_protobuf=False) -> None:

# First subscribe both clients to the same channel.
channel = "recovery_channel" + uuid.uuid4().hex
sub1 = client1.new_subscription(channel, get_token=test_get_subscription_token)
sub2 = client2.new_subscription(channel, get_token=test_get_subscription_token)
sub1 = client1.new_subscription(
channel, get_token=test_get_subscription_token, delta=delta
)
sub2 = client2.new_subscription(
channel, get_token=test_get_subscription_token, delta=delta
)

num_messages = 10

futures: List[asyncio.Future] = [asyncio.Future() for _ in range(5)]
futures: List[asyncio.Future] = [asyncio.Future() for _ in range(num_messages)]

async def on_publication(ctx: PublicationContext) -> None:
futures[ctx.pub.offset - 1].set_result(ctx.pub.data)
Expand All @@ -263,10 +301,15 @@ async def on_subscribed(ctx: SubscribedContext) -> None:
# Now disconnect client1 and publish some messages using client2.
await client1.disconnect()

for _ in range(10):
payload = {"input": "test"}
payloads = []
for i in range(num_messages):
if i % 2 == 0:
payload = {"input": "I just subscribed to channel " + str(i)}
else:
payload = {"input": "Hi from Java " + str(i)}
if use_protobuf:
payload = json.dumps(payload).encode()
payloads.append(payload)
await sub2.publish(data=payload)

async def on_subscribed_after_recovery(ctx: SubscribedContext) -> None:
Expand All @@ -278,7 +321,9 @@ async def on_subscribed_after_recovery(ctx: SubscribedContext) -> None:
# Now reconnect client1 and check that it receives all missed messages.
await client1.connect()
results = await asyncio.gather(*futures)
self.assertEqual(len(results), 5)
self.assertEqual(len(results), num_messages)
for i, result in enumerate(results):
self.assertEqual(result, payloads[i])
await client1.disconnect()
await client2.disconnect()

Expand Down

0 comments on commit 8fb4d82

Please sign in to comment.