Skip to content

Commit

Permalink
feat: update cron
Browse files Browse the repository at this point in the history
to store concepts in vector db
  • Loading branch information
marcus-ny committed Nov 10, 2024
1 parent f0758c0 commit 0c91122
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
8 changes: 6 additions & 2 deletions backend/src/cron/fetch_articles.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from src.common.constants import GUARDIAN_API_KEY
from sqlalchemy import select
from src.embeddings.store_concepts import store_daily_article_concepts
from src.embeddings.vector_store import store_documents
from src.events.models import Article, ArticleSource, Event
from src.common.database import engine
Expand Down Expand Up @@ -155,7 +156,7 @@ async def run(limit: int = 30):
# Add new articles to database
await populate_daily_articles_cna()

# Process new articles i.e. find articles that we have not generated events for
# Get new articles i.e. find articles that we have not generated events for
articles = process_new_articles()

# # Generate events from articles, written to lm_events_output.json
Expand All @@ -168,7 +169,10 @@ async def run(limit: int = 30):

store_documents(analyses)

await generate_concepts()
# NOTE: this is the part to store concepts in database
article_concepts_with_id = await generate_concepts()
await store_daily_article_concepts(article_concepts_with_id)

print(analyses)


Expand Down
22 changes: 22 additions & 0 deletions backend/src/embeddings/store_concepts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from src.events.models import ArticleConcept, Concept, Event
from langchain_core.documents import Document

from src.lm.generate_concepts import ArticleConceptsWithId


from src.common.database import engine
from sqlalchemy.orm import Session
Expand All @@ -31,6 +33,26 @@ def fetch_all_article_concepts(limit: int = None) -> list[ArticleConcept]:
return session.scalars(query).all()


async def store_daily_article_concepts(article_concepts: list[ArticleConceptsWithId]):
concepts = get_new_article_concepts(article_concepts)
await store_concepts_async(concepts)


def get_new_article_concepts(
article_concepts: list[ArticleConceptsWithId], limit: int = None
) -> list[ArticleConcept]:
"""
Given a list of ArticleConceptsWithId objects, return a list of ArticleConcept objects
"""
article_ids = [article_concept.article_id for article_concept in article_concepts]

with Session(engine) as session:
query = select(ArticleConcept).where(ArticleConcept.article_id.in_(article_ids))
if limit:
query = query.limit(limit)
return session.scalars(query).all()


async def store_concepts_async(concepts: list[ArticleConcept], batch_size: int = 200):
semaphore = asyncio.Semaphore(CONCURRENCY)
tasks = []
Expand Down
2 changes: 1 addition & 1 deletion backend/src/lm/generate_concepts.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def add_concepts_to_db(article_concepts: list[ArticleConceptsWithId]):

async def generate_concepts(limit: int | None = None, add_to_db: bool = True):
with Session(engine) as session:
# query db for article
# query db for articles without concept
subquery = select(ArticleConcept.article_id)
query = (
select(Article)
Expand Down

0 comments on commit 0c91122

Please sign in to comment.