Skip to content

Commit

Permalink
Rebuild observable class
Browse files Browse the repository at this point in the history
  • Loading branch information
hzrd149 committed Feb 15, 2024
1 parent 8879641 commit e29658a
Show file tree
Hide file tree
Showing 40 changed files with 370 additions and 341 deletions.
5 changes: 5 additions & 0 deletions .changeset/clever-swans-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"nostrudel": patch
---

Rebuild observable class
2 changes: 2 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{
"tabWidth": 2,
"useTabs": false,
"printWidth": 120
}
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
"webln"
],
"typescript.enablePromptUseWorkspaceTsdk": true,
"typescript.tsdk": "node_modules/typescript/lib"
"typescript.tsdk": "node_modules/typescript/lib",
"deno.enable": false
}
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@
"three-spritetext": "^1.8.1",
"three-stdlib": "^2.29.4",
"webln": "^0.3.2",
"yet-another-react-lightbox": "^3.15.6"
"yet-another-react-lightbox": "^3.15.6",
"zen-observable": "^0.10.0"
},
"devDependencies": {
"@changesets/cli": "^2.27.1",
Expand All @@ -89,6 +90,7 @@
"@types/react-dom": "^18.2.18",
"@types/three": "^0.160.0",
"@types/webscopeio__react-textarea-autocomplete": "^4.7.5",
"@types/zen-observable": "^0.8.7",
"@vitejs/plugin-react": "^4.2.1",
"camelcase": "^8.0.0",
"prettier": "^3.1.1",
Expand Down
64 changes: 64 additions & 0 deletions src/classes/controlled-observable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import Observable from "zen-observable";

export default class ControlledObservable<T> implements Observable<T> {
private observable: Observable<T>;
private subscriptions = new Set<ZenObservable.SubscriptionObserver<T>>();
private _complete = false;
get closed() {
return this._complete;
}
get used() {
return this.subscriptions.size > 0;
}

constructor(subscriber?: ZenObservable.Subscriber<T>) {
this.observable = new Observable((observer) => {
this.subscriptions.add(observer);
const cleanup = subscriber && subscriber(observer);
return () => {
this.subscriptions.delete(observer);
if (typeof cleanup === "function") cleanup();
else if (cleanup?.unsubscribe) cleanup.unsubscribe();
};
});

this.subscribe = this.observable.subscribe.bind(this.observable);
this.map = this.observable.map.bind(this.observable);
this.flatMap = this.observable.flatMap.bind(this.observable);
this.forEach = this.observable.forEach.bind(this.observable);
this.reduce = this.observable.reduce.bind(this.observable);
this.filter = this.observable.filter.bind(this.observable);
this.concat = this.observable.concat.bind(this.observable);
}

next(v: T) {
if (this._complete) return;
for (const observer of this.subscriptions) {
observer.next(v);
}
}
error(err: any) {
if (this._complete) return;
for (const observer of this.subscriptions) {
observer.error(err);
}
}
complete() {
if (this._complete) return;
this._complete = true;
for (const observer of this.subscriptions) {
observer.complete();
}
}

[Symbol.observable]() {
return this.observable;
}
subscribe: Observable<T>["subscribe"];
map: Observable<T>["map"];
flatMap: Observable<T>["flatMap"];
forEach: Observable<T>["forEach"];
reduce: Observable<T>["reduce"];
filter: Observable<T>["filter"];
concat: Observable<T>["concat"];
}
72 changes: 37 additions & 35 deletions src/classes/event-store.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { getEventUID, isReplaceable, sortByDate } from "../helpers/nostr/events";
import replaceableEventLoaderService from "../services/replaceable-event-requester";
import { NostrEvent, isDTag } from "../types/nostr-event";
import Subject from "./subject";
import { NostrEvent } from "nostr-tools";
import { getEventUID, sortByDate } from "../helpers/nostr/events";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";
import deleteEventService from "../services/delete-events";

export type EventFilter = (event: NostrEvent, store: EventStore) => boolean;

Expand All @@ -11,20 +12,27 @@ export default class EventStore {

customSort?: typeof sortByDate;

private deleteSub: ZenObservable.Subscription;

constructor(name?: string, customSort?: typeof sortByDate) {
this.name = name;
this.customSort = customSort;

this.deleteSub = deleteEventService.stream.subscribe((event) => {
const uid = getEventUID(event);
this.deleteEvent(uid);
if (uid !== event.id) this.deleteEvent(event.id);
});
}

getSortedEvents() {
return Array.from(this.events.values()).sort(this.customSort || sortByDate);
}

onEvent = new Subject<NostrEvent>(undefined, false);
onDelete = new Subject<string>(undefined, false);
onClear = new Subject(undefined, false);
onEvent = new ControlledObservable<NostrEvent>();
onDelete = new ControlledObservable<string>();
onClear = new ControlledObservable();

private replaceableEventSubs = new Map<string, Subject<NostrEvent>>();
private handleEvent(event: NostrEvent) {
const id = getEventUID(event);
const existing = this.events.get(id);
Expand All @@ -37,16 +45,6 @@ export default class EventStore {
addEvent(event: NostrEvent) {
const id = getEventUID(event);
this.handleEvent(event);

if (isReplaceable(event.kind)) {
// pass the event on
replaceableEventLoaderService.handleEvent(event);

// subscribe to any future changes
const sub = replaceableEventLoaderService.getEvent(event.kind, event.pubkey, event.tags.find(isDTag)?.[1]);
sub.subscribe(this.handleEvent, this);
this.replaceableEventSubs.set(id, sub);
}
}
getEvent(id: string) {
return this.events.get(id);
Expand All @@ -56,32 +54,36 @@ export default class EventStore {
this.events.delete(id);
this.onDelete.next(id);
}

if (this.replaceableEventSubs.has(id)) {
this.replaceableEventSubs.get(id)?.unsubscribe(this.handleEvent, this);
this.replaceableEventSubs.delete(id);
}
}

clear() {
this.events.clear();
this.onClear.next(undefined);

for (const [_, sub] of this.replaceableEventSubs) {
sub.unsubscribe(this.handleEvent, this);
}
}
cleanup() {
this.clear();
}

connect(other: EventStore) {
other.onEvent.subscribe(this.addEvent, this);
other.onDelete.subscribe(this.deleteEvent, this);
private storeSubs = new SuperMap<EventStore, ZenObservable.Subscription[]>(() => []);
connect(other: EventStore, fullSync = true) {
const subs = this.storeSubs.get(other);
subs.push(
other.onEvent.subscribe((e) => {
if (fullSync || this.events.has(getEventUID(e))) this.addEvent(e);
}),
);
subs.push(other.onDelete.subscribe(this.deleteEvent.bind(this)));
}
disconnect(other: EventStore) {
other.onEvent.unsubscribe(this.addEvent, this);
other.onDelete.unsubscribe(this.deleteEvent, this);
const subs = this.storeSubs.get(other);
for (const sub of subs) sub.unsubscribe();
this.storeSubs.delete(other);
}

cleanup() {
this.clear();
for (const [_, subs] of this.storeSubs) {
for (const sub of subs) sub.unsubscribe();
}
this.storeSubs.clear();
this.deleteSub.unsubscribe();
}

getFirstEvent(nth = 0, filter?: EventFilter) {
Expand Down
19 changes: 11 additions & 8 deletions src/classes/nostr-multi-subscription.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { nanoid } from "nanoid";

import { Subject } from "./subject";
import { NostrEvent } from "../types/nostr-event";
import { NostrOutgoingRequest, NostrRequestFilter, RelayQueryMap } from "../types/nostr-query";
import Relay, { IncomingEvent } from "./relay";
import relayPoolService from "../services/relay-pool";
import { isFilterEqual, isQueryMapEqual } from "../helpers/nostr/filter";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";

export default class NostrMultiSubscription {
static INIT = "initial";
Expand All @@ -18,7 +19,7 @@ export default class NostrMultiSubscription {

relays: Relay[] = [];
state = NostrMultiSubscription.INIT;
onEvent = new Subject<NostrEvent>();
onEvent = new ControlledObservable<NostrEvent>();
seenEvents = new Set<string>();

constructor(name?: string) {
Expand All @@ -36,18 +37,20 @@ export default class NostrMultiSubscription {
}
}

private relaySubs = new SuperMap<Relay, ZenObservable.Subscription[]>(() => []);
/** listen for event and open events from relays */
private connectToRelay(relay: Relay) {
relay.onEvent.subscribe(this.handleEvent, this);
relay.onOpen.subscribe(this.handleRelayConnect, this);
relay.onClose.subscribe(this.handleRelayDisconnect, this);
const subs = this.relaySubs.get(relay);
subs.push(relay.onEvent.subscribe(this.handleEvent.bind(this)));
subs.push(relay.onOpen.subscribe(this.handleRelayConnect.bind(this)));
subs.push(relay.onClose.subscribe(this.handleRelayDisconnect.bind(this)));
relayPoolService.addClaim(relay.url, this);
}
/** stop listing to events from relays */
private disconnectFromRelay(relay: Relay) {
relay.onEvent.unsubscribe(this.handleEvent, this);
relay.onOpen.unsubscribe(this.handleRelayConnect, this);
relay.onClose.unsubscribe(this.handleRelayDisconnect, this);
const subs = this.relaySubs.get(relay);
for (const sub of subs) sub.unsubscribe();
this.relaySubs.delete(relay);
relayPoolService.removeClaim(relay.url, this);

// if the subscription is open and had sent a request to the relay
Expand Down
12 changes: 8 additions & 4 deletions src/classes/nostr-publish-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { NostrEvent } from "nostr-tools";
import relayPoolService from "../services/relay-pool";
import createDefer from "./deferred";
import Relay, { IncomingCommandResult } from "./relay";
import Subject, { PersistentSubject } from "./subject";
import { PersistentSubject } from "./subject";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";

export default class NostrPublishAction {
id = nanoid();
Expand All @@ -13,10 +15,11 @@ export default class NostrPublishAction {
event: NostrEvent;

results = new PersistentSubject<IncomingCommandResult[]>([]);
onResult = new Subject<IncomingCommandResult>(undefined, false);
onResult = new ControlledObservable<IncomingCommandResult>();
onComplete = createDefer<IncomingCommandResult[]>();

private remaining = new Set<Relay>();
private relayResultSubs = new SuperMap<Relay, ZenObservable.Subscription[]>(() => []);

constructor(label: string, relays: Iterable<string>, event: NostrEvent, timeout: number = 5000) {
this.label = label;
Expand All @@ -26,7 +29,7 @@ export default class NostrPublishAction {
for (const url of relays) {
const relay = relayPoolService.requestRelay(url);
this.remaining.add(relay);
relay.onCommandResult.subscribe(this.handleResult, this);
this.relayResultSubs.get(relay).push(relay.onCommandResult.subscribe(this.handleResult.bind(this)));

// send event
relay.send(["EVENT", event]);
Expand All @@ -42,7 +45,8 @@ export default class NostrPublishAction {

this.onResult.next(result);

relay.onCommandResult.unsubscribe(this.handleResult, this);
this.relayResultSubs.get(relay).forEach((s) => s.unsubscribe());
this.relayResultSubs.delete(relay);
this.remaining.delete(relay);
if (this.remaining.size === 0) this.onComplete.resolve(this.results.value);
}
Expand Down
28 changes: 16 additions & 12 deletions src/classes/nostr-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,36 @@ import { CountResponse, NostrEvent } from "../types/nostr-event";
import { NostrRequestFilter } from "../types/nostr-query";
import relayPoolService from "../services/relay-pool";
import Relay, { IncomingCount, IncomingEOSE, IncomingEvent } from "./relay";
import Subject from "./subject";
import createDefer from "./deferred";
import ControlledObservable from "./controlled-observable";
import SuperMap from "./super-map";

const REQUEST_DEFAULT_TIMEOUT = 1000 * 5;
export default class NostrRequest {
static IDLE = "idle";
static RUNNING = "running";
static COMPLETE = "complete";

id: string;
id = nanoid();
timeout: number;
relays: Set<Relay>;
state = NostrRequest.IDLE;
onEvent = new Subject<NostrEvent>(undefined, false);
onCount = new Subject<CountResponse>(undefined, false);
onEvent = new ControlledObservable<NostrEvent>();
onCount = new ControlledObservable<CountResponse>();
/** @deprecated */
onComplete = createDefer<void>();
seenEvents = new Set<string>();

private relaySubs: SuperMap<Relay, ZenObservable.Subscription[]> = new SuperMap(() => []);

constructor(relayUrls: Iterable<string>, timeout?: number) {
this.id = nanoid();
this.relays = new Set(Array.from(relayUrls).map((url) => relayPoolService.requestRelay(url)));

for (const relay of this.relays) {
relay.onEOSE.subscribe(this.handleEOSE, this);
relay.onEvent.subscribe(this.handleEvent, this);
relay.onCount.subscribe(this.handleCount, this);
const subs = this.relaySubs.get(relay);
subs.push(relay.onEOSE.subscribe(this.handleEOSE.bind(this)));
subs.push(relay.onEvent.subscribe(this.handleEvent.bind(this)));
subs.push(relay.onCount.subscribe(this.handleCount.bind(this)));
}

this.timeout = timeout ?? REQUEST_DEFAULT_TIMEOUT;
Expand All @@ -40,8 +44,8 @@ export default class NostrRequest {
this.relays.delete(relay);
relay.send(["CLOSE", this.id]);

relay.onEOSE.unsubscribe(this.handleEOSE, this);
relay.onEvent.unsubscribe(this.handleEvent, this);
this.relaySubs.get(relay).forEach((sub) => sub.unsubscribe());
this.relaySubs.delete(relay);

if (this.relays.size === 0) {
this.state = NostrRequest.COMPLETE;
Expand Down Expand Up @@ -87,9 +91,9 @@ export default class NostrRequest {
this.state = NostrRequest.COMPLETE;
for (const relay of this.relays) {
relay.send(["CLOSE", this.id]);
relay.onEOSE.unsubscribe(this.handleEOSE, this);
relay.onEvent.unsubscribe(this.handleEvent, this);
this.relaySubs.get(relay).forEach((sub) => sub.unsubscribe());
}
this.relaySubs.clear();
this.onComplete.resolve();

return this;
Expand Down
Loading

0 comments on commit e29658a

Please sign in to comment.