Skip to content

Commit

Permalink
use enqueue function for resubmit pkg
Browse files (browse the repository at this point in the history)
  • Loading branch information
mavocado4 committed Jan 9, 2025
1 parent fa9360a commit e9d4602
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
36 changes: 27 additions & 9 deletions ckanext/searchterms/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import ckan.plugins.toolkit as toolkit

from .implementations import is_eligible
from .jobs import check_search_terms_resource
from .jobs import check_search_terms_resource, enqueue_terms_job

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -31,24 +31,43 @@ def submit_all_pkgs(self):
{"model": model, "ignore_auth": True},
{"include_private": True, "rows": 1000},
)
total_pkg_count = 0
total_res_count = 0
total_pkg_failed_count = 0
total_pkg_validated_count = 0
log.info("Found {0} packages".format(package_list.get("count", "bzzt")))
for pkg in package_list.get("results", []):
total_pkg_count += 1
pkgid = pkg.get("id") + " (" + pkg.get("name") + ")"
print("Validating package " + pkgid)
enqued = []
nenqued = []
validated = self.identify_pkg(pkg.get("id", ""))
try:
validated = self.identify_pkg(pkg.get("id", ""))
except Exception:
log.error("Error validating package " + pkgid)
validated = False
if validated:
print("Succesfully validated package and submitted " + pkgid)
self.resubmit_pkg(validated)
total_res_count += self.resubmit_pkg(validated)
total_pkg_validated_count += 1
enqued.append(pkgid)
else:
print("Unable to validate package " + pkg.get("id", ""))
nenqued.append(pkgid)
total_pkg_failed_count += 1
print("Enqueued:")
print(", ".join(enqued))
print("Not Enqueued:")
print(", ".join(nenqued))
log.info(
"Total {} packages found. {} failed validation and did not submit. Submitted {} packages. {} total searchterm jobs".format(
total_pkg_count,
total_pkg_failed_count,
total_pkg_validated_count,
total_res_count,
)
)

def resubmit_pkg(self, package):
pkgid = package.get("id") + " (" + package.get("name") + ")"
Expand All @@ -57,20 +76,19 @@ def resubmit_pkg(self, package):
log.info("No resources found for dataset {}".format(pkgid))
return

total_eligible_resources = 0

log.info("Starting search terms job for package {0}".format(pkgid))
for resource in package.get("resources", []):
rsrcid = resource.get("id") + " (" + resource.get("name") + ")"
if is_eligible(resource):
total_eligible_resources += 1
if self.run_in_foreground:
log.info("Checking search terms for resource " + rsrcid)
check_search_terms_resource(resource, resource_was_updated=True)
else:
log.info("Enqueueing search terms job for resource " + rsrcid)
toolkit.enqueue_job(
check_search_terms_resource,
[resource, True],
rq_kwargs={"timeout": 21600},
queue="searchterms",
)
enqueue_terms_job(resource, True)
else:
log.debug("Skipping search terms job for resource " + rsrcid)
return total_eligible_resources
4 changes: 2 additions & 2 deletions ckanext/searchterms/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@


# If resource is eligible, add terms job to worker queue
def enqueue_terms_job(resource):
def enqueue_terms_job(resource, resource_was_updated=False):
# Check package_id exists to make sure it's not a package
if (
resource.get("name") == TERMS_RSRC_NAME
Expand All @@ -48,7 +48,7 @@ def enqueue_terms_job(resource):
try:
job = tk.enqueue_job(
check_search_terms_resource,
[resource],
[resource, resource_was_updated],
rq_kwargs={"timeout": 21600},
queue="searchterms",
)
Expand Down

0 comments on commit e9d4602

Please sign in to comment.