Skip to content

Commit

Permalink
add FirebaseSink
Browse files Browse the repository at this point in the history
  • Loading branch information
bennfocus committed Feb 28, 2019
1 parent c6a407a commit 17a7896
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 71 deletions.
112 changes: 112 additions & 0 deletions admin/frontend/src/lib/task-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,117 @@ const sinkSchemas: any = {
}
},

'firebase': {
"title": "Firebase Sink",
"type": "object",
"properties": {
"default-request": {
"type": "object",
"title": "Server",
"properties": {
"method": {
"type": "string",
"enum": [
"POST", "GET", "PUT", "OPTIONS", "DELETE", "HEAD", "PATCH"
]
},
"uri": {
"type": "string",
"description": "when event does not include url info, use this default one",
"required": true
},
"auth-key": {
"type": "string",
"title": "Authorization Key",
"required": true
},
"protocol": {
"type": "string",
"default": "HTTP/1.1"
},
"headers": {
"type": "object",
"title": "Headers",
"properties": {}
}
},
"required": true
},
"concurrent-requests": {
"type": "integer",
"default": 1
},
"request-buffer-size": {
"type": "integer",
"default": 100
},
"expected-response": {
"type": "string",
"default": ""
},
"allow-extra-signals": {
"type": "boolean",
"default": false
},
"use-retry-sender": {
"type": "boolean",
"default": true
},
"retry-sender": {
"type": "object",
"properties": {
"min-backoff": {
"type": "string",
"default": "1 s"
},
"max-backoff": {
"type": "string",
"default": "30 s"
},
"random-factor": {
"type": "number",
"default": 0.2
},
"retry-duration": {
"type": "string",
"default": "12 h"
}
}
},
"pool": {
"type": "object",
"title": "Http Connection Pool",
"properties": {
"max-connections": {
"type": "integer",
"default": 32
},
"min-connections": {
"type": "integer",
"default": 0
},
"max-retries": {
"type": "integer",
"default": 1
},
"max-open-requests": {
"type": "integer",
"default": 256
},
"pipelining-limit": {
"type": "integer",
"default": 1
},
"idle-timeout": {
"type": "string",
"default": "30 s",
"description": "possible units: (s)econd, (m)inute, (h)ours"
}
}
}
}
},

'kafka': {
"title": "Kafka Sink",
"type": "object",
Expand Down Expand Up @@ -461,6 +572,7 @@ const sinkSchemas: any = {
}
}


export default {
'source': sourceSchemas,
'operator': operatorSchemas,
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ app {
],
sink = [
"com.thenetcircle.event_bus.story.tasks.http.HttpSinkBuilder",
"com.thenetcircle.event_bus.story.tasks.http.FirebaseSinkBuilder",
"com.thenetcircle.event_bus.story.tasks.kafka.KafkaSinkBuilder"
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ case class Activity(
published: Option[String],
verb: Option[String],
actor: Option[GeneralObject],
`object`: Option[GeneralObject],
target: Option[GeneralObject],
provider: Option[GeneralObject],
// content: Option[Any],
generator: Option[GeneratorObject]
`object`: Option[RichObject],
generator: Option[RichObject]
)

sealed trait ActivityObject {
Expand All @@ -61,16 +61,16 @@ case class GeneralObject(
// author: Option[ActivityObject]
) extends ActivityObject

case class GeneratorObject(
case class RichObject(
id: Option[String],
objectType: Option[String],
content: Option[String]
) extends ActivityObject

trait ActivityStreamsProtocol extends DefaultJsonProtocol {
implicit val generalObjectFormat = jsonFormat2(GeneralObject)
implicit val generatorObjectFormat = jsonFormat3(GeneratorObject)
implicit val activityFormat = jsonFormat9(Activity)
implicit val generalObjectFormat = jsonFormat2(GeneralObject)
implicit val richObjectFormat = jsonFormat3(RichObject)
implicit val activityFormat = jsonFormat9(Activity)
}

class ActivityStreamsEventExtractor extends EventExtractor with ActivityStreamsProtocol {
Expand All @@ -97,10 +97,14 @@ class ActivityStreamsEventExtractor extends EventExtractor with ActivityStreamsP
objOption.foreach(o => {
o.id.foreach(s => extra = extra + (s"${prefix}Id" -> s))
o.objectType.foreach(s => extra = extra + (s"${prefix}Type" -> s))

if (o.isInstanceOf[RichObject]) {
o.asInstanceOf[RichObject].content.foreach(s => extra = extra + (s"${prefix}Content" -> s))
}
})
}

// TODO performance test for parse how many fields
// TODO performance for json parsing
activity.verb.foreach(s => extra = extra + ("verb" -> s))
setExtraFromActivityObject(activity.provider, "provider")
setExtraFromActivityObject(activity.actor, "actor")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Beineng Ma <[email protected]>
*/

package com.thenetcircle.event_bus.story.tasks.http

import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpRequest}
import com.thenetcircle.event_bus.event.Event
import com.thenetcircle.event_bus.story.interfaces.ITaskBuilder
import com.thenetcircle.event_bus.{AppContext, BuildInfo}
import com.typesafe.config.{Config, ConfigFactory}

case class FirebaseSinkSettings(
httpSinkSettings: HttpSinkSettings
)

class FirebaseSink(val fbSinkSettings: FirebaseSinkSettings) extends HttpSink(fbSinkSettings.httpSinkSettings) {

override def createHttpRequest(event: Event): HttpRequest = {
if (!event.hasExtra("objectContent")) {
throw new IllegalStateException(s"""A event sending to Firebase has not "objectContent", ${event.summary}""")
}

val requestContentType = ContentTypes.`application/json`
val requestData = event.getExtra("objectContent").get

taskLogger.debug(s"created request data: $requestData")

settings.defaultRequest.withEntity(HttpEntity(requestContentType, requestData))
}

}

class FirebaseSinkBuilder() extends HttpSinkBuilder with ITaskBuilder[FirebaseSink] {

override val taskType: String = "firebase"

override def defaultConfig: Config =
ConfigFactory
.parseString(
s"""{
| # this could be overwritten by event info later
| default-request {
| # uri = "https://fcm.googleapis.com/fcm/send"
| # auth-key = ""
| method = POST
| protocol = "HTTP/1.1"
| headers {
| "user-agent": "event-bus/${BuildInfo.version}"
| }
| }
|
| # set this to be empty, so only check response code
| expected-response = ""
| allow-extra-signals = false
|
| pool {
| max-connections = 4
| min-connections = 0
| max-open-requests = 32
| pipelining-limit = 1
| idle-timeout = 30 s
| }
|}""".stripMargin
)
.withFallback(super[HttpSinkBuilder].defaultConfig)

override protected def buildHttpRequestFromConfig(config: Config)(implicit appContext: AppContext): HttpRequest = {
val request = super.buildHttpRequestFromConfig(config)
val authHeader = HttpHeader.parse("Authorization", s"key=${config.getString("auth-key")}")
authHeader match {
case Ok(header, errors) => request.addHeader(header)
case _ => throw new IllegalArgumentException("Parsing Authorization header failed.")
}
}

override def buildTask(
config: Config
)(implicit appContext: AppContext): FirebaseSink =
try {
require(config.hasPath("default-request.auth-key"), "Authorization Key is required.")

val fbSinkSettings = FirebaseSinkSettings(httpSinkSettings = createHttpSinkSettings(config))
new FirebaseSink(fbSinkSettings)
} catch {
case ex: Throwable =>
logger.error(s"Build FirebaseSink failed with error: ${ex.getMessage}")
throw ex
}
}
Loading

0 comments on commit 17a7896

Please sign in to comment.