Skip to content

Latest commit

 

History

History
254 lines (163 loc) · 11.7 KB

kafka-streams-internals-AbstractTask.adoc

File metadata and controls

254 lines (163 loc) · 11.7 KB

AbstractTask — Base Processor Task

AbstractTask is the base implementation of the Task contract for tasks that FIXME.

Note
AbstractTask is a Java abstract class and cannot be created directly. It is created indirectly for the concrete AbstractTasks.

AbstractTask has taskInitialized flag that is on (i.e. true) to mark when:

The taskInitialized flag is used by the concrete AbstractTasks as an optimization when:

AbstractTask has taskClosed flag that is enabled (i.e. true) to mark when StandbyTask and StreamTask have already been requested to close.

Table 1. AbstractTasks
AbstractTask Description

StandbyTask

StreamTask

Creating AbstractTask Instance

AbstractTask takes the following to be created:

AbstractTask initializes the internal registries and counters.

Note
AbstractTask is a Java abstract class and cannot be created directly. It is created indirectly for the concrete AbstractTasks.

Closing State Manager — closeStateManager Method

void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException

closeStateManager…​FIXME

Note
closeStateManager is used when…​FIXME

Checking If Topology Uses State Stores — hasStateStores Method

boolean hasStateStores()
Note
hasStateStores is part of Task Contract to…​FIXME.

hasStateStores is positive (and returns true) when ProcessorTopology has at least one local state store.

Flushing All State Stores — flushState Method

void flushState()

flushState simply requests the ProcessorStateManager to flush.

Note
flushState is used exclusively when StreamTask is requested to commit (using the custom flushState).

recordCollectorOffsets Method

Map<TopicPartition, Long> recordCollectorOffsets()

recordCollectorOffsets…​FIXME

Note
recordCollectorOffsets is used when…​FIXME

Registering and Initializing State Stores — registerStateStores Method

void registerStateStores()

registerStateStores simply exits when the ProcessorTopology has no state stores.

registerStateStores requests the StateDirectory for the lock for the state directory of the task (by TaskId).

registerStateStores prints out the following TRACE message to the logs:

Initializing state stores

registerStateStores updateOffsetLimits.

In the end, for every StateStore in the ProcessorTopology, registerStateStores prints out the following TRACE message to the logs:

Initializing store [name]

registerStateStores requests the InternalProcessorContext to uninitialize and then requests the StateStore to initialize (with the InternalProcessorContext and itself).

registerStateStores throws a LockException when requesting the StateDirectory for the lock for the state directory of the task failed:

[logPrefix]Failed to lock the state directory for task [id]

registerStateStores throws a StreamsException when requesting the StateDirectory for the lock for the state directory of the task failed with an IOException:

[logPrefix]Fatal error while trying to lock the state directory for task [id]
Note
registerStateStores is used when the concrete Tasks (StandbyTask and StreamTask) are requested to initialize state stores.

changelogPartitions Method

Collection<TopicPartition> changelogPartitions()
Note
changelogPartitions is part of Task Contract to get the changelog partitions of a task.

changelogPartitions simply requests the ProcessorStateManager for the changelog partitions.

Accessing State Store by Name — getStore Method

StateStore getStore(final String name)
Note
getStore is part of the Task Contract to access the state store by name.

getStore simply requests the ProcessorStateManager for the StateStore by name.

updateOffsetLimits Method

void updateOffsetLimits()

updateOffsetLimits…​FIXME

Note

updateOffsetLimits is used when:

reinitializeStateStoresForPartitions Method

void reinitializeStateStoresForPartitions(
  Collection<TopicPartition> partitions)

reinitializeStateStoresForPartitions simply requests the ProcessorStateManager to reinitializeStateStores for the input partitions and the InternalProcessorContext.

Note

reinitializeStateStoresForPartitions is used when:

Checkpointable Offsets — activeTaskCheckpointableOffsets Method

Map<TopicPartition, Long> activeTaskCheckpointableOffsets()

activeTaskCheckpointableOffsets simply returns an empty collection (of checkpointable offsets).

Internal Properties

Name Description

applicationId

commitNeeded

eosEnabled

Flag that controls whether Exactly-Once Support is enabled (true) or not (false)

Used when (directly or indirectly as isEosEnabled public method):

processorContext

stateMgr

Created when AbstractTask is created

Used when: