Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding OpenTelemetry tracing #41

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion classads/classads.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func attributeSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err
for i, curChar := range data {
if curChar == '"' && !(i > 0 && data[i-1] == '\\') {
insideQuotes = !insideQuotes
} else if curChar == ';' && !insideQuotes {
} else if (curChar == ';' || curChar == '\n') && !insideQuotes {
// Do not return the semi-colon
// Trim any spaces
return i + 1, bytes.TrimSpace(data[0:i]), nil
Expand Down
18 changes: 18 additions & 0 deletions classads/classads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,21 @@ func TestAttributeSplitFunc(t *testing.T) {
assert.Equal(t, `Url = "stash:///osgconnect/public/$USER/file1; stash:///osgconnect/public/$USER/file2"`, attributes[1])

}

func TestJobAd(t *testing.T) {
input := `AccountingGroup = "group_opportunistic.OSG-Staff.dweitzel"
AcctGroup = "OSG-Staff"
AcctGroupUser = "dweitzel"
AllowedExecuteDuration = 72000
Arguments = ""
AutoClusterAttrs = "DESIRED_Sites,DiskUsage,DynamicSlot,FirstUpdateUptimeGPUsSeconds,HAS_CVMFS_connect_opensciencegrid_org,HAS_CVMFS_oasis_opensciencegrid_org,Has_MPI,HasJava,ITB_Factory,ITB_Sites,JobDurationCategory,JobUniverse,LastHeardFrom,LastUpdateUptimeGPUsSeconds,MachineLastMatchTime,MemoryUsage,OSG_NODE_VALIDATED,PartitionableSlot,ProjectName,Rank,RecentJobDurationAvg,RecentJobDurationCount,RemoteOwner,RequestCpus,RequestDisk,RequestGPUs,RequestK8SNamespace,RequestMemory,SINGULARITY_DISK_IS_FULL,SingularityImage,Slot1_SelfMonitorAge,Slot1_TotalTimeClaimedBusy,Slot1_TotalTimeUnclaimedIdle,StartOfJobUptimeGPUsSeconds,STASHCP_VERIFIED,TotalJobRuntime,undeined,UNDESIRED_Sites,UptimeGPUsSeconds,Want_MPI,XENON_DESIRED_Sites,ConcurrencyLimits,FlockTo,Requirements,IDTOKEN,FromJupyterLab,Owner,Memory,HasExcessiveLoad,IsBlackHole,HAS_MODULES,SINGULARITY_CAN_USE_SIF,FileSystemDomain"
AutoClusterId = 1772
ClusterId = 35063371
Cmd = "condor_exec.exe"`
ad, err := ReadClassAd(strings.NewReader(input))
assert.NoError(t, err, "ParseClassAd() failed")
assert.Equal(t, 1, len(ad), "ParseClassAd() returned %d ads, expected 1", len(ad))
accountGroup, err := ad[0].Get("AccountingGroup")
assert.NoError(t, err, "Get(AccountingGroup) failed")
assert.Equal(t, "group_opportunistic.OSG-Staff.dweitzel", accountGroup.(string))
}
14 changes: 11 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@ require (
github.com/cavaliercoder/grab v2.0.0+incompatible
github.com/jessevdk/go-flags v1.5.0
github.com/jsipprell/keyctl v1.0.4-0.20211208153515-36ca02672b6c
github.com/kr/pretty v0.3.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.2.2
github.com/stretchr/testify v1.8.1
github.com/studio-b12/gowebdav v0.0.0-20210203212356-8244b5a5f51a
github.com/vbauerster/mpb/v7 v7.1.5
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073
golang.org/x/term v0.2.0
go.opentelemetry.io/otel v1.12.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.12.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.12.0
go.opentelemetry.io/otel/sdk v1.12.0
go.opentelemetry.io/otel/trace v1.12.0
golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2
golang.org/x/term v0.3.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1
)
1,071 changes: 1,064 additions & 7 deletions go.sum

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion handle_cvmfs.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
package stashcp

import (
"context"
"errors"
"io"
"os"
"path"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

func download_cvmfs(sourceFile string, destination string, payload *payloadStruct) (int64, error) {
func download_cvmfs(ctx context.Context, sourceFile string, destination string, payload *payloadStruct) (int64, error) {
//Check if file is available in cvfms

// Set the span name
_, span := otel.Tracer(name).Start(ctx, "stashcp.download_cvmfs")
defer span.End()

var cvmfs_file string = path.Join("/cvmfs/stash.osgstorage.org", sourceFile)
span.SetAttributes(
attribute.String("cvmfs_file", cvmfs_file),
)

// Log
log.Debugf("Checking if the CVMFS file exists: %s", cvmfs_file)
Expand All @@ -22,25 +34,33 @@ func download_cvmfs(sourceFile string, destination string, payload *payloadStruc
in, err := os.Open(cvmfs_file)
if err != nil {
log.Debugln("Failed to open the source file:", err)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
defer in.Close()

out, err := os.Create(destination)
if err != nil {
log.Debugln("Failed to create destination file:", err)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
defer out.Close()

_, err = io.Copy(out, in)
if err != nil {
log.Debugln("Copy of file failed:", err)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
err = out.Close()
if err != nil {
log.Debugln("Error while closing output file:", err)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return 0, err
}
log.Debug("Succesfully copied file from CVMFS!")
Expand All @@ -56,6 +76,7 @@ func download_cvmfs(sourceFile string, destination string, payload *payloadStruc

} else {
log.Debugf("CVMFS File does not exist")
span.SetStatus(codes.Error, "CVMFS File does not exist")
return 0, errors.New("CVMFS File does not exist")
}

Expand All @@ -64,5 +85,6 @@ func download_cvmfs(sourceFile string, destination string, payload *payloadStruc
if err != nil {
return 0, err
}
span.SetAttributes(attribute.Int64("size", dest_stat.Size()))
return dest_stat.Size(), nil
}
54 changes: 47 additions & 7 deletions handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"github.com/studio-b12/gowebdav"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

var p = mpb.New()
Expand Down Expand Up @@ -86,6 +90,9 @@ type TransferDetails struct {

// Proxy specifies if a proxy should be used
Proxy bool

// Cache being accessed
Cache Cache
}

// NewTransferDetails creates the TransferDetails struct with the given cache
Expand Down Expand Up @@ -122,6 +129,7 @@ func NewTransferDetails(cache Cache, https bool) []TransferDetails {
details = append(details, TransferDetails{
Url: *cacheURL,
Proxy: false,
Cache: cache,
})
// Strip the port off and add 8443
cacheURL.Host = cacheURL.Host[:len(cacheURL.Host)-5] + ":8443"
Expand All @@ -130,6 +138,7 @@ func NewTransferDetails(cache Cache, https bool) []TransferDetails {
details = append(details, TransferDetails{
Url: *cacheURL,
Proxy: false,
Cache: cache,
})
} else {
cacheURL.Scheme = "http"
Expand All @@ -139,11 +148,13 @@ func NewTransferDetails(cache Cache, https bool) []TransferDetails {
details = append(details, TransferDetails{
Url: *cacheURL,
Proxy: true,
Cache: cache,
})
if canDisableProxy {
details = append(details, TransferDetails{
Url: *cacheURL,
Proxy: false,
Cache: cache,
})
}
}
Expand All @@ -156,7 +167,7 @@ type TransferResults struct {
Downloaded int64
}

func download_http(source string, destination string, payload *payloadStruct, namespace Namespace, recursive bool, tokenName string) (bytesTransferred int64, err error) {
func download_http(ctx context.Context, source string, destination string, payload *payloadStruct, namespace Namespace, recursive bool, tokenName string) (bytesTransferred int64, err error) {

// First, create a handler for any panics that occur
defer func() {
Expand All @@ -171,6 +182,9 @@ func download_http(source string, destination string, payload *payloadStruct, na
}
}()

spanCtx, span := otel.Tracer(name).Start(ctx, "stashcp.download_http")
defer span.End()

// Generate the downloadUrl
var token string
if namespace.UseTokenOnRead {
Expand Down Expand Up @@ -216,6 +230,7 @@ func download_http(source string, destination string, payload *payloadStruct, na
cachesToTry = len(closestNamespaceCaches)
}
log.Debugln("Trying the caches:", closestNamespaceCaches[:cachesToTry])
span.SetAttributes(attribute.String("caches", fmt.Sprintf("%v", closestNamespaceCaches[:cachesToTry])))
var transfers []TransferDetails
downloadUrl := url.URL{Path: source}
var files []string
Expand Down Expand Up @@ -253,7 +268,7 @@ func download_http(source string, destination string, payload *payloadStruct, na
// Start the workers
for i := 1; i <= 5; i++ {
wg.Add(1)
go startDownloadWorker(source, destination, token, transfers, &wg, workChan, results)
go startDownloadWorker(spanCtx, source, destination, token, transfers, &wg, workChan, results)
}

// For each file, send it to the worker
Expand Down Expand Up @@ -285,7 +300,7 @@ func download_http(source string, destination string, payload *payloadStruct, na

}

func startDownloadWorker(source string, destination string, token string, transfers []TransferDetails, wg *sync.WaitGroup, workChan <-chan string, results chan<- TransferResults) {
func startDownloadWorker(ctx context.Context, source string, destination string, token string, transfers []TransferDetails, wg *sync.WaitGroup, workChan <-chan string, results chan<- TransferResults) {

defer wg.Done()
var success bool
Expand All @@ -301,18 +316,35 @@ func startDownloadWorker(source string, destination string, token string, transf
continue
}
for _, transfer := range transfers {
// New span for each transfer
transferCtx, transferSpan := otel.Tracer(name).Start(ctx, "stashcp.startDownloadWorker.transfer")
transfer.Url.Path = file
log.Debugln("Constructed URL:", transfer.Url.String())
if downloaded, err = DownloadHTTP(transfer, finalDest, token); err != nil {
transferSpan.SetAttributes(
attribute.String("url", transfer.Url.String()),
attribute.Bool("proxy", transfer.Proxy),
attribute.String("source", source),
attribute.String("destination", destination),
attribute.String("cache", transfer.Cache.Endpoint),
attribute.String("cache_resource", transfer.Cache.Resource),
)
transferSpan.SetAttributes(attribute.String("url", transfer.Url.String()))
if downloaded, err = DownloadHTTP(transferCtx, transfer, finalDest, token); err != nil {
log.Debugln("Failed to download:", err)
toAccum := errors.New("Failed to download from " + transfer.Url.String() +
" + proxy=" + strconv.FormatBool(transfer.Proxy) +
": " + err.Error())
AddError(toAccum)
transferSpan.SetStatus(codes.Error, toAccum.Error())
transferSpan.RecordError(toAccum)
transferSpan.End()
continue
} else {
log.Debugln("Downloaded bytes:", downloaded)
success = true
transferSpan.SetAttributes(attribute.Int64("downloaded", downloaded))
transferSpan.SetStatus(codes.Ok, "Downloaded")
transferSpan.End()
break
}

Expand All @@ -331,8 +363,9 @@ func startDownloadWorker(source string, destination string, token string, transf
}

// DownloadHTTP - Perform the actual download of the file
func DownloadHTTP(transfer TransferDetails, dest string, token string) (int64, error) {
func DownloadHTTP(traceCtx context.Context, transfer TransferDetails, dest string, token string) (int64, error) {

span := trace.SpanFromContext(traceCtx)
// Create the client, request, and context
client := grab.NewClient()
transport := http.Transport{
Expand Down Expand Up @@ -392,6 +425,8 @@ func DownloadHTTP(transfer TransferDetails, dest string, token string) (int64, e
// Check the error real quick
if resp.IsComplete() {
if err := resp.Err(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Errorln("Failed to download:", err)
return 0, &ConnectionSetupError{Err: err}
}
Expand Down Expand Up @@ -439,13 +474,15 @@ Loop:
continue
} else if startBelowLimit == 0 {
log.Warnln("Download speed of ", resp.BytesPerSecond(), "bytes/s", " is below the limit of", downloadLimit, "bytes/s")
span.AddEvent("slow_download", trace.WithAttributes(attribute.Int64("downloaded", resp.BytesComplete()), attribute.Int64("download_limit", downloadLimit)))
startBelowLimit = time.Now().Unix()
continue
} else if (time.Now().Unix() - startBelowLimit) < 30 {
// If the download is below the threshold for less than 30 seconds, continue
continue
}
// The download is below the threshold for more than 30 seconds, cancel the download
span.AddEvent("Download too slow, cancelling", trace.WithAttributes(attribute.Int64("downloaded", resp.BytesComplete()), attribute.Int64("download_limit", downloadLimit)))
cancel()
if Options.ProgressBars {
var cancelledProgressBar = p.AddBar(0,
Expand All @@ -463,7 +500,6 @@ Loop:
progressBar.SetTotal(resp.Size, true)
cancelledProgressBar.SetTotal(resp.Size, true)
}

return 0, &SlowTransferError{
BytesTransferred: resp.BytesComplete(),
BytesPerSecond: int64(resp.BytesPerSecond()),
Expand Down Expand Up @@ -548,7 +584,11 @@ func (pr *ProgressReader) Close() error {
}

// UploadFile Uploads a file using HTTP
func UploadFile(src string, dest *url.URL, token string, namespace Namespace) (int64, error) {
func UploadFile(ctx context.Context, src string, dest *url.URL, token string, namespace Namespace) (int64, error) {

// Create the span
_, span := otel.Tracer(name).Start(ctx, "UploadFile")
defer span.End()

log.Debugln("In UploadFile")
log.Debugln("Dest", dest.String())
Expand Down
3 changes: 2 additions & 1 deletion handle_xrootd.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package stashcp

import (
"context"
"errors"
)

func download_xrootd(sourceFile string, destination string, payload *payloadStruct) (int64, error) {
func download_xrootd(ctx context.Context, sourceFile string, destination string, payload *payloadStruct) (int64, error) {

// Download from the nearest cache, if that fails, fallback to the stash origin.
return 0, errors.New("XrootD not implemented")
Expand Down
Loading