Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement malloc volume #882

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func runGrpcServer(grpcPort int, useKvm bool, store gokv.Store, spdkAddress, qmp

pb.RegisterNvmeRemoteControllerServiceServer(s, backendServer)
pb.RegisterNullVolumeServiceServer(s, backendServer)
pb.RegisterMallocVolumeServiceServer(s, backendServer)
pb.RegisterAioVolumeServiceServer(s, backendServer)
pb.RegisterMiddleendEncryptionServiceServer(s, middleendServer)
pb.RegisterMiddleendQosVolumeServiceServer(s, middleendServer)
Expand Down Expand Up @@ -199,6 +200,7 @@ func runGatewayServer(grpcPort int, httpPort int) {

registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterAioVolumeServiceHandlerFromEndpoint, "backend aio")
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterNullVolumeServiceHandlerFromEndpoint, "backend null")
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterMallocVolumeServiceHandlerFromEndpoint, "backend malloc")
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterNvmeRemoteControllerServiceHandlerFromEndpoint, "backend nvme")

registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterMiddleendEncryptionServiceHandlerFromEndpoint, "middleend encryption")
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0
github.com/onsi/ginkgo/v2 v2.14.0
github.com/opiproject/gospdk v0.0.0-20240108075015-92d689fff706
github.com/opiproject/opi-api v0.0.0-20240118183513-e44db269fba4
github.com/opiproject/gospdk v0.0.0-20240415072512-98d71122a73b
github.com/opiproject/opi-api v0.0.0-20240415072823-bb755a5f6ecc
github.com/philippgille/gokv v0.6.0
github.com/philippgille/gokv/gomap v0.6.0
github.com/philippgille/gokv/redis v0.6.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,14 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/opiproject/gospdk v0.0.0-20240108075015-92d689fff706 h1:fZcUuXgfsfWIbSZTPCJG72wh7LyKIyrpPR9kH+rKEXI=
github.com/opiproject/gospdk v0.0.0-20240108075015-92d689fff706/go.mod h1:9CMbTd9ptR6tl6HRRn8C33DPeWF85hTo4KZCa5iKftY=
github.com/opiproject/gospdk v0.0.0-20240115073959-dff04eece15a h1:RcBndvpWDCt4LciHMViwOs1rVXD/Qbi95KhY2bNyLfE=
github.com/opiproject/gospdk v0.0.0-20240115073959-dff04eece15a/go.mod h1:9CMbTd9ptR6tl6HRRn8C33DPeWF85hTo4KZCa5iKftY=
github.com/opiproject/gospdk v0.0.0-20240415072512-98d71122a73b h1:SlDLubL/Bo0ehKR0fNHUJosQ+ZNUrFpxFFmUKdNOxh8=
github.com/opiproject/gospdk v0.0.0-20240415072512-98d71122a73b/go.mod h1:9CMbTd9ptR6tl6HRRn8C33DPeWF85hTo4KZCa5iKftY=
github.com/opiproject/opi-api v0.0.0-20240118183513-e44db269fba4 h1:YBjvYWQQAbNIGsAXvB6FwL9Encr1nzo3/w+bB/tXltM=
github.com/opiproject/opi-api v0.0.0-20240118183513-e44db269fba4/go.mod h1:92pv4ulvvPMuxCJ9ND3aYbmBfEMLx0VCjpkiR7ZTqPY=
github.com/opiproject/opi-api v0.0.0-20240415072823-bb755a5f6ecc h1:iBcdnHiFFCIKggBDOL5S2OUONKyu8m+x/zhJGxIT2UY=
github.com/opiproject/opi-api v0.0.0-20240415072823-bb755a5f6ecc/go.mod h1:92pv4ulvvPMuxCJ9ND3aYbmBfEMLx0VCjpkiR7ZTqPY=
github.com/otiai10/copy v1.2.0/go.mod h1:rrF5dJ5F0t/EWSYODDu4j9/vEeYHMkc8jt0zJChqQWw=
github.com/otiai10/copy v1.11.0 h1:OKBD80J/mLBrwnzXqGtFCzprFSGioo30JcmR4APsNwc=
github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE=
Expand Down
7 changes: 5 additions & 2 deletions pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (

// VolumeParameters contains all BackEnd volume related structures
type VolumeParameters struct {
AioVolumes map[string]*pb.AioVolume
NullVolumes map[string]*pb.NullVolume
AioVolumes map[string]*pb.AioVolume
NullVolumes map[string]*pb.NullVolume
MallocVolumes map[string]*pb.MallocVolume

NvmeControllers map[string]*pb.NvmeRemoteController
NvmePaths map[string]*pb.NvmePath
Expand All @@ -31,6 +32,7 @@ type VolumeParameters struct {
type Server struct {
pb.UnimplementedNvmeRemoteControllerServiceServer
pb.UnimplementedNullVolumeServiceServer
pb.UnimplementedMallocVolumeServiceServer
pb.UnimplementedAioVolumeServiceServer

rpc spdk.JSONRPC
Expand All @@ -55,6 +57,7 @@ func NewServer(jsonRPC spdk.JSONRPC, store gokv.Store) *Server {
Volumes: VolumeParameters{
AioVolumes: make(map[string]*pb.AioVolume),
NullVolumes: make(map[string]*pb.NullVolume),
MallocVolumes: make(map[string]*pb.MallocVolume),
NvmeControllers: make(map[string]*pb.NvmeRemoteController),
NvmePaths: make(map[string]*pb.NvmePath),
},
Expand Down
5 changes: 5 additions & 0 deletions pkg/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var checkGlobalTestProtoObjectsNotChanged = utils.CheckTestProtoObjectsNotChange
&testAioVolumeWithName,
&testNullVolume,
&testNullVolumeWithName,
&testMallocVolume,
&testMallocVolumeWithName,
&testNvmeCtrl,
&testNvmeCtrlWithName,
&testNvmePath,
Expand All @@ -38,6 +40,7 @@ var checkGlobalTestProtoObjectsNotChanged = utils.CheckTestProtoObjectsNotChange
type backendClient struct {
pb.NvmeRemoteControllerServiceClient
pb.NullVolumeServiceClient
pb.MallocVolumeServiceClient
pb.AioVolumeServiceClient
}

Expand Down Expand Up @@ -83,6 +86,7 @@ func createTestEnvironment(spdkResponses []string) *testEnv {
env.client = &backendClient{
pb.NewNvmeRemoteControllerServiceClient(env.conn),
pb.NewNullVolumeServiceClient(env.conn),
pb.NewMallocVolumeServiceClient(env.conn),
pb.NewAioVolumeServiceClient(env.conn),
}

Expand All @@ -94,6 +98,7 @@ func dialer(opiSpdkServer *Server) func(context.Context, string) (net.Conn, erro
server := grpc.NewServer()
pb.RegisterNvmeRemoteControllerServiceServer(server, opiSpdkServer)
pb.RegisterNullVolumeServiceServer(server, opiSpdkServer)
pb.RegisterMallocVolumeServiceServer(server, opiSpdkServer)
pb.RegisterAioVolumeServiceServer(server, opiSpdkServer)

go func() {
Expand Down
281 changes: 281 additions & 0 deletions pkg/backend/malloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2022-2024 Dell Inc, or its subsidiaries.
// Copyright (C) 2023 Intel Corporation
moshe-shahar marked this conversation as resolved.
Show resolved Hide resolved
// Copyright (c) 2024 Xsight Labs Inc

// Package backend implememnts the BackEnd APIs (network facing) of the storage Server
package backend

import (
"context"
"fmt"
"log"
"path"
"sort"

"github.com/opiproject/gospdk/spdk"
pb "github.com/opiproject/opi-api/storage/v1alpha1/gen/go"
"github.com/opiproject/opi-spdk-bridge/pkg/utils"

"github.com/google/uuid"
"go.einride.tech/aip/fieldbehavior"
"go.einride.tech/aip/fieldmask"
"go.einride.tech/aip/resourceid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)

func sortMallocVolumes(volumes []*pb.MallocVolume) {
sort.Slice(volumes, func(i int, j int) bool {
return volumes[i].Name < volumes[j].Name
})
}

// CreateMallocVolume creates a Malloc volume instance
func (s *Server) CreateMallocVolume(ctx context.Context, in *pb.CreateMallocVolumeRequest) (*pb.MallocVolume, error) {
// check input correctness
if err := s.validateCreateMallocVolumeRequest(in); err != nil {
return nil, err
}
// see https://google.aip.dev/133#user-specified-ids
resourceID := resourceid.NewSystemGenerated()
if in.MallocVolumeId != "" {
log.Printf("client provided the ID of a resource %v, ignoring the name field %v", in.MallocVolumeId, in.MallocVolume.Name)
resourceID = in.MallocVolumeId
}
in.MallocVolume.Name = utils.ResourceIDToVolumeName(resourceID)
// idempotent API when called with same key, should return same object
volume, ok := s.Volumes.MallocVolumes[in.MallocVolume.Name]
if ok {
log.Printf("Already existing MallocVolume with id %v", in.MallocVolume.Name)
return volume, nil
}
// not found, so create a new one
params := spdk.BdevMallocCreateParams{
Name: resourceID,
BlockSize: int(in.GetMallocVolume().GetBlockSize()),
NumBlocks: int(in.GetMallocVolume().GetBlocksCount()),
MdSize: int(in.GetMallocVolume().GetMetadataSize()),
MdInterleave: true,
glimchb marked this conversation as resolved.
Show resolved Hide resolved
}
var result spdk.BdevMallocCreateResult
err := s.rpc.Call(ctx, "bdev_malloc_create", &params, &result)
if err != nil {
return nil, err
}
log.Printf("Received from SPDK: %v", result)
if result == "" {
msg := fmt.Sprintf("Could not create Malloc Dev: %s", params.Name)
return nil, status.Errorf(codes.InvalidArgument, msg)
}
response := utils.ProtoClone(in.MallocVolume)
s.Volumes.MallocVolumes[in.MallocVolume.Name] = response
return response, nil
}

// DeleteMallocVolume deletes a Malloc volume instance
func (s *Server) DeleteMallocVolume(ctx context.Context, in *pb.DeleteMallocVolumeRequest) (*emptypb.Empty, error) {
// check input correctness
if err := s.validateDeleteMallocVolumeRequest(in); err != nil {
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.MallocVolumes[in.Name]
if !ok {
if in.AllowMissing {
return &emptypb.Empty{}, nil
}
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
return nil, err
}
resourceID := path.Base(volume.Name)
params := spdk.BdevMallocDeleteParams{
Name: resourceID,
}
var result spdk.BdevMallocDeleteResult
err := s.rpc.Call(ctx, "bdev_malloc_delete", &params, &result)
if err != nil {
return nil, err
}
log.Printf("Received from SPDK: %v", result)
if !result {
msg := fmt.Sprintf("Could not delete Malloc Dev: %s", params.Name)
return nil, status.Errorf(codes.InvalidArgument, msg)
}
delete(s.Volumes.MallocVolumes, volume.Name)
return &emptypb.Empty{}, nil
}

// UpdateMallocVolume updates a Malloc volume instance
func (s *Server) UpdateMallocVolume(ctx context.Context, in *pb.UpdateMallocVolumeRequest) (*pb.MallocVolume, error) {
// check input correctness
if err := s.validateUpdateMallocVolumeRequest(in); err != nil {
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.MallocVolumes[in.MallocVolume.Name]
if !ok {
if in.AllowMissing {
log.Printf("Got AllowMissing, create a new resource, don't return error when resource not found")
params := spdk.BdevMallocCreateParams{
Name: path.Base(in.MallocVolume.Name),
BlockSize: int(in.GetMallocVolume().GetBlockSize()),
NumBlocks: int(in.GetMallocVolume().GetBlocksCount()),
}
var result spdk.BdevMallocCreateResult
err := s.rpc.Call(ctx, "bdev_malloc_create", &params, &result)
if err != nil {
return nil, err
}
log.Printf("Received from SPDK: %v", result)
if result == "" {
msg := fmt.Sprintf("Could not create Malloc Dev: %s", params.Name)
return nil, status.Errorf(codes.InvalidArgument, msg)
}
response := utils.ProtoClone(in.MallocVolume)
s.Volumes.MallocVolumes[in.MallocVolume.Name] = response
return response, nil
}
err := status.Errorf(codes.NotFound, "unable to find key %s", in.MallocVolume.Name)
return nil, err
}
resourceID := path.Base(volume.Name)
// update_mask = 2
if err := fieldmask.Validate(in.UpdateMask, in.MallocVolume); err != nil {
return nil, err
}
params1 := spdk.BdevMallocDeleteParams{
Name: resourceID,
}
var result1 spdk.BdevMallocDeleteResult
err1 := s.rpc.Call(ctx, "bdev_malloc_delete", &params1, &result1)
if err1 != nil {
return nil, err1
}
log.Printf("Received from SPDK: %v", result1)
if !result1 {
msg := fmt.Sprintf("Could not delete Malloc Dev: %s", params1.Name)
return nil, status.Errorf(codes.InvalidArgument, msg)
}
params2 := spdk.BdevMallocCreateParams{
Name: resourceID,
BlockSize: 512,
NumBlocks: 64,
}
var result2 spdk.BdevMallocCreateResult
err2 := s.rpc.Call(ctx, "bdev_malloc_create", &params2, &result2)
if err2 != nil {
return nil, err2
}
log.Printf("Received from SPDK: %v", result2)
if result2 == "" {
msg := fmt.Sprintf("Could not create Malloc Dev: %s", params2.Name)
return nil, status.Errorf(codes.InvalidArgument, msg)
}
response := utils.ProtoClone(in.MallocVolume)
s.Volumes.MallocVolumes[in.MallocVolume.Name] = response
return response, nil
}

// ListMallocVolumes lists Malloc volume instances
func (s *Server) ListMallocVolumes(ctx context.Context, in *pb.ListMallocVolumesRequest) (*pb.ListMallocVolumesResponse, error) {
// check required fields
if err := fieldbehavior.ValidateRequiredFields(in); err != nil {
return nil, err
}
// fetch object from the database
size, offset, perr := utils.ExtractPagination(in.PageSize, in.PageToken, s.Pagination)
if perr != nil {
return nil, perr
}
var result []spdk.BdevGetBdevsResult
err := s.rpc.Call(ctx, "bdev_get_bdevs", nil, &result)
if err != nil {
return nil, err
}
log.Printf("Received from SPDK: %v", result)
token := ""
log.Printf("Limiting result len(%d) to [%d:%d]", len(result), offset, size)
result, hasMoreElements := utils.LimitPagination(result, offset, size)
if hasMoreElements {
token = uuid.New().String()
s.Pagination[token] = offset + size
}
Blobarray := make([]*pb.MallocVolume, len(result))
for i := range result {
r := &result[i]
Blobarray[i] = &pb.MallocVolume{Name: r.Name, Uuid: r.UUID, BlockSize: r.BlockSize, BlocksCount: r.NumBlocks}
}
sortMallocVolumes(Blobarray)
return &pb.ListMallocVolumesResponse{MallocVolumes: Blobarray, NextPageToken: token}, nil
}

// GetMallocVolume gets a a Malloc volume instance
func (s *Server) GetMallocVolume(ctx context.Context, in *pb.GetMallocVolumeRequest) (*pb.MallocVolume, error) {
// check input correctness
if err := s.validateGetMallocVolumeRequest(in); err != nil {
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.MallocVolumes[in.Name]
if !ok {
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
return nil, err
}
resourceID := path.Base(volume.Name)
params := spdk.BdevGetBdevsParams{
Name: resourceID,
}
var result []spdk.BdevGetBdevsResult
err := s.rpc.Call(ctx, "bdev_get_bdevs", &params, &result)
if err != nil {
return nil, err
}
log.Printf("Received from SPDK: %v", result)
if len(result) != 1 {
msg := fmt.Sprintf("expecting exactly 1 result, got %d", len(result))
return nil, status.Errorf(codes.InvalidArgument, msg)
}
return &pb.MallocVolume{Name: result[0].Name, Uuid: result[0].UUID, BlockSize: result[0].BlockSize, BlocksCount: result[0].NumBlocks}, nil
}

// StatsMallocVolume gets a Malloc volume instance stats
func (s *Server) StatsMallocVolume(ctx context.Context, in *pb.StatsMallocVolumeRequest) (*pb.StatsMallocVolumeResponse, error) {
// check input correctness
if err := s.validateStatsMallocVolumeRequest(in); err != nil {
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.MallocVolumes[in.Name]
if !ok {
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
return nil, err
}
resourceID := path.Base(volume.Name)
params := spdk.BdevGetIostatParams{
Name: resourceID,
}
// See https://mholt.github.io/json-to-go/
var result spdk.BdevGetIostatResult
err := s.rpc.Call(ctx, "bdev_get_iostat", &params, &result)
if err != nil {
return nil, err
}
log.Printf("Received from SPDK: %v", result)
if len(result.Bdevs) != 1 {
msg := fmt.Sprintf("expecting exactly 1 result, got %d", len(result.Bdevs))
return nil, status.Errorf(codes.InvalidArgument, msg)
}
return &pb.StatsMallocVolumeResponse{Stats: &pb.VolumeStats{
ReadBytesCount: int32(result.Bdevs[0].BytesRead),
ReadOpsCount: int32(result.Bdevs[0].NumReadOps),
WriteBytesCount: int32(result.Bdevs[0].BytesWritten),
WriteOpsCount: int32(result.Bdevs[0].NumWriteOps),
UnmapBytesCount: int32(result.Bdevs[0].BytesUnmapped),
UnmapOpsCount: int32(result.Bdevs[0].NumUnmapOps),
ReadLatencyTicks: int32(result.Bdevs[0].ReadLatencyTicks),
WriteLatencyTicks: int32(result.Bdevs[0].WriteLatencyTicks),
UnmapLatencyTicks: int32(result.Bdevs[0].UnmapLatencyTicks),
}}, nil
}
Loading
Loading