
Commit

chore: fix failing test by wrapping textual queries in sqlalchemy text func
coder1963 committed Nov 30, 2024
1 parent 68872cb commit 2205445
Showing 2 changed files with 7 additions and 5 deletions.
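
For context, this change most likely addresses SQLAlchemy 2.0 behaviour (also SQLAlchemy 1.4 in 2.0-style mode), where Session.execute() no longer accepts a plain SQL string and the statement must be wrapped in sqlalchemy.text(). A minimal sketch of that behaviour, assuming an in-memory SQLite engine that is not part of this repository:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Hypothetical engine used only for illustration; the project supplies its own DB session.
engine = create_engine("sqlite://")

with Session(engine) as session:
    # session.execute("SELECT 1")               # fails on SQLAlchemy 2.x: not an executable object
    result = session.execute(text("SELECT 1"))  # raw SQL wrapped in text() is accepted
    print(result.scalar())                      # -> 1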
icon_contracts/workers/kafka.py: 3 changes (2 additions & 1 deletion)
@@ -9,6 +9,7 @@
 # from confluent_kafka.serialization import StringSerializer
 from loguru import logger
 from pydantic import BaseModel
+from sqlalchemy import text

 from icon_contracts.config import settings

@@ -25,7 +26,7 @@ def get_current_offset(session):
     output = {}
     while True:
         logger.info(f"Getting kafka job with job_id = {settings.JOB_ID}")
-        sql = f"select * from kafka_jobs WHERE job_id='{settings.JOB_ID}';"
+        sql = text(f"select * from kafka_jobs WHERE job_id='{settings.JOB_ID}';")
         result = session.execute(sql).fetchall()
         session.commit()

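As an aside, not part of this commit: text() also supports bound parameters, so the job id does not have to be interpolated into the SQL string with an f-string. A sketch, assuming the same session and settings objects used in get_current_offset():

from sqlalchemy import text

# Sketch only (not in this commit): bind job_id instead of formatting it into the string.
sql = text("SELECT * FROM kafka_jobs WHERE job_id = :job_id")
result = session.execute(sql, {"job_id": settings.JOB_ID}).fetchall()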
tests/integration/worker/test_partition_init.py: 9 changes (5 additions & 4 deletions)
@@ -1,4 +1,5 @@
 import pytest
+from sqlalchemy import text

 from icon_contracts.config import settings
 from icon_contracts.workers.kafka import get_current_offset

@@ -10,17 +11,17 @@
 def backfill_job(db):
     def f(job_id):
         with db as session:
-            sql = "DROP TABLE IF EXISTS kafka_jobs;"
+            sql = text("DROP TABLE IF EXISTS kafka_jobs;")
             session.execute(sql)
             session.commit()

-            sql = "CREATE TABLE IF NOT EXISTS kafka_jobs (job_id varchar, worker_group varchar, topic varchar, partition bigint, stop_offset bigint, PRIMARY KEY (job_id, worker_group, topic, partition));"
+            sql = text("CREATE TABLE IF NOT EXISTS kafka_jobs (job_id varchar, worker_group varchar, topic varchar, partition bigint, stop_offset bigint, PRIMARY KEY (job_id, worker_group, topic, partition));")
             session.execute(sql)
             session.commit()

             num_msgs = 1000
             for i in range(0, 12):
-                sql = (
+                sql = text(
                     f"INSERT INTO kafka_jobs (job_id, worker_group, topic, partition, stop_offset) VALUES "
                     f"('{job_id}','{settings.CONSUMER_GROUP}-{job_id}',"
                     f"'{settings.CONSUMER_TOPIC_BLOCKS}','{i}','{num_msgs}');"

@@ -33,7 +34,7 @@ def f(job_id):


 def test_get_current_offset(db, backfill_job):
-    settings.JOB_ID = "test6"
+    settings.JOB_ID = text("test6")
     backfill_job(settings.JOB_ID)

     with db as session:
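Likewise, purely as a sketch and not part of this change, the twelve INSERTs in the backfill_job fixture could be issued with bound parameters and a list of parameter dicts (executemany-style), assuming the same session, job_id, settings, and num_msgs as in the fixture above:

from sqlalchemy import text

insert_stmt = text(
    "INSERT INTO kafka_jobs (job_id, worker_group, topic, partition, stop_offset) "
    "VALUES (:job_id, :worker_group, :topic, :partition, :stop_offset)"
)
rows = [
    {
        "job_id": job_id,
        "worker_group": f"{settings.CONSUMER_GROUP}-{job_id}",
        "topic": settings.CONSUMER_TOPIC_BLOCKS,
        "partition": i,
        "stop_offset": num_msgs,
    }
    for i in range(12)
]
session.execute(insert_stmt, rows)  # executemany-style: one statement, many parameter sets
session.commit()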

0 comments on commit 2205445
