diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 52496aa798..1700c80de3 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -46,6 +46,7 @@ type API interface { Uptime() (float64, error) Reload() error Shutdown() error + ShutdownWithoutOsExit() error RuntimeLogs() (string, error) RemoteVisors() ([]string, error) GetLogRotationInterval() (visorconfig.Duration, error) @@ -221,19 +222,21 @@ func (v *Visor) Overview() (*Overview, error) { // Summary provides detailed info including overview and health of the visor. type Summary struct { - Overview *Overview `json:"overview"` - Health *HealthInfo `json:"health"` - Uptime float64 `json:"uptime"` - Routes []routingRuleResp `json:"routes"` - IsHypervisor bool `json:"is_hypervisor,omitempty"` - DmsgStats *dmsgtracker.DmsgClientSummary `json:"dmsg_stats"` - Online bool `json:"online"` - MinHops uint16 `json:"min_hops"` - PersistentTransports []transport.PersistentTransports `json:"persistent_transports"` - SkybianBuildVersion string `json:"skybian_build_version"` - RewardAddress string `json:"reward_address"` - BuildTag string `json:"build_tag"` - PublicAutoconnect bool `json:"public_autoconnect"` + Overview *Overview `json:"overview"` + Health *HealthInfo `json:"health"` + Uptime float64 `json:"uptime"` + Routes []routingRuleResp `json:"routes"` + LocalForwardedPorts []int `json:"local_forwarded_ports"` + RemoteConnectedPorts map[uuid.UUID]*appnet.ForwardConn `json:"remote_connected_ports"` + IsHypervisor bool `json:"is_hypervisor,omitempty"` + DmsgStats *dmsgtracker.DmsgClientSummary `json:"dmsg_stats"` + Online bool `json:"online"` + MinHops uint16 `json:"min_hops"` + PersistentTransports []transport.PersistentTransports `json:"persistent_transports"` + SkybianBuildVersion string `json:"skybian_build_version"` + RewardAddress string `json:"reward_address"` + BuildTag string `json:"build_tag"` + PublicAutoconnect bool `json:"public_autoconnect"` } // BuildTag variable that will set when building binary @@ -261,6 +264,16 @@ func (v *Visor) Summary() (*Summary, error) { return nil, fmt.Errorf("routes") } + localPorts, err := v.ListHTTPPorts() + if err != nil { + return nil, fmt.Errorf("fwd") + } + + remotePorts, err := v.List() + if err != nil { + return nil, fmt.Errorf("rev") + } + skybianBuildVersion := v.SkybianBuildVersion() extraRoutes := make([]routingRuleResp, 0, len(routes)) @@ -293,6 +306,8 @@ func (v *Visor) Summary() (*Summary, error) { Health: health, Uptime: uptime, Routes: extraRoutes, + LocalForwardedPorts: localPorts, + RemoteConnectedPorts: remotePorts, MinHops: v.conf.Routing.MinHops, PersistentTransports: pts, SkybianBuildVersion: skybianBuildVersion, @@ -1450,6 +1465,14 @@ func (v *Visor) Shutdown() error { return v.Close() } +// ShutdownWithoutOsExit implements API. +func (v *Visor) ShutdownWithoutOsExit() error { + if v.restartCtx == nil { + return ErrMalformedRestartContext + } + return v.Close() +} + // RuntimeLogs returns visor runtime logs func (v *Visor) RuntimeLogs() (string, error) { var builder strings.Builder diff --git a/pkg/visor/hypervisor.go b/pkg/visor/hypervisor.go index 016e3a403f..9765d9dd15 100644 --- a/pkg/visor/hypervisor.go +++ b/pkg/visor/hypervisor.go @@ -10,6 +10,7 @@ import ( "io" "math/rand" "net/http" + "os" "strconv" "strings" "sync" @@ -38,7 +39,8 @@ import ( ) const ( - httpTimeout = 30 * time.Second + // HTTPWriteTimeout is the max time, in seconds, for returning a http response. + HTTPWriteTimeout = 30 * time.Second ) const ( @@ -211,7 +213,7 @@ func (hv *Hypervisor) makeMux() chi.Router { r.Route("/", func(r chi.Router) { r.Route("/api", func(r chi.Router) { - r.Use(middleware.Timeout(httpTimeout)) + r.Use(middleware.Timeout(HTTPWriteTimeout)) r.Get("/ping", hv.getPong()) @@ -261,6 +263,7 @@ func (hv *Hypervisor) makeMux() chi.Router { r.Delete("/visors/{pk}/routes/{rid}", hv.deleteRoute()) r.Delete("/visors/{pk}/routes/", hv.deleteRoutes()) r.Get("/visors/{pk}/routegroups", hv.getRouteGroups()) + r.Post("/visors/{pk}/restart", hv.restart()) r.Post("/visors/{pk}/shutdown", hv.shutdown()) r.Get("/visors/{pk}/runtime-logs", hv.getRuntimeLogs()) r.Post("/visors/{pk}/min-hops", hv.postMinHops()) @@ -271,6 +274,13 @@ func (hv *Hypervisor) makeMux() chi.Router { r.Get("/visors/{pk}/reward", hv.getRewardAddress()) r.Put("/visors/{pk}/reward", hv.putRewardAddress()) r.Delete("/visors/{pk}/reward", hv.deleteRewardAddress()) + r.Get("/visors/{pk}/fwd", hv.getLocalFwdPorts()) + r.Post("/visors/{pk}/fwd", hv.postLocalFwdPort()) + r.Delete("/visors/{pk}/fwd/{port}", hv.deleteLocalFwdPort()) + r.Get("/visors/{pk}/rev", hv.getRemoteRevPorts()) + r.Post("/visors/{pk}/rev", hv.postRemoteRevPort()) + r.Delete("/visors/{pk}/rev/{id}", hv.deleteRemoteRevPort()) + r.Post("/visors/{pk}/ping-visor", hv.postPingVisor()) }) }) @@ -1181,6 +1191,23 @@ func (hv *Hypervisor) shutdown() http.HandlerFunc { }) } +func (hv *Hypervisor) shutdown() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + if err := ctx.API.ShutdownWithoutOsExit(); err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, true) + + // Wait a few seconds to be able so send the response. + go func() { + time.Sleep(8 * time.Second) + go os.Exit(0) + }() + }) +} + func (hv *Hypervisor) getRuntimeLogs() http.HandlerFunc { return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { logs, err := ctx.API.RuntimeLogs() @@ -1331,6 +1358,157 @@ func (hv *Hypervisor) deleteRewardAddress() http.HandlerFunc { }) } +// getLocalFwdPorts lists registered local ports +func (hv *Hypervisor) getLocalFwdPorts() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + ports, err := ctx.API.ListHTTPPorts() + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, ports) + }) +} + +// postLocalFwdPort registers a local port +func (hv *Hypervisor) postLocalFwdPort() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + var reqBody struct { + Port int `json:"port"` + } + + if err := httputil.ReadJSON(r, &reqBody); err != nil { + if err != io.EOF { + hv.log(r).Warnf("postLocalFwdPort request: %v", err) + } + httputil.WriteJSON(w, r, http.StatusBadRequest, usermanager.ErrMalformedRequest) + return + } + + if err := ctx.API.RegisterHTTPPort(reqBody.Port); err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + httputil.WriteJSON(w, r, http.StatusOK, struct{}{}) + }) +} + +// deleteLocalFwdPort deregisters a local port +func (hv *Hypervisor) deleteLocalFwdPort() http.HandlerFunc { + return hv.withCtx(hv.fwdCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + err := ctx.API.DeregisterHTTPPort(ctx.FwdPort) + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + httputil.WriteJSON(w, r, http.StatusOK, struct{}{}) + }) +} + +// getRemoteRevPorts list configured connections to remote ports +func (hv *Hypervisor) getRemoteRevPorts() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + list, err := ctx.API.List() + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, list) + }) +} + +// postRemoteRevPort connect to a remote port +func (hv *Hypervisor) postRemoteRevPort() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + var reqBody struct { + RemotePk string `json:"remote_pk"` + RemotePort int `json:"remote_port"` + LocalPort int `json:"local_port"` + } + + if err := httputil.ReadJSON(r, &reqBody); err != nil { + if err != io.EOF { + hv.log(r).Warnf("postRemoteRevPort request: %v", err) + } + httputil.WriteJSON(w, r, http.StatusBadRequest, usermanager.ErrMalformedRequest) + return + } + + pk := cipher.PubKey{} + if err := pk.UnmarshalText([]byte(reqBody.RemotePk)); err != nil { + httputil.WriteJSON(w, r, http.StatusBadRequest, usermanager.ErrMalformedRequest) + return + } + + if _, err := ctx.API.Connect(pk, reqBody.RemotePort, reqBody.LocalPort); err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, struct{}{}) + }) +} + +// deleteRemoteRevPort disconnect from a remote port +func (hv *Hypervisor) deleteRemoteRevPort() http.HandlerFunc { + return hv.withCtx(hv.revCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + err := ctx.API.Disconnect(ctx.RevID) + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + httputil.WriteJSON(w, r, http.StatusOK, struct{}{}) + }) +} + +func (hv *Hypervisor) postPingVisor() http.HandlerFunc { + return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + var reqBody struct { + RemotePk string `json:"remote_pk"` + Size int `json:"size"` + Tries int `json:"tries"` + } + + if err := httputil.ReadJSON(r, &reqBody); err != nil { + if err != io.EOF { + hv.log(r).Warnf("postPingVisor request: %v", err) + } + httputil.WriteJSON(w, r, http.StatusBadRequest, usermanager.ErrMalformedRequest) + return + } + + pk := cipher.PubKey{} + if err := pk.UnmarshalText([]byte(reqBody.RemotePk)); err != nil { + httputil.WriteJSON(w, r, http.StatusBadRequest, usermanager.ErrMalformedRequest) + return + } + + pingConfig := PingConfig{PK: pk, Tries: reqBody.Tries, PcktSize: reqBody.Size} + + err := ctx.API.DialPing(pingConfig) + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + latencies, err := ctx.API.Ping(pingConfig) + if err != nil { + go ctx.API.StopPing(pk) //nolint + httputil.WriteJSON(w, r, http.StatusInternalServerError, errors.New("unexpected problem during connection")) + return + } + ctx.API.StopPing(pk) //nolint + + var response []int64 + for _, lat := range latencies { + response = append(response, lat.Milliseconds()) + } + + httputil.WriteJSON(w, r, http.StatusOK, response) + }) +} + /* <<< Helper functions >>> */ @@ -1355,6 +1533,12 @@ type httpCtx struct { // Route RtKey routing.RouteID + + // FWD local port + FwdPort int + + // Rev connection id + RevID uuid.UUID } type ( @@ -1481,6 +1665,40 @@ func (hv *Hypervisor) routeCtx(w http.ResponseWriter, r *http.Request) (*httpCtx return ctx, true } +func (hv *Hypervisor) fwdCtx(w http.ResponseWriter, r *http.Request) (*httpCtx, bool) { + ctx, ok := hv.visorCtx(w, r) + if !ok { + return nil, false + } + + port, err := strconv.Atoi(chi.URLParam(r, "port")) + if err != nil { + httputil.WriteJSON(w, r, http.StatusBadRequest, err) + return nil, false + } + + ctx.FwdPort = port + + return ctx, true +} + +func (hv *Hypervisor) revCtx(w http.ResponseWriter, r *http.Request) (*httpCtx, bool) { + ctx, ok := hv.visorCtx(w, r) + if !ok { + return nil, false + } + + id, err := uuidFromParam(r, "id") + if err != nil { + httputil.WriteJSON(w, r, http.StatusBadRequest, err) + return nil, false + } + + ctx.RevID = id + + return ctx, true +} + func pkFromParam(r *http.Request, key string) (cipher.PubKey, error) { pk := cipher.PubKey{} err := pk.UnmarshalText([]byte(chi.URLParam(r, key))) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index d5ec4d8bb8..e3849d7c7e 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -1479,7 +1479,7 @@ func initHypervisor(_ context.Context, v *Visor, log *logging.Logger) error { // Addr: conf.HTTPAddr, Handler: handler, ReadTimeout: 5 * time.Second, - WriteTimeout: 10 * time.Second, + WriteTimeout: HTTPWriteTimeout, } go func() { diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 75c1e7c025..d2e8432579 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -457,6 +457,11 @@ func (rc *rpcClient) Shutdown() error { return rc.Call("Shutdown", &struct{}{}, &struct{}{}) } +// ShutdownWithoutOsExit calls ShutdownWithoutOsExit. +func (rc *rpcClient) ShutdownWithoutOsExit() error { + return rc.Call("ShutdownWithoutOsExit", &struct{}{}, &struct{}{}) +} + // Exec calls Exec. func (rc *rpcClient) Exec(command string) ([]byte, error) { output := make([]byte, 0) @@ -1239,6 +1244,11 @@ func (mc *mockRPCClient) Shutdown() error { return nil } +// ShutdownWithoutOsExit implements API. +func (mc *mockRPCClient) ShutdownWithoutOsExit() error { + return nil +} + // Exec implements API. func (mc *mockRPCClient) Exec(string) ([]byte, error) { return []byte("mock"), nil