diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..26ec9da --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,51 @@ +name: Release + +on: + push: + tags: + - 'v*' + + +jobs: + + goreleaser: + runs-on: ubuntu-latest + permissions: + contents: write + steps: + + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - run: git fetch --force --tags + + - uses: actions/setup-go@v3 + with: + go-version: stable + cache: true + + - uses: goreleaser/goreleaser-action@v4 + with: + distribution: goreleaser + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + + deploy: + runs-on: ubuntu-latest + permissions: + id-token: write # allow GitHub Action to authenticate with Deno Deploy + contents: read + steps: + + - uses: actions/checkout@v3 + + - uses: denoland/deployctl@v1 + with: + project: acp + root: edge + entrypoint: index.ts + diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..e89ec34 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,80 @@ +name: Test + +on: + push: + branches: + - main + pull_request: + branches: + - main + + +jobs: + + checks: + name: Checks + runs-on: ubuntu-latest + timeout-minutes: 5 + steps: + + - uses: actions/checkout@v3 + + - uses: actions/setup-go@v3 + with: + go-version: stable + cache: true + + - uses: golangci/golangci-lint-action@v3 + with: + version: latest + + - name: Go mod + env: + DIFF_PATH: "go.mod go.sum" + run: | + go mod tidy + DIFF=$(git status --porcelain -- $DIFF_PATH) + if [ "$DIFF" ]; then + echo 'Please run `go mod tidy` before commits involving dependency change' + exit 1 + fi + + + build: + name: Build + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + runs-on: ${{ matrix.os }} + timeout-minutes: 5 + steps: + + - uses: actions/checkout@v3 + + - uses: actions/setup-go@v3 + with: + go-version: stable + cache: true + + - run: go build ./cmd/acp + + + test: + name: Test + needs: build + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + runs-on: ${{ matrix.os }} + timeout-minutes: 5 + steps: + + - uses: actions/checkout@v3 + + - uses: actions/setup-go@v3 + with: + go-version: stable + cache: true + + - run: go test ./... + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cde0123 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + +dist/ diff --git a/.goreleaser.yaml b/.goreleaser.yaml new file mode 100644 index 0000000..db3dcf9 --- /dev/null +++ b/.goreleaser.yaml @@ -0,0 +1,45 @@ +before: + hooks: + - go mod tidy + +builds: + - main: ./cmd/acp + binary: acp + env: + - CGO_ENABLED=0 + goos: + - linux + - windows + - darwin + goarch: + - amd64 + - arm64 + flags: + - -trimpath + ldflags: + - -s -w -X main.buildTag={{.Version}} + +archives: + - format: tar.gz + name_template: >- + {{ .ProjectName }}_ + {{- title .Os }}_ + {{- if eq .Arch "amd64" }}x86_64 + {{- else if eq .Arch "386" }}i386 + {{- else }}{{ .Arch }}{{ end }} + {{- if .Arm }}v{{ .Arm }}{{ end }} + format_overrides: + - goos: windows + format: zip +checksum: + name_template: 'checksums.txt' +snapshot: + name_template: "{{ incpatch .Version }}-next" +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' + - '^chore:' + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c8836fa --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Contextualist + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..45cf509 --- /dev/null +++ b/README.md @@ -0,0 +1,87 @@ +# acp + +![demo.gif](media/demo.gif) + +Highlights (aka "Why making another file-transfer tool?"): + +- Designed for personal use; no need to copy-paste a token / code for each transfer +- Rendezvous service runs distributively on [serverless edge function](https://deno.com/deploy/docs), + a robust solution with low latency worldwide. + +Other features: + +- End-to-end encryption (ChaCha20-Poly1305) +- P2P connection: LAN or WAN, with NAT transversal +- Compression (gzip) +- Cross platform: Linux, macOS, Windows +- Support transfering multiple files and directories + +See also [comparison table with similar tools](#similar-projects). + + +## Get started + +### Linux, macOS + +On any of your machine, run + +```bash +curl -fsS https://acp.deno.dev/get | sh +``` + +It sets up the current machine by downloading an executable and generating an identity. +By default the install path is `/usr/local/bin`; you can change it by `curl -fsS 'https://acp.deno.dev/get?dir=/path/to/bin' | sh` +At the end, it prints out the command for setting up your other machines. +You can run `acp --setup` any time you want to see the command. + +### Windows + +Currently there is no installation script for PowerShell (PR welcomes :) +You can download the released executable and put it on your `Path`. +Then run `acp --setup` to generate an identity. + + +## Usage + +```bash +# sender +acp path/to/files + +# receiver +acp # for receiving to pwd or +acp -d path/to/dest +``` + +You can run the sender and receiver in arbitrary order. +Whenever both sides are up and running, they will attempt to establish a P2P connection. +If you see messages such as `rendezvous timeout`, at least one side is behind a firewall or a strict NAT that prohibits P2P connection. + +For advanced configuration and self-hosting (it's free & takes only 5 minutes!), check out [the docs here](docs/advanced.md). + + +## Similar projects + +| | [trzsz](https://github.com/trzsz/trzsz) | scp | **acp** | [pcp](https://github.com/dennis-tra/pcp) | [croc](https://github.com/schollz/croc) | +| ------------------------------------------------------------ | --------------------------------------- | ---- | ------- | ---------------------------------------- | --------------------------------------- | +| can share files to other people /
receiver needs to enter a token | | | | O | O | +| LAN | O | O | O | O | O | +| WAN (local ↔︎ remote) | O | O | O | P | O | +| WAN (remote ↔︎ remote) | | P | O | P | O | +| relay | | | | P | O | +| p2p | | | O | O | O | +| distributive | | | O | O | | + +O: supported; P: partial support or limited usablity; (void): not supported or not relevant + +Don't judge a tool based on its apparent set of features. +This table only lists a few features, intending to differentiate the target scenarios of these tools. + + +## Acknowledgement + +Apart from the dependencies listed in [`go.mod`](go.mod), this project is also built upon + +- [**Deno Deploy**](https://deno.com/deploy) exposes low-level connection infomation and provides a fantastic `BroadcastChannel` API that makes "serverless" TCP hole-punching possible +- [**mholt/archiver**](https://github.com/mholt/archiver): tar/untar implementation +- [**libp2p/go-reuseport**](https://github.com/libp2p/go-reuseport): address reuse for TCP hole-punching +- [**egoist/bina**](https://github.com/egoist/bina): installation script diff --git a/cmd/acp/config.go b/cmd/acp/config.go new file mode 100644 index 0000000..f3de980 --- /dev/null +++ b/cmd/acp/config.go @@ -0,0 +1,134 @@ +package main + +import ( + "crypto/rand" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "runtime" +) + +const ( + idLen = 6 // 6 bytes, 8 base64 chars + pskLen = 32 // for ChaCha20-Poly1305 +) + +// Config defines the user-specific information for the transfer. +// In general, it needs to be consistent across all devices of a user. +type Config struct { + ID string `json:"id"` + PSK string `json:"psk"` + Server string `json:"server,omitempty"` + UseIPv6 bool `json:"ipv6,omitempty"` +} + +func (conf *Config) applyDefault() { + if conf.Server == "" { + conf.Server = "https://acp.deno.dev" + } +} + +var configFilename = filepath.Join(userConfigDir(), "acp", "config.json") + +func setup(confStr string) error { + if confStr != "" { + var conf Config + if err := json.Unmarshal([]byte(confStr), &conf); err != nil { + return err + } + if err := setConfig(&conf); err != nil { + return err + } + } else { + conf, err := getConfig() + if errors.Is(err, os.ErrNotExist) { + conf = &Config{ + ID: base64.StdEncoding.EncodeToString(randBytes(idLen)), + PSK: base64.StdEncoding.EncodeToString(randBytes(pskLen)), + } + if err := setConfig(conf); err != nil { + return err + } + } else if err != nil { + return err + } + confBytes, _ := json.Marshal(&conf) + confStr = string(confBytes) + } + fmt.Printf(`acp is set up on this machine. To set up another machine, run the following command there +(DO NOT share the command publicly as it contains encryption keys) + + curl -fsS https://acp.deno.dev/get | sh -s -- --setup-with '%s' + +(For Windows PowerShell, you need to download the executable to the Path manually) +If you already have the executable, run + + acp --setup-with '%s' + +`, confStr, confStr) + return nil +} + +func mustGetConfig() *Config { + conf, err := getConfig() + if err != nil { + if errors.Is(err, os.ErrNotExist) { + fmt.Fprintln(os.Stderr, "Config not found. If this is your first time using acp, run `acp --setup` to generate a config") + } else { + fmt.Fprintln(os.Stderr, err) + } + os.Exit(1) + } + return conf +} + +func getConfig() (*Config, error) { + configFile, err := os.Open(configFilename) + if err != nil { + return nil, fmt.Errorf("error opening config: %w", err) + } + var conf Config + err = json.NewDecoder(configFile).Decode(&conf) + if err != nil { + return nil, fmt.Errorf("error parsing config: %w", err) + } + return &conf, nil +} + +func setConfig(conf *Config) error { + _ = os.Mkdir(filepath.Dir(configFilename), 0o700) + configFile, err := os.Create(configFilename) + if err != nil { + return fmt.Errorf("error writing config to %s: %v", configFilename, err) + } + err = json.NewEncoder(configFile).Encode(conf) + if err != nil { + return fmt.Errorf("error writing config to %s: %v", configFilename, err) + } + return nil +} + +func userConfigDir() string { + switch runtime.GOOS { + case "linux", "darwin": + return filepath.Join(os.Getenv("HOME"), ".config") + case "windows": + return os.Getenv("APPDATA") + } + fmt.Fprintf(os.Stderr, "OS %s is not supported\n", runtime.GOOS) + os.Exit(1) + return "" +} + +func randBytes(n int) []byte { + b := make([]byte, n) + _, err := rand.Read(b) + if err != nil { + fmt.Fprintf(os.Stderr, "Error generating random bytes: %v\n", err) + os.Exit(1) + } + return b +} diff --git a/cmd/acp/config_test.go b/cmd/acp/config_test.go new file mode 100644 index 0000000..34e160a --- /dev/null +++ b/cmd/acp/config_test.go @@ -0,0 +1,57 @@ +package main + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "testing" +) + +func TestSetup(t *testing.T) { + _ = os.Remove(configFilename) + if _, err := getConfig(); !errors.Is(err, os.ErrNotExist) { + t.Fatalf("Config exists before creation; err: %v", err) + } + if err := setup(""); err != nil { + t.Fatalf("Config initialization failed: %v", err) + } + conf, err := getConfig() + if err != nil { + t.Fatalf("Failed to get initialized config: %v", err) + } + if conf.ID == "" || conf.PSK == "" { + t.Fatalf("Initialized config does not have all necessary fields: %+v", conf) + } +} + +func TestSetupWith(t *testing.T) { + conf0 := Config{ + ID: "AAAAAAAA", + PSK: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", + Server: "http://localhost:8000", + UseIPv6: true, + } + conf0Bytes, _ := json.Marshal(&conf0) + if err := setup(string(conf0Bytes)); err != nil { + t.Fatalf("Config override failed: %v", err) + } + conf, err := getConfig() + if err != nil { + t.Fatalf("Failed to get config: %v", err) + } + if *conf != conf0 { + t.Fatalf("Config does not match the intented setup value: expect: %+v, got: %+v", conf0, conf) + } + + if err := setup(`{"ipv6":"true"}`); err == nil { + t.Fatalf("Setup failed to catch an invalid input") + } +} + +func TestMain(m *testing.M) { + configFilename = filepath.Join(os.TempDir(), "acp-test-config.json") + rc := m.Run() + os.Remove(configFilename) + os.Exit(rc) +} diff --git a/cmd/acp/main.go b/cmd/acp/main.go new file mode 100644 index 0000000..3a07add --- /dev/null +++ b/cmd/acp/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "encoding/base64" + "errors" + "flag" + "fmt" + "io" + "os" + + tea "github.com/charmbracelet/bubbletea" + "github.com/contextualist/acp/pkg/pnet" + "github.com/contextualist/acp/pkg/tui" +) + +var buildTag string // build-time injected + +var ( + destination = flag.String("d", ".", "Save files to target directory / rename received file") + debug = flag.Bool("debug", false, "Enable debug logging") + doSetup = flag.Bool("setup", false, "Initialize config or display current config") + doSetupWith = flag.String("setup-with", "", "Initialize config with the specified value") + showVersion = flag.Bool("version", false, "Print version and exit") +) + +var logger tui.LoggerControl + +var exitStatement string + +func main() { + flag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), "%s (%s)\nUsage:\n", os.Args[0], buildTag) + flag.PrintDefaults() + } + flag.Parse() + if *showVersion { + fmt.Println(buildTag) + return + } + + defer func() { + if exitStatement != "" { + fmt.Fprintln(os.Stderr, exitStatement) + os.Exit(1) + } + }() + + if *doSetup || *doSetupWith != "" { + checkErr(setup(*doSetupWith)) + return + } + + filenames := flag.Args() + conf := mustGetConfig() + conf.applyDefault() + + ctx, userCancel := context.WithCancel(context.Background()) + logger = tui.NewLoggerControl(*debug) + loggerModel := tui.NewLoggerModel(logger) + go transfer(ctx, conf, filenames, loggerModel) + tui.RunProgram(loggerModel, userCancel) +} + +func transfer(ctx context.Context, conf *Config, filenames []string, loggerModel tea.Model) { + defer logger.End() + + conn, err := pnet.HolePunching(ctx, conf.Server+"/exchange", conf.ID, conf.UseIPv6, logger) + if errors.Is(err, context.Canceled) || !checkErr(err) { + return + } + + psk, err := base64.StdEncoding.DecodeString(conf.PSK) + if !checkErr(err) { + return + } + conn, err = encrypted(conn, psk) + if !checkErr(err) { + return + } + + stream, _ := conn.(io.ReadWriteCloser) + var status *tui.StatusControl + if !*debug { + status = tui.NewStatusControl() + stream = status.Monitor(stream) + logger.Next(tui.NewStatusModel(status)) + } + + if len(filenames) > 0 { + logger.Debugf("sending...") + err = sendFiles(filenames, stream) + } else { + logger.Debugf("receiving...") + err = receiveFiles(stream) + } + + if !*debug { + status.Next(loggerModel) + } + checkErr(err) +} + +func checkErr(err error) bool { + if err == nil { + return true + } + exitStatement = err.Error() + return false +} diff --git a/cmd/acp/stream.go b/cmd/acp/stream.go new file mode 100644 index 0000000..d3ee30c --- /dev/null +++ b/cmd/acp/stream.go @@ -0,0 +1,114 @@ +package main + +import ( + "archive/tar" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strings" + + "github.com/klauspost/pgzip" + aead "github.com/shadowsocks/go-shadowsocks2/shadowaead" +) + +func encrypted(conn net.Conn, psk []byte) (net.Conn, error) { + cipher, err := aead.Chacha20Poly1305(psk) + conn = aead.NewConn(conn, cipher) + return conn, err +} + +func sendFiles(filenames []string, to io.WriteCloser) (err error) { + defer to.Close() + z := pgzip.NewWriter(to) + defer z.Close() + tz := tar.NewWriter(z) + defer tz.Close() + + for _, fname := range filenames { + fname, err = filepath.Abs(fname) + if err != nil { + return err + } + err = tarWalk(fname, tz) + if err != nil { + return fmt.Errorf("tar: %w", err) + } + } + return +} + +func receiveFiles(from io.ReadCloser) (err error) { + dest, destFile, err := parseDest(*destination) + if err != nil { + return + } + + defer from.Close() + z, err := pgzip.NewReader(from) + if err != nil { + return + } + defer z.Close() + tz := tar.NewReader(z) + + var theFile string + var hdr *tar.Header + for { + hdr, err = tz.Next() + if err == io.EOF { + err = nil + break + } + if err != nil { + return + } + + if !strings.ContainsRune(filepath.Clean(hdr.Name), os.PathSeparator) { + if theFile == "" { // capture the name if there's only one toplevel file/dir + theFile = hdr.Name + } else { + theFile = "N/A" + } + } + err = untarFile(hdr, tz, dest) + if err != nil { + return fmt.Errorf("untar: %w", err) + } + } + + if destFile != "" { + if theFile == "" { + return os.Remove(dest) + } + if theFile != "N/A" { // one file or dir + err = os.Rename(filepath.Join(dest, theFile), destFile) + if err != nil { + return + } + return os.Remove(dest) + } + err = os.Rename(dest, destFile) + if err != nil { + destFile = dest // fail to rename, we are OK with the tmpdir + } + logger.Infof("received more than one file or dir, saved to dir %#v", destFile) + } + return +} + +func parseDest(d string) (dest, destFile string, err error) { + if d == "" { + return d, "", nil + } + if info, err := os.Stat(d); err == nil && info.Mode().IsDir() { // an existed dest dir + return d, "", nil + } + p := filepath.Dir(d) + if info, err := os.Stat(p); err == nil && info.Mode().IsDir() { // first receive to a tempdir, then mv to destFile + p, err = os.MkdirTemp(p, "acp-tmp.") + return p, d, err + } + return "", "", fmt.Errorf("no such file or directory: %s", d) +} diff --git a/cmd/acp/tar.go b/cmd/acp/tar.go new file mode 100644 index 0000000..3183489 --- /dev/null +++ b/cmd/acp/tar.go @@ -0,0 +1,174 @@ +package main + +// Adapted from the unexported functions in pre-v4 of mholt/archiver +// Will consider moving away from vendoring once v4 is stablized + +import ( + "archive/tar" + "fmt" + "io" + "os" + "path/filepath" + "runtime" +) + +func tarWalk(source string, t *tar.Writer) error { + sourceInfo, err := os.Stat(source) + if err != nil { + return fmt.Errorf("%s: stat: %w", source, err) + } + sourceIsDir := sourceInfo.IsDir() + return filepath.Walk(source, func(fpath string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("traversing %s: %w", fpath, err) + } + if info == nil { + return fmt.Errorf("no file info for %s", fpath) + } + // build the name to be used within the archive + relName, err := nameInArchive(sourceIsDir, source, fpath) + if err != nil { + return err + } + var file io.ReadCloser + if info.Mode().IsRegular() { + file, err = os.Open(fpath) + if err != nil { + return fmt.Errorf("%s: opening: %w", fpath, err) + } + defer file.Close() + } + err = addFile(t, info, relName, file) + if err != nil { + return fmt.Errorf("%s: writing: %v", fpath, err) + } + return nil + }) +} + +func nameInArchive(srcIsDir bool, src, fpath string) (string, error) { + name := filepath.Base(fpath) // start with the file or dir name + if srcIsDir { + // preserve internal directory structure; that's the path components + // between the source directory's leaf and this file's leaf + dir, err := filepath.Rel(filepath.Dir(src), filepath.Dir(fpath)) + if err != nil { + return "", err + } + // prepend the internal directory structure to the leaf name, + // and convert path separators to forward slashes as per spec + name = filepath.Join(filepath.ToSlash(dir), name) + } + return name, nil +} + +type namedFileInfo struct { + os.FileInfo + name string +} + +func (fi namedFileInfo) Name() string { + return fi.name +} + +func addFile(w *tar.Writer, info os.FileInfo, name string, file io.Reader) error { + var linkTarget string + if info.Mode()&os.ModeSymlink != 0 { + var err error + linkTarget, err = os.Readlink(name) + if err != nil { + return fmt.Errorf("%s: readlink: %v", name, err) + } + } + + hdr, err := tar.FileInfoHeader(namedFileInfo{info, name}, filepath.ToSlash(linkTarget)) + if err != nil { + return fmt.Errorf("%s: making header: %v", name, err) + } + err = w.WriteHeader(hdr) + if err != nil { + return fmt.Errorf("%s: writing header: %v", hdr.Name, err) + } + + if info.IsDir() { + return nil // directories have no contents + } + if hdr.Typeflag == tar.TypeReg { + _, err := io.Copy(w, file) + if err != nil { + return fmt.Errorf("%s: copying contents: %v", name, err) + } + } + return nil +} + +func untarFile(hdr *tar.Header, f io.Reader, dest string) error { + to := filepath.Join(dest, hdr.Name) + switch hdr.Typeflag { + case tar.TypeDir: + return mkdir(to) + case tar.TypeReg, tar.TypeRegA, tar.TypeChar, tar.TypeBlock, tar.TypeFifo: + return writeNewFile(to, f, hdr.FileInfo().Mode()) + case tar.TypeSymlink: + return writeNewSymbolicLink(to, hdr.Linkname) + case tar.TypeLink: + return writeNewHardLink(to, filepath.Join(dest, hdr.Linkname)) + case tar.TypeXGlobalHeader: + return nil // ignore the pax global header from git-generated tarballs + default: + return fmt.Errorf("%s: unknown type flag: %c", hdr.Name, hdr.Typeflag) + } +} + +func mkdir(dirPath string) error { + err := os.MkdirAll(dirPath, 0755) + if err != nil { + return fmt.Errorf("%s: making directory: %w", dirPath, err) + } + return nil +} + +func writeNewFile(fpath string, in io.Reader, fm os.FileMode) error { + err := os.MkdirAll(filepath.Dir(fpath), 0755) + if err != nil { + return fmt.Errorf("%s: making directory for file: %w", fpath, err) + } + out, err := os.Create(fpath) + if err != nil { + return fmt.Errorf("%s: creating new file: %w", fpath, err) + } + defer out.Close() + err = out.Chmod(fm) + if err != nil && runtime.GOOS != "windows" { + return fmt.Errorf("%s: changing file mode: %w", fpath, err) + } + _, err = io.Copy(out, in) + if err != nil { + return fmt.Errorf("%s: writing file: %w", fpath, err) + } + return nil +} + +func writeNewSymbolicLink(fpath string, target string) error { + err := os.MkdirAll(filepath.Dir(fpath), 0755) + if err != nil { + return fmt.Errorf("%s: making directory for file: %w", fpath, err) + } + err = os.Symlink(target, fpath) + if err != nil { + return fmt.Errorf("%s: making symbolic link for: %w", fpath, err) + } + return nil +} + +func writeNewHardLink(fpath string, target string) error { + err := os.MkdirAll(filepath.Dir(fpath), 0755) + if err != nil { + return fmt.Errorf("%s: making directory for file: %w", fpath, err) + } + err = os.Link(target, fpath) + if err != nil { + return fmt.Errorf("%s: making hard link for: %w", fpath, err) + } + return nil +} diff --git a/docs/advanced.md b/docs/advanced.md new file mode 100644 index 0000000..9d5aa42 --- /dev/null +++ b/docs/advanced.md @@ -0,0 +1,21 @@ + +## Advanced options + +`acp` stores config at `$HOME/.config/acp/config.json` (`%APPDATA%\acp\config.json` for Windows). +In general, you need to make sure that all devices use the same config. +After changing config on one device, run `acp --setup` to get the command for updating configs on other devices. + +List of configurable options: + +- `server` (default: `"https://acp.deno.dev"`): Endpoint for coordinating rendezvous +- `ipv6` (default: `false`): Establish P2P connection using IPv6 instead of IPv4. + Note that both ends of a connection need to use the same IP protocol. + +## Host the rendezvous service yourself + +Since the service is simply one TypeScript file, the easiest way to deploy is to use Deno Deploy playground. + +1. [Create a new playground project](https://dash.deno.com/new) (you need a Deno Deploy account; the free plan is more than enough), +2. Copy-paste [this file](../edge/index.ts). +3. Pick a subdomain name or bind it to your own domain. +4. Set `server` field of the acp config files to the domain. (See the section above) diff --git a/edge/index.ts b/edge/index.ts new file mode 100644 index 0000000..e78f726 --- /dev/null +++ b/edge/index.ts @@ -0,0 +1,122 @@ +import { serve } from "https://deno.land/std@0.171.0/http/server.ts"; + + +// (priAddr0|chanName) -> pubAddr1|priAddr1 +async function handleExchange(req: Request, connInfo: ConnInfo): Promise { + if (req.method != "POST") + return new Response("Invalid method", { status: 405 }) + if (req.headers.get("Content-Type") != "application/octet-stream") + return new Response("Invalid content type", { status: 415 }) + + const pubAddr = `${connInfo.remoteAddr.hostname}:${connInfo.remoteAddr.port}` + const conn = req.body!.getReader({ mode: "byob" }) + const [priAddr, chanName] = new TextDecoder().decode( + await receivePacket(conn) + ).split('|') + const x0 = `${pubAddr}|${priAddr}` + //console.log(`accepted from ${x0}`) + + const x1 = await exchange(chanName, x0, conn) + if (x1 == "") + return new Response("") + //console.log(`exchanged, got ${x1}`) + + const msg = marshallPacket(new TextEncoder().encode(x1)) + return new Response(msg) +} + + +const inbox = new Map void}>() + +async function exchange(name: string, x0: string, conn: ReadableStreamBYOBReader): Promise { + if (inbox.has(name)) { // the other party has set up an in-memory exchange + const { xa: x1, xb_resolve: x0_resolve } = inbox.get(name)! + x0_resolve(x0) + inbox.delete(name) + return x1 + } + + const [x1, source] = await Promise.any([ + // attempt to set up an in-memory exchange + new Promise((resolve) => { + inbox.set(name, { xa: x0, xb_resolve: resolve }) + }).then((x) => [x, "in-memory"]), + // attempt to do cross-regional exchange + exchangeViaBroadcastChannel(name, x0).then((x) => [x, "broadcast"]), + // if client closes early + clientClosed(conn).then(() => ["", "cancel"]), + ]) + if (source != "in-memory") { // cancel in-memory exchange + inbox.delete(name) + } + if (source != "broadcast") { + channels.get(name)?.close() + channels.delete(name) + } + return x1 +} + + +const channels = new Map() + +async function exchangeViaBroadcastChannel(name: string, x0: string): Promise { + const channel = new BroadcastChannel(name) + channels.set(name, channel) + channel.postMessage(x0) // if the other party has already subscribed + const x1 = await (new Promise((resolve) => { + channel.onmessage = (event: MessageEvent) => resolve(event.data) + })) + channel.postMessage(x0) // if the other party subscribes after the first post + return x1 +} + + +async function clientClosed(conn: ReadableStreamBYOBReader): Promise { + await conn.read(new Uint8Array(1)) + //console.log("client early close") +} + + +async function receivePacket(conn: ReadableStreamBYOBReader): Promise { + let buf = (await conn.read(new Uint8Array(2))).value + const lenCap = 1e3 + const plen = (buf[0]<<8) | buf[1] // uint16, BE + if (plen == 0 || plen > lenCap) { + console.error(`received suspicious packet header declearing len=${plen}`) + throw new Deno.errors.InvalidData + } + buf = (await conn.read(new Uint8Array(plen))).value + return buf +} + + +function marshallPacket(data: Uint8Array): Uint8Array { + const l = data.length + let p = new Uint8Array(2 + l) + p.set(new Uint8Array([(l>>8)&0xff, l&0xff])) // uint16, BE + p.set(data, 2) + return p +} + + +async function handler(req: Request, connInfo: ConnInfo): Promise { + const url = new URL(req.url) + switch (url.pathname) { + case "/get": + return new Response( + ` + curl -fsSL "https://bina.egoist.dev/contextualist/acp${url.search}" | sh + if [ $# -eq 0 ]; then acp --setup; else acp "$@"; fi + `, + { headers: { "Content-Type": "text/plain; charset=utf-8" } } + ) + case "/exchange": + return await handleExchange(req, connInfo) + default: + return new Response("Not found", { status: 404 }) + } +} + + +serve(handler) + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..686758c --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module github.com/contextualist/acp + +go 1.19 + +require ( + github.com/charmbracelet/bubbles v0.14.0 + github.com/charmbracelet/bubbletea v0.23.1 + github.com/dustin/go-humanize v1.0.0 + github.com/klauspost/pgzip v1.2.5 + github.com/shadowsocks/go-shadowsocks2 v0.1.5 + golang.org/x/sys v0.4.0 +) + +require ( + github.com/aymanbagabas/go-osc52 v1.2.1 // indirect + github.com/charmbracelet/lipgloss v0.6.0 // indirect + github.com/containerd/console v1.0.3 // indirect + github.com/klauspost/compress v1.15.14 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/muesli/ansi v0.0.0-20221106050444-61f0cd9a192a // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/reflow v0.3.0 // indirect + github.com/muesli/termenv v0.13.0 // indirect + github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 // indirect + github.com/rivo/uniseg v0.4.3 // indirect + golang.org/x/crypto v0.5.0 // indirect + golang.org/x/term v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..494993c --- /dev/null +++ b/go.sum @@ -0,0 +1,81 @@ +github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI= +github.com/aymanbagabas/go-osc52 v1.0.3/go.mod h1:zT8H+Rk4VSabYN90pWyugflM3ZhpTZNC7cASDfUCdT4= +github.com/aymanbagabas/go-osc52 v1.2.1 h1:q2sWUyDcozPLcLabEMd+a+7Ea2DitxZVN9hTxab9L4E= +github.com/aymanbagabas/go-osc52 v1.2.1/go.mod h1:zT8H+Rk4VSabYN90pWyugflM3ZhpTZNC7cASDfUCdT4= +github.com/charmbracelet/bubbles v0.14.0 h1:DJfCwnARfWjZLvMglhSQzo76UZ2gucuHPy9jLWX45Og= +github.com/charmbracelet/bubbles v0.14.0/go.mod h1:bbeTiXwPww4M031aGi8UK2HT9RDWoiNibae+1yCMtcc= +github.com/charmbracelet/bubbletea v0.21.0/go.mod h1:GgmJMec61d08zXsOhqRC/AiOx4K4pmz+VIcRIm1FKr4= +github.com/charmbracelet/bubbletea v0.23.1 h1:CYdteX1wCiCzKNUlwm25ZHBIc1GXlYFyUIte8WPvhck= +github.com/charmbracelet/bubbletea v0.23.1/go.mod h1:JAfGK/3/pPKHTnAS8JIE2u9f61BjWTQY57RbT25aMXU= +github.com/charmbracelet/harmonica v0.2.0/go.mod h1:KSri/1RMQOZLbw7AHqgcBycp8pgJnQMYYT8QZRqZ1Ao= +github.com/charmbracelet/lipgloss v0.5.0/go.mod h1:EZLha/HbzEt7cYqdFPovlqy5FZPj0xFhg5SaqxScmgs= +github.com/charmbracelet/lipgloss v0.6.0 h1:1StyZB9vBSOyuZxQUcUwGr17JmojPNm87inij9N3wJY= +github.com/charmbracelet/lipgloss v0.6.0/go.mod h1:tHh2wr34xcHjC2HCXIlGSG1jaDF0S0atAUvBMP6Ppuk= +github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= +github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= +github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE= +github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= +github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/muesli/ansi v0.0.0-20211018074035-2e021307bc4b/go.mod h1:fQuZ0gauxyBcmsdE3ZT4NasjaRdxmbCS0jRHsrWu3Ho= +github.com/muesli/ansi v0.0.0-20221106050444-61f0cd9a192a h1:jlDOeO5TU0pYlbc/y6PFguab5IjANI0Knrpg3u/ton4= +github.com/muesli/ansi v0.0.0-20221106050444-61f0cd9a192a/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.0/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/reflow v0.2.1-0.20210115123740-9e1d0d53df68/go.mod h1:Xk+z4oIWdQqJzsxyjgl3P22oYZnHdZ8FFTHAQQt5BMQ= +github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s= +github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKtnHY/8= +github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0/go.mod h1:Bd5NYQ7pd+SrtBSrSNoBBmXlcY8+Xj4BMJgh8qcZrvs= +github.com/muesli/termenv v0.11.1-0.20220212125758-44cd13922739/go.mod h1:Bd5NYQ7pd+SrtBSrSNoBBmXlcY8+Xj4BMJgh8qcZrvs= +github.com/muesli/termenv v0.13.0 h1:wK20DRpJdDX8b7Ek2QfhvqhRQFZ237RGRO0RQ/Iqdy0= +github.com/muesli/termenv v0.13.0/go.mod h1:sP1+uffeLaEYpyOTb8pLCUctGcGLnoFjSn4YJK5e2bc= +github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3 h1:f/FNXud6gA3MNr8meMVVGxhp+QBTqY91tM8HjEuMjGg= +github.com/riobard/go-bloom v0.0.0-20200614022211-cdc8013cb5b3/go.mod h1:HgjTstvQsPGkxUsCd2KWxErBblirPizecHcpD3ffK+s= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw= +github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/sahilm/fuzzy v0.1.0/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= +github.com/shadowsocks/go-shadowsocks2 v0.1.5 h1:PDSQv9y2S85Fl7VBeOMF9StzeXZyK1HakRm86CUbr28= +github.com/shadowsocks/go-shadowsocks2 v0.1.5/go.mod h1:AGGpIoek4HRno4xzyFiAtLHkOpcoznZEkAccaI/rplM= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= +golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg= +golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/media/demo.gif b/media/demo.gif new file mode 100644 index 0000000..069e38c Binary files /dev/null and b/media/demo.gif differ diff --git a/pkg/pnet/httpclient.go b/pkg/pnet/httpclient.go new file mode 100644 index 0000000..88dd064 --- /dev/null +++ b/pkg/pnet/httpclient.go @@ -0,0 +1,40 @@ +package pnet + +import ( + "context" + "net" + "net/http" +) + +type HTTPClient struct { + *http.Client + chLaddr chan net.Addr +} + +func NewHTTPClient(useIpv6 bool) *HTTPClient { + client := &HTTPClient{ + Client: &http.Client{}, + chLaddr: make(chan net.Addr, 1), + } + tr := &http.Transport{ + DialContext: func(ctx context.Context, _network, addr string) (net.Conn, error) { + network := "tcp4" + if useIpv6 { + network = "tcp6" + } + c, err := DialContext(ctx, network, ":0", addr) + if err != nil { + return nil, err + } + client.chLaddr <- c.LocalAddr() + return c, err + }, + ForceAttemptHTTP2: true, + } + client.Client.Transport = tr + return client +} + +func (cl *HTTPClient) GetLAddr() <-chan net.Addr { + return cl.chLaddr +} diff --git a/pkg/pnet/p2p.go b/pkg/pnet/p2p.go new file mode 100644 index 0000000..c1ca7c0 --- /dev/null +++ b/pkg/pnet/p2p.go @@ -0,0 +1,196 @@ +package pnet + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "net" + "net/http" + "strings" + "time" +) + +type ( + lenType uint16 + + responseOrError struct { + *http.Response + error + } + + connInfo struct { + laddr string + peerLaddr string + peerRaddr string + } +) + +const ( + dialAttemptInterval = 500 * time.Millisecond + rendezvousTimeout = 5 * time.Second +) + +// Logger interface accepted by pnet for internal logging +type Logger interface { + Infof(format string, a ...any) + Debugf(format string, a ...any) +} + +var defaultLogger Logger + +// HolePunching negotiates via a rendezvous server with a peer with the same id to establish a connection. +func HolePunching(ctx context.Context, bridgeURL string, id string, useIPv6 bool, l Logger) (conn net.Conn, err error) { + defaultLogger = l + info, err := exchangeConnInfo(ctx, bridgeURL, id, useIPv6) + if err != nil { + return nil, err + } + return rendezvous(ctx, info) +} + +func exchangeConnInfo(ctx context.Context, bridgeURL string, id string, useIPv6 bool) (*connInfo, error) { + client := NewHTTPClient(useIPv6) + sendReader, sendWriter := io.Pipe() + defer sendWriter.Close() + reqCtx, cancelReqCtx := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + _, _ = sendWriter.Write([]byte{0xff}) // notify early close + cancelReqCtx() + }() + req, err := http.NewRequestWithContext(reqCtx, "POST", bridgeURL, sendReader) + if err != nil { + return nil, fmt.Errorf("failed to open a connection to the bridge: %w", err) + } + req.Header.Set("Content-Type", "application/octet-stream") + chRespOrErr := make(chan responseOrError) + go func() { + resp, err := client.Do(req) + if err != nil { + err = fmt.Errorf("failed to open a connection to the bridge: %w", err) + } + chRespOrErr <- responseOrError{resp, err} + }() + var laddr string + select { + case la := <-client.GetLAddr(): + laddr = la.String() + case <-ctx.Done(): + return nil, context.Canceled + } + + err = sendPacket(sendWriter, []byte(fmt.Sprintf("%s|%s", laddr, id))) + if err != nil { + return nil, fmt.Errorf("failed to communicate with the bridge: %w", err) + } + defaultLogger.Infof("waiting for peer...") + respOrErr := <-chRespOrErr + if respOrErr.error != nil { + return nil, respOrErr.error + } + recv, err := receivePacket(respOrErr.Response.Body) + _ = respOrErr.Response.Body.Close() + if err != nil { + return nil, fmt.Errorf("failed to communicate with the bridge: %w", err) + } + tmp := strings.Split(string(recv), "|") + peerLaddr, peerRaddr := tmp[0], tmp[1] + peerRaddrRe, _ := resolveAddr("tcp", peerRaddr) + peerRaddr = peerRaddrRe.String() + + return &connInfo{laddr, peerLaddr, peerRaddr}, nil +} + +func rendezvous(ctx context.Context, info *connInfo) (conn net.Conn, err error) { + defaultLogger.Infof("rendezvous with %s | %s", info.peerLaddr, info.peerRaddr) + chWin := make(chan net.Conn) + l, err := Listen(ctx, "tcp", info.laddr) + if err != nil { + return nil, fmt.Errorf("unable to set up rendezvous: %w", err) + } + defer l.Close() + cc := make(chan struct{}) + defer close(cc) + go accept(ctx, l, chWin, cc) + go connect(ctx, info.laddr, info.peerLaddr, chWin, cc) + if info.peerRaddr != info.peerLaddr { + go connect(ctx, info.laddr, info.peerRaddr, chWin, cc) + } + + select { + case conn = <-chWin: + case <-time.After(rendezvousTimeout): + return nil, fmt.Errorf("rendezvous timeout for %+v", info) + case <-ctx.Done(): + return nil, context.Canceled + } + return conn, nil +} + +func sendPacket(conn io.Writer, data []byte) (err error) { + if err = binary.Write(conn, binary.BigEndian, lenType(len(data))); err != nil { + return + } + if _, err = conn.Write(data); err != nil { + return + } + return +} + +func receivePacket(conn io.Reader) (data []byte, err error) { + var plen lenType + if err = binary.Read(conn, binary.BigEndian, &plen); err != nil { + return + } + buf := make([]byte, plen) + if _, err = io.ReadFull(conn, buf); err != nil { + return + } + return buf, nil +} + +func accept(ctx context.Context, l net.Listener, chWin chan<- net.Conn, cc <-chan struct{}) { + conn, err := l.Accept() + if err != nil { + return + } + select { + case chWin <- conn: + defaultLogger.Debugf("accepted %v", conn.LocalAddr()) + case <-cc: + conn.Close() + case <-ctx.Done(): + conn.Close() + } +} + +func connect(ctx context.Context, laddr, raddr string, chWin chan<- net.Conn, cc <-chan struct{}) { + var conn net.Conn + var err error + for { + select { + case <-cc: + return + default: + } + conn, err = DialContext(ctx, "tcp", laddr, raddr) + if err == nil { + break + } + time.Sleep(dialAttemptInterval) + select { + case <-time.After(dialAttemptInterval): + case <-ctx.Done(): + return + } + } + select { + case chWin <- conn: + defaultLogger.Debugf("connected %v->%v", laddr, raddr) + case <-cc: + conn.Close() + case <-ctx.Done(): + conn.Close() + } +} diff --git a/pkg/pnet/reuseaddr.go b/pkg/pnet/reuseaddr.go new file mode 100644 index 0000000..167a281 --- /dev/null +++ b/pkg/pnet/reuseaddr.go @@ -0,0 +1,46 @@ +package pnet + +// Adapted from libp2p/go-reuseport + +import ( + "context" + "fmt" + "net" +) + +var listenConfig = net.ListenConfig{Control: control} + +func Listen(ctx context.Context, network, address string) (net.Listener, error) { + return listenConfig.Listen(ctx, network, address) +} + +func ListenPacket(ctx context.Context, network, address string) (net.PacketConn, error) { + return listenConfig.ListenPacket(ctx, network, address) +} + +func DialContext(ctx context.Context, network, laddr, raddr string) (net.Conn, error) { + nla, err := resolveAddr(network, laddr) + if err != nil { + return nil, fmt.Errorf("dial failed to resolve laddr %v: %w", laddr, err) + } + d := net.Dialer{ + Control: control, + LocalAddr: nla, + } + return d.DialContext(ctx, network, raddr) +} + +func resolveAddr(network, address string) (net.Addr, error) { + switch network { + default: + return nil, net.UnknownNetworkError(network) + case "ip", "ip4", "ip6": + return net.ResolveIPAddr(network, address) + case "tcp", "tcp4", "tcp6": + return net.ResolveTCPAddr(network, address) + case "udp", "udp4", "udp6": + return net.ResolveUDPAddr(network, address) + case "unix", "unixgram", "unixpacket": + return net.ResolveUnixAddr(network, address) + } +} diff --git a/pkg/pnet/reuseaddr_unix.go b/pkg/pnet/reuseaddr_unix.go new file mode 100644 index 0000000..c8a9b27 --- /dev/null +++ b/pkg/pnet/reuseaddr_unix.go @@ -0,0 +1,24 @@ +//go:build !windows && !wasm + +package pnet + +import ( + "syscall" + + "golang.org/x/sys/unix" +) + +func control(network, address string, c syscall.RawConn) error { + var err error + _ = c.Control(func(fd uintptr) { + err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + if err != nil { + return + } + err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + if err != nil { + return + } + }) + return err +} diff --git a/pkg/pnet/reuseaddr_windows.go b/pkg/pnet/reuseaddr_windows.go new file mode 100644 index 0000000..6d130ac --- /dev/null +++ b/pkg/pnet/reuseaddr_windows.go @@ -0,0 +1,13 @@ +package pnet + +import ( + "syscall" + + "golang.org/x/sys/windows" +) + +func control(network, address string, c syscall.RawConn) (err error) { + return c.Control(func(fd uintptr) { + err = windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_REUSEADDR, 1) + }) +} diff --git a/pkg/tui/clearquit.go b/pkg/tui/clearquit.go new file mode 100644 index 0000000..dd5c124 --- /dev/null +++ b/pkg/tui/clearquit.go @@ -0,0 +1,9 @@ +package tui + +import tea "github.com/charmbracelet/bubbletea" + +type clearQuitModel struct{} + +func (clearQuitModel) Init() tea.Cmd { return func() tea.Msg { return struct{}{} } } +func (m clearQuitModel) Update(_ tea.Msg) (tea.Model, tea.Cmd) { return m, tea.Quit } +func (m clearQuitModel) View() string { return "" } diff --git a/pkg/tui/logger.go b/pkg/tui/logger.go new file mode 100644 index 0000000..25b6033 --- /dev/null +++ b/pkg/tui/logger.go @@ -0,0 +1,98 @@ +package tui + +import ( + "fmt" + "strings" + + tea "github.com/charmbracelet/bubbletea" +) + +type ( + logMsg string + + // A LoggerControl is the user handler for a LoggerModel + LoggerControl struct { + ch chan tea.Msg + debug bool + } +) + +func NewLoggerControl(debug bool) LoggerControl { + return LoggerControl{make(chan tea.Msg), debug} +} + +// Infof appends a log entry +func (c LoggerControl) Infof(format string, a ...any) { + c.send(format, a...) +} + +// Debugf appends a log entry when in debug mode +func (c LoggerControl) Debugf(format string, a ...any) { + if c.debug { + c.send(format, a...) + } +} + +func (c LoggerControl) send(format string, a ...any) { + c.ch <- logMsg(fmt.Sprintf(format, a...)) +} + +// Next switch the current model to the next one +func (c LoggerControl) Next(m tea.Model) { + c.ch <- modelSwitchMsg{m} +} + +// End terminates the program, preserving the log entries on screen +func (c LoggerControl) End() { + c.Next(nil) + close(c.ch) +} + +// A LoggerModel displays a stream of logs +type LoggerModel struct { + logger LoggerControl + logs []string +} + +func NewLoggerModel(c LoggerControl) tea.Model { + m := LoggerModel{logger: c} + return m +} + +func (m LoggerModel) waitForLog() tea.Msg { + return <-m.logger.ch +} + +func (m LoggerModel) Init() tea.Cmd { + return m.waitForLog +} + +func (m LoggerModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + switch msg := msg.(type) { + case logMsg: + m.logs = append(m.logs, string(msg)) + return m, m.waitForLog + case modelSwitchMsg: + if msg.model == nil { + return m, tea.Quit // quit without clearing screen + } + return modelSwitchTo(msg.model), nil + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c": + userCancel() + return modelSwitchTo(clearQuitModel{}), nil + } + } + return m, nil +} + +func (m LoggerModel) View() string { + if m.logger.debug { + return strings.Join(m.logs, "\n") + "\n" + } + if len(m.logs) == 0 { + return "" + } + return m.logs[len(m.logs)-1] + "\n" +} diff --git a/pkg/tui/status.go b/pkg/tui/status.go new file mode 100644 index 0000000..2bbc621 --- /dev/null +++ b/pkg/tui/status.go @@ -0,0 +1,117 @@ +package tui + +import ( + "fmt" + "io" + "time" + + "github.com/charmbracelet/bubbles/spinner" + tea "github.com/charmbracelet/bubbletea" + humanize "github.com/dustin/go-humanize" +) + +type ( + // A StatusControl is the user handler for a StausModel + StatusControl struct { + *meteredReadWriteCloser + chNext chan tea.Msg + } +) + +func NewStatusControl() *StatusControl { + return &StatusControl{ + chNext: make(chan tea.Msg), + } +} + +// Monitor wraps around a read/write stream for obtaining data transfer stats +func (c *StatusControl) Monitor(stream io.ReadWriteCloser) io.ReadWriteCloser { + c.meteredReadWriteCloser = newMeteredReadWriteCloser(stream, 300*time.Millisecond) + return c.meteredReadWriteCloser +} + +// Next switches to the next BubbleTea Model and shut down current StatusModel +func (c *StatusControl) Next(m tea.Model) { + c.chNext <- modelSwitchMsg{m} + close(c.chNext) +} + +type meteredReadWriteCloser struct { + io.ReadWriteCloser + rate, total uint64 + startTime time.Time + ticker *time.Ticker +} + +func newMeteredReadWriteCloser(inner io.ReadWriteCloser, interval time.Duration) *meteredReadWriteCloser { + ticker := time.NewTicker(interval) + m := &meteredReadWriteCloser{ + ReadWriteCloser: inner, + startTime: time.Now(), + ticker: ticker, + } + go func() { + for range ticker.C { + m.rate = uint64(float64(m.total) / time.Since(m.startTime).Seconds()) + } + }() + return m +} + +func (m *meteredReadWriteCloser) Read(p []byte) (n int, err error) { + n, err = m.ReadWriteCloser.Read(p) + m.total += uint64(n) + return +} +func (m *meteredReadWriteCloser) Write(p []byte) (n int, err error) { + n, err = m.ReadWriteCloser.Write(p) + m.total += uint64(n) + return +} +func (m *meteredReadWriteCloser) Close() error { + m.ticker.Stop() + return m.ReadWriteCloser.Close() +} + +// A StatusModel displays a updating stats of data transfer +type StatusModel struct { + spinner spinner.Model + status *StatusControl +} + +func NewStatusModel(c *StatusControl) tea.Model { + return StatusModel{ + spinner: spinner.New(spinner.WithSpinner(spinner.Points)), + status: c, + } +} + +func (m StatusModel) waitForNext() tea.Msg { + return <-m.status.chNext +} + +func (m StatusModel) Init() tea.Cmd { + return tea.Batch(m.spinner.Tick, m.waitForNext) +} + +func (m StatusModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + switch msg := msg.(type) { + case spinner.TickMsg: + var cmd tea.Cmd + m.spinner, cmd = m.spinner.Update(msg) + return m, cmd + case modelSwitchMsg: + return modelSwitchTo(msg.model), nil + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c": + userCancel() + return modelSwitchTo(clearQuitModel{}), nil + } + } + return m, nil +} + +func (m StatusModel) View() string { + return fmt.Sprintf("%s %6s/s %6s", m.spinner.View(), humanize.Bytes(m.status.rate), humanize.Bytes(m.status.total)) +} diff --git a/pkg/tui/tui.go b/pkg/tui/tui.go new file mode 100644 index 0000000..bad4175 --- /dev/null +++ b/pkg/tui/tui.go @@ -0,0 +1,36 @@ +package tui + +import ( + "context" + "fmt" + "os" + + tea "github.com/charmbracelet/bubbletea" +) + +var ( + programSingleton *tea.Program + userCancel context.CancelFunc +) + +// RunProgram runs a tea.Program with a tea.Model as the initial model, +// which can switch itself to other model in its Update func. +func RunProgram(model tea.Model, cancel context.CancelFunc) tea.Model { + programSingleton = tea.NewProgram(model) + userCancel = cancel + m, err := programSingleton.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "BubbleTea: %v", err) + os.Exit(1) + } + return m +} + +type modelSwitchMsg struct { + model tea.Model +} + +func modelSwitchTo(m tea.Model) tea.Model { + go func() { programSingleton.Send(m.Init()()) }() + return m +}