
Dataflow #9

Open

MichaelLangbein opened this issue Jul 27, 2022 · 4 comments

@MichaelLangbein

No description provided.

@nbrinckm

First idea (translated from German):

order1 = {
  id: 1,
  constraints: {
    shakyground: {
      quakeMLFile: "https://",
      gmpe: ["gmpe"]
    },
    assetmaster: {
      model: ["Peru-CVT1", "Peru-CVT2"],
    }
  }
}


Then shakyground starts, since everything it needs is defined.
It writes its result to the DB (shakeMapXml).
It publishes to the shakyground-success topic.

Deus listens on the shakyground-success topic.
It starts and finds the shakemap for the order id in the DB.
But it doesn't find any exposure models, so it aborts.


Assetmaster has also listened for the order.
It then writes its exposure model to the DB.
It then publishes to the assetmaster-success topic.

Deus also listens on the assetmaster-success topic.
Now it finds both the shakemap and the exposure model in the
DB. (Incidentally, the modelprop data for this order is already there as well.)

Deus can then compute and writes its results to the DB.
And then it publishes to the deus-success topic.

For every new value from the shakyground-success & assetmaster-success topics, deus can run again - and computes for everything that
comes in from the preceding services.

@nbrinckm

TODO @nbrinckm Make it better (maybe with the payloads, maybe with some pseudo code for the db lookup) & in English.

@nbrinckm

nbrinckm commented Jul 27, 2022

Proposal for a possible data flow

2022-07-27 12:24 CEST

The Problem

We want to have a data flow that allows us to run processes that depend on multiple complex inputs, which are themselves complex outputs of other processes. The overall workflow shouldn't need any further user interaction (except the creation of the order), and the data flow should be completely asynchronous - without any unneeded sleeping time.

The example use case aka a more concrete example

In our current flow in the riesgos demonstrator, such an example is the process to compute damages (output of the deus process) based on the shaking intensity of an earthquake (shakyground), an exposure model (assetmaster) and the fragility functions (modelprop).

A possible data structure

To have some bookkeeping about what the user asks for (order) and the already calculated results, we have a database in the background. See #6

The current data model at the time of writing is the following:

create table processes (
  id serial,
  wps_url varchar(256),
  wps_identifier varchar(256),
  primary key (id)
);

create table users (
  id serial,
  email varchar(256),
  primary key (id)
);

create table orders (
  id serial,
  user_id bigint,
  order_constraints jsonb,
  primary key (id)
);

create table jobs (
  id serial,
  process_id bigint,
  status varchar(16),
  primary key (id),
  foreign key (process_id) references processes(id)
);

create table order_job_refs (
  id serial,
  job_id bigint,
  order_id bigint,
  primary key (id),
  foreign key (order_id) references orders(id),
  foreign key (job_id) references jobs(id)
);

create table complex_outputs (
  id serial,
  job_id bigint,
  wps_identifier varchar(256),
  link varchar(1024),
  mime_type varchar(64),
  xmlschema varchar(256),
  primary key (id),
  foreign key (job_id) references jobs(id)
);

create table complex_outputs_as_inputs(
  id serial,
  wps_identifier varchar(256),
  job_id bigint,
  -- To refer to an already existing complex output
  -- that we are going to reuse.
  complex_output_id bigint,
  primary key (id),
  foreign key (job_id) references jobs(id),
  foreign key (complex_output_id) references complex_outputs(id)
);

create table complex_inputs (
  id serial,
  job_id bigint,
  wps_identifier varchar(256),
  link varchar(1024),
  mime_type varchar(64),
  xmlschema varchar(256),
  primary key (id),
  foreign key (job_id) references jobs(id)
);

create table literal_inputs (
  id serial,
  job_id bigint,
  wps_identifier varchar(256),
  input_value text,
  primary key (id),
  foreign key (job_id) references jobs(id)
);

create table bbox_inputs (
  id serial,
  job_id bigint,
  wps_identifier varchar(256),
  lower_corner_x real,
  lower_corner_y real,
  upper_corner_x real,
  upper_corner_y real,
  crs varchar(32),
  primary key (id),
  foreign key (job_id) references jobs(id)
);

The flow

New Order

In the UI the user starts the request for a new order. Here the user defines what should be computed, what the computation is based on & what the possible input parameter constraints are.

In this case the user selects an earthquake in the frontend (an explicit one that the frontend loaded from quakeledger - our database catalog - beforehand) & decides to calculate the later products with 2 ground motion prediction equations, 2 exposure models, and 1 schema.

With this we define the following constraints:

{
  "shakyground": {
    "quakeMLFile": "<link_to_geojson>",
    "gmpe": ["Abrahamson", "Montalva"]
  },
  "assetmaster": {
    "model": ["LimaCVT4_PD40_TI60_5000", "LimaCVT3_PD30_TI70_50000"]
  },
  "modelprop": {
    "schema": "SARA_v1.0"
  }
}

We put this order in the orders table:

| id | order_constraints | user_id |
|----|-------------------|---------|
| 1 | {"shakyground": { ... | 123 |

After we stored that in the database (we need to store it before other processes try to read it), we can send a message to the pulsar:

ws://pulsar:8080/ws/v2/non-persistent/public/riesogs/new-order

{
  "orderId": 1
}
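
As a minimal sketch (not part of the current implementation), publishing that message could look like this when using the pulsar-client Python library instead of the raw WebSocket endpoint shown above; broker URL and topic name are illustrative:

```python
import json
import pulsar  # apache pulsar-client python package

# Connect to the broker (host & port are illustrative).
client = pulsar.Client("pulsar://pulsar:6650")
producer = client.create_producer("non-persistent://public/riesgos/new-order")

# The message only carries the order id - all constraints stay in the database.
producer.send(json.dumps({"orderId": 1}).encode("utf-8"))
client.close()
```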

Ground motion simulation

The shakyground service listens to the new-order topic.

It extracts the order id & queries the database.

select order_constraints from orders where id = :order_id

With those constraints it can extract the value for quakeMLFile and the gmpe.
The vsgrid that shakyground additionally needs is not specified, but shakyground knows that it has 2 possible parameter values that it can use to calculate the shakemaps (USGSSlopeBasedTopographyProxy or FromSeismogeotechnicsMicrozonation).

It will loop over them & use each of them for the further processing. The second run is no different from the first run of the wrapper-internal logic (except for the different parameter). As we also specified different gmpes, we will have runs for all combinations of gmpe and vsgrid.

With that the shakyground wrapper will record in the database that it started the processing & will create new entries in
jobs, complex_inputs and literal_inputs:

jobs:

| id | process_id | status |
|----|------------|--------|
| 1 | 1 (shakyground) | pending |

literal_inputs:

| id | job_id | wps_identifier | input_value |
|----|--------|----------------|-------------|
| 1 | 1 | gmpe | Abrahamson |
| 2 | 1 | vsgrid | USGSSlopeBasedTopographyProxy |

complex_inputs:

| id | job_id | wps_identifier | link | mime_type | xmlschema |
|----|--------|----------------|------|-----------|-----------|
| 1 | 1 | quakeMLFile | http://... | application/vnd.geo+json | |

We also link the order with the job:

order_job_refs:

| id | job_id | order_id |
|----|--------|----------|
| 1 | 1 | 1 |

Now that we have those in place we call the real wps that does the work.
We update our jobs table accordingly:

| id | process_id | status |
|----|------------|--------|
| 1 | 1 | running |

And when the wps call is done, we can update the job entry again:

| id | process_id | status |
|----|------------|--------|
| 1 | 1 | succeeded |

After that we can put the outputs in our complex_outputs table:

| id | job_id | wps_identifier | link | mime_type | xmlschema |
|----|--------|----------------|------|-----------|-----------|
| 1 | 1 | shakeMapFile | http://... | text/xml | http://earthquake.usgs.gov/eqcenter/shakemap |
| 2 | 1 | shakeMapFile | http://... | application/WMS | |

With that we are done with this iteration of the loop - the one with the explicit parametrization of shakyground with Abrahamson
and USGSSlopeBasedTopographyProxy.

So now we publish a message on the pulsar - and on the shakyground-success topic:

{
  "orderId": 1
}

We also run the processing for the other parameter combinations of vsgrid and gmpe. For each parameter combination we create a new job, associate those with the order & put all the inputs & outputs in the database.
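
Taken together, a hedged sketch of the shakyground wrapper logic just described - the helper functions are hypothetical shorthands for the database inserts and the WPS call above:

```python
from itertools import product

# The wrapper's own defaults - vsgrid is not part of the order constraints.
VSGRIDS = ["USGSSlopeBasedTopographyProxy", "FromSeismogeotechnicsMicrozonation"]

def run_shakyground(order_id, constraints):
    quake_ml_file = constraints["shakyground"]["quakeMLFile"]
    gmpes = constraints["shakyground"]["gmpe"]

    # One job per combination of gmpe and vsgrid.
    for gmpe, vsgrid in product(gmpes, VSGRIDS):
        job_id = insert_job(process="shakyground", status="pending")   # jobs
        insert_literal_input(job_id, "gmpe", gmpe)                      # literal_inputs
        insert_literal_input(job_id, "vsgrid", vsgrid)
        insert_complex_input(job_id, "quakeMLFile", quake_ml_file,
                             mime_type="application/vnd.geo+json")      # complex_inputs
        link_order_and_job(order_id, job_id)                            # order_job_refs
        update_job_status(job_id, "running")
        outputs = call_wps("shakyground", gmpe=gmpe, vsgrid=vsgrid,
                           quakeMLFile=quake_ml_file)                   # the real WPS call
        update_job_status(job_id, "succeeded")
        for output in outputs:
            insert_complex_output(job_id, output)                       # complex_outputs
        publish("shakyground-success", {"orderId": order_id})
```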

First try damage computation

The deus wrapper for the damage computation listens on multiple topics - one of them is shakyground-success.

It extracts the order id and starts to extract the inputs for itself.

select  processes.wps_identifier as process_identifier, complex_outputs.*
from complex_outputs
join jobs on jobs.id = complex_outputs.job_id
join order_job_refs on order_job_refs.job_id = jobs.id
join processes on processes.id = jobs.process_id
where order_job_refs.order_id = :order_id

We then get the results from our shakyground service so far:

| process_identifier | id | job_id | wps_identifier | link | mime_type | xmlschema |
|--------------------|----|--------|----------------|------|-----------|-----------|
| Shakyground Process | 1 | 1 | shakeMapFile | http://... | text/xml | http://earthquake.usgs.gov/eqcenter/shakemap |
| Shakyground Process | 2 | 1 | shakeMapFile | http://... | application/WMS | |

It can extract the shakeMapFile with text/xml - so deus has the input that it could use as intensity file. However, it doesn't find anything from assetmaster, nor modelprop.

As it has no data for those, it will stop processing right now.
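
A small sketch of that check, assuming the rows of the query above are grouped by process identifier (the DB helper name is hypothetical):

```python
def try_run_deus(order_id):
    # Rows from the complex_outputs query above.
    rows = fetch_outputs_for_order(order_id)    # hypothetical DB helper
    by_process = {}
    for row in rows:
        by_process.setdefault(row["process_identifier"], []).append(row)

    required = ["Shakyground Process", "Assetmaster Process", "Modelprop Process"]
    if not all(name in by_process for name in required):
        # An upstream output is still missing - stop here and wait for the
        # next success message on one of the topics deus listens to.
        return

    # ... otherwise build the parameter combinations and start the deus jobs.
```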

Exposure model (Assetmaster)

Assetmaster listens to the new-order topic, just like shakyground. As there, we extract the order id & load the constraints from the database.

select order_constraints from orders where id = :order_id

With that we know that we have 2 values for the model parameter that we can use.
For the rest we provide default parameters in the wrapper itself.

Similar to shakyground, we create a job with inputs & outputs in the database for each parameter combination.
Like shakyground, we publish to assetmaster-success after we have run the wps process itself & inserted the outputs in the database.

Deus after assetmaster

Deus is also triggered by assetmaster-success. Here we also extract the order id & the existing outputs.
As we don't find the modelprop output, we stop again.

Fragility functions aka Modelprop

Here it is analogous to assetmaster & shakyground. We send messages to the modelprop-success topic.

Finally some deus processing

As we also listen to modelprop-success we know now that modelprop finished.
Now we can extract all the needed outputs from the complex_outputs table:

select  processes.wps_identifier as process_identifier, complex_outputs.*
from complex_outputs
join jobs on jobs.id = complex_outputs.job_id
join order_job_refs on order_job_refs.job_id = jobs.id
join processes on processes.id = jobs.process_id
where order_job_refs.order_id = :order_id

We then get all the results for the order so far:

| process_identifier | id | job_id | wps_identifier | link | mime_type | xmlschema |
|--------------------|----|--------|----------------|------|-----------|-----------|
| Shakyground Process | 1 | 1 | shakeMapFile | http://... | text/xml | http://earthquake.usgs.gov/eqcenter/shakemap |
| Shakyground Process | 2 | 1 | shakeMapFile | http://... | application/WMS | |
| Assetmaster Process | 3 | 2 | selectedRowsGeoJson | http://... | application/json | |
| Modelprop Process | 4 | 3 | selectedRows | http://... | application/json | |

With those we know that we can make a run of deus.

We create a new entry in the jobs table and fill the literal_inputs entry for the schema (see the extra point below about how we could extract this).

For the complex inputs, we handle it a little bit differently - we store them in the complex_outputs_as_inputs table - so that we record which existing complex data we reuse:

| id | job_id | wps_identifier | complex_output_id |
|----|--------|----------------|-------------------|
| 1 | 5 (job id of the new deus job) | intensity | 1 |
| 2 | 5 | exposure | 3 |
| 3 | 5 | fragility | 4 |

Now we can start the deus process on the WPS, update the job data accordingly & set the complex outputs once we are done.

Once we are done with the job, we send a message with the order id to the deus-success topic.
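
Loosely sketched, the bookkeeping for one such deus run could look like this (helper names are hypothetical; the ids match the table above):

```python
def run_one_deus_job(order_id, intensity_id, exposure_id, fragility_id, schema):
    job_id = insert_job(process="deus", status="pending")         # jobs
    link_order_and_job(order_id, job_id)                           # order_job_refs
    insert_literal_input(job_id, "schema", schema)                 # literal_inputs

    # Reference the already existing complex outputs instead of copying them.
    insert_output_as_input(job_id, "intensity", intensity_id)      # complex_outputs_as_inputs
    insert_output_as_input(job_id, "exposure", exposure_id)
    insert_output_as_input(job_id, "fragility", fragility_id)

    update_job_status(job_id, "running")
    outputs = call_wps("deus", intensity_id, exposure_id, fragility_id, schema)
    update_job_status(job_id, "succeeded")
    for output in outputs:
        insert_complex_output(job_id, output)                      # complex_outputs
    publish("deus-success", {"orderId": order_id})
```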

When modelprop, assetmaster and shakyground provide new data

As deus listens to the shakyground-success, modelprop-success and assetmaster-success topics, it can run whenever there is new data. However, we don't want to calculate again for parameter combinations for which we already computed the values.

So the moment we check the input data of the processes & extract the parameter combinations that we can run for, we also need to extract the combinations for which we already computed the results (or started a job a few seconds ago).

select processes.wps_identifier as process_identifier, complex_outputs.*
from complex_outputs_as_inputs
join complex_outputs on complex_outputs.id = complex_outputs_as_inputs.complex_output_id
join jobs on jobs.id = complex_outputs.job_id
join order_job_refs on order_job_refs.job_id = jobs.id
join processes on processes.id = jobs.process_id
where order_job_refs.order_id = :order_id

So, with those we know for which parameter combinations we already ran deus & can remove those from the set of combinations that deus is going to run.
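
As a sketch, that deduplication can be a plain set difference over the referenced complex_output ids (both helpers are hypothetical wrappers around the two queries above):

```python
def combinations_still_to_run(order_id):
    # All (intensity, exposure, fragility) id triples that the currently
    # available outputs would allow.
    candidates = set(build_candidate_combinations(order_id))

    # Triples that an existing deus job for this order already used,
    # taken from complex_outputs_as_inputs.
    already_used = set(fetch_used_combinations(order_id))

    return candidates - already_used
```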

Problems

How to extract the schema for the deus call?

It should be possible, as we know the job id that gave us the complex_output of assetmaster.

select *
from literal_inputs
where job_id = :job_id
and wps_identifier = 'schema'

(But maybe this will require a bit more thinking.)

@nbrinckm

nbrinckm commented Jul 28, 2022

Addition to re-use existing products

2022-07-28 13:34 CEST

The Problem

We want to allow user requests that start with an existing product (aka job output).

An example

We have a concrete earthquake near the Chilean coast (https://geofon.gfz-potsdam.de/eqinfo/event.php?id=gfz2022oqjb) with magnitude 8.1 and depth 60 km (values at the time of writing - the real-world geofon values may change as the scientists get more information about it).

We already had an order that created this shakemap.

Orders:

| id | order_constraints | user_id |
|----|-------------------|---------|
| 13 | {"shakyground": {... | 1234 |

Jobs:

| id | process_id | status |
|----|------------|--------|
| 42 | 1 (shakyground) | succeeded |

We have its inputs & outputs:

LiteralInputs:

| id | job_id | wps_identifier | input_value |
|----|--------|----------------|-------------|
| 103 | 42 | gmpe | Abrahamson |
| 104 | 42 | vsgrid | USGSSlopeBasedTopographyProxy |

ComplexInputs:

| id | job_id | wps_identifier | link | mime_type | xmlschema |
|----|--------|----------------|------|-----------|-----------|
| 111 | 42 | quakeMLFile | http://... | application/vnd.geo+json | |

ComplexOutputs:

| id | job_id | wps_identifier | link | mime_type | xmlschema |
|----|--------|----------------|------|-----------|-----------|
| 204 | 42 | shakeMapFile | http://... | text/xml | http://earthquake.usgs.gov/eqcenter/shakemap |
| 205 | 42 | shakeMapFile | http://... | application/WMS | |

The old job is also still associated with an old order:

order_job_refs:

| id | job_id | order_id |
|----|--------|----------|
| 222 | 42 | 13 |

The order constraints

The main place to define that we want to reuse the existing shakyground result is the order constraints.

We would put something in the database like this:

{
  "shakyground": {
    "job_id": 42
  },
  // ...
}

Now we put our new order in the database:

Orders:

| id | order_constraints | user_id |
|----|-------------------|---------|
| 13 | {"shakyground": {... | 1234 |
| 14 | {"shakyground": {"job_id": 42, ... | 2345 |

And emit a new message in the new-order topic:

{
  "orderId": 14
}

The processing on shakyground

The shakyground wrapper listens to the new-order topic & gets the new message, so that we can extract the order id.
With that we can access the order constraints & see that for our service we got a job id.

With that we don't need to start a new job, but instead just link the existing job to our new order.

order_job_refs:

| id | job_id | order_id |
|----|--------|----------|
| 222 | 42 | 13 |
| 223 | 42 | 14 |

The wrapper doesn't have to do anything more than emitting a message on the shakyground-success topic.

{
  "orderId": 14
}

With this relationship the later processes can extract the output data of the old jobs for their own order id.
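
A sketch of that shortcut in the shakyground wrapper, assuming psycopg2 for the database access and the pulsar-client library for the success message (connection details and topic name are illustrative):

```python
import json
import psycopg2
import pulsar

def reuse_existing_job(order_id, job_id):
    # Pure bookkeeping: associate the old job with the new order ...
    conn = psycopg2.connect("dbname=riesgos user=backend")   # illustrative credentials
    with conn, conn.cursor() as cur:
        cur.execute(
            "insert into order_job_refs (job_id, order_id) values (%s, %s)",
            (job_id, order_id),
        )

    # ... and notify the downstream services as if the job had just finished.
    client = pulsar.Client("pulsar://pulsar:6650")
    producer = client.create_producer("non-persistent://public/riesgos/shakyground-success")
    producer.send(json.dumps({"orderId": order_id}).encode("utf-8"))
    client.close()
```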

The processing in deus

As before, deus listens to the shakyground-success topic. As the older job of the shakyground processing is now associated with our order id, deus can extract the output values of shakyground. If assetmaster & modelprop already delivered their results, we can start the processing. If not, we stop & wait for assetmaster and/or modelprop to deliver their results.

For the wrapped deus process there is no change.

Just another kind of caching

In any case a process with its wrapper can decide to either start the main processing (calling the wps itself + doing all the bookkeeping in the database) or to associate an existing job with its new order id & just emit a message on the success topic.

Either way, the later processes are able to extract the needed input values from the database by checking the jobs that are linked to the order.

There are two options to do this:

  • the constraints contain an explicit job id
  • all the input parameters match those that an older job already used (see the sketch below)
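
For the second option, a hedged sketch of the lookup - the comparison is done in Python rather than in a single SQL statement, and the DB helpers are hypothetical:

```python
def find_cached_job(process_name, wanted_literal_inputs):
    """Return the id of an older, succeeded job whose literal inputs match exactly."""
    for job in fetch_succeeded_jobs(process_name):            # jobs table, status = 'succeeded'
        stored = {
            row["wps_identifier"]: row["input_value"]
            for row in fetch_literal_inputs(job["id"])        # literal_inputs table
        }
        if stored == wanted_literal_inputs:
            return job["id"]
    return None
```

The same kind of comparison would also be needed for the complex and bbox inputs before an old job can really be reused.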

Suggestion for the order constraints structure

With the idea to use the job id in the constraints, designing the structure of the constraints gets a little bit more complicated.

My best idea so far is the following:

{
  "serviceA": {
    "job_id": 123
  },
  "serviceB": {
    "literal_inputs": {
      "parameter1": ["Possible value 1", "possible value 2"],
      "parameter2": ["One value that needs to be used"]
    },
    "complex_inputs": {
      "parameter3": ["https://url/to/first/parameter/option", "https://url/to/second/parameter/option"],
      "parameter4": ["https://some/parameter/that/needs/to/be/used"]
    }
  }
}

If there are more steps inbetween

The setting

In the shakyground & deus example, we have the case that we define a result for shakyground, which itself already listens to the new-order topic. So it gets the new-order message anyway & can handle its own processing easily.

However, imagine we don't start with shakyground here, but have another service that needs to be called before. This process then directly listens to the new-order topic (but shakyground doesn't do that anymore).

So instead of

new order -> shakyground processing -> shakyground success -> deus processing 

we have

new order -> imaginary service -> imaginary success -> shakyground processing -> shakyground success -> deus processing

We still want to fix the shakemap, so we give a job_id in the order constraints.

A problem

When we commit an order, the imaginary service doesn't care about shakyground's constraints: it either starts anyway & computes everything (triggering shakyground later too), or it stops as it has neither enough constraints nor data. (And as the requesting user we don't care about it, as we already said that we want to start with the exact shakyground result.) In the latter case it doesn't even emit a message on its success topic - so neither the shakyground wrapper nor the deus wrapper will run.

Possible ideas

There are some ideas to address that:

1. Add more constraints

We also add constraints for the jobs that delivered the shakemap. The earlier services can then just emit success messages, as the data is already in the database.

{
  "imaginary": {
    "job_id": 41
  },
  "shakyground": {
    "job_id": 42
  },
  // ...
}

It looks like tight coupling in the constraints (we need to send all the job ids that gave us the shakemap), but those could be extracted from the database easily.

2. Let all processes listen to new orders

We also let the later processes (in this case shakyground; in the real case it would be deus & the system reliability service) listen to the new-order topic directly. Earlier processes would just ignore the order if they don't have enough constraints to run.
However, this also means that the later stages are always listening to new orders even if they can't run yet. They then just ignore the new-order message & will be triggered again by the success messages of the other processes.

new order (not constrained) -> shakyground -> ignore (no data from imaginary)
new order (not constrained) -> imaginary -> imaginary success -> shakyground -> shakyground success -> deus
new order (shakemap with job_id) -> shakyground -> shakyground success -> deus
new order (shakemap with job_id) -> imaginary -> ignore (no constraints for imaginary)

But this last example would require the process to have some kind of constraints at all in order to run. If the process needed no constraints, it would still start processing (unless there really is a job id for that process in the constraints as well).
But maybe this is a case where the overall caching mechanism could help - not re-calculating the products & just delivering success messages for the next topics, as there is no reason to re-run the processing with the very same input parameters that the run which delivered the shakemap already used.
