Skip to content

Commit

Permalink
Exception in subtask upsert was hidden by multi-threading #262
Browse files Browse the repository at this point in the history
The SubTask upsert method was failing due to the mapping change from 2.0.4; however, this was masked because it was occurring within a worker thread. Corrected the issue, added logic to only use threading when max_workers is above 1 (for testing), and also added a failsafe to stop execution after processing findings if the number of observed exceptions from the worker jobs is above 0.
  • Loading branch information
SteveMcGrath committed May 22, 2024
1 parent 03b63da commit 900e41c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 12 deletions.
2 changes: 1 addition & 1 deletion tenb2jira/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SubTaskMap(Base):
sqlite_on_conflict_unique='IGNORE'
)
asset_id: Mapped[UUID]
jira_key: Mapped[str]
jira_id: Mapped[str]
plugin_id: Mapped[int] = mapped_column(ForeignKey('task.plugin_id'))
is_open: Mapped[bool]
updated: Mapped[datetime]
Expand Down
51 changes: 41 additions & 10 deletions tenb2jira/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ def upsert_task(self, s: Session, finding: dict) -> (int | None):
# If the finding related to this task is not in an open state, then
# there is no reason to continue. Return back a NoneType value.
if not task.is_open:
log.info(f'Finding related to Task {task.fields[self.plugin_id]} '
'is closed, skipping'
log.info(f'Finding related to Plugin {task.fields[self.plugin_id]}'
' is closed, skipping'
)
return None

Expand Down Expand Up @@ -221,7 +221,7 @@ def upsert_task(self, s: Session, finding: dict) -> (int | None):
s.add(sql)
s.commit()
log.info(f'Created Task "{resp.key}" and added to SQL Cache.')
return resp.id
return resp.key

# In the event that multiple tasks are returned from the search,
# something went seriously wrong. We will log to the console, then
Expand All @@ -245,7 +245,7 @@ def upsert_subtask(self,
associated action.
"""
task = self.jira.subtask.generate(finding)
task.fields['parent'] = {'id': str(task_id)}
task.fields['parent'] = {'key': str(task_id)}
sql = s.query(SubTaskMap)\
.filter_by(finding_id=UUID(task.fields[self.finding_id]))\
.one_or_none()
Expand Down Expand Up @@ -293,7 +293,7 @@ def upsert_subtask(self,
# return the Jira issue id back to the caller.
case 1:
sql = SubTaskMap(plugin_id=task.fields[self.plugin_id],
asset_id=task.fields[self.asset_id],
asset_id=task.fields[self.asset_id][0],
finding_id=task.fields[self.finding_id],
jira_id=page.issues[0].key,
is_open=task.is_open,
Expand Down Expand Up @@ -332,7 +332,7 @@ def upsert_subtask(self,
log.info(f'Created Subtask "{resp.key}" and '
'added to SQL Cache.'
)
return resp.id
return resp.key

# In the event that multiple tasks are returned from the
# search, something went seriously wrong. We will log to the
Expand All @@ -342,6 +342,7 @@ def upsert_subtask(self,
msg = ('Multiple Jira SubTasks match Finding '
f'"{task.fields[self.finding_id]}". Jira IDs are '
f'"{", ".join(i.key for i in page.issues)}".'
' SKIPPING.'
)
log.error(msg)
raise Exception(msg)
Expand Down Expand Up @@ -396,6 +397,7 @@ def finding_job(self, finding: dict):
with Session(self.engine) as session:
task_id = self.upsert_task(s=session, finding=finding)
self.upsert_subtask(s=session, task_id=task_id, finding=finding)
session.commit()

def sync(self):
"""
Expand All @@ -411,11 +413,40 @@ def sync(self):
# build the db cache
self.build_cache()

# Using as many threads as we need (up to the max configured)
# go ahead and process the findings.
with ThreadPoolExecutor(max_workers=self.max_workers) as e:
        # If only a single thread was set, then we won't even run through a
# threaded execution worker.
if self.max_workers <= 1:
for finding in findings:
e.submit(self.finding_job, finding)
self.finding_job(finding)

# Using as many threads as we need (up to the max configured)
# go ahead and process the findings. We will store the job results
        # and confirm that no exceptions had occurred. If any did, then we'll
# raise those exceptions and refuse to continue with closing any issues
# to ensure that we don't put the project into a weird state.
else:
jobs = []
exc_count = 0

            # launch the thread executor and store each future job for later
# analysis.
with ThreadPoolExecutor(max_workers=self.max_workers) as e:
for finding in findings:
jobs.append(e.submit(self.finding_job, finding))

# Check each job to see if any exceptions were raised. If so, then
# log those exceptions and increment the exception counter.
for job in jobs:
if job.exception():
log.exception(job.exception())
exc_count += 1

# If we have a non-zero value from the exception counter, then
# log the total number of exceptions encountered and terminate.
if exc_count > 0:
log.error(f'Refusing to continue ({exc_count} errors) '
'& terminating sync.')
return

# cleanup the dead hosts and clear out the empty tasks.
self.close_dead_assets(asset_cleanup)
Expand Down
2 changes: 1 addition & 1 deletion tenb2jira/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '2.0.4'
version = '2.0.5'

0 comments on commit 900e41c

Please sign in to comment.