Skip to content

Commit

Permalink
introduce loglevels to reduce logging on demand, improve wot
Browse files Browse the repository at this point in the history
  • Loading branch information
believethehype committed Jun 13, 2024
1 parent 78dd3ae commit b9dc440
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 97 deletions.
62 changes: 37 additions & 25 deletions nostr_dvm/dvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class DVM:

def __init__(self, dvm_config, admin_config=None):


asyncio.run(self.run_dvm(dvm_config, admin_config))

async def run_dvm(self, dvm_config, admin_config):
Expand All @@ -57,7 +56,7 @@ async def run_dvm(self, dvm_config, admin_config):
self.job_list = []
self.jobs_on_hold_list = []
pk = self.keys.public_key()
print(bcolors.GREEN + "[" + self.dvm_config.NIP89.NAME + "] " + "Nostr DVM public key: " + str(
print(bcolors.BLUE + "[" + self.dvm_config.NIP89.NAME + "] " + "Nostr DVM public key: " + str(
pk.to_bech32()) + " Hex: " +
str(pk.to_hex()) + " Supported DVM tasks: " +
', '.join(p.NAME + ":" + p.TASK for p in self.dvm_config.SUPPORTED_DVMS) + bcolors.ENDC)
Expand All @@ -82,7 +81,8 @@ class NotificationHandler(HandleNotification):
keys = self.keys

async def handle(self, relay_url, subscription_id, nostr_event: Event):
print(nostr_event.as_json())
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(nostr_event.as_json())
if EventDefinitions.KIND_NIP90_EXTRACT_TEXT.as_u64() <= nostr_event.kind().as_u64() <= EventDefinitions.KIND_NIP90_GENERIC.as_u64():
await handle_nip90_job_event(nostr_event)
elif nostr_event.kind().as_u64() == EventDefinitions.KIND_ZAP.as_u64():
Expand Down Expand Up @@ -110,7 +110,8 @@ async def handle_nip90_job_event(nip90_event):
p_tag_str = tag.as_vec()[1]

if p_tag_str != "" and p_tag_str != self.dvm_config.PUBLIC_KEY:
print("[" + self.dvm_config.NIP89.NAME + "] No public request, also not addressed to me.")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] No public request, also not addressed to me.")
return

# check if task is supported by the current DVM
Expand All @@ -126,9 +127,9 @@ async def handle_nip90_job_event(nip90_event):
await send_job_status_reaction(nip90_event, "error", client=self.client, dvm_config=self.dvm_config)
print("[" + self.dvm_config.NIP89.NAME + "] Request by blacklisted user, skipped")
return

print(
bcolors.MAGENTA + "[" + self.dvm_config.NIP89.NAME + "] Received new Request: " + task + " from " + user.name + bcolors.ENDC)
if self.dvm_config.LOGLEVEL.value >= LogLevel.INFO.value:
print(
bcolors.MAGENTA + "[" + self.dvm_config.NIP89.NAME + "] Received new Request: " + task + " from " + user.name + bcolors.ENDC)
duration = input_data_file_duration(nip90_event, dvm_config=self.dvm_config, client=self.client)
amount = get_amount_per_task(task, self.dvm_config, duration)
if amount is None:
Expand Down Expand Up @@ -158,8 +159,9 @@ async def handle_nip90_job_event(nip90_event):
self.dvm_config)

subscription_status = await nip88_has_active_subscription(PublicKey.parse(user.npub),
self.dvm_config.NIP88.DTAG, self.client,
self.dvm_config.PUBLIC_KEY)
self.dvm_config.NIP88.DTAG,
self.client,
self.dvm_config.PUBLIC_KEY)

if subscription_status["isActive"]:
await send_job_status_reaction(nip90_event, "subscription-required", True, amount,
Expand Down Expand Up @@ -201,9 +203,10 @@ async def handle_nip90_job_event(nip90_event):
if (user.iswhitelisted or task_is_free or cashu_redeemed) and (
p_tag_str == "" or p_tag_str ==
self.dvm_config.PUBLIC_KEY):
print(
"[" + self.dvm_config.NIP89.NAME + "] Free task or Whitelisted for task " + task +
". Starting processing..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(
"[" + self.dvm_config.NIP89.NAME + "] Free task or Whitelisted for task " + task +
". Starting processing..")

if dvm_config.SEND_FEEDBACK_EVENTS:
await send_job_status_reaction(nip90_event, "processing", True, 0,
Expand Down Expand Up @@ -278,7 +281,8 @@ async def handle_nip90_job_event(nip90_event):


else:
print("[" + self.dvm_config.NIP89.NAME + "] Job addressed to someone else, skipping..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Job addressed to someone else, skipping..")
# else:
# print("[" + self.dvm_config.NIP89.NAME + "] Task " + task + " not supported on this DVM, skipping..")

Expand Down Expand Up @@ -500,9 +504,12 @@ async def send_nostr_reply_event(content, original_event_as_str):

# send_event(reply_event, client=self.client, dvm_config=self.dvm_config)
await send_event_outbox(reply_event, client=self.client, dvm_config=self.dvm_config)

print(bcolors.GREEN + "[" + self.dvm_config.NIP89.NAME + "] " + str(
original_event.kind().as_u64() + 1000) + " Job Response event sent: " + reply_event.as_json() + bcolors.ENDC)
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(bcolors.GREEN + "[" + self.dvm_config.NIP89.NAME + "] " + str(
original_event.kind().as_u64() + 1000) + " Job Response event sent: " + reply_event.as_json() + bcolors.ENDC)
elif self.dvm_config.LOGLEVEL.value >= LogLevel.INFO.value:
print(bcolors.GREEN + "[" + self.dvm_config.NIP89.NAME + "] " + str(
original_event.kind().as_u64() + 1000) + " Job Response event sent: " + reply_event.id().to_hex() + bcolors.ENDC)

async def send_job_status_reaction(original_event, status, is_paid=True, amount=0, client=None,
content=None,
Expand Down Expand Up @@ -612,8 +619,13 @@ async def send_job_status_reaction(original_event, status, is_paid=True, amount=
# send_event(reaction_event, client=self.client, dvm_config=self.dvm_config)
await send_event_outbox(reaction_event, client=self.client, dvm_config=self.dvm_config)

print(bcolors.YELLOW + "[" + self.dvm_config.NIP89.NAME + "]" + " Sent Kind " + str(
EventDefinitions.KIND_FEEDBACK.as_u64()) + " Reaction: " + status + " " + reaction_event.as_json() + bcolors.ENDC)
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(bcolors.YELLOW + "[" + self.dvm_config.NIP89.NAME + "]" + " Sent Kind " + str(
EventDefinitions.KIND_FEEDBACK.as_u64()) + " Reaction: " + status + " " + reaction_event.as_json() + bcolors.ENDC)
elif self.dvm_config.LOGLEVEL.value >= LogLevel.INFO.value:
print(bcolors.YELLOW + "[" + self.dvm_config.NIP89.NAME + "]" + " Sent Kind " + str(
EventDefinitions.KIND_FEEDBACK.as_u64()) + " Reaction: " + status + " " + reaction_event.id().to_hex() + bcolors.ENDC)

return reaction_event.as_json()

async def do_work(job_event, amount):
Expand Down Expand Up @@ -687,7 +699,7 @@ async def do_work(job_event, amount):

return

#await self.client.handle_notifications(NotificationHandler)
# await self.client.handle_notifications(NotificationHandler)
asyncio.create_task(self.client.handle_notifications(NotificationHandler()))

while True:
Expand All @@ -696,17 +708,17 @@ async def do_work(job_event, amount):

for job in self.job_list:
if job.bolt11 != "" and job.payment_hash != "" and not job.payment_hash is None and not job.is_paid:
ispaid = check_bolt11_ln_bits_is_paid(job.payment_hash, se.dvm_config)
ispaid = check_bolt11_ln_bits_is_paid(job.payment_hash, self.dvm_config)
if ispaid and job.is_paid is False:
print("is paid")
job.is_paid = True
amount = parse_amount_from_bolt11_invoice(job.bolt11)

job.is_paid = True
await send_job_status_reaction(job.event, "processing", True, 0,
content=self.dvm_config.CUSTOM_PROCESSING_MESSAGE,
client=self.client,
dvm_config=self.dvm_config)
content=self.dvm_config.CUSTOM_PROCESSING_MESSAGE,
client=self.client,
dvm_config=self.dvm_config)
print("[" + self.dvm_config.NIP89.NAME + "] doing work from joblist")
await do_work(job.event, amount)
elif ispaid is None: # invoice expired
Expand All @@ -716,8 +728,8 @@ async def do_work(job_event, amount):
self.job_list.remove(job)

for job in self.jobs_on_hold_list:
if await check_event_has_not_unfinished_job_input(job.event, False, client=se.client,
dvmconfig=self.dvm_config):
if await check_event_has_not_unfinished_job_input(job.event, False, client=self.client,
dvmconfig=self.dvm_config):
await handle_nip90_job_event(nip90_event=job.event)
try:
self.jobs_on_hold_list.remove(job)
Expand Down
10 changes: 6 additions & 4 deletions nostr_dvm/interfaces/dvmtaskinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sys import platform
from threading import Thread
from venv import create
from nostr_sdk import Keys, Kind
from nostr_sdk import Keys, Kind, LogLevel
from nostr_dvm.dvm import DVM
from nostr_dvm.utils.admin_utils import AdminConfig
from nostr_dvm.utils.dvmconfig import DVMConfig, build_default_config
Expand All @@ -17,6 +17,7 @@
from nostr_dvm.utils.output_utils import post_process_result



class DVMTaskInterface:
NAME: str
KIND: Kind
Expand Down Expand Up @@ -136,12 +137,13 @@ def post_process(self, result, event):
return post_process_result(result, event)

def set_options(self, request_form):

print("[" + self.dvm_config.NIP89.NAME + "] " + "Setting options...")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] " + "Setting options...")
opts = []
if request_form.get("options"):
opts = json.loads(request_form["options"])
print("[" + self.dvm_config.NIP89.NAME + "] " + str(opts))
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] " + str(opts))
return dict(opts)

@staticmethod
Expand Down
22 changes: 13 additions & 9 deletions nostr_dvm/tasks/content_discovery_currently_latest_longform.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from nostr_dvm.utils.nip89_utils import NIP89Config, check_and_set_d_tag, create_amount_tag
from nostr_dvm.utils.output_utils import post_process_list_to_events


"""
This File contains a Module to discover popular notes
Accepted Inputs: none
Expand Down Expand Up @@ -112,7 +113,7 @@ async def calculate_result(self, request_form):
signer = NostrSigner.keys(keys)

database = await NostrDatabase.sqlite(self.db_name)
print(self.db_name)
#print(self.db_name)
cli = ClientBuilder().database(database).signer(signer).opts(opts).build()
await cli.connect()

Expand All @@ -123,8 +124,8 @@ async def calculate_result(self, request_form):

filter1 = Filter().kind(definitions.EventDefinitions.KIND_LONGFORM).since(since)
events = await cli.database().query([filter1])

print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")
ns.finallist = {}
index = options["max_results"]
for event in events:
Expand All @@ -142,8 +143,9 @@ async def calculate_result(self, request_form):
e_tag = Tag.parse(["e", entry[0]])
result_list.append(e_tag.as_vec())
await cli.shutdown()
print("[" + self.dvm_config.NIP89.NAME + "] Filtered " + str(
len(result_list)) + " fitting events.")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Filtered " + str(
len(result_list)) + " fitting events.")
return json.dumps(result_list)

def post_process(self, result, event):
Expand Down Expand Up @@ -189,15 +191,17 @@ async def sync_db(self):
filter1 = Filter().kinds([definitions.EventDefinitions.KIND_LONGFORM]).since(since) # Notes, reactions, zaps

# filter = Filter().author(keys.public_key())
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
await cli.reconcile(filter1, dbopts)
await cli.database().delete(Filter().until(Timestamp.from_secs(
Timestamp.now().as_secs() - self.db_since))) # Clear old events so db doesn't get too full.
await cli.shutdown()
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")


# We build an example here that we can call by either calling this file directly from the main directory,
Expand Down
19 changes: 12 additions & 7 deletions nostr_dvm/tasks/content_discovery_currently_popular.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from nostr_dvm.utils.nip89_utils import NIP89Config, check_and_set_d_tag, create_amount_tag
from nostr_dvm.utils.output_utils import post_process_list_to_events


"""
This File contains a Module to discover popular notes
Accepted Inputs: none
Expand Down Expand Up @@ -125,7 +126,8 @@ async def calculate_result(self, request_form):

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)
events = await database.query([filter1])
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")
ns.finallist = {}
for event in events:
if event.created_at().as_secs() > timestamp_hour_ago:
Expand All @@ -146,8 +148,9 @@ async def calculate_result(self, request_form):
e_tag = Tag.parse(["e", entry[0]])
result_list.append(e_tag.as_vec())
#await cli.shutdown()
print("[" + self.dvm_config.NIP89.NAME + "] Filtered " + str(
len(result_list)) + " fitting events.")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Filtered " + str(
len(result_list)) + " fitting events.")
return json.dumps(result_list)

def post_process(self, result, event):
Expand Down Expand Up @@ -192,15 +195,17 @@ async def sync_db(self):
definitions.EventDefinitions.KIND_ZAP]).since(since) # Notes, reactions, zaps

# filter = Filter().author(keys.public_key())
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
await cli.reconcile(filter1, dbopts)
await cli.database().delete(Filter().until(Timestamp.from_secs(
Timestamp.now().as_secs() - self.db_since))) # Clear old events so db doesn't get too full.
await cli.shutdown()
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")


# We build an example here that we can call by either calling this file directly from the main directory,
Expand Down
19 changes: 11 additions & 8 deletions nostr_dvm/tasks/content_discovery_currently_popular_by_top_zaps.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ async def calculate_result(self, request_form):

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)
events = await database.query([filter1])
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Considering " + str(len(events)) + " Events")

ns.finallist = {}
for event in events:
Expand Down Expand Up @@ -189,9 +190,9 @@ async def calculate_result(self, request_form):
# print(EventId.parse(entry[0]).to_bech32() + "/" + EventId.parse(entry[0]).to_hex() + ": " + str(entry[1]))
e_tag = Tag.parse(["e", entry[0]])
result_list.append(e_tag.as_vec())

print("[" + self.dvm_config.NIP89.NAME + "] Filtered " + str(
len(result_list)) + " fitting events.")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Filtered " + str(
len(result_list)) + " fitting events.")

#await cli.shutdown()

Expand Down Expand Up @@ -243,15 +244,17 @@ async def sync_db(self):
definitions.EventDefinitions.KIND_ZAP]).since(since) # Notes, reactions, zaps

# filter = Filter().author(keys.public_key())
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print("[" + self.dvm_config.NIP89.NAME + "] Syncing notes of the last " + str(
self.db_since) + " seconds.. this might take a while..")
dbopts = NegentropyOptions().direction(NegentropyDirection.DOWN)
await cli.reconcile(filter1, dbopts)
await cli.database().delete(Filter().until(Timestamp.from_secs(
Timestamp.now().as_secs() - self.db_since))) # Clear old events so db doesn't get too full.
await cli.shutdown()
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")
if self.dvm_config.LOGLEVEL.value >= LogLevel.DEBUG.value:
print(
"[" + self.dvm_config.NIP89.NAME + "] Done Syncing Notes of the last " + str(self.db_since) + " seconds..")


# We build an example here that we can call by either calling this file directly from the main directory,
Expand Down
Loading

0 comments on commit b9dc440

Please sign in to comment.