From a120cd1e9553a74acf367d533dff215767124844 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 17:07:52 +0300 Subject: [PATCH 1/9] Document how we expect things to work when we're done. This updates the hostname to be `tunnel.steve.fi`, because we're changing the way the client <-> server link works, and that means the existing public-endpoint won't work. TODO: * Think about signing the requests the server submits. * That doesn't stop sniffing, but does stop spidering/probing. --- README.md | 80 ++++++++++++++++++++-------------------------------- mq/README.md | 36 +++++++++++++++++++++++ 2 files changed, 66 insertions(+), 50 deletions(-) create mode 100644 mq/README.md diff --git a/README.md b/README.md index aed433c..bebeabc 100644 --- a/README.md +++ b/README.md @@ -11,48 +11,59 @@ Table of Contents * [Installation](#installation) * [Source Installation go <= 1.11](#source-installation-go---111) * [Source installation go >= 1.12](#source-installation-go---112) - * [Installation Self-Hosted Server](#installation-self-hosted-server) + * [Installation of your self-hosted Server](#installation-of-your-self-hosted-server) * [Github Setup](#github-setup) -* [Implementation Issues](#implementation-issues) # tunneller -Tunneller allows you to expose services which are running on `localhost`, or on your local network, to the entire internet. +Tunneller allows you to expose services which are running on `localhost`, or on your local network, to the public internet. This is very useful for testing webhooks, the generation of static-site compilers, and similar things. +>**NOTE**: There is a public end-point I host, you __SHOULD NOT__ rely upon it. It might come and go. It is not secure. + ## Overview -Assuming you have a service running within your local network, perhaps a HTTP server you could access via http://localhost:8080/, you can expose that to the entire internet by running: +Assuming you have a service running within your local network, perhaps a HTTP server you could access via http://localhost:8080/, you can expose that to the public-internet by running: - $ tunneller client -expose localhost:8080 -name=example + $ tunneller client -expose localhost:8080 This will output something like this: .. - Visit http://example.tunneller.steve.fi/ to see your content + Visit http://5ab654e6-7672-4e54-8368-49ec4aa4c6e4.tunnel.steve.fi/ to see your content + +The location listed will now be publicly visible to all remote hosts. + +As the name implies there is a central-host involved which is in charge of routing/proxying to your local network - in this case that central host is `tunnel.steve.fi` - the reason this project exists is not to host a general-purpose end-point, but instead to allow you to host your own. +In short this project is designed to be a __self-hosted__ alternative to software such as `ngrok`. -The location listed will now be publicly visible to all remote hosts. As the name implies there is a central-host involved which is in charge of routing/proxying to your local network - in this case that central host is `tunneller.steve.fi`, but the important thing is that you can run your own instance of that server. +So remember: + +>**NOTE**: There is a public end-point I host, you __SHOULD NOT__ rely upon it. You should configure your own server, and use it. -This is a self-hosted alternative to a system such as `ngrok`. ## How it works -When a client is launched it creates a web-socket connection to the default remote end-point, `tunneller.steve.fi`, and keeps that connection alive. A name is also sent for that connection. +When a client is launched it creates a connection to a message-bus running on the default remote end-point, `tunnel.steve.fi`, it keeps that connection alive waiting for instructions. + +When a request comes in for `foo.tunnel.steve.fi` the server will submit a command for the client to make the appropriate request by publishing a message upon the topic the client is listening to. (Each client has a name, and listens to its own topic). -Next, when a request comes in for `foo.tunneller.steve.fi` the server can look for an open web-socket connection with the name `foo`, and route the request through it: +In short: * The server sends a "Fetch this URL" request to the client. * The client makes the request to fetch the URL * This will succeed, because the client is running inside your network and can access localhost, and any other "internal" resources. -* The response is sent back to the server +* The response is sent back to the server. * And from there it is routed back to the requested web-browser. +Because the client connects directly to a message-bus there is always the risk that malicious actors will inject fake requests, attempting to scan, probe, and otherwise abuse your local network. + ## Installation @@ -81,24 +92,19 @@ If you don't have a golang environment setup you should be able to download a bi -## Installation Self-Hosted Server - -If you wish to host your own central-server things are a little more complex: - -* You'll need to create a DNS-entry `tunneller.example.com` -* You'll also need to setup a __wildcard__ DNS entry for `*.tunneller.example.com` to point to the same host. -* Finally you'll need to setup nginx/apache to proxy to the tunneller application. - * By default this will listen upon 127.0.0.1:8080. +## Installation of your self-hosted Server -You can find a sample configuration file for Apache2 beneath the [apache2](apache2) directory. +If you wish to host your own central-server this is how to do it: -## WSS options +* Create a DNS-entry `tunnel.example.com`, pointing to your host. +* Create a __wildcard__ DNS entry for `*.tunnel.example.com` to point to the same host. +* Setup and configure [mosquitto queue](https://mosquitto.org/) running on that same host. + * See [mq/](mq/) for details there. + * Don't forget to ensure that the MQ-service is publicly visible, by opening a firewall hole for port `1883` if required. -If you want to setup an encrypted remote endpoint (using apache mod_ssl, for example) you can use the `wss://` prefix to connect to a secure websocket (eg. `-tunnel wss://tunneller.example.com`). If no prefix is set on the `-tunnel` option `ws://` is assumed. +Of course security is important, so you should ensure that your message-bus is only reachable by clients you trust to expose their services. (i.e. Your VPN and office range(s).) -In case the SSL certificate you have installed on your webserver is self-signed or you are just testing, you can use the `-insecure` option to allow connection to unverified domains. -See the [apache2](apache2) folder for documentation on how to setup an encrypted websocket. ## Github Setup @@ -109,29 +115,3 @@ pull-requests are created/updated. The testing is carried out via Releases are automated in a similar fashion via [.github/build](.github/build), and the [github-action-publish-binaries](https://github.com/skx/github-action-publish-binaries) action. - - -## Implementation Issues - -This is a _really_ simple application, but it was more fiddly to implement than I expected, primarily because of issues with using websockets: - -* Websocket reads/writes block. -* Keeping the connection alive is fiddly, and requires the use of keep-alives. - * But these can't overlap with the "real" communication. - * Hence the mutex-use. - -It would be much simpler to implement a system like this using a RabbitMQ, mosquitto, or some other message-bus: - -* Client connects to the message-bus. - * When a HTTP-request must be made it would be sent down the bus. - * The reply would get posted back via the same route. - -The downside to using a message-bus is security; a client could easily sniff messages meant for _other_ clients. In a secure-environment this wouldn't matter, but hosting the central-endpoint publicly this would be a mistake. - -(Similarly you could use a Redis instance as the central queue, but exposing a Redis host to the internet would be a recipe for compromise.) - -If I were running this software at scale I'd probably look at using a queue though; we could create a per-client topic avoiding the issue of security. The downside to that would be we'd need to register: - - $ tunneller register --login=steve --password=bar - -Once that was complete we could connect to the MQ queue, and subscribe to "#steve" - assuming that the registration created an MQ-user for us, and that the ACLs on the queue would be setup such that the user `steve` couldn't subscripe to the topic belonging to another client (e.g. "`#bob`"). diff --git a/mq/README.md b/mq/README.md new file mode 100644 index 0000000..549fbe7 --- /dev/null +++ b/mq/README.md @@ -0,0 +1,36 @@ + +# Install mosquitto + +For a Debian/Ubuntu system: + + apt-get install mosquitto + + +# Configure Mosquitto + +Create `/etc/mosquitto/conf.d/acl.conf` with just the following contents: + + acl_file /etc/mosquitto/conf.d/acl.txt + +Now populate that with: + + topic readwrite clients/# + +The result of this will be that __any__ client can connect without any +username/password, and read/write to the topics beneath `clients`. + +For example client with the name `cake` can read/write to the topic +`clients/cake`. + + +## Test Subscription + +You should find that you can subscribe to the wildcard topic `clients/#` +via: + + $ mosquitto_sub -v -t clients/# + + +## Now you're good. + +Of course this does mean that clients can sniff on other user's traffic.. From e1cb931cb99b000da61b10cb1846c7224bd19f7f Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 17:22:38 +0300 Subject: [PATCH 2/9] Added stub for public-key signing --- signer/main.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 signer/main.go diff --git a/signer/main.go b/signer/main.go new file mode 100644 index 0000000..1fce44f --- /dev/null +++ b/signer/main.go @@ -0,0 +1,48 @@ +// package signer contains a couple of utility methods +// for signing a string and validating the signature of that +// string again. + +package signer + +import ( + "fmt" + + s "github.com/blevesearch/bleve/docs" +) + +// Signer holds our state +type Signer struct { + // public holds the public-key as a string. + public string + + // private holds the private key as a string + private string +} + +// New constructs our object. +// It assumes that either the public-key or the private-key will be specified, +// as strings. +func New(pubkey string, privkey string) *Signer { + obj := &Singer{public: pubkey, private: privkey} + return obj +} + +// Sign is called to generate a signature of the given string. +func (s *Signer) Sign(input string) (string, error) { + + if s.private == "" { + return "", fmt.Errorf("Private key not present - cannot sign") + } + + return "", fmt.Errorf("Unimplemented") +} + +// Validate is called to validate the signature of the given string. +func Validate(input string, signature string) (bool, error) { + + if s.public == "" { + return "", fmt.Errorf("Public key not present - cannot validate") + } + + return false, fmt.Errorf("Unimplemented") +} From 72013a5da18d04b7a80bad4a844ec59a16dc8901 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 17:35:39 +0300 Subject: [PATCH 3/9] Ignore our generated public/private keys --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 0770506..2b61ddb 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ tunneller +public.pem +private.pem From 6c83f344e2040e6bdf18548b76129e19af4e1c4b Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 17:36:15 +0300 Subject: [PATCH 4/9] Remove reference to apache, config since we won't be using it for anything important --- apache2/README.md | 102 ---------------------------------------------- 1 file changed, 102 deletions(-) delete mode 100644 apache2/README.md diff --git a/apache2/README.md b/apache2/README.md deleted file mode 100644 index 502e43e..0000000 --- a/apache2/README.md +++ /dev/null @@ -1,102 +0,0 @@ -# Apache2 Setup - -The following is a complete configuration file for Apache2, as used in -production: - - # - # This is the bare-tunnel domain. - # - # Accesses to this will be websockets, so we need to handle - # that by using `ws:/` - # - - ServerName tunneller.steve.fi - - RewriteEngine On - RewriteCond %{HTTP:Upgrade} =websocket [NC] - RewriteRule /(.*) ws://localhost:8080/$1 [P,L] - RewriteCond %{HTTP:Upgrade} !=websocket [NC] - RewriteRule /(.*) http://localhost:8080/$1 [P,L] - - - Order allow,deny - Allow from all - - - ProxyPass / http://localhost:8080/ Keepalive=On - ProxyPassReverse / http://localhost:8080/ Keepalive=On - - - Order allow,deny - Allow from all - - - ProxyPreserveHost on - ProxyBadHeader ignore - - - - # - # This is the wildcard virtual-host, which will be accessed by - # the internet and proxied. - # - # We don't need to care about websockets here. - # - - ServerName a.tunneller.steve.fi - ServerAlias *.tunneller.steve.fi - - - Order allow,deny - Allow from all - - - ProxyPass / http://localhost:8080/ Keepalive=On - ProxyPassReverse / http://localhost:8080/ Keepalive=On - - - Order allow,deny - Allow from all - - - ProxyPreserveHost on - ProxyBadHeader ignore - - -## Encrypted websocket tunnel - -If you want to use a TLS encrypted tunnel just change the configuration to listen on 443 and -mod_ssl as usual: - - - # - # This is the bare-tunnel domain. - # - # Accesses to this will be websockets, so we need to handle - # that by using `wss:/` - # - - ServerName tunneller.steve.fi - - SSLEngine on - SSLCertificateFile /path/to/your/certificate.pem - SSLCertificateKeyFile /path/to/your/key.pem - SSLCertificateChainFile /path/to/your/chain.pem - - RewriteEngine On - RewriteCond %{HTTP:Upgrade} =websocket [NC] - RewriteRule /(.*) ws://localhost:8080/$1 [P,L] - RewriteCond %{HTTP:Upgrade} !=websocket [NC] - RewriteRule /(.*) http://localhost:8080/$1 [P,L] - [...] - # Same VirtualHost configuration as example above - -Obviously you can encrypt also the public (*.tunneller.steve.fi) part with using HTTPS - -## Modules - -Note if you're not using the proxy-modules already you'll need: - - a2enmod proxy - a2enmod proxy_http - a2enmod proxy_wstunnel From 0cac8b8f82178dd50ab838bbedf0316847b1c99a Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 19:36:57 +0300 Subject: [PATCH 5/9] Updated to use MQ as the transport. This has been tested via the `mosquitto_pub` command; inject a full-request such as: GET / HTTP/1.0 Host: foo.example.com The result is that the response is sent back down the same channel, as expected. Porting the server should be pretty simple, based on this work. Since the HTTP-server and the MQ-server are on the same host I'm going to cheat and only talk to MQ on the localhost for the server. --- cmd_client.go | 283 +++++++++++++++++++++++++------------------------- 1 file changed, 144 insertions(+), 139 deletions(-) diff --git a/cmd_client.go b/cmd_client.go index 37af122..553a15c 100644 --- a/cmd_client.go +++ b/cmd_client.go @@ -1,25 +1,33 @@ // // Client for our self-hosted ngrok alternative. // +// The way that this operates is pretty simple: +// +// 1. Connect to the named Mosquitto Queue +// +// 2. Subscribe to /clients/$id +// +// 3. Wait for an URL to be posted to that topic, when it +// is we fetch it and return the result. +// +// 4. Magic. package main import ( "bytes" - "crypto/tls" "context" - b64 "encoding/base64" "flag" "fmt" "io" - "io/ioutil" "net" - "net/url" + "os" + "os/signal" "strings" - "sync" + "syscall" + MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/google/subcommands" - "github.com/gorilla/websocket" uuid "github.com/satori/go.uuid" ) @@ -28,11 +36,6 @@ import ( // type clientCmd struct { - // - // Mutex protects our state. - // - mutex *sync.Mutex - // // The name we'll access this resource via. // @@ -47,11 +50,6 @@ type clientCmd struct { // The service to expose. // expose string - - // - // Allow insecure TLS connection (for self signed certs, for example) - // - insecure bool } // Name returns the name of this sub-command. @@ -71,167 +69,174 @@ func (p *clientCmd) Usage() string { func (p *clientCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&p.expose, "expose", "", "The host/port to expose to the internet.") - f.StringVar(&p.tunnel, "tunnel", "tunneller.steve.fi", "The address of the publicly visible tunnel-host") + f.StringVar(&p.tunnel, "tunnel", "tunnel.steve.fi", "The address of the publicly visible tunnel-host") f.StringVar(&p.name, "name", "", "The name for this connection") - f.BoolVar(&p.insecure, "insecure", false, "Skip remote certificate validation (insecure!)") } -// Check if str has allowed prefix and if not return -// the string with default one -func checkUrlSchema(str string, defaultPrefix string, prefixes []string) string { - for _, prefix := range prefixes { - if strings.HasPrefix(str, prefix) { - return str - } +// onMessage is called when a message is received upon the MQ-topic we're +// watching. +// +// We have to peform the HTTP-fetch which is contained within the message, +// and submit the result back to that same topic. +func (p *clientCmd) onMessage(client MQTT.Client, msg MQTT.Message) { + + // + // Get the text of the request. + // + fetch := msg.Payload() + + // + // TODO: Validate.. + // + + // + // If this is one of our replies ignore it. + // + if strings.HasPrefix(string(fetch), "X-") { + return } - fmt.Printf("No known prefix found, using %s\n", defaultPrefix) - return defaultPrefix + str + + // + // At this point we've received a request. + // + fmt.Printf("Received incoming request:\n%s\n", fetch) + + // + // This is the result we'll publish back onto the topic. + // + result := `HTTP/1.0 200 OK +Content-type: text/html; charset=UTF-8 +Connection: close + + + + +

The remote server was unreachable.

+ +` + + // + // Make the connection to our proxied host. + // + d := net.Dialer{} + con, err := d.Dial("tcp", p.expose) + + // + // OK we have a default result saved, which shows an error-page. + // + // If we didn't actually get an error then save the real response. + // + if err == nil { + + // + // Make the request + // + con.Write(fetch) + + // + // Read the reply. + // + var reply bytes.Buffer + io.Copy(&reply, con) + + // + // Store. + // + result = string(reply.Bytes()) + } + + // + // Send the reply back to the MQ topic. + // + fmt.Printf("Returning response:\n%s\n", result) + token := client.Publish("clients/"+p.name, 0, false, "X-"+result) + token.Wait() } +// // Execute is the entry-point to this sub-command. +// +// 1. Connect to the tunnel-host. +// +// 2. Subscribe to MQ and await the reception of URLs to fetch. +// +// (When one is received it will be handled via onMessage.) +// func (p *clientCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { - p.mutex = &sync.Mutex{} - // // Ensure that we have setup variables // if p.expose == "" { - fmt.Printf("You must specify the host:port to expose.\n") + fmt.Printf("You must specify the local host:port to expose.\n") return 1 } if p.tunnel == "" { - fmt.Printf("You must specify the URL of the tunnel end-point.\n") + fmt.Printf("You must specify the tunnel end-point.\n") return 1 } + // + // This is optional, but useful. + // if p.name == "" { - // or error handling uid := uuid.NewV4() p.name = uid.String() } - // Not so clever hack to deal with malformed (schemaless) urls - // Schema must be set in the string to be parsed as url.Parse enforces correct url format - allowedPrefixes := []string{"ws://", "wss://"} - p.tunnel = checkUrlSchema(p.tunnel, "ws://", allowedPrefixes) - - // Parse url; - parsedUrl, err := url.Parse(p.tunnel) - if err != nil { - fmt.Printf("Cannot parse url %s: %s\n", p.tunnel, err) - } - // - // These are the details of the tunneller-server + // Create a channel so that we can be disconnected cleanly. // - u := url.URL{Scheme: parsedUrl.Scheme, Host: parsedUrl.Host, Path: "/" + p.name} - fmt.Printf("Connecting to %s\n", u.String()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) // - // connect to it + // Setup the server-address. // - tls_config := tls.Config{InsecureSkipVerify: p.insecure} - - ws_dial := websocket.Dialer{ - TLSClientConfig: &tls_config, - } - - c, resp, err := ws_dial.Dial(u.String(), nil) - if err != nil { - - if err == websocket.ErrBadHandshake { - fmt.Printf("\tHandshake failed with status %d\n", resp.StatusCode) - - defer resp.Body.Close() - var body []byte - body, err = ioutil.ReadAll(resp.Body) - if err == nil { - fmt.Printf("\t%s\n\n", body) - } - } - fmt.Printf("Connection failed: %s", err) - return 1 - } - defer c.Close() + opts := MQTT.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:1883", p.tunnel)) + + // + // Set our name. + // + opts.SetClientID(p.name) // // Connected now, show instructions // - fmt.Printf("Visit http://%s.%s to see the local content from %s\n", - p.name, parsedUrl.Host, p.expose) + fmt.Printf("tunneller client launched\n") + fmt.Printf("=========================\n") + fmt.Printf("Visit http://%s.%s/ to see the local content from %s\n", + p.name, p.tunnel, p.expose) - // Loop for messages - for { - p.mutex.Lock() - msgType, message, err := c.ReadMessage() - p.mutex.Unlock() + // + // Once we're connected we will subscribe to the named topic. + // + opts.OnConnect = func(c MQTT.Client) { - if err != nil { - fmt.Printf("Error reading the message from the socket: %s", err.Error()) + topic := "clients/" + p.name + + if token := c.Subscribe(topic, 0, p.onMessage); token.Wait() && token.Error() != nil { + fmt.Printf("Failed to subscribe to the MQ-topic:%s\n", token.Error()) return 1 } + } - if msgType == websocket.PingMessage { - fmt.Printf("Got pong-reply\n") - p.mutex.Lock() - c.WriteMessage(websocket.PongMessage, nil) - p.mutex.Unlock() + // + // Actually establish the MQ connection. + // + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Printf("Failed to connect to the MQ-host %s\n", token.Error()) + return 1 + } - } - if msgType == websocket.TextMessage { - - // - // At this point we've received a message. - // - // Show it - // - fmt.Printf("Received incoming request:\n%s\n", message) - - // - // Make the connection to our proxied host. - // - d := net.Dialer{} - con, err := d.Dial("tcp", p.expose) - if err != nil { - // - // Connection refused talking to the host - // - res := `HTTP 200 OK -Connection: close + // + // Wait until we're interrupted. + // + <-c -Remote server was unreachable -` - safe := b64.StdEncoding.EncodeToString([]byte(res)) - - p.mutex.Lock() - err = c.WriteMessage(websocket.TextMessage, []byte(safe)) - if err != nil { - fmt.Printf("Error writing our error message to the socket:%s\n", err.Error()) - } - p.mutex.Unlock() - continue - } - con.Write(message) - - // - // Read the reply - // - var reply bytes.Buffer - io.Copy(&reply, con) - - // - // Send it back - // - safe := b64.StdEncoding.EncodeToString(reply.Bytes()) - p.mutex.Lock() - err = c.WriteMessage(websocket.TextMessage, []byte(safe)) - if err != nil { - fmt.Printf("Error writing our response to the socket:%s\n", err.Error()) - } - - p.mutex.Unlock() - fmt.Printf("Sent reply ..\n") - } - } + // + // Not reached. + // + return 0 } From ef56929ecc68a8c56701afc3a19e3230f2113b1f Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 19:38:31 +0300 Subject: [PATCH 6/9] Updated our dependencies, now that we're using MQ. --- go.mod | 4 +++- go.sum | 11 +++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0db7232..28a9bde 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module github.com/skx/tunneller go 1.12 require ( + github.com/blevesearch/bleve v0.7.0 + github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/google/subcommands v1.0.1 - github.com/gorilla/websocket v1.4.0 github.com/kr/pretty v0.1.0 // indirect github.com/satori/go.uuid v1.2.0 + golang.org/x/net v0.0.0-20190424024845-afe8014c977f // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index d03c46b..08b5748 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ +github.com/blevesearch/bleve v0.7.0 h1:znyZ3zjsh2Scr60vszs7rbF29TU6i1q9bfnZf1vh0Ac= +github.com/blevesearch/bleve v0.7.0/go.mod h1:Y2lmIkzV6mcNfAnAdOd+ZxHkHchhBfU/xroGIp61wfw= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -9,5 +11,10 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190424024845-afe8014c977f h1:uALRiwYevCJtciRa4mKKFkrs5jY4F2OTf1D2sfi1swY= +golang.org/x/net v0.0.0-20190424024845-afe8014c977f/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From d242e11c7fb7a8b5e62d4f7ee97e39de44151788 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 21:42:21 +0300 Subject: [PATCH 7/9] Completed port to MQ. Tested with 20 concurrent clients accessing 20 local (python webservers). Zero deadlocks. Zero failures. --- cmd_server.go | 348 +++++++++++++++++++------------------------------- 1 file changed, 128 insertions(+), 220 deletions(-) diff --git a/cmd_server.go b/cmd_server.go index 64be153..448061c 100644 --- a/cmd_server.go +++ b/cmd_server.go @@ -1,32 +1,35 @@ +// +// We present ourselves as a HTTP-server. +// +// We assume that *.tunnel.example.com will point to us, +// such that we receive requests for all names. +// +// When a request comes in for the host "foo.tunnel.example.com" +// +// 1. we squirt the incoming request down the MQ topic clients/foo. +// +// 2. We then await a reply, for up to 10 seconds. +// +// If we receive it great. +// +// Otherwise we return an error. +// + package main import ( "context" - b64 "encoding/base64" "flag" "fmt" "net/http" "net/http/httputil" "strings" - "sync" "time" + MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/google/subcommands" - "github.com/gorilla/websocket" ) -// -// Each incoming websocket-connection will be allocated an instance of this -// because we want to ensure we read/write safely. -// -type connection struct { - // mutex for safety - mutex *sync.RWMutex - - // the socket to use to talk to the remote peer. - socket *websocket.Conn -} - // // serveCmd is the structure for this sub-command. // @@ -34,14 +37,11 @@ type serveCmd struct { // The host we bind upon bindHost string + // MQ conneciton + mq MQTT.Client + // the port we bind upon bindPort int - - // mutex for safety - assignedMutex *sync.RWMutex - - // keep track of name/connection pairs - assigned map[string]*connection } // Name returns the name of this sub-command. @@ -53,7 +53,7 @@ func (p *serveCmd) Synopsis() string { return "Launch the HTTP server." } // Usage returns details of this sub-command. func (p *serveCmd) Usage() string { return `serve [options]: - Launch the HTTP server for proxying via our clients + Launch the HTTP server for proxying via our MQ-connection to the clients. ` } @@ -63,58 +63,22 @@ func (p *serveCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&p.bindHost, "host", "127.0.0.1", "The IP to listen upon.") } -// -// We want to make sure that we check the origin of any websocket-connections -// and bump the size of the buffers. -// -var upgrader = websocket.Upgrader{ - ReadBufferSize: 2048, - WriteBufferSize: 2048, - CheckOrigin: func(r *http.Request) bool { return true }, -} - // // HTTPHandler is the core of our server. // -// This function is invoked for all accesses. However it is complicated -// because it will be invoked in two different roles: -// -// http://foo.tunneller.example.com/blah -// -// -> Route the request to the host connected with name "foo". +// This function is invoked for all accesses. // -// ws://tunneller.example.com/foo -// -// -> Associate the name 'foo' with the long-lived web-socket connection -// -// We can decide at run-time if we're invoked with a HTTP-connection or -// a WS:// connection via the `Connection` header. +// If a request is made for our public-key that is handled, otherwise we +// defer to sending requests to connected clients via MQ. // func (p *serveCmd) HTTPHandler(w http.ResponseWriter, r *http.Request) { - // - // See if we're upgrading to a websocket connection. - // - con := r.Header.Get("Connection") - if strings.Contains(con, "Upgrade") { - p.HTTPHandlerWS(w, r) - } else { - p.HTTPHandlerHTTP(w, r) - } -} - -// -// HTTPHandlerHTTP is invoked to forward an incoming HTTP-request -// to the remote host which is tunnelling it. -// -func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { - // // See which vhost the connection was sent to, we assume that // the variable part will be the start of the hostname, which will // be split by "." // - // i.e. "foo.tunneller.steve.fi" has a name of "foo". + // i.e. "foo.tunnel.steve.fi" has a name of "foo". // host := r.Host if strings.Contains(host, ".") { @@ -122,17 +86,6 @@ func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { host = hsts[0] } - // - // Find the client to which to route the request. - // - p.assignedMutex.Lock() - sock := p.assigned[host] - p.assignedMutex.Unlock() - if sock == nil { - fmt.Fprintf(w, "The request cannot be made to '%s' as the host is offline!", host) - return - } - // // Dump the request to plain-text // @@ -145,57 +98,104 @@ func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { } // - // Forward it on. + // Publish the request we've received to the topic that we + // believe the client will be listening upon. // - fmt.Printf("Locking mutex\n") - sock.mutex.Lock() - fmt.Printf("Locked mutex\n") - err = sock.socket.WriteMessage(websocket.TextMessage, []byte(requestDump)) - if err != nil { - fmt.Printf("Failed to send request down socket %s\n", err.Error()) - } - sock.mutex.Unlock() - fmt.Printf("\tRequest sent.\n") + token := p.mq.Publish("clients/"+host, 0, false, requestDump) + token.Wait() // - // Wait for the response from the client. + // The (complete) response from the client will be placed here. // response := "" - for len(response) == 0 { - fmt.Printf("Awaiting a reply ..\n") - - sock.mutex.Lock() - msgType, msg, error := sock.socket.ReadMessage() - sock.mutex.Unlock() - fmt.Printf("\tReceived something ..\n") + // + // Subscribe to the topic. + // + sub_token := p.mq.Subscribe("clients/"+host, 0, func(client MQTT.Client, msg MQTT.Message) { - if error != nil { - fmt.Printf("\tError reading from websocket:%s\n", error.Error()) - fmt.Fprintf(w, "Error reading from websocket %s", error.Error()) - return - } - if msgType == websocket.TextMessage { - fmt.Printf("\tReply received.\n") - - var raw []byte - raw, err = b64.StdEncoding.DecodeString(string(msg)) - if err != nil { - fmt.Printf("Error decoding BASE64 from WS:%s\n", err.Error()) - fmt.Fprintf(w, "Error decoding BASE64 from WS:%s\n", err.Error()) - return - } - - response = string(raw) + // + // This function will be executed when a message is received + // + // To avoid loops we're making sure that the client publishes + // its response with a specific-prefix, so that it doesn't + // treat it as a request to be made. + // + // That means that we can identify it here too. + // + tmp := string(msg.Payload()) + if strings.HasPrefix(tmp, "X-") { + response = tmp[2:] } + }) + sub_token.Wait() + if sub_token.Error() != nil { + fmt.Printf("Error subscribing to clients/%s - %s\n", host, sub_token.Error()) + fmt.Fprintf(w, "Error subscribing to clients/%s - %s\n", host, sub_token.Error()) + return + } + + // + // We now busy-wait until we have a reply. + // + // We wait for up to ten seconds before deciding the client + // is either a) offline, or b) failing. + // + count := 0 + for len(response) == 0 && count < 10 { + + // + // Sleep 1 second; max count 10, result: 10 seconds. + // + fmt.Printf("Awaiting a reply ..\n") + time.Sleep(1 * time.Second) + count++ + } + + // + // Unsubscribe from the topic, regardless of whether we received + // a response or note. + // + // Just to cut down on resource-usage. + // + unsub_token := p.mq.Unsubscribe("clients/" + host) + unsub_token.Wait() + if unsub_token.Error() != nil { + fmt.Printf("Failed to unsubscribe from clients/%s - %s\n", + host, unsub_token.Error()) } // - // This is a hack. + // If the length is empty then that means either: + // + // 1. We didn't get a reply because the remote host was slow. + // + // 2. Nothing is listening on the topic, so the client is dead. + // + if len(response) == 0 { + + // + // Failure-response. + // + // NOTE: This is a "complete" response. + // + response = `HTTP/1.0 200 OK +Content-type: text/html; charset=UTF-8 +Connection: close + + + + +

We didn't receive a reply from the remote host, despite waiting 10 seconds.

+ + +` + } + // // The response from the client will be: // - // HTTP 200 OK + // HTTP/1.0 200 OK // Header: blah // Date: blah // [newline] @@ -217,125 +217,28 @@ func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { fmt.Printf("Error running hijack:%s", err.Error()) return } - // Don't forget to close the connection: - fmt.Fprintf(bufrw, "%s", response) - bufrw.Flush() - conn.Close() - -} - -// -// HTTPHandlerWS is invoked to handle an incoming websocket request. -// -// If a request is made for http://tunneller.example.com/blah we -// assign the name "blah" to the connection. -// -func (p *serveCmd) HTTPHandlerWS(w http.ResponseWriter, r *http.Request) { - - // - // At this point we've got a known-client. - // - // Record their ID in our connection - // - // The ID will be client-sent, for now. - // - cid := r.URL.Path[1:] - - // - // Ensure the name isn't already in-use. - // - p.assignedMutex.Lock() - tmp := p.assigned[cid] - p.assignedMutex.Unlock() - - if tmp != nil { - w.WriteHeader(http.StatusForbidden) - fmt.Fprintf(w, "The name you've chosen is already in use.") - return - - } // - // Upgrade, and handle any upgrade-errors. - // - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Error upgrading the connection to a web-socket %s", err.Error()) - return - } - - // - // Store their name / connection in the map. - // - p.assignedMutex.Lock() - p.assigned[cid] = &connection{mutex: &sync.RWMutex{}, socket: conn} - p.assignedMutex.Unlock() - - // - // Now we're just going to busy-loop. - // - // Ensuring that we keep the client connection alive. + // Send the reply, and close the connection: // - go func() { - // - // We're connected. - // - connected := true - - // - // Get the structure, we just set. - // - p.assignedMutex.Lock() - connection := p.assigned[cid] - p.assignedMutex.Unlock() + fmt.Fprintf(bufrw, "%s", response) + bufrw.Flush() + conn.Close() - // - // Loop until we get a disconnection. - // - for connected { - - // - // Try to write .. - // - connection.mutex.Lock() - fmt.Printf("Keepalive..\n") - err := conn.WriteMessage(websocket.PingMessage, []byte("!")) - connection.mutex.Unlock() - - // - // If/when it failed .. - // - if err != nil { - - // - // Reap the client. - // - fmt.Printf("Client gone away - freeing the name '%s'\n", cid) - p.assignedMutex.Lock() - p.assigned[cid] = nil - p.assignedMutex.Unlock() - connected = false - continue - } - - // - // Otherwise wait for the future. - // - time.Sleep(5 * time.Second) - } - }() } // Execute is the entry-point to this sub-command. func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { // - // Setup a mapping between connections and handlers, and ensure - // that our mutex is ready. + // Connect to our MQ instance. // - p.assigned = make(map[string]*connection) - p.assignedMutex = &sync.RWMutex{} + opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + p.mq = MQTT.NewClient(opts) + if token := p.mq.Connect(); token.Wait() && token.Error() != nil { + fmt.Printf("Failed to connect to MQ-server: %s\n", token.Error()) + return 1 + } // // We present a HTTP-server, and we handle all incoming @@ -354,7 +257,12 @@ func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) // a non-default http-server // srv := &http.Server{ - Addr: bind, + Addr: bind, + + // + // NOTE: These are a little generous, considering our + // proxy to the client will timeout after 10 seconds.. + // ReadTimeout: 300 * time.Second, WriteTimeout: 300 * time.Second, } @@ -362,7 +270,7 @@ func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) // // Launch the server. // - err := srv.ListenAndServe() + err = srv.ListenAndServe() if err != nil { fmt.Printf("\nError: %s\n", err.Error()) } From 78752a8054b1b5ffcfae2ad2f4dde808103e951b Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 21:45:55 +0300 Subject: [PATCH 8/9] Fixed the build and the github CI tests --- cmd_client.go | 6 +----- cmd_server.go | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/cmd_client.go b/cmd_client.go index 553a15c..eaec6a0 100644 --- a/cmd_client.go +++ b/cmd_client.go @@ -85,10 +85,6 @@ func (p *clientCmd) onMessage(client MQTT.Client, msg MQTT.Message) { // fetch := msg.Payload() - // - // TODO: Validate.. - // - // // If this is one of our replies ignore it. // @@ -217,7 +213,7 @@ func (p *clientCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{} if token := c.Subscribe(topic, 0, p.onMessage); token.Wait() && token.Error() != nil { fmt.Printf("Failed to subscribe to the MQ-topic:%s\n", token.Error()) - return 1 + os.Exit(1) } } diff --git a/cmd_server.go b/cmd_server.go index 448061c..c9b10cd 100644 --- a/cmd_server.go +++ b/cmd_server.go @@ -112,7 +112,7 @@ func (p *serveCmd) HTTPHandler(w http.ResponseWriter, r *http.Request) { // // Subscribe to the topic. // - sub_token := p.mq.Subscribe("clients/"+host, 0, func(client MQTT.Client, msg MQTT.Message) { + subToken := p.mq.Subscribe("clients/"+host, 0, func(client MQTT.Client, msg MQTT.Message) { // // This function will be executed when a message is received @@ -128,10 +128,10 @@ func (p *serveCmd) HTTPHandler(w http.ResponseWriter, r *http.Request) { response = tmp[2:] } }) - sub_token.Wait() - if sub_token.Error() != nil { - fmt.Printf("Error subscribing to clients/%s - %s\n", host, sub_token.Error()) - fmt.Fprintf(w, "Error subscribing to clients/%s - %s\n", host, sub_token.Error()) + subToken.Wait() + if subToken.Error() != nil { + fmt.Printf("Error subscribing to clients/%s - %s\n", host, subToken.Error()) + fmt.Fprintf(w, "Error subscribing to clients/%s - %s\n", host, subToken.Error()) return } @@ -158,11 +158,11 @@ func (p *serveCmd) HTTPHandler(w http.ResponseWriter, r *http.Request) { // // Just to cut down on resource-usage. // - unsub_token := p.mq.Unsubscribe("clients/" + host) - unsub_token.Wait() - if unsub_token.Error() != nil { + unsubToken := p.mq.Unsubscribe("clients/" + host) + unsubToken.Wait() + if unsubToken.Error() != nil { fmt.Printf("Failed to unsubscribe from clients/%s - %s\n", - host, unsub_token.Error()) + host, unsubToken.Error()) } // @@ -270,7 +270,7 @@ func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) // // Launch the server. // - err = srv.ListenAndServe() + err := srv.ListenAndServe() if err != nil { fmt.Printf("\nError: %s\n", err.Error()) } From 323bfad9c260370f477f721c50780882ac853878 Mon Sep 17 00:00:00 2001 From: Steve Kemp Date: Wed, 24 Apr 2019 21:47:37 +0300 Subject: [PATCH 9/9] Removed; since we don't use it yet, and we don't necessarily need it if we have an embedded queue --- signer/main.go | 48 ------------------------------------------------ 1 file changed, 48 deletions(-) delete mode 100644 signer/main.go diff --git a/signer/main.go b/signer/main.go deleted file mode 100644 index 1fce44f..0000000 --- a/signer/main.go +++ /dev/null @@ -1,48 +0,0 @@ -// package signer contains a couple of utility methods -// for signing a string and validating the signature of that -// string again. - -package signer - -import ( - "fmt" - - s "github.com/blevesearch/bleve/docs" -) - -// Signer holds our state -type Signer struct { - // public holds the public-key as a string. - public string - - // private holds the private key as a string - private string -} - -// New constructs our object. -// It assumes that either the public-key or the private-key will be specified, -// as strings. -func New(pubkey string, privkey string) *Signer { - obj := &Singer{public: pubkey, private: privkey} - return obj -} - -// Sign is called to generate a signature of the given string. -func (s *Signer) Sign(input string) (string, error) { - - if s.private == "" { - return "", fmt.Errorf("Private key not present - cannot sign") - } - - return "", fmt.Errorf("Unimplemented") -} - -// Validate is called to validate the signature of the given string. -func Validate(input string, signature string) (bool, error) { - - if s.public == "" { - return "", fmt.Errorf("Public key not present - cannot validate") - } - - return false, fmt.Errorf("Unimplemented") -}