forked from evolution-gaming/akka-effect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathActorOf.scala
76 lines (62 loc) · 2.1 KB
/
ActorOf.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.evolutiongaming.akkaeffect
import akka.actor.{Actor, ActorRef, ReceiveTimeout}
import cats.effect.*
import cats.effect.implicits.effectResourceOps
import cats.syntax.all.*
import com.evolutiongaming.akkaeffect.ActorVar.Directive
import com.evolutiongaming.akkaeffect.Fail.implicits.*
import com.evolutiongaming.catshelper.CatsHelper.*
import com.evolutiongaming.catshelper.ToFuture
/** Builds an [[akka.actor.Actor]] whose message handling is delegated to the
  * receive produced by a [[ReceiveOf]].
  */
object ActorOf {

  /** Outcome of handling a single message: `true` is translated into
    * `Directive.stop`, `false` into `Directive.ignore`.
    */
  type Stop = Boolean

  /** Creates the actor.
    *
    * @param receiveOf
    *   invoked from `preStart` with the actor's [[ActorCtx]] to obtain the receive
    *   that handles every incoming message (wrapped in an [[Envelope]]) as well
    *   as `ReceiveTimeout`
    * @return
    *   an [[akka.actor.Actor]] wiring `preStart`, `receive` and `postStop` to an
    *   internal [[ActorVar]]
    */
  def apply[F[_]: Async: ToFuture](
    receiveOf: ReceiveOf[F, Envelope[Any], Stop],
  ): Actor = {

    // The value kept inside `ActorVar`: the receive obtained from `receiveOf`.
    type State = Receive[F, Envelope[Any], Stop]

    // Obtains the receive for this actor. An error raised while doing so is
    // converted with the `fail` string extension (message below, original error
    // passed as the cause) and lifted via `toResource`.
    def onPreStart(actorCtx: ActorCtx[F])(implicit fail: Fail[F]) = {
      val allocated = receiveOf(actorCtx)
      allocated.handleErrorWith { (cause: Throwable) =>
        s"failed to allocate receive".fail[F, State](cause).toResource
      }
    }

    // Produces the function applied to the current state for one incoming message.
    // `ReceiveTimeout` goes to `state.timeout`; anything else is wrapped in an
    // `Envelope` together with its sender and passed to `state`.
    def onReceive(a: Any, sender: ActorRef)(implicit fail: Fail[F]) = { (state: State) =>
      val stopRequested = a match {
        case ReceiveTimeout => state.timeout
        case msg            => state(Envelope(msg, sender))
      }

      stopRequested
        .map { shouldStop =>
          // The state itself is never replaced here: either keep going or stop.
          if (shouldStop) Directive.stop[Releasable[F, State]]
          else Directive.ignore[Releasable[F, State]]
        }
        .handleErrorWith { cause =>
          s"failed on $a from $sender".fail[F, Directive[Releasable[F, State]]](cause)
        }
    }

    new Actor {

      private implicit val fail: Fail[F] = Fail.fromActorRef[F](self)

      private val act = Act.Adapter(self)

      private val actorVar = ActorVar[F, State](act.value, context)

      override def preStart(): Unit = {
        super.preStart()
        // NOTE(review): `act.sync` presumably evaluates the block in place on the
        // actor's thread - confirm against `Act.Adapter`.
        act.sync {
          val actorCtx = ActorCtx[F](act.value, context)
          actorVar.preStart(onPreStart(actorCtx))
        }
      }

      def receive: Actor.Receive = act.receive {
        case msg =>
          // `sender()` is read here, while the message is being processed.
          val from = sender()
          actorVar.receive(onReceive(msg, from))
      }

      override def postStop(): Unit = {
        // Order matters: notify `act` first, then release the state, then let
        // the default `Actor.postStop` run.
        act.value.postStop()
        act.sync {
          actorVar.postStop().toFuture
        }
        super.postStop()
      }
    }
  }
}