Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - v2alpha1 atxs stream #5566

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2381693
iterate over database
dshulyak Nov 7, 2023
bab8ac6
add half baked streaming for atxs
dshulyak Nov 7, 2023
a617a05
refactor atxs db api
dshulyak Nov 9, 2023
4e9610d
save progress
dshulyak Nov 9, 2023
a767855
register v2
dshulyak Nov 9, 2023
d30636f
support for offset and limit
dshulyak Nov 9, 2023
7ff7f67
encode the rest of the fields
dshulyak Nov 9, 2023
e609d66
Merge branch 'develop' into atxs-stream
dshulyak Nov 9, 2023
f4abc11
allow to watch
dshulyak Nov 9, 2023
0c43282
track todo
dshulyak Nov 9, 2023
2da3634
fix where
dshulyak Nov 9, 2023
82fd29d
refactor offset / limit
dshulyak Nov 9, 2023
af95f33
debug
dshulyak Nov 9, 2023
8958021
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 8, 2024
13a10f9
Use string builder to build query
kacpersaw Feb 9, 2024
7edbb42
add matcher to stream func
kacpersaw Feb 9, 2024
e07f5fb
Move to v2alpha1 and remove headers
kacpersaw Feb 12, 2024
cea030a
Add tests
kacpersaw Feb 13, 2024
c20d397
Add ActivationsCount handler
kacpersaw Feb 13, 2024
7a6a2ea
Extract query builder into separate pkg
kacpersaw Feb 13, 2024
f90fc70
lint fix
kacpersaw Feb 13, 2024
4f77497
Move service name to const
kacpersaw Feb 13, 2024
033872e
Add unit tests & bump api version
kacpersaw Feb 15, 2024
9cef9fa
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
f533c6d
Apply suggestions from code review
kacpersaw Feb 16, 2024
fe17373
Apply review suggestions
kacpersaw Feb 16, 2024
7635482
lint
kacpersaw Feb 16, 2024
1f80fa9
apply review suggestion
kacpersaw Feb 16, 2024
d87b0e8
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions api/grpcserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ type Config struct {
type Service = string

const (
Admin Service = "admin"
Debug Service = "debug"
GlobalState Service = "global"
Mesh Service = "mesh"
Transaction Service = "transaction"
Activation Service = "activation"
Smesher Service = "smesher"
Post Service = "post"
Node Service = "node"
Admin Service = "admin"
Debug Service = "debug"
GlobalState Service = "global"
Mesh Service = "mesh"
Transaction Service = "transaction"
Activation Service = "activation"
Smesher Service = "smesher"
Post Service = "post"
Node Service = "node"
ActivationV2Alpha1 Service = "activation_v2alpha1"
ActivationStreamV2Alpha1 Service = "activation_stream_v2alpha1"
)

// DefaultConfig defines the default configuration options for api.
func DefaultConfig() Config {
return Config{
PublicServices: []Service{GlobalState, Mesh, Transaction, Node, Activation},
PublicServices: []Service{GlobalState, Mesh, Transaction, Node, Activation, ActivationV2Alpha1},
PublicListener: "0.0.0.0:9092",
PrivateServices: []Service{Admin, Smesher, Debug},
PrivateServices: []Service{Admin, Smesher, Debug, ActivationStreamV2Alpha1},
poszu marked this conversation as resolved.
Show resolved Hide resolved
PrivateListener: "127.0.0.1:9093",
PostServices: []Service{Post},
PostListener: "127.0.0.1:9094",
Expand Down
348 changes: 348 additions & 0 deletions api/grpcserver/v2alpha1/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
package v2alpha1

import (
"context"
"errors"
"io"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
spacemeshv2alpha1 "github.com/spacemeshos/api/release/go/spacemesh/v2alpha1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/api/grpcserver"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/builder"
)

const (
Activation = "activation_v2alpha1"
ActivationStream = "activation_stream_v2alpha1"
)

func NewActivationStreamService(db *sql.Database) *ActivationStreamService {
return &ActivationStreamService{db: db}
}

type ActivationStreamService struct {
db *sql.Database
poszu marked this conversation as resolved.
Show resolved Hide resolved
}

var _ grpcserver.ServiceAPI = (*ActivationStreamService)(nil)
poszu marked this conversation as resolved.
Show resolved Hide resolved

func (s *ActivationStreamService) RegisterService(server *grpc.Server) {
spacemeshv2alpha1.RegisterActivationStreamServiceServer(server, s)
}

func (s *ActivationStreamService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationStreamServiceHandlerServer(context.Background(), mux, s)

Check warning on line 45 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}

func (s *ActivationStreamService) String() string {
return "ActivationStreamService"

Check warning on line 49 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

func (s *ActivationStreamService) Stream(
request *spacemeshv2alpha1.ActivationStreamRequest,
stream spacemeshv2alpha1.ActivationStreamService_StreamServer,
) error {
var sub *events.BufferedSubscription[events.ActivationTx]
if request.Watch {
matcher := resultsMatcher{request, stream.Context()}
var err error
sub, err = events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 62 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L62

Added line #L62 was not covered by tests
}
defer sub.Close()
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")

Check warning on line 66 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L66

Added line #L66 was not covered by tests
}
}
ops, err := toOperations(toRequest(request))
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())

Check warning on line 71 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L71

Added line #L71 was not covered by tests
}
var ierr error
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
ierr = stream.Send(&spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return ierr == nil
}); err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 78 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L78

Added line #L78 was not covered by tests
}
if sub == nil {
return nil
}
poszu marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-stream.Context().Done():
return nil
case <-sub.Full():
return status.Error(codes.Canceled, "buffer overflow")

Check warning on line 88 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L87-L88

Added lines #L87 - L88 were not covered by tests
case rst := <-sub.Out():
if err := stream.Send(&spacemeshv2alpha1.Activation{
Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(rst.VerifiedActivationTx)},
},
); err != nil {
if errors.Is(err, io.EOF) {
return nil

Check warning on line 95 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}
return status.Error(codes.Internal, err.Error())

Check warning on line 97 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L97

Added line #L97 was not covered by tests
}
kacpersaw marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func toAtx(atx *types.VerifiedActivationTx) *spacemeshv2alpha1.ActivationV1 {
v1 := &spacemeshv2alpha1.ActivationV1{
Id: atx.ID().Bytes(),
NodeId: atx.SmesherID.Bytes(),
Signature: atx.Signature.Bytes(),
PublishEpoch: atx.PublishEpoch.Uint32(),
Sequence: atx.Sequence,
PreviousAtx: atx.PrevATXID[:],
PositioningAtx: atx.PositioningATX[:],
Coinbase: atx.Coinbase.String(),
Units: atx.NumUnits,
BaseHeight: uint32(atx.BaseTickHeight()),
Ticks: uint32(atx.TickCount()),
}
if atx.CommitmentATX != nil {
v1.CommittmentAtx = atx.CommitmentATX.Bytes()

Check warning on line 118 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L118

Added line #L118 was not covered by tests
}
if atx.VRFNonce != nil {
v1.VrfPostIndex = &spacemeshv2alpha1.VRFPostIndex{
Nonce: uint64(*atx.VRFNonce),

Check warning on line 122 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}
}
if atx.InitialPost != nil {
v1.InitialPost = &spacemeshv2alpha1.Post{
Nonce: atx.InitialPost.Nonce,
Indices: atx.InitialPost.Indices,
Pow: atx.InitialPost.Pow,

Check warning on line 129 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L126-L129

Added lines #L126 - L129 were not covered by tests
}
}
if nipost := atx.NIPost; nipost != nil {
if nipost.Post != nil {
v1.Post = &spacemeshv2alpha1.Post{
Nonce: nipost.Post.Nonce,
Indices: nipost.Post.Indices,
Pow: nipost.Post.Pow,

Check warning on line 137 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L133-L137

Added lines #L133 - L137 were not covered by tests
}
}
if nipost.PostMetadata != nil {
v1.PostMeta = &spacemeshv2alpha1.PostMeta{
Challenge: nipost.PostMetadata.Challenge,
LabelsPerUnit: nipost.PostMetadata.LabelsPerUnit,

Check warning on line 143 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L140-L143

Added lines #L140 - L143 were not covered by tests
}
}
poszu marked this conversation as resolved.
Show resolved Hide resolved
v1.Membership = &spacemeshv2alpha1.PoetMembershipProof{
ProofNodes: make([][]byte, len(nipost.Membership.Nodes)),
Leaf: nipost.Membership.LeafIndex,

Check warning on line 148 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L146-L148

Added lines #L146 - L148 were not covered by tests
}
for i, node := range nipost.Membership.Nodes {
v1.Membership.ProofNodes[i] = node.Bytes()

Check warning on line 151 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L150-L151

Added lines #L150 - L151 were not covered by tests
}
}
return v1
}

func NewActivationService(db *sql.Database) *ActivationService {
return &ActivationService{db: db}
}

type ActivationService struct {
db *sql.Database
}

var _ grpcserver.ServiceAPI = (*ActivationService)(nil)

func (s *ActivationService) RegisterService(server *grpc.Server) {
spacemeshv2alpha1.RegisterActivationServiceServer(server, s)
}

func (s *ActivationService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationServiceHandlerServer(context.Background(), mux, s)
}

// String returns the service name.
func (s *ActivationService) String() string {
return "ActivationService"

Check warning on line 177 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L176-L177

Added lines #L176 - L177 were not covered by tests
}

func (s *ActivationService) List(
ctx context.Context,
request *spacemeshv2alpha1.ActivationRequest,
) (*spacemeshv2alpha1.ActivationList, error) {
ops, err := toOperations(request)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())

Check warning on line 186 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L186

Added line #L186 was not covered by tests
}
// every full atx is ~1KB. 100 atxs is ~100KB.
if request.Limit > 100 {
return nil, status.Error(codes.InvalidArgument, "limit is capped at 100")
} else if request.Limit == 0 {
return nil, status.Error(codes.InvalidArgument, "limit must be set to <= 100")
}
kacpersaw marked this conversation as resolved.
Show resolved Hide resolved
rst := make([]*spacemeshv2alpha1.Activation, 0, request.Limit)
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
rst = append(rst, &spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return true
}); err != nil {
return nil, status.Error(codes.Internal, err.Error())

Check warning on line 199 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L199

Added line #L199 was not covered by tests
}
return &spacemeshv2alpha1.ActivationList{Activations: rst}, nil
}

func (s *ActivationService) ActivationsCount(
ctx context.Context,
request *spacemeshv2alpha1.ActivationsCountRequest,
) (*spacemeshv2alpha1.ActivationsCountResponse, error) {
ops := builder.Operations{Filter: []builder.Op{
{
Field: builder.Epoch,
Token: builder.Eq,
Value: int64(request.Epoch),
},
}}

count, err := atxs.CountAtxsByEpoch(s.db, ops)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())

Check warning on line 218 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L218

Added line #L218 was not covered by tests
}

return &spacemeshv2alpha1.ActivationsCountResponse{Count: count}, nil
}

func toRequest(filter *spacemeshv2alpha1.ActivationStreamRequest) *spacemeshv2alpha1.ActivationRequest {
return &spacemeshv2alpha1.ActivationRequest{
NodeId: filter.NodeId,
Id: filter.Id,
Coinbase: filter.Coinbase,
StartEpoch: filter.StartEpoch,
EndEpoch: filter.EndEpoch,
}
}

func toOperations(filter *spacemeshv2alpha1.ActivationRequest) (builder.Operations, error) {
ops := builder.Operations{}
if filter == nil {
return ops, nil

Check warning on line 237 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L237

Added line #L237 was not covered by tests
}
if filter.NodeId != nil {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Smesher,
Token: builder.Eq,
Value: filter.NodeId,
})
}
if filter.Id != nil {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Id,
Token: builder.Eq,
Value: filter.Id,
})
}
if len(filter.Coinbase) > 0 {
addr, err := types.StringToAddress(filter.Coinbase)
if err != nil {
return builder.Operations{}, err

Check warning on line 256 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L256

Added line #L256 was not covered by tests
}
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Coinbase,
Token: builder.Eq,
Value: addr.Bytes(),
})
}
if filter.StartEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Gte,
Value: int64(filter.StartEpoch),
})
}
if filter.EndEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Lte,
Value: int64(filter.EndEpoch),
})

Check warning on line 276 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L272-L276

Added lines #L272 - L276 were not covered by tests
}

ops.Other = append(ops.Other, builder.Op{
Field: builder.OrderBy,
Value: "epoch asc, id",
})

if filter.Limit != 0 {
ops.Other = append(ops.Other, builder.Op{
Field: builder.Limit,
Value: int64(filter.Limit),
})
}
if filter.Offset != 0 {
ops.Other = append(ops.Other, builder.Op{
Field: builder.Offset,
Value: int64(filter.Offset),
})
}

return ops, nil
}

type resultsMatcher struct {
*spacemeshv2alpha1.ActivationStreamRequest
ctx context.Context
}

func (m *resultsMatcher) match(t *events.ActivationTx) bool {
if len(m.NodeId) > 0 {
var nodeId types.NodeID
copy(nodeId[:], m.NodeId)

if t.SmesherID != nodeId {
return false
}
}

if len(m.Id) > 0 {
var atxId types.ATXID
copy(atxId[:], m.Id)

if t.ID() != atxId {
return false
}
}

if len(m.Coinbase) > 0 {
addr, err := types.StringToAddress(m.Coinbase)
if err != nil {
ctxzap.Error(m.ctx, "unable to convert atx coinbase", zap.Error(err))
return false

Check warning on line 328 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L327-L328

Added lines #L327 - L328 were not covered by tests
}
if t.Coinbase != addr {
return false
}
}

if m.StartEpoch != 0 {
if t.PublishEpoch.Uint32() < m.StartEpoch {
return false

Check warning on line 337 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L337

Added line #L337 was not covered by tests
}
}

if m.EndEpoch != 0 {
if t.PublishEpoch.Uint32() > m.EndEpoch {
return false

Check warning on line 343 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L342-L343

Added lines #L342 - L343 were not covered by tests
}
}

return true
}
Loading