From 8ee38faa134bc9f3f2caed7f3cb2f33cf5e4632b Mon Sep 17 00:00:00 2001 From: ersonp Date: Mon, 17 Jun 2024 11:43:16 +0530 Subject: [PATCH] feat: Add `publish` and update `connect` --- cmd/skywire-cli/commands/net/connect.go | 3 +- cmd/skywire-cli/commands/net/publish.go | 8 +- pkg/app/appnet/connect.go | 117 ++++++------ pkg/app/appnet/publish.go | 226 ++++++++++++++++++++++++ pkg/visor/api.go | 64 +++---- pkg/visor/rpc.go | 16 ++ pkg/visor/rpc_client.go | 23 +++ 7 files changed, 356 insertions(+), 101 deletions(-) create mode 100644 pkg/app/appnet/publish.go diff --git a/cmd/skywire-cli/commands/net/connect.go b/cmd/skywire-cli/commands/net/connect.go index 6031496f5..bfb7f8517 100644 --- a/cmd/skywire-cli/commands/net/connect.go +++ b/cmd/skywire-cli/commands/net/connect.go @@ -56,7 +56,7 @@ var conCmd = &cobra.Command{ internal.Catch(cmd.Flags(), err) for _, connectConn := range connectConns { - _, err = fmt.Fprintf(w, "%s\t%s\t%s\n", connectConn.ID, strconv.Itoa(int(connectConn.LocalPort)), + _, err = fmt.Fprintf(w, "%s\t%s\t%s\n", connectConn.ID, strconv.Itoa(int(connectConn.WebPort)), strconv.Itoa(int(connectConn.RemotePort))) internal.Catch(cmd.Flags(), err) } @@ -97,7 +97,6 @@ var conCmd = &cobra.Command{ if 65536 < remotePort || 65536 < localPort { internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be greater than 65535")) } - id, err := rpcClient.Connect(remotePK, remotePort, localPort) internal.Catch(cmd.Flags(), err) internal.PrintOutput(cmd.Flags(), id, fmt.Sprintln(id)) diff --git a/cmd/skywire-cli/commands/net/publish.go b/cmd/skywire-cli/commands/net/publish.go index 624b04e08..ab574fe25 100644 --- a/cmd/skywire-cli/commands/net/publish.go +++ b/cmd/skywire-cli/commands/net/publish.go @@ -78,11 +78,15 @@ var pubCmd = &cobra.Command{ if deregister { err = rpcClient.DeregisterHTTPPort(portNo) + internal.Catch(cmd.Flags(), err) + } else { err = rpcClient.RegisterHTTPPort(portNo) + internal.Catch(cmd.Flags(), err) + id, err := rpcClient.Publish(portNo) + internal.Catch(cmd.Flags(), err) + internal.PrintOutput(cmd.Flags(), "id: %v\n", fmt.Sprintln(id)) } - internal.Catch(cmd.Flags(), err) - internal.PrintOutput(cmd.Flags(), "OK", "OK\n") }, } diff --git a/pkg/app/appnet/connect.go b/pkg/app/appnet/connect.go index da28e4f8b..67c9ae189 100644 --- a/pkg/app/appnet/connect.go +++ b/pkg/app/appnet/connect.go @@ -8,10 +8,11 @@ import ( "net" "net/http" "sync" - "time" + "github.com/gin-gonic/gin" "github.com/google/uuid" + "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/logging" ) @@ -54,39 +55,37 @@ func RemoveConnectConn(id uuid.UUID) { // ConnectConn represents a connection that is published on the skywire network type ConnectConn struct { ID uuid.UUID - LocalPort int + WebPort int RemotePort int remoteConn net.Conn + r *gin.Engine closeOnce sync.Once - srv *http.Server closeChan chan struct{} log *logging.Logger } // NewConnectConn creates a new ConnectConn -func NewConnectConn(log *logging.Logger, remoteConn net.Conn, remotePort, localPort int) *ConnectConn { - closeChan := make(chan struct{}) - var once sync.Once - handler := http.NewServeMux() - var lock sync.Mutex - handler.HandleFunc("/", handleFunc(remoteConn, log, closeChan, once, &lock)) - - srv := &http.Server{ - Addr: fmt.Sprintf(":%v", localPort), - Handler: handler, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, - } +func NewConnectConn(log *logging.Logger, remoteConn net.Conn, remotePK cipher.PubKey, remotePort, webPort int) *ConnectConn { + + httpC := &http.Client{Transport: MakeHTTPTransport(remoteConn, log)} + + r := gin.New() + + r.Use(gin.Recovery()) + + r.Use(loggingMiddleware()) + + r.Any("/*path", handleConnectFunc(httpC, remotePK, remotePort)) + fwdConn := &ConnectConn{ ID: uuid.New(), remoteConn: remoteConn, - srv: srv, - LocalPort: localPort, + WebPort: webPort, RemotePort: remotePort, - closeChan: closeChan, log: log, + r: r, } + AddConnect(fwdConn) return fwdConn } @@ -94,7 +93,7 @@ func NewConnectConn(log *logging.Logger, remoteConn net.Conn, remotePort, localP // Serve serves a HTTP forward conn that accepts all requests and forwards them directly to the remote server over the specified net.Conn. func (f *ConnectConn) Serve() { go func() { - err := f.srv.ListenAndServe() + err := f.r.Run(":" + fmt.Sprintf("%v", f.WebPort)) //nolint if err != nil { // don't print error if local server is closed if !errors.Is(err, http.ErrServerClosed) { @@ -109,71 +108,57 @@ func (f *ConnectConn) Serve() { f.log.Error(err) } }() - f.log.Debugf("Serving on localhost:%v", f.LocalPort) + f.log.Debugf("Serving on localhost:%v", f.WebPort) } // Close closes the server and remote connection. func (f *ConnectConn) Close() (err error) { f.closeOnce.Do(func() { - err = f.srv.Close() err = f.remoteConn.Close() RemoveConnectConn(f.ID) }) return err } -func isClosed(c chan struct{}) bool { - select { - case <-c: - return true - default: - return false - } -} - -func handleFunc(remoteConn net.Conn, log *logging.Logger, closeChan chan struct{}, once sync.Once, lock *sync.Mutex) func(w http.ResponseWriter, req *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - lock.Lock() - defer lock.Unlock() +func handleConnectFunc(httpC *http.Client, remotePK cipher.PubKey, remotePort int) func(c *gin.Context) { + return func(c *gin.Context) { + var urlStr string + urlStr = fmt.Sprintf("sky://%s:%v%s", remotePK, remotePort, c.Param("path")) + if c.Request.URL.RawQuery != "" { + urlStr = fmt.Sprintf("%s?%s", urlStr, c.Request.URL.RawQuery) + } - if isClosed(closeChan) { + fmt.Printf("Proxying request: %s %s\n", c.Request.Method, urlStr) + req, err := http.NewRequest(c.Request.Method, urlStr, c.Request.Body) + if err != nil { + c.String(http.StatusInternalServerError, "Failed to create HTTP request") return } - client := http.Client{Transport: MakeHTTPTransport(remoteConn, log)} - // Forward request to remote server - resp, err := client.Transport.RoundTrip(r) + + for header, values := range c.Request.Header { + for _, value := range values { + req.Header.Add(header, value) + } + } + + resp, err := httpC.Do(req) if err != nil { - http.Error(w, "Could not reach remote server", 500) - log.WithError(err).Errorf("Could not reach remote server %v", resp) - once.Do(func() { - close(closeChan) - }) + c.String(http.StatusInternalServerError, "Failed to connect to HTTP server") + fmt.Printf("Error: %v\n", err) return } + defer resp.Body.Close() //nolint - defer func() { - if err := resp.Body.Close(); err != nil { - log.WithError(err).Errorln("Failed to close forwarding response body") - } - }() - for key, value := range resp.Header { - for _, v := range value { - w.Header().Set(key, v) + for header, values := range resp.Header { + for _, value := range values { + c.Writer.Header().Add(header, value) } } - w.WriteHeader(resp.StatusCode) - // Transfer response from remote server -> client - if resp.ContentLength > 0 { - if _, err := io.CopyN(w, resp.Body, resp.ContentLength); err != nil { - log.Warn(err) - } - } else if resp.Close { - // Copy until EOF or some other error occurs - for { - if _, err := io.Copy(w, resp.Body); err != nil { - break - } - } + + c.Status(resp.StatusCode) + if _, err := io.Copy(c.Writer, resp.Body); err != nil { + c.String(http.StatusInternalServerError, "Failed to copy response body") + fmt.Printf("Error copying response body: %v\n", err) } } } diff --git a/pkg/app/appnet/publish.go b/pkg/app/appnet/publish.go new file mode 100644 index 000000000..802609fa7 --- /dev/null +++ b/pkg/app/appnet/publish.go @@ -0,0 +1,226 @@ +// Package appnet pkg/app/appnet/forwarding.go +package appnet + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/http/httputil" + "net/url" + "sync" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/skycoin/skywire-utilities/pkg/logging" +) + +// nolint: gochecknoglobals +var ( + publishListenertners = make(map[uuid.UUID]*publishListener) + publishListenerMu sync.Mutex +) + +// AddPublish adds publishListener to with it's ID +func AddPublish(fwd *publishListener) { + publishListenerMu.Lock() + defer publishListenerMu.Unlock() + publishListenertners[fwd.ID] = fwd +} + +// GetpublishListenertner get's a publishListener by ID +func GetpublishListenertner(id uuid.UUID) *publishListener { + publishListenerMu.Lock() + defer publishListenerMu.Unlock() + + return publishListenertners[id] +} + +// GetAllpublishListenertners gets all publishListeners +func GetAllpublishListenertners() map[uuid.UUID]*publishListener { + publishListenerMu.Lock() + defer publishListenerMu.Unlock() + + return publishListenertners +} + +// RemovepublishListener removes a publishListener by ID +func RemovepublishListener(id uuid.UUID) { + publishListenerMu.Lock() + defer publishListenerMu.Unlock() + delete(publishListenertners, id) +} + +// publishListener represents a publishion that is published on the skywire network +type publishListener struct { + ID uuid.UUID + LocalPort int + lis net.Listener + closeOnce sync.Once + srv *http.Server + closeChan chan struct{} + log *logging.Logger +} + +type ginHandler struct { + Router *gin.Engine +} + +func (h *ginHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.Router.ServeHTTP(w, r) +} + +// NewPublishListener creates a new publishListener +func NewPublishListener(log *logging.Logger, lis net.Listener, localPort int) *publishListener { + closeChan := make(chan struct{}) + r1 := gin.New() + r1.Use(gin.Recovery()) + r1.Use(loggingMiddleware()) + authRoute := r1.Group("/") + authRoute.Any("/*path", func(c *gin.Context) { + log.Error("Request received") + targetURL, _ := url.Parse(fmt.Sprintf("http://127.0.0.1:%v%s?%s", localPort, c.Request.URL.Path, c.Request.URL.RawQuery)) //nolint + proxy := httputil.ReverseProxy{ + Director: func(req *http.Request) { + req.URL = targetURL + req.Host = targetURL.Host + req.Method = c.Request.Method + }, + Transport: &http.Transport{}, + } + proxy.ServeHTTP(c.Writer, c.Request) + }) + srv := &http.Server{ + Handler: &ginHandler{Router: r1}, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + pubLis := &publishListener{ + ID: uuid.New(), + srv: srv, + lis: lis, + LocalPort: localPort, + closeChan: closeChan, + log: log, + } + AddPublish(pubLis) + return pubLis +} + +// Serve serves a HTTP forward Lis that accepts all requests and forwards them directly to the remote server over the specified net.Lis. +func (f *publishListener) Listen() { + go func() { + err := f.srv.Serve(f.lis) + if err != nil { + // don't print error if local server is closed + if !errors.Is(err, http.ErrServerClosed) { + f.log.WithError(err).Error("Error listening and serving app forwarding.") + } + } + }() + go func() { + <-f.closeChan + err := f.Close() + if err != nil { + f.log.Error(err) + } + }() + f.log.Debugf("Serving HTTP on dmsg port %v with DMSG listener %s", f.LocalPort, f.lis.Addr().String()) +} + +// Close closes the server and remote publishion. +func (f *publishListener) Close() (err error) { + f.closeOnce.Do(func() { + f.log.Error("Closing publishListener") + err = f.srv.Close() + err = f.lis.Close() + RemovepublishListener(f.ID) + }) + return err +} + +func loggingMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + c.Next() + latency := time.Since(start) + if latency > time.Minute { + latency = latency.Truncate(time.Second) + } + statusCode := c.Writer.Status() + method := c.Request.Method + path := c.Request.URL.Path + // Get the background color based on the status code + statusCodeBackgroundColor := getBackgroundColor(statusCode) + // Get the method color + methodColor := getMethodColor(method) + // Print the logging in a custom format which includes the publickeyfrom c.Request.RemoteAddr ex.: + // [DMSGHTTP] 2023/05/18 - 19:43:15 | 200 | 10.80885ms | | 02b5ee5333aa6b7f5fc623b7d5f35f505cb7f974e98a70751cf41962f84c8c4637:49153 | GET /node-info.json + fmt.Printf("[DMSGWEB] %s |%s %3d %s| %13v | %15s | %72s |%s %-7s %s %s\n", + time.Now().Format("2006/01/02 - 15:04:05"), + statusCodeBackgroundColor, + statusCode, + resetColor(), + latency, + c.ClientIP(), + c.Request.RemoteAddr, + methodColor, + method, + resetColor(), + path, + ) + } +} + +func getBackgroundColor(statusCode int) string { + switch { + case statusCode >= http.StatusOK && statusCode < http.StatusMultipleChoices: + return green + case statusCode >= http.StatusMultipleChoices && statusCode < http.StatusBadRequest: + return white + case statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError: + return yellow + default: + return red + } +} + +func getMethodColor(method string) string { + switch method { + case http.MethodGet: + return blue + case http.MethodPost: + return cyan + case http.MethodPut: + return yellow + case http.MethodDelete: + return red + case http.MethodPatch: + return green + case http.MethodHead: + return magenta + case http.MethodOptions: + return white + default: + return reset + } +} + +func resetColor() string { + return reset +} + +const ( + green = "\033[97;42m" + white = "\033[90;47m" + yellow = "\033[90;43m" + red = "\033[97;41m" + blue = "\033[97;44m" + magenta = "\033[97;45m" + cyan = "\033[97;46m" + reset = "\033[0m" +) diff --git a/pkg/visor/api.go b/pkg/visor/api.go index b57dfeaa0..744a68165 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -119,6 +119,8 @@ type API interface { RegisterHTTPPort(localPort int) error DeregisterHTTPPort(localPort int) error ListHTTPPorts() ([]int, error) + Publish(localPort int) (uuid.UUID, error) + Depublish(id uuid.UUID) error Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid.UUID, error) Disconnect(id uuid.UUID) error List() (map[uuid.UUID]*appnet.ConnectConn, error) @@ -1582,6 +1584,7 @@ func (v *Visor) ListHTTPPorts() ([]int, error) { // Connect implements API. func (v *Visor) Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid.UUID, error) { + v.log.Errorf("Connecting to %v:%v via %v", remotePK, remotePort, localPort) ok := isPortAvailable(v.log, localPort) if !ok { return uuid.UUID{}, fmt.Errorf(":%v local port already in use", localPort) @@ -1589,7 +1592,7 @@ func (v *Visor) Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid connApp := appnet.Addr{ Net: appnet.TypeSkynet, PubKey: remotePK, - Port: routing.Port(skyenv.SkyForwardingServerPort), + Port: routing.Port(remotePort), } conn, err := appnet.Dial(connApp) if err != nil { @@ -1600,46 +1603,45 @@ func (v *Visor) Connect(remotePK cipher.PubKey, remotePort, localPort int) (uuid return uuid.UUID{}, err } - cMsg := clientMsg{ - Port: remotePort, - } + connectConn := appnet.NewConnectConn(v.log, remoteConn, remotePK, remotePort, localPort) + connectConn.Serve() + return connectConn.ID, nil +} - clientMsg, err := json.Marshal(cMsg) - if err != nil { - return uuid.UUID{}, err +// Disconnect implements API. +func (v *Visor) Disconnect(id uuid.UUID) error { + connectConn := appnet.GetConnectConn(id) + return connectConn.Close() +} + +// Publish implements API. +func (v *Visor) Publish(localPort int) (uuid.UUID, error) { + v.log.Errorf("Publishing on %v:%v", v.conf.PK, localPort) + ok := isPortAvailable(v.log, localPort) + if ok { + return uuid.UUID{}, fmt.Errorf(":%v local port not in use", localPort) } - _, err = remoteConn.Write([]byte(clientMsg)) - if err != nil { - return uuid.UUID{}, err + connApp := appnet.Addr{ + Net: appnet.TypeSkynet, + PubKey: v.conf.PK, + Port: routing.Port(localPort), } - v.log.Debugf("Msg sent %s", clientMsg) - buf := make([]byte, 32*1024) - n, err := remoteConn.Read(buf) + lis, err := appnet.Listen(connApp) if err != nil { return uuid.UUID{}, err } - var sReply serverReply - err = json.Unmarshal(buf[:n], &sReply) - if err != nil { - return uuid.UUID{}, err - } - v.log.Debugf("Received: %v", sReply) - if sReply.Error != nil { - sErr := sReply.Error - v.log.WithError(fmt.Errorf(*sErr)).Error("Server closed with error") - return uuid.UUID{}, fmt.Errorf(*sErr) - } - connectConn := appnet.NewConnectConn(v.log, remoteConn, remotePort, localPort) - connectConn.Serve() - return connectConn.ID, nil + publishLis := appnet.NewPublishListener(v.log, lis, localPort) + publishLis.Listen() + + return publishLis.ID, nil } -// Disconnect implements API. -func (v *Visor) Disconnect(id uuid.UUID) error { - connectConn := appnet.GetConnectConn(id) - return connectConn.Close() +// Depublish implements API. +func (v *Visor) Depublish(id uuid.UUID) error { + forwardConn := appnet.GetConnectConn(id) + return forwardConn.Close() } // List implements API. diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 8d03a9f89..58ecc8bdc 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -770,6 +770,22 @@ func (r *RPC) Disconnect(id *uuid.UUID, _ *struct{}) (err error) { return err } +// Connect creates a connection with the remote visor to listen on the remote port and serve that on the local port +func (r *RPC) Publish(localPort *int, out *uuid.UUID) (err error) { + defer rpcutil.LogCall(r.log, "Publish", localPort)(out, &err) + + id, err := r.visor.Publish(*localPort) + *out = id + return err +} + +// Disconnect breaks the connection with the given id +func (r *RPC) Depublish(id *uuid.UUID, _ *struct{}) (err error) { + defer rpcutil.LogCall(r.log, "Depublish", id)(nil, &err) + err = r.visor.Depublish(*id) + return err +} + // List returns all the ongoing skyforwarding connections func (r *RPC) List(_ *struct{}, out *map[uuid.UUID]*appnet.ConnectConn) (err error) { defer rpcutil.LogCall(r.log, "List", nil)(out, &err) diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4b9dffda8..a26c444fe 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -572,6 +572,19 @@ func (rc *rpcClient) Disconnect(id uuid.UUID) error { return err } +// Publish calls Publish. +func (rc *rpcClient) Publish(localPort int) (uuid.UUID, error) { + var out uuid.UUID + err := rc.Call("Publish", &localPort, &out) + return out, err +} + +// Depublish calls Depublish. +func (rc *rpcClient) Depublish(id uuid.UUID) error { + err := rc.Call("Depublish", &id, &struct{}{}) + return err +} + // List calls List. func (rc *rpcClient) List() (map[uuid.UUID]*appnet.ConnectConn, error) { var out map[uuid.UUID]*appnet.ConnectConn @@ -1320,6 +1333,16 @@ func (mc *mockRPCClient) Disconnect(id uuid.UUID) error { //nolint:all return nil } +// Publish implements API. +func (mc *mockRPCClient) Publish(localPort int) (uuid.UUID, error) { //nolint:all + return uuid.UUID{}, nil +} + +// Depublish implements API. +func (mc *mockRPCClient) Depublish(id uuid.UUID) error { //nolint:all + return nil +} + // List implements API. func (mc *mockRPCClient) List() (map[uuid.UUID]*appnet.ConnectConn, error) { return nil, nil