Add realtime inference consumer #90

Open
wants to merge 50 commits into
base: dev

yatharthranjan
Member

@yatharthranjan yatharthranjan commented Oct 20, 2021

Realtime consumers consume the results of model invocations in the realtime analysis pipeline (see model-builder, model-invocation-endpoint and ksqldb). A KSQL function invokes the model, receives the inference results as JSON, and writes them to a Kafka topic. These consumers read from that topic.

Three terms are used:

  1. Action: Any task to be performed is an action. This can be sending a notification or an email
    or just logging something.
  2. Condition: This is a predicate that will be evaluated on the incoming data. For instance, this
    can be used to evaluate if the ML model run crossed a threshold of some sort.
  3. RealtimeInferenceConsumer: This is the base consumer for realtime results topics. Each consumer
    can be configured with a set of Conditions and Actions. If all the conditions evaluate to
    true, then all the actions are triggered.
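
As a rough illustration of the flow described by these three terms (not the code in this PR), the evaluation step might look like the sketch below. The `Condition` and `Action` signatures and the `InferenceConsumerSketch` class are hypothetical stand-ins; the real interfaces may differ. Catching exceptions per action mirrors the later commit note that other actions should proceed when one fails.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified interfaces; the PR's real signatures may differ.
interface Condition {
    boolean evaluate(Map<String, Object> record);
}

interface Action {
    void execute(Map<String, Object> record) throws Exception;
}

final class InferenceConsumerSketch {
    private final List<Condition> conditions;
    private final List<Action> actions;

    InferenceConsumerSketch(List<Condition> conditions, List<Action> actions) {
        this.conditions = List.copyOf(conditions);
        this.actions = List.copyOf(actions);
    }

    /** Returns the number of actions that completed without throwing. */
    int process(Map<String, Object> record) {
        // Every condition must hold; an empty condition list is vacuously true.
        boolean allMatch = conditions.stream().allMatch(c -> c.evaluate(record));
        if (!allMatch) {
            return 0;
        }
        int succeeded = 0;
        for (Action action : actions) {
            try {
                action.execute(record);
                succeeded++;
            } catch (Exception e) {
                // Isolate the failure so that the remaining actions still run.
                System.err.println("Action failed: " + e.getMessage());
            }
        }
        return succeeded;
    }
}
```

In this sketch a record that fails any condition triggers no actions at all, and a failing action does not block the ones after it.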

Currently, only one Condition is provided; it evaluates a JsonPath expression, which should be sufficient for most simple use cases.
More complex use cases can create concrete implementations of the Condition
interface or the ConditionBase abstract class. The JsonPath expression is currently read from the
configuration file and is therefore static for a given consumer and topic. Later, it could also be
provided through AppConfig so that it can vary by userId or projectId.
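
For a more complex use case, a custom condition could look roughly like the sketch below, which matches when a numeric field exceeds a threshold. The `Condition` signature shown here, the `ThresholdCondition` class, and the `anomaly_score` field are all assumptions made for illustration; they are not taken from this PR.

```java
import java.util.Map;

// Hypothetical stand-in for the PR's Condition interface.
interface Condition {
    boolean evaluate(Map<String, Object> record);
}

/** Matches when a numeric field nested under a given key exceeds a threshold. */
final class ThresholdCondition implements Condition {
    private final String key;       // e.g. "invocation_result"
    private final String field;     // hypothetical field, e.g. "anomaly_score"
    private final double threshold;

    ThresholdCondition(String key, String field, double threshold) {
        this.key = key;
        this.field = field;
        this.threshold = threshold;
    }

    @Override
    public boolean evaluate(Map<String, Object> record) {
        Object nested = record.get(key);
        if (!(nested instanceof Map)) {
            return false; // missing or malformed payload never matches
        }
        Object value = ((Map<?, ?>) nested).get(field);
        return value instanceof Number
                && ((Number) value).doubleValue() > threshold;
    }
}
```

Treating a missing or non-numeric value as "no match" is a design choice in this sketch; a real implementation might prefer to log or raise an error instead.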

The supported config (for instance, for an intervention using the aRMT app) has the following format:

realtime-consumers:
  - name: 'lstm-lung-study-consumer' # Name of the consumer
    topic: 'lung_study_lstm_ad_inference' # Kafka topic to consume from (this should contain the inference results)
    notify_errors:
      email_addresses:
        - '[email protected]'
    conditions:
      - name: 'LocalJsonPathCondition' # Name of the condition
        projects: [ 'radar-test' ]  # Only evaluate for these projects
        subjects: [ 'sub-1', 'sub-2' ]  # Only evaluate for these subjects
        properties:
          jsonpath: '$[?(@.invocation_result.anomaly_detected == true)]' # JsonPath expression to evaluate
          key: 'invocation_result' # Key that contains data to evaluate 
    actions:
      - name: 'ActiveAppNotificationAction' # Name of the action
        projects: [ 'radar-test' ] # Only execute for these projects
        subjects: [ 'sub-1', 'sub-2' ]  # Only execute for these subjects
        properties:
          questionnaire_name: 'ers' # Name of the questionnaire to trigger
          time_of_day: '09:00:00' # Local user time of day to trigger at
          default_timezone: 'Europe/London' # Default timezone to use for the time of day if not found in the appserver
          appserver_base_url: 'http://localhost:8080/' # Base URL of the appserver
          management_portal_token_url: 'http://localhost/managementportal/api/oauth/token' # URL to get the management portal token
          client_id: 'realtime_consumer' # Client ID for the management portal
          client_secret: 'secret' # Client secret for the management portal
          metadata_key: 'invocation_result' # Key that contains the metadata to be forwarded to the aRMT app
      - name: 'EmailUserAction'
        projects: [ 'radar-test' ]
        subjects: [ 'sub-1', 'sub-2' ]
        properties:
          email_addresses: [ '[email protected]' ]
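
To illustrate how `time_of_day` and `default_timezone` could interact, here is a minimal sketch that computes the next trigger instant in the user's local time and falls back to the default timezone when none is known. The `TriggerTime` class and the "next occurrence" rule are assumptions; the PR's actual calculation (which also mentions jitter and tolerance) may differ.

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class TriggerTime {
    /**
     * Next instant at which the local clock in the user's zone reads timeOfDay.
     * userZone may be null, in which case defaultZone is used.
     */
    static Instant next(Instant now, LocalTime timeOfDay, String userZone, String defaultZone) {
        ZoneId zone = ZoneId.of(userZone != null ? userZone : defaultZone);
        ZonedDateTime nowLocal = now.atZone(zone);
        ZonedDateTime candidate = nowLocal.with(timeOfDay);
        if (!candidate.isAfter(nowLocal)) {
            // Today's slot has already passed, so schedule for tomorrow.
            candidate = candidate.plusDays(1);
        }
        return candidate.toInstant();
    }
}
```

For example, `TriggerTime.next(Instant.now(), LocalTime.parse("09:00:00"), null, "Europe/London")` would schedule for 09:00 London time when the user's timezone is unknown.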

Note: The properties section is specific to each Action and Condition. Please take a look at the
condition and action docs for the supported keys. If the projects or subjects key is not
specified, the action or condition applies to all projects or all subjects, respectively.
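
The filtering rule in the note can be sketched as follows. `ScopeFilter` is a hypothetical helper, and two details are assumptions rather than documented behaviour: an empty list is treated the same as a missing key, and the project and subject filters are combined with a logical AND.

```java
import java.util.List;

final class ScopeFilter {
    private final List<String> projects;
    private final List<String> subjects;

    /** A null or empty list means "no restriction" for that dimension. */
    ScopeFilter(List<String> projects, List<String> subjects) {
        this.projects = projects == null ? List.of() : List.copyOf(projects);
        this.subjects = subjects == null ? List.of() : List.copyOf(subjects);
    }

    boolean appliesTo(String projectId, String subjectId) {
        // The null checks avoid the NullPointerException that immutable
        // lists throw from contains(null).
        boolean projectOk = projects.isEmpty()
                || (projectId != null && projects.contains(projectId));
        boolean subjectOk = subjects.isEmpty()
                || (subjectId != null && subjects.contains(subjectId));
        return projectOk && subjectOk;
    }
}
```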

yatharthranjan and others added 7 commits July 17, 2018 10:44
- Includes various implementations, including appserver notification and email user actions.
- Conditions evaluation using json path (currently locally, later read jsonpath from appconfig too).
@yatharthranjan yatharthranjan marked this pull request as draft October 20, 2021 14:05
Contributor

@blootsvoets blootsvoets left a comment

Yes this looks like a very good start! At some point the threshold may need to come from radar-app-config service instead of from a fixed value. This may also be a good entry point to enable / disable the condition for a given project (e.g. to only process records for certain projects).

.getProperties()
.getOrDefault(
"management_portal_token_url",
"https://radar-cns-platform.rosalind.kcl.ac.uk/managementportal/api/ouath/token");
Contributor

Preferably keep this out of the code. If you want to provide defaults, it would be better to define a base URL somewhere, and use that plus the default path as the default.
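
One way to read this suggestion is sketched below: only the default path lives in code, and the host comes from configuration. The `TokenUrlResolver` class and the `base_url` property key are hypothetical; `management_portal_token_url` is the key used in the snippet above.

```java
import java.net.URI;
import java.util.Map;

final class TokenUrlResolver {
    // Only the path is a code-level default; the host must be configured.
    static final String DEFAULT_TOKEN_PATH = "managementportal/api/oauth/token";

    static URI resolve(Map<String, Object> props) {
        Object explicit = props.get("management_portal_token_url");
        if (explicit != null) {
            return URI.create(explicit.toString());
        }
        Object base = props.get("base_url"); // hypothetical config key
        if (base == null) {
            throw new IllegalArgumentException(
                    "Either management_portal_token_url or base_url must be set");
        }
        String baseUrl = base.toString();
        if (!baseUrl.endsWith("/")) {
            baseUrl += "/"; // so that resolve() appends rather than replaces
        }
        return URI.create(baseUrl).resolve(DEFAULT_TOKEN_PATH);
    }
}
```

With `base_url` set to `http://localhost`, this resolves to `http://localhost/managementportal/api/oauth/token`; with neither key set, it fails fast instead of silently pointing at a specific deployment.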

new EmailSender(
(String) props.getOrDefault("host", "localhost"),
(Integer) props.getOrDefault("port", 25),
(String) props.getOrDefault("from", "[email protected]"),
Contributor

This should not have a default.

objectMapper = new ObjectMapper();

if (tokenUrl == null || tokenUrl.isEmpty()) {
logger.warn("MP Token url was not provided. Will use unauthenticated requests to appserver.");
Contributor

Is this ever the case? It would be better to exclude this.

Member Author

Unlikely in a production environment. The appserver does support a no-security mode where unauthenticated requests can be made (particularly useful for dev environments). I will remove it once I have tested it with the appserver.

Member Author

yatharthranjan commented Nov 30, 2021

> At some point the threshold may need to come from radar-app-config service instead of from a fixed value.

Yes, we can definitely create an implementation that reads the config from app-config.

> This may also be a good entry point to enable / disable the condition for a given project (e.g. to only process records for certain projects).

Yes, a good idea. I will think through how it can be done, since conditions will then need to be looked up in real time based on the project in each record. A very simple first approach could be to add getConditionsFor(String project) and getActionsFor(String project), and possibly a user-specific variant like getConditionsFor(String project, String user), to RealtimeConsumerConfig, and use them on each incoming record.

EDIT: I have actually made projects a part of conditions and actions, so you can specify a projects list to filter on:

...
     conditions:
        - name: 'LocalJsonPathCondition'
          projects: ['radar-test']
...

If no projects are provided, the condition applies to all projects by default. But now I am wondering how this will be achieved when we move to app-config: will the conditions have the projects key, or will it be on the request path, like /config/client/realtime-consumers/projects/{project}?

* <p>To be used with the model-invocation-endpoint and KSQL API_INFERENCE function to evaluate and
* take action on incoming results from realtime inference on data.
*/
public class RealtimeInferenceConsumer implements KafkaMonitor {
Contributor

If I understand correctly, all of these consumers are realtime, so I think we can remove that word from the class name.
Why not reuse or extend the existing AbstractKafkaMonitor?

Member Author

@yatharthranjan yatharthranjan Jan 26, 2022

I think the adjective realtime was added for the inference and not for the consumers, so these are consumers for realtime inference, as opposed to batch or historic inference. But I agree that if it seems confusing, it is best to remove it. AbstractKafkaMonitor was not extended in order to allow more flexibility (for now, a different constructor spec), but we can definitely extend it if we need functionality that is already present there.

.map(c -> createConsumer(handler.getRadarProperties(), c)));
}

private static KafkaMonitor createConsumer(
Contributor

I would call these InferenceMonitor and InferenceMonitorFactory for consistency.

}
}

public Map<String, Object> createMessage(
Contributor

You are referring to the create operation performed on the service that receives the request, right? I think this should be either sendMessage or addMessage.

Member Author

Yes, good point.

blootsvoets and others added 18 commits February 15, 2022 15:35
- Add realtime consumer to docker builds for integration tests
- Fix starting for docker application when no schemas in schema registry
- Updates classes and code based on expected API spec of appserver
- Reuse classes wherever possible
- Fix timezone conversions and add jitter and tolerance to the time calculations
- Simplify configs and string formatting
- Handle unhandled exceptions
- Add more properties and fixes
- update fields and time calculations based on app expectations
- Add smtp to docker-compose.yml
- Also catch all exceptions from action execution, so other actions can proceed.
@yatharthranjan yatharthranjan changed the title WIP: Add realtime consumer Add realtime consumer Feb 24, 2022
@yatharthranjan yatharthranjan marked this pull request as ready for review February 24, 2022 12:37
@yatharthranjan yatharthranjan changed the title Add realtime consumer Add realtime inference consumer Feb 24, 2022
@yatharthranjan
Member Author

@mpgxvii, is it possible for you to review this when you have some free bandwidth? I think you plan to use this in the near future, so might be good to review and familiarise, unless you plan to use something else?

Projects
Status: In Progress
3 participants