Skip to content

Commit

Permalink
Release v0.9.0
Browse files Browse the repository at this point in the history
Adjustments and new features include:
- Improved configuration management system
- Improved instrumentation and add error reporting
- Improved "resolver" implementation
- Upgrade gRPC and several other dependencies
  • Loading branch information
bcessa committed Jul 23, 2022
1 parent 96e0c4f commit abab050
Show file tree
Hide file tree
Showing 27 changed files with 1,817 additions and 571 deletions.
19 changes: 14 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,24 @@ jobs:
GITHUB_USER: ${{ github.actor }}
ACCESS_TOKEN: ${{ secrets.ACCESS_TOKEN }}

# Get commit message
- name: Get commit message
run: |
echo 'commit_msg<<EOF' >> $GITHUB_ENV
git log --format=%B -n 1 ${{ github.sha }} >> $GITHUB_ENV
echo 'EOF' >> $GITHUB_ENV
# List direct dependencies
- name: List dependencies
run: go list -mod=readonly -f '{{if not .Indirect}}{{.}}{{end}}' -m all > go.list

# Scan dependencies using Nancy
# Can be excluded if the commit message contains: [scan-deps skip]
# https://github.com/sonatype-nexus-community/nancy-github-action
- name: Scan dependencies
if: ${{ !contains(env.commit_msg, '[scan-deps skip]') }}
uses: sonatype-nexus-community/[email protected]

# Validate the protocol buffer definitions on the project
# using 'buf'. Remove if not required.
protos:
Expand Down Expand Up @@ -73,21 +82,21 @@ jobs:
# Setup buf
- name: Setup buf
id: buf-setup
uses: bufbuild/buf-setup-action@v1.5.0
uses: bufbuild/buf-setup-action@v1.6.0
with:
version: 1.5.0
github_token: ${{ github.token }}

# Static analysis
- name: Static analysis
id: buf-lint
uses: bufbuild/[email protected].0
uses: bufbuild/[email protected].1
if: ${{ steps.buf-setup.outcome == 'success' }}

# Detect breaking changes
- name: Detect breaking changes
id: buf-breaking
uses: bufbuild/buf-breaking-action@v1.0.0
uses: bufbuild/buf-breaking-action@v1.1.0
if: steps.buf-lint.outcome == 'success' && !contains(env.commit_msg, '[buf-breaking skip]')
with:
against: 'https://github.com/${{ github.repository }}.git#branch=master'
Expand Down Expand Up @@ -129,7 +138,7 @@ jobs:
# if: steps.vendor-cache.outputs.cache-hit != 'true'
- name: Restore modules from cache
id: vendor-cache
uses: actions/cache@v3.0.2
uses: actions/cache@v3
env:
cache-name: vendor
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
# if: steps.vendor-cache.outputs.cache-hit != 'true'
- name: Restore modules from cache
id: vendor-cache
uses: actions/cache@v3.0.2
uses: actions/cache@v3
env:
cache-name: vendor
with:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.DS_Store
.idea
*.iml
.vscode

# Deps
vendor/**
Expand Down Expand Up @@ -35,6 +36,7 @@ secret*

# Default agent data store
data
/config.yaml

# Code coverage reports
coverage.*
8 changes: 2 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,14 @@ ca-roots:

## deps: Download and compile all dependencies and intermediary products
deps:
@-rm -rf vendor
go clean
go mod tidy
go mod verify
go mod download
go mod vendor

## docs: Display package documentation on local server
docs:
@echo "Docs available at: http://localhost:8080/"
godoc -http=:8080 -goroot=${GOPATH} -play


## docker: Build docker image
# https://github.com/opencontainers/image-spec/blob/master/annotations.md
docker:
Expand Down Expand Up @@ -167,4 +163,4 @@ test:
## updates: List available updates for direct dependencies
# https://github.com/golang/go/wiki/Modules#how-to-upgrade-and-downgrade-dependencies
updates:
@go list -mod=mod -f '{{if (and (not (or .Main .Indirect)) .Update)}}{{.Path}}: {{.Version}} -> {{.Update.Version}}{{end}}' -u -m all 2> /dev/null
@GOWORK=off go list -u -f '{{if (and (not (or .Main .Indirect)) .Update)}}{{.Path}}: {{.Version}} -> {{.Update.Version}}{{end}}' -mod=mod -m all 2> /dev/null
88 changes: 62 additions & 26 deletions agent/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
Expand Down Expand Up @@ -42,81 +43,96 @@ func (h *Handler) Close() error {
}

// Retrieve an existing DID instance based on its subject string.
func (h *Handler) Retrieve(req *protov1.QueryRequest) (*did.Identifier, *did.ProofLD, error) {
logFields := xlog.Fields{
"method": req.Method,
"subject": req.Subject,
}
h.oop.WithFields(logFields).Debug("retrieve request")
func (h *Handler) Retrieve(ctx context.Context, req *protov1.QueryRequest) (*did.Identifier, *did.ProofLD, error) {
// Track operation
task := h.oop.Start(
ctx,
"handler.Retrieve",
otel.WithSpanKind(otel.SpanKindServer),
otel.WithSpanAttributes(otel.Attributes{"method": req.Method}))
defer task.End()

// Verify method is supported
if !h.isSupported(req.Method) {
h.oop.WithFields(logFields).Warning("non supported method")
return nil, nil, errors.New("non supported method")
err := errors.New("unsupported method")
task.Error(xlog.Error, err, nil)
return nil, nil, err
}

// Retrieve document from storage
task.Event("database read")
id, proof, err := h.store.Get(req)
if err != nil {
h.oop.WithFields(logFields).Warning(err.Error())
task.Error(xlog.Error, err, nil)
return nil, nil, err
}
return id, proof, nil
}

// Process an incoming request ticket.
func (h *Handler) Process(req *protov1.ProcessRequest) error {
// Empty request
if req == nil {
return errors.New("empty request")
}
func (h *Handler) Process(ctx context.Context, req *protov1.ProcessRequest) error {
// Track operation
task := h.oop.Start(
ctx,
"handler.Process",
otel.WithSpanKind(otel.SpanKindServer),
otel.WithSpanAttributes(otel.Attributes{"task": req.Task.String()}))
defer task.End()

// Validate ticket
if err := req.Ticket.Verify(h.difficulty); err != nil {
h.oop.WithFields(xlog.Fields{"error": err.Error()}).Error("invalid ticket")
task.Error(xlog.Error, err, nil)
return err
}

// Load DID document and proof
id, err := req.Ticket.GetDID()
if err != nil {
h.oop.WithFields(xlog.Fields{"error": err.Error()}).Error("invalid DID contents")
task.Error(xlog.Error, err, nil)
return err
}
proof, err := req.Ticket.GetProofLD()
if err != nil {
h.oop.WithFields(xlog.Fields{"error": err.Error()}).Error("invalid DID proof")
task.Error(xlog.Error, err, nil)
return err
}

// Verify method is supported
if !h.isSupported(id.Method()) {
h.oop.WithFields(xlog.Fields{"method": id.Method()}).Warning("non supported method")
return errors.New("non supported method")
err := errors.New("unsupported method")
task.Error(xlog.Error, err, otel.Attributes{
"method": id.Method(),
})
return err
}

// Update operations require another validation step using the original record
isUpdate := h.store.Exists(id)
if isUpdate {
if err := req.Ticket.Verify(h.difficulty); err != nil {
h.oop.WithFields(xlog.Fields{"error": err.Error()}).Error("invalid ticket")
task.Error(xlog.Error, err, nil)
return err
}
}

h.oop.WithFields(xlog.Fields{
fields := otel.Attributes{
"subject": id.Subject(),
"update": isUpdate,
"task": req.Task,
}).Debug("write operation")
"task": req.Task.String(),
}
switch req.Task {
case protov1.ProcessRequest_TASK_PUBLISH:
task.Event("database save", fields)
err = h.store.Save(id, proof)
case protov1.ProcessRequest_TASK_DEACTIVATE:
task.Event("database delete", fields)
err = h.store.Delete(id)
default:
return errors.New("invalid request task")
}
if err != nil {
task.Error(xlog.Error, err, fields)
}
return err
}

Expand All @@ -126,14 +142,23 @@ func (h *Handler) ServerSetup(srv *grpc.Server) {
protov1.RegisterAgentAPIServer(srv, &rpcHandler{handler: h})
}

// GatewaySetup return the HTTP setup required to be expose the handler
// GatewaySetup return the HTTP setup required to expose the handler
// instance via HTTP.
func (h *Handler) GatewaySetup() rpc.GatewayRegister {
return protov1.RegisterAgentAPIHandler
}

// CustomGatewayOptions returns additional settings required when exposing
// the handler instance via HTTP.
func (h *Handler) CustomGatewayOptions() []rpc.GatewayOption {
return []rpc.GatewayOption{
rpc.WithSpanFormatter(spanNameFormatter()),
rpc.WithInterceptor(h.queryResponseFilter()),
}
}

// QueryResponseFilter provides custom encoding of HTTP query results.
func (h *Handler) QueryResponseFilter() rpc.GatewayInterceptor {
func (h *Handler) queryResponseFilter() rpc.GatewayInterceptor {
return func(res http.ResponseWriter, req *http.Request) error {
// Filter query requests
if !strings.HasPrefix(req.URL.Path, "/v1/retrieve/") {
Expand All @@ -153,7 +178,7 @@ func (h *Handler) QueryResponseFilter() rpc.GatewayInterceptor {
Method: seg[0],
Subject: seg[1],
}
id, proof, err := h.Retrieve(rr)
id, proof, err := h.Retrieve(req.Context(), rr)
if err != nil {
response, _ = json.MarshalIndent(map[string]string{"error": err.Error()}, "", " ")
} else {
Expand Down Expand Up @@ -186,3 +211,14 @@ func (h *Handler) isSupported(method string) bool {
}
return false
}

// SpanNameFormatter determines how transactions are reported to observability
// services.
func spanNameFormatter() func(r *http.Request) string {
return func(r *http.Request) string {
if strings.HasPrefix(r.URL.Path, "/v1/retrieve") {
return fmt.Sprintf("%s %s", r.Method, "/v1/retrieve/{method}/{subject}")
}
return fmt.Sprintf("%s %s", r.Method, r.URL.Path)
}
}
58 changes: 27 additions & 31 deletions agent/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
protov1 "github.com/aidtechnology/did-method/proto/did/v1"
"go.bryk.io/pkg/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
Expand All @@ -20,52 +18,50 @@ type rpcHandler struct {
handler *Handler
}

func getHeaders() metadata.MD {
return metadata.New(map[string]string{
"x-content-type-options": "nosniff",
})
}

func (rh *rpcHandler) Ping(ctx context.Context, _ *emptypb.Empty) (*protov1.PingResponse, error) {
// Track operation
sp := rh.handler.oop.Start(ctx, "Ping", otel.WithSpanKind(otel.SpanKindServer))
defer sp.End()
// Get parent span reference
parent := rh.handler.oop.SpanFromContext(ctx)

if err := grpc.SendHeader(ctx, getHeaders()); err != nil {
sp.SetStatus(otelcodes.Error, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Track operation
task := rh.handler.oop.Start(parent.Context(), "Ping", otel.WithSpanKind(otel.SpanKindServer))
defer task.End()
return &protov1.PingResponse{Ok: true}, nil
}

func (rh *rpcHandler) Process(ctx context.Context, req *protov1.ProcessRequest) (*protov1.ProcessResponse, error) {
// Get parent span reference
parent := rh.handler.oop.SpanFromContext(ctx)

// Track operation
sp := rh.handler.oop.Start(ctx, "Process", otel.WithSpanKind(otel.SpanKindServer))
defer sp.End()
task := rh.handler.oop.Start(parent.Context(), "rpc.Process", otel.WithSpanKind(otel.SpanKindServer))
defer task.End()

if err := grpc.SendHeader(ctx, getHeaders()); err != nil {
sp.SetStatus(otelcodes.Error, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := rh.handler.Process(req); err != nil {
sp.SetStatus(otelcodes.Error, err.Error())
// Process request
if err := rh.handler.Process(task.Context(), req); err != nil {
task.SetStatus(otelcodes.Error, err.Error())
return &protov1.ProcessResponse{Ok: false}, status.Error(codes.InvalidArgument, err.Error())
}
return &protov1.ProcessResponse{Ok: true}, nil
}

func (rh *rpcHandler) Query(ctx context.Context, req *protov1.QueryRequest) (*protov1.QueryResponse, error) {
// Get parent span reference
parent := rh.handler.oop.SpanFromContext(ctx)

// Track operation
sp := rh.handler.oop.Start(ctx, "Query", otel.WithSpanKind(otel.SpanKindServer))
defer sp.End()
task := rh.handler.oop.Start(
parent.Context(),
"rpc.Query",
otel.WithSpanKind(otel.SpanKindServer),
otel.WithSpanAttributes(otel.Attributes{
"method": req.Method,
}))
defer task.End()

if err := grpc.SendHeader(ctx, getHeaders()); err != nil {
sp.SetStatus(otelcodes.Error, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
id, proof, err := rh.handler.Retrieve(req)
// Process request
id, proof, err := rh.handler.Retrieve(task.Context(), req)
if err != nil {
sp.SetStatus(otelcodes.Error, err.Error())
task.SetStatus(otelcodes.Error, err.Error())
return nil, status.Error(codes.NotFound, err.Error())
}
doc, _ := json.Marshal(id.Document(true))
Expand Down
Loading

0 comments on commit abab050

Please sign in to comment.