This repository has been archived by the owner on Nov 19, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Service.fs
135 lines (116 loc) · 6.22 KB
/
Service.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
module OMS.Infrastructure.Service
open System
open FSharp.Control
open System.Threading.Tasks
let private sw = System.Diagnostics.Stopwatch.StartNew()
/// Jet's microservice pattern is based on the concept of stream processors.
/// A processor receives an incoming message, decodes it, runs some business logic on it, and generates a side-effect.
/// The microservice implements its own specific (decode, handle, and interpret) functions. Those functions are piped
/// into the OMS Service functions in the following order.
///
/// let handler = Service.processInput log "MicroServiceName" microserviceHandle microServiceInterpret
///
/// incomingStreamDefinitionUri
/// |> Service.parse lookup
/// |> Service.consume
/// |> Service.decode microserviceDecoder
/// |> Service.handle log handler
/// |> Service.start
/// parses a URI into a Stream Definition, using a lookup function to replace the host with a connection string
let parse (lookup:string -> string) (uriString:string) : StreamDefinition =
let uri = Uri(uriString)
match uri.Scheme with
| "eventstore" ->
{ Repository = ""; Stream = ""; BatchSize = 0; Delay = 0}
|> StreamDefinition.EventStore
| "sbqueue" ->
{ Connection = ""; Name = ""; Delay = 0.0; ReadFrom = ""; WriteTo = "" }
|> StreamDefinition.ServiceBusQueue
| "sbtopic" ->
{ Connection = ""; Name = ""; Delay = 0.0; ReadFrom = ""; WriteTo = "" }
|> StreamDefinition.ServiceBusQueue
| "kafka" ->
{ BrokerList = ""; Topic = ""; ConsumerGroup = ""; BatchSize = 0; Delay = 0; CommitDelay = 0 }
|> StreamDefinition.Kafka
| "cosmosdb" ->
{ EndpointURI = ""; Key = ""; DatabaseId = ""; CollectionId = ""; DocumentId = ""; PartitionKey = ""; PartitionId = "" }
|> StreamDefinition.CosmosDB
| _ ->
failwith ("Unsupported scheme " + uri.Scheme)
/// Consume a stream of incoming messages from a data store
let consume (sd:StreamDefinition) : AsyncSeq<Incoming> =
/// read a continuous stream from a service, each stream type will have its own unique implementation
let consumeFromService () =
asyncSeq {
// read a stream from your source, and return an Incoming type
// semantics for CloseAction and CancelAction are defined here
let cancel = fun () -> async.Return(())
let close = fun () -> async.Return(())
// results are returned as an AsyncSeq
yield! [] |> AsyncSeq.ofSeq
}
// define the consuming details for each service type
match sd with
| StreamDefinition.EventStore md -> consumeFromService ()
| StreamDefinition.ServiceBusQueue md -> consumeFromService ()
| StreamDefinition.ServiceBusTopic (topic, md) -> consumeFromService ()
| StreamDefinition.Kafka md -> consumeFromService ()
| StreamDefinition.CosmosDB md -> consumeFromService ()
/// Decodes incoming messages into domain-specific types used by the microservice
let decode<'a> (decoder:DomainEvent -> 'a option) (incoming:AsyncSeq<Incoming>) : AsyncSeq<Messages<'a option> * CloseAction * CancelAction> =
// decode the incoming messages into a type that can be used by the microservice
// any message that can be ignored should be decoded as Option.None
incoming
|> AsyncSeq.map (function
| Incoming.Message(de, close, cancel) ->
let input = [| decoder de |] |> Messages.OfBatch
input, close, cancel
| Incoming.Batch(des, close, cancel) -> des |> Array.map decoder |> Messages.OfBatch, close, cancel
| Incoming.Bunch(des, close, cancel) -> des |> Array.map decoder |> Messages.OfBunch, close, cancel)
/// Orchestrates the handle -> interpret business logic for the microservice
let processInput (log:Logger) label (handle:'a -> Async<'b>) (interpret:'b -> Async<unit>) (input:'a) =
async {
try
let handleStart = sw.ElapsedMilliseconds
let! output = handle input |> Metrics.recordLatency "Microservice" label "handle"
do! interpret output |> Metrics.recordLatency "Microservice" label "interpret"
do! (sw.ElapsedMilliseconds - handleStart) |> Metrics.recordLatencyValue "Microservice" label "total"
with ex ->
log.Error "Exception processing message=%A. Exception=%A" (input, ex)
do! Metrics.recordCounter "Microservice" label "error"
raise ex
}
/// Handles the semantics for processing of incoming messages
let handle (log:Logger) (handler:'a ->Async<unit>) (incoming:AsyncSeq<Messages<'a option> * CloseAction * CancelAction>) =
let handleEach state (each:Messages<'a option> * CloseAction * CancelAction) = async {
let inputs, close, cancel = each
// Since the decoder will return Option.None for every message that it doesn't need to process
// it is necessary to parse
let useSerialBatchCompletion, messages =
match inputs with
| Messages.OfBatch(m) -> true, m |> Array.choose id
| Messages.OfBunch(m) -> false, m |> Array.choose id
match messages.Length with
| x when x > 0 ->
// processing for a single message, usually the Service.processInput function
let handleSingleMessage message = message |> (handler >> Async.Catch) |> Async.StartAsTask
// The processing semantics for Batches and Bunches will depend on how in-order and out-of-order
// message processing is implemented. In the OMS, this was managed by using Tasks and Parallel
// processing.
// At the end of a Batch or Bunch process, make sure to call the Close Action for all handled messages
// Depending on whether a batch or bunch failed, it might be necessary to Cancel all messages
// or just a subset of messages.
do! close ()
| _ ->
// If there is nothing to process, then Cancel and move on
do! cancel ()
return state
}
incoming
|> AsyncSeq.scanAsync handleEach [||]
/// Starts processing the consumers. Note that handling is done on a separate tasks.
/// When this method returns, it does not mean that all tasks completed.
let start f =
f
|> AsyncSeq.iter (ignore)
|> Async.RunSynchronously