Skip to content

Commit

Permalink
Fix the job scheduler parser, action listeners, and multi-node test (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
opensearch-trigger-bot[bot] authored Jul 12, 2024
1 parent 95d0099 commit 7b9a329
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFSourceConfigRunner;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig;
import org.opensearch.securityanalytics.threatIntel.model.monitor.TransportThreatIntelMonitorFanOutAction;
import org.opensearch.securityanalytics.threatIntel.model.TIFJobParameter;
import org.opensearch.securityanalytics.threatIntel.resthandler.RestDeleteTIFSourceConfigAction;
import org.opensearch.securityanalytics.threatIntel.resthandler.RestGetIocFindingsAction;
import org.opensearch.securityanalytics.threatIntel.resthandler.RestGetTIFSourceConfigAction;
Expand Down Expand Up @@ -409,18 +410,20 @@ public ScheduledJobRunner getJobRunner() {

@Override
public ScheduledJobParser getJobParser() {
// TODO: @jowg fix the job parser to parse previous tif job
return (xcp, id, jobDocVersion) -> {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = xcp.currentName();
xcp.nextToken();
switch (fieldName) {
case SOURCE_CONFIG_FIELD:
return SATIFSourceConfig.parse(xcp, id, jobDocVersion.getVersion());
default:
log.error("Job parser failed for [{}] in security analytics job registration", fieldName);
xcp.skipChildren();
if (xcp.nextToken() == XContentParser.Token.START_OBJECT) {
switch (fieldName) {
case SOURCE_CONFIG_FIELD:
return SATIFSourceConfig.parse(xcp, id, null);
default:
log.error("Job parser failed for [{}] in security analytics job registration", fieldName);
xcp.skipChildren();
}
} else {
return TIFJobParameter.parseFromParser(xcp, id, jobDocVersion.getVersion());
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobRunner;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFSourceConfigRunner;
import org.opensearch.securityanalytics.threatIntel.model.TIFJobParameter;
import org.opensearch.securityanalytics.threatIntel.sacommons.TIFSourceConfig;

public class SecurityAnalyticsRunner implements ScheduledJobRunner {
Expand All @@ -30,6 +32,8 @@ private SecurityAnalyticsRunner() {}
public void runJob(ScheduledJobParameter job, JobExecutionContext context) {
if (job instanceof TIFSourceConfig) {
TIFSourceConfigRunner.getJobRunnerInstance().runJob(job, context);
} else if (job instanceof TIFJobParameter) {
TIFJobRunner.getJobRunnerInstance().runJob(job, context);
} else {
String errorMessage = "Invalid job type, found " + job.getClass().getSimpleName() + "with id: " + context.getJobId();
log.error(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.model.threatintel.BaseEntity;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.transport.RemoteTransportException;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -226,7 +227,7 @@ public void createIndexIfNotExists(final ActionListener<Void> listener) {
log.debug("{} index created", getEntityName());
listener.onResponse(null);
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
if (e instanceof ResourceAlreadyExistsException || (e instanceof RemoteTransportException && e.getCause() instanceof ResourceAlreadyExistsException)) {
log.debug("index {} already exist", getEntityIndexMapping());
listener.onResponse(null);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter {
private static final String SCHEDULE_FIELD = "schedule";
private static final String ENABLED_TIME_FIELD = "enabled_time";
private static final String ENABLED_TIME_FIELD_READABLE = "enabled_time_field";
private static final String state_field = "state";
private static final String STATE_FIELD = "state";
private static final String INDICES_FIELD = "indices";
private static final String update_stats_field = "update_stats";
private static final String UPDATE_STATS_FIELD = "update_stats";


/**
Expand All @@ -70,9 +70,9 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter {
/**
* Additional fields for tif job
*/
public static final ParseField STATE_PARSER_FIELD = new ParseField(state_field);
public static final ParseField STATE_PARSER_FIELD = new ParseField(STATE_FIELD);
public static final ParseField INDICES_PARSER_FIELD = new ParseField(INDICES_FIELD);
public static final ParseField UPDATE_STATS_PARSER_FIELD = new ParseField(update_stats_field);
public static final ParseField UPDATE_STATS_PARSER_FIELD = new ParseField(UPDATE_STATS_FIELD);

/**
* Default variables for job scheduling
Expand Down Expand Up @@ -128,6 +128,71 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter {
*/
private UpdateStats updateStats;

public static TIFJobParameter parseFromParser(XContentParser xcp, String id, Long version) throws IOException {
String name = null;
Instant lastUpdateTime = null;
Boolean isEnabled = null;
TIFJobState state = null;
Instant enabledTime = null;
IntervalSchedule schedule = null;
List<String> indices = new ArrayList<>();
UpdateStats updateStats = null;

// parsing is coming from the security analytics plugin parser, so it begins with value_string token
XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, xcp.currentToken(), xcp);
while (true) {
String fieldName = xcp.currentName();
switch (fieldName) {
case NAME_FIELD:
name = xcp.text();
break;
case LAST_UPDATE_TIME_FIELD:
lastUpdateTime = Instant.ofEpochMilli(xcp.longValue());
break;
case ENABLED_TIME_FIELD:
if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
enabledTime = null;
} else if (xcp.currentToken().isValue()) {
enabledTime = Instant.ofEpochMilli(xcp.longValue());
} else {
XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.getTokenLocation());
enabledTime = null;
}
break;
case ENABLED_FIELD:
isEnabled = xcp.booleanValue();
break;
case SCHEDULE_FIELD:
schedule = (IntervalSchedule) ScheduleParser.parse(xcp);
break;
case STATE_FIELD:
state = toState(xcp.text());
break;
case INDICES_FIELD:
indices = new ArrayList<>();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp);
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
indices.add(xcp.text());
}
break;
case UPDATE_STATS_FIELD:
updateStats = UpdateStats.PARSER.parse(xcp, null);
break;
default:
xcp.skipChildren();
}

if (xcp.nextToken() == XContentParser.Token.END_OBJECT){
break;
} else {
xcp.nextToken();
}
}

return new TIFJobParameter(name, lastUpdateTime, enabledTime, isEnabled, schedule, state, indices, updateStats);
}

// parser used for integ test
public static TIFJobParameter parse(XContentParser xcp, String id, Long version) throws IOException {
String name = null;
Instant lastUpdateTime = null;
Expand All @@ -150,7 +215,7 @@ public static TIFJobParameter parse(XContentParser xcp, String id, Long version)
case ENABLED_FIELD:
isEnabled = xcp.booleanValue();
break;
case state_field:
case STATE_FIELD:
state = toState(xcp.text());
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ private void storeAndDeleteIocIndices(List<STIX2IOC> stix2IOCList, ActionListene
listener.onFailure(ex);
}
));
listener.onFailure(e);
})
);
}
Expand Down Expand Up @@ -499,7 +498,6 @@ private void downloadAndSaveIocsToRefresh(ActionListener<SATIFSourceConfigDto> l
listener.onFailure(e);
}
));
listener.onFailure(downloadAndSaveIocsError);
}));
}

Expand Down

0 comments on commit 7b9a329

Please sign in to comment.