Skip to content

Commit

Permalink
move is_input_supported and post_process to async, fix some get_event…
Browse files Browse the repository at this point in the history
…_id async
  • Loading branch information
believethehype committed Jun 17, 2024
1 parent 1b65cce commit b5a848d
Show file tree
Hide file tree
Showing 45 changed files with 150 additions and 154 deletions.
26 changes: 11 additions & 15 deletions nostr_dvm/dvm.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import asyncio
import json
import os
import subprocess
import threading
from datetime import timedelta
from sys import platform

from nostr_sdk import PublicKey, Keys, Client, Tag, Event, EventBuilder, Filter, HandleNotification, Timestamp, \
init_logger, LogLevel, Options, nip04_encrypt, NostrSigner, Kind, RelayLimits

import time

from nostr_dvm.utils.definitions import EventDefinitions, RequiredJobToWatch, JobToWatch
from nostr_dvm.utils.dvmconfig import DVMConfig
from nostr_dvm.utils.admin_utils import admin_make_database_updates, AdminConfig
Expand Down Expand Up @@ -115,7 +111,7 @@ async def handle_nip90_job_event(nip90_event):
return

# check if task is supported by the current DVM
task_supported, task = check_task_is_supported(nip90_event, client=self.client,
task_supported, task = await check_task_is_supported(nip90_event, client=self.client,
config=self.dvm_config)
# if task is supported, continue, else do nothing.
if task_supported:
Expand Down Expand Up @@ -306,7 +302,7 @@ async def handle_zap(zap_event):
if tag.as_vec()[0] == 'amount':
amount = int(float(tag.as_vec()[1]) / 1000)
elif tag.as_vec()[0] == 'e':
job_event = get_event_by_id(tag.as_vec()[1], client=self.client, config=self.dvm_config)
job_event = await get_event_by_id(tag.as_vec()[1], client=self.client, config=self.dvm_config)
if job_event is not None:
job_event = check_and_decrypt_tags(job_event, self.dvm_config)
if job_event is None:
Expand All @@ -326,7 +322,7 @@ async def handle_zap(zap_event):


else:
task_supported, task = check_task_is_supported(job_event, client=self.client,
task_supported, task = await check_task_is_supported(job_event, client=self.client,
config=self.dvm_config)
if job_event is not None and task_supported:
print("Zap received for NIP90 task: " + str(invoice_amount) + " Sats from " + str(
Expand Down Expand Up @@ -382,7 +378,7 @@ async def handle_zap(zap_event):
print("[" + self.dvm_config.NIP89.NAME + "] Error during content decryption: " + str(e))

async def check_event_has_not_unfinished_job_input(nevent, append, client, dvmconfig):
task_supported, task = check_task_is_supported(nevent, client, config=dvmconfig)
task_supported, task = await check_task_is_supported(nevent, client, config=dvmconfig)
if not task_supported:
return False

Expand All @@ -395,7 +391,7 @@ async def check_event_has_not_unfinished_job_input(nevent, append, client, dvmco
input = tag.as_vec()[1]
input_type = tag.as_vec()[2]
if input_type == "job":
evt = get_referenced_event_by_id(event_id=input, client=client,
evt = await get_referenced_event_by_id(event_id=input, client=client,
kinds=EventDefinitions.ANY_RESULT,
dvm_config=dvmconfig)
if evt is None:
Expand Down Expand Up @@ -434,11 +430,11 @@ async def check_and_return_event(data, original_event: Event):
await send_nostr_reply_event(data, original_event.as_json())
break

task = get_task(original_event, self.client, self.dvm_config)
task = await get_task(original_event, self.client, self.dvm_config)
for dvm in self.dvm_config.SUPPORTED_DVMS:
if task == dvm.TASK:
try:
post_processed = dvm.post_process(data, original_event)
post_processed = await dvm.post_process(data, original_event)
await send_nostr_reply_event(post_processed, original_event.as_json())
except Exception as e:
print(e)
Expand Down Expand Up @@ -515,7 +511,7 @@ async def send_job_status_reaction(original_event, status, is_paid=True, amount=
content=None,
dvm_config=None, user=None):

task = get_task(original_event, client=client, dvm_config=dvm_config)
task = await get_task(original_event, client=client, dvm_config=dvm_config)
alt_description, reaction = build_status_reaction(status, task, amount, content, dvm_config)

e_tag = Tag.parse(["e", original_event.id().to_hex()])
Expand Down Expand Up @@ -654,14 +650,14 @@ async def do_work(job_event, amount):
EventDefinitions.KIND_NIP90_EXTRACT_TEXT.as_u64() <= job_event.kind().as_u64() <= EventDefinitions.KIND_NIP90_GENERIC.as_u64())
or job_event.kind().as_u64() == EventDefinitions.KIND_DM.as_u64()):

task = get_task(job_event, client=self.client, dvm_config=self.dvm_config)
task = await get_task(job_event, client=self.client, dvm_config=self.dvm_config)

for dvm in self.dvm_config.SUPPORTED_DVMS:
result = ""
try:
if task == dvm.TASK:

request_form = dvm.create_request_from_nostr_event(job_event, self.client, self.dvm_config)
request_form = await dvm.create_request_from_nostr_event(job_event, self.client, self.dvm_config)

if dvm_config.USE_OWN_VENV:
python_location = "/bin/python"
Expand Down Expand Up @@ -689,7 +685,7 @@ async def do_work(job_event, amount):
# We install locally in these cases for now
result = await dvm.process(request_form)
try:
post_processed = dvm.post_process(result, job_event)
post_processed = await dvm.post_process(result, job_event)
await send_nostr_reply_event(post_processed, job_event.as_json())
except Exception as e:
print(bcolors.RED + "[" + self.dvm_config.NIP89.NAME + "] Error: " + str(
Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/interfaces/dvmtaskinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,19 @@ def NIP89_announcement(self, nip89config: NIP89Config):
nip89.CONTENT = nip89config.CONTENT
return nip89

def is_input_supported(self, tags, client=None, dvm_config=None) -> bool:
async def is_input_supported(self, tags, client=None, dvm_config=None) -> bool:
"""Check if input is supported for current Task."""
pass

def create_request_from_nostr_event(self, event, client=None, dvm_config=None) -> dict:
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None) -> dict:
"""Parse input into a request form that will be given to the process method"""
pass

async def process(self, request_form):
"Process the data and return the result"
pass

def post_process(self, result, event):
async def post_process(self, result, event):
"""Post-process the data and return the result Use default function, if not overwritten"""
return post_process_result(result, event)

Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/tasks/advanced_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
dvm_config.SCRIPT = os.path.abspath(__file__)

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -38,7 +38,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config
print(self.dvm_config.PRIVATE_KEY)

Expand Down Expand Up @@ -144,7 +144,7 @@ async def process(self, request_form):
await cli.shutdown()
return json.dumps(result_list)

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/tasks/advanced_search_wine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
admin_config: AdminConfig = None, options=None):
dvm_config.SCRIPT = os.path.abspath(__file__)

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -40,7 +40,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config
print(self.dvm_config.PRIVATE_KEY)

Expand Down Expand Up @@ -129,7 +129,7 @@ async def process(self, request_form):

return json.dumps(result_list)

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/tasks/audiogeneration_suno_ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
dvm_config.SCRIPT = os.path.abspath(__file__)
self.base_url = 'http://localhost:3000'

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -45,7 +45,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
request_form = {"jobID": event.id().to_hex() + "_" + self.NAME.replace(" ", "")}

prompt = "A popular heavy metal song about a purple Ostrich, Nostr, sung by a deep-voiced male singer, slowly and melodiously. The lyrics depict hope for a better future."
Expand Down Expand Up @@ -131,7 +131,7 @@ async def process(self, request_form):
print(f"{data[1]['id']} ==> {data[1]['video_url']}")
break
# sleep 5s
asyncio.sleep(5.0)
await asyncio.sleep(5.0)

response1 = self.get_clip(data[0]['id'])
print(response1['video_url'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
if not self.personalized:
self.result = await self.calculate_result(self.request_form)

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -70,7 +70,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config

request_form = {"jobID": event.id().to_hex()}
Expand Down Expand Up @@ -148,7 +148,7 @@ async def calculate_result(self, request_form):
len(result_list)) + " fitting events.")
return json.dumps(result_list)

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/tasks/content_discovery_currently_popular.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
if not self.personalized:
self.result = await self.calculate_result(self.request_form)

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -72,7 +72,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config

request_form = {"jobID": event.id().to_hex()}
Expand Down Expand Up @@ -145,7 +145,7 @@ async def calculate_result(self, request_form):
return json.dumps(result_list)


def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
if not self.personalized:
self.result = await self.calculate_result(self.request_form)

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -73,7 +73,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config

request_form = {"jobID": event.id().to_hex()}
Expand Down Expand Up @@ -198,7 +198,7 @@ async def calculate_result(self, request_form):

return json.dumps(result_list)

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
if self.dvm_config.UPDATE_DATABASE:
await self.sync_db()

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -60,7 +60,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event: Event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event: Event, client=None, dvm_config=None):
self.dvm_config = dvm_config

request_form = {"jobID": event.id().to_hex()}
Expand Down Expand Up @@ -170,7 +170,7 @@ async def process(self, request_form):

return json.dumps(result_list)

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/tasks/content_discovery_currently_popular_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
if not self.personalized:
self.result = await self.calculate_result(self.request_form)

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -82,7 +82,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config

request_form = {"jobID": event.id().to_hex()}
Expand Down Expand Up @@ -113,7 +113,7 @@ async def process(self, request_form):
else:
return self.result

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
6 changes: 3 additions & 3 deletions nostr_dvm/tasks/content_discovery_update_db_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def init_dvm(self, name, dvm_config: DVMConfig, nip89config: NIP89Config,
if self.dvm_config.UPDATE_DATABASE:
await self.sync_db()

def is_input_supported(self, tags, client=None, dvm_config=None):
async def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
if tag.as_vec()[0] == 'i':
input_value = tag.as_vec()[1]
Expand All @@ -74,7 +74,7 @@ def is_input_supported(self, tags, client=None, dvm_config=None):
return False
return True

def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
self.dvm_config = dvm_config

request_form = {"jobID": event.id().to_hex()}
Expand All @@ -100,7 +100,7 @@ def create_request_from_nostr_event(self, event, client=None, dvm_config=None):
async def process(self, request_form):
return "I don't return results, I just update the DB."

def post_process(self, result, event):
async def post_process(self, result, event):
"""Overwrite the interface function to return a social client readable format, if requested"""
for tag in event.tags():
if tag.as_vec()[0] == 'output':
Expand Down
Loading

0 comments on commit b5a848d

Please sign in to comment.