Skip to content

Commit

Permalink
http schema for volume (#154)
Browse files Browse the repository at this point in the history
Wrap global volume service with an http route group.
Modified listPaths to get and return file metadata
  • Loading branch information
jsun-m authored Apr 25, 2024
1 parent a1ada69 commit 02eaa79
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 19 deletions.
144 changes: 144 additions & 0 deletions internal/abstractions/volume/http.go
Original file line number Diff line number Diff line change
@@ -1 +1,145 @@
package volume

import (
"net/http"

"github.com/beam-cloud/beta9/internal/auth"
"github.com/beam-cloud/beta9/internal/types"
"github.com/labstack/echo/v4"
)

type volumeGroup struct {
routeGroup *echo.Group
gvs *GlobalVolumeService
}

func registerVolumeRoutes(g *echo.Group, gvs *GlobalVolumeService) *volumeGroup {
group := &volumeGroup{routeGroup: g, gvs: gvs}

g.GET("/:workspaceId", group.ListVolumes)
g.GET("/:workspaceId/", group.ListVolumes)

g.POST("/:workspaceId/:volumeName", group.CreateVolume)
g.POST("/:workspaceId/:volumeName/", group.CreateVolume)

g.GET("/:workspaceId/:volumeName/*", group.Ls)
g.DELETE("/:workspaceId/:volumeName/*", group.Rm)
g.PATCH("/:workspaceId/:volumeName/*", group.Mv)

return group
}

func (g *volumeGroup) ListVolumes(ctx echo.Context) error {
_, err := g.authorize(ctx)
if err != nil {
return err
}

workspaceId := ctx.Param("workspaceId")
workspace, err := g.gvs.backendRepo.GetWorkspaceByExternalId(ctx.Request().Context(), workspaceId)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid workspace ID")
}

if volumes, err := g.gvs.listVolumes(ctx.Request().Context(), &workspace); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to list volumes")
} else {
return ctx.JSON(http.StatusOK, volumes)
}
}

func (g *volumeGroup) DeleteVolume(ctx echo.Context) error {
return nil
}

func (g *volumeGroup) CreateVolume(ctx echo.Context) error {
_, err := g.authorize(ctx)
if err != nil {
return err
}

workspaceId := ctx.Param("workspaceId")
workspace, err := g.gvs.backendRepo.GetWorkspaceByExternalId(ctx.Request().Context(), workspaceId)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid workspace ID")
}

volumeName := ctx.Param("volumeName")
if volume, err := g.gvs.getOrCreateVolume(ctx.Request().Context(), &workspace, volumeName); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to create volume")
} else {
return ctx.JSON(http.StatusOK, volume)
}
}

func (g *volumeGroup) UploadFile(ctx echo.Context) error {
return nil
}

func (g *volumeGroup) DownloadFile(ctx echo.Context) error {
return nil
}

func (g *volumeGroup) Ls(ctx echo.Context) error {
_, err := g.authorize(ctx)
if err != nil {
return err
}

workspaceId := ctx.Param("workspaceId")
workspace, err := g.gvs.backendRepo.GetWorkspaceByExternalId(ctx.Request().Context(), workspaceId)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid workspace ID")
}

volumeName := ctx.Param("volumeName")
path := ctx.Param("*")

if paths, err := g.gvs.listPath(
ctx.Request().Context(),
volumeName+"/"+path,
&workspace,
); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to list path")
} else {
return ctx.JSON(http.StatusOK, paths)
}
}

func (g *volumeGroup) Rm(ctx echo.Context) error {
_, err := g.authorize(ctx)
if err != nil {
return err
}

workspaceId := ctx.Param("workspaceId")
workspace, err := g.gvs.backendRepo.GetWorkspaceByExternalId(ctx.Request().Context(), workspaceId)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "Invalid workspace ID")
}

volumeName := ctx.Param("volumeName")
path := ctx.Param("*")

if _, err := g.gvs.deletePath(
ctx.Request().Context(),
volumeName+"/"+path,
&workspace,
); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "Failed to delete path")
} else {
return ctx.JSON(http.StatusOK, nil)
}
}

func (g *volumeGroup) Mv(ctx echo.Context) error {
return nil
}

func (g *volumeGroup) authorize(ctx echo.Context) (*auth.HttpAuthContext, error) {
cc, _ := ctx.(*auth.HttpAuthContext)
if cc.AuthInfo.Token.TokenType != types.TokenTypeClusterAdmin {
return nil, echo.NewHTTPError(http.StatusUnauthorized)
}
return cc, nil
}
43 changes: 36 additions & 7 deletions internal/abstractions/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/beam-cloud/beta9/internal/repository"
"github.com/beam-cloud/beta9/internal/types"
pb "github.com/beam-cloud/beta9/proto"
"github.com/labstack/echo/v4"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand All @@ -29,10 +30,25 @@ type GlobalVolumeService struct {
backendRepo repository.BackendRepository
}

func NewGlobalVolumeService(backendRepo repository.BackendRepository) (VolumeService, error) {
return &GlobalVolumeService{
type FileInfo struct {
Path string `json:"path"`
Size uint64 `json:"size"`
ModTime int64 `json:"mod_time"`
IsDir bool `json:"is_dir"`
}

var volumeRoutePrefix string = "/volume"

func NewGlobalVolumeService(backendRepo repository.BackendRepository, routeGroup *echo.Group) (VolumeService, error) {
gvs := &GlobalVolumeService{
backendRepo: backendRepo,
}, nil
}

// Register HTTP routes
authMiddleware := auth.AuthMiddleware(backendRepo)
registerVolumeRoutes(routeGroup.Group(volumeRoutePrefix, authMiddleware), gvs)

return gvs, nil
}

func (vs *GlobalVolumeService) GetOrCreateVolume(ctx context.Context, in *pb.GetOrCreateVolumeRequest) (*pb.GetOrCreateVolumeResponse, error) {
Expand Down Expand Up @@ -100,9 +116,15 @@ func (vs *GlobalVolumeService) ListPath(ctx context.Context, in *pb.ListPathRequ
}, nil
}

// Temporarily return the paths as strings
stringPaths := make([]string, len(paths))
for i, p := range paths {
stringPaths[i] = p.Path
}

return &pb.ListPathResponse{
Ok: true,
Paths: paths,
Paths: stringPaths,
}, nil
}

Expand Down Expand Up @@ -288,7 +310,7 @@ func (vs *GlobalVolumeService) deletePath(ctx context.Context, inputPath string,
return deleted, nil
}

func (vs *GlobalVolumeService) listPath(ctx context.Context, inputPath string, workspace *types.Workspace) ([]string, error) {
func (vs *GlobalVolumeService) listPath(ctx context.Context, inputPath string, workspace *types.Workspace) ([]FileInfo, error) {
// Parse the volume and path/pattern
volumeName, volumePath := parseVolumeInput(inputPath)
if volumeName == "" {
Expand Down Expand Up @@ -319,11 +341,18 @@ func (vs *GlobalVolumeService) listPath(ctx context.Context, inputPath string, w
}

// Modify paths to be relative
files := make([]FileInfo, len(matches))
for i, p := range matches {
matches[i] = strings.TrimPrefix(p, rootVolumePath+"/")
info, _ := os.Stat(p)
files[i] = FileInfo{
Path: strings.TrimPrefix(p, rootVolumePath+"/"),
Size: uint64(info.Size()),
ModTime: info.ModTime().Unix(),
IsDir: info.IsDir(),
}
}

return matches, nil
return files, nil
}

func parseVolumeInput(input string) (string, string) {
Expand Down
3 changes: 1 addition & 2 deletions internal/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (g *Gateway) registerServices() error {

// Register endpoint service
ws, err := endpoint.NewEndpointService(g.ctx, endpoint.EndpointServiceOpts{
Config: g.config,
ContainerRepo: g.ContainerRepo,
BackendRepo: g.BackendRepo,
TaskRepo: g.TaskRepo,
Expand All @@ -260,7 +259,7 @@ func (g *Gateway) registerServices() error {
pb.RegisterEndpointServiceServer(g.grpcServer, ws)

// Register volume service
vs, err := volume.NewGlobalVolumeService(g.BackendRepo)
vs, err := volume.NewGlobalVolumeService(g.BackendRepo, g.rootRouteGroup)
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions internal/types/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
)

type Workspace struct {
Id uint `db:"id" json:"id"`
Id uint `db:"id" json:"id,omitempty"`
ExternalId string `db:"external_id" json:"external_id"`
Name string `db:"name" json:"name"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
CreatedAt time.Time `db:"created_at" json:"created_at,omitempty"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at,omitempty"`
}

const (
Expand All @@ -36,13 +36,13 @@ type Token struct {
}

type Volume struct {
Id uint `db:"id"`
ExternalId string `db:"external_id"`
Name string `db:"name"`
Size uint64 // Populated by volume abstraction
WorkspaceId uint `db:"workspace_id"` // Foreign key to Workspace
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
Id uint `db:"id" json:"id"`
ExternalId string `db:"external_id" json:"external_id"`
Name string `db:"name" json:"name"`
Size uint64 `json:"size"` // Populated by volume abstraction
WorkspaceId uint `db:"workspace_id" json:"workspace_id"` // Foreign key to Workspace
CreatedAt time.Time `db:"created_at" json:"created_at"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
}

type VolumeWithRelated struct {
Expand Down

0 comments on commit 02eaa79

Please sign in to comment.