diff --git a/core/databases/db_base.py b/core/databases/db_base.py
new file mode 100644
index 0000000..d9d7e53
--- /dev/null
+++ b/core/databases/db_base.py
@@ -0,0 +1,17 @@
+import logging
+
+from sqlalchemy import create_engine
+from sqlalchemy.orm import DeclarativeBase
+
+engine = create_engine("sqlite://")
+
+logging.basicConfig()
+logging.getLogger("sqlalchemy.engine").setLevel(logging.CRITICAL)
+
+
+class Base(DeclarativeBase):
+    pass
+
+
+def db_init():
+    Base.metadata.create_all(engine)
diff --git a/core/databases/db_completion_tasks.py b/core/databases/db_completion_tasks.py
index 59bc406..4ce9e2b 100644
--- a/core/databases/db_completion_tasks.py
+++ b/core/databases/db_completion_tasks.py
@@ -1,79 +1,124 @@
-from tinydb import Query
+from sqlalchemy import String, Integer, Boolean, select, update
+from sqlalchemy.orm import Mapped, mapped_column, Session, relationship
 
 from core.databases import defaults
+from core.databases.db_base import Base, engine
+from core.databases.db_crawl_tasks import CrawlTask
 from core.tools import utils
-from core.tools.utils import use_tinydb, gen_unix_time
 
-db = use_tinydb("completion_tasks")
+from core.tools.utils import gen_unix_time, page_to_range
 
 
-def db_add_completion_task(prompt, mode):
+class CompletionTask(Base):
+    __tablename__ = "completion_tasks"
+
+    uuid: Mapped[str] = mapped_column(primary_key=True)
+    prompt: Mapped[str] = mapped_column(String())  # make sure postgres uses "TEXT" here
+    mode: Mapped[str] = mapped_column(String(12))
+    timestamp: Mapped[int] = mapped_column(Integer())  # time added
+    completion_result: Mapped[str] = mapped_column(String())  # "TEXT" type here as well
+
+    executing: Mapped[bool] = mapped_column(Boolean())
+    execution_date: Mapped[int] = mapped_column(Integer())  # time started completion
+
+    completed: Mapped[bool] = mapped_column(Boolean())
+    completion_date: Mapped[int] = mapped_column(Integer())  # time completed
+
+    required_crawl_tasks: Mapped[list["CrawlTask"]] = relationship()
+
+
+def db_add_completion_task(prompt, mode) -> str:
     new_uuid = utils.gen_uuid()
     timestamp = utils.gen_unix_time()
 
-    db.insert(
-        {
-            "uuid": new_uuid,
-            "prompt": prompt,
-            "mode": mode,
-            "completed": False,
-            "completion_result": None,
-            "executing": False,
-            "required_crawl_tasks": [],  # uuid list that has to be completed first
-            "completion_date": 0,
-            "execution_date": 0,
-            "timestamp": timestamp,
-        }
-    )
+    with Session(engine) as session:
+        completion_task = CompletionTask(
+            uuid=new_uuid,
+            prompt=prompt,
+            mode=mode,
+            timestamp=timestamp,
+            executing=False,
+            execution_date=0,
+            completed=False,
+            completion_date=0,
+            required_crawl_tasks=[],
+        )
+
+        session.add(completion_task)
+        session.commit()
 
     return new_uuid
 
 
-def db_get_completion_tasks_by_page(page: int, per_page: int = defaults.ITEMS_PER_PAGE):
-    # returns all as TinyDB does not support pagination
-    # we'll be moving to SQLite or Cassandra soon enough
-    results = db.all()
+def db_get_completion_tasks_by_page(
+    page: int, per_page: int = defaults.ITEMS_PER_PAGE
+) -> list[CompletionTask]:
+    session = Session(engine)
+
+    start, stop = page_to_range(page, per_page)
+    query = select(CompletionTask).slice(start, stop)
+
+    results = list(session.scalars(query))
 
     return results
 
 
-def db_get_completion_tasks_by_uuid(uuid: int):
-    fields = Query()
-    result = db.get(fields.uuid == uuid)
+def db_get_completion_task_by_uuid(uuid: int) -> CompletionTask:
+    session = Session(engine)
+
+    query = select(CompletionTask).where(CompletionTask.uuid.is_(uuid))
+
+    result = session.scalars(query).one()
 
     return result
 
 
 def db_set_completion_task_executing(uuid: str):
-    fields = Query()
-    db.update(
-        {"executing": True, "execution_date": gen_unix_time()}, fields.uuid == uuid
+    session = Session(engine)
+
+    session.execute(
+        update(CompletionTask)
+        .where(CompletionTask.uuid.is_(uuid))
+        .values(executing=True, execution_date=gen_unix_time())
     )
 
+    session.commit()
+
 
 def db_get_incomplete_completion_tasks(amount: int = 1):
-    fields = Query()
+    session = Session(engine)
+
+    query = (
+        select(CompletionTask).where(CompletionTask.completed.is_(False)).limit(amount)
+    )
 
-    results = db.search(fields.completed == False and fields.executing == False)
-    results = results[:amount]
+    results = list(session.scalars(query).all())
 
     for task in results:
-        db_set_completion_task_executing(task["uuid"])
+        db_set_completion_task_executing(task.uuid)
 
     return results
 
 
 def db_release_executing_tasks(uuid_list: list[str]):
-    fields = Query()
-    db.update({"executing": False}, fields.uuid.one_of(uuid_list))
+    session = Session(engine)
+
+    session.execute(
+        update(CompletionTask)
+        .where(CompletionTask.uuid.in_(uuid_list))
+        .values(executing=False, execution_date=0)
+    )
+
+    session.commit()
 
 
 def db_update_completion_task_after_summarizing(summary: str, uuid: str):
-    fields = Query()
-    db.update(
-        {
-            "completed": True,
-            "completion_result": summary,
-            "completion_date": gen_unix_time(),
-        },
-        fields.uuid == uuid,
+    session = Session(engine)
+
+    session.execute(
+        update(CompletionTask)
+        .where(CompletionTask.uuid.is_(uuid))
+        .values(
+            completed=True, completion_result=summary, completion_date=gen_unix_time()
+        )
    )
+
+    session.commit()
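For context on the pagination used above: page_to_range() (added to core/tools/utils.py further down in this patch) turns a page index into an offset window, and Select.slice() renders it as LIMIT/OFFSET. A minimal sketch, not part of the patch, assuming the models and helpers it introduces:

# Illustrative only. Shows how page_to_range() feeds Select.slice(),
# which compiles down to LIMIT/OFFSET.
from sqlalchemy import select

from core.databases.db_completion_tasks import CompletionTask
from core.tools.utils import page_to_range

start, stop = page_to_range(2, 25)                 # page 2 at 25 per page -> (50, 75)
query = select(CompletionTask).slice(start, stop)  # OFFSET 50, LIMIT 25
print(query)                                       # prints the parameterized SQL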
diff --git a/core/databases/db_crawl_history.py b/core/databases/db_crawl_history.py
deleted file mode 100644
index f511e5b..0000000
--- a/core/databases/db_crawl_history.py
+++ /dev/null
@@ -1,29 +0,0 @@
-from core.databases import defaults
-from core.tools import utils
-from core.tools.utils import use_tinydb
-
-db = use_tinydb("crawl_history")
-
-# this db is completely optional, only used by the UI, and so it's development can be delayed
-# most sensible solution here is to make items of the url_database point to entries of this database
-# even better, let's add a prompt field to each entry of the url_database, and count them here
-# this db will still be used to store the prompts, and their embeddings, so that the UI
-# will have an easy time comparing new prompts to historical ones
-
-
-def db_add_crawl_history(prompt: str) -> str:
-    new_uuid = utils.gen_uuid()
-    return new_uuid
-
-
-def db_add_url_to_crawl_history(url: str, prompt: str) -> str:
-    new_uuid = utils.gen_uuid()
-    return new_uuid
-
-
-def db_get_similar_prompts(prompt: str) -> list:
-    return []
-
-
-def db_get_crawl_history_by_page(page: int, per_page=defaults.ITEMS_PER_PAGE) -> list:
-    return []
diff --git a/core/databases/db_crawl_tasks.py b/core/databases/db_crawl_tasks.py
index f1cb2e3..d8e7e64 100644
--- a/core/databases/db_crawl_tasks.py
+++ b/core/databases/db_crawl_tasks.py
@@ -1,13 +1,50 @@
 from typing import Literal
 
-from tinydb import Query
-
+from sqlalchemy import (
+    String,
+    Boolean,
+    Integer,
+    update,
+    select,
+    ForeignKey,
+)
+from sqlalchemy.orm import Mapped, mapped_column, Session, relationship
+
+from core.databases.db_base import Base, engine
 from core.tools import utils
-from core.tools.utils import use_tinydb, gen_unix_time
+from core.tools.utils import gen_unix_time
+
+
+class EmbeddingProgression(Base):
+    __tablename__ = "embedding_progressions"
+
+    uuid: Mapped[str] = mapped_column(primary_key=True)
+
+    crawl_uuid: Mapped[str] = mapped_column(ForeignKey("crawl_tasks.uuid"))
+
+    embedder_name: Mapped[str] = mapped_column(String())
+    embedding_amount: Mapped[int] = mapped_column(Integer(), default=0)
+    timestamp: Mapped[int] = mapped_column(Integer())  # time added UNIX SECONDS
+
+
+class CrawlTask(Base):
+    __tablename__ = "crawl_tasks"
 
-db = use_tinydb("crawl_tasks")
+    uuid: Mapped[str] = mapped_column(primary_key=True)
+    prompt: Mapped[str] = mapped_column(String())
+    mode: Mapped[str] = mapped_column(String(12))
+    timestamp: Mapped[int] = mapped_column(Integer())  # time added UNIX SECONDS
 
-# we have to heartbeat our workers once we run out of tasks, websocks should suffice
+    executing: Mapped[bool] = mapped_column(Boolean())
+    execution_date: Mapped[int] = mapped_column(Integer())  # time started completion
+
+    completed: Mapped[bool] = mapped_column(Boolean())
+    completion_date: Mapped[int] = mapped_column(Integer())  # time completed
+
+    embedding_progression: Mapped[list["EmbeddingProgression"]] = relationship()
+    base_amount_scheduled: Mapped[int] = mapped_column(Integer())
+
+    required_by_uuid: Mapped[str] = mapped_column(ForeignKey("completion_tasks.uuid"))
 
 
 def db_add_crawl_task(prompt: str, mode: Literal["news", "wiki", "docs"] = "wiki"):
@@ -15,41 +52,56 @@ def db_add_crawl_task(prompt: str, mode: Literal["news", "wiki", "docs"] = "wiki
     new_uuid = utils.gen_uuid()
     timestamp = utils.gen_unix_time()
 
-    db.insert(
-        {
-            "uuid": new_uuid,
-            "prompt": prompt,
-            "type": mode,
-            "completed": False,
-            "executing": False,
-            "completion_date": 0,  # time completed
-            "execution_date": 0,  # time started completion
-            "timestamp": timestamp,  # time added
-            "base_amount_scheduled": 100,  # todo: replace with dynamically adjusted value
-            "embedding_progression": {},  # {model_name: count} | progress tracking
-        }
-    )
+    with Session(engine) as session:
+        crawl_task = CrawlTask(
+            uuid=new_uuid,
+            prompt=prompt,
+            mode=mode,
+            timestamp=timestamp,
+            executing=False,
+            execution_date=0,
+            completed=False,
+            completion_date=0,
+            base_amount_scheduled=100,
+            embedding_progression={},
+        )
+
+        session.add(crawl_task)
+        session.commit()
 
     return new_uuid
 
 
 def db_set_crawl_executing(uuid: str):
-    fields = Query()
-    db.update(
-        {"executing": True, "execution_date": gen_unix_time()}, fields.uuid == uuid
+    session = Session(engine)
+
+    session.execute(
+        update(CrawlTask)
+        .where(CrawlTask.uuid.is_(uuid))
+        .values(executing=True, execution_date=gen_unix_time())
     )
 
+    session.commit()
+
 
 def db_set_crawl_completed(uuid: str):
-    fields = Query()
-    db.update(
-        {"completed": True, "completion_date": gen_unix_time()}, fields.uuid == uuid
+    session = Session(engine)
+
+    session.execute(
+        update(CrawlTask)
+        .where(CrawlTask.uuid.is_(uuid))
+        .values(completed=True, completion_date=gen_unix_time())
    )
 
+    session.commit()
+
 
+# fixme: this function should return a list of all tasks for management purposes (see below)
 def db_get_crawl_task():
-    fields = Query()
-    crawl_task = db.get(fields.completed == False)
+    session = Session(engine)
+
+    query = select(CrawlTask).where(CrawlTask.completed.is_(False))
+    crawl_task = session.scalars(query).one_or_none()
 
     if crawl_task is not None:
         db_set_crawl_executing(crawl_task.uuid)
@@ -57,19 +109,31 @@ def db_get_crawl_task():
     return crawl_task
 
 
+# fixme cont. and this function should only return n of inComp and nonExec tasks, for workers
 def db_get_incomplete_crawl_task():
-    fields = Query()
-    task = db.get(fields.completed == False and fields.executing == False)
-    db.update({"executing": True}, fields.uuid == task.uuid)
+    session = Session(engine)
+
+    query = (
+        select(CrawlTask)
+        .where(CrawlTask.completed.is_(False))
+        .where(CrawlTask.executing.is_(False))
+    )
+
+    crawl_task = session.scalars(query).one_or_none()
+
+    if crawl_task is not None:
+        db_set_crawl_executing(crawl_task.uuid)
 
-    return task
+    return crawl_task
 
 
 def db_is_task_completed(uuid: str):
-    fields = Query()
-    task = db.get(fields.uuid == uuid)
+    session = Session(engine)
 
-    return task.completed
+    query = select(CrawlTask).where(CrawlTask.uuid.is_(uuid))
+    crawl_task = session.scalars(query).one_or_none()
+
+    return crawl_task.completed
 
 
 def db_are_tasks_completed(uuid_list: list[str]):
@@ -80,17 +144,21 @@ def db_are_tasks_completed(uuid_list: list[str]):
     for uuid in uuid_list:
         task_completeness = db_is_task_completed(uuid)
 
-        total_completeness *= task_completeness
+        if task_completeness is False:
+            total_completeness = False
+            break
 
-    pass
+    return total_completeness
 
 
 def db_is_crawl_task_fully_embedded(uuid: str, model_name: str):
-    fields = Query()
-    task = db.get(fields.uuid == uuid)
+    session = Session(engine)
+
+    query = select(CrawlTask).where(CrawlTask.uuid.is_(uuid))
+    crawl_task = session.scalars(query).one()
 
-    baseline_count = task.base_amount_scheduled
-    current_count = task.embedding_progression[model_name]
+    baseline_count = crawl_task.base_amount_scheduled
+    current_count = crawl_task.embedding_progression[model_name]
 
     return current_count >= baseline_count
 
@@ -105,10 +173,12 @@ def db_are_crawl_tasks_fully_embedded(uuid_list: str, model_name: str):
 
 
 def db_increment_task_embedding_progression(uuid: str, model_name: str):
-    fields = Query()
-    task = db.get(fields.uuid == uuid)
+    session = Session(engine)
 
-    current_progression = task.embedding_progression
+    query = select(CrawlTask).where(CrawlTask.uuid.is_(uuid))
+    crawl_task = session.scalars(query).one()
+
+    current_progression = crawl_task.embedding_progression
     current_count = current_progression[model_name]
 
     if current_count is not None:
@@ -118,4 +188,10 @@ def db_increment_task_embedding_progression(uuid: str, model_name: str):
 
     current_progression[model_name] = current_count
 
-    db.update({"embedding_progression": current_progression}, fields.uuid == task.uuid)
+    session.execute(
+        update(CrawlTask)
+        .where(CrawlTask.uuid.is_(crawl_task.uuid))
+        .values(embedding_progression=current_progression)
+    )
+
+    session.commit()
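The embedding_progression relationship replaces the old per-model dict: each EmbeddingProgression row is one progress record tied to a crawl task via the crawl_tasks.uuid foreign key. A hedged sketch of how per-model counts could be derived from those rows (embedding_counts is a hypothetical helper, not part of this patch):

from sqlalchemy import func, select
from sqlalchemy.orm import Session

from core.databases.db_base import engine
from core.databases.db_crawl_tasks import EmbeddingProgression

# Hypothetical helper: tally progression rows per embedder for one crawl task.
# Assumes db_init() has already created the tables on the shared engine.
def embedding_counts(crawl_uuid: str) -> dict[str, int]:
    with Session(engine) as session:
        query = (
            select(EmbeddingProgression.embedder_name, func.count())
            .where(EmbeddingProgression.crawl_uuid == crawl_uuid)
            .group_by(EmbeddingProgression.embedder_name)
        )
        return {name: count for name, count in session.execute(query)}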
diff --git a/core/databases/db_embeddings.py b/core/databases/db_embeddings.py
index 68c0ce5..5877bbb 100644
--- a/core/databases/db_embeddings.py
+++ b/core/databases/db_embeddings.py
@@ -1,4 +1,11 @@
+from __future__ import annotations
+
+from typing import Union
+
+from langchain_community.embeddings import OllamaEmbeddings
+from langchain_community.vectorstores import FAISS
 from langchain_text_splitters import RecursiveCharacterTextSplitter
+from llama_cpp import Llama
 
 from configurator import get_runtime_config
 from core.tools.utils import use_faiss, is_text_junk
@@ -9,8 +16,11 @@
 
 # fixme: these cause linter errors, i don't like this code overall,
 # but i had to quickly find a solution to file-scope execution
-vector_db, embedder = None, None
-text_splitter = None
+vector_db: FAISS | None = None
+embedder: Union[OllamaEmbeddings, Llama] | None = None
+text_splitter: RecursiveCharacterTextSplitter | None = None
+
+# todo: we've changed every other db to Postgres, but here, we'll have to use 'FAISS server'
 
 
 def first_use_init():
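The typed globals above are filled in lazily; a minimal sketch of the guard pattern they support (the chunk sizes are made-up values, and the real first_use_init() also wires up the configured embedder and FAISS store):

from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter: RecursiveCharacterTextSplitter | None = None

# Sketch of a lazy-initialization guard; parameters here are illustrative only.
def ensure_text_splitter() -> RecursiveCharacterTextSplitter:
    global text_splitter
    if text_splitter is None:
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
    return text_splitter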
diff --git a/core/databases/db_url_pool.py b/core/databases/db_url_pool.py
index 6f3a0f0..6cf8936 100644
--- a/core/databases/db_url_pool.py
+++ b/core/databases/db_url_pool.py
@@ -1,88 +1,159 @@
-from tinydb import Query
-from tinydb.table import Document
-
-from core.databases import defaults
+from sqlalchemy import (
+    String,
+    Boolean,
+    Integer,
+    select,
+    update,
+    ForeignKey,
+)
+from sqlalchemy.orm import Mapped, mapped_column, Session, relationship
+
+from core.databases.db_base import Base, engine
 from core.tools import utils
-from core.tools.utils import use_tinydb
 
-db = use_tinydb("url_pool")
-
-# we have to heartbeat the workers once we run out of urls
-# i believe this db should remain local permanently
-# instead, we should have a separate global file db for embedder to use,
-# and a tiny global kv cache just to prevent duplicate urls
 
+class UrlEmbedding(Base):
+    __tablename__ = "url_embeddings"
+
+    uuid: Mapped[str] = mapped_column(primary_key=True)
+
+    document_uuid: Mapped[str] = mapped_column(ForeignKey("url_pool.uuid"))
+
+    embedder_name: Mapped[str] = mapped_column(String())
+    timestamp: Mapped[int] = mapped_column(Integer())  # time added UNIX SECONDS
+
+
+class UrlObject(Base):
+    __tablename__ = "url_pool"
+
+    # base data
+    uuid: Mapped[str] = mapped_column(primary_key=True)
+    url: Mapped[str] = mapped_column(String())
+    text: Mapped[str] = mapped_column(String())
+
+    # tracking data
+    parent_uuid: Mapped[str] = mapped_column(String())
+    task_uuid: Mapped[str] = mapped_column(String())
+    prompt: Mapped[str] = mapped_column(String())
+
+    timestamp: Mapped[int] = mapped_column(Integer())  # time added UNIX SECONDS
+
+    is_downloaded: Mapped[bool] = mapped_column(Boolean())
+    is_rubbish: Mapped[bool] = mapped_column(Boolean())
+
+    embedded_by: Mapped[list["UrlEmbedding"]] = relationship()
 
 
 def db_add_url(url: str, prompt: str, parent_uuid: str = None, task_uuid: str = None):
     new_uuid = utils.gen_uuid()
     timestamp = utils.gen_unix_time()
 
-    new_url_object = {
-        "uuid": new_uuid,
-        "parent_uuid": parent_uuid,
-        "task_uuid": task_uuid,
-        "prompt": prompt,
-        "url": url,
-        "text": None,
-        "is_downloaded": False,
-        "is_rubbish": False,
-        "embedded_by": [],
-        "timestamp": timestamp,
-    }
+    with Session(engine) as session:
+        completion_task = UrlObject(
+            uuid=new_uuid,
+            parent_uuid=parent_uuid,
+            task_uuid=task_uuid,
+            prompt=prompt,
+            url=url,
+            text=None,
+            is_downloaded=False,
+            is_rubbish=False,
+            embedded_by=[],
+            timestamp=timestamp,
+        )
 
-    db.insert(new_url_object)
+        session.add(completion_task)
+        session.commit()
 
-    return new_url_object
+    return new_uuid, completion_task
 
 
 def db_get_not_downloaded() -> list:
-    db_query = Query()
-    db_results = db.search(
-        db_query.fragment({"is_downloaded": False, "is_rubbish": False})
+    session = Session(engine)
+
+    query = (
+        select(UrlObject)
+        .where(UrlObject.is_downloaded.is_(False))
+        .where(UrlObject.is_rubbish.is_(False))
     )
 
-    return db_results
+    results = list(session.scalars(query).all())
+
+    return results
+
 
+def db_get_not_embedded(model: str, amount: int = 100) -> list[UrlObject]:
+    session = Session(engine)
 
-def db_get_not_embedded(model: str, per_page=defaults.ITEMS_PER_PAGE) -> list[Document]:
-    fields = Query()
+    exclusion_query = (
+        select(UrlObject)
+        .where(UrlEmbedding.document_uuid.is_(UrlObject.uuid))
+        .where(UrlEmbedding.embedder_name.is_(model))
+        .limit(amount)
+    )
+
+    query = (
+        select(UrlObject)
+        .where(UrlObject.is_downloaded.is_(True))
+        .where(UrlObject.is_rubbish.is_(False))
+    )
+
+    # todo: this particular function requires rigorous testing,
+    # as this is not a well documented use case
+    query.except_(exclusion_query)
 
-    db_results = db.search(~fields.embedded_by.any(model))
+    results = list(session.scalars(query).all())
 
-    return db_results
+    return results
 
 
 def db_set_url_embedded(url_id: str, embedding_model: str):
-    query = Query()
-    record = db.get(query.uuid == url_id)
-    if record is None:
-        return
+    new_uuid = utils.gen_uuid()
+    timestamp = utils.gen_unix_time()
+
+    with Session(engine) as session:
+        completion_task = UrlEmbedding(
+            uuid=new_uuid,
+            document_uuid=url_id,
+            embedder_name=embedding_model,
+            timestamp=timestamp,
+        )
 
-    embedded_by = record["embedded_by"]
-    embedded_by.append(embedding_model)
+        session.add(completion_task)
+        session.commit()
 
-    db.update({"embedded_by": embedded_by}, query.uuid == url_id)
+    return True
 
 
 def db_set_url_downloaded(url_id: str, text: str):
-    query = Query()
-    record = db.get(query.uuid == url_id)
-    if record is None:
-        return
+    session = Session(engine)
 
-    db.update({"is_downloaded": True, "text": text}, query.uuid == url_id)
+    session.execute(
+        update(UrlObject)
+        .where(UrlObject.uuid.is_(url_id))
+        .values(is_downloaded=True, text=text)
+    )
+
+    session.commit()
 
 
 def db_set_url_rubbish(url_id: str):
-    query = Query()
-    record = db.get(query.uuid == url_id)
-    if record is None:
-        return
+    session = Session(engine)
+
+    session.execute(
+        update(UrlObject).where(UrlObject.uuid.is_(url_id)).values(is_rubbish=True)
+    )
 
-    db.update({"is_rubbish": True}, query.uuid == url_id)
+    session.commit()
 
 
 def db_is_url_present(url: str):
-    query = Query()
-    record = db.get(query.url == url)
-    return record is not None
+    session = Session(engine)
+
+    query = select(UrlObject).where(UrlObject.url.is_(url))
+    result = session.scalar(query)
+
+    if result is None:
+        return False
+
+    return True
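Under the new schema a URL counts as embedded by a model when a matching url_embeddings row exists. A minimal sketch of that check (is_url_embedded is a hypothetical helper, not part of this patch):

from sqlalchemy import exists, select
from sqlalchemy.orm import Session

from core.databases.db_base import engine
from core.databases.db_url_pool import UrlEmbedding

# Hypothetical helper: has this URL already been embedded by the given model?
def is_url_embedded(url_uuid: str, model: str) -> bool:
    with Session(engine) as session:
        query = select(
            exists()
            .where(UrlEmbedding.document_uuid == url_uuid)
            .where(UrlEmbedding.embedder_name == model)
        )
        return bool(session.scalar(query))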
diff --git a/core/tools/errorlib.py b/core/tools/errorlib.py
index b428c6e..e651452 100644
--- a/core/tools/errorlib.py
+++ b/core/tools/errorlib.py
@@ -8,14 +8,14 @@ def pretty_error(title: str, advice: str):
+    global debug
     err_content = (
         f"\n{Fore.RED}{Style.BRIGHT}{title}{Style.RESET_ALL}\n"
         f"{advice}{Style.RESET_ALL}\n"
         f"Run {Fore.CYAN}main.py -h{Fore.RESET} for more details\n"
     )
 
-    debug = 1
-    if debug == 1:
+    if debug == 1 or debug is True:
         raise ValueError(err_content)
 
     print(err_content)
diff --git a/core/tools/utils.py b/core/tools/utils.py
index 09716e4..c7c03c3 100644
--- a/core/tools/utils.py
+++ b/core/tools/utils.py
@@ -79,6 +79,13 @@ def use_tinydb(db_name):
     return db
 
 
+def page_to_range(page: int, per_page: int) -> (int, int):
+    start = page * per_page
+    stop = start + per_page
+
+    return start, stop
+
+
 def gen_vec_db_full_name(db_name, model_name):
     return db_name + "_" + model_name
diff --git a/main.py b/main.py
index 3b70641..2ed30e8 100644
--- a/main.py
+++ b/main.py
@@ -4,12 +4,14 @@
 from colorama import init as colorama_init, Fore
 
 from configurator import get_runtime_config, args
+from core.databases import db_base
 from core.tools import errorlib
 from workers.crawler import start_crawler
 from workers.embedder import start_embedder
 from workers.summarizer import start_summarizer
 
 colorama_init()
+db_base.db_init()
 
 if args.worker_type == "webui":
     # fixme: this is a workaround, webui should be started from it's folder
diff --git a/webui/main.py b/webui/main.py
index 614d04d..b8c34f6 100644
--- a/webui/main.py
+++ b/webui/main.py
@@ -5,7 +5,6 @@
     db_add_completion_task,
     db_get_completion_tasks_by_page,
     db_get_incomplete_completion_tasks,
-    db_get_completion_tasks_by_uuid
 )
 from core.databases.db_crawl_tasks import (
     db_add_crawl_task,
@@ -84,13 +83,7 @@ def get_incomplete_task():
 @app.post("/task")
 def add_completion_task(completion_task: TaskCreator):
     uuid = db_add_completion_task(completion_task.prompt, completion_task.mode)
-    result = db_get_completion_tasks_by_uuid(uuid)
-    while result is None or result["completion_result"] is None:
-        result = db_get_completion_tasks_by_uuid(uuid)
-    return {
-        "summary": result["completion_result"]
-    }
-
+    return {"uuid": uuid}
 
 
 @app.get("/crawl")
@@ -111,15 +104,15 @@ def set_crawl_completed(uuid):
 
 
 @app.get("/crawl/incomplete")
-def get_inocmplete_completion_task():
+def get_incomplete_completion_task():
     result = db_get_incomplete_completion_tasks()
     return {"task": result}
 
 
 @app.post("/url")
 def add_url(url: UrlCreator):
-    result = db_add_url(url.url, url.prompt, url.parent_uuid)
-    return {"url_object": result}
+    uuid, result = db_add_url(url.url, url.prompt, url.parent_uuid)
+    return {"uuid": uuid, "url_object": result}
 
 
 @app.get("/url/downloaded")
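With the blocking loop removed, POST /task now returns the new task's uuid immediately and the summary is produced later by the workers. An illustrative client call (host and port are assumptions, not taken from this patch; the TaskCreator fields mirror the prompt and mode used above):

import requests

# Assumes the FastAPI app is served locally on port 8000 (not specified in the patch).
response = requests.post(
    "http://localhost:8000/task",
    json={"prompt": "history of FAISS", "mode": "wiki"},
)
print(response.json())  # {"uuid": "..."}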
diff --git a/workers/crawler.py b/workers/crawler.py
index be3273c..6055ea5 100644
--- a/workers/crawler.py
+++ b/workers/crawler.py
@@ -1,7 +1,7 @@
 from urllib.error import HTTPError
 
+from colorama import Fore, Style
 from langchain_community.document_loaders import WebBaseLoader, PyPDFLoader
-from tinydb import Query
 
 from core.classes.traffic_manager import TrafficManager
 from core.databases import db_url_pool, db_crawl_tasks
@@ -23,7 +23,7 @@
 # url order:
 # 0. use short memory urls
 # 1. refill with non-researched db urls
-# 2. refill with google search
+# 2. refill with Google search
 
 google_traffic_manager = TrafficManager()
 
@@ -46,7 +46,7 @@ def rq_refill(seed_task, use_google: bool = True):
     db_url_objects = db_url_pool.db_get_not_downloaded()
     url_space_left = url_space_left - len(db_url_objects)
 
-    # 2. get from google
+    # 2. get from Google
     google_url_objects = []
     if use_google and seed_query is not None:
         quit_unexpectedly = False
@@ -66,7 +66,7 @@ def rq_refill(seed_task, use_google: bool = True):
             continue
 
         prompt = seed_query.web_query
-        new_url_object = db_url_pool.db_add_url(
+        _, new_url_object = db_url_pool.db_add_url(
             url=url,
             prompt=prompt,
             parent_uuid=None,
@@ -177,23 +177,19 @@ def processing_iteration():
         process_url(url_object)
 
 
-previous_db_not_downloaded = None
+previous_tasks_queued = 0
 
 
 def start_crawler():
-    global previous_db_not_downloaded
+    global previous_tasks_queued
     while True:
-        db_query = Query()
-        db_not_downloaded = db_url_pool.db.search(
-            db_query.fragment({"is_downloaded": False, "is_rubbish": False})
-        )
-        db_rubbish = db_url_pool.db.search(db_query.fragment({"is_rubbish": True}))
-        db_total = db_url_pool.db.all()
-
-        if db_not_downloaded != previous_db_not_downloaded:
-            print("urls left to be downloaded:", len(db_not_downloaded))
-            print("urls marked rubbish:", len(db_rubbish))
-            print("url running total:", len(db_total))
-            previous_db_not_downloaded = db_not_downloaded
+        queue_length = len(url_rapid_queue)
+        if queue_length > previous_tasks_queued:
+            print(f"{Fore.CYAN}{Style.BRIGHT}RECEIVED NEW TASKS")
+            print(f"currently executing:", url_rapid_queue[0])
+
+        if queue_length != previous_tasks_queued:
+            print(f"{Fore.CYAN}tasks left:", queue_length)
+            previous_tasks_queued = queue_length
 
         processing_iteration()
diff --git a/workers/embedder.py b/workers/embedder.py
index 0bdeeda..4f7aef4 100644
--- a/workers/embedder.py
+++ b/workers/embedder.py
@@ -1,7 +1,5 @@
 # embedding worker
 # url_db -> THIS -> THIS.vector_db
-from colorama import Fore
-from tinydb import Query
 from tinydb.table import Document
 
 from configurator import get_runtime_config
@@ -27,13 +25,13 @@ def processing_iteration():
     embedding_queue = db_url_pool.db_get_not_embedded(embed_model_name)
 
     for url_object in embedding_queue:
-        document = url_object["text"]
-        task_uuid = url_object["task_uuid"]
+        document = url_object.text
+        task_uuid = url_object.task_uuid
 
         db_full_name = utils.gen_vec_db_full_name("embeddings", embed_model_name)
         db_embeddings.db_add_text_batch(document, db_full_name)
 
-        db_url_pool.db_set_url_embedded(url_object["uuid"], embed_model_name)
+        db_url_pool.db_set_url_embedded(url_object.uuid, embed_model_name)
         db_crawl_tasks.db_increment_task_embedding_progression(
             task_uuid, embed_model_name
         )
@@ -45,12 +43,5 @@
 def start_embedder():
     global previous_not_embedded
     while True:
-        db_not_embedded = db_url_pool.db_get_not_embedded(embedder_config.model_name)
-        db_total = db_url_pool.db.all()
-
-        if len(db_not_embedded) != previous_not_embedded:
-            print("urls left to be embedded:", len(db_not_embedded))
-            print("url running total:", len(db_total))
-            previous_not_embedded = len(db_not_embedded)
-
+        # todo: improve verbosity
         processing_iteration()
diff --git a/workers/summarizer.py b/workers/summarizer.py
index 69dde56..fe28c94 100644
--- a/workers/summarizer.py
+++ b/workers/summarizer.py
@@ -17,7 +17,7 @@
     web_news_lookup_prompt,
     web_wiki_lookup_prompt,
 )
-from core.tools.model_loader import load_llm, load_embedder
+from core.tools.model_loader import load_llm
 from langchain_core.output_parsers import StrOutputParser
 from tinydb import Query