From 81efa4be0f429a4ef2e2a005a865b574cc283278 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 17 Oct 2023 02:13:28 +0000 Subject: [PATCH 1/4] remove dmsgget and dmsgpost --- cmd/dmsgget/commands/dmsgget.go | 372 ------------------------------ cmd/dmsgget/dmsgget.go | 8 - cmd/dmsgpost/commands/dmsgpost.go | 223 ------------------ cmd/dmsgpost/dmsgpost.go | 8 - docs/dmsgget.md | 65 ------ pkg/dmsgget/dmsgget.go | 270 ---------------------- pkg/dmsgget/dmsgget_test.go | 186 --------------- pkg/dmsgget/flags.go | 67 ------ pkg/dmsgget/progress_writer.go | 32 --- pkg/dmsgget/url.go | 40 ---- 10 files changed, 1271 deletions(-) delete mode 100644 cmd/dmsgget/commands/dmsgget.go delete mode 100644 cmd/dmsgget/dmsgget.go delete mode 100644 cmd/dmsgpost/commands/dmsgpost.go delete mode 100644 cmd/dmsgpost/dmsgpost.go delete mode 100644 docs/dmsgget.md delete mode 100644 pkg/dmsgget/dmsgget.go delete mode 100644 pkg/dmsgget/dmsgget_test.go delete mode 100644 pkg/dmsgget/flags.go delete mode 100644 pkg/dmsgget/progress_writer.go delete mode 100644 pkg/dmsgget/url.go diff --git a/cmd/dmsgget/commands/dmsgget.go b/cmd/dmsgget/commands/dmsgget.go deleted file mode 100644 index cbff55a24..000000000 --- a/cmd/dmsgget/commands/dmsgget.go +++ /dev/null @@ -1,372 +0,0 @@ -// Package commands cmd/dmsgget/commands/dmsgget.go -package commands - -import ( - "context" - "errors" - "fmt" - "io" - "log" - "net/http" - "net/url" - "os" - "path/filepath" - "sync/atomic" - "time" - - cc "github.com/ivanpirog/coloredcobra" - "github.com/sirupsen/logrus" - "github.com/skycoin/skywire-utilities/pkg/buildinfo" - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/cmdutil" - "github.com/skycoin/skywire-utilities/pkg/logging" - "github.com/skycoin/skywire-utilities/pkg/skyenv" - "github.com/spf13/cobra" - - "github.com/skycoin/dmsg/pkg/disc" - dmsg "github.com/skycoin/dmsg/pkg/dmsg" - "github.com/skycoin/dmsg/pkg/dmsghttp" -) - -var ( - dmsgDisc string - dmsgSessions int - dmsggetTries int - dmsggetWait int - dmsggetOutput string - sk cipher.SecKey - dmsggetLog *logging.Logger - dmsggetAgent string - stdout bool - logLvl string -) - -func init() { - RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "d", "", "dmsg discovery url default:\n"+skyenv.DmsgDiscAddr) - RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") - RootCmd.Flags().StringVarP(&dmsggetOutput, "out", "o", ".", "output filepath") - RootCmd.Flags().BoolVarP(&stdout, "stdout", "n", false, "output to STDOUT") - RootCmd.Flags().IntVarP(&dmsggetTries, "try", "t", 1, "download attempts (0 unlimits)") - RootCmd.Flags().IntVarP(&dmsggetWait, "wait", "w", 0, "time to wait between fetches") - RootCmd.Flags().StringVarP(&dmsggetAgent, "agent", "a", "dmsgget/"+buildinfo.Version(), "identify as `AGENT`") - if os.Getenv("DMSGGET_SK") != "" { - sk.Set(os.Getenv("DMSGGET_SK")) //nolint - } - RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") - var helpflag bool - RootCmd.SetUsageTemplate(help) - RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgget") - RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) - RootCmd.PersistentFlags().MarkHidden("help") //nolint -} - -// RootCmd contains the root command -var RootCmd = &cobra.Command{ - Use: "get", - Short: "dmsg wget implementation - wget over dmsg", - Long: ` - ┌┬┐┌┬┐┌─┐┌─┐┌─┐┌─┐┌┬┐ - │││││└─┐│ ┬│ ┬├┤ │ - ─┴┘┴ ┴└─┘└─┘└─┘└─┘ ┴ `, - SilenceErrors: true, - SilenceUsage: true, - DisableSuggestions: true, - DisableFlagsInUseLine: true, - Version: buildinfo.Version(), - PreRun: func(cmd *cobra.Command, args []string) { - if dmsgDisc == "" { - dmsgDisc = skyenv.DmsgDiscAddr - } - }, - RunE: func(cmd *cobra.Command, args []string) error { - if dmsggetLog == nil { - dmsggetLog = logging.MustGetLogger("dmsgget") - } - if logLvl != "" { - if lvl, err := logging.LevelFromString(logLvl); err == nil { - logging.SetLevel(lvl) - } - } - //if the log level was not explicitly set but stdout was specified ; suppress all logging except panic - if logLvl == "" { - //suppress logging on stdout - if stdout { - if lvl, err := logging.LevelFromString("panic"); err == nil { - logging.SetLevel(lvl) - } - } - } - ctx, cancel := cmdutil.SignalContext(context.Background(), dmsggetLog) - defer cancel() - - pk, err := sk.PubKey() - if err != nil { - pk, sk = cipher.GenerateKeyPair() - } - - u, err := parseURL(args) - if err != nil { - return fmt.Errorf("failed to parse provided URL: %w", err) - } - - file, err := parseOutputFile(dmsggetOutput, u.URL.Path) - if err != nil { - return fmt.Errorf("failed to prepare output file: %w", err) - } - defer func() { - if fErr := file.Close(); fErr != nil { - dmsggetLog.WithError(fErr).Warn("Failed to close output file.") - } - if err != nil { - if rErr := os.RemoveAll(file.Name()); rErr != nil { - dmsggetLog.WithError(rErr).Warn("Failed to remove output file.") - } - } - }() - - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) - if err != nil { - return fmt.Errorf("failed to start dmsg: %w", err) - } - defer closeDmsg() - - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} - - for i := 0; i < dmsggetTries; i++ { - if !stdout { - dmsggetLog.Debugf("Download attempt %d/%d ...", i, dmsggetTries) - } - - if _, err := file.Seek(0, 0); err != nil { - return fmt.Errorf("failed to reset file: %w", err) - } - if stdout { - if fErr := file.Close(); fErr != nil { - dmsggetLog.WithError(fErr).Warn("Failed to close output file.") - } - if err != nil { - if rErr := os.RemoveAll(file.Name()); rErr != nil { - dmsggetLog.WithError(rErr).Warn("Failed to remove output file.") - } - } - file = os.Stdout - } - if err := Download(ctx, dmsggetLog, &httpC, file, u.URL.String(), 0); err != nil { - dmsggetLog.WithError(err).Error() - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Duration(dmsggetWait) * time.Second): - continue - } - } - - // download successful. - return nil - } - - return errors.New("all download attempts failed") - - }, -} - -// URL represents a dmsg http URL. -type URL struct { - dmsg.Addr - url.URL -} - -// Fill fills the internal fields from an URL string. -func (du *URL) fill(str string) error { - u, err := url.Parse(str) - if err != nil { - return err - } - - if u.Scheme == "" { - return errors.New("URL is missing a scheme") - } - - if u.Host == "" { - return errors.New("URL is missing a host") - } - - du.URL = *u - return du.Addr.Set(u.Host) -} - -func parseURL(args []string) (*URL, error) { - if len(args) == 0 { - return nil, errors.New("no URL(s) provided") - } - - if len(args) > 1 { - return nil, errors.New("multiple URLs is not yet supported") - } - - var out URL - if err := out.fill(args[0]); err != nil { - return nil, fmt.Errorf("provided URL is invalid: %w", err) - } - - return &out, nil -} - -func parseOutputFile(name string, urlPath string) (*os.File, error) { - stat, statErr := os.Stat(name) - if statErr != nil { - if os.IsNotExist(statErr) { - f, err := os.Create(name) //nolint - if err != nil { - return nil, err - } - return f, nil - } - return nil, statErr - } - - if stat.IsDir() { - f, err := os.Create(filepath.Join(name, urlPath)) //nolint - if err != nil { - return nil, err - } - return f, nil - } - - return nil, os.ErrExist -} - -func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { - dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsggetLog), &dmsg.Config{MinSessions: dmsgSessions}) - go dmsgC.Serve(context.Background()) - - stop = func() { - err := dmsgC.Close() - dmsggetLog.WithError(err).Debug("Disconnected from dmsg network.") - fmt.Printf("\n") - } - if !stdout { - dmsggetLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). - Debug("Connecting to dmsg network...") - } - - select { - case <-ctx.Done(): - stop() - return nil, nil, ctx.Err() - - case <-dmsgC.Ready(): - dmsggetLog.Debug("Dmsg network ready.") - return dmsgC, stop, nil - } -} - -// Download downloads a file from the given URL into 'w'. -func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { - req, err := http.NewRequest(http.MethodGet, urlStr, nil) - if err != nil { - log.WithError(err).Fatal("Failed to formulate HTTP request.") - } - resp, err := httpC.Do(req) - if err != nil { - return fmt.Errorf("failed to connect to HTTP server: %w", err) - } - if maxSize > 0 { - if resp.ContentLength > maxSize*1024 { - return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) - } - } - n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength) - if err != nil { - return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err) - } - defer func() { - if err := resp.Body.Close(); err != nil { - log.WithError(err).Warn("HTTP Response body closed with non-nil error.") - } - }() - - return nil -} - -type readerFunc func(p []byte) (n int, err error) - -func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } - -// CancellableCopy will call the Reader and Writer interface multiple time, in order -// to copy by chunk (avoiding loading the whole file in memory). -func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { - - n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { - - // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations - select { - - // if context has been canceled - case <-ctx.Done(): - // stop process and propagate "Download Canceled" error - return 0, errors.New("Download Canceled") - default: - // otherwise just run default io.Reader implementation - return body.Read(p) - } - })) - return n, err -} - -// ProgressWriter prints the progress of a download to stdout. -type ProgressWriter struct { - // atomic requires 64-bit alignment for struct field access - Current int64 - Total int64 -} - -// Write implements io.Writer -func (pw *ProgressWriter) Write(p []byte) (int, error) { - n := len(p) - - current := atomic.AddInt64(&pw.Current, int64(n)) - total := atomic.LoadInt64(&pw.Total) - pc := fmt.Sprintf("%d%%", current*100/total) - if !stdout { - fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc) - if current != total { - fmt.Print("\r") - } else { - fmt.Print("\n") - } - } - - return n, nil -} - -// Execute executes root CLI command. -func Execute() { - cc.Init(&cc.Config{ - RootCmd: RootCmd, - Headings: cc.HiBlue + cc.Bold, //+ cc.Underline, - Commands: cc.HiBlue + cc.Bold, - CmdShortDescr: cc.HiBlue, - Example: cc.HiBlue + cc.Italic, - ExecName: cc.HiBlue + cc.Bold, - Flags: cc.HiBlue + cc.Bold, - //FlagsDataType: cc.HiBlue, - FlagsDescr: cc.HiBlue, - NoExtraNewlines: true, - NoBottomNewline: true, - }) - if err := RootCmd.Execute(); err != nil { - log.Fatal("Failed to execute command: ", err) - } -} - -const help = "Usage:\r\n" + - " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + - "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + - "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + - "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + - "Flags:\r\n" + - "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + - "Global Flags:\r\n" + - "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" diff --git a/cmd/dmsgget/dmsgget.go b/cmd/dmsgget/dmsgget.go deleted file mode 100644 index 8b5840739..000000000 --- a/cmd/dmsgget/dmsgget.go +++ /dev/null @@ -1,8 +0,0 @@ -// package main cmd/dmsg-discovery/dmsg-discovery.go -package main - -import "github.com/skycoin/dmsg/cmd/dmsgget/commands" - -func main() { - commands.Execute() -} diff --git a/cmd/dmsgpost/commands/dmsgpost.go b/cmd/dmsgpost/commands/dmsgpost.go deleted file mode 100644 index c90d5a936..000000000 --- a/cmd/dmsgpost/commands/dmsgpost.go +++ /dev/null @@ -1,223 +0,0 @@ -// Package commands cmd/dmsgpost/commands/dmsgpost.go -package commands - -import ( - "context" - "errors" - "fmt" - "io" - "log" - "net/http" - "net/url" - "os" - "strings" - - cc "github.com/ivanpirog/coloredcobra" - "github.com/skycoin/skywire-utilities/pkg/buildinfo" - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/cmdutil" - "github.com/skycoin/skywire-utilities/pkg/logging" - "github.com/skycoin/skywire-utilities/pkg/skyenv" - "github.com/spf13/cobra" - - "github.com/skycoin/dmsg/pkg/disc" - "github.com/skycoin/dmsg/pkg/dmsg" - "github.com/skycoin/dmsg/pkg/dmsghttp" -) - -var ( - dmsgDisc string - dmsgSessions int - dmsgpostData string - // dmsgpostHeader string - sk cipher.SecKey - dmsgpostLog *logging.Logger - dmsgpostAgent string - logLvl string -) - -func init() { - RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "c", "", "dmsg discovery url default:\n"+skyenv.DmsgDiscAddr) - RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") - RootCmd.Flags().StringVarP(&dmsgpostData, "data", "d", "", "dmsghttp POST data") - // RootCmd.Flags().StringVarP(&dmsgpostHeader, "header", "H", "", "Pass custom header(s) to server") - RootCmd.Flags().StringVarP(&dmsgpostAgent, "agent", "a", "dmsgpost/"+buildinfo.Version(), "identify as `AGENT`") - if os.Getenv("dmsgpost_SK") != "" { - sk.Set(os.Getenv("dmsgpost_SK")) //nolint - } - RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") - var helpflag bool - RootCmd.SetUsageTemplate(help) - RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgpost") - RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) - RootCmd.PersistentFlags().MarkHidden("help") //nolint -} - -// RootCmd containsa the root dmsgpost command -var RootCmd = &cobra.Command{ - Use: "post", - Short: "dmsgpost", - Long: ` - ┌┬┐┌┬┐┌─┐┌─┐┌─┐┌─┐┌─┐┌┬┐ - │││││└─┐│ ┬├─┘│ │└─┐ │ - ─┴┘┴ ┴└─┘└─┘┴ └─┘└─┘ ┴ `, - SilenceErrors: true, - SilenceUsage: true, - DisableSuggestions: true, - DisableFlagsInUseLine: true, - Version: buildinfo.Version(), - PreRun: func(cmd *cobra.Command, args []string) { - if dmsgDisc == "" { - dmsgDisc = skyenv.DmsgDiscAddr - } - }, - Run: func(cmd *cobra.Command, args []string) { - if dmsgpostLog == nil { - dmsgpostLog = logging.MustGetLogger("dmsgpost") - } - if logLvl != "" { - if lvl, err := logging.LevelFromString(logLvl); err == nil { - logging.SetLevel(lvl) - } - } - - ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgpostLog) - defer cancel() - - pk, err := sk.PubKey() - if err != nil { - pk, sk = cipher.GenerateKeyPair() - } - - u, err := parseURL(args) - if err != nil { - dmsgpostLog.WithError(err).Fatal("failed to parse provided URL") - } - - dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) - if err != nil { - dmsgpostLog.WithError(err).Fatal("failed to start dmsg") - } - defer closeDmsg() - - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} - - req, err := http.NewRequest(http.MethodPost, u.URL.String(), strings.NewReader(dmsgpostData)) - if err != nil { - dmsgpostLog.WithError(err).Fatal("Failed to formulate HTTP request.") - } - req.Header.Set("Content-Type", "text/plain") - - resp, err := httpC.Do(req) - if err != nil { - dmsgpostLog.WithError(err).Fatal("Failed to execute HTTP request.") - } - - defer func() { - if err := resp.Body.Close(); err != nil { - dmsgpostLog.WithError(err).Fatal("Failed to close response body") - } - }() - respBody, err := io.ReadAll(resp.Body) - if err != nil { - dmsgpostLog.WithError(err).Fatal("Failed to read respose body.") - } - fmt.Println(string(respBody)) - }, -} - -// URL represents a dmsg http URL. -type URL struct { - dmsg.Addr - url.URL -} - -// Fill fills the internal fields from an URL string. -func (du *URL) fill(str string) error { - u, err := url.Parse(str) - if err != nil { - return err - } - - if u.Scheme == "" { - return errors.New("URL is missing a scheme") - } - - if u.Host == "" { - return errors.New("URL is missing a host") - } - - du.URL = *u - return du.Addr.Set(u.Host) -} - -func parseURL(args []string) (*URL, error) { - if len(args) == 0 { - return nil, errors.New("no URL(s) provided") - } - - if len(args) > 1 { - return nil, errors.New("multiple URLs is not yet supported") - } - - var out URL - if err := out.fill(args[0]); err != nil { - return nil, fmt.Errorf("provided URL is invalid: %w", err) - } - - return &out, nil -} - -func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { - dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgpostLog), &dmsg.Config{MinSessions: dmsgSessions}) - go dmsgC.Serve(context.Background()) - - stop = func() { - err := dmsgC.Close() - dmsgpostLog.WithError(err).Debug("Disconnected from dmsg network.") - fmt.Printf("\n") - } - dmsgpostLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). - Debug("Connecting to dmsg network...") - - select { - case <-ctx.Done(): - stop() - return nil, nil, ctx.Err() - - case <-dmsgC.Ready(): - dmsgpostLog.Debug("Dmsg network ready.") - return dmsgC, stop, nil - } -} - -// Execute executes root CLI command. -func Execute() { - cc.Init(&cc.Config{ - RootCmd: RootCmd, - Headings: cc.HiBlue + cc.Bold, //+ cc.Underline, - Commands: cc.HiBlue + cc.Bold, - CmdShortDescr: cc.HiBlue, - Example: cc.HiBlue + cc.Italic, - ExecName: cc.HiBlue + cc.Bold, - Flags: cc.HiBlue + cc.Bold, - //FlagsDataType: cc.HiBlue, - FlagsDescr: cc.HiBlue, - NoExtraNewlines: true, - NoBottomNewline: true, - }) - if err := RootCmd.Execute(); err != nil { - log.Fatal("Failed to execute command: ", err) - } -} - -const help = "Usage:\r\n" + - " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + - "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + - "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + - "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + - "Flags:\r\n" + - "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + - "Global Flags:\r\n" + - "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" diff --git a/cmd/dmsgpost/dmsgpost.go b/cmd/dmsgpost/dmsgpost.go deleted file mode 100644 index 3e073e563..000000000 --- a/cmd/dmsgpost/dmsgpost.go +++ /dev/null @@ -1,8 +0,0 @@ -// package main cmd/dmsgpost/dmsgpost.go -package main - -import "github.com/skycoin/dmsg/cmd/dmsgpost/commands" - -func main() { - commands.Execute() -} diff --git a/docs/dmsgget.md b/docs/dmsgget.md deleted file mode 100644 index dc643834f..000000000 --- a/docs/dmsgget.md +++ /dev/null @@ -1,65 +0,0 @@ -# Dmsgget - -`dmsgget` is a utility exec which can download from HTTP servers hosted over the `dmsg` network (similar to a simplified `wget` over `dmsg`). - -``` -$ dmsgget --help - - Skycoin dmsgget v0.1.0, wget over dmsg. - Usage: dmsgget [OPTION]... [URL] - - -O FILE - write documents to FILE (default ".") - -U AGENT - identify as AGENT (default "dmsgget/v0.1.0") - -dmsg-disc URL - dmsg discovery URL (default "http://dmsgd.skywire.skycoin.com") - -dmsg-sessions NUMBER - connect to NUMBER of dmsg servers (default 1) - -h - -help - print this help - -t NUMBER - set number of retries to NUMBER (0 unlimits) (default 1) - -w SECONDS - wait SECONDS between retrievals -``` - -### Example usage - -In this example, we will use the `dmsg` network where the `dmsg.Discovery` address is `http://dmsgd.skywire.skycoin.com`. However, any `dmsg.Discovery` would work. - -First, lets create a folder where we will host files to serve over `dmsg` and create a `hello.txt` file within. - -```shell script -// Create serving folder. -$ mkdir /tmp/dmsghttp -p - -// Create file. -$ echo 'Hello World!' > /tmp/dmsghttp/hello.txt -``` - -Next, let's serve this over `http` via `dmsg` as transport. We have an example exec for this located within `/example/dmsgget/dmsg-example-http-server`. - -```shell script -# Generate public/private key pair -$ go run ./examples/dmsgget/gen-keys/gen-keys.go -# PK: 038dde2d050803db59e2ad19e5a6db0f58f8419709fc65041c48b0cb209bb7a851 -# SK: e5740e093bd472c2730b0a58944a5dee220d415de62acf45d1c559f56eea2b2d - -# Run dmsg http server. -# (replace 'e5740e093bd472c2730b0a58944a5dee220d415de62acf45d1c559f56eea2b2d' with the SK returned from above command) -$ go run ./examples/dmsgget/dmsg-example-http-server/dmsg-example-http-server.go --dir /tmp/dmsghttp --sk e5740e093bd472c2730b0a58944a5dee220d415de62acf45d1c559f56eea2b2d -``` - -Now we can use `dsmgget` to download the hosted file. Open a new terminal and run the following. - -```shell script -# Replace '038dde2d050803db59e2ad19e5a6db0f58f8419709fc65041c48b0cb209bb7a851' with the generated PK. -$ dmsgget dmsg://038dde2d050803db59e2ad19e5a6db0f58f8419709fc65041c48b0cb209bb7a851:80/hello.txt - -# Check downloaded file. -$ cat hello.txt -# Hello World! -``` - diff --git a/pkg/dmsgget/dmsgget.go b/pkg/dmsgget/dmsgget.go deleted file mode 100644 index 0193d6dbc..000000000 --- a/pkg/dmsgget/dmsgget.go +++ /dev/null @@ -1,270 +0,0 @@ -// Package dmsgget pkg/dmsgget/dmsgget.go -package dmsgget - -import ( - "context" - "errors" - "flag" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "time" - - jsoniter "github.com/json-iterator/go" - "github.com/sirupsen/logrus" - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/logging" - - "github.com/skycoin/dmsg/pkg/disc" - dmsg "github.com/skycoin/dmsg/pkg/dmsg" - "github.com/skycoin/dmsg/pkg/dmsghttp" -) - -var json = jsoniter.ConfigFastest - -// DmsgGet contains the logic for dmsgget (wget over dmsg). -type DmsgGet struct { - startF startupFlags - dmsgF dmsgFlags - dlF downloadFlags - httpF httpFlags - fs *flag.FlagSet -} - -// New creates a new DmsgGet instance. -func New(fs *flag.FlagSet) *DmsgGet { - dg := &DmsgGet{fs: fs} - - for _, fg := range dg.flagGroups() { - fg.Init(fs) - } - - w := fs.Output() - flag.Usage = func() { - _, _ = fmt.Fprintf(w, "Skycoin %s %s, wget over dmsg.\n", ExecName, Version) - _, _ = fmt.Fprintf(w, "Usage: %s [OPTION]... [URL]\n\n", ExecName) - flag.PrintDefaults() - _, _ = fmt.Fprintln(w, "") - } - - return dg -} - -// String implements io.Stringer -func (dg *DmsgGet) String() string { - m := make(map[string]interface{}) - for _, fg := range dg.flagGroups() { - m[fg.Name()] = fg - } - j, err := json.Marshal(m) - if err != nil { - panic(err) - } - return string(j) -} - -func (dg *DmsgGet) flagGroups() []FlagGroup { - return []FlagGroup{&dg.startF, &dg.dmsgF, &dg.dlF, &dg.httpF} -} - -// Run runs the download logic. -func (dg *DmsgGet) Run(ctx context.Context, log *logging.Logger, skStr string, args []string) (err error) { - if log == nil { - log = logging.MustGetLogger("dmsgget") - } - - if dg.startF.Help { - dg.fs.Usage() - return nil - } - - pk, sk, err := parseKeyPair(skStr) - if err != nil { - return fmt.Errorf("failed to parse provided key pair: %w", err) - } - - u, err := parseURL(args) - if err != nil { - return fmt.Errorf("failed to parse provided URL: %w", err) - } - - file, err := parseOutputFile(dg.dlF.Output, u.URL.Path) - if err != nil { - return fmt.Errorf("failed to prepare output file: %w", err) - } - defer func() { - if fErr := file.Close(); fErr != nil { - log.WithError(fErr).Warn("Failed to close output file.") - } - if err != nil { - if rErr := os.RemoveAll(file.Name()); rErr != nil { - log.WithError(rErr).Warn("Failed to remove output file.") - } - } - }() - - dmsgC, closeDmsg, err := dg.StartDmsg(ctx, log, pk, sk) - if err != nil { - return fmt.Errorf("failed to start dmsg: %w", err) - } - defer closeDmsg() - - httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} - - for i := 0; i < dg.dlF.Tries; i++ { - log.Infof("Download attempt %d/%d ...", i, dg.dlF.Tries) - - if _, err := file.Seek(0, 0); err != nil { - return fmt.Errorf("failed to reset file: %w", err) - } - - if err := Download(ctx, log, &httpC, file, u.URL.String(), 0); err != nil { - log.WithError(err).Error() - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Duration(dg.dlF.Wait) * time.Second): - continue - } - } - - // download successful. - return nil - } - - return errors.New("all download attempts failed") -} - -func parseKeyPair(skStr string) (pk cipher.PubKey, sk cipher.SecKey, err error) { - if skStr == "" { - pk, sk = cipher.GenerateKeyPair() - return - } - - if err = sk.Set(skStr); err != nil { - return - } - - pk, err = sk.PubKey() - return -} - -func parseURL(args []string) (*URL, error) { - if len(args) == 0 { - return nil, ErrNoURLs - } - - if len(args) > 1 { - return nil, ErrMultipleURLsNotSupported - } - - var out URL - if err := out.Fill(args[0]); err != nil { - return nil, fmt.Errorf("provided URL is invalid: %w", err) - } - - return &out, nil -} - -func parseOutputFile(name string, urlPath string) (*os.File, error) { - stat, statErr := os.Stat(name) - if statErr != nil { - if os.IsNotExist(statErr) { - f, err := os.Create(name) //nolint - if err != nil { - return nil, err - } - return f, nil - } - return nil, statErr - } - - if stat.IsDir() { - f, err := os.Create(filepath.Join(name, urlPath)) //nolint - if err != nil { - return nil, err - } - return f, nil - } - - return nil, os.ErrExist -} - -// StartDmsg create dsmg client instance -func (dg *DmsgGet) StartDmsg(ctx context.Context, log *logging.Logger, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { - dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dg.dmsgF.Disc, &http.Client{}, log), &dmsg.Config{MinSessions: dg.dmsgF.Sessions}) - go dmsgC.Serve(context.Background()) - - stop = func() { - err := dmsgC.Close() - log.WithError(err).Info("Disconnected from dmsg network.") - } - - log.WithField("public_key", pk.String()).WithField("dmsg_disc", dg.dmsgF.Disc). - Info("Connecting to dmsg network...") - - select { - case <-ctx.Done(): - stop() - return nil, nil, ctx.Err() - - case <-dmsgC.Ready(): - log.Info("Dmsg network ready.") - return dmsgC, stop, nil - } -} - -// Download downloads a file from the given URL into 'w'. -func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { - req, err := http.NewRequest(http.MethodGet, urlStr, nil) - if err != nil { - log.WithError(err).Fatal("Failed to formulate HTTP request.") - } - resp, err := httpC.Do(req) - if err != nil { - return fmt.Errorf("failed to connect to HTTP server: %w", err) - } - if maxSize > 0 { - if resp.ContentLength > maxSize*1024 { - return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) - } - } - n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength) - if err != nil { - return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err) - } - defer func() { - if err := resp.Body.Close(); err != nil { - log.WithError(err).Warn("HTTP Response body closed with non-nil error.") - } - }() - - return nil -} - -type readerFunc func(p []byte) (n int, err error) - -func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } - -// CancellableCopy will call the Reader and Writer interface multiple time, in order -// to copy by chunk (avoiding loading the whole file in memory). -func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { - - n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { - - // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations - select { - - // if context has been canceled - case <-ctx.Done(): - // stop process and propagate "Download Canceled" error - return 0, errors.New("Download Canceled") - default: - // otherwise just run default io.Reader implementation - return body.Read(p) - } - })) - return n, err -} diff --git a/pkg/dmsgget/dmsgget_test.go b/pkg/dmsgget/dmsgget_test.go deleted file mode 100644 index c194e0022..000000000 --- a/pkg/dmsgget/dmsgget_test.go +++ /dev/null @@ -1,186 +0,0 @@ -// Package dmsgget pkg/dmsgget/dmsgget_test.go -package dmsgget - -import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - "testing" - "time" - - "github.com/go-chi/chi/v5" - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/cmdutil" - "github.com/skycoin/skywire-utilities/pkg/logging" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/net/nettest" - - "github.com/skycoin/dmsg/pkg/disc" - dmsg "github.com/skycoin/dmsg/pkg/dmsg" - "github.com/skycoin/dmsg/pkg/dmsghttp" -) - -const ( - nSrvs = 2 - maxSessions = 100 -) - -// Serve a HTTP server over dmsg, and have multiple clients download a document simultaneously. -// Arrange: -// - Typical dmsg environment. -// - Dmsg client that serves a HTTP server. -// Act: -// - Start multiple dmsg clients that download from the HTTP server. -// Assert: -// - Ensure the downloads all succeed. -// - Ensure the downloaded data (of all downloads) is the same as the original document. -func TestDownload(t *testing.T) { - const ( - fileSize = 64 - dlClients = 2 // number of clients to download from HTTP server. - ) - - // Arrange: Prepare file to be downloaded. - srcData := cipher.RandByte(fileSize) - src := makeFile(t, srcData) - - // Arrange: Start dmsg environment. - dc := startDmsgEnv(t, nSrvs, maxSessions) - - // Arrange: Start dmsg client that serves a http server which hosts the src file. - hsAddr := runHTTPSrv(t, dc, src.Name()) - // Arrange: Download results (dst files and client errors). - dsts := make([]*os.File, dlClients) - errs := make([]chan error, dlClients) - for i := range dsts { - dsts[i] = makeFile(t, nil) - errs[i] = make(chan error, 1) - } - - // Act: Download - for i := 0; i < dlClients; i++ { - func(i int) { - log := logging.MustGetLogger(fmt.Sprintf("dl_client_%d", i)) - ctx, cancel := cmdutil.SignalContext(context.Background(), log) - defer cancel() - err := Download(ctx, log, newHTTPClient(t, dc), dsts[i], hsAddr, fileSize) - - errs[i] <- err - close(errs[i]) - }(i) - } - - // Assert: Ensure download finishes without error and downloaded file is the same as src. - for i := 0; i < dlClients; i++ { - assert.NoError(t, <-errs[i]) - - dstData, err := os.ReadFile(dsts[i].Name()) - assert.NoErrorf(t, err, "[%d] failed to read destination file", i) - assert.Equalf(t, srcData, dstData, "[%d] destination file data is not equal", i) - } -} - -func makeFile(t *testing.T, data []byte) *os.File { - f, err := os.CreateTemp(os.TempDir(), "dmsgget_test_file_*") - require.NoError(t, err) - - t.Cleanup(func() { - assert.NoError(t, f.Close()) - assert.NoError(t, os.Remove(f.Name())) - }) - - if data != nil { - n, err := f.Write(data) - require.NoError(t, err) - require.Len(t, data, n) - } - - return f -} - -func startDmsgEnv(t *testing.T, nSrvs, maxSessions int) disc.APIClient { - dc := disc.NewMock(0) - - for i := 0; i < nSrvs; i++ { - pk, sk := cipher.GenerateKeyPair() - - conf := dmsg.ServerConfig{ - MaxSessions: maxSessions, - UpdateInterval: 0, - } - srv := dmsg.NewServer(pk, sk, dc, &conf, nil) - srv.SetLogger(logging.MustGetLogger(fmt.Sprintf("server_%d", i))) - - lis, err := nettest.NewLocalListener("tcp") - require.NoError(t, err) - - errCh := make(chan error, 1) - go func() { - errCh <- srv.Serve(lis, "") - close(errCh) - }() - - t.Cleanup(func() { - // listener is also closed when dmsg server is closed - assert.NoError(t, srv.Close()) - assert.NoError(t, <-errCh) - }) - } - - return dc -} - -func runHTTPSrv(t *testing.T, dc disc.APIClient, fName string) string { - pk, sk := cipher.GenerateKeyPair() - httpPath := filepath.Base(fName) - - dmsgC := dmsg.NewClient(pk, sk, dc, nil) - go dmsgC.Serve(context.Background()) - t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) }) - <-dmsgC.Ready() - - r := chi.NewRouter() - r.HandleFunc("/"+httpPath, func(w http.ResponseWriter, r *http.Request) { - http.ServeFile(w, r, fName) - }) - - lis, err := dmsgC.Listen(80) - require.NoError(t, err) - - errCh := make(chan error, 1) - srv := &http.Server{ - ReadTimeout: 3 * time.Second, - WriteTimeout: 3 * time.Second, - IdleTimeout: 30 * time.Second, - ReadHeaderTimeout: 3 * time.Second, - Handler: r, - } - go func() { - errCh <- srv.Serve(lis) - close(errCh) - }() - - t.Cleanup(func() { - assert.NoError(t, lis.Close()) - assert.EqualError(t, <-errCh, dmsg.ErrEntityClosed.Error()) - }) - - return fmt.Sprintf("http://%s/%s", pk.String(), httpPath) -} - -func newHTTPClient(t *testing.T, dc disc.APIClient) *http.Client { - pk, sk := cipher.GenerateKeyPair() - - dmsgC := dmsg.NewClient(pk, sk, dc, nil) - go dmsgC.Serve(context.Background()) - t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) }) - <-dmsgC.Ready() - - log := logging.MustGetLogger("http_client") - ctx, cancel := cmdutil.SignalContext(context.Background(), log) - defer cancel() - return &http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} -} diff --git a/pkg/dmsgget/flags.go b/pkg/dmsgget/flags.go deleted file mode 100644 index 22c5fae54..000000000 --- a/pkg/dmsgget/flags.go +++ /dev/null @@ -1,67 +0,0 @@ -// Package dmsgget pkg/dmsgget/flags.go -package dmsgget - -import ( - "flag" - - "github.com/skycoin/skywire-utilities/pkg/buildinfo" -) - -// ExecName contains the execution name. -const ExecName = "dmsgget" - -// Version contains the version string. -var Version = buildinfo.Version() - -// FlagGroup represents a group of flags. -type FlagGroup interface { - Name() string - Init(fs *flag.FlagSet) -} - -type startupFlags struct { - Help bool -} - -func (f *startupFlags) Name() string { return "Startup" } - -func (f *startupFlags) Init(fs *flag.FlagSet) { - fs.BoolVar(&f.Help, "help", false, "print this help") - fs.BoolVar(&f.Help, "h", false, "") -} - -type dmsgFlags struct { - Disc string - Sessions int -} - -func (f *dmsgFlags) Name() string { return "Dmsg" } - -func (f *dmsgFlags) Init(fs *flag.FlagSet) { - fs.StringVar(&f.Disc, "dmsg-disc", "http://dmsgd.skywire.skycoin.com", "dmsg discovery `URL`") - fs.IntVar(&f.Sessions, "dmsg-sessions", 1, "connect to `NUMBER` of dmsg servers") -} - -type downloadFlags struct { - Output string - Tries int - Wait int -} - -func (f *downloadFlags) Name() string { return "Download" } - -func (f *downloadFlags) Init(fs *flag.FlagSet) { - fs.StringVar(&f.Output, "O", ".", "write documents to `FILE`") - fs.IntVar(&f.Tries, "t", 1, "set number of retries to `NUMBER` (0 unlimits)") - fs.IntVar(&f.Wait, "w", 0, "wait `SECONDS` between retrievals") -} - -type httpFlags struct { - UserAgent string -} - -func (f *httpFlags) Name() string { return "HTTP" } - -func (f *httpFlags) Init(fs *flag.FlagSet) { - fs.StringVar(&f.UserAgent, "U", ExecName+"/"+Version, "identify as `AGENT`") -} diff --git a/pkg/dmsgget/progress_writer.go b/pkg/dmsgget/progress_writer.go deleted file mode 100644 index d9415554e..000000000 --- a/pkg/dmsgget/progress_writer.go +++ /dev/null @@ -1,32 +0,0 @@ -// Package dmsgget pkg/dmsgget/progress_writer.go -package dmsgget - -import ( - "fmt" - "sync/atomic" -) - -// ProgressWriter prints the progress of a download to stdout. -type ProgressWriter struct { - // atomic requires 64-bit alignment for struct field access - Current int64 - Total int64 -} - -// Write implements io.Writer -func (pw *ProgressWriter) Write(p []byte) (int, error) { - n := len(p) - - current := atomic.AddInt64(&pw.Current, int64(n)) - total := atomic.LoadInt64(&pw.Total) - pc := fmt.Sprintf("%d%%", current*100/total) - - fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc) - if current != total { - fmt.Print("\r") - } else { - fmt.Print("\n") - } - - return n, nil -} diff --git a/pkg/dmsgget/url.go b/pkg/dmsgget/url.go deleted file mode 100644 index 6a3962c84..000000000 --- a/pkg/dmsgget/url.go +++ /dev/null @@ -1,40 +0,0 @@ -// Package dmsgget pkg/dmsgget/url.go -package dmsgget - -import ( - "errors" - "net/url" - - dmsg "github.com/skycoin/dmsg/pkg/dmsg" -) - -// Errors related to URLs. -var ( - ErrNoURLs = errors.New("no URLs provided") - ErrMultipleURLsNotSupported = errors.New("multiple URLs is not yet supported") -) - -// URL represents a dmsg http URL. -type URL struct { - dmsg.Addr - url.URL -} - -// Fill fills the internal fields from an URL string. -func (du *URL) Fill(str string) error { - u, err := url.Parse(str) - if err != nil { - return err - } - - if u.Scheme == "" { - return errors.New("URL is missing a scheme") - } - - if u.Host == "" { - return errors.New("URL is missing a host") - } - - du.URL = *u - return du.Addr.Set(u.Host) -} From 6d972236c5838434255380be63057d7f8ffbea9f Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 17 Oct 2023 02:13:56 +0000 Subject: [PATCH 2/4] add dmsgcurl --- .goreleaser-linux.yml | 32 +-- .goreleaser-windows.yml | 8 +- Makefile | 2 +- README.md | 2 +- cmd/dmsg/dmsg.go | 6 +- cmd/dmsgcurl/commands/dmsgcurl.go | 399 ++++++++++++++++++++++++++++++ cmd/dmsgcurl/dmsgcurl.go | 8 + docs/dmsgcurl.md | 65 +++++ pkg/dmsgcurl/dmsgcurl.go | 270 ++++++++++++++++++++ pkg/dmsgcurl/dmsgcurl_test.go | 186 ++++++++++++++ pkg/dmsgcurl/flags.go | 67 +++++ pkg/dmsgcurl/progress_writer.go | 32 +++ pkg/dmsgcurl/url.go | 40 +++ 13 files changed, 1091 insertions(+), 26 deletions(-) create mode 100644 cmd/dmsgcurl/commands/dmsgcurl.go create mode 100644 cmd/dmsgcurl/dmsgcurl.go create mode 100644 docs/dmsgcurl.md create mode 100644 pkg/dmsgcurl/dmsgcurl.go create mode 100644 pkg/dmsgcurl/dmsgcurl_test.go create mode 100644 pkg/dmsgcurl/flags.go create mode 100644 pkg/dmsgcurl/progress_writer.go create mode 100644 pkg/dmsgcurl/url.go diff --git a/.goreleaser-linux.yml b/.goreleaser-linux.yml index 055a0d187..3f4f3b719 100644 --- a/.goreleaser-linux.yml +++ b/.goreleaser-linux.yml @@ -121,8 +121,8 @@ builds: main: ./cmd/dmsg-server/ ldflags: -s -w -linkmode external -extldflags '-static' -buildid= -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - - id: dmsgget-amd64 - binary: dmsgget + - id: dmsgcurl-amd64 + binary: dmsgcurl goos: - linux goarch: @@ -130,11 +130,11 @@ builds: env: - CGO_ENABLED=1 - CC=/home/runner/work/dmsg/dmsg/musl-data/x86_64-linux-musl-cross/bin/x86_64-linux-musl-gcc - main: ./cmd/dmsgget/ + main: ./cmd/dmsgcurl/ ldflags: -s -w -linkmode external -extldflags '-static' -buildid= -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - - id: dmsgget-arm64 - binary: dmsgget + - id: dmsgcurl-arm64 + binary: dmsgcurl goos: - linux goarch: @@ -142,11 +142,11 @@ builds: env: - CGO_ENABLED=1 - CC=/home/runner/work/dmsg/dmsg/musl-data/aarch64-linux-musl-cross/bin/aarch64-linux-musl-gcc - main: ./cmd/dmsgget/ + main: ./cmd/dmsgcurl/ ldflags: -s -w -linkmode external -extldflags '-static' -buildid= -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - - id: dmsgget-arm - binary: dmsgget + - id: dmsgcurl-arm + binary: dmsgcurl goos: - linux goarch: @@ -156,11 +156,11 @@ builds: env: - CGO_ENABLED=1 - CC=/home/runner/work/dmsg/dmsg/musl-data/arm-linux-musleabi-cross/bin/arm-linux-musleabi-gcc - main: ./cmd/dmsgget/ + main: ./cmd/dmsgcurl/ ldflags: -s -w -linkmode external -extldflags '-static' -buildid= -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - - id: dmsgget-armhf - binary: dmsgget + - id: dmsgcurl-armhf + binary: dmsgcurl goos: - linux goarch: @@ -170,7 +170,7 @@ builds: env: - CGO_ENABLED=1 - CC=/home/runner/work/dmsg/dmsg/musl-data/arm-linux-musleabihf-cross/bin/arm-linux-musleabihf-gcc - main: ./cmd/dmsgget/ + main: ./cmd/dmsgcurl/ ldflags: -s -w -linkmode external -extldflags '-static' -buildid= -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - id: dmsgpty-ui-amd64 @@ -339,7 +339,7 @@ archives: - dmsg-server-amd64 - dmsgpty-ui-amd64 - dmsgpty-cli-amd64 - - dmsgget-amd64 + - dmsgcurl-amd64 - dmsgpty-host-amd64 - id: arm64 @@ -351,7 +351,7 @@ archives: - dmsg-server-arm64 - dmsgpty-ui-arm64 - dmsgpty-cli-arm64 - - dmsgget-arm64 + - dmsgcurl-arm64 - dmsgpty-host-arm64 - id: arm @@ -363,7 +363,7 @@ archives: - dmsg-server-arm - dmsgpty-ui-arm - dmsgpty-cli-arm - - dmsgget-arm + - dmsgcurl-arm - dmsgpty-host-arm - id: armhf @@ -375,7 +375,7 @@ archives: - dmsg-server-armhf - dmsgpty-ui-armhf - dmsgpty-cli-armhf - - dmsgget-armhf + - dmsgcurl-armhf - dmsgpty-host-armhf checksum: diff --git a/.goreleaser-windows.yml b/.goreleaser-windows.yml index 5f2c976c1..246420e1b 100644 --- a/.goreleaser-windows.yml +++ b/.goreleaser-windows.yml @@ -39,8 +39,8 @@ builds: main: ./cmd/dmsg-server/ ldflags: -s -w -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - - id: dmsgget - binary: dmsgget + - id: dmsgcurl + binary: dmsgcurl goos: - windows goarch: @@ -48,7 +48,7 @@ builds: - 386 env: - CGO_ENABLED=0 - main: ./cmd/dmsgget/ + main: ./cmd/dmsgcurl/ ldflags: -s -w -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - id: dmsgpty-ui @@ -95,7 +95,7 @@ archives: builds: - dmsg-discovery - dmsg-server - - dmsgget + - dmsgcurl - dmsgpty-cli - dmsgpty-ui - dmsgpty-host diff --git a/Makefile b/Makefile index 5febef58b..ee3ffd1bf 100644 --- a/Makefile +++ b/Makefile @@ -98,7 +98,7 @@ dep: ## Sorts dependencies ${OPTS} go mod vendor -v ${OPTS} go mod tidy -v -install: ## Install `dmsg-discovery`, `dmsg-server`, `dmsgget`,`dmsgpty-cli`, `dmsgpty-host`, `dmsgpty-ui` +install: ## Install `dmsg-discovery`, `dmsg-server`, `dmsgcurl`,`dmsgpty-cli`, `dmsgpty-host`, `dmsgpty-ui` ${OPTS} go install ${BUILD_OPTS} ./cmd/* build: ## Build binaries into ./bin diff --git a/README.md b/README.md index 3c892831f..8f862e246 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ The connection between a `dmsg.Client` and `dmsg.Server` is called a `dmsg.Sessi ## Dmsg tools and libraries -- [`dmsgget`](./docs/dmsgget.md) - Simplified `wget` over `dmsg`. +- [`dmsgcurl`](./docs/dmsgcurl.md) - Simplified `curl` over `dmsg`. - [`dmsgpty`](./docs/dmsgpty.md) - Simplified `SSH` over `dmsg`. ## Additional resources - [`dmsg` examples.](./examples) diff --git a/cmd/dmsg/dmsg.go b/cmd/dmsg/dmsg.go index 94199683c..a43872de7 100644 --- a/cmd/dmsg/dmsg.go +++ b/cmd/dmsg/dmsg.go @@ -9,9 +9,8 @@ import ( dmsgdisc "github.com/skycoin/dmsg/cmd/dmsg-discovery/commands" dmsgserver "github.com/skycoin/dmsg/cmd/dmsg-server/commands" - dmsgget "github.com/skycoin/dmsg/cmd/dmsgget/commands" + dmsgcurl "github.com/skycoin/dmsg/cmd/dmsgcurl/commands" dmsghttp "github.com/skycoin/dmsg/cmd/dmsghttp/commands" - dmsgpost "github.com/skycoin/dmsg/cmd/dmsgpost/commands" dmsgptycli "github.com/skycoin/dmsg/cmd/dmsgpty-cli/commands" dmsgptyhost "github.com/skycoin/dmsg/cmd/dmsgpty-host/commands" dmsgptyui "github.com/skycoin/dmsg/cmd/dmsgpty-ui/commands" @@ -27,9 +26,8 @@ func init() { dmsgptyCmd, dmsgdisc.RootCmd, dmsgserver.RootCmd, - dmsgget.RootCmd, dmsghttp.RootCmd, - dmsgpost.RootCmd, + dmsgcurl.RootCmd, ) var helpflag bool RootCmd.SetUsageTemplate(help) diff --git a/cmd/dmsgcurl/commands/dmsgcurl.go b/cmd/dmsgcurl/commands/dmsgcurl.go new file mode 100644 index 000000000..b8767c048 --- /dev/null +++ b/cmd/dmsgcurl/commands/dmsgcurl.go @@ -0,0 +1,399 @@ +// Package commands cmd/dmsgcurl/commands/dmsgcurl.go +package commands + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + "path/filepath" + "strings" + "sync/atomic" + "time" + + cc "github.com/ivanpirog/coloredcobra" + "github.com/sirupsen/logrus" + "github.com/skycoin/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/skycoin/skywire-utilities/pkg/skyenv" + "github.com/spf13/cobra" + + "github.com/skycoin/dmsg/pkg/disc" + "github.com/skycoin/dmsg/pkg/dmsg" + "github.com/skycoin/dmsg/pkg/dmsghttp" +) + +var ( + dmsgDisc string + dmsgSessions int + dmsgcurlData string + // dmsgcurlHeader string + sk cipher.SecKey + dmsgcurlLog *logging.Logger + dmsgcurlAgent string + logLvl string + dmsgcurlTries int + dmsgcurlWait int + dmsgcurlOutput string + stdout bool +) + +func init() { + RootCmd.Flags().StringVarP(&dmsgDisc, "dmsg-disc", "c", "", "dmsg discovery url default:\n"+skyenv.DmsgDiscAddr) + RootCmd.Flags().IntVarP(&dmsgSessions, "sess", "e", 1, "number of dmsg servers to connect to") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "[ debug | warn | error | fatal | panic | trace | info ]\033[0m") + RootCmd.Flags().StringVarP(&dmsgcurlData, "data", "d", "", "dmsghttp POST data") + // RootCmd.Flags().StringVarP(&dmsgcurlHeader, "header", "H", "", "Pass custom header(s) to server") + RootCmd.Flags().StringVarP(&dmsgcurlOutput, "out", "o", ".", "output filepath") + RootCmd.Flags().BoolVarP(&stdout, "stdout", "n", false, "output to STDOUT") + RootCmd.Flags().IntVarP(&dmsgcurlTries, "try", "t", 1, "download attempts (0 unlimits)") + RootCmd.Flags().IntVarP(&dmsgcurlWait, "wait", "w", 0, "time to wait between fetches") + RootCmd.Flags().StringVarP(&dmsgcurlAgent, "agent", "a", "dmsgcurl/"+buildinfo.Version(), "identify as `AGENT`") + if os.Getenv("DMSGCURL_SK") != "" { + sk.Set(os.Getenv("DMSGCURL_SK")) //nolint + } + RootCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r") + var helpflag bool + RootCmd.SetUsageTemplate(help) + RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgcurl") + RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) + RootCmd.PersistentFlags().MarkHidden("help") //nolint +} + +// RootCmd containsa the root dmsgcurl command +var RootCmd = &cobra.Command{ + Short: "dmsgcurl", + Long: ` + ┌┬┐┌┬┐┌─┐┌─┐┌─┐┬ ┬┬─┐┬ + │││││└─┐│ ┬│ │ │├┬┘│ + ─┴┘┴ ┴└─┘└─┘└─┘└─┘┴└─┴─┘`, + SilenceErrors: true, + SilenceUsage: true, + DisableSuggestions: true, + DisableFlagsInUseLine: true, + Version: buildinfo.Version(), + PreRun: func(cmd *cobra.Command, args []string) { + if dmsgDisc == "" { + dmsgDisc = skyenv.DmsgDiscAddr + } + }, + RunE: func(cmd *cobra.Command, args []string) error { + if dmsgcurlLog == nil { + dmsgcurlLog = logging.MustGetLogger("dmsgcurl") + } + if logLvl != "" { + if lvl, err := logging.LevelFromString(logLvl); err == nil { + logging.SetLevel(lvl) + } + } + + ctx, cancel := cmdutil.SignalContext(context.Background(), dmsgcurlLog) + defer cancel() + + pk, err := sk.PubKey() + if err != nil { + pk, sk = cipher.GenerateKeyPair() + } + + u, err := parseURL(args) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("failed to parse provided URL") + } + if dmsgcurlData != "" { + dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("failed to start dmsg") + } + defer closeDmsg() + + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + + req, err := http.NewRequest(http.MethodPost, u.URL.String(), strings.NewReader(dmsgcurlData)) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to formulate HTTP request.") + } + req.Header.Set("Content-Type", "text/plain") + + resp, err := httpC.Do(req) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to execute HTTP request.") + } + + defer func() { + if err := resp.Body.Close(); err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to close response body") + } + }() + respBody, err := io.ReadAll(resp.Body) + if err != nil { + dmsgcurlLog.WithError(err).Fatal("Failed to read respose body.") + } + fmt.Println(string(respBody)) + } else { + + file, err := parseOutputFile(dmsgcurlOutput, u.URL.Path) + if err != nil { + return fmt.Errorf("failed to prepare output file: %w", err) + } + defer func() { + if fErr := file.Close(); fErr != nil { + dmsgcurlLog.WithError(fErr).Warn("Failed to close output file.") + } + if err != nil { + if rErr := os.RemoveAll(file.Name()); rErr != nil { + dmsgcurlLog.WithError(rErr).Warn("Failed to remove output file.") + } + } + }() + + dmsgC, closeDmsg, err := startDmsg(ctx, pk, sk) + if err != nil { + return fmt.Errorf("failed to start dmsg: %w", err) + } + defer closeDmsg() + + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + + for i := 0; i < dmsgcurlTries; i++ { + if !stdout { + dmsgcurlLog.Debugf("Download attempt %d/%d ...", i, dmsgcurlTries) + } + + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file: %w", err) + } + if stdout { + if fErr := file.Close(); fErr != nil { + dmsgcurlLog.WithError(fErr).Warn("Failed to close output file.") + } + if err != nil { + if rErr := os.RemoveAll(file.Name()); rErr != nil { + dmsgcurlLog.WithError(rErr).Warn("Failed to remove output file.") + } + } + file = os.Stdout + } + if err := Download(ctx, dmsgcurlLog, &httpC, file, u.URL.String(), 0); err != nil { + dmsgcurlLog.WithError(err).Error() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(dmsgcurlWait) * time.Second): + continue + } + } + + // download successful. + return nil + } + + return errors.New("all download attempts failed") + + } + return nil + }, +} + +// URL represents a dmsg http URL. +type URL struct { + dmsg.Addr + url.URL +} + +// Fill fills the internal fields from an URL string. +func (du *URL) fill(str string) error { + u, err := url.Parse(str) + if err != nil { + return err + } + + if u.Scheme == "" { + return errors.New("URL is missing a scheme") + } + + if u.Host == "" { + return errors.New("URL is missing a host") + } + + du.URL = *u + return du.Addr.Set(u.Host) +} + +func parseURL(args []string) (*URL, error) { + if len(args) == 0 { + return nil, errors.New("no URL(s) provided") + } + + if len(args) > 1 { + return nil, errors.New("multiple URLs is not yet supported") + } + + var out URL + if err := out.fill(args[0]); err != nil { + return nil, fmt.Errorf("provided URL is invalid: %w", err) + } + + return &out, nil +} + +func parseOutputFile(name string, urlPath string) (*os.File, error) { + stat, statErr := os.Stat(name) + if statErr != nil { + if os.IsNotExist(statErr) { + f, err := os.Create(name) //nolint + if err != nil { + return nil, err + } + return f, nil + } + return nil, statErr + } + + if stat.IsDir() { + f, err := os.Create(filepath.Join(name, urlPath)) //nolint + if err != nil { + return nil, err + } + return f, nil + } + + return nil, os.ErrExist +} + +func startDmsg(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dmsgDisc, &http.Client{}, dmsgcurlLog), &dmsg.Config{MinSessions: dmsgSessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + dmsgcurlLog.WithError(err).Debug("Disconnected from dmsg network.") + fmt.Printf("\n") + } + dmsgcurlLog.WithField("public_key", pk.String()).WithField("dmsg_disc", dmsgDisc). + Debug("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + dmsgcurlLog.Debug("Dmsg network ready.") + return dmsgC, stop, nil + } +} + +// Download downloads a file from the given URL into 'w'. +func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { + req, err := http.NewRequest(http.MethodGet, urlStr, nil) + if err != nil { + log.WithError(err).Fatal("Failed to formulate HTTP request.") + } + resp, err := httpC.Do(req) + if err != nil { + return fmt.Errorf("failed to connect to HTTP server: %w", err) + } + if maxSize > 0 { + if resp.ContentLength > maxSize*1024 { + return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) + } + } + n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength) + if err != nil { + return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.WithError(err).Warn("HTTP Response body closed with non-nil error.") + } + }() + + return nil +} + +type readerFunc func(p []byte) (n int, err error) + +func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } + +// CancellableCopy will call the Reader and Writer interface multiple time, in order +// to copy by chunk (avoiding loading the whole file in memory). +func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { + + n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { + + // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations + select { + + // if context has been canceled + case <-ctx.Done(): + // stop process and propagate "Download Canceled" error + return 0, errors.New("Download Canceled") + default: + // otherwise just run default io.Reader implementation + return body.Read(p) + } + })) + return n, err +} + +// ProgressWriter prints the progress of a download to stdout. +type ProgressWriter struct { + // atomic requires 64-bit alignment for struct field access + Current int64 + Total int64 +} + +// Write implements io.Writer +func (pw *ProgressWriter) Write(p []byte) (int, error) { + n := len(p) + + current := atomic.AddInt64(&pw.Current, int64(n)) + total := atomic.LoadInt64(&pw.Total) + pc := fmt.Sprintf("%d%%", current*100/total) + if !stdout { + fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc) + if current != total { + fmt.Print("\r") + } else { + fmt.Print("\n") + } + } + + return n, nil +} + +// Execute executes root CLI command. +func Execute() { + cc.Init(&cc.Config{ + RootCmd: RootCmd, + Headings: cc.HiBlue + cc.Bold, //+ cc.Underline, + Commands: cc.HiBlue + cc.Bold, + CmdShortDescr: cc.HiBlue, + Example: cc.HiBlue + cc.Italic, + ExecName: cc.HiBlue + cc.Bold, + Flags: cc.HiBlue + cc.Bold, + //FlagsDataType: cc.HiBlue, + FlagsDescr: cc.HiBlue, + NoExtraNewlines: true, + NoBottomNewline: true, + }) + if err := RootCmd.Execute(); err != nil { + log.Fatal("Failed to execute command: ", err) + } +} + +const help = "Usage:\r\n" + + " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + + "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + + "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + + "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + + "Flags:\r\n" + + "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + + "Global Flags:\r\n" + + "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" diff --git a/cmd/dmsgcurl/dmsgcurl.go b/cmd/dmsgcurl/dmsgcurl.go new file mode 100644 index 000000000..8b308a694 --- /dev/null +++ b/cmd/dmsgcurl/dmsgcurl.go @@ -0,0 +1,8 @@ +// package main cmd/dmsgcurl/dmsgcurl.go +package main + +import "github.com/skycoin/dmsg/cmd/dmsgcurl/commands" + +func main() { + commands.Execute() +} diff --git a/docs/dmsgcurl.md b/docs/dmsgcurl.md new file mode 100644 index 000000000..30143b537 --- /dev/null +++ b/docs/dmsgcurl.md @@ -0,0 +1,65 @@ +# Dmsgcurl + +`dmsgcurl` is a utility exec which can download/upload from HTTP servers hosted over the `dmsg` network (similar to a simplified `curl` over `dmsg`). + +``` +$ dmsgcurl --help + + Skycoin dmsgcurl v0.1.0, wget over dmsg. + Usage: dmsgcurl [OPTION]... [URL] + + -O FILE + write documents to FILE (default ".") + -U AGENT + identify as AGENT (default "dmsgcurl/v0.1.0") + -dmsg-disc URL + dmsg discovery URL (default "http://dmsgd.skywire.skycoin.com") + -dmsg-sessions NUMBER + connect to NUMBER of dmsg servers (default 1) + -h + -help + print this help + -t NUMBER + set number of retries to NUMBER (0 unlimits) (default 1) + -w SECONDS + wait SECONDS between retrievals +``` + +### Example usage + +In this example, we will use the `dmsg` network where the `dmsg.Discovery` address is `http://dmsgd.skywire.skycoin.com`. However, any `dmsg.Discovery` would work. + +First, lets create a folder where we will host files to serve over `dmsg` and create a `hello.txt` file within. + +```shell script +// Create serving folder. +$ mkdir /tmp/dmsghttp -p + +// Create file. +$ echo 'Hello World!' > /tmp/dmsghttp/hello.txt +``` + +Next, let's serve this over `http` via `dmsg` as transport. We have an example exec for this located within `/example/dmsgget/dmsg-example-http-server`. + +```shell script +# Generate public/private key pair +$ go run ./examples/dmsgget/gen-keys/gen-keys.go +# PK: 038dde2d050803db59e2ad19e5a6db0f58f8419709fc65041c48b0cb209bb7a851 +# SK: e5740e093bd472c2730b0a58944a5dee220d415de62acf45d1c559f56eea2b2d + +# Run dmsg http server. +# (replace 'e5740e093bd472c2730b0a58944a5dee220d415de62acf45d1c559f56eea2b2d' with the SK returned from above command) +$ go run ./examples/dmsgget/dmsg-example-http-server/dmsg-example-http-server.go --dir /tmp/dmsghttp --sk e5740e093bd472c2730b0a58944a5dee220d415de62acf45d1c559f56eea2b2d +``` + +Now we can use `dmsgcurl` to download the hosted file. Open a new terminal and run the following. + +```shell script +# Replace '038dde2d050803db59e2ad19e5a6db0f58f8419709fc65041c48b0cb209bb7a851' with the generated PK. +$ dmsgcurl dmsg://038dde2d050803db59e2ad19e5a6db0f58f8419709fc65041c48b0cb209bb7a851:80/hello.txt + +# Check downloaded file. +$ cat hello.txt +# Hello World! +``` + diff --git a/pkg/dmsgcurl/dmsgcurl.go b/pkg/dmsgcurl/dmsgcurl.go new file mode 100644 index 000000000..20bf02f44 --- /dev/null +++ b/pkg/dmsgcurl/dmsgcurl.go @@ -0,0 +1,270 @@ +// Package dmsgcurl pkg/dmsgcurl/dmsgcurl.go +package dmsgcurl + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/sirupsen/logrus" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/logging" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" + "github.com/skycoin/dmsg/pkg/dmsghttp" +) + +var json = jsoniter.ConfigFastest + +// DmsgCurl contains the logic for dmsgcurl (curl over dmsg). +type DmsgCurl struct { + startF startupFlags + dmsgF dmsgFlags + dlF downloadFlags + httpF httpFlags + fs *flag.FlagSet +} + +// New creates a new DmsgCurl instance. +func New(fs *flag.FlagSet) *DmsgCurl { + dg := &DmsgCurl{fs: fs} + + for _, fg := range dg.flagGroups() { + fg.Init(fs) + } + + w := fs.Output() + flag.Usage = func() { + _, _ = fmt.Fprintf(w, "Skycoin %s %s, wget over dmsg.\n", ExecName, Version) + _, _ = fmt.Fprintf(w, "Usage: %s [OPTION]... [URL]\n\n", ExecName) + flag.PrintDefaults() + _, _ = fmt.Fprintln(w, "") + } + + return dg +} + +// String implements io.Stringer +func (dg *DmsgCurl) String() string { + m := make(map[string]interface{}) + for _, fg := range dg.flagGroups() { + m[fg.Name()] = fg + } + j, err := json.Marshal(m) + if err != nil { + panic(err) + } + return string(j) +} + +func (dg *DmsgCurl) flagGroups() []FlagGroup { + return []FlagGroup{&dg.startF, &dg.dmsgF, &dg.dlF, &dg.httpF} +} + +// Run runs the download logic. +func (dg *DmsgCurl) Run(ctx context.Context, log *logging.Logger, skStr string, args []string) (err error) { + if log == nil { + log = logging.MustGetLogger("dmsgcurl") + } + + if dg.startF.Help { + dg.fs.Usage() + return nil + } + + pk, sk, err := parseKeyPair(skStr) + if err != nil { + return fmt.Errorf("failed to parse provided key pair: %w", err) + } + + u, err := parseURL(args) + if err != nil { + return fmt.Errorf("failed to parse provided URL: %w", err) + } + + file, err := parseOutputFile(dg.dlF.Output, u.URL.Path) + if err != nil { + return fmt.Errorf("failed to prepare output file: %w", err) + } + defer func() { + if fErr := file.Close(); fErr != nil { + log.WithError(fErr).Warn("Failed to close output file.") + } + if err != nil { + if rErr := os.RemoveAll(file.Name()); rErr != nil { + log.WithError(rErr).Warn("Failed to remove output file.") + } + } + }() + + dmsgC, closeDmsg, err := dg.StartDmsg(ctx, log, pk, sk) + if err != nil { + return fmt.Errorf("failed to start dmsg: %w", err) + } + defer closeDmsg() + + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} + + for i := 0; i < dg.dlF.Tries; i++ { + log.Infof("Download attempt %d/%d ...", i, dg.dlF.Tries) + + if _, err := file.Seek(0, 0); err != nil { + return fmt.Errorf("failed to reset file: %w", err) + } + + if err := Download(ctx, log, &httpC, file, u.URL.String(), 0); err != nil { + log.WithError(err).Error() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Duration(dg.dlF.Wait) * time.Second): + continue + } + } + + // download successful. + return nil + } + + return errors.New("all download attempts failed") +} + +func parseKeyPair(skStr string) (pk cipher.PubKey, sk cipher.SecKey, err error) { + if skStr == "" { + pk, sk = cipher.GenerateKeyPair() + return + } + + if err = sk.Set(skStr); err != nil { + return + } + + pk, err = sk.PubKey() + return +} + +func parseURL(args []string) (*URL, error) { + if len(args) == 0 { + return nil, ErrNoURLs + } + + if len(args) > 1 { + return nil, ErrMultipleURLsNotSupported + } + + var out URL + if err := out.Fill(args[0]); err != nil { + return nil, fmt.Errorf("provided URL is invalid: %w", err) + } + + return &out, nil +} + +func parseOutputFile(name string, urlPath string) (*os.File, error) { + stat, statErr := os.Stat(name) + if statErr != nil { + if os.IsNotExist(statErr) { + f, err := os.Create(name) //nolint + if err != nil { + return nil, err + } + return f, nil + } + return nil, statErr + } + + if stat.IsDir() { + f, err := os.Create(filepath.Join(name, urlPath)) //nolint + if err != nil { + return nil, err + } + return f, nil + } + + return nil, os.ErrExist +} + +// StartDmsg create dsmg client instance +func (dg *DmsgCurl) StartDmsg(ctx context.Context, log *logging.Logger, pk cipher.PubKey, sk cipher.SecKey) (dmsgC *dmsg.Client, stop func(), err error) { + dmsgC = dmsg.NewClient(pk, sk, disc.NewHTTP(dg.dmsgF.Disc, &http.Client{}, log), &dmsg.Config{MinSessions: dg.dmsgF.Sessions}) + go dmsgC.Serve(context.Background()) + + stop = func() { + err := dmsgC.Close() + log.WithError(err).Info("Disconnected from dmsg network.") + } + + log.WithField("public_key", pk.String()).WithField("dmsg_disc", dg.dmsgF.Disc). + Info("Connecting to dmsg network...") + + select { + case <-ctx.Done(): + stop() + return nil, nil, ctx.Err() + + case <-dmsgC.Ready(): + log.Info("Dmsg network ready.") + return dmsgC, stop, nil + } +} + +// Download downloads a file from the given URL into 'w'. +func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error { + req, err := http.NewRequest(http.MethodGet, urlStr, nil) + if err != nil { + log.WithError(err).Fatal("Failed to formulate HTTP request.") + } + resp, err := httpC.Do(req) + if err != nil { + return fmt.Errorf("failed to connect to HTTP server: %w", err) + } + if maxSize > 0 { + if resp.ContentLength > maxSize*1024 { + return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize) + } + } + n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength) + if err != nil { + return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err) + } + defer func() { + if err := resp.Body.Close(); err != nil { + log.WithError(err).Warn("HTTP Response body closed with non-nil error.") + } + }() + + return nil +} + +type readerFunc func(p []byte) (n int, err error) + +func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) } + +// CancellableCopy will call the Reader and Writer interface multiple time, in order +// to copy by chunk (avoiding loading the whole file in memory). +func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) { + + n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) { + + // golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations + select { + + // if context has been canceled + case <-ctx.Done(): + // stop process and propagate "Download Canceled" error + return 0, errors.New("Download Canceled") + default: + // otherwise just run default io.Reader implementation + return body.Read(p) + } + })) + return n, err +} diff --git a/pkg/dmsgcurl/dmsgcurl_test.go b/pkg/dmsgcurl/dmsgcurl_test.go new file mode 100644 index 000000000..c46a570a0 --- /dev/null +++ b/pkg/dmsgcurl/dmsgcurl_test.go @@ -0,0 +1,186 @@ +// Package dmsgcurl pkg/dmsgcurl/dmsgcurl_test.go +package dmsgcurl + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-chi/chi/v5" + "github.com/skycoin/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire-utilities/pkg/cmdutil" + "github.com/skycoin/skywire-utilities/pkg/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" + "github.com/skycoin/dmsg/pkg/dmsghttp" +) + +const ( + nSrvs = 2 + maxSessions = 100 +) + +// Serve a HTTP server over dmsg, and have multiple clients download a document simultaneously. +// Arrange: +// - Typical dmsg environment. +// - Dmsg client that serves a HTTP server. +// Act: +// - Start multiple dmsg clients that download from the HTTP server. +// Assert: +// - Ensure the downloads all succeed. +// - Ensure the downloaded data (of all downloads) is the same as the original document. +func TestDownload(t *testing.T) { + const ( + fileSize = 64 + dlClients = 2 // number of clients to download from HTTP server. + ) + + // Arrange: Prepare file to be downloaded. + srcData := cipher.RandByte(fileSize) + src := makeFile(t, srcData) + + // Arrange: Start dmsg environment. + dc := startDmsgEnv(t, nSrvs, maxSessions) + + // Arrange: Start dmsg client that serves a http server which hosts the src file. + hsAddr := runHTTPSrv(t, dc, src.Name()) + // Arrange: Download results (dst files and client errors). + dsts := make([]*os.File, dlClients) + errs := make([]chan error, dlClients) + for i := range dsts { + dsts[i] = makeFile(t, nil) + errs[i] = make(chan error, 1) + } + + // Act: Download + for i := 0; i < dlClients; i++ { + func(i int) { + log := logging.MustGetLogger(fmt.Sprintf("dl_client_%d", i)) + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + defer cancel() + err := Download(ctx, log, newHTTPClient(t, dc), dsts[i], hsAddr, fileSize) + + errs[i] <- err + close(errs[i]) + }(i) + } + + // Assert: Ensure download finishes without error and downloaded file is the same as src. + for i := 0; i < dlClients; i++ { + assert.NoError(t, <-errs[i]) + + dstData, err := os.ReadFile(dsts[i].Name()) + assert.NoErrorf(t, err, "[%d] failed to read destination file", i) + assert.Equalf(t, srcData, dstData, "[%d] destination file data is not equal", i) + } +} + +func makeFile(t *testing.T, data []byte) *os.File { + f, err := os.CreateTemp(os.TempDir(), "dmsgcurl_test_file_*") + require.NoError(t, err) + + t.Cleanup(func() { + assert.NoError(t, f.Close()) + assert.NoError(t, os.Remove(f.Name())) + }) + + if data != nil { + n, err := f.Write(data) + require.NoError(t, err) + require.Len(t, data, n) + } + + return f +} + +func startDmsgEnv(t *testing.T, nSrvs, maxSessions int) disc.APIClient { + dc := disc.NewMock(0) + + for i := 0; i < nSrvs; i++ { + pk, sk := cipher.GenerateKeyPair() + + conf := dmsg.ServerConfig{ + MaxSessions: maxSessions, + UpdateInterval: 0, + } + srv := dmsg.NewServer(pk, sk, dc, &conf, nil) + srv.SetLogger(logging.MustGetLogger(fmt.Sprintf("server_%d", i))) + + lis, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + errCh <- srv.Serve(lis, "") + close(errCh) + }() + + t.Cleanup(func() { + // listener is also closed when dmsg server is closed + assert.NoError(t, srv.Close()) + assert.NoError(t, <-errCh) + }) + } + + return dc +} + +func runHTTPSrv(t *testing.T, dc disc.APIClient, fName string) string { + pk, sk := cipher.GenerateKeyPair() + httpPath := filepath.Base(fName) + + dmsgC := dmsg.NewClient(pk, sk, dc, nil) + go dmsgC.Serve(context.Background()) + t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) }) + <-dmsgC.Ready() + + r := chi.NewRouter() + r.HandleFunc("/"+httpPath, func(w http.ResponseWriter, r *http.Request) { + http.ServeFile(w, r, fName) + }) + + lis, err := dmsgC.Listen(80) + require.NoError(t, err) + + errCh := make(chan error, 1) + srv := &http.Server{ + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 3 * time.Second, + Handler: r, + } + go func() { + errCh <- srv.Serve(lis) + close(errCh) + }() + + t.Cleanup(func() { + assert.NoError(t, lis.Close()) + assert.EqualError(t, <-errCh, dmsg.ErrEntityClosed.Error()) + }) + + return fmt.Sprintf("http://%s/%s", pk.String(), httpPath) +} + +func newHTTPClient(t *testing.T, dc disc.APIClient) *http.Client { + pk, sk := cipher.GenerateKeyPair() + + dmsgC := dmsg.NewClient(pk, sk, dc, nil) + go dmsgC.Serve(context.Background()) + t.Cleanup(func() { assert.NoError(t, dmsgC.Close()) }) + <-dmsgC.Ready() + + log := logging.MustGetLogger("http_client") + ctx, cancel := cmdutil.SignalContext(context.Background(), log) + defer cancel() + return &http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC)} +} diff --git a/pkg/dmsgcurl/flags.go b/pkg/dmsgcurl/flags.go new file mode 100644 index 000000000..e763a0599 --- /dev/null +++ b/pkg/dmsgcurl/flags.go @@ -0,0 +1,67 @@ +// Package dmsgcurl pkg/dmsgcurl/flags.go +package dmsgcurl + +import ( + "flag" + + "github.com/skycoin/skywire-utilities/pkg/buildinfo" +) + +// ExecName contains the execution name. +const ExecName = "dmsgcurl" + +// Version contains the version string. +var Version = buildinfo.Version() + +// FlagGroup represents a group of flags. +type FlagGroup interface { + Name() string + Init(fs *flag.FlagSet) +} + +type startupFlags struct { + Help bool +} + +func (f *startupFlags) Name() string { return "Startup" } + +func (f *startupFlags) Init(fs *flag.FlagSet) { + fs.BoolVar(&f.Help, "help", false, "print this help") + fs.BoolVar(&f.Help, "h", false, "") +} + +type dmsgFlags struct { + Disc string + Sessions int +} + +func (f *dmsgFlags) Name() string { return "Dmsg" } + +func (f *dmsgFlags) Init(fs *flag.FlagSet) { + fs.StringVar(&f.Disc, "dmsg-disc", "http://dmsgd.skywire.skycoin.com", "dmsg discovery `URL`") + fs.IntVar(&f.Sessions, "dmsg-sessions", 1, "connect to `NUMBER` of dmsg servers") +} + +type downloadFlags struct { + Output string + Tries int + Wait int +} + +func (f *downloadFlags) Name() string { return "Download" } + +func (f *downloadFlags) Init(fs *flag.FlagSet) { + fs.StringVar(&f.Output, "O", ".", "write documents to `FILE`") + fs.IntVar(&f.Tries, "t", 1, "set number of retries to `NUMBER` (0 unlimits)") + fs.IntVar(&f.Wait, "w", 0, "wait `SECONDS` between retrievals") +} + +type httpFlags struct { + UserAgent string +} + +func (f *httpFlags) Name() string { return "HTTP" } + +func (f *httpFlags) Init(fs *flag.FlagSet) { + fs.StringVar(&f.UserAgent, "U", ExecName+"/"+Version, "identify as `AGENT`") +} diff --git a/pkg/dmsgcurl/progress_writer.go b/pkg/dmsgcurl/progress_writer.go new file mode 100644 index 000000000..e104337a5 --- /dev/null +++ b/pkg/dmsgcurl/progress_writer.go @@ -0,0 +1,32 @@ +// Package dmsgcurl pkg/dmsgcurl/progress_writer.go +package dmsgcurl + +import ( + "fmt" + "sync/atomic" +) + +// ProgressWriter prints the progress of a download to stdout. +type ProgressWriter struct { + // atomic requires 64-bit alignment for struct field access + Current int64 + Total int64 +} + +// Write implements io.Writer +func (pw *ProgressWriter) Write(p []byte) (int, error) { + n := len(p) + + current := atomic.AddInt64(&pw.Current, int64(n)) + total := atomic.LoadInt64(&pw.Total) + pc := fmt.Sprintf("%d%%", current*100/total) + + fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc) + if current != total { + fmt.Print("\r") + } else { + fmt.Print("\n") + } + + return n, nil +} diff --git a/pkg/dmsgcurl/url.go b/pkg/dmsgcurl/url.go new file mode 100644 index 000000000..84895e9e8 --- /dev/null +++ b/pkg/dmsgcurl/url.go @@ -0,0 +1,40 @@ +// Package dmsgcurl pkg/dmsgcurl/url.go +package dmsgcurl + +import ( + "errors" + "net/url" + + dmsg "github.com/skycoin/dmsg/pkg/dmsg" +) + +// Errors related to URLs. +var ( + ErrNoURLs = errors.New("no URLs provided") + ErrMultipleURLsNotSupported = errors.New("multiple URLs is not yet supported") +) + +// URL represents a dmsg http URL. +type URL struct { + dmsg.Addr + url.URL +} + +// Fill fills the internal fields from an URL string. +func (du *URL) Fill(str string) error { + u, err := url.Parse(str) + if err != nil { + return err + } + + if u.Scheme == "" { + return errors.New("URL is missing a scheme") + } + + if u.Host == "" { + return errors.New("URL is missing a host") + } + + du.URL = *u + return du.Addr.Set(u.Host) +} From db24d156fcac9b155699735807e02079843016b8 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 17 Oct 2023 02:14:46 +0000 Subject: [PATCH 3/4] modifying darwin goreleaser --- .goreleaser-darwin.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.goreleaser-darwin.yml b/.goreleaser-darwin.yml index 36b548124..7c06a6c87 100644 --- a/.goreleaser-darwin.yml +++ b/.goreleaser-darwin.yml @@ -40,8 +40,8 @@ builds: main: ./cmd/dmsg-server/ ldflags: -s -w -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - - id: dmsgget - binary: dmsgget + - id: dmsgcurl + binary: dmsgcurl goos: - darwin goarch: @@ -49,7 +49,7 @@ builds: - amd64 env: - CGO_ENABLED=0 - main: ./cmd/dmsgget/ + main: ./cmd/dmsgcurl/ ldflags: -s -w -X github.com/skycoin/skywire-utilities/pkg/buildinfo.version=v{{.Version}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.commit={{.ShortCommit}} -X github.com/skycoin/skywire-utilities/pkg/buildinfo.date={{.Date}} - id: dmsgpty-ui @@ -98,7 +98,7 @@ archives: - dmsg-server - dmsgpty-ui - dmsgpty-host - - dmsgget + - dmsgcurl - dmsgpty-cli allow_different_binary_count: true From 9026d5196d22924b5a635516892a29369ad68891 Mon Sep 17 00:00:00 2001 From: MohammadReza Palide Date: Tue, 17 Oct 2023 03:05:08 +0000 Subject: [PATCH 4/4] update README.md | add usage description --- cmd/dmsgcurl/commands/dmsgcurl.go | 3 ++- docs/dmsgcurl.md | 39 +++++++++++++++++-------------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/cmd/dmsgcurl/commands/dmsgcurl.go b/cmd/dmsgcurl/commands/dmsgcurl.go index b8767c048..56780ab92 100644 --- a/cmd/dmsgcurl/commands/dmsgcurl.go +++ b/cmd/dmsgcurl/commands/dmsgcurl.go @@ -69,10 +69,11 @@ func init() { // RootCmd containsa the root dmsgcurl command var RootCmd = &cobra.Command{ Short: "dmsgcurl", + Use: "dmsgcurl [OPTIONS] ... [URL]", Long: ` ┌┬┐┌┬┐┌─┐┌─┐┌─┐┬ ┬┬─┐┬ │││││└─┐│ ┬│ │ │├┬┘│ - ─┴┘┴ ┴└─┘└─┘└─┘└─┘┴└─┴─┘`, + ─┴┘┴ ┴└─┘└─┘└─┘└─┘┴└─┴─┘`, SilenceErrors: true, SilenceUsage: true, DisableSuggestions: true, diff --git a/docs/dmsgcurl.md b/docs/dmsgcurl.md index 30143b537..5176b4cd6 100644 --- a/docs/dmsgcurl.md +++ b/docs/dmsgcurl.md @@ -4,25 +4,27 @@ ``` $ dmsgcurl --help + ┌┬┐┌┬┐┌─┐┌─┐┌─┐┬ ┬┬─┐┬ + │││││└─┐│ ┬│ │ │├┬┘│ + ─┴┘┴ ┴└─┘└─┘└─┘└─┘┴└─┴─┘ - Skycoin dmsgcurl v0.1.0, wget over dmsg. - Usage: dmsgcurl [OPTION]... [URL] - - -O FILE - write documents to FILE (default ".") - -U AGENT - identify as AGENT (default "dmsgcurl/v0.1.0") - -dmsg-disc URL - dmsg discovery URL (default "http://dmsgd.skywire.skycoin.com") - -dmsg-sessions NUMBER - connect to NUMBER of dmsg servers (default 1) - -h - -help - print this help - -t NUMBER - set number of retries to NUMBER (0 unlimits) (default 1) - -w SECONDS - wait SECONDS between retrievals + Usage: + dmsgcurl [OPTIONS] ... [URL] + + Flags: + -a, --agent AGENT identify as AGENT (default "dmsgcurl/v1.2.0-184-gdb24d156") + -d, --data string dmsghttp POST data + -c, --dmsg-disc string dmsg discovery url default: + http://dmsgd.skywire.skycoin.com + -l, --loglvl string [ debug | warn | error | fatal | panic | trace | info ] + -o, --out string output filepath (default ".") + -e, --sess int number of dmsg servers to connect to (default 1) + -s, --sk cipher.SecKey a random key is generated if unspecified + (default 0000000000000000000000000000000000000000000000000000000000000000) + -n, --stdout output to STDOUT + -t, --try int download attempts (0 unlimits) (default 1) + -v, --version version for dmsgcurl + -w, --wait int time to wait between fetches ``` ### Example usage @@ -63,3 +65,4 @@ $ cat hello.txt # Hello World! ``` +Note: If you set `-d` or `--data` flag, then curl work as post method (upload), and if not then work as get method (download).