diff --git a/admin/frontend/src/lib/task-schema.ts b/admin/frontend/src/lib/task-schema.ts index bf8ee4c..d2ccc6f 100644 --- a/admin/frontend/src/lib/task-schema.ts +++ b/admin/frontend/src/lib/task-schema.ts @@ -403,6 +403,117 @@ const sinkSchemas: any = { } }, + 'firebase': { + "title": "Firebase Sink", + "type": "object", + "properties": { + "default-request": { + "type": "object", + "title": "Server", + "properties": { + "method": { + "type": "string", + "enum": [ + "POST", "GET", "PUT", "OPTIONS", "DELETE", "HEAD", "PATCH" + ] + }, + "uri": { + "type": "string", + "description": "when event does not include url info, use this default one", + "required": true + }, + "auth-key": { + "type": "string", + "title": "Authorization Key", + "required": true + }, + "protocol": { + "type": "string", + "default": "HTTP/1.1" + }, + "headers": { + "type": "object", + "title": "Headers", + "properties": {} + } + }, + "required": true + }, + "concurrent-requests": { + "type": "integer", + "default": 1 + }, + "request-buffer-size": { + "type": "integer", + "default": 100 + }, + "expected-response": { + "type": "string", + "default": "" + }, + "allow-extra-signals": { + "type": "boolean", + "default": false + }, + "use-retry-sender": { + "type": "boolean", + "default": true + }, + "retry-sender": { + "type": "object", + "properties": { + "min-backoff": { + "type": "string", + "default": "1 s" + }, + "max-backoff": { + "type": "string", + "default": "30 s" + }, + "random-factor": { + "type": "number", + "default": 0.2 + }, + "retry-duration": { + "type": "string", + "default": "12 h" + } + } + }, + "pool": { + "type": "object", + "title": "Http Connection Pool", + "properties": { + "max-connections": { + "type": "integer", + "default": 32 + }, + "min-connections": { + "type": "integer", + "default": 0 + }, + "max-retries": { + "type": "integer", + "default": 1 + }, + "max-open-requests": { + "type": "integer", + "default": 256 + }, + "pipelining-limit": { + "type": "integer", + "default": 1 + }, + "idle-timeout": { + "type": "string", + "default": "30 s", + "description": "possible units: (s)econd, (m)inute, (h)ours" + } + } + } + } + }, + 'kafka': { "title": "Kafka Sink", "type": "object", @@ -461,6 +572,7 @@ const sinkSchemas: any = { } } + export default { 'source': sourceSchemas, 'operator': operatorSchemas, diff --git a/core/src/main/resources/application.conf b/core/src/main/resources/application.conf index 944add4..4757568 100644 --- a/core/src/main/resources/application.conf +++ b/core/src/main/resources/application.conf @@ -38,6 +38,7 @@ app { ], sink = [ "com.thenetcircle.event_bus.story.tasks.http.HttpSinkBuilder", + "com.thenetcircle.event_bus.story.tasks.http.FirebaseSinkBuilder", "com.thenetcircle.event_bus.story.tasks.kafka.KafkaSinkBuilder" ] } diff --git a/core/src/main/scala/com/thenetcircle/event_bus/event/extractor/ActivityStreamsEventExtractor.scala b/core/src/main/scala/com/thenetcircle/event_bus/event/extractor/ActivityStreamsEventExtractor.scala index 570f85c..fc169c8 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/event/extractor/ActivityStreamsEventExtractor.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/event/extractor/ActivityStreamsEventExtractor.scala @@ -37,11 +37,11 @@ case class Activity( published: Option[String], verb: Option[String], actor: Option[GeneralObject], - `object`: Option[GeneralObject], target: Option[GeneralObject], provider: Option[GeneralObject], // content: Option[Any], - generator: Option[GeneratorObject] + `object`: Option[RichObject], + generator: Option[RichObject] ) sealed trait ActivityObject { @@ -61,16 +61,16 @@ case class GeneralObject( // author: Option[ActivityObject] ) extends ActivityObject -case class GeneratorObject( +case class RichObject( id: Option[String], objectType: Option[String], content: Option[String] ) extends ActivityObject trait ActivityStreamsProtocol extends DefaultJsonProtocol { - implicit val generalObjectFormat = jsonFormat2(GeneralObject) - implicit val generatorObjectFormat = jsonFormat3(GeneratorObject) - implicit val activityFormat = jsonFormat9(Activity) + implicit val generalObjectFormat = jsonFormat2(GeneralObject) + implicit val richObjectFormat = jsonFormat3(RichObject) + implicit val activityFormat = jsonFormat9(Activity) } class ActivityStreamsEventExtractor extends EventExtractor with ActivityStreamsProtocol { @@ -97,10 +97,14 @@ class ActivityStreamsEventExtractor extends EventExtractor with ActivityStreamsP objOption.foreach(o => { o.id.foreach(s => extra = extra + (s"${prefix}Id" -> s)) o.objectType.foreach(s => extra = extra + (s"${prefix}Type" -> s)) + + if (o.isInstanceOf[RichObject]) { + o.asInstanceOf[RichObject].content.foreach(s => extra = extra + (s"${prefix}Content" -> s)) + } }) } - // TODO performance test for parse how many fields + // TODO performance for json parsing activity.verb.foreach(s => extra = extra + ("verb" -> s)) setExtraFromActivityObject(activity.provider, "provider") setExtraFromActivityObject(activity.actor, "actor") diff --git a/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/FirebaseSink.scala b/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/FirebaseSink.scala new file mode 100644 index 0000000..6b02d24 --- /dev/null +++ b/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/FirebaseSink.scala @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * Beineng Ma + */ + +package com.thenetcircle.event_bus.story.tasks.http + +import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok +import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader, HttpRequest} +import com.thenetcircle.event_bus.event.Event +import com.thenetcircle.event_bus.story.interfaces.ITaskBuilder +import com.thenetcircle.event_bus.{AppContext, BuildInfo} +import com.typesafe.config.{Config, ConfigFactory} + +case class FirebaseSinkSettings( + httpSinkSettings: HttpSinkSettings +) + +class FirebaseSink(val fbSinkSettings: FirebaseSinkSettings) extends HttpSink(fbSinkSettings.httpSinkSettings) { + + override def createHttpRequest(event: Event): HttpRequest = { + if (!event.hasExtra("objectContent")) { + throw new IllegalStateException(s"""A event sending to Firebase has not "objectContent", ${event.summary}""") + } + + val requestContentType = ContentTypes.`application/json` + val requestData = event.getExtra("objectContent").get + + taskLogger.debug(s"created request data: $requestData") + + settings.defaultRequest.withEntity(HttpEntity(requestContentType, requestData)) + } + +} + +class FirebaseSinkBuilder() extends HttpSinkBuilder with ITaskBuilder[FirebaseSink] { + + override val taskType: String = "firebase" + + override def defaultConfig: Config = + ConfigFactory + .parseString( + s"""{ + | # this could be overwritten by event info later + | default-request { + | # uri = "https://fcm.googleapis.com/fcm/send" + | # auth-key = "" + | method = POST + | protocol = "HTTP/1.1" + | headers { + | "user-agent": "event-bus/${BuildInfo.version}" + | } + | } + | + | # set this to be empty, so only check response code + | expected-response = "" + | allow-extra-signals = false + | + | pool { + | max-connections = 4 + | min-connections = 0 + | max-open-requests = 32 + | pipelining-limit = 1 + | idle-timeout = 30 s + | } + |}""".stripMargin + ) + .withFallback(super[HttpSinkBuilder].defaultConfig) + + override protected def buildHttpRequestFromConfig(config: Config)(implicit appContext: AppContext): HttpRequest = { + val request = super.buildHttpRequestFromConfig(config) + val authHeader = HttpHeader.parse("Authorization", s"key=${config.getString("auth-key")}") + authHeader match { + case Ok(header, errors) => request.addHeader(header) + case _ => throw new IllegalArgumentException("Parsing Authorization header failed.") + } + } + + override def buildTask( + config: Config + )(implicit appContext: AppContext): FirebaseSink = + try { + require(config.hasPath("default-request.auth-key"), "Authorization Key is required.") + + val fbSinkSettings = FirebaseSinkSettings(httpSinkSettings = createHttpSinkSettings(config)) + new FirebaseSink(fbSinkSettings) + } catch { + case ex: Throwable => + logger.error(s"Build FirebaseSink failed with error: ${ex.getMessage}") + throw ex + } +} diff --git a/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/HttpSink.scala b/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/HttpSink.scala index e569875..bc85479 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/HttpSink.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/story/tasks/http/HttpSink.scala @@ -29,7 +29,7 @@ import akka.pattern.{ask, AskTimeoutException} import akka.stream._ import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete} import akka.util.Timeout -import com.thenetcircle.event_bus.event.EventStatus.{NORMAL, STAGING} +import com.thenetcircle.event_bus.event.EventStatus.{FAILED, NORMAL, STAGING} import com.thenetcircle.event_bus.event.{Event, EventStatus} import com.thenetcircle.event_bus.misc.Logging import com.thenetcircle.event_bus.story.interfaces.{ISink, ITaskBuilder, ITaskLogging} @@ -83,20 +83,29 @@ class HttpSink(val settings: HttpSinkSettings) extends ISink with ITaskLogging { def doNormalSend(payload: Payload)(implicit runningContext: TaskRunningContext): Future[Payload] = { implicit val executionContext: ExecutionContext = runningContext.getExecutionContext() - val event = payload._2 - send(createHttpRequest(event)) - .flatMap(checkResponse) - .map { - case CheckResponseResult.Passed => (NORMAL, event) - case result => - ( - STAGING( - Some(new UnexpectedResponseException(s"Check response failed with result $result")), - getTaskName() - ), - event - ) - } + + val event = payload._2 + try { + send(createHttpRequest(event)) + .flatMap(checkResponse) + .map { + case CheckResponseResult.Passed => (NORMAL, event) + case result => + ( + STAGING( + Some(new UnexpectedResponseException(s"Check response failed with result $result")), + getTaskName() + ), + event + ) + } + } catch { + case NonFatal(ex) => + taskLogger.warn( + s"A event was sent to HTTP endpoint failed by by doNormalSend, ${event.summary}, With error $ex" + ) + Future.successful((FAILED(ex, getTaskName()), event)) + } } def retrySendingFlow()( @@ -119,29 +128,35 @@ class HttpSink(val settings: HttpSinkSettings) extends ISink with ITaskLogging { implicit val askTimeout: Timeout = Timeout(retryDuration) implicit val executionContext: ExecutionContext = runningContext.getExecutionContext() - val event = payload._2 - val request = createHttpRequest(event) - val endPoint = request.getUri().toString - - (retrySender.get ? RetrySender.Commands.Req(request, retryDuration.fromNow)) - .mapTo[Try[HttpResponse]] - .map[(EventStatus, Event)] { - case Success(resp) => - taskLogger.info(s"A event was successfully sent to HTTP endpoint [$endPoint], ${event.summary}") - (NORMAL, event) - case Failure(ex) => - taskLogger.warn( - s"A event was sent to HTTP endpoint [$endPoint] failed, ${event.summary}, With error $ex" - ) - (STAGING(Some(ex), getTaskName()), event) - } - .recover { - case ex: AskTimeoutException => - taskLogger.warn( - s"A event was sent to HTTP endpoint [$endPoint] timeout, exceed [$retryDuration], ${event.summary}" - ) - (STAGING(Some(ex), getTaskName()), event) - } + val event = payload._2 + try { + val request = createHttpRequest(event) + val endPoint = request.getUri().toString + + (retrySender.get ? RetrySender.Commands.Req(request, retryDuration.fromNow)) + .mapTo[Try[HttpResponse]] + .map[(EventStatus, Event)] { + case Success(resp) => + taskLogger.info(s"A event was successfully sent to HTTP endpoint [$endPoint], ${event.summary}") + (NORMAL, event) + case Failure(ex) => + taskLogger.warn( + s"A event was sent to HTTP endpoint [$endPoint] failed, ${event.summary}, With error $ex" + ) + (STAGING(Some(ex), getTaskName()), event) + } + .recover { + case ex: AskTimeoutException => + taskLogger.warn( + s"A event was sent to HTTP endpoint [$endPoint] timeout, exceed [$retryDuration], ${event.summary}" + ) + (STAGING(Some(ex), getTaskName()), event) + } + } catch { + case NonFatal(ex) => + taskLogger.warn(s"A event was sent to HTTP endpoint failed by doRetrySend, ${event.summary}, With error $ex") + Future.successful((FAILED(ex, getTaskName()), event)) + } } override def sinkFlow()( @@ -406,10 +421,10 @@ class HttpSinkBuilder() extends ITaskBuilder[HttpSink] with Logging { override val taskType: String = "http" - override val defaultConfig: Config = + override def defaultConfig: Config = ConfigFactory.parseString( s"""{ - | # the default request could be overrided by info of the event + | # this could be overwritten by event info later | default-request { | # uri = "http://www.google.com" | method = POST @@ -476,34 +491,36 @@ class HttpSinkBuilder() extends ITaskBuilder[HttpSink] with Logging { |}""".stripMargin) } + protected def createHttpSinkSettings(config: Config)(implicit appContext: AppContext): HttpSinkSettings = { + val defaultRequest: HttpRequest = buildHttpRequestFromConfig(config.getConfig("default-request")) + val connectionPoolSettings = + config.as[Option[Map[String, String]]]("pool").filter(_.nonEmpty).map(buildConnectionPoolSettings) + + val retrySenderConfig = config.getConfig("retry-sender") + val retrySenderSettings = RetrySenderSettings( + retrySenderConfig.as[FiniteDuration]("min-backoff"), + retrySenderConfig.as[FiniteDuration]("max-backoff"), + retrySenderConfig.as[Double]("random-factor"), + retrySenderConfig.as[FiniteDuration]("retry-duration") + ) + + HttpSinkSettings( + defaultRequest, + config.as[Boolean]("use-retry-sender"), + retrySenderSettings, + connectionPoolSettings, + config.as[Int]("concurrent-requests"), + config.as[Int]("request-buffer-size"), + config.as[Option[String]]("expected-response").filter(_.trim != ""), + config.as[Boolean]("allow-extra-signals") + ) + } + override def buildTask( config: Config )(implicit appContext: AppContext): HttpSink = try { - val defaultRequest: HttpRequest = buildHttpRequestFromConfig(config.getConfig("default-request")) - val connectionPoolSettings = - config.as[Option[Map[String, String]]]("pool").filter(_.nonEmpty).map(buildConnectionPoolSettings) - - val retrySenderConfig = config.getConfig("retry-sender") - val retrySenderSettings = RetrySenderSettings( - retrySenderConfig.as[FiniteDuration]("min-backoff"), - retrySenderConfig.as[FiniteDuration]("max-backoff"), - retrySenderConfig.as[Double]("random-factor"), - retrySenderConfig.as[FiniteDuration]("retry-duration") - ) - - new HttpSink( - HttpSinkSettings( - defaultRequest, - config.as[Boolean]("use-retry-sender"), - retrySenderSettings, - connectionPoolSettings, - config.as[Int]("concurrent-requests"), - config.as[Int]("request-buffer-size"), - config.as[Option[String]]("expected-response").filter(_.trim != ""), - config.as[Boolean]("allow-extra-signals") - ) - ) + new HttpSink(createHttpSinkSettings(config)) } catch { case ex: Throwable => logger.error(s"Build HttpSink failed with error: ${ex.getMessage}") diff --git a/core/src/test/scala/com/thenetcircle/event_bus/story/builder/FirebaseSinkBuilderTest.scala b/core/src/test/scala/com/thenetcircle/event_bus/story/builder/FirebaseSinkBuilderTest.scala new file mode 100644 index 0000000..9b3d1c7 --- /dev/null +++ b/core/src/test/scala/com/thenetcircle/event_bus/story/builder/FirebaseSinkBuilderTest.scala @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * Beineng Ma + */ + +package com.thenetcircle.event_bus.story.builder + +import akka.http.scaladsl.model._ +import com.thenetcircle.event_bus.TestBase +import com.thenetcircle.event_bus.story.tasks.http.FirebaseSinkBuilder + +import scala.concurrent.duration._ + +class FirebaseSinkBuilderTest extends TestBase { + + behavior of "FirebaseSinkBuilder" + + val builder = new FirebaseSinkBuilder + + it should "build correct FirebaseSink with minimum config" in { + + val sink = storyBuilder.buildTaskWithBuilder("""{ + | "default-request" : { + | "uri": "https://fcm.googleapis.com/fcm/send", + | "auth-key": "AAABBB" + | } + |}""".stripMargin)(builder) + + val settings = sink.settings + + settings.defaultRequest.method shouldEqual HttpMethods.POST + settings.defaultRequest.uri shouldEqual Uri("https://fcm.googleapis.com/fcm/send") + settings.defaultRequest.protocol shouldEqual HttpProtocols.`HTTP/1.1` + + print(settings.defaultRequest.headers) + + settings.concurrentRequests shouldEqual 1 + settings.requestBufferSize shouldEqual 100 + settings.expectedResponse shouldEqual None + settings.allowExtraSignals shouldEqual false + + settings.useRetrySender shouldEqual true + settings.retrySenderSettings.minBackoff shouldEqual 1.second + settings.retrySenderSettings.maxBackoff shouldEqual 30.seconds + settings.retrySenderSettings.randomFactor shouldEqual 0.2 + settings.retrySenderSettings.retryDuration shouldEqual 12.hours + + settings.connectionPoolSettings.get.maxConnections shouldEqual 4 + settings.connectionPoolSettings.get.minConnections shouldEqual 0 + settings.connectionPoolSettings.get.maxOpenRequests shouldEqual 32 + settings.connectionPoolSettings.get.maxRetries shouldEqual 5 + settings.connectionPoolSettings.get.idleTimeout shouldEqual 30.seconds + settings.connectionPoolSettings.get.pipeliningLimit shouldEqual 1 + } + + it should "build correct sink with the full config" in { + + val sink = storyBuilder.buildTaskWithBuilder( + """{ + | "default-request" : { + | "method": "PUT", + | "protocol": "http/1.0", + | "uri": "https://fcm.googleapis.com/fcm/send", + | "auth-key": "AAABBB", + | "headers": { + | "X-Forwarded-For": "127.0.0.1", + | "user-agent": "custom-agent", + | "referer": "http://www.google.com" + | } + | }, + | "concurrent-requests": 100, + | "request-buffer-size": 1000, + | "expected-response": "ok", + | "allow-extra-signals": true, + | "use-retry-sender": false, + | "retry-sender": { + | "min-backoff": "3 s", + | "max-backoff": "1 m", + | "random-factor": 0.8, + | "retry-duration": "10 m" + | }, + | "pool": { + | "max-retries": 11, + | "max-open-requests": 64, + | "idle-timeout": "1 min" + | } + |}""".stripMargin + )(builder) + + val settings = sink.settings + + settings.defaultRequest.method shouldEqual HttpMethods.PUT + settings.defaultRequest.uri shouldEqual Uri("https://fcm.googleapis.com/fcm/send") + settings.defaultRequest.protocol shouldEqual HttpProtocols.`HTTP/1.0` + settings.defaultRequest.headers.size shouldEqual 4 + + settings.concurrentRequests shouldEqual 100 + settings.requestBufferSize shouldEqual 1000 + settings.allowExtraSignals shouldEqual true + settings.expectedResponse shouldEqual Some("ok") + + settings.useRetrySender shouldEqual false + settings.retrySenderSettings.minBackoff shouldEqual 3.seconds + settings.retrySenderSettings.maxBackoff shouldEqual 1.minute + settings.retrySenderSettings.randomFactor shouldEqual 0.8 + settings.retrySenderSettings.retryDuration shouldEqual 10.minutes + + settings.connectionPoolSettings.get.maxRetries shouldEqual 11 + settings.connectionPoolSettings.get.maxOpenRequests shouldEqual 64 + settings.connectionPoolSettings.get.idleTimeout shouldEqual 1.minute + settings.connectionPoolSettings.get.pipeliningLimit shouldEqual 1 + settings.connectionPoolSettings.get.maxConnections shouldEqual 4 + settings.connectionPoolSettings.get.minConnections shouldEqual 0 + } + +}