Skip to content

Commit

Permalink
use only one new database connection per dvm
Browse files Browse the repository at this point in the history
  • Loading branch information
believethehype committed May 31, 2024
1 parent 2e318d2 commit eb125c4
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 91 deletions.
39 changes: 18 additions & 21 deletions nostr_dvm/tasks/content_discovery_currently_popular.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c
if self.logger:
init_logger(LogLevel.DEBUG)

opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
self.client = ClientBuilder().signer(signer).database(database).opts(opts).build()

self.client.add_relay("wss://relay.damus.io")
self.client.add_relay("wss://nostr.oxtr.dev")
self.client.add_relay("wss://nostr21.com")

self.client.connect()


if self.dvm_config.UPDATE_DATABASE:
self.sync_db()

Expand Down Expand Up @@ -111,16 +125,11 @@ def calculate_result(self, request_form):

options = self.set_options(request_form)

database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().database(database).build()

# Negentropy reconciliation
# Query events from database
timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_hour_ago)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)
events = cli.database().query([filter1])
events = self.client.database().query([filter1])
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")

ns.finallist = {}
Expand All @@ -129,7 +138,7 @@ def calculate_result(self, request_form):
filt = Filter().kinds([definitions.EventDefinitions.KIND_ZAP, definitions.EventDefinitions.KIND_REPOST,
definitions.EventDefinitions.KIND_REACTION,
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(since)
reactions = cli.database().query([filt])
reactions = self.client.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.finallist[event.id().to_hex()] = len(reactions)

Expand Down Expand Up @@ -168,18 +177,6 @@ def schedule(self, dvm_config):
return 1

def sync_db(self):
opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()

cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nostr.oxtr.dev")
cli.add_relay("wss://nostr21.com")

cli.connect()

timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
lasthour = Timestamp.from_secs(timestamp_hour_ago)
Expand All @@ -191,9 +188,9 @@ def sync_db(self):
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
cli.reconcile(filter1, dbopts)
self.client.reconcile(filter1, dbopts)
filter_delete = Filter().until(Timestamp.from_secs(Timestamp.now().as_secs() - self.db_since))
database.delete(filter_delete) # Clear old events so db doesn't get too full.
self.client.database().delete(filter_delete) # Clear old events so db doesn't get too full.

print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")
Expand Down
36 changes: 18 additions & 18 deletions nostr_dvm/tasks/content_discovery_currently_popular_by_top_zaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c

if self.logger:
init_logger(LogLevel.DEBUG)

opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
self.client = ClientBuilder().signer(signer).database(database).opts(opts).build()

self.client.add_relay("wss://relay.damus.io")
self.client.add_relay("wss://nostr.oxtr.dev")
self.client.add_relay("wss://nostr21.com")

self.client.connect()

if self.dvm_config.UPDATE_DATABASE:
self.sync_db()

Expand Down Expand Up @@ -110,23 +124,21 @@ def calculate_result(self, request_form):

options = self.set_options(request_form)

database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().database(database).build()

# Negentropy reconciliation
# Query events from database
timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_hour_ago)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)
events = cli.database().query([filter1])
events = self.client.database().query([filter1])
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")

ns.finallist = {}
for event in events:
if event.created_at().as_secs() > timestamp_hour_ago:
filt = Filter().kinds([definitions.EventDefinitions.KIND_ZAP]).event(event.id()).since(since)
reactions = cli.database().query([filt])
reactions = self.client.database().query([filt])
invoice_amount = 0
haspreimage = False
if len(reactions) >= self.min_reactions:
Expand Down Expand Up @@ -180,18 +192,6 @@ def schedule(self, dvm_config):
return 1

def sync_db(self):
opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()

cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nostr.oxtr.dev")
cli.add_relay("wss://nostr21.com")

cli.connect()

timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
lasthour = Timestamp.from_secs(timestamp_hour_ago)
Expand All @@ -203,9 +203,9 @@ def sync_db(self):
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
cli.reconcile(filter1, dbopts)
self.client.reconcile(filter1, dbopts)
filter_delete = Filter().until(Timestamp.from_secs(Timestamp.now().as_secs() - self.db_since))
database.delete(filter_delete) # Clear old events so db doesn't get too full.
self.client.database().delete(filter_delete) # Clear old events so db doesn't get too full.

print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")
Expand Down
50 changes: 21 additions & 29 deletions nostr_dvm/tasks/content_discovery_currently_popular_followers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c
if self.logger:
init_logger(LogLevel.DEBUG)

opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
self.client = ClientBuilder().signer(signer).database(database).opts(opts).build()

self.client.add_relay("wss://relay.damus.io")
self.client.add_relay("wss://nostr.oxtr.dev")
self.client.add_relay("wss://nostr21.com")

ropts = RelayOptions().ping(False)
self.client.add_relay_with_opts("wss://nostr.band", ropts)

self.client.connect()

if self.dvm_config.UPDATE_DATABASE:
self.sync_db()

Expand Down Expand Up @@ -100,23 +116,10 @@ def process(self, request_form):
opts = (
Options().wait_for_send(True).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)).relay_limits(
relaylimits))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().database(database).signer(signer).opts(opts).build()
cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nos.lol")
cli.add_relay("wss://nostr.mom")

ropts = RelayOptions().ping(False)
cli.add_relay_with_opts("wss://nostr.band", ropts)

cli.connect()

user = PublicKey.parse(options["user"])
followers_filter = Filter().author(user).kinds([Kind(3)])
followers = cli.get_events_of([followers_filter], timedelta(seconds=self.dvm_config.RELAY_TIMEOUT))
followers = self.client.get_events_of([followers_filter], timedelta(seconds=self.dvm_config.RELAY_TIMEOUT))

# Negentropy reconciliation
# Query events from database
Expand All @@ -141,7 +144,7 @@ def process(self, request_form):
followings.append(following)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).authors(followings).since(since)
events = cli.database().query([filter1])
events = self.client.database().query([filter1])
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")

ns.finallist = {}
Expand All @@ -150,7 +153,7 @@ def process(self, request_form):
filt = Filter().kinds(
[EventDefinitions.KIND_ZAP, EventDefinitions.KIND_REACTION, EventDefinitions.KIND_REPOST,
EventDefinitions.KIND_NOTE]).event(event.id()).since(since)
reactions = cli.database().query([filt])
reactions = self.client.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.finallist[event.id().to_hex()] = len(reactions)

Expand Down Expand Up @@ -187,18 +190,7 @@ def schedule(self, dvm_config):
return 1

def sync_db(self):
opts = (Options().wait_for_send(False).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()

cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nostr.oxtr.dev")
cli.add_relay("wss://nostr21.com")

cli.connect()

timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
lasthour = Timestamp.from_secs(timestamp_hour_ago)
Expand All @@ -210,9 +202,9 @@ def sync_db(self):
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
cli.reconcile(filter1, dbopts)
self.client.reconcile(filter1, dbopts)
filter_delete = Filter().until(Timestamp.from_secs(Timestamp.now().as_secs() - self.db_since))
database.delete(filter_delete) # Clear old events so db doesn't get too full.
self.client.database().delete(filter_delete) # Clear old events so db doesn't get too full.
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")

Expand Down
33 changes: 17 additions & 16 deletions nostr_dvm/tasks/content_discovery_currently_popular_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c
if self.logger:
init_logger(LogLevel.DEBUG)

opts = (Options().wait_for_send(True).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
self.client = ClientBuilder().signer(signer).database(database).opts(opts).build()

self.client.add_relay("wss://relay.damus.io")
self.client.add_relay("wss://nostr.oxtr.dev")
self.client.add_relay("wss://nostr21.com")
self.client.connect()

if self.dvm_config.UPDATE_DATABASE:
self.sync_db()
if not self.personalized:
Expand Down Expand Up @@ -135,14 +147,13 @@ def calculate_result(self, request_form):

options = self.set_options(request_form)

database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().database(database).build()

timestamp_since = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_since)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)

events = cli.database().query([filter1])
events = self.client.database().query([filter1])
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")

ns.final_list = {}
Expand All @@ -156,7 +167,7 @@ def calculate_result(self, request_form):
[definitions.EventDefinitions.KIND_ZAP, definitions.EventDefinitions.KIND_REACTION,
definitions.EventDefinitions.KIND_REPOST,
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(since)
reactions = cli.database().query([filt])
reactions = self.client.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.final_list[event.id().to_hex()] = len(reactions)

Expand Down Expand Up @@ -185,17 +196,7 @@ def schedule(self, dvm_config):
return 1

def sync_db(self):
opts = (Options().wait_for_send(True).send_timeout(timedelta(seconds=self.dvm_config.RELAY_TIMEOUT)))
sk = SecretKey.from_hex(self.dvm_config.PRIVATE_KEY)
keys = Keys.parse(sk.to_hex())
signer = NostrSigner.keys(keys)
database = NostrDatabase.sqlite(self.db_name)
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()

cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nostr.oxtr.dev")
cli.add_relay("wss://nostr21.com")
cli.connect()

timestamp_since = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_since)
Expand All @@ -208,9 +209,9 @@ def sync_db(self):
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
cli.reconcile(filter1, dbopts)
self.client.reconcile(filter1, dbopts)
filter_delete = Filter().until(Timestamp.from_secs(Timestamp.now().as_secs() - self.db_since))
database.delete(filter_delete) # Clear old events so db doesn't get too full.
self.client.database().delete(filter_delete) # Clear old events so db doesn't get too full.

print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

VERSION = '0.5.2'
VERSION = '0.5.3'
DESCRIPTION = 'A framework to build and run Nostr NIP90 Data Vending Machines'
LONG_DESCRIPTION = ('A framework to build and run Nostr NIP90 Data Vending Machines. See the github repository for more information')

Expand Down
15 changes: 9 additions & 6 deletions tests/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path

import dotenv
from nostr_sdk import init_logger, LogLevel, Keys
from nostr_sdk import init_logger, LogLevel, Keys, NostrLibrary

from nostr_dvm.subscription import Subscription
from nostr_dvm.tasks.content_discovery_currently_popular import DicoverContentCurrentlyPopular
Expand All @@ -18,9 +18,15 @@
from nostr_dvm.utils.nostr_utils import check_and_set_private_key
from nostr_dvm.utils.zap_utils import check_and_set_ln_bits_keys

global_update_rate = 180 # set this high on first sync so db can fully sync before another process trys to.
rebbroadcast_NIP89 = False # Announce NIP89 on startup
global_update_rate = 180 # set this high on first sync so db can fully sync before another process trys to.
use_logger = False

#git_hash = NostrLibrary().git_hash_version()
#print("GitHash " + git_hash)

if use_logger:
init_logger(LogLevel.DEBUG)
def build_example_nostrband(name, identifier, admin_config, image, about, custom_processing_msg):
dvm_config: DVMConfig = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
Expand Down Expand Up @@ -207,11 +213,8 @@ def build_example_top_zapped(name, identifier, admin_config, options, image, cos


def playground():
rebbroadcast_NIP89 = True # Announce NIP89 on startup
use_logger = False

if use_logger:
init_logger(LogLevel.INFO)


# Popular NOSTR.band
admin_config_trending_nostr_band = AdminConfig()
Expand Down

0 comments on commit eb125c4

Please sign in to comment.