Skip to content

Commit

Permalink
Merge pull request #19 from y-crdt/unobserve-events
Browse files Browse the repository at this point in the history
Unobserve subscription pattern for events
  • Loading branch information
dmonad authored Apr 1, 2022
2 parents 02d54f8 + 7a71c81 commit 159d52c
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 92 deletions.
37 changes: 21 additions & 16 deletions src/y_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pyo3::exceptions::{PyIndexError, PyTypeError};
use pyo3::prelude::*;
use pyo3::types::{PyList, PySlice, PySliceIndices};
use yrs::types::array::{ArrayEvent, ArrayIter};
use yrs::{Array, Subscription, Transaction};
use yrs::{Array, SubscriptionId, Transaction};

/// A collection used to store data in an indexed sequence structure. This type is internally
/// implemented as a double linked list, which may squash values inserted directly one after another
Expand Down Expand Up @@ -185,24 +185,38 @@ impl YArray {

/// Subscribes to all operations happening over this instance of `YArray`. All changes are
/// batched and eventually triggered during transaction commit phase.
/// Returns an `YObserver` which, when free'd, will unsubscribe current callback.
pub fn observe(&mut self, f: PyObject) -> PyResult<YArrayObserver> {
/// Returns a `SubscriptionId` which can be used to cancel the callback with `unobserve`.
pub fn observe(&mut self, f: PyObject) -> PyResult<SubscriptionId> {
match &mut self.0 {
SharedType::Integrated(v) => Ok(v
.observe(move |txn, e| {
SharedType::Integrated(array) => {
let subscription = array.observe(move |txn, e| {
Python::with_gil(|py| {
let event = YArrayEvent::new(e, txn);
if let Err(err) = f.call1(py, (event,)) {
err.restore(py)
}
})
})
.into()),
});
Ok(subscription.into())
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)),
}
}

/// Cancels the callback of an observer using the Subscription ID returned from the `observe` method.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(v) => {
v.unobserve(subscription_id);
Ok(())
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot call unobserve on a preliminary type. Must be added to a YDoc first",
)),
}
}
}

impl YArray {
Expand Down Expand Up @@ -407,12 +421,3 @@ impl YArrayEvent {
}
}
}

#[pyclass(unsendable)]
pub struct YArrayObserver(Subscription<ArrayEvent>);

impl From<Subscription<ArrayEvent>> for YArrayObserver {
fn from(o: Subscription<ArrayEvent>) -> Self {
YArrayObserver(o)
}
}
25 changes: 14 additions & 11 deletions src/y_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use yrs::types::map::{MapEvent, MapIter};
use yrs::{Map, Subscription, Transaction};
use yrs::{Map, SubscriptionId, Transaction};

use crate::shared_types::SharedType;
use crate::type_conversions::{PyValueWrapper, ToPython};
Expand Down Expand Up @@ -172,7 +172,7 @@ impl YMap {
YMapKeyIterator(self.items())
}

pub fn observe(&mut self, f: PyObject) -> PyResult<YMapObserver> {
pub fn observe(&mut self, f: PyObject) -> PyResult<SubscriptionId> {
match &mut self.0 {
SharedType::Integrated(v) => Ok(v
.observe(move |txn, e| {
Expand All @@ -189,6 +189,18 @@ impl YMap {
)),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(map) => {
map.unobserve(subscription_id);
Ok(())
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot unobserve a preliminary type. Must be added to a YDoc first",
)),
}
}
}

pub enum InnerYMapIterator {
Expand Down Expand Up @@ -316,12 +328,3 @@ impl YMapEvent {
}
}
}

#[pyclass(unsendable)]
pub struct YMapObserver(Subscription<MapEvent>);

impl From<Subscription<MapEvent>> for YMapObserver {
fn from(o: Subscription<MapEvent>) -> Self {
YMapObserver(o)
}
}
25 changes: 14 additions & 11 deletions src/y_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::PyList;
use yrs::types::text::TextEvent;
use yrs::{Subscription, Text, Transaction};
use yrs::{SubscriptionId, Text, Transaction};

use crate::shared_types::SharedType;
use crate::type_conversions::ToPython;
Expand Down Expand Up @@ -112,7 +112,7 @@ impl YText {
}
}

pub fn observe(&mut self, f: PyObject) -> PyResult<YTextObserver> {
pub fn observe(&mut self, f: PyObject) -> PyResult<SubscriptionId> {
match &mut self.0 {
SharedType::Integrated(v) => Ok(v
.observe(move |txn, e| {
Expand All @@ -129,6 +129,18 @@ impl YText {
)),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(text) => {
text.unobserve(subscription_id);
Ok(())
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot unobserve a preliminary type. Must be added to a YDoc first",
)),
}
}
}

/// Event generated by `YYText.observe` method. Emitted during transaction commit phase.
Expand Down Expand Up @@ -207,12 +219,3 @@ impl YTextEvent {
}
}
}

#[pyclass(unsendable)]
pub struct YTextObserver(Subscription<TextEvent>);

impl From<Subscription<TextEvent>> for YTextObserver {
fn from(o: Subscription<TextEvent>) -> Self {
YTextObserver(o)
}
}
33 changes: 12 additions & 21 deletions src/y_xml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::mem::ManuallyDrop;
use std::ops::Deref;
use yrs::types::xml::{Attributes, TreeWalker, XmlEvent, XmlTextEvent};
use yrs::types::{EntryChange, Path, PathSegment};
use yrs::Subscription;
use yrs::SubscriptionId;
use yrs::Transaction;
use yrs::Xml;
use yrs::XmlElement;
Expand Down Expand Up @@ -164,8 +164,8 @@ impl YXmlElement {

/// Subscribes to all operations happening over this instance of `YXmlElement`. All changes are
/// batched and eventually triggered during transaction commit phase.
/// Returns an `YXmlObserver` which, when free'd, will unsubscribe current callback.
pub fn observe(&mut self, f: PyObject) -> YXmlObserver {
/// Returns an `SubscriptionId` which, can be used to unsubscribe the observer.
pub fn observe(&mut self, f: PyObject) -> SubscriptionId {
self.0
.observe(move |txn, e| {
Python::with_gil(|py| {
Expand All @@ -177,6 +177,10 @@ impl YXmlElement {
})
.into()
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) {
self.0.unobserve(subscription_id);
}
}

/// A shared data type used for collaborative text editing, that can be used in a context of
Expand Down Expand Up @@ -295,8 +299,8 @@ impl YXmlText {

/// Subscribes to all operations happening over this instance of `YXmlText`. All changes are
/// batched and eventually triggered during transaction commit phase.
/// Returns an `YXmlObserver` which, when free'd, will unsubscribe current callback.
pub fn observe(&mut self, f: PyObject) -> YXmlTextObserver {
/// Returns an `SubscriptionId` which, which can be used to unsubscribe the callback function.
pub fn observe(&mut self, f: PyObject) -> SubscriptionId {
self.0
.observe(move |txn, e| {
Python::with_gil(|py| {
Expand All @@ -308,23 +312,10 @@ impl YXmlText {
})
.into()
}
}

#[pyclass(unsendable)]
pub struct YXmlObserver(Subscription<XmlEvent>);

impl From<Subscription<XmlEvent>> for YXmlObserver {
fn from(o: Subscription<XmlEvent>) -> Self {
YXmlObserver(o)
}
}

#[pyclass(unsendable)]
pub struct YXmlTextObserver(Subscription<XmlTextEvent>);

impl From<Subscription<XmlTextEvent>> for YXmlTextObserver {
fn from(o: Subscription<XmlTextEvent>) -> Self {
YXmlTextObserver(o)
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) {
self.0.unobserve(subscription_id);
}
}

Expand Down
7 changes: 4 additions & 3 deletions tests/test_y_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def callback(e):
target = e.target
delta = e.delta

observer = x.observe(callback)
subscription_id = x.observe(callback)

# insert initial data to an empty YArray
with d1.begin_transaction() as txn:
Expand Down Expand Up @@ -189,8 +189,9 @@ def callback(e):
target = None
delta = None

# free the observer and make sure that callback is no longer called
del observer
# Cancel the observer and make sure that callback is no longer called
x.unobserve(subscription_id)

with d1.begin_transaction() as txn:
x.insert(txn, 1, [6])

Expand Down
4 changes: 2 additions & 2 deletions tests/test_y_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def callback(e):
target = e.target
entries = e.keys

observer = x.observe(callback)
subscription_id = x.observe(callback)

# insert initial data to an empty YMap
with d1.begin_transaction() as txn:
Expand Down Expand Up @@ -121,7 +121,7 @@ def callback(e):
entries = None

# free the observer and make sure that callback is no longer called
del observer
x.unobserve(subscription_id)
with d1.begin_transaction() as txn:
x.set(txn, "key1", [6])
assert target == None
Expand Down
4 changes: 2 additions & 2 deletions tests/test_y_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def callback(e):

x = d1.get_text("test")

observer = x.observe(callback)
subscription_id = x.observe(callback)

# insert initial data to an empty YText
with d1.begin_transaction() as txn:
Expand Down Expand Up @@ -105,7 +105,7 @@ def callback(e):
delta = None

# free the observer and make sure that callback is no longer called
del observer
x.unobserve(subscription_id)
with d1.begin_transaction() as txn:
x.insert(txn, 1, "fgh")
assert target == None
Expand Down
8 changes: 4 additions & 4 deletions tests/test_y_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def callback(e):
attributes = e.keys
delta = e.delta

observer = x.observe(callback)
subscription_id = x.observe(callback)

# set initial attributes
with d1.begin_transaction() as txn:
Expand Down Expand Up @@ -168,7 +168,7 @@ def callback(e):
delta = None

# free the observer and make sure that callback is no longer called
del observer
x.unobserve(subscription_id)
with d1.begin_transaction() as txn:
x.insert(txn, 1, "fgh")
assert target == None
Expand All @@ -192,7 +192,7 @@ def callback(e):
attributes = e.keys
nodes = e.delta

observer = x.observe(callback)
subscription_id = x.observe(callback)

# insert initial attributes
with d1.begin_transaction() as txn:
Expand Down Expand Up @@ -260,7 +260,7 @@ def callback(e):
nodes = None

# free the observer and make sure that callback is no longer called
del observer
x.unobserve(subscription_id)
with d1.begin_transaction() as txn:
x.insert_xml_element(txn, 0, "head")
assert target == None
Expand Down
Loading

0 comments on commit 159d52c

Please sign in to comment.