Skip to content

Commit 3abb4f3

Browse files
authored
support ingestion timestamp returned by cassandra (#294)
* support ingestion timestamp returned by cassandra * revert it test
1 parent bde6bf6 commit 3abb4f3

File tree

8 files changed

+205
-120
lines changed

8 files changed

+205
-120
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
test: test
1+
test: test

src/it/scala/integration/AddAndQueryDataPointsIntegrationSpec.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec {
4343
"my.new.metric",
4444
Seq(GroupBy.GroupByType("number")),
4545
Seq(TagResult("aoeu", Seq("snth"))),
46-
Seq((instant, KNumber(555)))
46+
Seq((instant, KNumber(555), None))
4747
)
4848
)
4949
)
@@ -85,9 +85,9 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec {
8585
Seq(GroupBy.GroupByType("number")),
8686
List(TagResult("aoeu", List("123", "456")), TagResult("snth", List("321"))),
8787
Seq(
88-
instant.plusMillis(1) -> KNumber(111),
89-
instant.plusMillis(2) -> KNumber(222),
90-
instant.plusMillis(3) -> KNumber(333)
88+
(instant.plusMillis(1), KNumber(111), None),
89+
(instant.plusMillis(2), KNumber(222), None),
90+
(instant.plusMillis(3), KNumber(333), None)
9191
)
9292
)
9393
)
@@ -149,9 +149,9 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec {
149149
Seq(GroupBy.GroupByType("number")),
150150
Seq(TagResult("aoeu", Seq("snth"))),
151151
Seq(
152-
(Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)),
153-
(Instant.parse("1970-01-03T00:00:00Z"), KNull),
154-
(Instant.parse("1970-01-04T00:00:00Z"), KNumber(555))
152+
(Instant.parse("1970-01-02T00:00:00Z"), KNumber(555), None),
153+
(Instant.parse("1970-01-03T00:00:00Z"), KNull, None),
154+
(Instant.parse("1970-01-04T00:00:00Z"), KNumber(555), None)
155155
)
156156
)
157157
)
@@ -171,10 +171,10 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec {
171171
Seq(GroupBy.GroupByType("number")),
172172
Seq(TagResult("aoeu", Seq("snth"))),
173173
Seq(
174-
(Instant.parse("1970-01-01T00:00:00Z"), KNull),
175-
(Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)),
176-
(Instant.parse("1970-01-03T00:00:00Z"), KNull),
177-
(Instant.parse("1970-01-04T00:00:00Z"), KNumber(555))
174+
(Instant.parse("1970-01-01T00:00:00Z"), KNull, None),
175+
(Instant.parse("1970-01-02T00:00:00Z"), KNumber(555), None),
176+
(Instant.parse("1970-01-03T00:00:00Z"), KNull, None),
177+
(Instant.parse("1970-01-04T00:00:00Z"), KNumber(555), None)
178178
)
179179
)
180180
)
@@ -194,11 +194,11 @@ class AddAndQueryDataPointsIntegrationSpec extends IntegrationSpec {
194194
Seq(GroupBy.GroupByType("number")),
195195
Seq(TagResult("aoeu", Seq("snth"))),
196196
Seq(
197-
(Instant.parse("1970-01-01T00:00:00Z"), KNull),
198-
(Instant.parse("1970-01-02T00:00:00Z"), KNumber(555)),
199-
(Instant.parse("1970-01-03T00:00:00Z"), KNull),
200-
(Instant.parse("1970-01-04T00:00:00Z"), KNumber(555)),
201-
(Instant.parse("1970-01-05T00:00:00Z"), KNull)
197+
(Instant.parse("1970-01-01T00:00:00Z"), KNull, None),
198+
(Instant.parse("1970-01-02T00:00:00Z"), KNumber(555), None),
199+
(Instant.parse("1970-01-03T00:00:00Z"), KNull, None),
200+
(Instant.parse("1970-01-04T00:00:00Z"), KNumber(555), None),
201+
(Instant.parse("1970-01-05T00:00:00Z"), KNull, None)
202202
)
203203
)
204204
)

src/it/scala/integration/DeleteDataPointsByTagIntegrationSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ class DeleteDataPointsByTagIntegrationSpec extends IntegrationSpec {
6161
TagResult("snth", Seq("321"))
6262
), // not sure if the order is deteministic. Convert to Set?
6363
Seq(
64-
instant.plusMillis(2) -> KNumber(222),
65-
instant.plusMillis(4) -> KNumber(444),
66-
instant.plusMillis(5) -> KNumber(123)
64+
(instant.plusMillis(2), KNumber(222), None),
65+
(instant.plusMillis(4), KNumber(444), None),
66+
(instant.plusMillis(5), KNumber(123), None)
6767
)
6868
)
6969
)

src/it/scala/integration/StringDataPointsIntegrationSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class StringDataPointsIntegrationSpec extends IntegrationSpec {
4646
"my.new.metric",
4747
Seq(GroupBy.GroupByType("text")),
4848
Seq(TagResult("aoeu", Seq("snth"))),
49-
Seq((instant, KString("my test string")))
49+
Seq((instant, KString("my test string"), None))
5050
)
5151
)
5252
)
@@ -83,7 +83,7 @@ class StringDataPointsIntegrationSpec extends IntegrationSpec {
8383
"my.new.metric",
8484
Seq(GroupBy.GroupByType("text")),
8585
Seq(TagResult("aoeu", Seq("snth"))),
86-
Seq((instant, KString("12345")))
86+
Seq((instant, KString("12345"), None))
8787
)
8888
)
8989
)

src/main/scala/io/waylay/kairosdb/driver/models/Models.scala

Lines changed: 107 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,21 @@ import io.waylay.kairosdb.driver.models.TimeRange.KairosTimeUnit
88
import java.time.Instant
99

1010
/**
11-
* Metric names are case sensitive and can only contain the following characters: alphanumeric characters, period ”.”,
12-
* slash “/”, dash “-”, and underscore “_” (not enforced)
13-
*/
11+
* Metric names are case sensitive and can only contain the following characters: alphanumeric characters, period ”.”,
12+
* slash “/”, dash “-”, and underscore “_” (not enforced)
13+
*/
1414
case class MetricName(name: String) extends AnyVal
1515

1616
object TimeRange {
1717
sealed trait KairosTimeUnit
1818
case object MILLISECONDS extends KairosTimeUnit
19-
case object SECONDS extends KairosTimeUnit
20-
case object MINUTES extends KairosTimeUnit
21-
case object HOURS extends KairosTimeUnit
22-
case object DAYS extends KairosTimeUnit
23-
case object WEEKS extends KairosTimeUnit
24-
case object MONTHS extends KairosTimeUnit
25-
case object YEARS extends KairosTimeUnit
19+
case object SECONDS extends KairosTimeUnit
20+
case object MINUTES extends KairosTimeUnit
21+
case object HOURS extends KairosTimeUnit
22+
case object DAYS extends KairosTimeUnit
23+
case object WEEKS extends KairosTimeUnit
24+
case object MONTHS extends KairosTimeUnit
25+
case object YEARS extends KairosTimeUnit
2626
}
2727
case class TimeRange(amount: Long, unit: KairosTimeUnit)
2828

@@ -33,34 +33,46 @@ sealed trait DataPoint {
3333
}
3434

3535
object DataPoint {
36-
def apply(metricName: MetricName, value: KairosCompatibleType, timestamp: Instant = Instant.now, tags: Seq[Tag],
37-
ttl: Option[TimeRange] = None) = DataPointWithSingleValue(metricName, value, timestamp, tags, ttl)
36+
def apply(
37+
metricName: MetricName,
38+
value: KairosCompatibleType,
39+
timestamp: Instant = Instant.now,
40+
tags: Seq[Tag],
41+
ttl: Option[TimeRange] = None
42+
) = DataPointWithSingleValue(metricName, value, timestamp, tags, ttl)
3843
}
3944

4045
/**
41-
* A data point has with metric name, a single value, a timestamp, and a list of one or more tags
42-
*
43-
* @param ttl Sets the Cassandra ttl for the data points. None or Some(0.seconds) will not set a TTL
44-
*
45-
*/
46-
case class DataPointWithSingleValue(metricName: MetricName, value: KairosCompatibleType, timestamp: Instant,
47-
tags: Seq[Tag], ttl: Option[TimeRange] = None) extends DataPoint
46+
* A data point has with metric name, a single value, a timestamp, and a list of one or more tags
47+
*
48+
* @param ttl Sets the Cassandra ttl for the data points. None or Some(0.seconds) will not set a TTL
49+
*/
50+
case class DataPointWithSingleValue(
51+
metricName: MetricName,
52+
value: KairosCompatibleType,
53+
timestamp: Instant,
54+
tags: Seq[Tag],
55+
ttl: Option[TimeRange] = None
56+
) extends DataPoint
4857

4958
/**
50-
* A data point has with metric name, a single value, a timestamp, and a list of one or more tags
51-
*
52-
* @param ttl Sets the Cassandra ttl for the data points. None or Some(0.seconds) will not set a TTL
53-
*/
54-
case class DataPointWithMultipleValues(metricName: MetricName, values: Seq[(Instant, KairosCompatibleType)],
55-
tags: Seq[Tag] = Seq.empty, ttl: Option[TimeRange] = None) extends DataPoint
56-
59+
* A data point has with metric name, a single value, a timestamp, and a list of one or more tags
60+
*
61+
* @param ttl Sets the Cassandra ttl for the data points. None or Some(0.seconds) will not set a TTL
62+
*/
63+
case class DataPointWithMultipleValues(
64+
metricName: MetricName,
65+
values: Seq[(Instant, KairosCompatibleType)],
66+
tags: Seq[Tag] = Seq.empty,
67+
ttl: Option[TimeRange] = None
68+
) extends DataPoint
5769

5870
/**
59-
* Tags are named properties that identify the data, such as its type and where it comes from
60-
*
61-
* Tag names and values are case sensitive and can only contain the following characters: alphanumeric characters,
62-
* period ”.”, slash “/”, dash “-”, and underscore “_” (not enforced)
63-
*/
71+
* Tags are named properties that identify the data, such as its type and where it comes from
72+
*
73+
* Tag names and values are case sensitive and can only contain the following characters: alphanumeric characters,
74+
* period ”.”, slash “/”, dash “-”, and underscore “_” (not enforced)
75+
*/
6476
case class Tag(name: String, value: String)
6577

6678
/** Kairos base URL */
@@ -82,9 +94,7 @@ object KairosDBConfig {
8294
val uriWithCredentialsOpt = for {
8395
user <- username
8496
pass <- password
85-
} yield {
86-
baseUri.toAbsoluteUrl.withUser(user).withPassword(pass)
87-
}
97+
} yield baseUri.toAbsoluteUrl.withUser(user).withPassword(pass)
8898

8999
val serverUri = uriWithCredentialsOpt getOrElse baseUri
90100

@@ -95,22 +105,22 @@ object KairosDBConfig {
95105
}
96106

97107
/**
98-
* KairosDB provides REST APIs that show the health of the system.
99-
*
100-
* There are currently two health checks executed for each API.
101-
*
102-
* - The JVM thread deadlock check verifies that no deadlocks exist in the KairosDB JVM.
103-
* - The Datastore query check performs a query on the data store to ensure that the data store is responding.
104-
*
105-
* Example value: HealthStatusResults(Seq("JVM-Thread-Deadlock: OK", "Datastore-Query: OK"))
106-
*/
108+
* KairosDB provides REST APIs that show the health of the system.
109+
*
110+
* There are currently two health checks executed for each API.
111+
*
112+
* - The JVM thread deadlock check verifies that no deadlocks exist in the KairosDB JVM.
113+
* - The Datastore query check performs a query on the data store to ensure that the data store is responding.
114+
*
115+
* Example value: HealthStatusResults(Seq("JVM-Thread-Deadlock: OK", "Datastore-Query: OK"))
116+
*/
107117
case class HealthStatusResults(results: Seq[String])
108118

109119
sealed trait HealthCheckResult
110120

111121
object HealthCheckResult {
112122
case object AllHealthy extends HealthCheckResult
113-
case object Unhealthy extends HealthCheckResult
123+
case object Unhealthy extends HealthCheckResult
114124
}
115125

116126
/** KairosDB only supports numbers and strings. Custom types can be defined */
@@ -124,24 +134,28 @@ object KairosCompatibleType {
124134
}
125135

126136
case class KNumber(value: BigDecimal) extends KairosCompatibleType {
127-
def kairosType:String = {
128-
//TODO check if this is 100% correct
137+
def kairosType: String =
138+
// TODO check if this is 100% correct
129139
if (value.scale > 0) {
130140
"double"
131141
} else {
132142
"long"
133143
}
134-
}
135144
}
136145
case class KString(value: String) extends KairosCompatibleType {
137-
val kairosType:String = "string"
146+
val kairosType: String = "string"
138147
}
139148
}
140149

141150
object QueryResponse {
142151
case class Response(queries: Seq[ResponseQuery])
143152
case class ResponseQuery(sampleSize: Int, results: Seq[Result])
144-
case class Result(name: MetricName, groupBy: Seq[GroupBy], tags: Seq[TagResult], values: Seq[(Instant, KairosCompatibleType)])
153+
case class Result(
154+
name: MetricName,
155+
groupBy: Seq[GroupBy],
156+
tags: Seq[TagResult],
157+
values: Seq[(Instant, KairosCompatibleType, Option[Instant])]
158+
)
145159
case class TagResult(name: String, values: Seq[String]) // not sure if this is the correct meaning
146160
}
147161

@@ -152,40 +166,44 @@ object QueryMetricTagsResponse {
152166
}
153167

154168
object KairosQuery {
169+
155170
/**
156-
* Used to filter a query. With `QueryTag("aoeu", Seq("snth", "htns"))` you would only get results where the value of
157-
* tag `aoeu` is `snth` or `htns`
158-
*/
171+
* Used to filter a query. With `QueryTag("aoeu", Seq("snth", "htns"))` you would only get results where the value of
172+
* tag `aoeu` is `snth` or `htns`
173+
*/
159174
case class QueryTag(name: String, allowedValues: Seq[String])
160175

161176
object QueryTag {
162-
//def apply(name: String, allowedValue: String): QueryTag = QueryTag(name, Seq(allowedValue))
163-
def apply(tuple: (String, String)): QueryTag = QueryTag(tuple._1, Seq(tuple._2))
177+
// def apply(name: String, allowedValue: String): QueryTag = QueryTag(name, Seq(allowedValue))
178+
def apply(tuple: (String, String)): QueryTag = QueryTag(tuple._1, Seq(tuple._2))
164179
def apply(tuple: (String, String)*): Seq[QueryTag] = tuple.map(tup => QueryTag(tup)).to(Seq)
165180
}
166181

167182
sealed trait Order { val value: String }
168183

169184
object Order {
170-
case object Ascending extends Order { override val value = "asc" }
171-
case object Descending extends Order { override val value = "desc"}
185+
case object Ascending extends Order { override val value = "asc" }
186+
case object Descending extends Order { override val value = "desc" }
172187
val defaultOrder = Ascending
173188
}
174189
}
175190

176-
/** @param tags Tags narrow down the search. Only metrics that include the tag and matches one of the values are
177-
* returned. Tags are optional.
178-
* @param groupBys The resulting data points can be grouped by one or more tags, a time range, or by value, or by a
179-
* combination of the three.
180-
* @param aggregators An ordered array of aggregators. They are processed in the order specified. The output of an
181-
* aggregator is passed to the input of the next until all have been processed.
182-
* @param limit Limits the number of data points returned from the data store.
183-
* The limit is applied before any aggregator is executed.
184-
* @param order Orders the returned data points. This sorting is done before any aggregators are executed.
185-
* @param excludeTags By default, the result of the query includes tags and tag values associated with the data points.
186-
* If `excludeTags` is set to true, the tags will be excluded from the response.
187-
* @param plugins optional plugin references to customize the behavior of the query on this metric
188-
*/
191+
/**
192+
* @param tags Tags narrow down the search. Only metrics that include the tag and matches one of the values are
193+
* returned. Tags are optional.
194+
* @param groupBys The resulting data points can be grouped by one or more tags, a time range, or by value, or by a
195+
* combination of the three.
196+
* @param aggregators An ordered array of aggregators. They are processed in the order specified. The output of an
197+
* aggregator is passed to the input of the next until all have been processed.
198+
* @param limit Limits the number of data points returned from the data store.
199+
* The limit is applied before any aggregator is executed.
200+
* @param order Orders the returned data points. This sorting is done before any aggregators are executed.
201+
* @param excludeTags By default, the result of the query includes tags and tag values associated with the data points.
202+
* If `excludeTags` is set to true, the tags will be excluded from the response.
203+
* @param plugins optional plugin references to customize the behavior of the query on this metric
204+
* @param returnIngestionTimestamp If set to true, KairosDB will return ingestion timestamps for the data points.
205+
* This requires KairosDB server support for the feature.
206+
*/
189207
case class Query(
190208
metricName: MetricName,
191209
tags: Seq[QueryTag] = Seq.empty,
@@ -194,29 +212,30 @@ case class Query(
194212
limit: Option[Int] = None,
195213
order: Order = Order.defaultOrder,
196214
excludeTags: Boolean = false,
197-
plugins: Seq[QueryPlugin] = Seq.empty)
198-
199-
/** @param timeZone The time zone for the time range of the query. If not specified, UTC is used. tz format, e.g. "Europe/Brussels"
200-
* @param cacheTime The amount of time in seconds to re use the cache from a previous query. When a query is made,
201-
* Kairos looks for the cache file for the query. If a cache file is found and the timestamp of the
202-
* cache file is within cache_time seconds from the current query, the cache is used.
203-
* Sending a query with a cacheTime set to 0 will always refresh the cache with new data from Cassandra.
204-
* @param plugins optional plugin references to custom the behavior of this query
205-
*/
215+
plugins: Seq[QueryPlugin] = Seq.empty,
216+
returnIngestionTimestamp: Boolean = false
217+
)
218+
219+
/**
220+
* @param timeZone The time zone for the time range of the query. If not specified, UTC is used. tz format, e.g. "Europe/Brussels"
221+
* @param cacheTime The amount of time in seconds to re use the cache from a previous query. When a query is made,
222+
* Kairos looks for the cache file for the query. If a cache file is found and the timestamp of the
223+
* cache file is within cache_time seconds from the current query, the cache is used.
224+
* Sending a query with a cacheTime set to 0 will always refresh the cache with new data from Cassandra.
225+
* @param plugins optional plugin references to custom the behavior of this query
226+
*/
206227
case class QueryMetrics(
207228
metrics: Seq[Query],
208229
timeSpan: TimeSpan,
209230
timeZone: Option[String] = None,
210231
cacheTime: Option[Int] = None,
211-
plugins: Seq[QueryPlugin] = Seq.empty)
212-
232+
plugins: Seq[QueryPlugin] = Seq.empty
233+
)
213234

214235
/**
215-
* Reference to a plugin which can customize the behavior of a query.
216-
*
217-
* @param name published name of the plugin
218-
* @param properties properties for the plugin within the query invocation
219-
*/
220-
case class QueryPlugin(
221-
name: String,
222-
properties: Map[String,Any] = Map.empty)
236+
* Reference to a plugin which can customize the behavior of a query.
237+
*
238+
* @param name published name of the plugin
239+
* @param properties properties for the plugin within the query invocation
240+
*/
241+
case class QueryPlugin(name: String, properties: Map[String, Any] = Map.empty)

0 commit comments

Comments
 (0)