Errors in calling a DAP within a DAP #57

Open
shahamit opened this issue May 10, 2016 · 8 comments

Comments

@shahamit

With snappy 0.2.1 I am trying to call a data aware procedure (DAP) from inside another DAP. There are a couple of issues with the nested call:

  1. Using the snappy-shell client I call the outer DAP, which invokes the nested DAP (verified through logs), but the nested DAP call executes multiple times instead of once.
  2. The nested call later fails with an NPE. The error trace is shared here.

The DAP code is shared here.
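
For context, the nested call is roughly of this shape (a simplified sketch with placeholder procedure names, written here in Scala for brevity; the actual code linked above is Java, but the JDBC calls are the same):

    import java.sql.DriverManager

    // Inside the body of the outer procedure: obtain the nested connection and
    // invoke the inner procedure on it (procedure name is a placeholder).
    val conn = DriverManager.getConnection("jdbc:default:connection")
    val cs = conn.prepareCall("CALL APP.INNER_PROC(?)")
    cs.setString(1, "some-arg")
    cs.execute()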

I would appreciate any inputs on this. Thanks!

@kneeraj

kneeraj commented May 11, 2016

Hi Shahamit,

I could not tell whether APP.PARTITIONED_AIRLINE_TEST is a row table or a column table. For a row table it should work, and we will look into the problems you reported. But if the table is a column table then problems are expected, as we have not tested DAPs with the SnappyData product. In fact, we are thinking of removing DAP support completely (still under deliberation though), since SnappyData will have the capability of running both submitted jobs and any type of query supported by Spark SQL. We think that probably all the capabilities of DAP will become redundant.

@shahamit
Author

Thanks for the follow-up.

  1. I didn't specify the "USING row OPTIONS()" or "USING column OPTIONS" in the create table statement. What is the default table type?
  2. How do you get the capability that DAP provides, i.e. executing custom code on the node where the data resides, with snappy spark support?
  3. Investigating the DAP execution further, I found inconsistencies in the number of times the procedure gets called on the server when executed with the "ON TABLE WHERE" clause. I could see the procedure getting called twice with the connection id as -2 and the remote flag set to true. This leads to the NPE. Executing the DAP without the "ON TABLE WHERE" clause works as expected.
  4. A related question - with a partitioned table, how can we query rows only from a single partition present on a node? This is basically to verify and test the DAP execution.

Thanks!

@kneeraj
Copy link

kneeraj commented May 11, 2016

Please see answers inline.

I didn't specify the "USING row OPTIONS()" or "USING column OPTIONS" in the create table statement. What is the default table type?

Neeraj: The default is Row Type.

How do you get the capability that DAP provides, i.e. executing custom code on the node where the data resides, with snappy spark support?

Neeraj: DAP, as you have rightly stated, takes advantage of data location and routes the execution based on it. The other use of this could be to break a complex query, like a distributed join query that the rowstore in itself does not support, into multiple queries and correlate them in the java function. The application can also do that, and for the user it is probably one and the same, except that doing it in the application means multiple round trips between the server cluster and the application side; if they are not on the same network this could have a performance impact.

So the second part is taken care of by snappy spark, as it has the capability to execute these queries. For the first part you are right that if you want the entire procedure body to run as a single unit of execution, then we will lose that if we remove DAP capabilities. But having said that, we are also making the snappy spark plan preparation intelligent enough to use data location information too and adjust the execution plan accordingly. Also, we have plans to give an alternative where the entire job execution can be hinted to a location, but this is still under discussion. Thanks for your insightful question - a good point to discuss.
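
To illustrate the correlation idea from above, a rough sketch (hypothetical table names, written in Scala for brevity; an actual DAP body would be the equivalent Java using the same JDBC calls):

    import java.sql.DriverManager
    import scala.collection.mutable

    // Nested connection available inside a procedure body (Derby/GemFireXD-style URL).
    val conn = DriverManager.getConnection("jdbc:default:connection")

    // Hypothetical tables: correlate ORDERS with CUSTOMERS without a distributed join.
    val customers = mutable.Map[String, String]()
    val rs1 = conn.createStatement().executeQuery("SELECT ID, NAME FROM APP.CUSTOMERS")
    while (rs1.next()) customers(rs1.getString(1)) = rs1.getString(2)

    val rs2 = conn.createStatement().executeQuery("SELECT CUST_ID, AMOUNT FROM APP.ORDERS")
    while (rs2.next()) {
      val name = customers.getOrElse(rs2.getString(1), "<unknown>")
      println(s"$name spent ${rs2.getBigDecimal(2)}")
    }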

Investigating the DAP execution further, I found inconsistencies in the number of times the procedure gets called on the server when executed with the "ON TABLE WHERE" clause. I could see the procedure getting called twice with the connection id as -2 and the remote flag set to true. This leads to the NPE. Executing the DAP without the "ON TABLE WHERE" clause works as expected.

Neeraj: ON TABLE ... WHERE will actually go and execute on the partition where the matching data is located. If multiple locations are determined, it will execute on all such locations, and hence multiple calls but on different nodes. If you remove the extension, it will be executed locally on the server to which your client is connected. I don't fully understand the bug you are pointing to; we will have to look into it further.

A related question - with a partitioned table, how can we query rows only from a single partition present on a node? This is basically to verify and test the DAP execution.

Neeraj: If you keep the number of buckets at 1, then only one partition will be created and all the data will go into that single partition, or bucket. You can use the ON TABLE extension to have your code executed only on that partition. You can use this for testing.
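
For example, something along these lines (a sketch only: a Scala client snippet with a hypothetical connection URL and procedure name; please double-check the exact DDL clauses against the docs for your version):

    import java.sql.DriverManager

    // Hypothetical thin-client URL; point it at your locator or server.
    val conn = DriverManager.getConnection("jdbc:snappydata://localhost:1527/")
    val stmt = conn.createStatement()

    // BUCKETS 1 forces a single bucket, so every row lands on one member.
    stmt.execute(
      """CREATE TABLE APP.SINGLE_BUCKET_TEST (CODE CHAR(25), DESCRIPTION CHAR(100))
        |PARTITION BY COLUMN (CODE) BUCKETS 1 PERSISTENT SYNCHRONOUS""".stripMargin)

    // The ON TABLE ... WHERE extension routes the procedure to the member hosting that bucket.
    val cs = conn.prepareCall(
      "CALL MYPROC() ON TABLE APP.SINGLE_BUCKET_TEST WHERE CODE = 'AA'")
    cs.execute()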

@shahamit
Author

Some more follow-up questions

But having said that, we are also making the snappy spark plan preparation intelligent enough to use data location information too and adjust the execution plan accordingly. Also, we have plans to give an alternative where the entire job execution can be hinted to a location, but this is still under discussion.

I followed the discussion on implementing DAP capabilities in snappy spark execution. I wonder how the spark api would determine where the data resides? Would it query gemfire sys tables to point it to the server where the row resides?

If multiple locations are determined, it will execute on all such locations, and hence multiple calls but on different nodes.

The table I call the DAP on (the command is "CALL myProc ON TABLE PARTITIONED_TABLE_TEST WHERE COL1 = 'x' ") is a partitioned table without any replication. I assume that would not lead to multiple copies of the data on different nodes. Connecting to the leader's jmx port, I could see that the table data (rows) is distributed across the cluster and the total matches the total number of rows in the table.

My create table statement is as below for reference

CREATE TABLE PARTITIONED_AIRLINE_TEST (CODE char(25), DESCRIPTION char(100)) PARTITION BY COLUMN (CODE) PERSISTENT SYNCHRONOUS;

I don't fully understand the bug you are pointing to; we will have to look into it further.

The bug, or inconsistency, that I am seeing right now with the DAP execution is that the DAP sometimes gets executed multiple times on a node, and subsequent invocations of the nested call (kindly have a look at the code I shared in my original post) fail because they get executed on an invalid connection object.

If you keep the number of buckets at 1, then only one partition will be created and all the data will go into that single partition, or bucket.

Would that be the right test? I basically want to see whether changing the WHERE clause in the CALL myproc statement actually executes the code on different nodes in the cluster. Is there no way to know (from system tables) which rows are present on which partition?

Thanks.

@kneeraj

kneeraj commented May 12, 2016

I followed the discussion on implementing DAP capabilities in snappy spark execution. I wonder how the spark api would determine where the data resides? Would it query gemfire sys tables to point it to the server where the row resides?

In Spark it can be done directly using RDD partitioning and a combination of filter and mapPartitions transformations, plus zipped iterators over multiple RDDs. If the RDDs in question are colocated, then the entire execution can be shipped to the executor nodes. Of course, if non-colocated RDDs are zipped, that is a logical problem and will incur the cost of a shuffle. However, directly iterating only over the partition of your choice is not as easy as in a DAP. If you have a use case in mind, let us know and we can try writing a code snippet for that.
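
For example, a rough sketch of the pattern using the plain Spark RDD api with made-up data (nothing SnappyData specific here):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    // Local master only so the sketch runs on its own.
    val sc = new SparkContext(new SparkConf().setAppName("partition-local-sketch").setMaster("local[*]"))

    // Two pair RDDs partitioned the same way so their partitions line up
    // (a stand-in for colocated tables).
    val part   = new HashPartitioner(4)
    val counts = sc.parallelize(Seq("AA" -> 10, "B6" -> 7)).partitionBy(part)
    val names  = sc.parallelize(Seq("AA" -> "American", "B6" -> "JetBlue")).partitionBy(part)

    // filter narrows the data; zipPartitions runs once per partition on the executor
    // that holds it, so the correlation below never shuffles.
    val joined = counts.filter(_._2 > 5).zipPartitions(names) { (leftIter, rightIter) =>
      val lookup = rightIter.toMap
      leftIter.flatMap { case (code, cnt) => lookup.get(code).map(desc => (code, desc, cnt)) }
    }
    joined.collect().foreach(println)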

The table I call the DAP on (the command is "CALL myProc ON TABLE PARTITIONED_TABLE_TEST WHERE COL1 = 'x' ") is a partitioned table without any replication. I assume that would not lead to multiple copies of the data on different nodes. Connecting to the leader's jmx port, I could see that the table data (rows) is distributed across the cluster and the total matches the total number of rows in the table.

Yes, there is no replication without an explicit redundancy specification. So when you say COL1 = 'x' you mean CODE = 'x'; this should go to only 1 node.

I looked at your code and at the src code as well, and it looks like there is a bug for nested calls. Will file a jira and take this up. BTW, can you please try a workaround: instead of using the same connection for the recursive call, can you use another pool of connections, or a dedicated connection in a thread local, and try that?

Would that be the right test? I basically want to see whether changing the WHERE clause in the CALL myproc statement actually executes the code on different nodes in the cluster. Is there no way to know (from system tables) which rows are present on which partition?

It won't be right for actual use, but for testing you could do that. If you just want to know whether the function is actually going to different nodes, you can just print the result of this query in your procedure method:
VALUES dsid()
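
For example, inside the procedure body (yours is Java; this Scala sketch shows the same JDBC calls, assuming the nested connection is obtained via the default-connection URL):

    import java.sql.DriverManager

    // Nested connection inside the procedure (Derby/GemFireXD-style default connection).
    val conn = DriverManager.getConnection("jdbc:default:connection")
    val rs = conn.createStatement().executeQuery("VALUES dsid()")
    while (rs.next()) {
      // dsid() returns the id of the distributed-system member running this call.
      println(s"procedure executing on member: ${rs.getString(1)}")
    }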

@shahamit
Author

However, directly iterating only over the partition of your choice is not as easy as in a DAP. If you have a use case in mind, let us know and we can try writing a code snippet for that.

I will elaborate on the use case we are trying to solve with DAP, and you can suggest whether it can be done through snappy's spark api. We are trying to implement cell-level dependencies across multiple tables. It is similar to the functionality excel offers, except that in our case it can span multiple tables. Imagine multiple cells in an excel worksheet having interdependent formulas, where updating a value in one cell causes another cell's value to change, that cell update causes yet another cell's value to update, and so on. It is a kind of graph of dependencies that determines which row cell needs to be updated next. Kindly let me know if you have follow-up questions.

BTW, can you please try a workaround: instead of using the same connection for the recursive call, can you use another pool of connections, or a dedicated connection in a thread local, and try that?

Thinking more about this workaround, I wonder how transactions would work when we create new connections for nested DAP calls? Can you please elaborate on that?

If you just want to know whether the function is actually going to different nodes, you can just print the result of this query in your procedure method:
VALUES dsid()

I will let you know if this helps.

@kneeraj

kneeraj commented May 13, 2016

I will elaborate on the use case we are trying to solve with DAP, and you can suggest whether it can be done through snappy's spark api. We are trying to implement cell-level dependencies across multiple tables. It is similar to the functionality excel offers, except that in our case it can span multiple tables. Imagine multiple cells in an excel worksheet having interdependent formulas, where updating a value in one cell causes another cell's value to change, that cell update causes yet another cell's value to update, and so on. It is a kind of graph of dependencies that determines which row cell needs to be updated next. Kindly let me know if you have follow-up questions.

Ok, understood.

Thinking more about this workaround, I wonder how transactions would work when we create new connections for nested DAP calls? Can you please elaborate on that?

You are right - they won't be in the same transaction. With this workaround you will have to take care of the failures in your code.

@shahamit
Author

In Spark it can be done directly using RDD partitioning and a combination of filter and mapPartitions transformations, plus zipped iterators over multiple RDDs.

I was getting myself acquainted with the spark and snappy api's that you mentioned above. The following is the code flow I tried to visualize -

  1. Read snappy tables and load them as data frames
  2. Apply transformations like filter
  3. Assuming the data is partitioned correctly, execute a function passed to the "foreachPartition()" api to perform operations only on the partition(s) that satisfy the transformations. This function would internally call the snappy api's to mutate data (update/insert). A rough sketch of what I have in mind is shown below.
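
The sketch (assumptions only: "SnappyContext(sc)" is the entry point I saw in the docs, the jdbc url is hypothetical, and the UPDATE stands in for the real formula logic):

    import java.sql.DriverManager
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SnappyContext   // assumption: 0.2.x-era entry point

    val sc  = new SparkContext(new SparkConf().setAppName("cell-dependency-sketch"))
    val snc = SnappyContext(sc)                 // in a submitted snappy job a context would be handed to us

    // Steps 1 and 2: load the table as a data frame and filter down to the cells of interest.
    val df = snc.table("APP.PARTITIONED_AIRLINE_TEST").filter("CODE = 'AA'")

    // Step 3: partition-local logic; the function runs on the executor holding each partition.
    df.foreachPartition { rows =>
      // Hypothetical thin-client URL; a per-executor connection pool would be better in practice.
      val conn = DriverManager.getConnection("jdbc:snappydata://localhost:1527/")
      val ps = conn.prepareStatement(
        "UPDATE APP.PARTITIONED_AIRLINE_TEST SET DESCRIPTION = ? WHERE CODE = ?")
      rows.foreach { row =>
        ps.setString(1, row.getString(1).trim + " (recomputed)")   // stand-in for the dependent-cell formula
        ps.setString(2, row.getString(0))
        ps.executeUpdate()
      }
      conn.close()
    }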

Below are a few queries that I have

  1. The snappy api does not provide a way to update the RDD as a whole or to update a few selected elements in the RDD. The current update api accepts a tableName along with other attributes. Is there a way to work on individual elements of the RDD in order to do surgical updates, as in the use case I explained earlier?
  2. Applying various transformations on the loaded data frames would cause new data frames to be created after each transformation, which would increase the memory usage. For e.g., assume we applied a filter on 1 million rows to extract just 50k rows - we would have two dataframes in memory, right?

Let me know your thoughts on the approach and my queries.
In case you got a chance to implement the use case that I detailed, I would be glad to have a look.

Thanks!
