Skip to content

Commit

Permalink
feat: Add publish and update connect
Browse files Browse the repository at this point in the history
  • Loading branch information
ersonp authored and ersonp committed Jun 17, 2024
1 parent 65a4123 commit 8ee38fa
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 101 deletions.
3 changes: 1 addition & 2 deletions cmd/skywire-cli/commands/net/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var conCmd = &cobra.Command{
internal.Catch(cmd.Flags(), err)

for _, connectConn := range connectConns {
_, err = fmt.Fprintf(w, "%s\t%s\t%s\n", connectConn.ID, strconv.Itoa(int(connectConn.LocalPort)),
_, err = fmt.Fprintf(w, "%s\t%s\t%s\n", connectConn.ID, strconv.Itoa(int(connectConn.WebPort)),
strconv.Itoa(int(connectConn.RemotePort)))
internal.Catch(cmd.Flags(), err)
}
Expand Down Expand Up @@ -97,7 +97,6 @@ var conCmd = &cobra.Command{
if 65536 < remotePort || 65536 < localPort {
internal.PrintFatalError(cmd.Flags(), fmt.Errorf("port cannot be greater than 65535"))
}

id, err := rpcClient.Connect(remotePK, remotePort, localPort)
internal.Catch(cmd.Flags(), err)
internal.PrintOutput(cmd.Flags(), id, fmt.Sprintln(id))
Expand Down
8 changes: 6 additions & 2 deletions cmd/skywire-cli/commands/net/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,15 @@ var pubCmd = &cobra.Command{

if deregister {
err = rpcClient.DeregisterHTTPPort(portNo)
internal.Catch(cmd.Flags(), err)

} else {
err = rpcClient.RegisterHTTPPort(portNo)
internal.Catch(cmd.Flags(), err)
id, err := rpcClient.Publish(portNo)
internal.Catch(cmd.Flags(), err)
internal.PrintOutput(cmd.Flags(), "id: %v\n", fmt.Sprintln(id))
}
internal.Catch(cmd.Flags(), err)
internal.PrintOutput(cmd.Flags(), "OK", "OK\n")

},
}
117 changes: 51 additions & 66 deletions pkg/app/appnet/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"net"
"net/http"
"sync"
"time"

"github.com/gin-gonic/gin"
"github.com/google/uuid"

"github.com/skycoin/skywire-utilities/pkg/cipher"
"github.com/skycoin/skywire-utilities/pkg/logging"
)

Expand Down Expand Up @@ -54,47 +55,45 @@ func RemoveConnectConn(id uuid.UUID) {
// ConnectConn represents a connection that is published on the skywire network
type ConnectConn struct {
ID uuid.UUID
LocalPort int
WebPort int
RemotePort int
remoteConn net.Conn
r *gin.Engine
closeOnce sync.Once
srv *http.Server
closeChan chan struct{}
log *logging.Logger
}

// NewConnectConn creates a new ConnectConn
func NewConnectConn(log *logging.Logger, remoteConn net.Conn, remotePort, localPort int) *ConnectConn {
closeChan := make(chan struct{})
var once sync.Once
handler := http.NewServeMux()
var lock sync.Mutex
handler.HandleFunc("/", handleFunc(remoteConn, log, closeChan, once, &lock))

srv := &http.Server{
Addr: fmt.Sprintf(":%v", localPort),
Handler: handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
func NewConnectConn(log *logging.Logger, remoteConn net.Conn, remotePK cipher.PubKey, remotePort, webPort int) *ConnectConn {

httpC := &http.Client{Transport: MakeHTTPTransport(remoteConn, log)}

r := gin.New()

r.Use(gin.Recovery())

r.Use(loggingMiddleware())

r.Any("/*path", handleConnectFunc(httpC, remotePK, remotePort))

fwdConn := &ConnectConn{
ID: uuid.New(),
remoteConn: remoteConn,
srv: srv,
LocalPort: localPort,
WebPort: webPort,
RemotePort: remotePort,
closeChan: closeChan,
log: log,
r: r,
}

AddConnect(fwdConn)
return fwdConn
}

// Serve serves a HTTP forward conn that accepts all requests and forwards them directly to the remote server over the specified net.Conn.
func (f *ConnectConn) Serve() {
go func() {
err := f.srv.ListenAndServe()
err := f.r.Run(":" + fmt.Sprintf("%v", f.WebPort)) //nolint
if err != nil {
// don't print error if local server is closed
if !errors.Is(err, http.ErrServerClosed) {
Expand All @@ -109,71 +108,57 @@ func (f *ConnectConn) Serve() {
f.log.Error(err)
}
}()
f.log.Debugf("Serving on localhost:%v", f.LocalPort)
f.log.Debugf("Serving on localhost:%v", f.WebPort)
}

// Close closes the server and remote connection.
func (f *ConnectConn) Close() (err error) {
f.closeOnce.Do(func() {
err = f.srv.Close()
err = f.remoteConn.Close()
RemoveConnectConn(f.ID)
})
return err
}

func isClosed(c chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}

func handleFunc(remoteConn net.Conn, log *logging.Logger, closeChan chan struct{}, once sync.Once, lock *sync.Mutex) func(w http.ResponseWriter, req *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lock.Lock()
defer lock.Unlock()
func handleConnectFunc(httpC *http.Client, remotePK cipher.PubKey, remotePort int) func(c *gin.Context) {
return func(c *gin.Context) {
var urlStr string
urlStr = fmt.Sprintf("sky://%s:%v%s", remotePK, remotePort, c.Param("path"))
if c.Request.URL.RawQuery != "" {
urlStr = fmt.Sprintf("%s?%s", urlStr, c.Request.URL.RawQuery)
}

if isClosed(closeChan) {
fmt.Printf("Proxying request: %s %s\n", c.Request.Method, urlStr)
req, err := http.NewRequest(c.Request.Method, urlStr, c.Request.Body)
if err != nil {
c.String(http.StatusInternalServerError, "Failed to create HTTP request")
return
}
client := http.Client{Transport: MakeHTTPTransport(remoteConn, log)}
// Forward request to remote server
resp, err := client.Transport.RoundTrip(r)

for header, values := range c.Request.Header {
for _, value := range values {
req.Header.Add(header, value)
}
}

resp, err := httpC.Do(req)
if err != nil {
http.Error(w, "Could not reach remote server", 500)
log.WithError(err).Errorf("Could not reach remote server %v", resp)
once.Do(func() {
close(closeChan)
})
c.String(http.StatusInternalServerError, "Failed to connect to HTTP server")
fmt.Printf("Error: %v\n", err)
return
}
defer resp.Body.Close() //nolint

defer func() {
if err := resp.Body.Close(); err != nil {
log.WithError(err).Errorln("Failed to close forwarding response body")
}
}()
for key, value := range resp.Header {
for _, v := range value {
w.Header().Set(key, v)
for header, values := range resp.Header {
for _, value := range values {
c.Writer.Header().Add(header, value)
}
}
w.WriteHeader(resp.StatusCode)
// Transfer response from remote server -> client
if resp.ContentLength > 0 {
if _, err := io.CopyN(w, resp.Body, resp.ContentLength); err != nil {
log.Warn(err)
}
} else if resp.Close {
// Copy until EOF or some other error occurs
for {
if _, err := io.Copy(w, resp.Body); err != nil {
break
}
}

c.Status(resp.StatusCode)
if _, err := io.Copy(c.Writer, resp.Body); err != nil {
c.String(http.StatusInternalServerError, "Failed to copy response body")
fmt.Printf("Error copying response body: %v\n", err)
}
}
}
Loading

0 comments on commit 8ee38fa

Please sign in to comment.