
Scaling out operators - InvalidLifecycleStatusException #71

Open
elkasoapy opened this issue Mar 17, 2016 · 7 comments

@elkasoapy

It seems like the new type of SEEP query (uk.ac.imperial.lsds.seep.api.operator.SeepLogicalQuery) is not able to properly scale out operators. For example, the static-scaleout-pipeline-stateless-query example is not running with the current SEEP version as it triggers an InvalidLifecycleStatusException.

Looking further into this, I found that the exception is triggered by lines 135-138 of setInitialPhysicalInstancesForLogicalOperator in uk.ac.imperial.lsds.seep.api.QueryBuilder, where the given opId is expected "not to be" in usedIds. That doesn't make sense to me: to scale out an operator, its id must already have been used, right? However, if I take out this comparison, a network exception arises in the scaled-out operator:

Exception in thread "Network-Reader-0" java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:506)
    at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
    at uk.ac.imperial.lsds.seepworker.core.input.InputBuffer.forwardTuples(InputBuffer.java:88)
    at uk.ac.imperial.lsds.seepworker.core.input.InputBuffer.readFrom(InputBuffer.java:64)
    at uk.ac.imperial.lsds.seepworker.comm.NetworkSelector$Reader.run(NetworkSelector.java:347)
    at java.lang.Thread.run(Thread.java:745)

Any clues about how this can be fixed?

@raulcf (Owner) commented Mar 17, 2016

Thanks,

First, the comparison should be the opposite, as you point out: we should enforce that the opId is indeed in usedIds. The message of the exception thrown when this is not the case should also be updated accordingly.
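
For reference, the flipped check could look roughly like this; only opId, usedIds and the exception type come from the code we are discussing, while the rest of the method and the message text are just illustrative:

    // Sketch only: the signature and surrounding body are assumed, not copied from QueryBuilder.
    static void setInitialPhysicalInstancesForLogicalOperator(int opId, int numInstances) {
        if (!usedIds.contains(opId)) {
            // Scaling out only makes sense for an operator that has already been declared,
            // so its id must already be in usedIds.
            throw new InvalidLifecycleStatusException(
                    "Cannot scale out opId " + opId + ": no logical operator with this id has been declared");
        }
        // ... record numInstances for opId as before ...
    }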

Regarding the exception: first, make sure that after deploying the query the LOG messages say the correct thing. If you scale out an operator A into A1 and A2, its downstream should configure an input stream with 2 inputs. That will give you a lot of information about how to debug the exception.

Once you detect the root cause of the exception I can help in providing insights on how to fix it.

@elkasoapy (Author) commented:

The LOG messages say the correct thing and the SeepLogicalQuery object is built correctly in terms of Upstream and Downstream connections using appropriate opIds.

I think that the issues may be coming from the new Data classes, especially on the worker side (I've debugged the seep master process long enough to believe that the problem does not come from that side). The exception comes from the NetworkSelector class, which implements the DataStoreSelector interface. It is not clear to me what the DataReferenceManager, Dataset and DataStoreSelectorFactory classes do (what they represent, and how they relate to the rest of the SEEP classes), but my guess is that these (which I believe are especially important for MDFs) have some kind of conflict with the scaling-out process. Any clue about this?

@raulcf (Owner) commented Mar 18, 2016

Two quick comments:

  • have you specified the hint in the base class to make the query "scheduled"?
  • worst case, until you figure this out, you can simply set the new downstream operators manually in the dataflow, instead of declaratively saying how many you want. Just make sure to set up different operator ids and different stream ids (roughly as in the sketch below).
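
Roughly like this (operator ids, stream ids and method names here are only illustrative, not the exact API used in the example):

    // Illustrative sketch only: method names and ids are assumptions, not the exact SEEP API.
    // The idea: declare each scaled-out replica explicitly with its own operator id,
    // and give every connection its own stream id.
    LogicalOperator src = queryAPI.newStatelessOperator(new Source(), 0);
    LogicalOperator p1  = queryAPI.newStatelessOperator(new Processor(), 1);
    LogicalOperator p2  = queryAPI.newStatelessOperator(new Processor(), 2);
    LogicalOperator snk = queryAPI.newStatelessOperator(new Sink(), 3);

    src.connectTo(p1, /* streamId */ 100, schema);
    src.connectTo(p2, /* streamId */ 101, schema);
    p1.connectTo(snk, /* streamId */ 102, schema);
    p2.connectTo(snk, /* streamId */ 103, schema);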

@elkasoapy (Author) commented:

At this point I'm just trying to run the static-scaleout-pipeline-stateless-query example directly, so I have not changed it to be "scheduled". I understand that in MDFs our goal is to run a scheduled query; however, I don't think that we can leave the materialized examples just broken (and it makes it much easier to develop an algorithm).

For static-scaleout-pipeline-stateless-query, using several streamIds for the connections seems to do the trick. For static-scaleout-pipeline-stateful-query it does not, as SEEP needs to manage the state, and it breaks when using the api.sendKey method.

I'm not sure whether this is related to having "duplicated" DataReference objects when scaling out. For instance, in the deployQueryToNodes method of the MaterializedQueryManager class, when the inputs and outputs are created (lines 162-163), scaled-out operators end up with different DataReference objects that are "logically" equal, which means that a scaled-out operator with just one input ends up with several inputs that refer to the same DataStore and ControlEndPoint. Is this what we should expect in the DataReference-DataStore design?

@raulcf (Owner) commented Mar 23, 2016

I don't think that we can leave the materialized examples just broken (and it makes it much easier to develop an algorithm).

I'm not suggesting to leave anything broken. All I'm saying is that if you cannot fix the materialized version now, maybe you could try the scheduled one.

For instance, in the deployQueryToNodes method of the MaterializedQueryManager class, when the inputs and outputs are created (lines 162-163), scaled-out operators end up with different DataReference objects that are "logically" equal, which means that a scaled-out operator with just one input ends up with several inputs that refer to the same DataStore and ControlEndPoint.

The problem, then, is that there is no check for operators that are scaled out. This can be fixed by working on MaterializedQueryManager.generateOutputDataReferences.

Specifically, every time the loop gets a new downstream, it checks whether it is a MarkerSink (that branch is fine, as there can only be one sink in this case). If not, it creates one DataReference.

Before creating the DataReference, it is necessary to check whether the downstream operator we are working with has been assigned more than one physical instance. You can do this by querying SeepLogicalQuery, which contains a method called "getInitialPhysicalInstancesForLogicalOperator" that will give you the number of physical instances for a given opId. Once you have this number, you just need to create one DataReference (in the same way it is done now) per instance.

This also implies that line 229 should hold a list of DataReferences, so that you can keep all the DataReferences you create per node and then add them to the HashSet in line 240 (see the sketch below).
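
Something along these lines (variable names and the createDataReference(...) helper below are hypothetical, only meant to show the idea, not the exact code in generateOutputDataReferences):

    // Sketch only: names and the createDataReference(...) helper are hypothetical.
    int numInstances = slq.getInitialPhysicalInstancesForLogicalOperator(downstreamOpId);
    List<DataReference> drefs = new ArrayList<>();
    for (int i = 0; i < numInstances; i++) {
        // Same owner, DataStore and ControlEndPoint as today, but one reference per physical instance.
        drefs.add(createDataReference(upstreamOp, dataStore, controlEndPoint));
    }
    // All of them then go into the HashSet of output DataReferences for this node
    // (the collection currently filled around line 240).
    outputDataReferences.addAll(drefs);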

Hope this helps a bit. Let me know if it helps you make progress; we can then discuss the next problem you find.

@elkasoapy (Author) commented:

I don't understand how that would solve the problem, as I can't see how it differs from what the code already does.

A DataReference contains the "ownerID" (the operator that can serve it), a DataStore object (schema, DataStoreType & configuration) and a ControlEndPoint (whom to contact to request the DataReference). At the moment the code creates a "different" DataReference per logical operator (as we have several logical operators when scaling out). Since a DataReference only has information about the upstream operator, the newly created DataReferences are "logically" equal.
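
To make the "logically equal" point concrete, here is a tiny self-contained illustration with plain Java stand-ins (not the real SEEP classes):

    // Stand-in types, not the real SEEP classes.
    record ControlEndPoint(String ip, int port) {}
    record DataStore(String schemaName, String type) {}
    record DataReference(int ownerOpId, DataStore store, ControlEndPoint endPoint) {}

    class LogicallyEqualExample {
        public static void main(String[] args) {
            // Only the upstream side (ownerOpId = 0) is described, so the two references
            // created when the downstream is scaled out into two instances are identical.
            DataStore store = new DataStore("tupleSchema", "NETWORK");
            ControlEndPoint ep = new ControlEndPoint("10.0.0.1", 5000);
            DataReference forInstance1 = new DataReference(0, store, ep);
            DataReference forInstance2 = new DataReference(0, store, ep);
            System.out.println(forInstance1.equals(forInstance2)); // prints true: "logically" equal
        }
    }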

To my understanding this stops making sense when the inputs list is created (line 163), as it adds all the outputs of one node as the inputs of the subsequent nodes. For scaled-out operators this means that their input contains all the DataReferences of the scaled-out operators, which seems really odd. How is this supposed to work?

@raulcf (Owner) commented Mar 23, 2016

At the moment the code creates a "different" DataReference per logical operator (as we have several logical operators when scaling out). Since a DataReference only has information about the upstream operator, the newly created DataReferences are "logically" equal.

This sounds correct. The data to be served lives in the same node. Do you think that the upstream node is creating only one output, regardless of the number of downstream nodes?

You said you had checked the output logs and they were OK. Would you mind pasting the relevant output for one of the workers here, as well as the query (number of operators and scale-out factor) you are using?

thanks
