diff --git a/ckanext/searchterms/command.py b/ckanext/searchterms/command.py index d4f826a..6e31c2e 100644 --- a/ckanext/searchterms/command.py +++ b/ckanext/searchterms/command.py @@ -3,7 +3,7 @@ import ckan.plugins.toolkit as toolkit from .implementations import is_eligible -from .jobs import check_search_terms_resource +from .jobs import check_search_terms_resource, enqueue_terms_job log = logging.getLogger(__name__) @@ -31,24 +31,43 @@ def submit_all_pkgs(self): {"model": model, "ignore_auth": True}, {"include_private": True, "rows": 1000}, ) + total_pkg_count = 0 + total_res_count = 0 + total_pkg_failed_count = 0 + total_pkg_validated_count = 0 log.info("Found {0} packages".format(package_list.get("count", "bzzt"))) for pkg in package_list.get("results", []): + total_pkg_count += 1 pkgid = pkg.get("id") + " (" + pkg.get("name") + ")" print("Validating package " + pkgid) enqued = [] nenqued = [] - validated = self.identify_pkg(pkg.get("id", "")) + try: + validated = self.identify_pkg(pkg.get("id", "")) + except Exception: + log.error("Error validating package " + pkgid) + validated = False if validated: print("Succesfully validated package and submitted " + pkgid) - self.resubmit_pkg(validated) + total_res_count += self.resubmit_pkg(validated) + total_pkg_validated_count += 1 enqued.append(pkgid) else: print("Unable to validate package " + pkg.get("id", "")) nenqued.append(pkgid) + total_pkg_failed_count += 1 print("Enqueued:") print(", ".join(enqued)) print("Not Enqueued:") print(", ".join(nenqued)) + log.info( + "Total {} packages found. {} failed validation and did not submit. Submitted {} packages. {} total searchterm jobs".format( + total_pkg_count, + total_pkg_failed_count, + total_pkg_validated_count, + total_res_count, + ) + ) def resubmit_pkg(self, package): pkgid = package.get("id") + " (" + package.get("name") + ")" @@ -57,20 +76,19 @@ def resubmit_pkg(self, package): log.info("No resources found for dataset {}".format(pkgid)) return + total_eligible_resources = 0 + log.info("Starting search terms job for package {0}".format(pkgid)) for resource in package.get("resources", []): rsrcid = resource.get("id") + " (" + resource.get("name") + ")" if is_eligible(resource): + total_eligible_resources += 1 if self.run_in_foreground: log.info("Checking search terms for resource " + rsrcid) check_search_terms_resource(resource, resource_was_updated=True) else: log.info("Enqueueing search terms job for resource " + rsrcid) - toolkit.enqueue_job( - check_search_terms_resource, - [resource, True], - rq_kwargs={"timeout": 21600}, - queue="searchterms", - ) + enqueue_terms_job(resource, True) else: log.debug("Skipping search terms job for resource " + rsrcid) + return total_eligible_resources diff --git a/ckanext/searchterms/jobs.py b/ckanext/searchterms/jobs.py index f33d7bd..5ad44fa 100644 --- a/ckanext/searchterms/jobs.py +++ b/ckanext/searchterms/jobs.py @@ -22,7 +22,7 @@ # If resource is eligible, add terms job to worker queue -def enqueue_terms_job(resource): +def enqueue_terms_job(resource, resource_was_updated=False): # Check package_id exists to make sure it's not a package if ( resource.get("name") == TERMS_RSRC_NAME @@ -48,7 +48,7 @@ def enqueue_terms_job(resource): try: job = tk.enqueue_job( check_search_terms_resource, - [resource], + [resource, resource_was_updated], rq_kwargs={"timeout": 21600}, queue="searchterms", )