Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transport bandwidth logs #1653

Merged
merged 5 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 175 additions & 55 deletions cmd/skywire-cli/commands/log/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"net/http"
"os"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/go-version"
"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/pkg/dmsgget"
"github.com/skycoin/dmsg/pkg/dmsghttp"
"github.com/spf13/cobra"
Expand All @@ -38,16 +40,22 @@ var (
logOnly bool
surveyOnly bool
deleteOnErrors bool
fetchFile string
fetchFrom string
writeDir string
)

func init() {
logCmd.Flags().SortFlags = false
logCmd.Flags().StringVarP(&env, "env", "e", "prod", "deployment to get uptimes from")
logCmd.Flags().BoolVarP(&logOnly, "log", "l", false, "fetch only transport logs")
logCmd.Flags().BoolVarP(&surveyOnly, "survey", "v", false, "fetch only surveys")
logCmd.Flags().StringVarP(&fetchFile, "file", "f", "", "fetch only a specific file from all online visors")
logCmd.Flags().StringVarP(&fetchFrom, "pks", "k", "", "fetch only from specific public keys ; semicolon separated")
0pcom marked this conversation as resolved.
Show resolved Hide resolved
logCmd.Flags().StringVarP(&writeDir, "dir", "d", "log_collecting", "save files to specified dir")
logCmd.Flags().BoolVarP(&deleteOnErrors, "clean", "c", false, "delete files and folders on errors")
logCmd.Flags().StringVar(&minv, "minv", "v1.3.4", "minimum visor version to fetch from")
logCmd.Flags().IntVarP(&duration, "duration", "n", 0, "numberof days before today to fetch transport logs for")
logCmd.Flags().StringVar(&minv, "minv", "v1.3.8", "minimum visor version to fetch from")
logCmd.Flags().IntVarP(&duration, "duration", "n", 0, "number of days before today to fetch transport logs for")
logCmd.Flags().BoolVar(&allVisors, "all", false, "consider all visors ; no version filtering")
logCmd.Flags().IntVar(&batchSize, "batchSize", 50, "number of visor in each batch")
logCmd.Flags().Int64Var(&maxFileSize, "maxfilesize", 30, "maximum file size allowed to download during collecting logs, in KB")
Expand All @@ -59,13 +67,13 @@ func init() {
logCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r")
}

// RootCmd is surveyCmd
// RootCmd is logCmd
var RootCmd = logCmd

var logCmd = &cobra.Command{
Use: "log",
Short: "survey & transport log collection",
Long: "collect surveys and transport logging from visors which are online in the uptime tracker",
Long: "Fetch health, survey, and transport logging from visors which are online in the uptime tracker\nhttp://ut.skywire.skycoin.com/uptimes?v=v2\nhttp://ut.skywire.skycoin.com/uptimes?v=v2&visors=<pk1>;<pk2>;<pk3>",
Run: func(cmd *cobra.Command, args []string) {
log := logging.MustGetLogger("log-collecting")
if logOnly && surveyOnly {
Expand All @@ -81,15 +89,15 @@ var logCmd = &cobra.Command{
}()

// Preparing directories
if _, err := os.ReadDir("log_collecting"); err != nil {
if err := os.Mkdir("log_collecting", 0750); err != nil {
log.Error("Unable to log_collecting directory")
if _, err := os.ReadDir(writeDir); err != nil {
if err := os.Mkdir(writeDir, 0750); err != nil {
log.Error("Unable to create directory " + writeDir)
return
}
}

if err := os.Chdir("log_collecting"); err != nil {
log.Error("Unable to change directory to log_collecting")
if err := os.Chdir(writeDir); err != nil {
log.Error("Unable to change directory to " + writeDir)
return
}

Expand All @@ -98,13 +106,19 @@ var logCmd = &cobra.Command{
flag.Parse()

// Set the uptime tracker to fetch data from
endpoint := skyenv.UptimeTrackerAddr + "/uptimes?v=v2"
endpoint := skyenv.UptimeTrackerAddr
if env == "test" {
endpoint = skyenv.TestUptimeTrackerAddr + "/uptimes?v=v2"
endpoint = skyenv.TestUptimeTrackerAddr
}
endpoint = endpoint + "/uptimes?v=v2"
if utAddr != "" {
endpoint = utAddr
}

if fetchFrom != "" {
endpoint = endpoint + "&visors=" + fetchFrom
}

//Fetch the uptime data over http
uptimes, err := getUptimes(endpoint, log)
if err != nil {
Expand Down Expand Up @@ -141,56 +155,74 @@ var logCmd = &cobra.Command{
// Get visors data
var wg sync.WaitGroup
for _, v := range uptimes {
//only attempt to fetch from online visors
if v.Online {
if fetchFile == "" {
visorVersion, err := version.NewVersion(v.Version) //nolint
if err != nil {
log.Warnf("The version %s for visor %s is not valid", v.Version, v.PubKey)
continue
}
if !allVisors && visorVersion.LessThan(minimumVersion) {
log.Warnf("The version %s for visor %s does not satisfy our minimum version condition", v.Version, v.PubKey)
continue
}
wg.Add(1)
go func(key string, wg *sync.WaitGroup) {
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC), Timeout: 10 * time.Second}
defer httpC.CloseIdleConnections()
defer wg.Done()

visorVersion, err := version.NewVersion(v.Version) //nolint
if err != nil {
log.Warnf("The version %s for visor %s is not valid", v.Version, v.PubKey)
continue
}
if !allVisors && visorVersion.LessThan(minimumVersion) {
log.Warnf("The version %s for visor %s does not satisfy our minimum version condition", v.Version, v.PubKey)
continue
}
wg.Add(1)
go func(key string, wg *sync.WaitGroup) {
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC), Timeout: 10 * time.Second}
defer httpC.CloseIdleConnections()
defer wg.Done()

deleteOnError := false
if _, err := os.ReadDir(key); err != nil {
if err := os.Mkdir(key, 0750); err != nil {
log.Errorf("Unable to create directory for visor %s", key)
return
deleteOnError := false
if _, err := os.ReadDir(key); err != nil {
if err := os.Mkdir(key, 0750); err != nil {
log.Errorf("Unable to create directory for visor %s", key)
return
}
deleteOnError = true
}
deleteOnError = true
}
// health check before downloading anything else
// delete that folder if the health check fails
err = download(ctx, log, httpC, "health", "health.json", key, maxFileSize)
if err != nil {
if deleteOnErrors {
if deleteOnError {
bulkFolders = append(bulkFolders, key)
// health check before downloading anything else
// delete that folder if the health check fails
err = download(ctx, log, httpC, "health", "health.json", key, maxFileSize)
if err != nil {
if deleteOnErrors {
if deleteOnError {
bulkFolders = append(bulkFolders, key)
}
return
}
return
}
}
if !logOnly {
download(ctx, log, httpC, "node-info.json", "node-info.json", key, maxFileSize) //nolint
}
if !surveyOnly {
for i := 0; i <= duration; i++ {
date := time.Now().AddDate(0, 0, -i).UTC().Format("2006-01-02")
download(ctx, log, httpC, "transport_logs/"+date+".csv", date+".csv", key, maxFileSize) //nolint
if !logOnly {
download(ctx, log, httpC, "node-info.json", "node-info.json", key, maxFileSize) //nolint
}
if !surveyOnly {
for i := 0; i <= duration; i++ {
date := time.Now().AddDate(0, 0, -i).UTC().Format("2006-01-02")
download(ctx, log, httpC, date+".csv", date+".csv", key, maxFileSize) //nolint
}
}
}(v.PubKey, &wg)
batchSize--
if batchSize == 0 {
time.Sleep(15 * time.Second)
batchSize = 50
}
}(v.PubKey, &wg)
batchSize--
if batchSize == 0 {
time.Sleep(15 * time.Second)
batchSize = 50
}
//omit the filters if a file was specified
if fetchFile != "" {
wg.Add(1)
go func(key string, wg *sync.WaitGroup) {
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC), Timeout: 10 * time.Second}
defer httpC.CloseIdleConnections()
defer wg.Done()
if _, err := os.ReadDir(key); err != nil {
if err := os.Mkdir(key, 0750); err != nil {
log.Errorf("Unable to create directory for visor %s", key)
return
}
}
_ = download(ctx, log, httpC, fetchFile, fetchFile, key, maxFileSize) //nolint
}(v.PubKey, &wg)
0pcom marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand All @@ -210,13 +242,93 @@ func download(ctx context.Context, log *logging.Logger, httpC http.Client, targe
file, _ := os.Create(pubkey + "/" + fileName) //nolint
defer file.Close() //nolint

if err := dmsgget.Download(ctx, log, &httpC, file, target, maxSize); err != nil {
if err := Download(ctx, log, &httpC, file, target, maxSize); err != nil {
log.WithError(err).Errorf("The %s for visor %s not available", fileName, pubkey)
return err
}
return nil
}

// Download downloads a file from the given URL into 'w'.
func Download(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error {
0pcom marked this conversation as resolved.
Show resolved Hide resolved
req, err := http.NewRequest(http.MethodGet, urlStr, nil)
if err != nil {
log.WithError(err).Fatal("Failed to formulate HTTP request.")
}
resp, err := httpC.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to HTTP server: %w", err)
}
0pcom marked this conversation as resolved.
Show resolved Hide resolved
if resp.StatusCode == http.StatusOK {
// 200 OK
if maxSize > 0 {
if resp.ContentLength > maxSize*1024 {
return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize)
}
}
n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength)
if err != nil {
return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err)
}
defer func() {
0pcom marked this conversation as resolved.
Show resolved Hide resolved
if err := resp.Body.Close(); err != nil {
log.WithError(err).Warn("HTTP Response body closed with non-nil error.")
}
}()
return nil
}
// Convert the non-200 status code to an error
return &httpError{Status: resp.StatusCode}
}

type readerFunc func(p []byte) (n int, err error)

func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }

// CancellableCopy will call the Reader and Writer interface multiple time, in order
0pcom marked this conversation as resolved.
Show resolved Hide resolved
// to copy by chunk (avoiding loading the whole file in memory).
func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) {

n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) {

// golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations
select {

// if context has been canceled
case <-ctx.Done():
// stop process and propagate "Download Canceled" error
return 0, errors.New("Download Canceled")
default:
// otherwise just run default io.Reader implementation
return body.Read(p)
}
}))
return n, err
}

// ProgressWriter prints the progress of a download to stdout.
type ProgressWriter struct {
// atomic requires 64-bit alignment for struct field access
Current int64
Total int64
}

// Write implements io.Writer
func (pw *ProgressWriter) Write(p []byte) (int, error) {
n := len(p)

current := atomic.AddInt64(&pw.Current, int64(n))
total := atomic.LoadInt64(&pw.Total)
pc := fmt.Sprintf("%d%%", current*100/total)
fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc)
if current != total {
fmt.Print("\r")
} else {
fmt.Print("\n")
}
return n, nil
}

func getUptimes(endpoint string, log *logging.Logger) ([]VisorUptimeResponse, error) {
var results []VisorUptimeResponse

Expand Down Expand Up @@ -272,3 +384,11 @@ func getAllDMSGServers() []dmsgServer {
type dmsgServer struct {
PK cipher.PubKey `json:"static"`
}

type httpError struct {
Status int
}

func (e *httpError) Error() string {
return fmt.Sprintf("http error: %d", e.Status)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ require (
github.com/lib/pq v1.10.9
github.com/orandin/lumberjackrus v1.0.1
github.com/pterm/pterm v0.12.62
github.com/skycoin/dmsg v1.3.0-rc1.0.20230619181939-277586bbacd7
github.com/skycoin/skywire-utilities v0.0.0-20230601232053-0abbc9604fbc
github.com/skycoin/dmsg v1.3.11
github.com/skycoin/skywire-utilities v1.3.11
github.com/skycoin/systray v1.10.0
github.com/spf13/pflag v1.0.5
github.com/zcalusic/sysinfo v1.0.0
Expand Down
Loading
Loading