From 900e41c6088f2c1ae3d1cfad94cc1082273c7cea Mon Sep 17 00:00:00 2001 From: Steve McGrath Date: Wed, 22 May 2024 12:28:13 -0500 Subject: [PATCH] Exception in subtask upsert was hidden by multi-threading #262 The SubTask upsert method was failing due to the mapping change from 2.0.4, however was masked because this was occurring within a worker thread. Corrected the issue, added logic to only use threading when the max_workers is above 1 (for testing), and also added a failsafe to stop execution after processing findings if the number of observed exceptions from the worker jobs is above 0. --- tenb2jira/models.py | 2 +- tenb2jira/processor.py | 51 +++++++++++++++++++++++++++++++++--------- tenb2jira/version.py | 2 +- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/tenb2jira/models.py b/tenb2jira/models.py index e373ab9..e777ab0 100644 --- a/tenb2jira/models.py +++ b/tenb2jira/models.py @@ -28,7 +28,7 @@ class SubTaskMap(Base): sqlite_on_conflict_unique='IGNORE' ) asset_id: Mapped[UUID] - jira_key: Mapped[str] + jira_id: Mapped[str] plugin_id: Mapped[int] = mapped_column(ForeignKey('task.plugin_id')) is_open: Mapped[bool] updated: Mapped[datetime] diff --git a/tenb2jira/processor.py b/tenb2jira/processor.py index f8dc73b..25903c2 100644 --- a/tenb2jira/processor.py +++ b/tenb2jira/processor.py @@ -155,8 +155,8 @@ def upsert_task(self, s: Session, finding: dict) -> (int | None): # If the finding related to this task is not in an open state, then # there is no reason to continue. Return back a NoneType value. 
if not task.is_open: - log.info(f'Finding related to Task {task.fields[self.plugin_id]} ' - 'is closed, skipping' + log.info(f'Finding related to Plugin {task.fields[self.plugin_id]}' + ' is closed, skipping' ) return None @@ -221,7 +221,7 @@ def upsert_task(self, s: Session, finding: dict) -> (int | None): s.add(sql) s.commit() log.info(f'Created Task "{resp.key}" and added to SQL Cache.') - return resp.id + return resp.key # In the event that multiple tasks are returned from the search, # something went seriously wrong. We will log to the console, then @@ -245,7 +245,7 @@ def upsert_subtask(self, associated action. """ task = self.jira.subtask.generate(finding) - task.fields['parent'] = {'id': str(task_id)} + task.fields['parent'] = {'key': str(task_id)} sql = s.query(SubTaskMap)\ .filter_by(finding_id=UUID(task.fields[self.finding_id]))\ .one_or_none() @@ -293,7 +293,7 @@ def upsert_subtask(self, # return the Jira issue id back to the caller. case 1: sql = SubTaskMap(plugin_id=task.fields[self.plugin_id], - asset_id=task.fields[self.asset_id], + asset_id=task.fields[self.asset_id][0], finding_id=task.fields[self.finding_id], jira_id=page.issues[0].key, is_open=task.is_open, @@ -332,7 +332,7 @@ def upsert_subtask(self, log.info(f'Created Subtask "{resp.key}" and ' 'added to SQL Cache.' ) - return resp.id + return resp.key # In the event that multiple tasks are returned from the # search, something went seriously wrong. We will log to the @@ -342,6 +342,7 @@ def upsert_subtask(self, msg = ('Multiple Jira SubTasks match Finding ' f'"{task.fields[self.finding_id]}". Jira IDs are ' f'"{", ".join(i.key for i in page.issues)}".' + ' SKIPPING.' 
) log.error(msg) raise Exception(msg) @@ -396,6 +397,7 @@ def finding_job(self, finding: dict): with Session(self.engine) as session: task_id = self.upsert_task(s=session, finding=finding) self.upsert_subtask(s=session, task_id=task_id, finding=finding) + session.commit() def sync(self): """ @@ -411,11 +413,40 @@ def sync(self): # build the db cache self.build_cache() - # Using as many threads as we need (up to the max configured) - # go ahead and process the findings. - with ThreadPoolExecutor(max_workers=self.max_workers) as e: + # If only a single thread was set, then we won't even run through a + # threaded execution worker. + if self.max_workers <= 1: for finding in findings: - e.submit(self.finding_job, finding) + self.finding_job(finding) + + # Using as many threads as we need (up to the max configured) + # go ahead and process the findings. We will store the job results + # and confirm that no exceptions had occurred. If any did, then we'll + # log those exceptions and refuse to continue with closing any issues + # to ensure that we don't put the project into a weird state. + else: + jobs = [] + exc_count = 0 + + # launch the thread executor and store each future job for later + # analysis. + with ThreadPoolExecutor(max_workers=self.max_workers) as e: + for finding in findings: + jobs.append(e.submit(self.finding_job, finding)) + + # Check each job to see if any exceptions were raised. If so, then + # log those exceptions and increment the exception counter. + for job in jobs: + if job.exception(): + log.exception(job.exception()) + exc_count += 1 + + # If we have a non-zero value from the exception counter, then + # log the total number of exceptions encountered and terminate. + if exc_count > 0: + log.error(f'Refusing to continue ({exc_count} errors) ' + '& terminating sync.') + return # cleanup the dead hosts and clear out the empty tasks. 
self.close_dead_assets(asset_cleanup) diff --git a/tenb2jira/version.py b/tenb2jira/version.py index 024361b..3a5e5af 100644 --- a/tenb2jira/version.py +++ b/tenb2jira/version.py @@ -1 +1 @@ -version = '2.0.4' +version = '2.0.5'