Skip to content

Commit

Permalink
fixed hardcoded manager PID
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 11, 2024
1 parent 9335f0f commit ce40d62
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 153 deletions.
27 changes: 9 additions & 18 deletions internal/actrs/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,53 +46,44 @@ type Runtime struct {
func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer {
return func() actor.Receiver {
return &Runtime{
store: store,
cache: cache,
stdout: &bytes.Buffer{},
managerPID: actor.NewPID("127.0.0.1:6666", "runtime_manager/1"),
store: store,
cache: cache,
stdout: &bytes.Buffer{},
}
}
}

func (r *Runtime) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
// eventPID := c.Engine().SpawnFunc(r.handleEvent, "event")
// c.Engine().Subscribe(eventPID)

r.started = time.Now()
r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive)
r.managerPID = c.Engine().Registry.GetPID(KindRuntimeManager, "1")
case actor.Stopped:
r.repeat.Stop()
// TODO: send metrics about the runtime to the metric actor.
_ = time.Since(r.started)
c.Send(r.managerPID, &proto.RemoveRuntime{Key: r.deploymentID.String()})
r.runtime.Close()
// Releasing this mod will invalidate the cache for some reason.
// r.mod.Close(context.TODO())
case *proto.HTTPRequest:
slog.Info("runtime actor handling request", "request_id", msg.ID)
slog.Info("runtime handling request", "request_id", msg.ID, "pid", c.PID())
// Refresh the keepAlive timer
r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive)
if r.runtime == nil {
r.initialize(msg)
}
// In the ideal world we should ask the cluster for the PID of the manager we
// need to notify we are done invoking. Hollywood does not have that functionality
// yet. To fix this we have the PID of the manager in the request message.
r.managerPID = msg.ManagerPID
// Handle the HTTP request that is forwarded from the WASM server actor.
r.handleHTTPRequest(c, msg)
case shutdown:
c.Engine().Poison(c.PID())
}
}

// func (r *Runtime) handleEvent(c *actor.Context) {
// switch msg := c.Message().(type) {
// case *actor.ActorStartedEvent:
// fmt.Println(msg)
// case actor.DeadLetterEvent:
// fmt.Println(reflect.TypeOf(msg.Message))
// }
// }

func (r *Runtime) initialize(msg *proto.HTTPRequest) error {
r.deploymentID = uuid.MustParse(msg.DeploymentID)
// TODO: this could be coming from a Redis cache instead of Postres.
Expand Down
4 changes: 2 additions & 2 deletions internal/actrs/runtime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type (
}
)

// RuntimeManager is an actor/receiver that is responsible for managing
// runtimes across the cluster.
type RuntimeManager struct {
runtimes map[string]*actor.PID
cluster *cluster.Cluster
Expand All @@ -37,8 +39,6 @@ func (rm *RuntimeManager) Receive(c *actor.Context) {
rm.runtimes[msg.key] = pid
}
c.Respond(pid)
case *proto.AddRuntime:
rm.runtimes[msg.Key] = msg.PID
case *proto.RemoveRuntime:
delete(rm.runtimes, msg.Key)
case actor.Started:
Expand Down
35 changes: 18 additions & 17 deletions internal/actrs/wasmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,26 @@ func newRequestWithResponse(request *proto.HTTPRequest) requestWithResponse {

// WasmServer is an HTTP server that will proxy and route the request to the corresponding function.
type WasmServer struct {
server *http.Server
self *actor.PID
store storage.Store
metricStore storage.MetricStore
cache storage.ModCacher
cluster *cluster.Cluster
responses map[string]chan *proto.HTTPResponse
runtimeManager *actor.PID
server *http.Server
self *actor.PID
store storage.Store
metricStore storage.MetricStore
cache storage.ModCacher
cluster *cluster.Cluster
responses map[string]chan *proto.HTTPResponse
runtimeManagerPID *actor.PID
}

// NewWasmServer return a new wasm server given a storage and a mod cache.
func NewWasmServer(addr string, cluster *cluster.Cluster, store storage.Store, metricStore storage.MetricStore, cache storage.ModCacher) actor.Producer {
return func() actor.Receiver {
s := &WasmServer{
store: store,
metricStore: metricStore,
cache: cache,
cluster: cluster,
responses: make(map[string]chan *proto.HTTPResponse),
runtimeManager: cluster.Engine().Registry.GetPID(KindRuntimeManager, "1"),
store: store,
metricStore: metricStore,
cache: cache,
cluster: cluster,
responses: make(map[string]chan *proto.HTTPResponse),
runtimeManagerPID: cluster.Engine().Registry.GetPID(KindRuntimeManager, "1"),
}
server := &http.Server{
Handler: s,
Expand All @@ -67,14 +67,15 @@ func (s *WasmServer) Receive(c *actor.Context) {
s.initialize(c)
case actor.Stopped:
case requestWithResponse:
s.responses[msg.request.ID] = msg.response
// TODO: let's say the manager is not able to respond in time for some reason
// I think we might need to spawn a new runtime right here.
pid := s.requestRuntime(c, msg.request.DeploymentID)
if pid == nil {
slog.Error("failed to request a runtime PID")
return
}
s.responses[msg.request.ID] = msg.response
msg.request.ManagerPID = s.runtimeManagerPID
s.cluster.Engine().SendWithSender(pid, msg.request, s.self)
case *proto.HTTPResponse:
if resp, ok := s.responses[msg.RequestID]; ok {
Expand All @@ -93,9 +94,9 @@ func (s *WasmServer) initialize(c *actor.Context) {

// NOTE: There could be a case where we do not get a response in time, hence
// the PID will be nil. This case is handled where we should spawn the runtime
// ourselfs.
// ourselves.
func (s *WasmServer) requestRuntime(c *actor.Context, key string) *actor.PID {
res, err := c.Request(s.runtimeManager, requestRuntime{
res, err := c.Request(s.runtimeManagerPID, requestRuntime{
key: key,
}, time.Millisecond*5).Result()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
)

const defaultConfig = `
wasmServerAddr = "localhost:5000"
apiServerAddr = "localhost:3000"
wasmServerAddr = "127.0.0.1:5000"
apiServerAddr = "127.0.0.1:3000"
storageDriver = "sqlite"
apiToken = "foobarbaz"
authorization = false
[cluster]
address = "localhost:6666"
address = "127.0.0.1:6666"
id = "wasm_member_1"
region = "eu-west"
Expand Down
Loading

0 comments on commit ce40d62

Please sign in to comment.