This repository has been archived by the owner on Dec 31, 2020. It is now read-only.

Support stateful select function #7

Open
EronWright opened this issue Mar 28, 2016 · 2 comments

@EronWright
Collaborator

The Flink API supports stateful mapper functions, and so should the HTM DSL.

An important scenario is storing predictions over time so they can be compared with later events, for example to calculate an error rate. See the HotGym example.

The current workaround is to use a trivial select function followed by `mapWithState`.
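As a rough illustration of what the workaround computes, here is a minimal plain-Java simulation (no Flink dependency, not Flink API) of keyed "map with state": each key carries its own optional state, and the user function returns an output together with the updated state. That `(input, Option[state]) => (output, Option[state])` shape mirrors the function accepted by Flink's Scala `KeyedStream.mapWithState`. The `Prediction` type (observed value plus predicted next value), the key names, all class names, and the metric (absolute error of the previous prediction) are hypothetical, chosen only to sketch a HotGym-style error calculation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java simulation of keyed "map with state"; not Flink API. All names are hypothetical.
public class KeyedMapWithStateSketch {

    /** Output of the user function plus the state to keep (empty clears the key's state). */
    static final class Result<R, S> {
        final R output;
        final Optional<S> newState;

        Result(R output, Optional<S> newState) {
            this.output = output;
            this.newState = newState;
        }
    }

    /** Applies a stateful function with one independent state slot per key. */
    static final class KeyedStatefulMapper<K, T, R, S> {
        private final Function<T, K> keySelector;
        private final BiFunction<T, Optional<S>, Result<R, S>> fun;
        private final Map<K, S> stateByKey = new HashMap<>();

        KeyedStatefulMapper(Function<T, K> keySelector,
                            BiFunction<T, Optional<S>, Result<R, S>> fun) {
            this.keySelector = keySelector;
            this.fun = fun;
        }

        R apply(T element) {
            K key = keySelector.apply(element);
            Result<R, S> result = fun.apply(element, Optional.ofNullable(stateByKey.get(key)));
            // Store or clear the state for this key only; other keys are untouched.
            if (result.newState.isPresent()) {
                stateByKey.put(key, result.newState.get());
            } else {
                stateByKey.remove(key);
            }
            return result.output;
        }
    }

    /** Hypothetical select-step output: observed value and prediction for the next event. */
    static final class Prediction {
        final String key;
        final double actual;
        final double predictedNext;

        Prediction(String key, double actual, double predictedNext) {
            this.key = key;
            this.actual = actual;
            this.predictedNext = predictedNext;
        }
    }

    /** Emits |actual - previous prediction| per key; empty for a key's first event. */
    static KeyedStatefulMapper<String, Prediction, Optional<Double>, Double> errorTracker() {
        return new KeyedStatefulMapper<String, Prediction, Optional<Double>, Double>(
            p -> p.key,
            (p, previousPrediction) -> new Result<Optional<Double>, Double>(
                previousPrediction.map(prev -> Math.abs(p.actual - prev)),
                Optional.of(p.predictedNext)));  // remember this prediction for the next event
    }

    public static void main(String[] args) {
        KeyedStatefulMapper<String, Prediction, Optional<Double>, Double> tracker = errorTracker();
        System.out.println(tracker.apply(new Prediction("gym-1", 10.0, 12.0)));
        System.out.println(tracker.apply(new Prediction("gym-2", 50.0, 48.0)));
        System.out.println(tracker.apply(new Prediction("gym-1", 11.0, 11.5)));
        System.out.println(tracker.apply(new Prediction("gym-2", 47.0, 47.0)));
    }
}
```

Running `main` prints `Optional.empty` twice (the first event for each key has no earlier prediction to compare against) and then `Optional[1.0]` twice, since each key's error is computed only against that key's own stored prediction.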

@EronWright EronWright self-assigned this Mar 28, 2016
@EronWright
Collaborator Author

I tried and failed; see branch issue-7. Some thought is needed on how to keep the state separate per key when the input stream is keyed.

The error below occurs because the `StatefulFunction` trait works only on keyed streams. See apache/flink#1239 for details.

```
Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
    at org.apache.flink.streaming.api.scala.function.StatefulFunction$class.open(StatefulFunction.scala:51)
    at org.numenta.nupic.flink.streaming.api.scala.HTMStream$$anon$1.open(HTM.scala:123)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
    at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
```

@EronWright
Collaborator Author

With commit 5c05f39, we now use a keyed stream internally in all cases. This may eliminate the issue I mentioned above.
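One plausible way to use a keyed stream in all cases is to key otherwise-unkeyed input by a single constant, so that partitioned state always has a current key (the "State key serializer has not been configured" error is what happens when it does not). This is an assumption about the approach; commit 5c05f39 may do it differently. The plain-Java model below has no Flink dependency, and every name in it (`CountingOperator`, `keyedBy`, `unkeyed`, `CONSTANT_KEY`) is hypothetical. It contrasts real keys, which get separate state, with the constant-key fallback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (not Flink API) of an operator whose state is always partitioned by key.
// When no real key is available, a constant key is used, so the "unkeyed" case becomes a
// keyed stream with a single partition. All names are hypothetical.
public class AlwaysKeyedSketch {

    /** Hypothetical placeholder key assigned to every element of an unkeyed stream. */
    static final Object CONSTANT_KEY = "<all>";

    /** Counts elements per key; the count map stands in for partitioned operator state. */
    static final class CountingOperator<T> {
        private final Function<T, Object> keySelector;
        private final Map<Object, Long> countByKey = new HashMap<>();

        private CountingOperator(Function<T, Object> keySelector) {
            this.keySelector = keySelector;
        }

        /** Returns how many elements, including this one, have been seen for this element's key. */
        long apply(T element) {
            Object key = keySelector.apply(element);
            return countByKey.merge(key, 1L, Long::sum);
        }
    }

    /** Keyed input: each distinct key gets its own state. */
    static <T> CountingOperator<T> keyedBy(Function<T, Object> keySelector) {
        return new CountingOperator<T>(keySelector);
    }

    /** Unkeyed input: key everything by one constant, so state access still has a current key. */
    static <T> CountingOperator<T> unkeyed() {
        return new CountingOperator<T>(element -> CONSTANT_KEY);
    }

    public static void main(String[] args) {
        CountingOperator<String> byFirstLetter = keyedBy(s -> s.substring(0, 1));
        System.out.println(byFirstLetter.apply("apple"));   // 1
        System.out.println(byFirstLetter.apply("avocado")); // 2
        System.out.println(byFirstLetter.apply("banana"));  // 1

        CountingOperator<String> all = unkeyed();
        System.out.println(all.apply("apple"));  // 1
        System.out.println(all.apply("banana")); // 2
    }
}
```

With a real key, `apple` and `avocado` share a count while `banana` starts fresh; with the constant key, every element shares one state slot. In Flink, keying by a constant would also route all elements to a single parallel instance, which is the trade-off of this fallback.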


1 participant