Skip to content

Commit

Permalink
chg ! celery boost
Browse files Browse the repository at this point in the history
  • Loading branch information
vitali-yanushchyk-valor committed Nov 29, 2024
1 parent ca42b4f commit e23307d
Show file tree
Hide file tree
Showing 10 changed files with 1,334 additions and 147 deletions.
12 changes: 5 additions & 7 deletions src/country_workspace/contrib/aurora/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@

from django.db.transaction import atomic

from country_workspace.constants import BATCH_NAME_DEFAULT
from country_workspace.contrib.aurora.client import AuroraClient
from country_workspace.models import AsyncJob, Batch, Household, Individual, User
from country_workspace.models import AsyncJob, Batch, Household, Individual
from country_workspace.utils.fields import clean_field_name


def sync_aurora_job(job: AsyncJob) -> dict[str, Any]:
def sync_aurora_job(job: AsyncJob) -> dict[str, int]:
"""
Synchronizes data from the Aurora system into the database for the given job within an atomic transaction.
Args:
job (AsyncJob): The job instance containing configuration and context for synchronization.
Returns:
dict[str, Any]: A dictionary with counts of households and individuals created, e.g.,
{"households": total_hh, "individuals": total_ind}.
dict[str, int]: A dictionary with counts of households and individuals created.
"""
total_hh = total_ind = 0
client = AuroraClient()
Expand Down Expand Up @@ -46,10 +44,10 @@ def _create_batch(job: AsyncJob) -> Batch:
Batch: The newly created batch instance.
"""
return Batch.objects.create(
name=job.config.get("batch_name") or BATCH_NAME_DEFAULT,
name=job.config.get("batch_name"),
program=job.program,
country_office=job.program.country_office,
imported_by=User.objects.get(pk=job.config.get("imported_by_id")),
imported_by=job.owner,
)


Expand Down
5 changes: 1 addition & 4 deletions src/country_workspace/datasources/rdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@
from hope_smart_import.readers import open_xls_multi

from country_workspace.models import AsyncJob, Batch, Household, Program
from country_workspace.utils.fields import clean_field_name

RDI = Union[str, io.BytesIO]


def clean_field_name(v: str) -> str:
return v.replace("_h_c", "").replace("_h_f", "").replace("_i_c", "").replace("_i_f", "").lower()


def import_from_rdi_job(job: AsyncJob) -> dict[str, int]:
return import_from_rdi(
job.file,
Expand Down
1 change: 0 additions & 1 deletion src/country_workspace/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ class JobType(models.TextChoices):
TASK = "TASK", "Task"
BULK_UPDATE_HH = "BULK_UPDATE_HH"
BULK_UPDATE_IND = "BULK_UPDATE_IND"
AURORA_SYNC = "AURORA_SYNC"

type = models.CharField(max_length=50, choices=JobType.choices)
program = models.ForeignKey("Program", related_name="jobs", on_delete=models.CASCADE)
Expand Down
1 change: 0 additions & 1 deletion src/country_workspace/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from redis_lock import Lock

from country_workspace.config.celery import app
from country_workspace.contrib.aurora.sync import sync_aurora_job
from country_workspace.models import AsyncJob

logger = logging.getLogger(__name__)
Expand Down
24 changes: 15 additions & 9 deletions src/country_workspace/workspaces/admin/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _

from admin_extra_buttons.api import button, link
from admin_extra_buttons.buttons import LinkButton
from hope_flex_fields.models import DataChecker
from strategy_field.utils import fqn

from country_workspace.constants import BATCH_NAME_DEFAULT
from country_workspace.contrib.aurora.sync import sync_aurora_job
from country_workspace.state import state

from ...contrib.aurora.forms import ImportAuroraForm
from ...datasources.rdi import import_from_rdi_job
from ...models import AsyncJob, Batch
from ...contrib.aurora.forms import ImportAuroraForm
from ...utils.flex_fields import get_checker_fields
from ..models import CountryProgram
from ..options import WorkspaceModelAdmin
Expand Down Expand Up @@ -192,7 +193,7 @@ def import_rdi(self, request: HttpRequest, pk: str) -> "HttpResponse":
form = ImportFileForm(request.POST, request.FILES)
if form.is_valid():
batch, __ = Batch.objects.get_or_create(
name=form.cleaned_data["batch_name"] or ("Batch %s" % timezone.now()),
name=form.cleaned_data["batch_name"] or BATCH_NAME_DEFAULT,
program=program,
country_office=program.country_office,
imported_by=state.request.user,
Expand Down Expand Up @@ -256,18 +257,23 @@ def import_aurora(self, request: HttpRequest, pk: str) -> HttpResponse:
if request.method == "POST":
form = ImportAuroraForm(request.POST)
if form.is_valid():
j: AsyncJob = AsyncJob.objects.create(
program=program,
type=AsyncJob.JobType.AURORA_SYNC,
job: AsyncJob = AsyncJob.objects.create(
type=AsyncJob.JobType.TASK,
action=fqn(sync_aurora_job),
batch=None,
file=None,
config={**form.cleaned_data, "imported_by_id": request.user.id},
program=program,
owner=request.user,
config={
"batch_name": form.cleaned_data["batch_name"] or BATCH_NAME_DEFAULT,
"household_name_column": form.cleaned_data["household_name_column"],
},
)
j.queue()
job.queue()
self.message_user(
request,
_("The import task from Aurora has been successfully queued. Asynchronous task ID: {0}.").format(
j.curr_async_result_id
job.curr_async_result_id
),
level=messages.SUCCESS,
)
Expand Down
5 changes: 3 additions & 2 deletions tests/contrib/aurora/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def mock_aurora_data() -> dict[str, Any]:
"batch_name": "Batch 1",
"household_name_column": "family_name",
},
"imported_by_id": 1,
# "imported_by_id": 1,
}


Expand Down Expand Up @@ -80,9 +80,10 @@ def user():
@pytest.fixture
def job(mock_aurora_data, program, batch, user):
return AsyncJobFactory(
type=AsyncJob.JobType.AURORA_SYNC,
type=AsyncJob.JobType.TASK,
program=program,
batch=batch,
owner=user,
config={
**mock_aurora_data["form_cleaned_data"],
"imported_by_id": user.pk,
Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/aurora/test_aurora_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from country_workspace.models import Batch, Household, Office, Program, User


def test_create_batch_success(mock_aurora_data, job, user):
def test_create_batch_success(mock_aurora_data, job):
batch = _create_batch(job)
assert isinstance(batch, Batch)
assert isinstance(batch.country_office, Office)
Expand All @@ -21,7 +21,7 @@ def test_create_batch_success(mock_aurora_data, job, user):
assert batch.name == mock_aurora_data["form_cleaned_data"]["batch_name"]
assert batch.program == job.program
assert batch.country_office == job.program.country_office
assert batch.imported_by == user
assert batch.imported_by == job.owner


def test_create_household_success(mock_aurora_data, job):
Expand Down
Loading

0 comments on commit e23307d

Please sign in to comment.