Skip to content

Commit

Permalink
Merge pull request #1653 from 0pcom/fix-tplogs
Browse files Browse the repository at this point in the history
Fix transport bandwidth logs
  • Loading branch information
jdknives authored Aug 29, 2023
2 parents e206959 + 134214e commit 0984a10
Show file tree
Hide file tree
Showing 622 changed files with 206,606 additions and 90,465 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.19.x
go-version: 1.20.x
- uses: actions/checkout@v3
- name: Install Requirements
run: |
Expand All @@ -25,7 +25,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.19.x
go-version: 1.20.x
- uses: actions/checkout@v3
- name: Install Requirements
run: |
Expand All @@ -44,7 +44,7 @@ jobs:
steps:
- uses: actions/setup-go@v3
with:
go-version: 1.19.x
go-version: 1.20.x
- uses: actions/checkout@v3
- name: Install Requirements
run: |
Expand Down
230 changes: 175 additions & 55 deletions cmd/skywire-cli/commands/log/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"net/http"
"os"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/go-version"
"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/pkg/dmsgget"
"github.com/skycoin/dmsg/pkg/dmsghttp"
"github.com/spf13/cobra"
Expand All @@ -38,16 +40,22 @@ var (
logOnly bool
surveyOnly bool
deleteOnErrors bool
fetchFile string
fetchFrom string
writeDir string
)

func init() {
logCmd.Flags().SortFlags = false
logCmd.Flags().StringVarP(&env, "env", "e", "prod", "deployment to get uptimes from")
logCmd.Flags().BoolVarP(&logOnly, "log", "l", false, "fetch only transport logs")
logCmd.Flags().BoolVarP(&surveyOnly, "survey", "v", false, "fetch only surveys")
logCmd.Flags().StringVarP(&fetchFile, "file", "f", "", "fetch only a specific file from all online visors")
logCmd.Flags().StringVarP(&fetchFrom, "pks", "k", "", "fetch only from specific public keys ; semicolon separated")
logCmd.Flags().StringVarP(&writeDir, "dir", "d", "log_collecting", "save files to specified dir")
logCmd.Flags().BoolVarP(&deleteOnErrors, "clean", "c", false, "delete files and folders on errors")
logCmd.Flags().StringVar(&minv, "minv", "v1.3.4", "minimum visor version to fetch from")
logCmd.Flags().IntVarP(&duration, "duration", "n", 0, "numberof days before today to fetch transport logs for")
logCmd.Flags().StringVar(&minv, "minv", "v1.3.11", "minimum visor version to fetch from")
logCmd.Flags().IntVarP(&duration, "duration", "n", 0, "number of days before today to fetch transport logs for")
logCmd.Flags().BoolVar(&allVisors, "all", false, "consider all visors ; no version filtering")
logCmd.Flags().IntVar(&batchSize, "batchSize", 50, "number of visor in each batch")
logCmd.Flags().Int64Var(&maxFileSize, "maxfilesize", 30, "maximum file size allowed to download during collecting logs, in KB")
Expand All @@ -59,13 +67,13 @@ func init() {
logCmd.Flags().VarP(&sk, "sk", "s", "a random key is generated if unspecified\n\r")
}

// RootCmd is surveyCmd
// RootCmd is logCmd
var RootCmd = logCmd

var logCmd = &cobra.Command{
Use: "log",
Short: "survey & transport log collection",
Long: "collect surveys and transport logging from visors which are online in the uptime tracker",
Long: "Fetch health, survey, and transport logging from visors which are online in the uptime tracker\nhttp://ut.skywire.skycoin.com/uptimes?v=v2\nhttp://ut.skywire.skycoin.com/uptimes?v=v2&visors=<pk1>;<pk2>;<pk3>",
Run: func(cmd *cobra.Command, args []string) {
log := logging.MustGetLogger("log-collecting")
if logOnly && surveyOnly {
Expand All @@ -81,15 +89,15 @@ var logCmd = &cobra.Command{
}()

// Preparing directories
if _, err := os.ReadDir("log_collecting"); err != nil {
if err := os.Mkdir("log_collecting", 0750); err != nil {
log.Error("Unable to log_collecting directory")
if _, err := os.ReadDir(writeDir); err != nil {
if err := os.Mkdir(writeDir, 0750); err != nil {
log.Error("Unable to create directory " + writeDir)
return
}
}

if err := os.Chdir("log_collecting"); err != nil {
log.Error("Unable to change directory to log_collecting")
if err := os.Chdir(writeDir); err != nil {
log.Error("Unable to change directory to " + writeDir)
return
}

Expand All @@ -98,13 +106,19 @@ var logCmd = &cobra.Command{
flag.Parse()

// Set the uptime tracker to fetch data from
endpoint := skyenv.UptimeTrackerAddr + "/uptimes?v=v2"
endpoint := skyenv.UptimeTrackerAddr
if env == "test" {
endpoint = skyenv.TestUptimeTrackerAddr + "/uptimes?v=v2"
endpoint = skyenv.TestUptimeTrackerAddr
}
endpoint = endpoint + "/uptimes?v=v2"
if utAddr != "" {
endpoint = utAddr
}

if fetchFrom != "" {
endpoint = endpoint + "&visors=" + fetchFrom
}

//Fetch the uptime data over http
uptimes, err := getUptimes(endpoint, log)
if err != nil {
Expand Down Expand Up @@ -141,56 +155,74 @@ var logCmd = &cobra.Command{
// Get visors data
var wg sync.WaitGroup
for _, v := range uptimes {
//only attempt to fetch from online visors
if v.Online {
if fetchFile == "" {
visorVersion, err := version.NewVersion(v.Version) //nolint
if err != nil {
log.Warnf("The version %s for visor %s is not valid", v.Version, v.PubKey)
continue
}
if !allVisors && visorVersion.LessThan(minimumVersion) {
log.Warnf("The version %s for visor %s does not satisfy our minimum version condition", v.Version, v.PubKey)
continue
}
wg.Add(1)
go func(key string, wg *sync.WaitGroup) {
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC), Timeout: 10 * time.Second}
defer httpC.CloseIdleConnections()
defer wg.Done()

visorVersion, err := version.NewVersion(v.Version) //nolint
if err != nil {
log.Warnf("The version %s for visor %s is not valid", v.Version, v.PubKey)
continue
}
if !allVisors && visorVersion.LessThan(minimumVersion) {
log.Warnf("The version %s for visor %s does not satisfy our minimum version condition", v.Version, v.PubKey)
continue
}
wg.Add(1)
go func(key string, wg *sync.WaitGroup) {
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC), Timeout: 10 * time.Second}
defer httpC.CloseIdleConnections()
defer wg.Done()

deleteOnError := false
if _, err := os.ReadDir(key); err != nil {
if err := os.Mkdir(key, 0750); err != nil {
log.Errorf("Unable to create directory for visor %s", key)
return
deleteOnError := false
if _, err := os.ReadDir(key); err != nil {
if err := os.Mkdir(key, 0750); err != nil {
log.Errorf("Unable to create directory for visor %s", key)
return
}
deleteOnError = true
}
deleteOnError = true
}
// health check before downloading anything else
// delete that folder if the health check fails
err = download(ctx, log, httpC, "health", "health.json", key, maxFileSize)
if err != nil {
if deleteOnErrors {
if deleteOnError {
bulkFolders = append(bulkFolders, key)
// health check before downloading anything else
// delete that folder if the health check fails
err = download(ctx, log, httpC, "health", "health.json", key, maxFileSize)
if err != nil {
if deleteOnErrors {
if deleteOnError {
bulkFolders = append(bulkFolders, key)
}
return
}
return
}
}
if !logOnly {
download(ctx, log, httpC, "node-info.json", "node-info.json", key, maxFileSize) //nolint
}
if !surveyOnly {
for i := 0; i <= duration; i++ {
date := time.Now().AddDate(0, 0, -i).UTC().Format("2006-01-02")
download(ctx, log, httpC, "transport_logs/"+date+".csv", date+".csv", key, maxFileSize) //nolint
if !logOnly {
download(ctx, log, httpC, "node-info.json", "node-info.json", key, maxFileSize) //nolint
}
if !surveyOnly {
for i := 0; i <= duration; i++ {
date := time.Now().AddDate(0, 0, -i).UTC().Format("2006-01-02")
download(ctx, log, httpC, date+".csv", date+".csv", key, maxFileSize) //nolint
}
}
}(v.PubKey, &wg)
batchSize--
if batchSize == 0 {
time.Sleep(15 * time.Second)
batchSize = 50
}
}(v.PubKey, &wg)
batchSize--
if batchSize == 0 {
time.Sleep(15 * time.Second)
batchSize = 50
}
//omit the filters if a file was specified
if fetchFile != "" {
wg.Add(1)
go func(key string, wg *sync.WaitGroup) {
httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, dmsgC), Timeout: 10 * time.Second}
defer httpC.CloseIdleConnections()
defer wg.Done()
if _, err := os.ReadDir(key); err != nil {
if err := os.Mkdir(key, 0750); err != nil {
log.Errorf("Unable to create directory for visor %s", key)
return
}
}
_ = download(ctx, log, httpC, fetchFile, fetchFile, key, maxFileSize) //nolint
}(v.PubKey, &wg)
}
}
}
Expand All @@ -210,13 +242,93 @@ func download(ctx context.Context, log *logging.Logger, httpC http.Client, targe
file, _ := os.Create(pubkey + "/" + fileName) //nolint
defer file.Close() //nolint

if err := dmsgget.Download(ctx, log, &httpC, file, target, maxSize); err != nil {
if err := downloadDmsg(ctx, log, &httpC, file, target, maxSize); err != nil {
log.WithError(err).Errorf("The %s for visor %s not available", fileName, pubkey)
return err
}
return nil
}

// downloadDmsg downloads a file from the given URL into 'w'.
func downloadDmsg(ctx context.Context, log logrus.FieldLogger, httpC *http.Client, w io.Writer, urlStr string, maxSize int64) error {
req, err := http.NewRequest(http.MethodGet, urlStr, nil)
if err != nil {
log.WithError(err).Fatal("Failed to formulate HTTP request.")
}
resp, err := httpC.Do(req)
if err != nil {
return fmt.Errorf("failed to connect to HTTP server: %w", err)
}
if resp.StatusCode == http.StatusOK {
// 200 OK
if maxSize > 0 {
if resp.ContentLength > maxSize*1024 {
return fmt.Errorf("requested file size is more than allowed size: %d KB > %d KB", (resp.ContentLength / 1024), maxSize)
}
}
n, err := CancellableCopy(ctx, w, resp.Body, resp.ContentLength)
if err != nil {
return fmt.Errorf("download failed at %d/%dB: %w", n, resp.ContentLength, err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.WithError(err).Warn("HTTP Response body closed with non-nil error.")
}
}()
return nil
}
// Convert the non-200 status code to an error
return &httpError{Status: resp.StatusCode}
}

type readerFunc func(p []byte) (n int, err error)

func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }

// CancellableCopy will call the Reader and Writer interface multiple time, in order
// to copy by chunk (avoiding loading the whole file in memory).
func CancellableCopy(ctx context.Context, w io.Writer, body io.ReadCloser, length int64) (int64, error) {

n, err := io.Copy(io.MultiWriter(w, &ProgressWriter{Total: length}), readerFunc(func(p []byte) (int, error) {

// golang non-blocking channel: https://gobyexample.com/non-blocking-channel-operations
select {

// if context has been canceled
case <-ctx.Done():
// stop process and propagate "Download Canceled" error
return 0, errors.New("Download Canceled")
default:
// otherwise just run default io.Reader implementation
return body.Read(p)
}
}))
return n, err
}

// ProgressWriter prints the progress of a download to stdout.
type ProgressWriter struct {
// atomic requires 64-bit alignment for struct field access
Current int64
Total int64
}

// Write implements io.Writer
func (pw *ProgressWriter) Write(p []byte) (int, error) {
n := len(p)

current := atomic.AddInt64(&pw.Current, int64(n))
total := atomic.LoadInt64(&pw.Total)
pc := fmt.Sprintf("%d%%", current*100/total)
fmt.Printf("Downloading: %d/%dB (%s)", current, total, pc)
if current != total {
fmt.Print("\r")
} else {
fmt.Print("\n")
}
return n, nil
}

func getUptimes(endpoint string, log *logging.Logger) ([]VisorUptimeResponse, error) {
var results []VisorUptimeResponse

Expand Down Expand Up @@ -272,3 +384,11 @@ func getAllDMSGServers() []dmsgServer {
type dmsgServer struct {
PK cipher.PubKey `json:"static"`
}

type httpError struct {
Status int
}

func (e *httpError) Error() string {
return fmt.Sprintf("http error: %d", e.Status)
}
Loading

0 comments on commit 0984a10

Please sign in to comment.