Skip to content

Commit

Permalink
Merge pull request #44 from signalfx/wip/handle-expired-tsid-message
Browse files Browse the repository at this point in the history
Support expired-tsid stream message
  • Loading branch information
tedoc2000 authored Dec 1, 2016
2 parents b077506 + 5782c9d commit 13d1225
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
11 changes: 8 additions & 3 deletions signalfx/signalflow/computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,19 @@ def stream(self):
self._state = Computation.STATE_COMPLETED
continue

# Intercept metadata messages to accumulate received metadata.
# TODO(mpetazzoni): this can accumulate metadata without bounds if
# a computation has a high rate of member churn.
# Intercept metadata messages to accumulate received metadata...
if isinstance(message, messages.MetadataMessage):
self._metadata[message.tsid] = message.properties
yield message
continue

# ...as well as expired-tsid messages to clean it up.
if isinstance(message, messages.ExpiredTsIdMessage):
if message.tsid in self._metadata:
del self._metadata[message.tsid]
yield message
continue

if isinstance(message, messages.InfoMessage):
self._process_info_message(message.message)
self._batch_count_detected = True
Expand Down
21 changes: 21 additions & 0 deletions signalfx/signalflow/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def decode(mtype, payload):
return EventMessage.decode(payload)
if mtype == 'metadata':
return MetadataMessage.decode(payload)
if mtype == 'expired-tsid':
return ExpiredTsIdMessage.decode(payload)
if mtype == 'data':
return DataMessage.decode(payload)
if mtype == 'error':
Expand Down Expand Up @@ -217,6 +219,25 @@ def decode(payload):
return MetadataMessage(payload['tsId'], payload['properties'])


class ExpiredTsIdMessage(StreamMessage):
"""Message informing us that an output timeseries is no longer part of the
computation and that we may do some cleanup of whatever internal state we
have tied to that output timeseries."""

def __init__(self, tsid):
self._tsid = tsid

@property
def tsid(self):
"""The identifier of the timeseries that's no longer interesting to the
computation."""
return self._tsid

@staticmethod
def decode(payload):
return ExpiredTsIdMessage(payload['tsId'])


class DataMessage(StreamMessage):
"""Message containing a batch of datapoints generated for a particular
iteration."""
Expand Down

0 comments on commit 13d1225

Please sign in to comment.