diff --git a/.gitignore b/.gitignore index 283fb37..ca28aa4 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,9 @@ go.work.sum bin cover.out -cover.html \ No newline at end of file +cover.html + +.output + +.DS_Store +*.log \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 867d1e5..1edfa13 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,10 +8,10 @@ RUN apt-get update && \ && rm -rf /var/lib/apt/lists/* ARG APP_VERSION=nightly -ARG APP_NAME=p2pmq -ARG BUILD_TARGET=p2pmq +ARG APP_NAME=pmq +ARG BUILD_TARGET=pmq -WORKDIR /p2pmq +WORKDIR /pmq COPY go.mod go.sum ./ RUN go mod download @@ -23,12 +23,14 @@ RUN GOOS=linux CGO_ENABLED=0 go build -tags netgo -a -v -o ./bin/${BUILD_TARGET} FROM alpine:latest as runner -ARG BUILD_TARGET=p2pmq +ARG BUILD_TARGET=pmq RUN apk --no-cache --upgrade add ca-certificates bash -WORKDIR /p2pmq +WORKDIR /pmq -COPY --from=builder /p2pmq/.env* ./ -COPY --from=builder /p2pmq/resources/config/*.p2pmq.yaml ./ -COPY --from=builder /p2pmq/bin/${BUILD_TARGET} ./app +COPY --from=builder /pmq/.env* ./ +COPY --from=builder /pmq/resources/config/*.pmq.yaml ./ +COPY --from=builder /pmq/bin/${BUILD_TARGET} ./app + +CMD ["./app"] \ No newline at end of file diff --git a/Makefile b/Makefile index fa30e0c..688f31b 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,14 @@ -APP_NAME?=p2pmq +APP_NAME?=pmq BUILD_TARGET?=${APP_NAME} BUILD_IMG?=${APP_NAME} APP_VERSION?=$(git describe --tags $(git rev-list --tags --max-count=1) 2> /dev/null || echo "nightly") CFG_PATH?=./resources/config/default.p2pmq.yaml TEST_PKG?=./core/... TEST_TIMEOUT?=2m +GOSSIP_OUT_DIR=../.output + +protoc: + ./scripts/proto-gen.sh lint: @docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.54 golangci-lint run -v --timeout=5m ./... @@ -46,3 +50,9 @@ docker-run-default: docker-run-boot: @docker run -d --restart unless-stopped --name "${APP_NAME}" -p "${TCP_PORT}":"${TCP_PORT}" -p "${GRPC_PORT}":"${GRPC_PORT}" -e "GRPC_PORT=${GRPC_PORT}" -it "${BUILD_IMG}" /p2pmq/app -config=./bootstrapper.p2pmq.yaml + +gossip-sim: + @mkdir -p "${GOSSIP_OUT_DIR}" \ + && export GOSSIP_SIMULATION=full \ + && export GOSSIP_OUT_DIR="${GOSSIP_OUT_DIR}" \ + && go test -v -timeout 10m ./core -run TestGossipSimulation \ No newline at end of file diff --git a/README.md b/README.md index 41c5fb8..72620a0 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,24 @@ -# Decentralized Message Engine +# PMQ + +Decentralized messaging engine that facilitates the secure exchange of verifiable messages across networks, enabling the formation of a global, collaborative network.
-**NOTE: This is an experimental work in progress. DO NOT USE** +**WARNING: This is an experimental work in progress, DO NOT USE in production** + +
+ +[![API Reference]( +https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/68747470733a2f2f676f646f632e6f72672f6769746875622e636f6d2f676f6c616e672f6764646f3f7374617475732e737667 +)](https://pkg.go.dev/github.com/amirylm/p2pmq?tab=doc) +![Go version](https://img.shields.io/badge/go-1.21-blue.svg) +![Github Actions](https://github.com/amirylm/p2pmq/actions/workflows/lint.yml/badge.svg?branch=main) +![Github Actions](https://github.com/amirylm/p2pmq/actions/workflows/test.yml/badge.svg?branch=main) + +## Documentation -## Overview +You can find documentation in [./resources/docs](./resources/docs). -**DME** is a distributed, permissionless messaging engine for cross oracle communication. +## Usage -A network of agents is capable of the following: -- Broadcast messages over topics with optimal latency -- Pluggable and decoupled message validation using gRPC -- Scoring for protection from bad actors -- Syncing peers with the latest messages to recover from -restarts, network partition, etc. +Usage examples are available in the [examples](./examples) folder. \ No newline at end of file diff --git a/cmd/p2pmq/main.go b/cmd/pmq/main.go similarity index 84% rename from cmd/p2pmq/main.go rename to cmd/pmq/main.go index 48a4a42..1468c9d 100644 --- a/cmd/p2pmq/main.go +++ b/cmd/pmq/main.go @@ -17,7 +17,7 @@ import ( func main() { app := &cli.App{ - Name: "p2pmq", + Name: "pmq", Flags: []cli.Flag{ cli.IntFlag{ Name: "grpc-port", @@ -92,20 +92,6 @@ func main() { ctrl.Start(ctx) defer ctrl.Close() - // <-time.After(time.Second * 10) - - // if cfg.Pubsub != nil { - // if err := ctrl.Subscribe(ctx, "test-1"); err != nil { - // lggr.Errorw("could not subscribe to topic", "topic", "test-1", "err", err) - // } - // for i := 0; i < 10; i++ { - // <-time.After(time.Second * 5) - // if err := ctrl.Publish(ctx, "test-1", []byte(fmt.Sprintf("test-data-%d-%s", i, ctrl.ID()))); err != nil { - // lggr.Errorw("could not subscribe to topic", "topic", "test-1", "err", err) - // } - // } - // } - return grpcapi.ListenGrpc(srv, c.Int("grpc-port")) }, diff --git a/cmd/pqclient/main.go b/cmd/pqclient/main.go deleted file mode 100644 index 7905807..0000000 --- a/cmd/pqclient/main.go +++ /dev/null @@ -1,5 +0,0 @@ -package main - -func main() { - -} diff --git a/commons/config_pubsub.go b/commons/config_pubsub.go index 422b197..f25150b 100644 --- a/commons/config_pubsub.go +++ b/commons/config_pubsub.go @@ -18,7 +18,7 @@ type PubsubConfig struct { Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"` MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"` MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"` - Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"` + Trace *PubsubTraceConfig `json:"trace,omitempty" yaml:"trace,omitempty"` } func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) { @@ -30,6 +30,12 @@ func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) { return TopicConfig{}, false } +type PubsubTraceConfig struct { + Skiplist []string `json:"skiplist,omitempty" yaml:"skiplist,omitempty"` + JsonFile string `json:"jsonFile,omitempty" yaml:"jsonFile,omitempty"` + Debug bool `json:"debug,omitempty" yaml:"debug,omitempty"` +} + type MsgIDFnConfig struct { Type string `json:"type,omitempty" yaml:"type,omitempty"` Size int `json:"size,omitempty" yaml:"size,omitempty"` diff --git a/core/ctrl.go b/core/ctrl.go index 0ca959e..832b586 100644 --- a/core/ctrl.go +++ b/core/ctrl.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "sync/atomic" "github.com/amirylm/p2pmq/commons" "github.com/amirylm/p2pmq/commons/utils" @@ -12,6 +13,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" + libp2pnetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/mdns" "github.com/libp2p/go-libp2p/p2p/net/connmgr" @@ -33,9 +36,11 @@ type Controller struct { mdnsSvc mdns.Service pubsub *pubsub.PubSub - topicManager *topicManager - denylist pubsub.Blacklist - subFilter pubsub.SubscriptionFilter + topicManager *topicManager + denylist pubsub.Blacklist + subFilter pubsub.SubscriptionFilter + psTracer *psTracer + pubsubRpcCounter *atomic.Uint64 valRouter MsgRouter[pubsub.ValidationResult] msgRouter MsgRouter[error] @@ -46,15 +51,16 @@ func NewController( cfg commons.Config, msgRouter MsgRouter[error], valRouter MsgRouter[pubsub.ValidationResult], - lggrNS string, + name string, ) (*Controller, error) { d := &Controller{ - threadControl: utils.NewThreadControl(), - lggr: lggr.Named(lggrNS).Named("controller"), - cfg: cfg, - valRouter: valRouter, - msgRouter: msgRouter, - topicManager: newTopicManager(), + threadControl: utils.NewThreadControl(), + lggr: lggr.Named(name).Named("ctrl"), + cfg: cfg, + valRouter: valRouter, + msgRouter: msgRouter, + topicManager: newTopicManager(), + pubsubRpcCounter: new(atomic.Uint64), } err := d.setup(ctx, cfg) @@ -65,6 +71,21 @@ func (c *Controller) ID() string { return c.host.ID().String() } +func (c *Controller) Connect(ctx context.Context, dest *Controller) error { + ai := peer.AddrInfo{ + ID: dest.host.ID(), + Addrs: dest.host.Addrs(), + } + switch c.host.Network().Connectedness(ai.ID) { + case libp2pnetwork.Connected: + return nil + case libp2pnetwork.CannotConnect: + return fmt.Errorf("cannot connect to %s", ai.ID) + default: + } + return c.host.Connect(ctx, ai) +} + func (c *Controller) RefreshRouters(msgHandler func(*MsgWrapper[error]), valHandler func(*MsgWrapper[pubsub.ValidationResult])) { if c.valRouter != nil { c.valRouter.RefreshHandler(valHandler) @@ -78,7 +99,7 @@ func (c *Controller) RefreshRouters(msgHandler func(*MsgWrapper[error]), valHand func (c *Controller) Start(ctx context.Context) { c.StartOnce(func() { - // d.lggr.Debugf("starting controller with host %s", d.host.ID()) + c.lggr.Debugf("starting ctrl") if c.msgRouter != nil { c.threadControl.Go(c.msgRouter.Start) @@ -98,7 +119,7 @@ func (c *Controller) Start(ctx context.Context) { c.connect(b) } if err := c.dht.Bootstrap(ctx); err != nil { - c.lggr.Panicf("failed to start discovery: %w", err) + c.lggr.Panicf("failed to start dht: %w", err) } } if c.mdnsSvc != nil { @@ -111,7 +132,8 @@ func (c *Controller) Start(ctx context.Context) { func (c *Controller) Close() { c.StopOnce(func() { - c.lggr.Debugf("closing controller with host %s", c.host.ID()) + h := c.host.ID() + c.lggr.Debugf("closing controller with host %s", h) c.threadControl.Close() if c.dht != nil { if err := c.dht.Close(); err != nil { @@ -126,6 +148,7 @@ func (c *Controller) Close() { if err := c.host.Close(); err != nil { c.lggr.Errorf("failed to close host: %w", err) } + c.lggr.Debugf("closed controller with host %s", h) }) } @@ -194,7 +217,8 @@ func (c *Controller) setup(ctx context.Context, cfg commons.Config) (err error) return err } c.host = h - c.lggr.Infow("created libp2p host", "peerID", h.ID(), "addrs", h.Addrs()) + c.lggr = c.lggr.With("peerID", h.ID()) + c.lggr.Debugw("created libp2p host", "addrs", h.Addrs()) if len(cfg.MdnsTag) > 0 { c.setupMdnsDiscovery(ctx, h, cfg.MdnsTag) @@ -207,5 +231,7 @@ func (c *Controller) setup(ctx context.Context, cfg commons.Config) (err error) } } + c.lggr.Infow("ctrl setup done", "addrs", h.Addrs()) + return nil } diff --git a/core/dht.go b/core/dht.go index cc3278e..37b414c 100644 --- a/core/dht.go +++ b/core/dht.go @@ -6,13 +6,12 @@ import ( "github.com/amirylm/p2pmq/commons" dht "github.com/libp2p/go-libp2p-kad-dht" - dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" ) -func (c *Controller) dhtRoutingFactory(ctx context.Context, opts ...dhtopts.Option) func(host.Host) (routing.PeerRouting, error) { +func (c *Controller) dhtRoutingFactory(ctx context.Context, opts ...dht.Option) func(host.Host) (routing.PeerRouting, error) { return func(h host.Host) (routing.PeerRouting, error) { dhtInst, err := dht.New(ctx, h, opts...) if err != nil { diff --git a/core/gossipsub_test.go b/core/gossipsub_test.go new file mode 100644 index 0000000..2e02c99 --- /dev/null +++ b/core/gossipsub_test.go @@ -0,0 +1,572 @@ +package core + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "sync" + "sync/atomic" + "testing" + "text/template" + "time" + + "github.com/amirylm/p2pmq/commons" + "github.com/amirylm/p2pmq/commons/utils" + "github.com/amirylm/p2pmq/core/gossip" + logging "github.com/ipfs/go-log" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestGossipSimulation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, logging.SetLogLevelRegex("p2pmq", "error")) + + groupsCfgSimple18 := testGroupSimple(18, 10, 6, 2) + groupsCfgSimple36 := testGroupSimple(36, 16, 16, 4) + groupsCfgSimple54 := testGroupSimple(54, 32, 16, 6) + + benchFlows := []flow{ + flowActionA2B(1, time.Millisecond*1, time.Millisecond*250), + flowActionA2B(10, time.Millisecond*1, time.Millisecond*10), + flowActionA2B(100, time.Millisecond*1, time.Millisecond*10), + flowActionA2B(1000, time.Millisecond*10, time.Millisecond*10), + flowTrigger("b", 1, time.Millisecond*10), + flowTrigger("b", 10, time.Millisecond*10), + flowTrigger("b", 100, time.Millisecond*10), + flowTrigger("b", 1000, time.Millisecond*10), + flowTrigger("a", 1, time.Millisecond*10), + flowTrigger("a", 10, time.Millisecond*10), + flowTrigger("a", 100, time.Millisecond*10), + flowTrigger("a", 1000, time.Millisecond*10), + } + + tests := []struct { + name string + n int + pubsubConfig *commons.PubsubConfig + gen *testGen + groupsCfg groupsCfg + conns connectivity + flows []flow + }{ + { + name: "simple_18", + n: 18, + gen: &testGen{ + // hitMaps: map[string]*nodeHitMap{}, + routingFn: func(m *pubsub.Message) {}, + validationFn: func(p peer.ID, m *pubsub.Message) pubsub.ValidationResult { + return pubsub.ValidationAccept + }, + pubsubConfig: &commons.PubsubConfig{ + MsgValidator: &commons.MsgValidationConfig{}, + Trace: &commons.PubsubTraceConfig{}, + Overlay: &commons.OverlayParams{ + D: 3, + Dlow: 2, + Dhi: 5, + Dlazy: 3, + }, + }, + }, + groupsCfg: groupsCfgSimple18, + conns: groupsCfgSimple18.allToAllConnectivity(), + flows: benchFlows[:], + }, + { + name: "simple_36", + n: 36, + gen: &testGen{ + // hitMaps: map[string]*nodeHitMap{}, + routingFn: func(m *pubsub.Message) {}, + validationFn: func(p peer.ID, m *pubsub.Message) pubsub.ValidationResult { + return pubsub.ValidationAccept + }, + pubsubConfig: &commons.PubsubConfig{ + MsgValidator: &commons.MsgValidationConfig{}, + Trace: &commons.PubsubTraceConfig{}, + Overlay: &commons.OverlayParams{ + D: 4, + Dlow: 2, + Dhi: 6, + Dlazy: 3, + }, + }, + }, + groupsCfg: groupsCfgSimple36, + conns: groupsCfgSimple36.allToAllConnectivity(), + flows: benchFlows[:], + }, + { + name: "simple_36_with_default_overlay_params", + n: 36, + gen: &testGen{ + // hitMaps: map[string]*nodeHitMap{}, + routingFn: func(m *pubsub.Message) {}, + validationFn: func(p peer.ID, m *pubsub.Message) pubsub.ValidationResult { + return pubsub.ValidationAccept + }, + pubsubConfig: &commons.PubsubConfig{ + MsgValidator: &commons.MsgValidationConfig{}, + Trace: &commons.PubsubTraceConfig{}, + }, + }, + groupsCfg: groupsCfgSimple36, + conns: groupsCfgSimple36.allToAllConnectivity(), + flows: benchFlows[:], + }, + { + name: "simple_54_with_default_overlay_params", + n: 54, + gen: &testGen{ + // hitMaps: map[string]*nodeHitMap{}, + routingFn: func(m *pubsub.Message) {}, + validationFn: func(p peer.ID, m *pubsub.Message) pubsub.ValidationResult { + return pubsub.ValidationAccept + }, + pubsubConfig: &commons.PubsubConfig{ + MsgValidator: &commons.MsgValidationConfig{}, + Trace: &commons.PubsubTraceConfig{}, + }, + }, + groupsCfg: groupsCfgSimple54, + conns: groupsCfgSimple54.allToAllConnectivity(), + flows: benchFlows[:], + }, + } + + outDir := os.Getenv("GOSSIP_OUT_DIR") + if len(outDir) == 0 { + outDir = t.TempDir() + } + gossipSimulation := os.Getenv("GOSSIP_SIMULATION") + switch gossipSimulation { + case "full": + t.Log("running full simulation") + default: + t.Log("running limited simulation (only for 18 and 36 nodes and flows with less than 100 iterations)") + tests = tests[:3] + for i, tc := range tests { + newFlows := make([]flow, 0, len(tc.flows)) + for _, f := range tc.flows { + if f.iterations < 100 { + newFlows = append(newFlows, f) + } + } + tc.flows = newFlows + tests[i] = tc + } + } + + var outputs []gossipTestOutput + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + tgen := tc.gen + ctrls, _, _, done, err := StartControllers(ctx, tc.n, tgen) + defer done() + require.NoError(t, err) + + t.Logf("%d controllers were started, connecting peers...", tc.n) + + require.NoError(t, tc.conns.connect(ctx, ctrls)) + + <-time.After(time.Second * 2) // TODO: avoid timeout + + t.Log("subscribing to topics...") + groups := make(map[string][]*Controller) + for name, groupCfg := range tc.groupsCfg { + for _, i := range groupCfg.ids { + if i >= tc.n { + continue + } + ctrl := ctrls[i] + groups[name] = append(groups[name], ctrl) + for _, topic := range groupCfg.subs { + require.NoError(t, ctrl.Subscribe(ctx, topic)) + } + for _, topic := range groupCfg.relays { + require.NoError(t, ctrl.Relay(topic)) + } + } + } + + // waiting for nodes to setup subscriptions + <-time.After(time.Second * 4) // TODO: avoid timeout + + // starting fresh trace after subscriptions + startupFaucets := newTraceFaucets() + pubsubRpcCount := new(atomic.Uint64) + for _, ctrl := range ctrls { + startupFaucets.add(ctrl.psTracer.faucets) + ctrl.psTracer.Reset() + pubsubRpcCount.Store(ctrl.pubsubRpcCounter.Swap(0)) + } + // t.Logf("\n [%s] startup in_rpc_count: %d\n; trace faucets: %+v\n", tc.name, startupFaucets, pubsubRpcCount.Load()) + + d := uint(6) + if tgen.pubsubConfig != nil && tgen.pubsubConfig.Overlay != nil { + d = uint(tgen.pubsubConfig.Overlay.D) + } + baseTestOutput := gossipTestOutput{ + Name: tc.name, + N: uint(tc.n), + A: uint(len(groups["a"])), + B: uint(len(groups["b"])), + R: uint(len(groups["relayers"])), + D: uint(d), + } + + startupTestOutput := baseTestOutput + startupTestOutput.Iterations = 1 + startupTestOutput.InboundRPC = pubsubRpcCount.Load() + startupTestOutput.Name = fmt.Sprintf("%s-startup", tc.name) + outputs = append(outputs, startupTestOutput) + + t.Log("starting flows...") + + for _, f := range tc.flows { + flow := f + flowTestName := fmt.Sprintf("%s-x%d", flow.name, flow.iterations) + t.Run(flowTestName, func(t *testing.T) { + threadCtrl := utils.NewThreadControl() + defer threadCtrl.Close() + + start := time.Now() + + for i := 0; i < flow.iterations; i++ { + for _, e := range flow.events { + event := e + group, ok := groups[event.srcGroup] + if !ok || len(group) == 0 { + continue + } + var wg sync.WaitGroup + for _, c := range group { + _i := i + ctrl := c + ctrlName := strings.Replace(ctrl.lggr.Desugar().Name(), ".ctrl", "", 1) + ctrlName = strings.Replace(ctrlName, "p2pmq.", "", 1) + wg.Add(1) + threadCtrl.Go(func(ctx context.Context) { + defer wg.Done() + msg := event.Msg(msgArgs{ + i: _i, + group: event.srcGroup, + ctrl: ctrlName, + flow: flow.name, + }) + require.NoError(t, ctrl.Publish(ctx, event.topic, []byte(msg))) + // msgID := gossip.DefaultMsgIDFn(&pubsub_pb.Message{Data: []byte(msg)}) + // hmap.addSent(msgID) + }) + } + if event.wait { + wg.Wait() + } + if event.interval > 0 { + <-time.After(event.interval) + } + } + <-time.After(flow.interval) + } + + <-time.After(time.Second * 2) // TODO: avoid timeout + + faucets := newTraceFaucets() + var pubsubRpcCount uint64 + for _, ctrl := range ctrls { + nodeFaucets := ctrl.psTracer.faucets + // t.Logf("[%s] trace faucets: %+v", ctrl.lggr.Desugar().Name(), nodeFaucets) + faucets.add(nodeFaucets) + pubsubRpcCount += ctrl.pubsubRpcCounter.Swap(0) + ctrl.psTracer.Reset() + } + testOutput := baseTestOutput + testOutput.Name = flow.name + testOutput.Iterations = uint(flow.iterations) + testOutput.Faucets = msgTraceFaucetsOutput{ + Publish: int(faucets.publish.Load()), + Deliver: int(faucets.deliver.Load()), + Reject: int(faucets.reject.Load()), + DropRPC: int(faucets.dropRPC.Load()), + SendRPC: int(faucets.sendRPC.Load()), + RecvRPC: int(faucets.recvRPC.Load()), + } + testOutput.InboundRPC = pubsubRpcCount + testOutput.TotalTime = time.Since(start) + outputs = append(outputs, testOutput) + t.Logf("output: %+v", testOutput) + }) + } + }) + } + outputsJson, err := json.Marshal(outputs) + require.NoError(t, err) + outputFileName := fmt.Sprintf("%s/test-%d.json", outDir, time.Now().UnixMilli()) + require.NoError(t, os.WriteFile(outputFileName, outputsJson, 0644)) + t.Logf("outputs saved in %s", outputFileName) +} + +type connectivity map[int][]int + +func (connect connectivity) connect(ctx context.Context, ctrls []*Controller) error { + for i, ctrl := range ctrls { + conns := connect[i] + for _, c := range conns { + if i == c { + continue + } + if err := ctrl.Connect(ctx, ctrls[c]); err != nil { + return err + } + } + } + return nil +} + +type msgTraceFaucetsOutput struct { + Publish int `json:"publish,omitempty"` + Deliver int `json:"deliver,omitempty"` + Reject int `json:"reject,omitempty"` + DropRPC int `json:"drop_rpc,omitempty"` + SendRPC int `json:"send_rpc,omitempty"` + RecvRPC int `json:"recv_rpc,omitempty"` +} + +type gossipTestOutput struct { + Name string + N, A, B, R, D uint + Iterations uint + Faucets msgTraceFaucetsOutput + InboundRPC uint64 + TotalTime time.Duration +} + +type groupsCfg map[string]groupCfg + +// baseConnectivity returns a base connectivity map for the groups, +// where each group member is connected to all other members of the group +// and to the relayers. +// func (groups groupsCfg) baseConnectivity() connectivity { +// conns := make(connectivity) +// var relayerIDs []int +// relayers, ok := groups["relayers"] +// if ok { +// relayerIDs = relayers.ids +// } +// for _, cfg := range groups { +// connectIDs := append(cfg.ids, relayerIDs...) +// for _, i := range cfg.ids { +// conns[i] = connectIDs +// } +// } +// return conns +// } + +func (groups groupsCfg) allToAllConnectivity() connectivity { + conns := make(connectivity) + var allIDs []int + for _, cfg := range groups { + allIDs = append(allIDs, cfg.ids...) + } + for _, id := range allIDs { + conns[id] = allIDs + } + return conns +} + +type groupCfg struct { + ids []int + subs []string + relays []string +} + +// testGroupSimple creates a simple test group configuration with n nodes: +// group a: a nodes +// group b: b nodes +// relayers: r nodes +// NOTE: n >= a + b + r must hold +func testGroupSimple(n, a, b, r int) groupsCfg { + ids := make([]int, n) + for i := 0; i < n; i++ { + ids[i] = i + } + return groupsCfg{ + "a": { + ids: ids[:a], + subs: []string{"b.action.res", "b.trigger"}, + }, + "b": { + ids: ids[a : a+b], + subs: []string{"a.trigger", "b.action.req"}, + }, + "relayers": { + ids: ids[a+b : a+b+r], + relays: []string{"a.trigger", "b.action.req", "b.action.res", "b.trigger"}, + }, + } +} + +func flowActionA2B(iterations int, interval, waitAfterReq time.Duration) flow { + return flow{ + name: "action a->b", + events: []flowEvent{ + { + srcGroup: "a", + topic: "b.action.req", + pattern: "dummy-request-{{.group}}-{{.i}}", + interval: waitAfterReq, + wait: true, + }, + { + srcGroup: "b", + topic: "b.action.res", + pattern: "dummy-response-{{.group}}-{{.i}}", + }, + }, + iterations: iterations, + interval: interval, + } +} + +func flowTrigger(src string, iterations int, interval time.Duration) flow { + return flow{ + name: fmt.Sprintf("trigger %s", src), + events: []flowEvent{ + { + srcGroup: src, + topic: fmt.Sprintf("%s.trigger", src), + pattern: "dummy-trigger-{{.group}}-{{.i}}", + }, + }, + iterations: iterations, + interval: interval, + } +} + +type flowEvent struct { + srcGroup string + topic string + pattern string + interval time.Duration + wait bool +} + +type msgArgs struct { + i int + group string + ctrl string + flow string +} + +func (fe flowEvent) Msg(args msgArgs) string { + tmpl, err := template.New("msg").Parse(fe.pattern) + if err != nil { + return "" + } + sb := new(strings.Builder) + if err := tmpl.Execute(sb, map[string]interface{}{ + "i": args.i, + "group": args.group, + "ctrl": args.ctrl, + "flow": args.flow, + }); err != nil { + return "" + } + return sb.String() +} + +type flow struct { + name string + events []flowEvent + iterations int + interval time.Duration +} + +// type nodeHitMap struct { +// lock sync.RWMutex +// valHitMap map[string]uint32 +// msgHitMap map[string]uint32 +// sent, recieved map[string]time.Time +// } + +// func (n *nodeHitMap) validations(topic string) uint32 { +// n.lock.RLock() +// defer n.lock.RUnlock() + +// return n.valHitMap[topic] +// } + +// func (n *nodeHitMap) messages(topic string) uint32 { +// n.lock.RLock() +// defer n.lock.RUnlock() + +// return n.msgHitMap[topic] +// } + +// func (n *nodeHitMap) addValidation(topic string) { +// n.lock.Lock() +// defer n.lock.Unlock() + +// n.valHitMap[topic] += 1 +// } + +// func (n *nodeHitMap) addMessage(topic, msgID string) { +// n.lock.Lock() +// defer n.lock.Unlock() + +// n.msgHitMap[topic] += 1 +// n.recieved[msgID] = time.Now() +// } + +// func (n *nodeHitMap) addSent(msgID string) { +// n.lock.Lock() +// defer n.lock.Unlock() + +// n.sent[msgID] = time.Now() +// } + +type testGen struct { + // hitMaps map[string]*nodeHitMap + routingFn func(*pubsub.Message) + validationFn func(peer.ID, *pubsub.Message) pubsub.ValidationResult + pubsubConfig *commons.PubsubConfig +} + +func (g *testGen) NextConfig(i int) (commons.Config, MsgRouter[error], MsgRouter[pubsub.ValidationResult], string) { + cfg := commons.Config{ + ListenAddrs: []string{ + "/ip4/127.0.0.1/tcp/0", + }, + Pubsub: g.pubsubConfig, + } + + name := fmt.Sprintf("node-%d", i+1) + + // hitMap := &nodeHitMap{ + // valHitMap: make(map[string]uint32), + // msgHitMap: make(map[string]uint32), + // recieved: make(map[string]time.Time), + // sent: make(map[string]time.Time), + // } + // g.hitMaps[name] = hitMap + + msgRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[error]) { + // hitMap.addMessage(mw.Msg.GetTopic(), mw.Msg.ID) + g.routingFn(mw.Msg) + }, gossip.DefaultMsgIDFn) + + valRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[pubsub.ValidationResult]) { + // hitMap.addValidation(mw.Msg.GetTopic()) + res := g.validationFn(mw.Peer, mw.Msg) + mw.Result = res + }, gossip.DefaultMsgIDFn) + + return cfg, msgRouter, valRouter, name +} diff --git a/core/pubsub.go b/core/pubsub.go index 5815525..2efaf7f 100644 --- a/core/pubsub.go +++ b/core/pubsub.go @@ -9,9 +9,7 @@ import ( "github.com/amirylm/p2pmq/commons" "github.com/amirylm/p2pmq/core/gossip" pubsub "github.com/libp2p/go-libp2p-pubsub" - pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" - "go.uber.org/zap" ) var ( @@ -59,8 +57,23 @@ func (c *Controller) setupPubsubRouter(ctx context.Context, cfg commons.Config) opts = append(opts, pubsub.WithSubscriptionFilter(sf)) } - if cfg.Pubsub.Trace { - opts = append(opts, pubsub.WithEventTracer(newPubsubTracer(c.lggr.Named("PubsubTracer")))) + if cfg.Pubsub.Trace != nil { + var jtracer pubsub.EventTracer + if len(cfg.Pubsub.Trace.JsonFile) > 0 { + var err error + jtracer, err = pubsub.NewJSONTracer(cfg.Pubsub.Trace.JsonFile) + if err != nil { + return err + } + } + tracer := newPubsubTracer(c.lggr.Named("PubsubTracer"), cfg.Pubsub.Trace.Debug, cfg.Pubsub.Trace.Skiplist, jtracer) + c.psTracer = tracer.(*psTracer) + opts = append(opts, pubsub.WithEventTracer(tracer)) + // TODO: config? + opts = append(opts, pubsub.WithAppSpecificRpcInspector(func(p peer.ID, rpc *pubsub.RPC) error { + c.pubsubRpcCounter.Add(1) + return nil + })) } ps, err := pubsub.NewGossipSub(ctx, c.host, opts...) @@ -79,7 +92,7 @@ func (c *Controller) Publish(ctx context.Context, topicName string, data []byte) if err != nil { return err } - // d.lggr.Debugw("publishing on topic", "topic", topicName, "data", string(data)) + c.lggr.Debugw("publishing on topic", "topic", topicName, "data", string(data)) return topic.Publish(ctx, data) } @@ -128,6 +141,28 @@ func (c *Controller) Subscribe(ctx context.Context, topicName string) error { return nil } +func (c *Controller) Relay(topicName string) error { + topic, err := c.tryJoin(topicName) + if err != nil { + return err + } + cancel, err := topic.Relay() + if err != nil { + return err + } + c.topicManager.setTopicRelayCancelFn(topicName, cancel) + return nil +} + +func (c *Controller) Unrelay(topicName string) error { + tw := c.topicManager.getTopicWrapper(topicName) + if tw.state.Load() == topicStateUnknown { + return nil // TODO: topic not found? + } + tw.relayCancelFn() + return nil +} + func (c *Controller) listenSubscription(ctx context.Context, sub *pubsub.Subscription) { c.lggr.Debugw("listening on topic", "topic", sub.Topic()) @@ -182,7 +217,7 @@ func (c *Controller) tryJoin(topicName string) (*pubsub.Topic, error) { if cfg.MsgValidator != nil || c.cfg.Pubsub.MsgValidator != nil { msgValConfig := (&commons.MsgValidationConfig{}).Defaults(c.cfg.Pubsub.MsgValidator) - if cfg.MsgValidator != nil { + if cfg.MsgValidator != nil { // specific topic validator config msgValConfig = msgValConfig.Defaults(cfg.MsgValidator) } valOpts := []pubsub.ValidatorOpt{ @@ -237,21 +272,3 @@ func (c *Controller) validateMsg(ctx context.Context, p peer.ID, msg *pubsub.Mes func (c *Controller) inspectPeerScores(map[peer.ID]*pubsub.PeerScoreSnapshot) { // TODO } - -// psTracer helps to trace pubsub events, implements pubsublibp2p.EventTracer -type psTracer struct { - lggr *zap.SugaredLogger -} - -// NewTracer creates an instance of pubsub tracer -func newPubsubTracer(lggr *zap.SugaredLogger) pubsub.EventTracer { - return &psTracer{ - lggr: lggr.Named("PubsubTracer"), - } -} - -// Trace handles events, implementation of pubsub.EventTracer -func (pst *psTracer) Trace(evt *pubsub_pb.TraceEvent) { - eType := evt.GetType().String() - pst.lggr.Debugw("pubsub event", "type", eType) -} diff --git a/core/pubsub_trace.go b/core/pubsub_trace.go new file mode 100644 index 0000000..b165f99 --- /dev/null +++ b/core/pubsub_trace.go @@ -0,0 +1,319 @@ +package core + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" +) + +type traceFaucets struct { + join, leave, publish, deliver, reject, duplicate, addPeer, removePeer, graft, prune, sendRPC, dropRPC, recvRPC *atomic.Uint64 +} + +func newTraceFaucets() traceFaucets { + return traceFaucets{ + join: new(atomic.Uint64), + leave: new(atomic.Uint64), + publish: new(atomic.Uint64), + deliver: new(atomic.Uint64), + reject: new(atomic.Uint64), + duplicate: new(atomic.Uint64), + addPeer: new(atomic.Uint64), + removePeer: new(atomic.Uint64), + graft: new(atomic.Uint64), + prune: new(atomic.Uint64), + sendRPC: new(atomic.Uint64), + dropRPC: new(atomic.Uint64), + recvRPC: new(atomic.Uint64), + } +} + +func (tf *traceFaucets) add(other traceFaucets) { + tf.join.Add(other.join.Load()) + tf.leave.Add(other.leave.Load()) + tf.publish.Add(other.publish.Load()) + tf.deliver.Add(other.deliver.Load()) + tf.reject.Add(other.reject.Load()) + tf.duplicate.Add(other.duplicate.Load()) + tf.addPeer.Add(other.addPeer.Load()) + tf.removePeer.Add(other.removePeer.Load()) + tf.graft.Add(other.graft.Load()) + tf.prune.Add(other.prune.Load()) + tf.sendRPC.Add(other.sendRPC.Load()) + tf.dropRPC.Add(other.dropRPC.Load()) + tf.recvRPC.Add(other.recvRPC.Load()) +} + +func (tf *traceFaucets) set(other traceFaucets) { + tf.join.Store(other.join.Load()) + tf.leave.Store(other.leave.Load()) + tf.publish.Store(other.publish.Load()) + tf.deliver.Store(other.deliver.Load()) + tf.reject.Store(other.reject.Load()) + tf.duplicate.Store(other.duplicate.Load()) + tf.addPeer.Store(other.addPeer.Load()) + tf.removePeer.Store(other.removePeer.Load()) + tf.graft.Store(other.graft.Load()) + tf.prune.Store(other.prune.Load()) + tf.sendRPC.Store(other.sendRPC.Load()) + tf.dropRPC.Store(other.dropRPC.Load()) + tf.recvRPC.Store(other.recvRPC.Load()) +} + +type eventFields map[string]string + +func MarshalTraceEvents(events []eventFields) ([]byte, error) { + return json.Marshal(events) +} + +func UnmarshalTraceEvents(data []byte) ([]eventFields, error) { + var events []eventFields + err := json.Unmarshal(data, &events) + return events, err +} + +// psTracer helps to trace pubsub events, implements pubsublibp2p.EventTracer +type psTracer struct { + lggr *zap.SugaredLogger + subTracer pubsub.EventTracer + skiplist []string + faucets traceFaucets + lock sync.Mutex + events []eventFields + debug bool +} + +// NewTracer creates an instance of pubsub tracer +func newPubsubTracer(lggr *zap.SugaredLogger, debug bool, skiplist []string, subTracer pubsub.EventTracer) pubsub.EventTracer { + return &psTracer{ + lggr: lggr.Named("PubsubTracer"), + subTracer: subTracer, + skiplist: skiplist, + debug: debug, + faucets: newTraceFaucets(), + } +} + +func (pst *psTracer) Reset() { + pst.lock.Lock() + defer pst.lock.Unlock() + + pst.events = nil + pst.faucets.set(newTraceFaucets()) +} + +func (pst *psTracer) Events() []eventFields { + pst.lock.Lock() + defer pst.lock.Unlock() + + return pst.events +} + +func (pst *psTracer) Faucets() traceFaucets { + return pst.faucets +} + +// Trace handles events, implementation of pubsub.EventTracer +func (pst *psTracer) Trace(evt *pubsub_pb.TraceEvent) { + fields := eventFields{} + fields["type"] = evt.GetType().String() + fields["time"] = time.Now().Format(time.RFC3339) + pid, err := peer.IDFromBytes(evt.GetPeerID()) + if err != nil { + fields["peerID"] = "error" + } + fields["peerID"] = pid.String() + eventType := evt.GetType() + switch eventType { + case pubsub_pb.TraceEvent_PUBLISH_MESSAGE: + pst.faucets.publish.Add(1) + msg := evt.GetPublishMessage() + evt.GetPeerID() + fields["msgID"] = hex.EncodeToString(msg.GetMessageID()) + fields["topic"] = msg.GetTopic() + case pubsub_pb.TraceEvent_REJECT_MESSAGE: + pst.faucets.reject.Add(1) + msg := evt.GetRejectMessage() + pid, err := peer.IDFromBytes(msg.GetReceivedFrom()) + if err == nil { + fields["receivedFrom"] = pid.String() + } + fields["msgID"] = hex.EncodeToString(msg.GetMessageID()) + fields["topic"] = msg.GetTopic() + fields["reason"] = msg.GetReason() + case pubsub_pb.TraceEvent_DUPLICATE_MESSAGE: + pst.faucets.duplicate.Add(1) + msg := evt.GetDuplicateMessage() + pid, err := peer.IDFromBytes(msg.GetReceivedFrom()) + if err == nil { + fields["receivedFrom"] = pid.String() + } + fields["msgID"] = hex.EncodeToString(msg.GetMessageID()) + fields["topic"] = msg.GetTopic() + case pubsub_pb.TraceEvent_DELIVER_MESSAGE: + pst.faucets.deliver.Add(1) + msg := evt.GetDeliverMessage() + pid, err := peer.IDFromBytes(msg.GetReceivedFrom()) + if err == nil { + fields["receivedFrom"] = pid.String() + } + fields["msgID"] = hex.EncodeToString(msg.GetMessageID()) + fields["topic"] = msg.GetTopic() + case pubsub_pb.TraceEvent_ADD_PEER: + pst.faucets.addPeer.Add(1) + pid, err := peer.IDFromBytes(evt.GetAddPeer().GetPeerID()) + if err == nil { + fields["targetPeer"] = pid.String() + } + case pubsub_pb.TraceEvent_REMOVE_PEER: + pst.faucets.removePeer.Add(1) + pid, err := peer.IDFromBytes(evt.GetRemovePeer().GetPeerID()) + if err == nil { + fields["targetPeer"] = pid.String() + } + case pubsub_pb.TraceEvent_JOIN: + pst.faucets.join.Add(1) + fields["topic"] = evt.GetJoin().GetTopic() + case pubsub_pb.TraceEvent_LEAVE: + pst.faucets.leave.Add(1) + fields["topic"] = evt.GetLeave().GetTopic() + case pubsub_pb.TraceEvent_GRAFT: + pst.faucets.graft.Add(1) + msg := evt.GetGraft() + pid, err := peer.IDFromBytes(msg.GetPeerID()) + if err == nil { + fields["graftPeer"] = pid.String() + } + fields["topic"] = msg.GetTopic() + case pubsub_pb.TraceEvent_PRUNE: + pst.faucets.prune.Add(1) + msg := evt.GetPrune() + pid, err := peer.IDFromBytes(msg.GetPeerID()) + if err == nil { + fields["prunePeer"] = pid.String() + } + fields["topic"] = msg.GetTopic() + case pubsub_pb.TraceEvent_SEND_RPC: + pst.faucets.sendRPC.Add(1) + msg := evt.GetSendRPC() + pid, err := peer.IDFromBytes(msg.GetSendTo()) + if err == nil { + fields["targetPeer"] = pid.String() + } + if meta := msg.GetMeta(); meta != nil { + if ctrl := meta.Control; ctrl != nil { + fields = appendIHave(fields, ctrl.GetIhave()) + fields = appendIWant(fields, "self", ctrl.GetIwant()) + // ctrl.GetGraft() + // ctrl.GetPrune() + } + var subs []string + for _, sub := range meta.Subscription { + subs = append(subs, sub.GetTopic()) + } + fields["subs"] = strings.Join(subs, ",") + } + case pubsub_pb.TraceEvent_DROP_RPC: + pst.faucets.dropRPC.Add(1) + msg := evt.GetDropRPC() + pid, err := peer.IDFromBytes(msg.GetSendTo()) + if err == nil { + fields["targetPeer"] = pid.String() + } + case pubsub_pb.TraceEvent_RECV_RPC: + pst.faucets.recvRPC.Add(1) + msg := evt.GetRecvRPC() + pid, err := peer.IDFromBytes(msg.GetReceivedFrom()) + if err == nil { + fields["receivedFrom"] = pid.String() + } + if meta := msg.GetMeta(); meta != nil { + if ctrl := meta.Control; ctrl != nil { + fields = appendIHave(fields, ctrl.GetIhave()) + fields = appendIWant(fields, pid.String(), ctrl.GetIwant()) + } + var subs []string + for _, sub := range meta.Subscription { + subs = append(subs, sub.GetTopic()) + } + fields["subs"] = strings.Join(subs, ",") + } + default: + } + + if pst.shouldSkip(eventType.String()) { + return + } + + pst.debugEvent(fields) + pst.storeEvent(fields) + + if pst.subTracer != nil { + pst.subTracer.Trace(evt) + } +} + +func (pst *psTracer) debugEvent(fields eventFields) { + if pst.debug { + pst.lggr.Debugf("pubsub trace event: %+v", fields) + } +} + +func (pst *psTracer) storeEvent(fields eventFields) { + pst.lock.Lock() + defer pst.lock.Unlock() + + pst.events = append(pst.events, fields) +} + +func (pst *psTracer) shouldSkip(eventType string) bool { + pst.lock.Lock() + defer pst.lock.Unlock() + + for _, skip := range pst.skiplist { + if eventType == skip { + return true + } + } + return false +} + +func appendIHave(fields map[string]string, ihave []*pubsub_pb.TraceEvent_ControlIHaveMeta) map[string]string { + if len(ihave) > 0 { + fields["ihaveCount"] = strconv.Itoa(len(ihave)) + for _, im := range ihave { + var mids []string + msgids := im.GetMessageIDs() + for _, mid := range msgids { + mids = append(mids, hex.EncodeToString(mid)) + } + fields[fmt.Sprintf("%s-IHAVEmsgIDs", im.GetTopic())] = strings.Join(mids, ",") + } + } + return fields +} + +func appendIWant(fields map[string]string, peer string, iwant []*pubsub_pb.TraceEvent_ControlIWantMeta) map[string]string { + if len(iwant) > 0 { + fields["iwantCount"] = strconv.Itoa(len(iwant)) + var mids []string + for _, im := range iwant { + msgids := im.GetMessageIDs() + for _, mid := range msgids { + mids = append(mids, hex.EncodeToString(mid)) + } + } + fields[fmt.Sprintf("%s-IWANTmsgIDs", peer)] = strings.Join(mids, ",") + } + return fields +} diff --git a/core/testutils.go b/core/testutils.go index df48819..a4d810e 100644 --- a/core/testutils.go +++ b/core/testutils.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "sync/atomic" "testing" "time" @@ -37,59 +36,49 @@ func SetupTestControllers(ctx context.Context, t *testing.T, n int, routingFn fu <-time.After(time.Second * 2) - hitMap := map[string]*atomic.Int32{} - for i := 0; i < n; i++ { - hitMap[fmt.Sprintf("test-%d", i+1)] = &atomic.Int32{} - } - - controllers := make([]*Controller, n) - msgRouters := make([]MsgRouter[error], n) - valRouters := make([]MsgRouter[pubsub.ValidationResult], n) - for i := 0; i < n; i++ { - cfg := commons.Config{ - ListenAddrs: []string{ - "/ip4/127.0.0.1/tcp/0", - }, - // MdnsTag: "p2pmq/mdns/test", - Discovery: &commons.DiscoveryConfig{ - Mode: commons.ModeServer, - ProtocolPrefix: "p2pmq/kad/test", - Bootstrappers: []string{ - fmt.Sprintf("%s/p2p/%s", bootAddr, boot.host.ID()), - }, - }, - Pubsub: &commons.PubsubConfig{ - MsgValidator: &commons.MsgValidationConfig{}, - }, - } - msgRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[error]) { - routingFn(mw.Msg) - }, gossip.DefaultMsgIDFn) - valRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[pubsub.ValidationResult]) { - res := valFn(mw.Peer, mw.Msg) - mw.Result = res - }, gossip.DefaultMsgIDFn) - c, err := NewController(ctx, cfg, msgRouter, valRouter, fmt.Sprintf("peer-%d", i+1)) - require.NoError(t, err) - controllers[i] = c - msgRouters[i] = msgRouter - valRouters[i] = valRouter - t.Logf("created controller %d: %s", i+1, c.host.ID()) + gen := &DefaultGenerator{ + BootAddr: fmt.Sprintf("%s/p2p/%s", bootAddr, boot.host.ID()), + RoutingFn: routingFn, + ValidationFn: valFn, } + controllers, msgRouters, valRouters, done, err := StartControllers(ctx, n, gen) + require.NoError(t, err) - for i, c := range controllers { - c.Start(ctx) - t.Logf("started controller %d: %s", i+1, c.host.ID()) - } + t.Logf("created %d controllers", n) waitControllersConnected(n) return controllers, msgRouters, valRouters, func() { - go boot.Close() // closing bootstrapper in the background + done() + boot.Close() + } +} + +func StartControllers(ctx context.Context, n int, gen Generator) ([]*Controller, []MsgRouter[error], []MsgRouter[pubsub.ValidationResult], func(), error) { + controllers := make([]*Controller, 0, n) + msgRouters := make([]MsgRouter[error], 0, n) + valRouters := make([]MsgRouter[pubsub.ValidationResult], 0, n) + done := func() { for _, c := range controllers { c.Close() } } + for i := 0; i < n; i++ { + cfg, msgRouter, valRouter, name := gen.NextConfig(i) + c, err := NewController(ctx, cfg, msgRouter, valRouter, name) + if err != nil { + return controllers, msgRouters, valRouters, done, err + } + controllers = append(controllers, c) + msgRouters = append(msgRouters, msgRouter) + valRouters = append(valRouters, valRouter) + } + + for _, c := range controllers { + c.Start(ctx) + } + + return controllers, msgRouters, valRouters, done, nil } func waitControllersConnected(n int, controllers ...*Controller) { @@ -108,3 +97,44 @@ func waitControllersConnected(n int, controllers ...*Controller) { } } } + +type Generator interface { + NextConfig(i int) (commons.Config, MsgRouter[error], MsgRouter[pubsub.ValidationResult], string) +} + +type DefaultGenerator struct { + BootAddr string + RoutingFn func(*pubsub.Message) + ValidationFn func(peer.ID, *pubsub.Message) pubsub.ValidationResult +} + +func (g *DefaultGenerator) NextConfig(i int) (commons.Config, MsgRouter[error], MsgRouter[pubsub.ValidationResult], string) { + cfg := commons.Config{ + ListenAddrs: []string{ + "/ip4/127.0.0.1/tcp/0", + }, + MdnsTag: "p2pmq/mdns/test", + // Discovery: &commons.DiscoveryConfig{ + // Mode: commons.ModeServer, + // ProtocolPrefix: "p2pmq/kad/test", + // Bootstrappers: []string{ + // g.BootAddr, + // // fmt.Sprintf("%s/p2p/%s", g.BootAddr, boot.host.ID()), + // }, + // }, + Pubsub: &commons.PubsubConfig{ + MsgValidator: &commons.MsgValidationConfig{}, + }, + } + + msgRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[error]) { + g.RoutingFn(mw.Msg) + }, gossip.DefaultMsgIDFn) + + valRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[pubsub.ValidationResult]) { + res := g.ValidationFn(mw.Peer, mw.Msg) + mw.Result = res + }, gossip.DefaultMsgIDFn) + + return cfg, msgRouter, valRouter, fmt.Sprintf("node-%d", i+1) +} diff --git a/core/topic.go b/core/topic.go index 33a812e..4e559e2 100644 --- a/core/topic.go +++ b/core/topic.go @@ -19,9 +19,10 @@ func newTopicManager() *topicManager { } type topicWrapper struct { - state atomic.Int32 - topic *pubsub.Topic - sub *pubsub.Subscription + state atomic.Int32 + topic *pubsub.Topic + sub *pubsub.Subscription + relayCancelFn pubsub.RelayCancelFunc } const ( @@ -57,6 +58,20 @@ func (tm *topicManager) upgradeTopic(name string, topic *pubsub.Topic) bool { return true } +func (tm *topicManager) setTopicRelayCancelFn(name string, fn pubsub.RelayCancelFunc) bool { + tm.lock.Lock() + defer tm.lock.Unlock() + + tw, ok := tm.topics[name] + if !ok { + return false + } + tw.relayCancelFn = fn + tm.topics[name] = tw + + return true +} + func (tm *topicManager) getTopicWrapper(topic string) *topicWrapper { tm.lock.RLock() defer tm.lock.RUnlock() diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..822e312 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,8 @@ +# Decentralized Message Engine: Examples + +This folder contains examples of how the DME can be used to facilitate cross network communication. + +The following examples are available: + +- [OCR based cryptography](./don) (Chainlink Oracles) +- [BLS based networks](./bls) \ No newline at end of file diff --git a/examples/bls/README.md b/examples/bls/README.md index 4b918c1..4c1561c 100644 --- a/examples/bls/README.md +++ b/examples/bls/README.md @@ -1,10 +1,9 @@ # BLS example -This package shows a high level integration with BLS based networks. +This package shows a high level integration with BLS based networks, using [herumi/bls-eth-go-binary](https://github.com/herumi/bls-eth-go-binary) package for BLS cryptography. ## Links -- [github.com/herumi/bls-eth-go-binary](https://github.com/herumi/bls-eth-go-binary) is the library used for BLS signatures. - [BLS Multi-Signatures With Public-Key Aggregation](https://crypto.stanford.edu/~dabo/pubs/papers/BLSmultisig.html) - [BLS Signatures Part 1 — Overview](https://alonmuroch-65570.medium.com/bls-signatures-part-1-overview-47d9eebf1c75) - [BLS signatures in Solidity](https://ethresear.ch/t/bls-signatures-in-solidity/7919) diff --git a/examples/bls/bls_test.go b/examples/bls/bls_test.go index b9495ce..7f43a93 100644 --- a/examples/bls/bls_test.go +++ b/examples/bls/bls_test.go @@ -232,26 +232,27 @@ func triggerReports(pctx context.Context, t *testing.T, net string, interval tim if !ok { continue } - if node.getLeader(net, nextSeq) == share.SignerID { - node.threadC.Go(func(ctx context.Context) { - report := &SignedReport{ - Network: net, - SeqNumber: nextSeq, - Data: []byte(fmt.Sprintf("report for %s, seq %d", net, nextSeq)), - } - share.Sign(report) - if pctx.Err() != nil || ctx.Err() != nil { // ctx might be canceled by the time we get here + if node.getLeader(net, nextSeq) != share.SignerID { + continue + } + node.threadC.Go(func(ctx context.Context) { + report := &SignedReport{ + Network: net, + SeqNumber: nextSeq, + Data: []byte(fmt.Sprintf("report for %s, seq %d", net, nextSeq)), + } + share.Sign(report) + if pctx.Err() != nil || ctx.Err() != nil { // ctx might be canceled by the time we get here + return + } + if err := node.Broadcast(ctx, *report); ctx.Err() == nil && pctx.Err() == nil { + if err != nil && strings.Contains(err.Error(), "context canceled") { return } - if err := node.Broadcast(ctx, *report); ctx.Err() == nil && pctx.Err() == nil { - if err != nil && strings.Contains(err.Error(), "context canceled") { - return - } - require.NoError(t, err) - reports.Add(net, *report) - } - }) - } + require.NoError(t, err) + reports.Add(net, *report) + } + }) } } } diff --git a/examples/bls/node_internalnet.go b/examples/bls/node_internalnet.go index 8b7e066..a449cd8 100644 --- a/examples/bls/node_internalnet.go +++ b/examples/bls/node_internalnet.go @@ -125,7 +125,7 @@ func (n *Node) getLeader(net string, seq uint64) uint64 { if !ok { return 0 } - return (seq % uint64(len(share.Signers))) + 1 + return RoundRobinLeader(seq, share.Signers) } // isProcessable ensures that we sign once and only leaders can trigger a new sequence diff --git a/examples/bls/share.go b/examples/bls/share.go index 8738c16..d235cbf 100644 --- a/examples/bls/share.go +++ b/examples/bls/share.go @@ -38,6 +38,12 @@ func (share *Share) QuorumCount() int { return Threshold(len(share.Signers)) } +type LeaderSelector func(seq uint64, signers map[uint64]*bls.PublicKey) uint64 + +func RoundRobinLeader(seq uint64, signers map[uint64]*bls.PublicKey) uint64 { + return (seq % uint64(len(signers))) + 1 +} + func Threshold(count int) int { f := (count - 1) / 3 diff --git a/resources/docs/APPENDIX_LIBP2P.md b/resources/docs/APPENDIX_LIBP2P.md new file mode 100644 index 0000000..01015d4 --- /dev/null +++ b/resources/docs/APPENDIX_LIBP2P.md @@ -0,0 +1,66 @@ +# Appendix: Libp2p + +## Links + +- [Libp2p specs](https://github.com/libp2p/specs) +- [Gossipsub v1.1 spec](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md) +- [Gossipsub v1.1 Evaluation Report](https://gateway.ipfs.io/ipfs/QmRAFP5DBnvNjdYSbWhEhVRJJDFCLpPyvew5GwCCB4VxM4) + +## Overview + +Libp2p is a modular networking framework designed for peer-to-peer communication in decentralized systems. It provides a foundation for building decentralized applications and systems by offering a range of essential components. + +Libp2p was chosen because it provides a battle tested, complete yet extensible networking framework for a distributed message engine. + +## Libp2p Protocols + +The following libp2p protocols are utilized by the DME: + +| Name | Description | Links | +| --- | --- | --- | +| Gossipsub (v1.1) | Pubsub messaging | https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md | +| Kad DHT | Distributed hash table for discovery | https://github.com/libp2p/specs/tree/master/kad-dht | +| Noise | Protocol for securing transport | https://github.com/libp2p/specs/blob/master/noise/README.md | +| Yamux | Protocol for multiplexing transport | https://github.com/libp2p/specs/blob/master/yamux/README.md | + +## Security + +Libp2p provides strong cryptographic support, including secure key management and encryption. It offers options for transport-level encryption and authentication, ensuring the confidentiality and integrity of data exchanged between peers. + +Secure channels in libp2p are established with the help of a transport upgrader, which providers layers of security and stream multiplexing over "raw" connections such as TCP sockets. + +## Kad DHT + +[KadDHT](https://github.com/libp2p/specs/tree/master/kad-dht) is used for peer discovery. It requires to deploy a set of bootstrapper nodes, which are used by new peers to join the network. + +**NOTE:** bootstrappers should managed by multiple parties for decentralization. + +## Pubsub + +Libp2p's pubsub system is designed to be extensible by more specialized routers and provides an optimized environment for a distributed protocol that runs over a trustless p2p network. + +### Key Concepts of Gossiping + +Gossipsub v1.1 is a dynamic and efficient message propagation protocol, it is based on randomized topic meshes and gossip, with moderate amplification factors and good scaling properties. + +* Gossipsub introduces a **heartbeat** mechanism that defines a regular interval at which peers exchange heartbeat messages with their mesh peers + * Heartbeats help maintain the freshness of the mesh and enable the discovery of new topics or subscriptions + * During heartbeats, peers can also inform each other about their latest state, including the topics they are interested in +* Gossipsub orchestrates dynamic, real-time subscription/peering management while outsourcing peer discovery for flexibility +* Gossipsub facilitates concurrent & configurable validation system that allows to outsource validation using [extended validators](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#extended-validators) +* Gossipsub incorporates a **peer scoring** mechanism to evaluate the behavior and trustworthiness of network peers. Peers are assigned scores based on various factors, including their message forwarding reliability, responsiveness, and adherence to protocol rules + * Low-scoring peers may be disconnected from the network to maintain its health and reliability +* Gossiping enables to have a loosely connected network of nodes, while all-2-all requires fully connected network + * helps to scale and overcome bad network conditions where we don't have connectivity among all participants +* Gossipsub is using a msg_id concept which is useful for de-duplication, to ensure each message is processed only once + +### How Gossipsub Works + +- **Message Exchange:** When a node wishes to publish a message to a topic, it first sends the message to its mesh peers within the corresponding fanout group. Mesh peers validate the message and, if valid, forward it to their mesh peers and so on. Messages propagate through the mesh until they reach all mesh peers within the group. +- **Heartbeat:** Each peer runs a periodic stabilization process called the "heartbeat procedure" +at regular intervals (1s is the default). The heartbeat serves three functions: +1. [mesh maintenance](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#mesh-maintenance) to keep mesh fresh and discover new subscriptions +2. [fanout maintenance](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#fanout-maintenance) to efficiently adjust fanout groups based on changing interests +3. [gossip emission](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#gossip-emission) to a random selection of peers for each topic (that are not already members of the topic mesh) +- **Piggybacking:** Gossipsub employs [piggybacking](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#control-message-piggybacking) - a technique where acknowledgment (ACK) messages carry new messages. When a peer sends an ACK for a message, it may include new messages it has received. This optimizes bandwidth usage by combining acknowledgment and message propagation in a single step. + diff --git a/resources/docs/README.md b/resources/docs/README.md new file mode 100644 index 0000000..180d974 --- /dev/null +++ b/resources/docs/README.md @@ -0,0 +1,136 @@ +# PMQ + +
+ +**WARNING: This is an experimental work in progress, DO NOT USE in production** + +
+ +This document describes a solution for cross networks communication, including oracle, offchain computation or blockchain networks. + +## Links + +- [Threat Analysis](./THREAT_ANALYSIS.md) +- [Appendix: Libp2p](./APPENDIX_LIBP2P.md) +- Examples: + - [OCR based cryptography](https://github.com/amirylm/p2pmq/tree/main/examples/don) (Chainlink Oracles) + - [BLS based networks](https://github.com/amirylm/p2pmq/tree/main/examples/bls) + +## Table of Contents + +- [Overview](#overview) + - [Goals](#goals) + - [Background: Libp2p](#background-libp2p) +- [High Level Design](#high-level-design) + - [Technical Overview](#technical-overview) + - [Architecture](#architecture) + - [API](#api) + - [Network Topology](#network-topology) + - [Message Validation](#message-validation) + +## Overview + +By introducing a decentralized messaging engine that facilitates the secure exchange of verifiable messages across networks, we enable the formation of a global, collaborative network that consists of multiple overlay networks. + +The resulting protocol leverages libp2p and gossipsub v1.1 in order to provide robust networking and message propagation while ensuring the integrity and authenticity of transmitted data by outsourcing the process of cryptographic and sequence validation. + +The following diagram visualizes the topology of a such a global network, +where the dashed lines represent overlay networks and the solid lines represent the underlying/standalone networks: + +![overlays-topology.png](./overlays-topology.png) + +### Goals + +- Enable secure communication layer across networks +- Enable exchange of messages while facilitating validation and authentication via an outsourced verification mechanism +- Provie efficient and reliable network communication and message propagation by utilizing proven protocols such as gossipsub +- Provide a flexible & extensible API that serves an array of use cases, while maintaining a simple and robust protocol that can withstand scaling and high throughput + +### Background: Libp2p + +Libp2p is a modular networking framework designed for peer-to-peer communication in decentralized systems. It provides a foundation for building decentralized applications and systems by offering a range of essential components. Among its core features are [pubsub](./APPENDIX_LIBP2P.md#pubsub), [peer discovery](./APPENDIX_LIBP2P.md#kad-dht), abstracted transport layer and a complete [cryptography suite](./APPENDIX_LIBP2P.md#security). + +[gossipsub v1.1](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md) is a dynamic and efficient message propagation protocol, it is based on randomized topic meshes and gossip, with moderate amplification factors and good scaling properties. + +Gossipsub is designed to be extensible by more specialized routers and provides an optimized environment for a distributed protocol that runs over a trust-less p2p network. + +Libp2p was chosen because it provides a battle tested, complete yet extensible networking framework for a distributed message engine. + +**NOTE:** For more information see the [libp2p appendix](./APPENDIX_LIBP2P.md). + +## High Level Design + +### Technical Overview + +Agents runs within some parent nodes, and enables the node to gossip messages in an overlay network (topic), while using an outsourced message validation to avoid introducing additional dependencies for the agent such as requiring validation keys. + +Sending verified-only information enables to achieve optimal latency and throughput due to having no additional signing involved. Additionally, consensus is not needed for sharing a sequential, signed-by-quorum data, which is usually stored on-chain or some public storage. + +Message validation is decoupled from the agents, which queue messages for validation and processing, thus enabling the implementation of any specifically tailored, custom validation logic according to each network requirements. + +Peer scoring is facilitated by the pubsub router according to validation results, messaging rate and overall behaviour, however it is still required that network specific setting fit the topology, expected message rate and the strictness of the validation requirements. + +### Architecture + +Agents are separate processes running within some parent node, interaction is done via a gRPC API, following [go-plugin](https://github.com/hashicorp/go-plugin) in order to achieve modularity. + +The following diagram illustrates the system architecture: + +![arch-node.png](./arch-node.png) + +Agents manages libp2p components such as the gossipsub router, which is responsible for message propagation and peer scoring. Additionally, the following services run within the agent: + +- **validation router** for message validation and processing +- **message router** for consuming messages +- **control service** for managing subscriptions, publishing messages and more. + +The node implements: +- verifiers for validating messages +- processors for processing incoming messages from other networks, and broadcasting messages on behalf of the node to other networks. + +### API + +The following gRPC services are used by the clients from within the parent node, for interacting with the agent: + +```protobuf +service ControlService { + rpc Publish(PublishRequest) returns (PublishResponse); + rpc Subscribe(SubscribeRequest) returns (SubscribeResponse); + rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse); +} + +service MsgRouter { + rpc Listen(ListenRequest) returns (stream Message) {} +} + +service ValidationRouter { + rpc Handle(stream Message) returns (stream ValidatedMessage) {} +} +``` + +### Network Topology + +Each network creates an overlay network for outbound traffic (i.e. pubsub topic), where other networks can subscribe for messages. + +There might be multiple overlay networks for broadcasting messages from the same network, depending on versioning and business logic. +For instance, in case the encoding of the messages has changed, a new overlay network will be created for the new version and the old overlay network will be neglected until it is no longer needed or used. + +The amount of overlay connections to nodes in other networks might be changed dynamically, depending on the network topology and the amount of nodes in each network. Gossipsub allows to configure the amount of peers per topic while facilitating decent propagation of messages. This property enables the scale of the global network to a large quantity of nodes w/o flooding the wires and consuming too much resources. + +### Message Validation + +Due to the validation being outsourced to the parent node, we rely on the security properties of an existing infrastructure for the processes of signing and verifying messages. + +**NOTE:** Having validation within the agent introduces significant complexity, dependencies and aaditional vulnerabilities. + +All the messages are propagated through the pipes must be valid. +In case of invalid messages, the message will be dropped and the sender, regardless of the message origin, will be penalized by the gossipsub router. + +E.g. for an oracles (or other offchain computing) network, messages must comply with the following strict rules: + +- The message was **signed by a quorum** of a standalone-network nodes +- The message has a **sequence number** that match the current order, taking into account potential gaps that might be created due to bad network conditions. + - Messages that are older than **Validity Threshold** are considered invalid and will result in a low score for the sender, which can be either the origin peer or a gossiper peer + - Messages that are older than **Skip Threshold** will be ignored w/o affecting sender scores + +**NOTE:** validity and skip thresholds should be set within the parent node, according to the network topology and expected message rate. diff --git a/resources/docs/THREAT_ANALYSIS.md b/resources/docs/THREAT_ANALYSIS.md new file mode 100644 index 0000000..738bcf3 --- /dev/null +++ b/resources/docs/THREAT_ANALYSIS.md @@ -0,0 +1,115 @@ +# Threat Analysis + +In order to create a robust and secure messaging solution for cross network interoperability, the following threats were considered: + +### 1. Message Spamming + +Attackers flood the network with invalid or malicious messages, consuming bandwidth and degrading network performance. + +**Severity:** Very high \ +**Impact:** Network congestion, performance degradation, resource exhaustion. + +**Mitigation:** + +- Require message validation with cryptographic signatures to ensure message authenticity and integrity +- Require message sequence validation, where unrealistic sequences are considered invalid. Use caching, where the key is based on content hashing, so the same message won't exhaust resources + +### 2. Message Spamming: Validation Queue Flooding + +Attackers can overload the validation queue by sending spam messages at a very high rate. Legitimate messages get dropped, resulting in a denial of service as messages are ignored. + +**Severity:** Very high \ +**Impact:** Denial of service, message loss. + +**Mitigation:** + +Implement a circuit breaker before the validation queue that makes informed decisions based on message origin IP and a probabilistic strategy to drop messages. See [gossipsub v1.1: validation-queue-protection](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/red.md#gossipsub-v11-functional-extension-for-validation-queue-protection). + + +### 3. Censorship Attacks + +Malicious nodes selectively block messages to suppress certain information. Countermeasures include redundancy and diversification of message propagation paths. + +**Severity:** Very high \ +**Impact:** Information suppression, network manipulation. + +**Mitigation:** + +- Maintain a diverse set of mesh peers to maintain network resilience +- Use redundancy in message propagation paths to counter censorship attacks +- Employ mechanisms to detect and mitigate Sybil nodes, such as peer scoring and validation + + +### 4. Denial of Service (DoS) + +Adversaries flood the network with malicious traffic or connections to disrupt its operation. + +**Severity:** High \ +**Impact:** Network disruption, resource exhaustion. + +**Mitigation:** + +Implement rate limiting, connection policies, and adaptive firewall mechanisms to protect against DoS attacks. + + +### 5. Partition Attacks + +Adversaries attempt to partition the network by disrupting communication between mesh peers. + +**Severity:** High \ +**Impact:** Network fragmentation, reduced communication. + +**Mitigation:** + +Ensuring a diverse set of well-behaved mesh peers can help prevent this. Implement a robust peer scoring system to detect and disconnect poorly performing or malicious peers. See [gossipsub v1.1: peer scoring](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#peer-scoring). + + +### 6. Sybil Attacks + +Attackers create multiple fake identities (Sybil nodes) to manipulate the network. + +**Severity:** Medium \ +**Impact:** Potential network manipulation, disruption. + +**Mitigation:** + +Relies on the effectiveness of peer discovery mechanisms, including whitelisting which in controlled by the parent node, which should have access to that information. + + +### 7. Eclipse Attacks + +Malicious nodes attempt to control a target node's connections, isolating it from the legitimate peers. + +**Severity:** Medium \ +**Impact:** Network isolation, potential data manipulation. + +**Mitigation:** + +Ensure diverse connectivity by utilizing peer discovery methods and continuously change connected peers. + + +### 8. DHT Pollution: connections + +Malicious nodes flood the DHT with malicious entries as part of an eclipse attack. + +**Severity:** Medium \ +**Impact:** Network isolation. + +**Mitigation:** + +- Implement DHT security mechanisms to prevent unauthorized writes and ensure data validity +- Regularly check the integrity of DHT data and remove or quarantine polluted entries + + +### 9. DHT Pollution: storage + +Malicious nodes flood the DHT with irrelevant data, potentially disrupting the network's ability to perform efficient content retrieval. + +**Severity:** Medium \ +**Impact:** Degraded performance in content retrieval, network congestion, resource exhaustion. + +**Mitigation:** + +- Implement DHT security mechanisms to prevent unauthorized writes and ensure data validity +- Regularly check the integrity of DHT data and remove or quarantine polluted entries +- Implement rate limiting and access controls for DHT writes to mitigate pollution attempts diff --git a/resources/docs/arch-node.png b/resources/docs/arch-node.png new file mode 100644 index 0000000..a0c3d1e Binary files /dev/null and b/resources/docs/arch-node.png differ diff --git a/resources/docs/overlays-topology.png b/resources/docs/overlays-topology.png new file mode 100644 index 0000000..70bd917 Binary files /dev/null and b/resources/docs/overlays-topology.png differ diff --git a/resources/img/clnode-pmq.png b/resources/img/clnode-pmq.png deleted file mode 100644 index 29f369c..0000000 Binary files a/resources/img/clnode-pmq.png and /dev/null differ diff --git a/resources/img/composer-p2pmq.png b/resources/img/composer-p2pmq.png deleted file mode 100644 index 7699bdc..0000000 Binary files a/resources/img/composer-p2pmq.png and /dev/null differ diff --git a/scripts/proto-gen.sh b/scripts/proto-gen.sh index 133871d..f634567 100755 --- a/scripts/proto-gen.sh +++ b/scripts/proto-gen.sh @@ -1,7 +1,4 @@ #!/bin/bash protoc --go_out=. --go_opt=paths=source_relative \ - --go-grpc_out=. --go-grpc_opt=paths=source_relative ./proto/*.proto - -protoc --go_out=. --go_opt=paths=source_relative \ - --go-grpc_out=. --go-grpc_opt=paths=source_relative ./proto/**/*.proto \ No newline at end of file + --go-grpc_out=. --go-grpc_opt=paths=source_relative ./**/*.proto