From e399ea70a41ec53e7f828dff77b95e18851d234a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 21 Feb 2024 11:16:56 +0530 Subject: [PATCH 1/2] chore: remove REST API --- cmd/waku/flags.go | 41 -- cmd/waku/main.go | 6 - cmd/waku/node.go | 21 -- cmd/waku/options.go | 11 - cmd/waku/server/no_rln.go | 13 - cmd/waku/server/rest/admin.go | 135 ------- cmd/waku/server/rest/admin_api.yaml | 92 ----- cmd/waku/server/rest/debug.go | 52 --- cmd/waku/server/rest/debug_api.yaml | 43 --- cmd/waku/server/rest/debug_test.go | 33 -- cmd/waku/server/rest/filter.go | 380 ------------------- cmd/waku/server/rest/filter_api.yaml | 337 ----------------- cmd/waku/server/rest/filter_cache.go | 84 ----- cmd/waku/server/rest/filter_test.go | 392 -------------------- cmd/waku/server/rest/health.go | 52 --- cmd/waku/server/rest/health_api.yaml | 41 -- cmd/waku/server/rest/lightpush_api.yaml | 84 ----- cmd/waku/server/rest/lightpush_rest.go | 80 ---- cmd/waku/server/rest/lightpush_rest_test.go | 61 --- cmd/waku/server/rest/message.go | 46 --- cmd/waku/server/rest/relay.go | 316 ---------------- cmd/waku/server/rest/relay_api.yaml | 245 ------------ cmd/waku/server/rest/relay_test.go | 296 --------------- cmd/waku/server/rest/runner.go | 48 --- cmd/waku/server/rest/store.go | 206 ---------- cmd/waku/server/rest/store_api.yaml | 203 ---------- cmd/waku/server/rest/store_test.go | 97 ----- cmd/waku/server/rest/utils.go | 83 ----- cmd/waku/server/rest/utils_test.go | 23 -- cmd/waku/server/rest/waku_rest.go | 97 ----- cmd/waku/server/rest/waku_rest_test.go | 19 - cmd/waku/server/rln.go | 23 -- cmd/waku/server/utils.go | 48 --- 33 files changed, 3708 deletions(-) delete mode 100644 cmd/waku/server/no_rln.go delete mode 100644 cmd/waku/server/rest/admin.go delete mode 100644 cmd/waku/server/rest/admin_api.yaml delete mode 100644 cmd/waku/server/rest/debug.go delete mode 100644 cmd/waku/server/rest/debug_api.yaml delete mode 100644 cmd/waku/server/rest/debug_test.go delete mode 100644 cmd/waku/server/rest/filter.go delete mode 100644 cmd/waku/server/rest/filter_api.yaml delete mode 100644 cmd/waku/server/rest/filter_cache.go delete mode 100644 cmd/waku/server/rest/filter_test.go delete mode 100644 cmd/waku/server/rest/health.go delete mode 100644 cmd/waku/server/rest/health_api.yaml delete mode 100644 cmd/waku/server/rest/lightpush_api.yaml delete mode 100644 cmd/waku/server/rest/lightpush_rest.go delete mode 100644 cmd/waku/server/rest/lightpush_rest_test.go delete mode 100644 cmd/waku/server/rest/message.go delete mode 100644 cmd/waku/server/rest/relay.go delete mode 100644 cmd/waku/server/rest/relay_api.yaml delete mode 100644 cmd/waku/server/rest/relay_test.go delete mode 100644 cmd/waku/server/rest/runner.go delete mode 100644 cmd/waku/server/rest/store.go delete mode 100644 cmd/waku/server/rest/store_api.yaml delete mode 100644 cmd/waku/server/rest/store_test.go delete mode 100644 cmd/waku/server/rest/utils.go delete mode 100644 cmd/waku/server/rest/utils_test.go delete mode 100644 cmd/waku/server/rest/waku_rest.go delete mode 100644 cmd/waku/server/rest/waku_rest_test.go delete mode 100644 cmd/waku/server/rln.go delete mode 100644 cmd/waku/server/utils.go diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 387c4492e..5bb247f8c 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -479,47 +479,6 @@ var ( Destination: &options.Metrics.Port, EnvVars: []string{"WAKUNODE2_METRICS_SERVER_PORT"}, }) - RESTFlag = altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "rest", - Usage: "Enable Waku REST HTTP server", - Destination: &options.RESTServer.Enable, - EnvVars: []string{"WAKUNODE2_REST"}, - }) - RESTAddress = altsrc.NewStringFlag(&cli.StringFlag{ - Name: "rest-address", - Value: "127.0.0.1", - Usage: "Listening address of the REST HTTP server", - Destination: &options.RESTServer.Address, - EnvVars: []string{"WAKUNODE2_REST_ADDRESS"}, - }) - RESTPort = altsrc.NewIntFlag(&cli.IntFlag{ - Name: "rest-port", - Value: 8645, - Usage: "Listening port of the REST HTTP server", - Destination: &options.RESTServer.Port, - EnvVars: []string{"WAKUNODE2_REST_PORT"}, - }) - RESTRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{ - Name: "rest-relay-cache-capacity", - Value: 1000, - Usage: "Capacity of the Relay REST API message cache", - Destination: &options.RESTServer.RelayCacheCapacity, - EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"}, - }) - RESTFilterCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{ - Name: "rest-filter-cache-capacity", - Value: 30, - Usage: "Capacity of the Filter REST API message cache", - Destination: &options.RESTServer.FilterCacheCapacity, - EnvVars: []string{"WAKUNODE2_REST_FILTER_CACHE_CAPACITY"}, - }) - RESTAdmin = altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "rest-admin", - Value: false, - Usage: "Enable access to REST HTTP Admin API", - Destination: &options.RESTServer.Admin, - EnvVars: []string{"WAKUNODE2_REST_ADMIN"}, - }) PProf = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "pprof", Usage: "provides runtime profiling data at /debug/pprof in both REST and RPC servers if they're enabled", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index cb97c0272..ce6a1598a 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -87,12 +87,6 @@ func main() { MetricsServer, MetricsServerAddress, MetricsServerPort, - RESTFlag, - RESTAddress, - RESTPort, - RESTRelayCacheCapacity, - RESTFilterCacheCapacity, - RESTAdmin, PProf, } diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 4a5a352bd..2d9b3d33d 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -39,7 +39,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds" // nolint: staticcheck ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/cmd/waku/server/rest" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/metrics" "github.com/waku-org/go-waku/waku/persistence" @@ -388,20 +387,6 @@ func Execute(options NodeOptions) error { } } - var restServer *rest.WakuRest - if options.RESTServer.Enable { - wg.Add(1) - restConfig := rest.RestConfig{Address: options.RESTServer.Address, - Port: uint(options.RESTServer.Port), - EnablePProf: options.PProf, - EnableAdmin: options.RESTServer.Admin, - RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity), - FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)} - - restServer = rest.NewWakuRest(wakuNode, restConfig, logger) - restServer.Start(ctx, &wg) - } - wg.Wait() logger.Info("Node setup complete") @@ -414,12 +399,6 @@ func Execute(options NodeOptions) error { // shut the node down wakuNode.Stop() - if options.RESTServer.Enable { - if err := restServer.Stop(ctx); err != nil { - return err - } - } - if options.Metrics.Enable { if err = metricsServer.Stop(ctx); err != nil { return err diff --git a/cmd/waku/options.go b/cmd/waku/options.go index 212fcf1c0..36fe9f115 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -99,16 +99,6 @@ type MetricsOptions struct { Port int } -// RESTServerOptions are settings used to start a rest http server -type RESTServerOptions struct { - Enable bool - Port int - Address string - Admin bool - RelayCacheCapacity int - FilterCacheCapacity int -} - // WSOptions are settings used for enabling websockets and secure websockets // support type WSOptions struct { @@ -174,5 +164,4 @@ type NodeOptions struct { DNSDiscovery DNSDiscoveryOptions Rendezvous RendezvousOptions Metrics MetricsOptions - RESTServer RESTServerOptions } diff --git a/cmd/waku/server/no_rln.go b/cmd/waku/server/no_rln.go deleted file mode 100644 index 78aac3178..000000000 --- a/cmd/waku/server/no_rln.go +++ /dev/null @@ -1,13 +0,0 @@ -//go:build gowaku_no_rln -// +build gowaku_no_rln - -package server - -import ( - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" -) - -func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error { - return nil -} diff --git a/cmd/waku/server/rest/admin.go b/cmd/waku/server/rest/admin.go deleted file mode 100644 index 7823c46bc..000000000 --- a/cmd/waku/server/rest/admin.go +++ /dev/null @@ -1,135 +0,0 @@ -package rest - -import ( - "encoding/json" - "net/http" - - "github.com/go-chi/chi/v5" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - ma "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/cmd/waku/server" - "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peerstore" - waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" - "go.uber.org/zap" -) - -type AdminService struct { - node *node.WakuNode - mux *chi.Mux - log *zap.Logger -} - -type WakuPeer struct { - ID string `json:"id"` - MultiAddrs []string `json:"multiaddrs"` - Protocols []string `json:"protocols"` - Connected bool `json:"connected"` - PubsubTopics []string `json:"pubsubTopics"` -} - -type WakuPeerInfo struct { - MultiAddr string `json:"multiaddr"` - Shards []int `json:"shards"` - Protocols []string `json:"protocols"` -} - -const routeAdminV1Peers = "/admin/v1/peers" - -func NewAdminService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *AdminService { - d := &AdminService{ - node: node, - mux: m, - log: log, - } - - m.Get(routeAdminV1Peers, d.getV1Peers) - m.Post(routeAdminV1Peers, d.postV1Peer) - - return d -} - -func (a *AdminService) getV1Peers(w http.ResponseWriter, req *http.Request) { - peers, err := a.node.Peers() - if err != nil { - a.log.Error("failed to fetch peers", zap.Error(err)) - writeErrOrResponse(w, err, nil) - return - } - a.log.Info("fetched peers", zap.Int("count", len(peers))) - - response := make([]WakuPeer, 0) - for _, peer := range peers { - if peer.ID.String() == a.node.Host().ID().String() { - //Skip own node id - continue - } - wPeer := WakuPeer{ - ID: peer.ID.String(), - Connected: peer.Connected, - } - - for _, addr := range peer.Addrs { - wPeer.MultiAddrs = append(wPeer.MultiAddrs, addr.String()) - } - for _, proto := range peer.Protocols { - if !server.IsWakuProtocol(proto) { - a.log.Debug("skipping protocol as it is a non-waku protocol", logging.HostID("peer", peer.ID), zap.String("protocol", string(proto))) - continue - } - wPeer.Protocols = append(wPeer.Protocols, string(proto)) - } - wPeer.PubsubTopics = peer.PubsubTopics - response = append(response, wPeer) - } - - writeErrOrResponse(w, nil, response) -} - -func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) { - var pInfo WakuPeerInfo - var topics []string - var protos []protocol.ID - - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&pInfo); err != nil { - a.log.Error("failed to decode request", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - addr, err := ma.NewMultiaddr(pInfo.MultiAddr) - if err != nil { - a.log.Error("building multiaddr", zap.Error(err)) - writeErrOrResponse(w, err, nil) - return - } - - for _, shard := range pInfo.Shards { - topic := waku_proto.NewStaticShardingPubsubTopic(a.node.ClusterID(), uint16(shard)) - topics = append(topics, topic.String()) - } - - for _, proto := range pInfo.Protocols { - protos = append(protos, protocol.ID(proto)) - } - - id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...) - if err != nil { - a.log.Error("failed to add peer", zap.Error(err)) - writeErrOrResponse(w, err, nil) - return - } - a.log.Info("add peer successful", logging.HostID("peerID", id)) - pi := peer.AddrInfo{ID: id, Addrs: []ma.Multiaddr{addr}} - err = a.node.Host().Connect(req.Context(), pi) - if err != nil { - a.log.Error("failed to connect to peer", logging.HostID("peerID", id), zap.Error(err)) - writeErrOrResponse(w, err, nil) - return - } - writeErrOrResponse(w, nil, nil) -} diff --git a/cmd/waku/server/rest/admin_api.yaml b/cmd/waku/server/rest/admin_api.yaml deleted file mode 100644 index 04834b0a1..000000000 --- a/cmd/waku/server/rest/admin_api.yaml +++ /dev/null @@ -1,92 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: admin - description: Admin REST API for WakuV2 node - -paths: - /admin/v1/peers: - get: - summary: Get connected peers info - description: Retrieve information about connected peers. - operationId: getPeerInfo - tags: - - admin - responses: - '200': - description: Information about a Waku v2 node. - content: - application/json: - schema: - type: array - items: - $ref: '#/components/schemas/WakuPeer' - '5XX': - description: Unexpected error. - post: - summary: Adds new peer(s) to connect with - description: Adds new peer(s) to connect with. - operationId: postPeerInfo - tags: - - admin - requestBody: - content: - application/json: - schema: - type: object - items: - $ref: '#/components/schemas/WakuPeerInfo' - responses: - '200': - description: Ok - '400': - description: Cannot connect to one or more peers. - '5XX': - description: Unexpected error. - -components: - schemas: - WakuPeerInfo: - type: object - required: - - multiaddr - - shards - - protocols - protocols: - type: array - items: - type: string - shards: - type: array - items: - type: integer - WakuPeer: - type: object - required: - - id - - addrs - - protocols - - connected - properties: - connected: - type: string - addrs: - type: array - items: - type: string - protocols: - type: array - items: - type: string - connected: - type: boolean - pubsubTopics: - type: array - items: - type: string \ No newline at end of file diff --git a/cmd/waku/server/rest/debug.go b/cmd/waku/server/rest/debug.go deleted file mode 100644 index 6606f5d0d..000000000 --- a/cmd/waku/server/rest/debug.go +++ /dev/null @@ -1,52 +0,0 @@ -package rest - -import ( - "net/http" - - "github.com/go-chi/chi/v5" - "github.com/waku-org/go-waku/waku/v2/node" -) - -type DebugService struct { - node *node.WakuNode - mux *chi.Mux -} - -type InfoArgs struct { -} - -type InfoReply struct { - ENRUri string `json:"enrUri,omitempty"` - ListenAddresses []string `json:"listenAddresses,omitempty"` -} - -const routeDebugInfoV1 = "/debug/v1/info" -const routeDebugVersionV1 = "/debug/v1/version" - -func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService { - d := &DebugService{ - node: node, - mux: m, - } - - m.Get(routeDebugInfoV1, d.getV1Info) - m.Get(routeDebugVersionV1, d.getV1Version) - - return d -} - -type VersionResponse string - -func (d *DebugService) getV1Info(w http.ResponseWriter, req *http.Request) { - response := new(InfoReply) - response.ENRUri = d.node.ENR().String() - for _, addr := range d.node.ListenAddresses() { - response.ListenAddresses = append(response.ListenAddresses, addr.String()) - } - writeErrOrResponse(w, nil, response) -} - -func (d *DebugService) getV1Version(w http.ResponseWriter, req *http.Request) { - response := VersionResponse(node.GetVersionInfo().String()) - writeErrOrResponse(w, nil, response) -} diff --git a/cmd/waku/server/rest/debug_api.yaml b/cmd/waku/server/rest/debug_api.yaml deleted file mode 100644 index 2eb2db10b..000000000 --- a/cmd/waku/server/rest/debug_api.yaml +++ /dev/null @@ -1,43 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node Debug REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: debug - description: Debug REST API for WakuV2 node - -paths: - /debug/v1/info: - get: - summary: Get node info - description: Retrieve information about a Waku v2 node. - operationId: getNodeInfo - tags: - - debug - responses: - '200': - description: Information about a Waku v2 node. - content: - application/json: - schema: - $ref: '#/components/schemas/WakuInfo' - '5XX': - description: Unexpected error. - -components: - schemas: - WakuInfo: - type: object - properties: - listenAddresses: - type: array - items: - type: string - enrUri: - type: string - required: - - listenAddresses diff --git a/cmd/waku/server/rest/debug_test.go b/cmd/waku/server/rest/debug_test.go deleted file mode 100644 index a4adab8ca..000000000 --- a/cmd/waku/server/rest/debug_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package rest - -import ( - "bytes" - "context" - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/node" -) - -func TestGetV1Info(t *testing.T) { - wakuNode1, err := node.New() - require.NoError(t, err) - defer wakuNode1.Stop() - err = wakuNode1.Start(context.Background()) - require.NoError(t, err) - - d := &DebugService{ - node: wakuNode1, - } - - request, err := http.NewRequest(http.MethodPost, routeDebugInfoV1, bytes.NewReader([]byte(""))) - require.NoError(t, err) - - rr := httptest.NewRecorder() - - d.getV1Info(rr, request) - - require.Equal(t, http.StatusOK, rr.Code) -} diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go deleted file mode 100644 index 7973cc023..000000000 --- a/cmd/waku/server/rest/filter.go +++ /dev/null @@ -1,380 +0,0 @@ -package rest - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - - "github.com/go-chi/chi/v5" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "go.uber.org/zap" -) - -const filterV2Subscriptions = "/filter/v2/subscriptions" -const filterv2Messages = "/filter/v2/messages" - -// FilterService represents the REST service for Filter client -type FilterService struct { - node *node.WakuNode - cancel context.CancelFunc - - log *zap.Logger - - cache *filterCache - runner *runnerService -} - -// Start starts the RelayService -func (s *FilterService) Start(ctx context.Context) { - - for _, sub := range s.node.FilterLightnode().Subscriptions() { - s.cache.subscribe(sub.ContentFilter) - } - - ctx, cancel := context.WithCancel(ctx) - s.cancel = cancel - s.runner.Start(ctx) -} - -// Stop stops the RelayService -func (r *FilterService) Stop() { - if r.cancel == nil { - return - } - r.cancel() -} - -// NewFilterService returns an instance of FilterService -func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService { - logger := log.Named("filter") - - s := &FilterService{ - node: node, - log: logger, - cache: newFilterCache(cacheCapacity, logger), - } - - m.Route(filterV2Subscriptions, func(r chi.Router) { - r.Get("/", s.ping) - r.Get("/{requestId}", s.ping) - r.Post("/", s.subscribe) - r.Delete("/", s.unsubscribe) - r.Delete("/all", s.unsubscribeAll) - }) - - m.Route(filterv2Messages, func(r chi.Router) { - r.Get("/{contentTopic}", s.getMessagesByContentTopic) - r.Get("/{pubsubTopic}/{contentTopic}", s.getMessagesByPubsubTopic) - }) - - s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage) - - return s -} - -func convertFilterErrorToHttpStatus(err error) (int, string) { - code := http.StatusInternalServerError - statusDesc := "ping request failed" - - filterErrorCode := filter.ExtractCodeFromFilterError(err.Error()) - switch filterErrorCode { - case 404: - code = http.StatusNotFound - statusDesc = "peer has no subscription" - case 300: - case 400: - code = http.StatusBadRequest - statusDesc = "bad request format" - case 504: - code = http.StatusGatewayTimeout - case 503: - code = http.StatusServiceUnavailable - } - return code, statusDesc -} - -// 400 for bad requestId -// 404 when request failed or no suitable peers -// 200 when ping successful -func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) { - requestID := chi.URLParam(req, "requestId") - if requestID == "" { - writeResponse(w, &filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "bad request id", - }, http.StatusBadRequest) - return - } - - // selecting random peer that supports filter protocol - peerId := s.getRandomFilterPeer(req.Context(), requestID, w) - if peerId == "" { - return - } - - if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId([]byte(requestID))); err != nil { - s.log.Error("ping request failed", zap.Error(err)) - - code, statusDesc := convertFilterErrorToHttpStatus(err) - - writeResponse(w, &filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: statusDesc, - }, code) - - return - } - - // success - writeResponse(w, &filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: http.StatusText(http.StatusOK), - }, http.StatusOK) -} - -// same for FilterUnsubscribeRequest -type filterSubscriptionRequest struct { - RequestID string `json:"requestId"` - ContentFilters []string `json:"contentFilters"` - PubsubTopic string `json:"pubsubTopic"` -} - -type filterSubscriptionResponse struct { - RequestID string `json:"requestId"` - StatusDesc string `json:"statusDesc"` -} - -// 400 on invalid request -// 404 on failed subscription -// 200 on single returned successful subscription -// NOTE: subscribe on filter client randomly selects a peer if missing for given pubSubTopic -func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) { - message := filterSubscriptionRequest{} - if !s.readBody(w, req, &message) { - return - } - - contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...) - // - subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(), - contentFilter, - filter.WithRequestID([]byte(message.RequestID))) - - // on partial subscribe failure - if len(subscriptions) > 0 && err != nil { - s.log.Error("partial subscribe failed", zap.Error(err)) - // on partial failure - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: err.Error(), - }, http.StatusOK) - } - - if err != nil { - s.log.Error("subscription failed", zap.Error(err)) - code := filter.ExtractCodeFromFilterError(err.Error()) - if code == -1 { - code = http.StatusBadRequest - } - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: "subscription failed", - }, code) - return - } - - // on success - s.cache.subscribe(contentFilter) - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: http.StatusText(http.StatusOK), - }, http.StatusOK) -} - -// 400 on invalid request -// 500 on failed subscription -// 200 on successful unsubscribe -// NOTE: unsubscribe on filter client will remove subscription from all peers with matching pubSubTopic, if peerId is not provided -// to match functionality in nwaku, we will randomly select a peer that supports filter protocol. -func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) { - message := filterSubscriptionRequest{} // as pubSubTopics can also be present - if !s.readBody(w, req, &message) { - return - } - - peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w) - if peerId == "" { - return - } - - contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...) - // unsubscribe on filter - result, err := s.node.FilterLightnode().Unsubscribe( - req.Context(), - contentFilter, - filter.WithRequestID([]byte(message.RequestID)), - filter.WithPeer(peerId), - ) - - if err != nil { - s.log.Error("unsubscribe failed", zap.Error(err)) - if result == nil { - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: err.Error(), - }, http.StatusBadRequest) - } - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: err.Error(), - }, http.StatusServiceUnavailable) - return - } - - // on success - for cTopic := range contentFilter.ContentTopics { - if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) { - s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic) - } - } - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: s.unsubscribeGetMessage(result), - }, http.StatusOK) -} - -func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResult) string { - if result == nil { - return http.StatusText(http.StatusOK) - } - var peerIds string - ind := 0 - for _, entry := range result.Errors() { - if entry.Err != nil { - s.log.Error("can't unsubscribe", logging.HostID("peer", entry.PeerID), zap.Error(entry.Err)) - if ind != 0 { - peerIds += ", " - } - peerIds += entry.PeerID.String() - } - ind++ - } - if peerIds != "" { - return "can't unsubscribe from " + peerIds - } - return http.StatusText(http.StatusOK) -} - -type filterUnsubscribeAllRequest struct { - RequestID string `json:"requestId"` -} - -func (s *FilterService) readBody(w http.ResponseWriter, req *http.Request, message interface{}) bool { - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(message); err != nil { - s.log.Error("bad request", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return false - } - defer req.Body.Close() - return true -} - -// 400 on invalid request -// 500 on failed subscription -// 200 on all successful unsubscribe -// unsubscribe all subscriptions for a given peer -func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) { - message := filterUnsubscribeAllRequest{} - if !s.readBody(w, req, &message) { - return - } - - peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w) - if peerId == "" { - return - } - - // unsubscribe all subscriptions for a given peer - errCh, err := s.node.FilterLightnode().UnsubscribeAll( - req.Context(), - filter.WithRequestID([]byte(message.RequestID)), - filter.WithPeer(peerId), - ) - if err != nil { - s.log.Error("unsubscribeAll failed", zap.Error(err)) - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: err.Error(), - }, http.StatusServiceUnavailable) - return - } - - // on success - writeResponse(w, filterSubscriptionResponse{ - RequestID: message.RequestID, - StatusDesc: s.unsubscribeGetMessage(errCh), - }, http.StatusOK) -} - -func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string, w http.ResponseWriter) peer.ID { - // selecting random peer that supports filter protocol - peerIds, err := s.node.PeerManager().SelectPeers(peermanager.PeerSelectionCriteria{ - SelectionType: peermanager.Automatic, - Proto: filter.FilterSubscribeID_v20beta1, - Ctx: ctx, - }) - if err != nil { - s.log.Error("selecting peer", zap.Error(err)) - writeResponse(w, filterSubscriptionResponse{ - RequestID: requestId, - StatusDesc: "No suitable peers", - }, http.StatusServiceUnavailable) - return "" - } - return peerIds[0] -} - -func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *http.Request) { - contentTopic := topicFromPath(w, req, "contentTopic", s.log) - if contentTopic == "" { - return - } - pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic) - if err != nil { - writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest, s.log) - return - } - s.getMessages(w, req, pubsubTopic, contentTopic) -} - -func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *http.Request) { - contentTopic := topicFromPath(w, req, "contentTopic", s.log) - if contentTopic == "" { - return - } - pubsubTopic := topicFromPath(w, req, "pubsubTopic", s.log) - if pubsubTopic == "" { - return - } - s.getMessages(w, req, pubsubTopic, contentTopic) -} - -// 400 on invalid request -// 500 on failed subscription -// 200 on all successful unsubscribe -// unsubscribe all subscriptions for a given peer -func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) { - msgs, err := s.cache.getMessages(pubsubTopic, contentTopic) - if err != nil { - writeGetMessageErr(w, err, http.StatusNotFound, s.log) - return - } - writeResponse(w, msgs, http.StatusOK) -} diff --git a/cmd/waku/server/rest/filter_api.yaml b/cmd/waku/server/rest/filter_api.yaml deleted file mode 100644 index 5f5478b5a..000000000 --- a/cmd/waku/server/rest/filter_api.yaml +++ /dev/null @@ -1,337 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ -tags: - - name: filter - description: Filter REST API for WakuV2 node - -paths: - /filter/v2/subscriptions/{requestId}: - get: # get_waku_v2_filter_v2_subscription - ping - summary: Subscriber-ping - a peer can query if there is a registered subscription for it - description: | - Subscriber peer can query its subscription existence on service node. - Returns HTTP200 if exists and HTTP404 if not. - Client must not fill anything but requestId in the request body. - operationId: subscriberPing - tags: - - filter - parameters: - - in: path - name: requestId - required: true - schema: - type: string - description: Id of ping request - responses: - '200': - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '400': - description: Bad request. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '404': - description: Not found. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '5XX': - description: Unexpected error. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - - - /filter/v2/subscriptions: - post: # post_waku_v2_filter_v2_subscription - summary: Subscribe a peer to an array of content topics under a pubsubTopic - description: | - Subscribe a peer to an array of content topics under a pubsubTopic. - - It is allowed to refresh or add new content topic to an existing subscription. - - Fields pubsubTopic and contentFilters must be filled. - operationId: postSubscriptions - tags: - - filter - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscribeRequest' - responses: - '200': - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - # TODO: Review the possible errors of this endpoint - '400': - description: Bad request. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '404': - description: Not found. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '5XX': - description: Unexpected error. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - delete: # delete_waku_v2_filter_v2_subscription - summary: Unsubscribe a peer from content topics - description: | - Unsubscribe a peer from content topics - Only that subscription will be removed which matches existing. - operationId: deleteSubscriptions - tags: - - filter - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/FilterUnsubscribeRequest' - responses: - '200': - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '400': - description: Bad request. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '404': - description: Not found. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '5XX': - description: Unexpected error. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - - /filter/v2/subscriptions/all: - delete: # delete_waku_v2_filter_v2_subscription - summary: Unsubscribe a peer from all content topics - description: | - Unsubscribe a peer from all content topics - operationId: deleteAllSubscriptions - tags: - - filter - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/FilterUnsubscribeAllRequest' - responses: - '200': - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '400': - description: Bad request. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '404': - description: Not found. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - '5XX': - description: Unexpected error. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterSubscriptionResponse' - /filter/v2/messages/{contentTopic}: - get: # get_waku_v2_filter_v2_messages - summary: Get the latest messages on the polled content topic - description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. - operationId: getMessagesByTopic - tags: - - filter - parameters: - - in: path - name: contentTopic # Note the name is the same as in the path - required: true - schema: - type: string - description: Content topic of message - responses: - '200': - description: The latest messages on the polled topic. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterGetMessagesResponse' - # TODO: Review the possible errors of this endpoint - '400': - description: Bad request. - content: - text/plain: - schema: - type: string - '404': - description: Not found. - content: - text/plain: - schema: - type: string - '5XX': - description: Unexpected error. - content: - text/plain: - schema: - type: string - /filter/v2/messages/{pubsubTopic}/{contentTopic}: - get: # get_waku_v2_filter_v2_messages - summary: Get the latest messages on the polled pubsub/content topic pair - description: Get a list of messages that were received on a subscribed content topic after the last time this method was called. - operationId: getMessagesByTopic - tags: - - filter - parameters: - - in: path - name: contentTopic # Note the name is the same as in the path - required: true - schema: - type: string - description: Content topic of message - - in: path - name: pubsubTopic # Note the name is the same as in the path - required: true - schema: - type: string - description: pubsub topic of message - responses: - '200': - description: The latest messages on the polled topic. - content: - application/json: - schema: - $ref: '#/components/schemas/FilterGetMessagesResponse' - # TODO: Review the possible errors of this endpoint - '400': - description: Bad request. - content: - text/plain: - schema: - type: string - '404': - description: Not found. - content: - text/plain: - schema: - type: string - '5XX': - description: Unexpected error. - content: - text/plain: - schema: - type: string - -components: - PubSubTopic: - type: string - ContentTopic: - type: string - - FilterSubscriptionResponse: - type: object - properties: - requestId: - type: string - statusDesc: - type: string - required: - - requestId - - FilterSubscribeRequest: - type: object - properties: - requestId: - type: string - contentFilters: - type: array - items: - $ref: '#/components/schemas/ContentTopic' - pubsubTopic: - $ref: "#/components/schemas/PubSubTopic" - required: - - requestId - - contentFilters - - pubsubTopic - - FilterUnsubscribeRequest: - type: object - properties: - requestId: - type: string - contentFilters: - type: array - items: - $ref: '#/components/schemas/ContentTopic' - pubsubTopic: - $ref: "#/components/schemas/PubSubTopic" - required: - - requestId - - contentFilters - - FilterUnsubscribeAllRequest: - type: object - properties: - requestId: - type: string - required: - - requestId - - FilterGetMessagesResponse: - type: array - items: - $ref: '#/components/schemas/FilterWakuMessage' - - FilterWakuMessage: - type: object - properties: - payload: - type: string - format: byte - contentTopic: - $ref: '#/components/schemas/ContentTopic' - version: - type: number - timestamp: - type: number - required: - - payload \ No newline at end of file diff --git a/cmd/waku/server/rest/filter_cache.go b/cmd/waku/server/rest/filter_cache.go deleted file mode 100644 index 77144435e..000000000 --- a/cmd/waku/server/rest/filter_cache.go +++ /dev/null @@ -1,84 +0,0 @@ -package rest - -import ( - "fmt" - "sync" - - "github.com/waku-org/go-waku/waku/v2/protocol" - "go.uber.org/zap" -) - -type filterCache struct { - capacity int - mu sync.RWMutex - log *zap.Logger - data map[string]map[string][]*RestWakuMessage -} - -func newFilterCache(capacity int, log *zap.Logger) *filterCache { - return &filterCache{ - capacity: capacity, - data: make(map[string]map[string][]*RestWakuMessage), - log: log.Named("cache"), - } -} - -func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) { - c.mu.Lock() - defer c.mu.Unlock() - - pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter) - for pubsubTopic, contentTopics := range pubSubTopicMap { - if c.data[pubsubTopic] == nil { - c.data[pubsubTopic] = make(map[string][]*RestWakuMessage) - } - for _, topic := range contentTopics { - if c.data[pubsubTopic][topic] == nil { - c.data[pubsubTopic][topic] = []*RestWakuMessage{} - } - } - } -} - -func (c *filterCache) unsubscribe(pubsubTopic string, contentTopic string) { - c.mu.Lock() - defer c.mu.Unlock() - - delete(c.data[pubsubTopic], contentTopic) -} - -func (c *filterCache) addMessage(envelope *protocol.Envelope) { - c.mu.Lock() - defer c.mu.Unlock() - - pubsubTopic := envelope.PubsubTopic() - contentTopic := envelope.Message().ContentTopic - if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil { - return - } - - // Keep a specific max number of message per topic - if len(c.data[pubsubTopic][contentTopic]) >= c.capacity { - c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:] - } - - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - c.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - return - } - - c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message) -} - -func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil { - return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic) - } - msgs := c.data[pubsubTopic][contentTopic] - c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{} - return msgs, nil -} diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go deleted file mode 100644 index 1641175b7..000000000 --- a/cmd/waku/server/rest/filter_test.go +++ /dev/null @@ -1,392 +0,0 @@ -package rest - -import ( - "bytes" - "context" - "encoding/hex" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "testing" - "time" - - "github.com/go-chi/chi/v5" - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/node" - wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { - node, err := node.New(opts...) - require.NoError(t, err) - - err = node.Start(context.Background()) - require.NoError(t, err) - - return node -} - -// node2 connects to node1 -func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) { - node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter - node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter - - node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) - err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1) - require.NoError(t, err) - - err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), pubSubTopics) - require.NoError(t, err) - - return node1, node2 -} - -// test 400, 404 status code for ping rest endpoint -// both requests are not successful -func TestFilterPingFailure(t *testing.T) { - node1, node2 := twoFilterConnectedNodes(t) - defer func() { - node1.Stop() - node2.Stop() - }() - - router := chi.NewRouter() - _ = NewFilterService(node2, router, 0, utils.Logger()) - - // with empty requestID - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", ""), nil) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: "", - StatusDesc: "bad request id", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusBadRequest, rr.Code) - - // no subscription with peer - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestID), nil) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "peer has no subscription", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusNotFound, rr.Code) -} - -// create a filter subscription to the peer and try peer that peer -// both steps should be successful -func TestFilterSubscribeAndPing(t *testing.T) { - pubsubTopic := "/waku/2/test/proto" - contentTopics := []string{"test"} - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - - node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) - defer func() { - node1.Stop() - node2.Stop() - }() - - router := chi.NewRouter() - _ = NewFilterService(node2, router, 0, utils.Logger()) - - // create subscription to peer - rr := httptest.NewRecorder() - reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestID: requestID, - PubsubTopic: pubsubTopic, - ContentFilters: contentTopics, - })) - req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) - - // trying pinging the peer once there is subscription to it - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) -} - -// create subscription to peer -// delete the subscription to the peer with matching pubSub and contentTopic -func TestFilterSubscribeAndUnsubscribe(t *testing.T) { - pubsubTopic := "/waku/2/test/proto" - contentTopics := []string{"test"} - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - - node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) - defer func() { - node1.Stop() - node2.Stop() - }() - - router := chi.NewRouter() - _ = NewFilterService(node2, router, 0, utils.Logger()) - - // create subscription to peer - rr := httptest.NewRecorder() - reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestID: requestID, - PubsubTopic: pubsubTopic, - ContentFilters: contentTopics, - })) - req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) - - // delete the subscription to the peer with matching pubSub and contentTopic - requestID = hex.EncodeToString(protocol.GenerateRequestID()) - rr = httptest.NewRecorder() - reqReader = strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestID: requestID, - PubsubTopic: pubsubTopic, - ContentFilters: contentTopics, - })) - req, _ = http.NewRequest(http.MethodDelete, filterV2Subscriptions, reqReader) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) -} - -// create 2 subscription from filter client to server -// make a unsubscribeAll request -// try pinging the peer, if 404 is returned then unsubscribeAll was successful -func TestFilterAllUnsubscribe(t *testing.T) { - pubsubTopic := "/waku/2/test/proto" - contentTopics1 := "ct_1" - contentTopics2 := "ct_2" - - node1, node2 := twoFilterConnectedNodes(t, pubsubTopic) - defer func() { - node1.Stop() - node2.Stop() - }() - - router := chi.NewRouter() - _ = NewFilterService(node2, router, 0, utils.Logger()) - - // create 2 different subscription to peer - for _, ct := range []string{contentTopics1, contentTopics2} { - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - rr := httptest.NewRecorder() - reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestID: requestID, - PubsubTopic: pubsubTopic, - ContentFilters: []string{ct}, - })) - req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) - } - - // delete all subscription to the peer - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - rr := httptest.NewRecorder() - reqReader := strings.NewReader(toString(t, filterUnsubscribeAllRequest{ - RequestID: requestID, - })) - req, _ := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/all", filterV2Subscriptions), reqReader) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) - - // check if all subscriptions are deleted to the peer are deleted - requestID = hex.EncodeToString(protocol.GenerateRequestID()) - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "peer has no subscription", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusNotFound, rr.Code) -} - -func checkJSON(t *testing.T, expected, actual interface{}) { - require.JSONEq(t, toString(t, expected), toString(t, actual)) -} -func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionResponse { - resp := filterSubscriptionResponse{} - err := json.Unmarshal(body.Bytes(), &resp) - require.NoError(t, err) - return resp -} -func getMessageResponse(t *testing.T, body *bytes.Buffer) []*pb.WakuMessage { - resp := []*pb.WakuMessage{} - err := json.Unmarshal(body.Bytes(), &resp) - require.NoError(t, err) - return resp -} -func toString(t *testing.T, data interface{}) string { - bytes, err := json.Marshal(data) - require.NoError(t, err) - return string(bytes) -} - -func TestFilterGetMessages(t *testing.T) { - pubsubTopic := "/waku/2/test/proto" - contentTopic := "/waku/2/app/1" - - // get nodes add connect them - generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic) - require.NoError(t, err) - node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic) - defer func() { - node1.Stop() - node2.Stop() - }() - - // set router and start filter service - router := chi.NewRouter() - service := NewFilterService(node2, router, 2, utils.Logger()) - go service.Start(context.Background()) - defer service.Stop() - - { // create subscription so that messages are cached - for _, pubsubTopic := range []string{"", pubsubTopic} { - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - rr := httptest.NewRecorder() - reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{ - RequestID: requestID, - PubsubTopic: pubsubTopic, - ContentFilters: []string{contentTopic}, - })) - req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader) - router.ServeHTTP(rr, req) - checkJSON(t, filterSubscriptionResponse{ - RequestID: requestID, - StatusDesc: "OK", - }, getFilterResponse(t, rr.Body)) - require.Equal(t, http.StatusOK, rr.Code) - } - } - - // submit messages - messageByContentTopic := []*protocol.Envelope{ - genMessage("", contentTopic), - genMessage("", contentTopic), - genMessage("", contentTopic), - } - messageByPubsubTopic := []*protocol.Envelope{ - genMessage(pubsubTopic, contentTopic), - } - for _, envelope := range append(messageByContentTopic, messageByPubsubTopic...) { - node2.Broadcaster().Submit(envelope) - } - time.Sleep(1 * time.Second) - - { // with malformed contentTopic - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape("/waku/2/wrongtopic")), - nil, - ) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusBadRequest, rr.Code) - require.Equal(t, "bad content topic", rr.Body.String()) - } - - { // with check if the cache is working properly - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape(contentTopic)), - nil, - ) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - checkJSON(t, toMessage(messageByContentTopic[1:]), getMessageResponse(t, rr.Body)) - } - - { // check if pubsubTopic is present in the url - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s//%s", filterv2Messages, url.QueryEscape(contentTopic)), - nil, - ) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusBadRequest, rr.Code) - require.Equal(t, "missing pubsubTopic", rr.Body.String()) - } - - { // check messages by pubsub/contentTopic pair - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)), - nil, - ) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - checkJSON(t, toMessage(messageByPubsubTopic), getMessageResponse(t, rr.Body)) - } - - { // check if pubsubTopic/contentTOpic is subscribed or not. - rr := httptest.NewRecorder() - notSubscibredPubsubTopic := "/waku/2/test2/proto" - req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)), - nil, - ) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusNotFound, rr.Code) - require.Equal(t, - fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), - rr.Body.String(), - ) - } -} - -func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage { - msgs := make([]*pb.WakuMessage, len(envs)) - for i, env := range envs { - msgs[i] = env.Message() - } - return msgs -} - -func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope { - if pubsubTopic == "" { - pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(contentTopic) - } - return protocol.NewEnvelope( - &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: contentTopic, - Timestamp: utils.GetUnixEpoch(), - }, - 0, - pubsubTopic, - ) -} diff --git a/cmd/waku/server/rest/health.go b/cmd/waku/server/rest/health.go deleted file mode 100644 index 5a5a319e2..000000000 --- a/cmd/waku/server/rest/health.go +++ /dev/null @@ -1,52 +0,0 @@ -package rest - -import ( - "context" - "errors" - "net/http" - - "github.com/go-chi/chi/v5" - "github.com/waku-org/go-waku/waku/v2/node" -) - -type HealthService struct { - node *node.WakuNode - mux *chi.Mux -} - -const routeHealth = "/health" - -func NewHealthService(node *node.WakuNode, m *chi.Mux) *HealthService { - h := &HealthService{ - node: node, - mux: m, - } - - m.Get(routeHealth, h.getHealth) - - return h -} - -type HealthResponse string - -func (d *HealthService) getHealth(w http.ResponseWriter, r *http.Request) { - if d.node.RLNRelay() != nil { - isReady, err := d.node.RLNRelay().IsReady(r.Context()) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - writeResponse(w, HealthResponse("Health check timed out"), http.StatusInternalServerError) - } else { - writeResponse(w, HealthResponse(err.Error()), http.StatusInternalServerError) - } - return - } - - if isReady { - writeResponse(w, HealthResponse("Node is healthy"), http.StatusOK) - } else { - writeResponse(w, HealthResponse("Node is not ready"), http.StatusInternalServerError) - } - } else { - writeResponse(w, HealthResponse("Non RLN healthcheck is not implemented"), http.StatusNotImplemented) - } -} diff --git a/cmd/waku/server/rest/health_api.yaml b/cmd/waku/server/rest/health_api.yaml deleted file mode 100644 index 56cd9ae3e..000000000 --- a/cmd/waku/server/rest/health_api.yaml +++ /dev/null @@ -1,41 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node Health REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: health - description: Healt check REST API for WakuV2 node - -paths: - /health: - get: - summary: Get node health status - description: Retrieve readiness of a Waku v2 node. - operationId: healthcheck - tags: - - health - responses: - '200': - description: Waku v2 node is up and running. - content: - text/plain: - schema: - type: string - example: Node is healty - '500': - description: Internal server error - content: - text/plain: - schema: - type: string - '503': - description: Node not initialized or having issues - content: - text/plain: - schema: - type: string - example: Node is not initialized \ No newline at end of file diff --git a/cmd/waku/server/rest/lightpush_api.yaml b/cmd/waku/server/rest/lightpush_api.yaml deleted file mode 100644 index b2c342f64..000000000 --- a/cmd/waku/server/rest/lightpush_api.yaml +++ /dev/null @@ -1,84 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: lightpush - description: Lightpush REST API for WakuV2 node - -paths: - /lightpush/v1/message: - post: - summary: Request a message relay from a LightPush service provider - description: Push a message to be relayed on a PubSub topic. - operationId: postMessagesToPubsubTopic - tags: - - lightpush - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/PushRequest' - responses: - '200': - description: OK - content: - text/plain: - schema: - type: string - '400': - description: Bad request. - content: - text/plain: - schema: - type: string - '500': - description: Internal server error - content: - text/plain: - schema: - type: string - '503': - description: Service not available - content: - text/plain: - schema: - type: string - -components: - schemas: - PubsubTopic: - type: string - - ContentTopic: - type: string - - WakuMessage: - type: object - properties: - payload: - type: string - format: byte - contentTopic: - $ref: '#/components/schemas/ContentTopic' - version: - type: number - timestamp: - type: number - required: - - payload - - contentTopic - - PushRequest: - type: object - properties: - pusbsubTopic: - $ref: '#/components/schemas/PubsubTopic' - message: - $ref: '#/components/schemas/WakuMessage' - required: - - message \ No newline at end of file diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go deleted file mode 100644 index 488b0b166..000000000 --- a/cmd/waku/server/rest/lightpush_rest.go +++ /dev/null @@ -1,80 +0,0 @@ -package rest - -import ( - "encoding/json" - "errors" - "net/http" - - "github.com/go-chi/chi/v5" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "go.uber.org/zap" -) - -const routeLightPushV1Messages = "/lightpush/v1/message" - -type LightpushService struct { - node *node.WakuNode - log *zap.Logger -} - -func NewLightpushService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *LightpushService { - serv := &LightpushService{ - node: node, - log: log.Named("lightpush"), - } - - m.Post(routeLightPushV1Messages, serv.postMessagev1) - - return serv -} - -func (msg lightpushRequest) Check() error { - if msg.Message == nil { - return errors.New("waku message is required") - } - return nil -} - -type lightpushRequest struct { - PubSubTopic string `json:"pubsubTopic"` - Message *RestWakuMessage `json:"message"` -} - -// handled error codes are 200, 400, 500, 503 -func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) { - request := &lightpushRequest{} - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(request); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - if err := request.Check(); err != nil { - w.WriteHeader(http.StatusBadRequest) - _, err = w.Write([]byte(err.Error())) - serv.log.Error("writing response", zap.Error(err)) - return - } - - if serv.node.Lightpush() == nil { - w.WriteHeader(http.StatusServiceUnavailable) - return - } - - message, err := request.Message.ToProto() - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - _, err = serv.node.Lightpush().Publish(req.Context(), message, lightpush.WithPubSubTopic(request.PubSubTopic)) - if err != nil { - w.WriteHeader(http.StatusServiceUnavailable) - _, err = w.Write([]byte(err.Error())) - serv.log.Error("writing response", zap.Error(err)) - } else { - writeErrOrResponse(w, err, true) - } -} diff --git a/cmd/waku/server/rest/lightpush_rest_test.go b/cmd/waku/server/rest/lightpush_rest_test.go deleted file mode 100644 index 541739094..000000000 --- a/cmd/waku/server/rest/lightpush_rest_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package rest - -import ( - "bytes" - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - - "github.com/go-chi/chi/v5" - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/node" - wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -// node2 connects to node1 -func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) { - node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay()) - node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay()) - - node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) - err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1) - require.NoError(t, err) - err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic}) - require.NoError(t, err) - return node1, node2 -} - -func TestLightpushMessagev1(t *testing.T) { - pubSubTopic := "/waku/2/default-waku/proto" - node1, node2 := twoLightPushConnectedNodes(t, pubSubTopic) - defer func() { - node1.Stop() - node2.Stop() - }() - - router := chi.NewRouter() - serv := NewLightpushService(node2, router, utils.Logger()) - _ = serv - - msg := lightpushRequest{ - PubSubTopic: pubSubTopic, - Message: &RestWakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: "abc", - Timestamp: utils.GetUnixEpoch(), - }, - } - msgJSONBytes, err := json.Marshal(msg) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, routeLightPushV1Messages, bytes.NewReader(msgJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) -} diff --git a/cmd/waku/server/rest/message.go b/cmd/waku/server/rest/message.go deleted file mode 100644 index 55521ac09..000000000 --- a/cmd/waku/server/rest/message.go +++ /dev/null @@ -1,46 +0,0 @@ -package rest - -import ( - "errors" - - "github.com/waku-org/go-waku/cmd/waku/server" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" -) - -type RestWakuMessage struct { - Payload server.Base64URLByte `json:"payload"` - ContentTopic string `json:"contentTopic"` - Version *uint32 `json:"version,omitempty"` - Timestamp *int64 `json:"timestamp,omitempty"` - Meta []byte `json:"meta,omitempty"` -} - -func (r *RestWakuMessage) FromProto(input *pb.WakuMessage) error { - if err := input.Validate(); err != nil { - return err - } - - r.Payload = input.Payload - r.ContentTopic = input.ContentTopic - r.Timestamp = input.Timestamp - r.Version = input.Version - r.Meta = input.Meta - - return nil -} - -func (r *RestWakuMessage) ToProto() (*pb.WakuMessage, error) { - if r == nil { - return nil, errors.New("wakumessage is missing") - } - - msg := &pb.WakuMessage{ - Payload: r.Payload, - ContentTopic: r.ContentTopic, - Version: r.Version, - Timestamp: r.Timestamp, - Meta: r.Meta, - } - - return msg, nil -} diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go deleted file mode 100644 index fd64422e9..000000000 --- a/cmd/waku/server/rest/relay.go +++ /dev/null @@ -1,316 +0,0 @@ -package rest - -import ( - "encoding/json" - "net/http" - "strings" - - "github.com/go-chi/chi/v5" - "github.com/waku-org/go-waku/cmd/waku/server" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "go.uber.org/zap" -) - -const routeRelayV1Subscriptions = "/relay/v1/subscriptions" -const routeRelayV1Messages = "/relay/v1/messages/{topic}" - -const routeRelayV1AutoSubscriptions = "/relay/v1/auto/subscriptions" -const routeRelayV1AutoMessages = "/relay/v1/auto/messages" - -// RelayService represents the REST service for WakuRelay -type RelayService struct { - node *node.WakuNode - - log *zap.Logger - - cacheCapacity uint -} - -// NewRelayService returns an instance of RelayService -func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService { - s := &RelayService{ - node: node, - log: log.Named("relay"), - cacheCapacity: cacheCapacity, - } - - m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) - m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions) - m.Get(routeRelayV1Messages, s.getV1Messages) - m.Post(routeRelayV1Messages, s.postV1Message) - - m.Post(routeRelayV1AutoSubscriptions, s.postV1AutoSubscriptions) - m.Delete(routeRelayV1AutoSubscriptions, s.deleteV1AutoSubscriptions) - - m.Route(routeRelayV1AutoMessages, func(r chi.Router) { - r.Get("/{contentTopic}", s.getV1AutoMessages) - r.Post("/", s.postV1AutoMessage) - }) - - return s -} - -func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) { - var topics []string - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&topics); err != nil { - r.log.Error("decoding request failure", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - var err error - for _, topic := range topics { - err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic)) - if err != nil { - r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) - } - } - - writeErrOrResponse(w, err, true) -} - -func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Request) { - var topics []string - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&topics); err != nil { - r.log.Error("decoding request failure", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - var err error - var successCnt int - var topicToSubscribe string - for _, topic := range topics { - if topic == "" { - topicToSubscribe = relay.DefaultWakuTopic - } else { - topicToSubscribe = topic - } - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) - - if err != nil { - r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) - continue - } - successCnt++ - } - - // on partial subscribe failure - if successCnt > 0 && err != nil { - r.log.Error("partial subscribe failed", zap.Error(err)) - // on partial failure - writeResponse(w, err, http.StatusOK) - return - } - - writeErrOrResponse(w, err, true) -} - -func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { - topic := topicFromPath(w, req, "topic", r.log) - if topic == "" { - r.log.Debug("topic is not specified, using default waku topic") - topic = relay.DefaultWakuTopic - } - //TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well. - sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "") - if err != nil { - w.WriteHeader(http.StatusNotFound) - _, err = w.Write([]byte(err.Error())) - r.log.Error("writing response", zap.Error(err)) - return - } - var response []*RestWakuMessage - done := false - for { - if done || len(response) > int(r.cacheCapacity) { - break - } - select { - case envelope, open := <-sub.Ch: - if !open { - r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic)) - w.WriteHeader(http.StatusNotFound) - _, err = w.Write([]byte("consume channel is closed for subscription")) - if err != nil { - r.log.Error("writing response", zap.Error(err)) - } - return - } - - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - } else { - response = append(response, message) - } - case <-req.Context().Done(): - done = true - default: - done = true - } - } - - writeErrOrResponse(w, nil, response) -} - -func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { - topic := topicFromPath(w, req, "topic", r.log) - if topic == "" { - r.log.Debug("topic is not specified, using default waku topic") - topic = relay.DefaultWakuTopic - } - - var restMessage *RestWakuMessage - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&restMessage); err != nil { - r.log.Error("decoding request failure", zap.Error(err)) - writeErrResponse(w, r.log, err, http.StatusBadRequest) - return - } - defer req.Body.Close() - - message, err := restMessage.ToProto() - if err != nil { - r.log.Error("failed to convert message to proto", zap.Error(err)) - writeErrResponse(w, r.log, err, http.StatusBadRequest) - return - } - - if err := server.AppendRLNProof(r.node, message); err != nil { - r.log.Error("failed to append RLN proof for the message", zap.Error(err)) - writeErrOrResponse(w, err, nil) - return - } - - _, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1))) - if err != nil { - r.log.Error("publishing message", zap.Error(err)) - if err == pb.ErrMissingPayload || err == pb.ErrMissingContentTopic || err == pb.ErrInvalidMetaLength { - writeErrResponse(w, r.log, err, http.StatusBadRequest) - return - } - } - - writeErrOrResponse(w, err, true) -} - -func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) { - var cTopics []string - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&cTopics); err != nil { - r.log.Error("decoding request failure", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...)) - if err != nil { - r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) - } - - writeErrOrResponse(w, err, true) -} - -func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) { - var cTopics []string - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&cTopics); err != nil { - r.log.Error("decoding request failure", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - var err error - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity)) - if err != nil { - r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) - } - r.log.Debug("subscribed to topics", zap.Strings("contentTopics", cTopics)) - - if err != nil { - r.log.Error("writing response", zap.Error(err)) - writeErrResponse(w, r.log, err, http.StatusBadRequest) - } else { - writeErrOrResponse(w, err, true) - } - -} - -func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { - - cTopic := topicFromPath(w, req, "contentTopic", r.log) - sub, err := r.node.Relay().GetSubscription(cTopic) - if err != nil { - r.log.Error("writing response", zap.Error(err)) - writeErrResponse(w, r.log, err, http.StatusNotFound) - return - } - var response []*RestWakuMessage - done := false - for { - if done || len(response) > int(r.cacheCapacity) { - break - } - select { - case envelope := <-sub.Ch: - message := &RestWakuMessage{} - if err := message.FromProto(envelope.Message()); err != nil { - r.log.Error("converting protobuffer msg into rest msg", zap.Error(err)) - } else { - response = append(response, message) - } - case <-req.Context().Done(): - done = true - default: - done = true - } - } - - writeErrOrResponse(w, nil, response) -} - -func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) { - - var restMessage *RestWakuMessage - decoder := json.NewDecoder(req.Body) - if err := decoder.Decode(&restMessage); err != nil { - r.log.Error("decoding request failure", zap.Error(err)) - w.WriteHeader(http.StatusBadRequest) - return - } - defer req.Body.Close() - - message, err := restMessage.ToProto() - if err != nil { - writeErrOrResponse(w, err, nil) - return - } - - if err = server.AppendRLNProof(r.node, message); err != nil { - writeErrOrResponse(w, err, nil) - return - } - - _, err = r.node.Relay().Publish(req.Context(), message) - if err != nil { - r.log.Error("publishing message", zap.Error(err)) - if err == pb.ErrMissingPayload || err == pb.ErrMissingContentTopic || err == pb.ErrInvalidMetaLength { - writeErrResponse(w, r.log, err, http.StatusBadRequest) - return - } - writeErrResponse(w, r.log, err, http.StatusBadRequest) - } else { - w.WriteHeader(http.StatusOK) - } - -} diff --git a/cmd/waku/server/rest/relay_api.yaml b/cmd/waku/server/rest/relay_api.yaml deleted file mode 100644 index e42432a13..000000000 --- a/cmd/waku/server/rest/relay_api.yaml +++ /dev/null @@ -1,245 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node Relay REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: relay - description: Relay REST API for WakuV2 node - -paths: - /relay/v1/messages/{topic}: # Note the plural in messages - get: # get_waku_v2_relay_v1_messages - summary: Get the latest messages on the polled topic - description: Get a list of messages that were received on a subscribed PubSub topic after the last time this method was called. - operationId: getMessagesByTopic - tags: - - relay - parameters: - - in: path - name: topic # Note the name is the same as in the path - required: true - schema: - type: string - description: The user ID - responses: - '200': - description: The latest messages on the polled topic. - content: - application/json: - schema: - $ref: '#/components/schemas/RelayGetMessagesResponse' - # TODO: Review the possible errors of this endpoint - '5XX': - description: Unexpected error. - - post: # post_waku_v2_relay_v1_message - summary: Publish a message to be relayed - description: Publishes a message to be relayed on a PubSub topic. - operationId: postMessagesToTopic - tags: - - relay - parameters: - - in: path - name: topic # Note the name is the same as in the path - description: The messages content topic - required: true - schema: - $ref: '#/components/schemas/RelayPostMessagesRequest' - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/RelayPostMessagesRequest' - responses: - '200': - description: OK - # TODO: Review the possible errors of this endpoint - '5XX': - description: Unexpected error. - - /relay/v1/subscriptions: - post: # post_waku_v2_relay_v1_subscriptions - summary: Subscribe a node to an array of topics - description: Subscribe a node to an array of PubSub topics. - operationId: postSubscriptions - tags: - - relay - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/RelayPostSubscriptionsRequest' - responses: - '200': - description: OK - content: - text/plain: - schema: - type: string - # TODO: Review the possible errors of this endpoint - '5XX': - description: Unexpected error. - - delete: # delete_waku_v2_relay_v1_subscriptions - summary: Unsubscribe a node from an array of topics - description: Unsubscribe a node from an array of PubSub topics. - operationId: deleteSubscriptions - tags: - - relay - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/RelayDeleteSubscriptionsRequest' - responses: - '200': - description: OK - content: - text/plain: - schema: - type: string - # TODO: Review the possible errors of this endpoint - '5XX': - description: Unexpected error. - - /relay/v1/auto/messages/{contentTopic}: # Note the plural in messages - get: # get_waku_v2_relay_v1_auto_messages - summary: Get the latest messages on the polled topic - description: Get a list of messages that were received on a subscribed Content topic after the last time this method was called. - operationId: getMessagesByTopic - tags: - - relay - parameters: - - in: path - name: contentTopic # Note the name is the same as in the path - required: true - schema: - type: string - description: The user ID - responses: - '200': - description: The latest messages on the polled topic. - content: - application/json: - schema: - $ref: '#/components/schemas/RelayGetMessagesResponse' - '4XX': - description: Bad request. - '5XX': - description: Unexpected error. - - /relay/v1/auto/messages: # Note the plural in messages - post: # post_waku_v2_relay_v1_auto_message - summary: Publish a message to be relayed - description: Publishes a message to be relayed on a Content topic. - operationId: postMessagesToTopic - tags: - - relay - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/RelayPostMessagesRequest' - responses: - '200': - description: OK - '4XX': - description: Bad request. - '5XX': - description: Unexpected error. - - /relay/v1/auto/subscriptions: - post: # post_waku_v2_relay_v1_auto_subscriptions - summary: Subscribe a node to an array of topics - description: Subscribe a node to an array of Content topics. - operationId: postSubscriptions - tags: - - relay - requestBody: - content: - application/json: - schema: - type array: - items: - $ref: '#/components/schemas/ContentTopic' - responses: - '200': - description: OK - content: - text/plain: - schema: - type: string - '4XX': - description: Bad request. - '5XX': - description: Unexpected error. - - delete: # delete_waku_v2_relay_v1_auto_subscriptions - summary: Unsubscribe a node from an array of topics - description: Unsubscribe a node from an array of Content topics. - operationId: deleteSubscriptions - tags: - - relay - requestBody: - content: - application/json: - schema: - type array: - items: - $ref: '#/components/schemas/ContentTopic' - responses: - '200': - description: OK - content: - text/plain: - schema: - type: string - '4XX': - description: Bad request. - '5XX': - description: Unexpected error. - -components: - schemas: - PubSubTopic: - type: string - ContentTopic: - type: string - - RelayWakuMessage: - type: object - properties: - payload: - type: string - format: byte - contentTopic: - $ref: '#/components/schemas/ContentTopic' - version: - type: number - timestamp: - type: number - required: - - payload - - RelayGetMessagesResponse: - type: array - items: - $ref: '#/components/schemas/RelayWakuMessage' - - RelayPostMessagesRequest: - $ref: '#/components/schemas/RelayWakuMessage' - - RelayPostSubscriptionsRequest: - type: array - items: - $ref: '#/components/schemas/PubSubTopic' - - RelayDeleteSubscriptionsRequest: - type: array - items: - $ref: '#/components/schemas/PubSubTopic' - diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go deleted file mode 100644 index c0d54fed0..000000000 --- a/cmd/waku/server/rest/relay_test.go +++ /dev/null @@ -1,296 +0,0 @@ -package rest - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "net/url" - "testing" - "time" - - "github.com/go-chi/chi/v5" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" - "google.golang.org/protobuf/proto" -) - -func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService { - options := node.WithWakuRelayAndMinPeers(0) - n, err := node.New(options) - require.NoError(t, err) - err = n.Start(context.Background()) - require.NoError(t, err) - - return NewRelayService(n, mux, 3, utils.Logger()) -} - -func TestPostV1Message(t *testing.T) { - router := chi.NewRouter() - - _ = makeRelayService(t, router) - msg := &RestWakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: "abc", - Timestamp: utils.GetUnixEpoch(), - } - msgJSONBytes, err := json.Marshal(msg) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) -} - -func TestRelaySubscription(t *testing.T) { - router := chi.NewRouter() - - r := makeRelayService(t, router) - - // Wait for node to start - time.Sleep(500 * time.Millisecond) - - topics := []string{"test"} - topicsJSONBytes, err := json.Marshal(topics) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) - - // Test max messages in subscription - now := *utils.GetUnixEpoch() - _, err = r.node.Relay().Publish(context.Background(), - tests.CreateWakuMessage("test", proto.Int64(now+1)), relay.WithPubSubTopic("test")) - require.NoError(t, err) - _, err = r.node.Relay().Publish(context.Background(), - tests.CreateWakuMessage("test", proto.Int64(now+2)), relay.WithPubSubTopic("test")) - require.NoError(t, err) - - _, err = r.node.Relay().Publish(context.Background(), - tests.CreateWakuMessage("test", proto.Int64(now+3)), relay.WithPubSubTopic("test")) - require.NoError(t, err) - - // Wait for the messages to be processed - time.Sleep(5 * time.Millisecond) - - // Test deletion - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) -} - -func TestRelayGetV1Messages(t *testing.T) { - router := chi.NewRouter() - router1 := chi.NewRouter() - - serviceA := makeRelayService(t, router) - - serviceB := makeRelayService(t, router1) - - hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String())) - require.NoError(t, err) - - var addr multiaddr.Multiaddr - for _, a := range serviceB.node.Host().Addrs() { - addr = a.Encapsulate(hostInfo) - break - } - err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) - require.NoError(t, err) - - // Wait for the dial to complete - time.Sleep(1 * time.Second) - - topics := []string{"test"} - topicsJSONBytes, err := json.Marshal(topics) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - // Wait for the subscription to be started - time.Sleep(1 * time.Second) - - msg := &RestWakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: "test", - Timestamp: utils.GetUnixEpoch(), - } - msgJsonBytes, err := json.Marshal(msg) - require.NoError(t, err) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJsonBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - // Wait for the message to be received - time.Sleep(1 * time.Second) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{})) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - var messages []*pb.WakuMessage - err = json.Unmarshal(rr.Body.Bytes(), &messages) - require.NoError(t, err) - require.Len(t, messages, 1) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{})) - router1.ServeHTTP(rr, req) - require.Equal(t, http.StatusNotFound, rr.Code) - -} - -func TestPostAutoV1Message(t *testing.T) { - router := chi.NewRouter() - - _ = makeRelayService(t, router) - msg := &RestWakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: "/toychat/1/huilong/proto", - Timestamp: utils.GetUnixEpoch(), - } - msgJSONBytes, err := json.Marshal(msg) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) -} - -func TestRelayAutoSubUnsub(t *testing.T) { - router := chi.NewRouter() - - r := makeRelayService(t, router) - - // Wait for node to start - time.Sleep(500 * time.Millisecond) - - cTopic1 := "/toychat/1/huilong/proto" - - cTopics := []string{cTopic1} - topicsJSONBytes, err := json.Marshal(cTopics) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) - - // Test publishing messages after subscription - now := *utils.GetUnixEpoch() - _, err = r.node.Relay().Publish(context.Background(), - tests.CreateWakuMessage(cTopic1, proto.Int64(now+1))) - require.NoError(t, err) - - // Wait for the messages to be processed - time.Sleep(5 * time.Millisecond) - - // Test deletion - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) - - cTopics = append(cTopics, "test") - topicsJSONBytes, err = json.Marshal(cTopics) - require.NoError(t, err) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusBadRequest, rr.Code) - -} - -func TestRelayGetV1AutoMessages(t *testing.T) { - router := chi.NewRouter() - router1 := chi.NewRouter() - - serviceA := makeRelayService(t, router) - - serviceB := makeRelayService(t, router1) - - hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String())) - require.NoError(t, err) - - var addr multiaddr.Multiaddr - for _, a := range serviceB.node.Host().Addrs() { - addr = a.Encapsulate(hostInfo) - break - } - err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) - require.NoError(t, err) - - // Wait for the dial to complete - time.Sleep(1 * time.Second) - - cTopic1 := "/toychat/1/huilong/proto" - - cTopics := []string{cTopic1} - topicsJSONBytes, err := json.Marshal(cTopics) - require.NoError(t, err) - - rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - require.Equal(t, "true", rr.Body.String()) - - // Wait for the subscription to be started - time.Sleep(1 * time.Second) - - msg := &RestWakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: cTopic1, - Timestamp: utils.GetUnixEpoch(), - } - msgJsonBytes, err := json.Marshal(msg) - require.NoError(t, err) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes)) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - // Wait for the message to be received - time.Sleep(1 * time.Second) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - var messages []*pb.WakuMessage - err = json.Unmarshal(rr.Body.Bytes(), &messages) - require.NoError(t, err) - require.Len(t, messages, 1) - - rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) - router1.ServeHTTP(rr, req) - require.Equal(t, http.StatusNotFound, rr.Code) - -} diff --git a/cmd/waku/server/rest/runner.go b/cmd/waku/server/rest/runner.go deleted file mode 100644 index c1c4d3105..000000000 --- a/cmd/waku/server/rest/runner.go +++ /dev/null @@ -1,48 +0,0 @@ -package rest - -import ( - "context" - - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" -) - -type Adder func(msg *protocol.Envelope) - -type runnerService struct { - broadcaster relay.Broadcaster - sub *relay.Subscription - cancel context.CancelFunc - adder Adder -} - -func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService { - return &runnerService{ - broadcaster: broadcaster, - adder: adder, - } -} - -func (r *runnerService) Start(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - r.cancel = cancel - r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize)) - for { - select { - case <-ctx.Done(): - return - case envelope, ok := <-r.sub.Ch: - if ok { - r.adder(envelope) - } - } - } -} - -func (r *runnerService) Stop() { - if r.cancel == nil { - return - } - r.sub.Unsubscribe() - r.cancel() -} diff --git a/cmd/waku/server/rest/store.go b/cmd/waku/server/rest/store.go deleted file mode 100644 index 8e8ac87b2..000000000 --- a/cmd/waku/server/rest/store.go +++ /dev/null @@ -1,206 +0,0 @@ -package rest - -import ( - "context" - "encoding/base64" - "fmt" - "net/http" - "strconv" - "strings" - "time" - - "github.com/go-chi/chi/v5" - "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/store" - "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" -) - -type StoreService struct { - node *node.WakuNode - mux *chi.Mux -} - -type StoreResponse struct { - Messages []StoreWakuMessage `json:"messages"` - Cursor *HistoryCursor `json:"cursor,omitempty"` - ErrorMessage string `json:"error_message,omitempty"` -} - -type HistoryCursor struct { - PubsubTopic string `json:"pubsubTopic"` - SenderTime string `json:"senderTime"` - StoreTime string `json:"storeTime"` - Digest []byte `json:"digest"` -} - -type StoreWakuMessage struct { - Payload []byte `json:"payload"` - ContentTopic string `json:"contentTopic"` - Version *uint32 `json:"version,omitempty"` - Timestamp *int64 `json:"timestamp,omitempty"` - Meta []byte `json:"meta,omitempty"` -} - -const routeStoreMessagesV1 = "/store/v1/messages" - -func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { - s := &StoreService{ - node: node, - mux: m, - } - - m.Get(routeStoreMessagesV1, s.getV1Messages) - - return s -} - -func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) { - query := &store.Query{} - var options []store.HistoryRequestOption - var err error - peerAddrStr := r.URL.Query().Get("peerAddr") - var m multiaddr.Multiaddr - if peerAddrStr != "" { - m, err = multiaddr.NewMultiaddr(peerAddrStr) - if err != nil { - return nil, nil, err - } - options = append(options, store.WithPeerAddr(m)) - } - query.PubsubTopic = r.URL.Query().Get("pubsubTopic") - - contentTopics := r.URL.Query().Get("contentTopics") - if contentTopics != "" { - query.ContentTopics = strings.Split(contentTopics, ",") - } - - startTimeStr := r.URL.Query().Get("startTime") - if startTimeStr != "" { - startTime, err := strconv.ParseInt(startTimeStr, 10, 64) - if err != nil { - return nil, nil, err - } - query.StartTime = &startTime - } - - endTimeStr := r.URL.Query().Get("endTime") - if endTimeStr != "" { - endTime, err := strconv.ParseInt(endTimeStr, 10, 64) - if err != nil { - return nil, nil, err - } - query.EndTime = &endTime - } - - var cursor *pb.Index - - senderTimeStr := r.URL.Query().Get("senderTime") - storeTimeStr := r.URL.Query().Get("storeTime") - digestStr := r.URL.Query().Get("digest") - - if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" { - cursor = &pb.Index{} - - if senderTimeStr != "" { - cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) - if err != nil { - return nil, nil, err - } - } - - if storeTimeStr != "" { - cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) - if err != nil { - return nil, nil, err - } - } - - if digestStr != "" { - cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) - if err != nil { - return nil, nil, err - } - } - - cursor.PubsubTopic = query.PubsubTopic - - options = append(options, store.WithCursor(cursor)) - } - - pageSizeStr := r.URL.Query().Get("pageSize") - ascendingStr := r.URL.Query().Get("ascending") - if ascendingStr != "" || pageSizeStr != "" { - ascending := true - pageSize := uint64(store.DefaultPageSize) - if ascendingStr != "" { - ascending, err = strconv.ParseBool(ascendingStr) - if err != nil { - return nil, nil, err - } - } - - if pageSizeStr != "" { - pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) - if err != nil { - return nil, nil, err - } - if pageSize > store.MaxPageSize { - pageSize = store.MaxPageSize - } - } - - options = append(options, store.WithPaging(ascending, pageSize)) - } - - return query, options, nil -} - -func writeStoreError(w http.ResponseWriter, code int, err error) { - writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code) -} - -func toStoreResponse(result *store.Result) StoreResponse { - response := StoreResponse{} - - cursor := result.Cursor() - if cursor != nil { - response.Cursor = &HistoryCursor{ - PubsubTopic: cursor.PubsubTopic, - SenderTime: fmt.Sprintf("%d", cursor.SenderTime), - StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime), - Digest: cursor.Digest, - } - } - - for _, m := range result.Messages { - response.Messages = append(response.Messages, StoreWakuMessage{ - Payload: m.Payload, - ContentTopic: m.ContentTopic, - Version: m.Version, - Timestamp: m.Timestamp, - Meta: m.Meta, - }) - } - - return response -} - -func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { - query, options, err := getStoreParams(r) - if err != nil { - writeStoreError(w, http.StatusBadRequest, err) - return - } - - ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) - defer cancel() - - result, err := d.node.Store().Query(ctx, *query, options...) - if err != nil { - writeStoreError(w, http.StatusInternalServerError, err) - return - } - - writeErrOrResponse(w, nil, toStoreResponse(result)) -} diff --git a/cmd/waku/server/rest/store_api.yaml b/cmd/waku/server/rest/store_api.yaml deleted file mode 100644 index 41a313fe5..000000000 --- a/cmd/waku/server/rest/store_api.yaml +++ /dev/null @@ -1,203 +0,0 @@ -openapi: 3.0.3 -info: - title: Waku V2 node Store REST API - version: 1.0.0 - contact: - name: VAC Team - url: https://forum.vac.dev/ - -tags: - - name: store - description: Store REST API for WakuV2 node - -paths: - /store/v1/messages: - get: - summary: Gets message history - description: > - Retrieves WakuV2 message history. The returned history - can be potentially filtered by optional request parameters. - operationId: getMessageHistory - tags: - - store - parameters: - - name: peerAddr - in: query - schema: - type: string - required: true - description: > - P2P fully qualified peer multiaddress - in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded. - example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN' - - - name: pubsubTopic - in: query - schema: - type: string - description: > - The pubsub topic on which a WakuMessage is published. - If left empty, no filtering is applied. - It is also intended for pagination purposes. - It should be a URL-encoded string. - example: 'my%20pubsub%20topic' - - - name: contentTopics - in: query - schema: string - description: > - Comma-separated list of content topics. When specified, - only WakuMessages that are linked to any of the given - content topics will be delivered in the get response. - It should be a URL-encoded-comma-separated string. - example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic' - - - name: startTime - in: query - schema: - type: string - description: > - The inclusive lower bound on the timestamp of - queried WakuMessages. This field holds the - Unix epoch time in nanoseconds as a 64-bits - integer value. - example: '1680590945000000000' - - - name: endTime - in: query - schema: - type: string - description: > - The inclusive upper bound on the timestamp of - queried WakuMessages. This field holds the - Unix epoch time in nanoseconds as a 64-bits - integer value. - example: '1680590945000000000' - - - name: senderTime - in: query - schema: - type: string - description: > - Cursor field intended for pagination purposes. - Represents the Unix time in nanoseconds at which a message was generated. - It could be empty for retrieving the first page, - and will be returned from the GET response so that - it can be part of the next page request. - example: '1680590947000000000' - - - name: storeTime - in: query - schema: - type: string - description: > - Cursor field intended for pagination purposes. - Represents the Unix time in nanoseconds at which a message was stored. - It could be empty for retrieving the first page, - and will be returned from the GET response so that - it can be part of the next page request. - example: '1680590945000000000' - - - name: digest - in: query - schema: - type: string - description: > - Cursor field intended for pagination purposes. - URL-base64-encoded string computed as a hash of the - a message content topic plus a message payload. - It could be empty for retrieving the first page, - and will be returned from the GET response so that - it can be part of the next page request. - example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' - - - name: pageSize - in: query - schema: - type: string - description: > - Number of messages to retrieve per page - example: '5' - - - name: ascending - in: query - schema: - type: string - description: > - "true" for paging forward, "false" for paging backward - example: "true" - - responses: - '200': - description: WakuV2 message history. - content: - application/json: - schema: - $ref: '#/components/schemas/StoreResponse' - '400': - description: Bad request error. - content: - text/plain: - type: string - '412': - description: Precondition failed. - content: - text/plain: - type: string - '500': - description: Internal server error. - content: - text/plain: - type: string - -components: - schemas: - StoreResponse: - type: object - properties: - messages: - type: array - items: - $ref: '#/components/schemas/WakuMessage' - cursor: - $ref: '#/components/schemas/HistoryCursor' - error_message: - type: string - required: - - messages - - HistoryCursor: - type: object - properties: - pubsub_topic: - type: string - sender_time: - type: string - store_time: - type: string - digest: - type: string - required: - - pubsub_topic - - sender_time - - store_time - - digest - - WakuMessage: - type: object - properties: - payload: - type: string - content_topic: - type: string - version: - type: integer - format: int32 - timestamp: - type: integer - format: int64 - ephemeral: - type: boolean - required: - - payload - - content_topic diff --git a/cmd/waku/server/rest/store_test.go b/cmd/waku/server/rest/store_test.go deleted file mode 100644 index 3b7eca13e..000000000 --- a/cmd/waku/server/rest/store_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package rest - -import ( - "context" - "encoding/base64" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "net/url" - "testing" - - "github.com/go-chi/chi/v5" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/utils" - "google.golang.org/protobuf/proto" -) - -func TestGetMessages(t *testing.T) { - - db := MemoryDB(t) - - node1, err := node.New(node.WithWakuStore(), node.WithMessageProvider(db)) - require.NoError(t, err) - err = node1.Start(context.Background()) - require.NoError(t, err) - defer node1.Stop() - - topic1 := "1" - pubsubTopic1 := "topic1" - - now := *utils.GetUnixEpoch() - msg1 := tests.CreateWakuMessage(topic1, proto.Int64(now+1)) - msg2 := tests.CreateWakuMessage(topic1, proto.Int64(now+2)) - msg3 := tests.CreateWakuMessage(topic1, proto.Int64(now+3)) - - node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubsubTopic1)) - node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubsubTopic1)) - node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubsubTopic1)) - - n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().String())) - n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo) - - node2, err := node.New() - require.NoError(t, err) - err = node2.Start(context.Background()) - require.NoError(t, err) - defer node2.Stop() - router := chi.NewRouter() - - _ = NewStoreService(node2, router) - - // TEST: get cursor - // TEST: get no messages - - // First page - rr := httptest.NewRecorder() - queryParams := url.Values{ - "peerAddr": {n1Addr.String()}, - "pubsubTopic": {pubsubTopic1}, - "pageSize": {"2"}, - } - path := routeStoreMessagesV1 + "?" + queryParams.Encode() - req, _ := http.NewRequest(http.MethodGet, path, nil) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - response := StoreResponse{} - err = json.Unmarshal(rr.Body.Bytes(), &response) - require.NoError(t, err) - require.Len(t, response.Messages, 2) - - // Second page - rr = httptest.NewRecorder() - queryParams = url.Values{ - "peerAddr": {n1Addr.String()}, - "pubsubTopic": {pubsubTopic1}, - "senderTime": {response.Cursor.SenderTime}, - "storeTime": {response.Cursor.StoreTime}, - "digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)}, - "pageSize": {"2"}, - } - path = routeStoreMessagesV1 + "?" + queryParams.Encode() - req, _ = http.NewRequest(http.MethodGet, path, nil) - router.ServeHTTP(rr, req) - require.Equal(t, http.StatusOK, rr.Code) - - response = StoreResponse{} - err = json.Unmarshal(rr.Body.Bytes(), &response) - require.NoError(t, err) - require.Len(t, response.Messages, 1) - require.Nil(t, response.Cursor) -} diff --git a/cmd/waku/server/rest/utils.go b/cmd/waku/server/rest/utils.go deleted file mode 100644 index e178f12cf..000000000 --- a/cmd/waku/server/rest/utils.go +++ /dev/null @@ -1,83 +0,0 @@ -package rest - -import ( - "encoding/json" - "fmt" - "net/http" - "net/url" - - "github.com/go-chi/chi/v5" - "go.uber.org/zap" -) - -// The functions writes error response in plain text format with specified statusCode -func writeErrResponse(w http.ResponseWriter, log *zap.Logger, err error, statusCode int) { - w.WriteHeader(statusCode) - _, err = w.Write([]byte(err.Error())) - if err != nil { - log.Error("error while writing response", zap.Error(err)) - } -} - -// This function writes error or response in json format with statusCode as 500 in case of error -func writeErrOrResponse(w http.ResponseWriter, err error, value interface{}) { - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - jsonResponse, err := json.Marshal(value) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - _, err = w.Write(jsonResponse) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } -} - -// This function writes a response in json format -func writeResponse(w http.ResponseWriter, value interface{}, code int) { - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - jsonResponse, err := json.Marshal(value) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - // w.Write implicitly writes a 200 status code - // and only once we can write 2xx-5xx status code - // so any statusCode apart from 1xx being written to the header, will be ignored. - w.WriteHeader(code) - _, _ = w.Write(jsonResponse) -} - -func topicFromPath(w http.ResponseWriter, req *http.Request, field string, logger *zap.Logger) string { - topic := chi.URLParam(req, field) - if topic == "" { - errMissing := fmt.Errorf("missing %s", field) - writeGetMessageErr(w, errMissing, http.StatusBadRequest, logger) - return "" - } - topic, err := url.QueryUnescape(topic) - if err != nil { - errInvalid := fmt.Errorf("invalid %s format", field) - writeGetMessageErr(w, errInvalid, http.StatusBadRequest, logger) - return "" - } - return topic -} - -func writeGetMessageErr(w http.ResponseWriter, err error, code int, logger *zap.Logger) { - // write status before the body - w.WriteHeader(code) - logger.Error("get message", zap.Error(err)) - if _, err := w.Write([]byte(err.Error())); err != nil { - logger.Error("writing response", zap.Error(err)) - } -} diff --git a/cmd/waku/server/rest/utils_test.go b/cmd/waku/server/rest/utils_test.go deleted file mode 100644 index 12797f4ae..000000000 --- a/cmd/waku/server/rest/utils_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package rest - -import ( - "database/sql" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/persistence" - "github.com/waku-org/go-waku/waku/persistence/sqlite" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func MemoryDB(t *testing.T) *persistence.DBStore { - var db *sql.DB - db, err := sqlite.NewDB(":memory:", utils.Logger()) - require.NoError(t, err) - - dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations)) - require.NoError(t, err) - - return dbStore -} diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go deleted file mode 100644 index 3126f37eb..000000000 --- a/cmd/waku/server/rest/waku_rest.go +++ /dev/null @@ -1,97 +0,0 @@ -package rest - -import ( - "context" - "fmt" - "net/http" - "sync" - - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - "github.com/waku-org/go-waku/waku/v2/node" - "go.uber.org/zap" -) - -type WakuRest struct { - node *node.WakuNode - server *http.Server - - log *zap.Logger - - relayService *RelayService - filterService *FilterService -} - -type RestConfig struct { - Address string - Port uint - EnablePProf bool - EnableAdmin bool - RelayCacheCapacity uint - FilterCacheCapacity uint -} - -func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest { - wrpc := new(WakuRest) - wrpc.log = log.Named("rest") - - mux := chi.NewRouter() - mux.Use(middleware.Logger) - mux.Use(middleware.NoCache) - - if config.EnablePProf { - mux.Mount("/debug", middleware.Profiler()) - } - - _ = NewDebugService(node, mux) - _ = NewHealthService(node, mux) - _ = NewStoreService(node, mux) - _ = NewLightpushService(node, mux, log) - - listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port) - - server := &http.Server{ - Addr: listenAddr, - Handler: mux, - } - - wrpc.node = node - wrpc.server = server - - if node.Relay() != nil { - relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log) - wrpc.relayService = relayService - } - - if config.EnableAdmin { - _ = NewAdminService(node, mux, wrpc.log) - } - - if node.FilterLightnode() != nil { - filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log) - server.RegisterOnShutdown(func() { - filterService.Stop() - }) - wrpc.filterService = filterService - } - - return wrpc -} - -func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) { - defer wg.Done() - - if r.node.FilterLightnode() != nil { - go r.filterService.Start(ctx) - } - - go func() { - _ = r.server.ListenAndServe() - }() - r.log.Info("server started", zap.String("addr", r.server.Addr)) -} - -func (r *WakuRest) Stop(ctx context.Context) error { - r.log.Info("shutting down server") - return r.server.Shutdown(ctx) -} diff --git a/cmd/waku/server/rest/waku_rest_test.go b/cmd/waku/server/rest/waku_rest_test.go deleted file mode 100644 index 0d62c22e0..000000000 --- a/cmd/waku/server/rest/waku_rest_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package rest - -import ( - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestWakuRest(t *testing.T) { - options := node.WithWakuStore() - n, err := node.New(options) - require.NoError(t, err) - - rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger()) - require.NotNil(t, rpc.server) - require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") -} diff --git a/cmd/waku/server/rln.go b/cmd/waku/server/rln.go deleted file mode 100644 index b203f91e5..000000000 --- a/cmd/waku/server/rln.go +++ /dev/null @@ -1,23 +0,0 @@ -//go:build !gowaku_no_rln -// +build !gowaku_no_rln - -package server - -import ( - "fmt" - - "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/rln" -) - -func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error { - _, rlnEnabled := node.RLNRelay().(*rln.WakuRLNRelay) - if rlnEnabled { - err := node.RLNRelay().AppendRLNProof(msg, node.Timesource().Now()) - if err != nil { - return fmt.Errorf("could not append rln proof: %w", err) - } - } - return nil -} diff --git a/cmd/waku/server/utils.go b/cmd/waku/server/utils.go deleted file mode 100644 index 865d09009..000000000 --- a/cmd/waku/server/utils.go +++ /dev/null @@ -1,48 +0,0 @@ -package server - -import ( - "encoding/base64" - "strings" - - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/store" -) - -func IsWakuProtocol(protocol protocol.ID) bool { - return protocol == filter.FilterPushID_v20beta1 || - protocol == filter.FilterSubscribeID_v20beta1 || - protocol == relay.WakuRelayID_v200 || - protocol == lightpush.LightPushID_v20beta1 || - protocol == store.StoreID_v20beta4 -} - -type Base64URLByte []byte - -// UnmarshalText is used by json.Unmarshal to decode both url-safe and standard -// base64 encoded strings with and without padding -func (h *Base64URLByte) UnmarshalText(b []byte) error { - inputValue := "" - if b != nil { - inputValue = string(b) - } - - enc := base64.StdEncoding - if strings.ContainsAny(inputValue, "-_") { - enc = base64.URLEncoding - } - if len(inputValue)%4 != 0 { - enc = enc.WithPadding(base64.NoPadding) - } - - decodedBytes, err := enc.DecodeString(inputValue) - if err != nil { - return err - } - - *h = decodedBytes - - return nil -} From 6fbe631f2c2f1152b6450e38126e8e9ec20f1a68 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 21 Feb 2024 11:22:56 +0530 Subject: [PATCH 2/2] chore: fix makefile --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 554461bd5..4a9f6b743 100644 --- a/Makefile +++ b/Makefile @@ -77,10 +77,10 @@ lint-full: @golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m test-with-race: - ${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/... + ${GOBIN} test -race -timeout 300s ./waku/... test: - ${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... + ${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE} ${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV}