Skip to content

Commit

Permalink
Merge pull request #38 from keenlabs/MID-408-add-in-memory-queue
Browse files Browse the repository at this point in the history
Add in-memory queueing
  • Loading branch information
terrhorn committed Feb 27, 2016
2 parents 88fd338 + 0479ce5 commit a114868
Show file tree
Hide file tree
Showing 19 changed files with 1,070 additions and 47 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
language: scala

scala:
- 2.11.2
- 2.10.4

jdk:
- oraclejdk7
- openjdk7

addons:
# Fix OpenJDK builds
# https://github.com/travis-ci/travis-ci/issues/5227
hostname: short-hostname
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## v0.6.0

* Added in-memory queueing - #38 (terryhorner)

## v0.5.0

* Configuration with Typesafe Config - #25 (ches)
Expand Down
3 changes: 3 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
* Created the `AccessLevel` traits abstraction
* Added Typesafe Config support for settings
* Added TravisCI & integration tests in SBT
* [Terry Horner](https://github.com/terryhorner)
* Maintainer
* Added in-memory queueing


[Dispatch]: http://dispatch.databinder.net/
Expand Down
129 changes: 98 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

[![Build Status]](https://travis-ci.org/keenlabs/KeenClient-Scala)

---

The official asynchronous Scala client for the [Keen IO] API.

**Note**: This library is in early development and does not implement all of the
Expand All @@ -12,52 +14,33 @@ new release version.

Additional API features will be added over time. Contributions are welcome!

## Use It - A Quick Taste

```scala
import io.keen.client.scala.{ Client, Writer }

// Assumes you've configured a write key as explained in Configuration below
val keen = new Client with Writer

// Publish an event!
keen.addEvent(
collection = "collectionNameHere",
event = """{"foo": "bar"}"""
)
---

// Publish lots of events!
keen.addEvents(someEvents)
* [Get It](#get-it)
* [Configuration](#configuration)
* [Settings](#settings)
* [Use It](#use-it---a-quick-taste)
* [Queueing](#queueing)
* [Dependencies](#dependencies)
* [Using the Dispatch adapter](#using-the-dispatch-adapter)
* [JSON](#json)
* [Hack On It](#hack-on-it)

// Responses are Futures - handle errors!
val resp = keen.addEvent(
collection = "collectionNameHere",
event = """{"foo": "bar"}"""
)

resp onComplete {
case Success(r) => println(resp.statusCode)
case Failure(t) => println(t.getMessage) // A Throwable
}

// Or using map
resp map { println("I succeeded!") } getOrElse { println("I failed :(") }
```

## Get It

Artifacts for keen-client-scala are [hosted on Maven Central](http://search.maven.org/#search%7Cga%7C1%7Ckeenclient-scala).
You can use them in your project with SBT thusly:

```scala
libraryDependencies += "io.keen" %% "keenclient-scala" % "0.5.0"
libraryDependencies += "io.keen" %% "keenclient-scala" % "0.6.0"
```

Note that we publish artifacts for Scala 2.10 and 2.11, so you can either use `%%` to automatically pick the correct
version or specify them explicitly with something like:

```scala
libraryDependencies += "io.keen" % "keenclient-scala_2.10" % "0.5.0"
libraryDependencies += "io.keen" % "keenclient-scala_2.10" % "0.6.0"
```

## Configuration
Expand Down Expand Up @@ -100,6 +83,90 @@ development setup (install it as a [global plugin], and `chmod 600` your `.env`
files that contain credentials!). In production, a [good service manager][runit]
can set env vars for app processes with ease. On Heroku you'll be right at home.

### Settings

* `keen.project-id`: Your project ID.
* `keen.optional.read-key`: Your project read key.
* `keen.optional.write-key`: Your project write key.
* `keen.optional.master-key`: Your project master key.
* `keen.optional.queue.batch.size`: Number of events to include in each batch sent by `sendQueuedEvents()`. Default is `500`.
* `keen.optional.queue.batch.timeout`: Seconds each batch sent by `sendQueuedEvents()` should wait before the request times out. Default is `5`.
* `keen.optional.queue.max-events-per-collection`: Maximum number of events to store for each collection. Old events are purged from the queue to make room for new events when the size of the queue exceeds this number. Default is `10000`.
* `keen.optional.queue.send-interval.events`: Automatically send all queued events every time the queue reaches this number. Minimum is `100`, maximum is `10000`, and default is `0`.
* `keen.optional.queue.send-interval.seconds`: Automatically send all queued events at a specified interval. Minimum is `60`, maximum is `3600`, and default is `0`.
* `keen.optional.queue.shutdown-delay`: Seconds to wait before client stops attempting to send events scheduled to be sent at a specific interval. Default is `30`.

## Use It - A Quick Taste

```scala
import io.keen.client.scala.{ Client, Writer }

// Assumes you've configured a write key as explained in Configuration below
val keen = new Client with Writer

// Publish an event!
keen.addEvent(
collection = "collectionNameHere",
event = """{"foo": "bar"}"""
)

// Publish lots of events!
keen.addEvents(someEvents)

// Responses are Futures - handle errors!
val resp = keen.addEvent(
collection = "collectionNameHere",
event = """{"foo": "bar"}"""
)

resp onComplete {
case Success(r) => println(resp.statusCode)
case Failure(t) => println(t.getMessage) // A Throwable
}

// Or using map
resp map { println("I succeeded!") } getOrElse { println("I failed :(") }
```

### Queueing

Though you can certainly implement your own queueing and batching via `addEvents`, the client also includes automated queueing and batching for your more simplified implementation pleasures.

```scala
// Queue an event
keen.queueEvent("collectionNameHere", """{"foo": "bar"}""")
```

Queuing is handled via an in-memory queue that lives as long as the `Client` does. The behavior of the queue and automated sending of events is configurable in `conf/application.conf` as outlined below.

**Sending queued events manually**

```scala
keen.sendQueuedEvents()
```

**Sending queued events every time the queue reaches 100 events**

Set `keen.optional.queue.send-interval.events` equal to `100` in `conf/application.conf`.

**Sending queued events every 5 minutes**

Set `keen.optional.queue.send-interval.seconds` equal to `300` in `conf/application.conf`.

Note that `send-interval.events` takes precedence when both `send-interval.events` and `send-interval.seconds` contain values greater than zero.

**Using batch sizes**

Setting a specific batch size will help optimize your experience when sending events. It's recommended that you set `keen.optional.queue.batch.size` to something that makes sense for your implementation (default is `500`). Note that a batch size of `5000` is the upper bound of what you should shoot for. Anything higher and your request has a good chance of being rejected due to payload size limitations.

**Failed events**

Events that fail to be sent for whatever reason (payload rejection, network issue, etc.) are **not** removed from the queue. They will remain in the queue until they are either manually removed or the client shuts down.

**Shutdown**

`Client` will attempt one last time to send all queued events when `shutdown()` is called just in case you forget to send the events yourself. Make sure you call `shutdown()` before you exit otherwise all events that remain in the queue upon termination will be lost.

## Dependencies

The client's default HTTP adapter is built on the [spray HTTP toolkit][spray],
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description := "Keen IO SDK/client library for Scala"

homepage := Some(url("https://github.com/keenlabs/KeenClient-Scala"))

version := "0.5.0"
version := "0.6.0"

scalaVersion := "2.11.2"

Expand Down
24 changes: 16 additions & 8 deletions src/it/scala/ClientIntegrationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ClientIntegrationSpec extends Specification with NoTimeConversions {
// feature scalable time dilation for testing on CI servers that might be
// slow--see IntegrationPatience in ScalaTest, not sure if specs2 has similar...
val timeout = 4.seconds
val timeframe = Some("this_week") // all queries require timeframes

sequential

Expand Down Expand Up @@ -89,7 +90,8 @@ class ClientIntegrationSpec extends Specification with NoTimeConversions {

"count" in {
val res = Await.result(client.count(
collection = "foo"
collection = "foo",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}
Expand All @@ -98,55 +100,61 @@ class ClientIntegrationSpec extends Specification with NoTimeConversions {
val res = Await.result(client.count(
collection = "foo",
filters = Some("""[{"property_name": "baz","operator":"eq","property_value":"gorch"}]"""),
timeframe = Some("this_week")
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}

"count unique" in {
val res = Await.result(client.countUnique(
collection = "foo",
targetProperty = "gorch"
targetProperty = "gorch",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}

"minimum" in {
val res = Await.result(client.minimum(
collection = "foo",
targetProperty = "gorch"
targetProperty = "gorch",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}

"maximum" in {
val res = Await.result(client.maximum(
collection = "foo",
targetProperty = "gorch"
targetProperty = "gorch",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}

"average" in {
val res = Await.result(client.average(
collection = "foo",
targetProperty = "gorch"
targetProperty = "gorch",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}

"sum" in {
val res = Await.result(client.sum(
collection = "foo",
targetProperty = "gorch"
targetProperty = "gorch",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}

"select unique" in {
val res = Await.result(client.selectUnique(
collection = "foo",
targetProperty = "gorch"
targetProperty = "gorch",
timeframe = timeframe
), timeout)
res.statusCode must beEqualTo(200)
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ keen {
master-key = ${?KEEN_MASTER_KEY}
read-key = ${?KEEN_READ_KEY}
write-key = ${?KEEN_WRITE_KEY}

# configuration for local event store queue
queue {
batch {
size = ${?KEEN_EVENT_QUEUE_BATCH_SIZE}
timeout = ${?KEEN_EVENT_QUEUE_BATCH_TIMEOUT}
}
max-events-per-collection = ${?KEEN_EVENT_QUEUE_MAX_EVENTS_PER_COLLECTION}
send-interval {
events = ${?KEEN_EVENT_QUEUE_SEND_INTERVAL_EVENTS}
seconds = ${?KEEN_EVENT_QUEUE_SEND_INTERVAL_SECONDS}
}
shutdown-delay = ${?KEEN_EVENT_QUEUE_SHUTDOWN_DELAY}
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.keen.client.scala

trait AttemptCountingEventStore extends EventStore {

def getAttempts(projectId: String, eventCollection: String): String

def setAttempts(projectId: String, eventCollection: String, attemptsString: String): Unit

}
Loading

0 comments on commit a114868

Please sign in to comment.