Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 10, 2024
1 parent a03b366 commit 9335f0f
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 78 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
.PHONY: proto

PROTO_PATH := "../../go/pkg/mod/github.com/anthdm/[email protected]/actor"

build:
@go build -o bin/api cmd/api/main.go
@go build -o bin/wasmserver cmd/wasmserver/main.go
@go build -o bin/raptor cmd/cli/main.go
@go build -o bin/runtime cmd/runtime/main.go

wasmserver: build
@./bin/wasmserver

runtime: build
@./bin/runtime

api: build
@./bin/api --seed

test:
@go test ./internal/* -v

proto:
protoc --go_out=. --go_opt=paths=source_relative --proto_path=. proto/types.proto
protoc --go_out=. --go_opt=paths=source_relative --proto_path=$(PROTO_PATH) --proto_path=. proto/types.proto

clean:
@rm -rf bin/api
Expand Down
52 changes: 52 additions & 0 deletions cmd/runtime/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"log"
"os"
"os/signal"
"syscall"

"github.com/anthdm/hollywood/cluster"
"github.com/anthdm/raptor/internal/actrs"
"github.com/anthdm/raptor/internal/config"
"github.com/anthdm/raptor/internal/storage"
)

func main() {
if err := config.Parse("config.toml"); err != nil {
log.Fatal(err)
}
var (
user = config.Get().Storage.User
pw = config.Get().Storage.Password
dbname = config.Get().Storage.Name
host = config.Get().Storage.Host
port = config.Get().Storage.Port
sslmode = config.Get().Storage.SSLMode
)
store, err := storage.NewSQLStore(user, pw, dbname, host, port, sslmode)
if err != nil {
log.Fatal(err)
}
var (
modCache = storage.NewDefaultModCache()
// metricStore = store
)
clusterConfig := cluster.NewConfig().
WithListenAddr("127.0.0.1:6000").
WithRegion("eu-west").
WithID("member_2")
c, err := cluster.New(clusterConfig)
if err != nil {
log.Fatal(err)
}
c.RegisterKind(actrs.KindRuntime, actrs.NewRuntime(store, modCache), &cluster.KindConfig{})
// c.Engine().Spawn(actrs.NewMetric, actrs.KindMetric, actor.WithID("1"))
// c.Engine().Spawn(actrs.NewRuntimeManager(c), actrs.KindRuntimeManager, actor.WithID("1"))
// c.Engine().Spawn(actrs.NewRuntimeLog, actrs.KindRuntimeLog, actor.WithID("1"))
c.Start()

sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
<-sigch
}
2 changes: 1 addition & 1 deletion cmd/wasmserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func main() {
}
c.RegisterKind(actrs.KindRuntime, actrs.NewRuntime(store, modCache), &cluster.KindConfig{})
c.Engine().Spawn(actrs.NewMetric, actrs.KindMetric, actor.WithID("1"))
c.Engine().Spawn(actrs.NewRuntimeManager(c), actrs.KindRuntimeManager, actor.WithID("1"))
c.Spawn(actrs.NewRuntimeManager(c), actrs.KindRuntimeManager, actor.WithID("1"))
c.Engine().Spawn(actrs.NewRuntimeLog, actrs.KindRuntimeLog, actor.WithID("1"))
c.Start()

Expand Down
11 changes: 5 additions & 6 deletions examples/go/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package main

import (
"fmt"
"net/http"

raptor "github.com/anthdm/raptor/sdk"
"github.com/go-chi/chi"
)

func handleLogin(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("hello from the login handler"))
w.Write([]byte("hello from the login handler YADA"))
}

func handleDashboard(w http.ResponseWriter, r *http.Request) {
Expand All @@ -18,9 +18,8 @@ func handleDashboard(w http.ResponseWriter, r *http.Request) {
}

func main() {
// router := chi.NewMux()
// router.Get("/dashboard", handleDashboard)
// router.Get("/login", handleLogin)
fmt.Println("user log")
router := chi.NewMux()
router.Get("/dashboard", handleDashboard)
router.Get("/login", handleLogin)
raptor.Handle(http.HandlerFunc(handleLogin))
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
)

require (
github.com/go-chi/chi v1.5.5
github.com/pelletier/go-toml/v2 v2.1.1
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/go-chi/chi/v5 v5.0.11 h1:BnpYbFZ3T3S1WMpD79r7R5ThWX40TaFB7L31Y8xqSwA=
github.com/go-chi/chi/v5 v5.0.11/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
Expand Down
22 changes: 18 additions & 4 deletions internal/actrs/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,32 @@ type Runtime struct {
func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer {
return func() actor.Receiver {
return &Runtime{
store: store,
cache: cache,
stdout: &bytes.Buffer{},
store: store,
cache: cache,
stdout: &bytes.Buffer{},
managerPID: actor.NewPID("127.0.0.1:6666", "runtime_manager/1"),
}
}
}

func (r *Runtime) Receive(c *actor.Context) {
switch msg := c.Message().(type) {
case actor.Started:
// eventPID := c.Engine().SpawnFunc(r.handleEvent, "event")
// c.Engine().Subscribe(eventPID)

r.started = time.Now()
r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive)
r.managerPID = c.Engine().Registry.GetPID(KindRuntimeManager, "1")
case actor.Stopped:
// TODO: send metrics about the runtime to the metric actor.
_ = time.Since(r.started)
c.Send(r.managerPID, removeRuntime{key: r.deploymentID.String()})
c.Send(r.managerPID, &proto.RemoveRuntime{Key: r.deploymentID.String()})
r.runtime.Close()
// Releasing this mod will invalidate the cache for some reason.
// r.mod.Close(context.TODO())
case *proto.HTTPRequest:
slog.Info("runtime actor handling request", "request_id", msg.ID)
// Refresh the keepAlive timer
r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive)
if r.runtime == nil {
Expand All @@ -79,6 +84,15 @@ func (r *Runtime) Receive(c *actor.Context) {
}
}

// func (r *Runtime) handleEvent(c *actor.Context) {
// switch msg := c.Message().(type) {
// case *actor.ActorStartedEvent:
// fmt.Println(msg)
// case actor.DeadLetterEvent:
// fmt.Println(reflect.TypeOf(msg.Message))
// }
// }

func (r *Runtime) initialize(msg *proto.HTTPRequest) error {
r.deploymentID = uuid.MustParse(msg.DeploymentID)
// TODO: this could be coming from a Redis cache instead of Postres.
Expand Down
16 changes: 5 additions & 11 deletions internal/actrs/runtime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package actrs
import (
"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/cluster"
"github.com/anthdm/raptor/proto"
)

const KindRuntimeManager = "runtime_manager"
Expand All @@ -11,13 +12,6 @@ type (
requestRuntime struct {
key string
}
addRuntime struct {
key string
pid *actor.PID
}
removeRuntime struct {
key string
}
)

type RuntimeManager struct {
Expand All @@ -43,10 +37,10 @@ func (rm *RuntimeManager) Receive(c *actor.Context) {
rm.runtimes[msg.key] = pid
}
c.Respond(pid)
case addRuntime:
rm.runtimes[msg.key] = msg.pid
case removeRuntime:
delete(rm.runtimes, msg.key)
case *proto.AddRuntime:
rm.runtimes[msg.Key] = msg.PID
case *proto.RemoveRuntime:
delete(rm.runtimes, msg.Key)
case actor.Started:
case actor.Stopped:
case actor.Initialized:
Expand Down
Loading

0 comments on commit 9335f0f

Please sign in to comment.