Projected result of a SELECT query is too big #233

Open · yadid opened this issue Jun 22, 2017 · 5 comments

yadid commented Jun 22, 2017

We are using the spark-riak-connector to query a Riak TS cluster and are getting a com.basho.riak.client.core.netty.RiakResponseException: Projected result of a SELECT query is too big.
We thought that increasing the number of executors would avoid this error, but that does not seem to be the case.
Is the query range divided into several smaller range queries that are then distributed to the executors?
We are trying to avoid having to segment the range into several smaller ranges in our own code.
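
To illustrate, the kind of manual segmentation we want to avoid would look roughly like this (a sketch only; the bounds, filter values, and the `_sc`/`ts_table_name` names are placeholders from our setup):

```scala
import com.basho.riak.spark._ // adds riakTSTable() to SparkContext

// Client-side segmentation sketch: split [from, to) into 1-day
// (quantum-sized) sub-ranges and union the per-range results.
val from  = 1382846337500L // placeholder lower bound (epoch ms)
val to    = 1383846337500L // placeholder upper bound (epoch ms)
val dayMs = 24L * 60 * 60 * 1000 // matches quantum(time, 1, 'd')

val perQuantum = (from until to by dayMs).map { lo =>
  val hi = math.min(lo + dayMs, to)
  _sc.riakTSTable(ts_table_name)
    .sql(s"SELECT * FROM $ts_table_name WHERE time >= $lo AND time < $hi " +
         s"AND source = '123' AND instance = 'instance1'")
}
val combined = perQuantum.reduce(_ union _) // one RDD over the whole range
```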

srgg (Contributor) commented Jun 23, 2017

Could you please provide a querying example?

yadid (Author) commented Jun 23, 2017

The query is as follows:

```scala
_sc.riakTSTable(ts_table_name).sql(s"SELECT * FROM $ts_table_name WHERE time > 1382846337500 and time < 1383846337500 and source = '123' and instance = 'instance1'")
```

The table is as follows:

```scala
"CREATE TABLE FloatStream " +
"(source varchar not null, " +
"instance varchar not null, " +
"time timestamp not null, " +
"val double not null, " +
"PRIMARY KEY ( " +
"(source, instance, quantum(time, 1, 'd')), " +
"source, instance, time )) WITH (n_val=1);"
```

The query spans 10 quanta
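(As a rough check: 1383846337500 − 1382846337500 = 1,000,000,000 ms ≈ 11.6 days, so with quantum(time, 1, 'd') the range crosses on the order of ten 1-day quantum boundaries.)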

yadid (Author) commented Jun 23, 2017

Additional info:
I also tried to query using DataFrames. My query is as follows:

```scala
val df = sqlContext.read
  .option("spark.riak.input.split.count", "100")
  .option("spark.riak.connection.host", "myhost.media.com:8087")
  .format("org.apache.spark.sql.riak")
  .load(ts_table_name)
  .select("time", "source", "instance")
  .filter(s"time >= $from AND time < $to AND source = 'mysource' AND instance = 'myinstance'")
```

I received the following error:

```
coveragePlanResult - CoveragePlanOperation returns at least one coverage entry: 'CoverageEntry [host=0.0.0.0, port=8087, fieldName=time, lowerBound=1382846337500, lowerBoundInclusive=true, upperBound=1382918400000, upperBoundInclusive=false, description=FloatStream / time >= 1382846337500 and time < 1382918400000]' -- with IP address '0.0.0.0'.
Execution will be failed due to the imposibility of using IP '0.0.0.0' for querying data from the remote Riak.
java.lang.RuntimeException: CoveragePlanOperation returns at least one coverage entry with ip '0.0.0.0'.
```

yadid (Author) commented Jun 23, 2017

Managed to get past that one as well; apparently I did not have the partitioning field configured. I added option("spark.riak.partitioning.ts-range-field-name", "time") and now the DataFrame is created successfully (a sketch of the combined read options follows the stack trace below). However, when I try to create an additional DataFrame using the existing Spark context, I get the following error:

```
Caused by: java.util.concurrent.ExecutionException: com.basho.riak.client.core.netty.RiakResponseException: overload
at com.basho.riak.client.core.FutureOperation.throwExceptionIfSet(FutureOperation.java:325)
at com.basho.riak.client.core.FutureOperation.get(FutureOperation.java:311)
at com.basho.riak.spark.query.QueryTS$$anonfun$nextChunk$2.apply(QueryTS.scala:66)
at com.basho.riak.spark.query.QueryTS$$anonfun$nextChunk$2.apply(QueryTS.scala:65)
at com.basho.riak.spark.rdd.connector.RiakConnector$$anonfun$withSessionDo$1.apply(RiakConnector.scala:58)
at com.basho.riak.spark.rdd.connector.RiakConnector$$anonfun$withSessionDo$1.apply(RiakConnector.scala:58)
at com.basho.riak.spark.rdd.connector.RiakConnector.closeSessionAfterUse(RiakConnector.scala:63)
at com.basho.riak.spark.rdd.connector.RiakConnector.withSessionDo(RiakConnector.scala:58)
at com.basho.riak.spark.query.QueryTS.nextChunk(QueryTS.scala:65)
at com.basho.riak.spark.query.TSDataQueryingIterator.prefetch(TSDataQueryingIterator.scala:58)
at com.basho.riak.spark.query.TSDataQueryingIterator.columnDefs(TSDataQueryingIterator.scala:44)
at com.basho.riak.spark.rdd.RiakTSRDD.computeTS(RiakTSRDD.scala:73)
at com.basho.riak.spark.rdd.RiakTSRDD.compute(RiakTSRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```
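
For reference, the combined read mentioned above would look roughly like this (a sketch; the host, split count, and filter values are the placeholders from my earlier snippet):

```scala
val df = sqlContext.read
  .option("spark.riak.connection.host", "myhost.media.com:8087")
  .option("spark.riak.input.split.count", "100")
  .option("spark.riak.partitioning.ts-range-field-name", "time") // the option that was missing
  .format("org.apache.spark.sql.riak")
  .load(ts_table_name)
  .select("time", "source", "instance")
  .filter(s"time >= $from AND time < $to AND source = 'mysource' AND instance = 'myinstance'")
```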

srgg (Contributor) commented Jun 24, 2017

@yadid The reason for the error you got in the first DF case is the default Riak TS configuration. You just need to reconfigure Riak TS to use real IPs instead of 0.0.0.0 for the PB listener. Here is an example: http://docs.basho.com/riak/kv/2.2.3/using/running-a-cluster/#select-an-ip-address-and-port
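
Concretely, that means binding the protobuf listener in riak.conf to the node's routable address rather than the wildcard (a sketch; 10.0.0.5 is a placeholder for the node's real IP):

```
# riak.conf -- bind the PB listener to a real, reachable IP
# instead of the 0.0.0.0 wildcard reported in the coverage plan
listener.protobuf.internal = 10.0.0.5:8087
```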
