Skip to content

Commit

Permalink
factorising both progress trackers into struct progressTracker.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-graves committed Apr 24, 2023
1 parent 5a12a4f commit 4d39dfb
Showing 1 changed file with 33 additions and 47 deletions.
80 changes: 33 additions & 47 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,6 @@ type Error struct {
Message string `json:"message"`
}

type progressTrackingReader struct {
io.Reader
Oid string
TotalSize int64
Writer *bufio.Writer
ErrWriter *bufio.Writer
bytesRead int64
}

func (r *progressTrackingReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
if n > 0 {
r.bytesRead += int64(n)
api.SendProgress(r.Oid, r.bytesRead, n, r.Writer, r.ErrWriter)
}
return
}

type writerAtWrapper struct {
w io.Writer
}
Expand All @@ -57,27 +39,32 @@ func (waw *writerAtWrapper) WriteAt(p []byte, off int64) (n int, err error) {
return waw.w.Write(p)
}

type progressTrackingWriter struct {
writer io.WriterAt
oid string
totalSize int64
writerResponse *bufio.Writer
errWriter *bufio.Writer
bytesWritten int64
type progressTracker struct {
Reader io.Reader
Writer io.WriterAt
Oid string
TotalSize int64
RespWriter *bufio.Writer
ErrWriter *bufio.Writer
bytesProcessed int64
}

func (ptw *progressTrackingWriter) WriteAt(p []byte, off int64) (int, error) {
n, err := ptw.writer.WriteAt(p, off)
if err != nil {
return n, err
func (rw *progressTracker) Read(p []byte) (n int, err error) {
n, err = rw.Reader.Read(p)
if n > 0 {
rw.bytesProcessed += int64(n)
api.SendProgress(rw.Oid, rw.bytesProcessed, n, rw.RespWriter, rw.ErrWriter)
}
return
}

func (rw *progressTracker) WriteAt(p []byte, off int64) (n int, err error) {
n, err = rw.Writer.WriteAt(p, off)
if n > 0 {
ptw.bytesWritten += int64(n)
api.SendProgress(ptw.oid, ptw.bytesWritten, n, ptw.writerResponse, ptw.errWriter)
rw.bytesProcessed += int64(n)
api.SendProgress(rw.Oid, rw.bytesProcessed, n, rw.RespWriter, rw.ErrWriter)
}

return n, nil
return
}

func Serve(stdin io.Reader, stdout, stderr io.Writer) {
Expand Down Expand Up @@ -148,21 +135,20 @@ func retrieve(oid string, size int64, action *api.Action, writer, errWriter *buf
}()

waw := &writerAtWrapper{file}

ptw := &progressTrackingWriter{
writer: waw,
oid: oid,
totalSize: size,
writerResponse: writer,
errWriter: errWriter,
progressWriter := &progressTracker{
Writer: waw,
Oid: oid,
TotalSize: size,
RespWriter: writer,
ErrWriter: errWriter,
}

downloader := manager.NewDownloader(client, func(d *manager.Downloader) {
d.PartSize = 5 * 1024 * 1024 // 1 MB part size
d.Concurrency = 1 // Concurrent downloads
})

_, err = downloader.Download(context.Background(), ptw, &s3.GetObjectInput{
_, err = downloader.Download(context.Background(), progressWriter, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(oid),
})
Expand Down Expand Up @@ -199,12 +185,12 @@ func store(oid string, size int64, action *api.Action, writer, errWriter *bufio.
// u.LeavePartsOnError = true // Keep uploaded parts on error
})

progressReader := &progressTrackingReader{
Reader: file,
Oid: oid,
TotalSize: size,
Writer: writer,
ErrWriter: errWriter,
progressReader := &progressTracker{
Reader: file,
Oid: oid,
TotalSize: size,
RespWriter: writer,
ErrWriter: errWriter,
}

_, err = uploader.Upload(context.Background(), &s3.PutObjectInput{
Expand Down

0 comments on commit 4d39dfb

Please sign in to comment.