diff --git a/signalfx/signalflow/computation.py b/signalfx/signalflow/computation.py index 67d7be2..56c2a3c 100644 --- a/signalfx/signalflow/computation.py +++ b/signalfx/signalflow/computation.py @@ -109,14 +109,19 @@ def stream(self): self._state = Computation.STATE_COMPLETED continue - # Intercept metadata messages to accumulate received metadata. - # TODO(mpetazzoni): this can accumulate metadata without bounds if - # a computation has a high rate of member churn. + # Intercept metadata messages to accumulate received metadata... if isinstance(message, messages.MetadataMessage): self._metadata[message.tsid] = message.properties yield message continue + # ...as well as expired-tsid messages to clean it up. + if isinstance(message, messages.ExpiredTsIdMessage): + if message.tsid in self._metadata: + del self._metadata[message.tsid] + yield message + continue + if isinstance(message, messages.InfoMessage): self._process_info_message(message.message) self._batch_count_detected = True diff --git a/signalfx/signalflow/messages.py b/signalfx/signalflow/messages.py index ac10187..3bb565f 100644 --- a/signalfx/signalflow/messages.py +++ b/signalfx/signalflow/messages.py @@ -19,6 +19,8 @@ def decode(mtype, payload): return EventMessage.decode(payload) if mtype == 'metadata': return MetadataMessage.decode(payload) + if mtype == 'expired-tsid': + return ExpiredTsIdMessage.decode(payload) if mtype == 'data': return DataMessage.decode(payload) if mtype == 'error': @@ -217,6 +219,25 @@ def decode(payload): return MetadataMessage(payload['tsId'], payload['properties']) +class ExpiredTsIdMessage(StreamMessage): + """Message informing us that an output timeseries is no longer part of the + computation and that we may do some cleanup of whatever internal state we + have tied to that output timeseries.""" + + def __init__(self, tsid): + self._tsid = tsid + + @property + def tsid(self): + """The identifier of the timeseries that's no longer interesting to the + computation.""" + return self._tsid + + @staticmethod + def decode(payload): + return ExpiredTsIdMessage(payload['tsId']) + + class DataMessage(StreamMessage): """Message containing a batch of datapoints generated for a particular iteration."""