diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala
index d87cc391c..7cd27f423 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/ExprInterpreter.scala
@@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Expr
 import com.netflix.atlas.core.model.FilterExpr
 import com.netflix.atlas.core.model.ModelExtractors
 import com.netflix.atlas.core.model.StatefulExpr
+import com.netflix.atlas.core.model.StyleExpr
 import com.netflix.atlas.core.model.TraceQuery
 import com.netflix.atlas.core.model.TraceVocabulary
 import com.netflix.atlas.core.stacklang.Interpreter
@@ -37,10 +38,12 @@ import com.typesafe.config.Config
 
 import scala.util.Success
 
-private[stream] class ExprInterpreter(config: Config) {
+class ExprInterpreter(config: Config) {
 
   import ExprInterpreter.*
 
+  private val tsInterpreter = Interpreter(new CustomVocabulary(config).allWords)
+
   private val eventInterpreter = Interpreter(
     new CustomVocabulary(config, List(EventVocabulary)).allWords
   )
@@ -60,9 +63,23 @@ private[stream] class ExprInterpreter(config: Config) {
     // time shifts, filters, and integral. The filters and integral are excluded because
     // they can be confusing as the time window for evaluation is not bounded.
     val results = graphCfg.exprs.flatMap(_.perOffset)
-    results.foreach { result =>
+    results.foreach(validate)
+
+    // Perform host rewrites based on the Atlas hostname
+    val host = uri.authority.host.toString()
+    val rewritten = hostRewriter.rewrite(host, results)
+    graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
+  }
+
+  /**
+    * Check that data expressions are supported. The streaming path doesn't support
+    * time shifts, filters, and integral. The filters and integral are excluded because
+    * they can be confusing as the time window for evaluation is not bounded.
+    */
+  private def validate(styleExpr: StyleExpr): Unit = {
+    styleExpr.perOffset.foreach { s =>
       // Use rewrite as a helper for searching the expression for invalid operations
-      result.expr.rewrite {
+      s.expr.rewrite {
         case op: StatefulExpr.Integral         => invalidOperator(op); op
         case op: FilterExpr                    => invalidOperator(op); op
         case op: DataExpr if !op.offset.isZero => invalidOperator(op); op
@@ -70,13 +87,8 @@ private[stream] class ExprInterpreter(config: Config) {
 
       // Double check all data expressions do not have an offset. In some cases for named rewrites
       // the check above may not detect the offset.
-      result.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator)
+      s.expr.dataExprs.filterNot(_.offset.isZero).foreach(invalidOperator)
     }
-
-    // Perform host rewrites based on the Atlas hostname
-    val host = uri.authority.host.toString()
-    val rewritten = hostRewriter.rewrite(host, results)
-    graphCfg.copy(query = rewritten.mkString(","), parsedQuery = Success(rewritten))
   }
 
   private def invalidOperator(expr: Expr): Unit = {
@@ -103,45 +115,80 @@ private[stream] class ExprInterpreter(config: Config) {
   }
 
   private def invalidValue(value: Any): IllegalArgumentException = {
-    new IllegalArgumentException(s"invalid value on stack; $value")
+    new IllegalArgumentException(s"invalid value on stack: $value")
+  }
+
+  private def parseTimeSeries(query: String): List[StyleExpr] = {
+    val exprs = tsInterpreter.execute(query).stack.map {
+      case ModelExtractors.PresentationType(t) => t
+      case value                               => throw invalidValue(value)
+    }
+    exprs.foreach(validate)
+    exprs
+  }
+
+  private def parseEvents(query: String): List[EventExpr] = {
+    eventInterpreter.execute(query).stack.map {
+      case ModelExtractors.EventExprType(t) => t
+      case value                            => throw invalidValue(value)
+    }
   }
 
   private def evalEvents(uri: Uri): List[EventExpr] = {
     uri.query().get("q") match {
       case Some(query) =>
-        eventInterpreter.execute(query).stack.map {
-          case ModelExtractors.EventExprType(t) => t
-          case value                            => throw invalidValue(value)
-        }
+        parseEvents(query)
       case None =>
         throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
     }
   }
 
+  private def parseTraceEvents(query: String): List[TraceQuery.SpanFilter] = {
+    traceInterpreter.execute(query).stack.map {
+      case ModelExtractors.TraceFilterType(t) => t
+      case value                              => throw invalidValue(value)
+    }
+  }
+
   private def evalTraceEvents(uri: Uri): List[TraceQuery.SpanFilter] = {
     uri.query().get("q") match {
       case Some(query) =>
-        traceInterpreter.execute(query).stack.map {
-          case ModelExtractors.TraceFilterType(t) => t
-          case value                              => throw invalidValue(value)
-        }
+        parseTraceEvents(query)
       case None =>
         throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
     }
   }
 
+  private def parseTraceTimeSeries(query: String): List[TraceQuery.SpanTimeSeries] = {
+    val exprs = traceInterpreter.execute(query).stack.map {
+      case ModelExtractors.TraceTimeSeriesType(t) => t
+      case value                                  => throw invalidValue(value)
+    }
+    exprs.foreach(t => validate(t.expr))
+    exprs
+  }
+
   private def evalTraceTimeSeries(uri: Uri): List[TraceQuery.SpanTimeSeries] = {
     uri.query().get("q") match {
       case Some(query) =>
-        traceInterpreter.execute(query).stack.map {
-          case ModelExtractors.TraceTimeSeriesType(t) => t
-          case value                                  => throw invalidValue(value)
-        }
+        parseTraceTimeSeries(query)
       case None =>
         throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
     }
   }
 
+  /** Parse an expression string based on the specified expression type. */
+  def parseQuery(expr: String, exprType: ExprType): List[Expr] = {
+    val exprs = exprType match {
+      case ExprType.TIME_SERIES       => parseTimeSeries(expr)
+      case ExprType.EVENTS            => parseEvents(expr)
+      case ExprType.TRACE_EVENTS      => parseTraceEvents(expr)
+      case ExprType.TRACE_TIME_SERIES => parseTraceTimeSeries(expr)
+    }
+    exprs.distinct
+  }
+
+  /** Parse an expression that is part of a URI. */
   def parseQuery(uri: Uri): (ExprType, List[Expr]) = {
     val exprType = determineExprType(uri)
     val exprs = exprType match {
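
The now-public parseQuery(expr, exprType) overload lets callers outside the
stream package, such as the splitter changed below, parse and validate an
expression without building a URI. A minimal usage sketch, assuming the loaded
config provides the settings that CustomVocabulary needs:

    import com.netflix.atlas.eval.model.ExprType
    import com.netflix.atlas.eval.stream.ExprInterpreter
    import com.typesafe.config.ConfigFactory

    object ParseQueryExample {
      def main(args: Array[String]): Unit = {
        val interpreter = new ExprInterpreter(ConfigFactory.load())
        // Time series expressions get the streaming validation: offsets,
        // filters, and integral are rejected.
        val exprs = interpreter.parseQuery("nf.cluster,skan,:eq,:sum", ExprType.TIME_SERIES)
        exprs.foreach(println)
      }
    }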
diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala
index 6423614d0..cbdaed881 100644
--- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala
+++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionMetadata.scala
@@ -40,15 +40,16 @@ case class ExpressionMetadata(
 object ExpressionMetadata {
 
   def apply(expression: String, exprType: ExprType, step: Long): ExpressionMetadata = {
-    val f = if (step > 0) step else ApiSettings.defaultStep
-    new ExpressionMetadata(expression, exprType, f, computeId(expression, f))
+    val dfltStep = if (exprType.isTimeSeriesType) ApiSettings.defaultStep else 0L
+    val f = if (step > 0) step else dfltStep
+    new ExpressionMetadata(expression, exprType, f, computeId(expression, exprType, f))
   }
 
   def apply(expression: String): ExpressionMetadata = {
     apply(expression, ExprType.TIME_SERIES, ApiSettings.defaultStep)
   }
 
-  def computeId(e: String, f: Long): String = {
-    Strings.zeroPad(Hash.sha1bytes(s"$f~$e"), 40)
+  def computeId(e: String, t: ExprType, f: Long): String = {
+    Strings.zeroPad(Hash.sha1bytes(s"$f~$t~$e"), 40)
   }
 }
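
The id now hashes the expression type along with the frequency and expression,
so identical strings subscribed under different types no longer collide. An
illustrative stand-in using the JDK digest directly, not the Atlas helpers:

    import java.security.MessageDigest

    // Sketch only: the real code uses Hash.sha1bytes and Strings.zeroPad. The
    // id is the SHA-1 of "frequency~exprType~expression" as 40 hex characters.
    def computeIdSketch(expr: String, exprType: String, freq: Long): String = {
      val bytes = MessageDigest
        .getInstance("SHA-1")
        .digest(s"$freq~$exprType~$expr".getBytes("UTF-8"))
      bytes.map(b => f"$b%02x").mkString
    }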
diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala
index 448500915..f797836bb 100644
--- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala
+++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala
@@ -17,13 +17,12 @@ package com.netflix.atlas.lwcapi
 
 import java.util.concurrent.TimeUnit
 import com.github.benmanes.caffeine.cache.Caffeine
-import com.netflix.atlas.core.model.CustomVocabulary
-import com.netflix.atlas.core.model.DataExpr
-import com.netflix.atlas.core.model.ModelExtractors
+import com.netflix.atlas.core.model.EventExpr
 import com.netflix.atlas.core.model.Query
 import com.netflix.atlas.core.model.Query.KeyQuery
-import com.netflix.atlas.core.stacklang.Interpreter
+import com.netflix.atlas.core.model.StyleExpr
 import com.netflix.atlas.eval.model.ExprType
+import com.netflix.atlas.eval.stream.ExprInterpreter
 import com.netflix.spectator.ipc.ServerGroup
 import com.typesafe.config.Config
 
@@ -42,7 +41,7 @@ class ExpressionSplitter(config: Config) {
 
   private val keepKeys = Set("nf.app", "nf.cluster", "nf.shard1", "nf.shard2", "nf.stack")
 
-  private val interpreter = Interpreter(new CustomVocabulary(config).allWords)
+  private val interpreter = new ExprInterpreter(config)
 
   /**
     * Processing the expressions can be quite expensive. In particular compiling regular
@@ -115,25 +114,31 @@ class ExpressionSplitter(config: Config) {
     }
   }
 
-  private def parse(expression: String): Try[List[DataExprMeta]] = Try {
-    val context = interpreter.execute(expression)
-    val dataExprs = context.stack.flatMap {
-      case ModelExtractors.PresentationType(t) => t.perOffset.flatMap(_.expr.dataExprs)
-      case _ => throw new IllegalArgumentException("expression is invalid")
-    }
-
-    // Offsets are not supported
-    dataExprs.foreach { dataExpr =>
-      if (!dataExpr.offset.isZero) {
-        throw new IllegalArgumentException(
-          s":offset not supported for streaming evaluation [[$dataExpr]]"
-        )
-      }
-    }
-
-    dataExprs.distinct.map { e =>
-      val q = intern(compress(e.query))
-      DataExprMeta(e, e.toString, q)
+  private def parse(expression: String, exprType: ExprType): Try[List[DataExprMeta]] = Try {
+    val parsedExpressions = interpreter.parseQuery(expression, exprType)
+    exprType match {
+      case ExprType.EVENTS =>
+        parsedExpressions.collect {
+          case e: EventExpr =>
+            val q = intern(compress(e.query))
+            DataExprMeta(e.toString, q)
+        }
+      case ExprType.TIME_SERIES =>
+        parsedExpressions
+          .collect {
+            case se: StyleExpr => se.expr.dataExprs
+          }
+          .flatten
+          .distinct
+          .map { e =>
+            val q = intern(compress(e.query))
+            DataExprMeta(e.toString, q)
+          }
+      case ExprType.TRACE_EVENTS | ExprType.TRACE_TIME_SERIES =>
+        parsedExpressions.map { e =>
+          // Tracing cannot be scoped to specific infrastructure, so always use Query.True
+          DataExprMeta(e.toString, Query.True)
+        }
     }
   }
 
@@ -142,11 +147,12 @@ class ExpressionSplitter(config: Config) {
    * contention and most threads are blocked. This just does a get/put which potentially
     * recomputes some values, but for this case that is preferable.
     */
-  private def getFromCache(k: String): Try[List[DataExprMeta]] = {
-    val value = exprCache.getIfPresent(k)
+  private def getFromCache(k: String, exprType: ExprType): Try[List[DataExprMeta]] = {
+    val key = s"$k:$exprType"
+    val value = exprCache.getIfPresent(key)
     if (value == null) {
-      val tmp = parse(k)
-      exprCache.put(k, tmp)
+      val tmp = parse(k, exprType)
+      exprCache.put(key, tmp)
       tmp
     } else {
       value
@@ -154,7 +160,7 @@ class ExpressionSplitter(config: Config) {
   }
 
   def split(expression: String, exprType: ExprType, frequency: Long): List[Subscription] = {
-    getFromCache(expression) match {
+    getFromCache(expression, exprType) match {
       case Success(exprs: List[?]) => exprs.map(e => toSubscription(e, exprType, frequency))
       case Failure(t)              => throw t
     }
@@ -207,5 +213,5 @@ class ExpressionSplitter(config: Config) {
 }
 
 object ExpressionSplitter {
-  private case class DataExprMeta(expr: DataExpr, exprString: String, compressedQuery: Query)
+  private case class DataExprMeta(exprString: String, compressedQuery: Query)
 }
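
Since the same expression string can now parse differently per expression type,
the cache key must include the type; otherwise an events subscription could be
served a cached time series parse. A simplified sketch of the pattern with a
plain Caffeine cache (string values stand in for the parsed result):

    import java.util.concurrent.TimeUnit
    import com.github.benmanes.caffeine.cache.Caffeine

    val cache = Caffeine
      .newBuilder()
      .expireAfterAccess(10, TimeUnit.MINUTES)
      .build[String, String]()

    // Plain get/put rather than a computing get: under heavy contention a value
    // may occasionally be recomputed, which is preferable to blocking threads.
    def cached(expr: String, exprType: String)(parse: => String): String = {
      val key = s"$expr:$exprType"
      val value = cache.getIfPresent(key)
      if (value == null) {
        val tmp = parse
        cache.put(key, tmp)
        tmp
      } else value
    }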
diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala
index 2d2f92457..a224c6145 100644
--- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala
+++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala
@@ -15,6 +15,7 @@
  */
 package com.netflix.atlas.lwcapi
 
+import com.netflix.atlas.eval.model.ExprType
 import org.apache.pekko.NotUsed
 import org.apache.pekko.http.scaladsl.model.ws.BinaryMessage
 import org.apache.pekko.http.scaladsl.model.ws.Message
@@ -31,6 +32,7 @@ import com.netflix.atlas.eval.model.LwcDataExpr
 import com.netflix.atlas.eval.model.LwcHeartbeat
 import com.netflix.atlas.eval.model.LwcMessages
 import com.netflix.atlas.eval.model.LwcSubscription
+import com.netflix.atlas.eval.model.LwcSubscriptionV2
 import com.netflix.atlas.json.JsonSupport
 import com.netflix.atlas.pekko.CustomDirectives.*
 import com.netflix.atlas.pekko.DiagnosticMessage
@@ -163,7 +165,11 @@ class SubscribeApi(
       .flatMapConcat { _ =>
         val steps = sm
           .subscriptionsForStream(streamId)
-          .map(_.metadata.frequency)
+          .map { sub =>
+            // For events the step doesn't really matter, so use 5s as that is the typical
+            // heartbeat frequency. It is only used for the time associated with heartbeat messages.
+            if (sub.metadata.frequency == 0L) 5_000L else sub.metadata.frequency
+          }
           .distinct
           .map { step =>
             // To account for some delays for data coming from real systems, the heartbeat
@@ -198,7 +204,13 @@ class SubscribeApi(
         val subMessages = addedSubs.map { sub =>
           val meta = sub.metadata
           val exprInfo = LwcDataExpr(meta.id, meta.expression, meta.frequency)
-          LwcSubscription(expr.expression, List(exprInfo))
+          if (expr.exprType == ExprType.TIME_SERIES) {
+            // For backwards compatibility for older versions of eval library, use v1
+            // subscription response when it is a time series type
+            LwcSubscription(expr.expression, List(exprInfo))
+          } else {
+            LwcSubscriptionV2(expr.expression, expr.exprType, List(exprInfo))
+          }
         }
         queue.offer(subMessages)
 
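
Two behaviors change here: event subscriptions carry a frequency of 0, so the
heartbeat stream needs a fallback step, and only time series subscriptions keep
the v1 acknowledgement for older eval libraries. The selection logic reduces to:

    import com.netflix.atlas.eval.model.ExprType

    // Events subscribe with frequency 0, which has no meaningful step, so fall
    // back to 5s, the typical heartbeat frequency.
    def heartbeatStep(frequency: Long): Long =
      if (frequency == 0L) 5000L else frequency

    // Older eval libraries only understand the v1 subscription message, which
    // is still sent for time series; all other types get the v2 message.
    def useV1Ack(exprType: ExprType): Boolean =
      exprType == ExprType.TIME_SERIES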
diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala
index abb3a7971..aa98bf523 100644
--- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala
+++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala
@@ -172,11 +172,11 @@ class ExpressionApiSuite extends MUnitRouteSuite {
   }
 
   private val skanCount =
-    """{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"6278fa6047c07316d7e265a1004882ab9e1007af"}"""
+    """{"expression":"nf.cluster,skan,:eq,:count","exprType":"TIME_SERIES","frequency":60000,"id":"039722fefa66c2cdd0147595fb9b9a50351f90f0"}"""
 
   private val skanSum =
-    """{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"36e0a2c61b48e062bba5361d059afd313c82c674"}"""
+    """{"expression":"nf.cluster,skan,:eq,:sum","exprType":"TIME_SERIES","frequency":60000,"id":"17e0dc5b1224c825c81cf033c46d0f0490c1ca7f"}"""
 
   private val brhMax =
-    """{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"16f1b0930c0eeae0225374ea88c01e161e589aff"}"""
+    """{"expression":"nf.app,brh,:eq,:max","exprType":"TIME_SERIES","frequency":60000,"id":"b19b2aa2c802c29216d4aa36024f71a3c92f84db"}"""
 }
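
The fixture ids change because the type is now part of the hash input. A quick
property that the new computeId signature should satisfy, shown as a sketch:

    import com.netflix.atlas.eval.model.ExprType
    import com.netflix.atlas.lwcapi.ExpressionMetadata

    // Same expression and step, different types: the ids must differ.
    val a = ExpressionMetadata.computeId("nf.cluster,skan,:eq,:sum", ExprType.TIME_SERIES, 60000L)
    val b = ExpressionMetadata.computeId("nf.cluster,skan,:eq,:sum", ExprType.EVENTS, 60000L)
    assert(a != b)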
diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala
index 5eda35da7..0a3fed3e5 100644
--- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala
+++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionMetadataSuite.scala
@@ -48,7 +48,7 @@ class ExpressionMetadataSuite extends FunSuite {
 
   test("computes id") {
     val expr = ExpressionMetadata("test")
-    assertEquals(expr.id, "2684d3c5cb245bd2fd6ee4ea30a500e97ace8141")
+    assertEquals(expr.id, "59ec3895749cc3fb279d41dabc1d4943361de999")
   }
 
   test("id computation considers frequency") {
@@ -114,14 +114,14 @@ class ExpressionMetadataSuite extends FunSuite {
 
   test("renders as json with default frequency") {
     val expected =
-      "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}"
+      "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}"
     val json = ExpressionMetadata("this").toJson
     assertEquals(expected, json)
   }
 
   test("renders as json with frequency of 0") {
     val expected =
-      "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"fc3a081088771e05bdc3aa99ffd8770157dfe6ce\"}"
+      "{\"expression\":\"this\",\"exprType\":\"TIME_SERIES\",\"frequency\":60000,\"id\":\"8189ff24a1e801c924689aeb0490d5d840f23582\"}"
     val json = ExpressionMetadata("this", ExprType.TIME_SERIES, 0).toJson
     assertEquals(expected, json)
   }
diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala
index 53fa1b6c7..89803badf 100644
--- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala
+++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala
@@ -54,7 +54,7 @@ class ExpressionSplitterSuite extends FunSuite {
     val msg = intercept[IllegalArgumentException] {
       splitter.split("foo", ExprType.TIME_SERIES, frequency1)
     }
-    assertEquals(msg.getMessage, "expression is invalid")
+    assertEquals(msg.getMessage, "invalid value on stack: foo")
   }
 
   test("throws IAE for expressions with offset") {
diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala
index 30101390c..1c217ed55 100644
--- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala
+++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscribeApiSuite.scala
@@ -15,15 +15,19 @@
  */
 package com.netflix.atlas.lwcapi
 
+import com.fasterxml.jackson.databind.JsonNode
 import org.apache.pekko.http.scaladsl.model.ws.Message
 import org.apache.pekko.http.scaladsl.testkit.RouteTestTimeout
 import org.apache.pekko.http.scaladsl.testkit.WSProbe
 import com.netflix.atlas.eval.model.ExprType
 import com.netflix.atlas.eval.model.LwcDatapoint
+import com.netflix.atlas.eval.model.LwcEvent
 import com.netflix.atlas.eval.model.LwcExpression
 import com.netflix.atlas.eval.model.LwcHeartbeat
 import com.netflix.atlas.eval.model.LwcMessages
 import com.netflix.atlas.eval.model.LwcSubscription
+import com.netflix.atlas.eval.model.LwcSubscriptionV2
+import com.netflix.atlas.json.Json
 import com.netflix.atlas.pekko.DiagnosticMessage
 import com.netflix.atlas.pekko.RequestHandler
 import com.netflix.atlas.pekko.testkit.MUnitRouteSuite
@@ -90,4 +94,40 @@ class SubscribeApiSuite extends MUnitRouteSuite {
       }
     }
   }
+
+  test("subscribe websocket event") {
+    val client = WSProbe()
+    WS("/api/v2/subscribe/222", client.flow) ~> routes ~> check {
+      assert(isWebSocketUpgrade)
+
+      // Send list of expressions to subscribe to
+      val exprs = List(LwcExpression("name,disk,:eq", ExprType.EVENTS, 0L))
+      client.sendMessage(LwcMessages.encodeBatch(exprs))
+
+      // Look for the subscription message for the event expression
+      var subscriptions = List.empty[LwcSubscriptionV2]
+      while (subscriptions.isEmpty) {
+        parseBatch(client.expectMessage()).foreach {
+          case _: DiagnosticMessage   =>
+          case sub: LwcSubscriptionV2 => subscriptions = sub :: subscriptions
+          case h: LwcHeartbeat        => assertEquals(h.step, 5000L)
+          case v                      => throw new MatchError(v)
+        }
+      }
+
+      // Verify the subscription is in the manager, then push a message to the queue and
+      // check that it is received by the client
+      assertEquals(subscriptions.flatMap(_.subExprs).size, 1)
+      subscriptions.flatMap(_.subExprs).foreach { m =>
+        val tags = Map("name" -> "disk")
+        val json = Json.decode[JsonNode](Json.encode(tags))
+        val event = LwcEvent(m.id, json)
+        val handlers = sm.handlersForSubscription(m.id)
+        assertEquals(handlers.size, 1)
+        handlers.head.offer(Seq(event))
+
+        assertEquals(parseBatch(client.expectMessage()), List(event))
+      }
+    }
+  }
 }
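
From the client side, the event subscription exercised above amounts to encoding
a batch with the events type and a frequency of 0, mirroring the test setup:

    import com.netflix.atlas.eval.model.ExprType
    import com.netflix.atlas.eval.model.LwcExpression
    import com.netflix.atlas.eval.model.LwcMessages

    // Batch payload for the v2 subscribe websocket; frequency 0 indicates that
    // step is not meaningful for raw events.
    val subs = List(LwcExpression("name,disk,:eq", ExprType.EVENTS, 0L))
    val frame = LwcMessages.encodeBatch(subs)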