
Implements passthrough to allow filtering and progress reporting #8

Open · wants to merge 3 commits into base: master
Conversation

@NachE commented Aug 6, 2024

Introduces an anonymous-function attribute Passthrough in the opts parameter that allows manipulating and filtering objects, which can represent either send or receive jobs.

This could close issue #3

@povsister (Owner)

Thank you for the contribution!

I think this is a reasonable enhancement for making copy jobs more observable, but there are several things that need to be discussed.

As I mentioned in issue #3, I think a virtual filesystem like fs.FS, which implements file type/extension filtering on its own, is a better choice for this purpose. We can simply accept an fs.FS as input, call fs.WalkDir(fs, ".", WalkDirFunc), and process those directories/files when WalkDirFunc returns a nil error.
This helps us keep things simple and stay focused on the copying work, separating the various filtering/selecting tasks into different WalkDirFunc implementations provided by users.

I believe that scp itself should be easy to use and understand.
Thus, introducing an "interceptor" mechanism that can manipulate vital file properties like name/path etc. is considered unacceptable.

Here is my opinion on filtering: I think a drop-in replacement of the traverse func with fs.WalkDir is enough for that.
I don't have many ideas on progress reporting right now, but I am concerned that using io.MultiWriter for it might be too "heavy".

func CopyDirFsToRemote(localFS fs.FS, remoteTarget string, opt *DirTransferOption) error

var (
    SkipAll = fs.SkipAll
    SkipDir = fs.SkipDir
    // this is added for skipping files
    SkipFile = errors.New("skip this file")
)

type DirTransferOption struct {
    ...

    // WalkDirFunc is called on visiting each file or directory.
    // Compared to fs.WalkDirFunc, it adds the additional capability to
    // skip certain files when the SkipFile error is returned.
    WalkDirFunc fs.WalkDirFunc
}

// here is the WalkDirFunc signature for reference
// 
// WalkDirFunc is the type of the function called by [WalkDir] to visit
// each file or directory.
//
// The path argument contains the argument to [WalkDir] as a prefix.
// That is, if WalkDir is called with root argument "dir" and finds a file
// named "a" in that directory, the walk function will be called with
// argument "dir/a".
//
// The d argument is the [DirEntry] for the named path.
//
// The error result returned by the function controls how [WalkDir]
// continues. If the function returns the special value [SkipDir], WalkDir
// skips the current directory (path if d.IsDir() is true, otherwise
// path's parent directory). If the function returns the special value
// [SkipAll], WalkDir skips all remaining files and directories. Otherwise,
// if the function returns a non-nil error, WalkDir stops entirely and
// returns that error.
//
// The err argument reports an error related to path, signaling that
// [WalkDir] will not walk into that directory. The function can decide how
// to handle that error; as described earlier, returning the error will
// cause WalkDir to stop walking the entire tree.
//
// [WalkDir] calls the function with a non-nil err argument in two cases.
//
// First, if the initial [Stat] on the root directory fails, WalkDir
// calls the function with path set to root, d set to nil, and err set to
// the error from [fs.Stat].
//
// Second, if a directory's ReadDir method (see [ReadDirFile]) fails, WalkDir calls the
// function with path set to the directory's path, d set to a
// [DirEntry] describing the directory, and err set to the error from
// ReadDir. In this second case, the function is called twice with the
// path of the directory: the first call is before the directory read is
// attempted and has err set to nil, giving the function a chance to
// return [SkipDir] or [SkipAll] and avoid the ReadDir entirely. The second call
// is after a failed ReadDir and reports the error from ReadDir.
// (If ReadDir succeeds, there is no second call.)
//
// The differences between WalkDirFunc compared to [path/filepath.WalkFunc] are:
//
//   - The second argument has type [DirEntry] instead of [FileInfo].
//   - The function is called before reading a directory, to allow [SkipDir]
//     or [SkipAll] to bypass the directory read entirely or skip all remaining
//     files and directories respectively.
//   - If a directory read fails, the function is called a second time
//     for that directory to report the error.
type WalkDirFunc func(path string, d DirEntry, err error) error
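
For illustration, here is a usage sketch of the proposed option. CopyDirFsToRemote, SkipFile, and DirTransferOption are the proposals sketched above, not an existing API:

// Usage sketch: skip all ".log" files while uploading a directory tree.
opt := &scp.DirTransferOption{
    WalkDirFunc: func(path string, d fs.DirEntry, err error) error {
        if err != nil {
            return err // abort the walk on traversal errors
        }
        if !d.IsDir() && filepath.Ext(path) == ".log" {
            return scp.SkipFile // filter out log files
        }
        return nil
    },
}
err := scp.CopyDirFsToRemote(os.DirFS("/var/data"), "/remote/data", opt)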

@NachE (Author) commented Aug 7, 2024

Thank you for your feedback and for considering my contribution!

I appreciate your suggestion regarding the use of a virtual filesystem like fs.FS for file type/extension filtering, and I agree that it can simplify things by focusing on the copying tasks. However, I would like to point out that the proposed solution seems to primarily address file uploads. It doesn't appear to cover file downloads, which are an essential part of many SCP operations.

Additionally, regarding progress reporting: while I understand your concern about the potential overhead of io.MultiWriter, it is a commonly used approach for implementing progress bars in various applications. It lets us track progress efficiently without significantly impacting performance. I believe incorporating io.MultiWriter could provide a more user-friendly experience by giving real-time feedback during file transfers, which many users find valuable.

As a potential solution, I suggest we scope this pull request to the progress bar implementation. Specifically, we could add an io.Writer attribute to the configuration object and use it in conjunction with io.MultiWriter when it has been set. This way, we can provide progress reporting functionality without complicating the current structure too much.
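
As a rough sketch of that suggestion (the ProgressWriter field name is purely illustrative, not part of the current API):

// countingWriter is a trivial progress sink: every chunk io.Copy hands to
// the MultiWriter is also written here, so n accumulates the copied bytes.
type countingWriter struct {
    n int64
}

func (c *countingWriter) Write(p []byte) (int, error) {
    c.n += int64(len(p))
    return len(p), nil
}

// Inside the copy path, if the user set a progress writer in the options:
// _, err := io.Copy(io.MultiWriter(stream.In, opt.ProgressWriter), fileReader)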

For the file filtering and selection enhancements, we could consider addressing them in a separate pull request. This would allow us to thoroughly discuss and implement the best approach for those features without conflating them with the progress reporting changes.

Once again, thank you for your guidance. I look forward to your thoughts on these considerations and how we can proceed.

@povsister (Owner)

However, I would like to point out that the proposed solution seems to primarily address file uploads. It doesn't appear to cover file downloads, which are an essential part of many SCP operations.

You are absolutely right about that. For API & experience consistency, filtering should apply to both upload & download.
Thank you for reminding me. The idea was just off the top of my head, and it has now proved to be impractical.

For the file filtering and selection enhancements, we could consider addressing them in a separate pull request. This would allow us to thoroughly discuss and implement the best approach for those features without conflating them with the progress reporting changes.

I agree, filtering design needs more discussion and consideration.
Solving the progress reporting issue is more practical.

I dug a little deeper into io.MultiWriter and found that it just runs a loop writing to every io.Writer inside it, one after another.
Apparently, a writer given in the options in the hope of monitoring progress does not actually represent "bytes written to / received from the wire".

I suggest using the n returned by io.Writer/io.Reader for progress reporting; this may require some wrapping of stream.In/Out.
The anonymous function specified in the options would be called each time Write/Read returns, with the file metadata and the transferred bytes + total bytes.

For example:

// should be read-only and for monitoring purposes only?
type TransferMetaReadOnly struct {
    IsDir           bool
    Name            string      // file or directory name
    Path            string      // directory path without the name; may be absolute or relative, depending on the situation
    TotalSize       int64       // bytes
    TransferredSize int64       // bytes
    Perm            os.FileMode // is this necessary?
}

Additionally, a single write/read should be enough for transferring a tiny file, but that might not be enough for progress reporting. An extra call to the anonymous function with the proper metadata should be executed before the write/read, so that the "start" and "end" events are generated properly.

@NachE (Author) commented Aug 7, 2024

Regarding the use of io.MultiWriter: the bytes written in the io.Copy(multiwriter, stream) call are indeed the exact bytes of the file being transferred. The io.Writer designated for progress reporting within the MultiWriter still gets a call to its Write() method for each chunk intended to be written.

The approach of wrapping stream.In/Out is certainly viable. However, since we are using io.Copy(), wrapping stream.In would mean wrapping the Write() method. For example:

func (w *Wrap) Write(p []byte) (n int, err error) {
  w.callbackfunc(somestuff) // some stuff to communicate progress
  return w.originalStreamIn.Write(p)
}

This means that for each chunk written (since io.Copy copies in small chunks), there would be a call to Wrap.Write(), and thus a call to w.callbackfunc.

If we were to wrap Read(), we would need to wrap sendJob.Reader for uploads and stream.Out for downloads. While this is feasible, it does introduce a certain complexity and might be considered an unusual approach.
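
For completeness, a read-side wrapper would look similar (a sketch; progressReader and its fields are hypothetical names):

// progressReader reports how many bytes each Read actually consumed.
type progressReader struct {
    r  io.Reader
    cb func(n int)
}

func (p *progressReader) Read(buf []byte) (int, error) {
    n, err := p.r.Read(buf)
    if n > 0 {
        p.cb(n) // report only the bytes actually read
    }
    return n, err
}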

I am eager to understand how you envision the solution in more detail. If you could provide further clarification or examples of how you would like the progress reporting to be implemented, I would be more than willing to adapt and implement it accordingly.

@povsister (Owner)

I was really busy at work today. Sorry for the delayed response.

No problem. Let's go through your design and my concerns, one by one.

The io.Writer designated for progress reporting within the MultiWriter still gets a call to its Write() method for each chunk intended to be written.

(Feel free to correct me if I didn't get your point.)
I think you are going to replace the Writer with a MultiWriter(original, monitoring), so that the added monitoring Writer gets called on each chunk written to the original Writer.
By adding up the written chunks' lengths, the progress of a file during sending/receiving can be calculated.

This makes sense most of the time. But, as I mentioned in my previous post: looking a little deeper into the implementation of MultiWriter, you will find that the writers inside a MultiWriter are called one by one. If one of them fails to write out the data, MultiWriter exits immediately, reports the error to its caller, and all subsequent Writers are skipped.

Suppose the first chunk is 4 MiB and the file's total size is 16 MiB. If an error occurs after 2 MiB have been written to the original Writer, then according to the implementation described above, the monitoring Writer won't be called and the error is reported immediately. The real transfer progress is 2/16 = 12.5%, but you actually observe 0% together with an I/O error.
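
For reference, the Write method behind io.MultiWriter is essentially this loop (paraphrased from the Go standard library source):

func (t *multiWriter) Write(p []byte) (n int, err error) {
    for _, w := range t.writers {
        n, err = w.Write(p)
        if err != nil {
            return // a failed writer aborts the loop; later writers never run
        }
        if n != len(p) {
            err = io.ErrShortWrite
            return
        }
    }
    return len(p), nil
}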

Here are some definition issues and corner cases that need to be discussed.

  • How do we define the "progress" of a file transfer?
    I prefer using the bytes written to the wire, divided by the total size of the current file, as the definition of progress.
    Instead of a "mirroring write", which might not be accurate when an error occurs, statistics collected by wrapping the Write function are more precise and perfectly satisfy the definition of "bytes written to the wire".

  • How do we calculate the progress of a partially transmitted chunk on error?
    Following the "progress definition" above, I think we'd better include the successfully transmitted bytes of that chunk in the calculation, rather than silently discarding the partially failed chunk.

  • How do we define the observation approach of progress reporting? Are there any key points/events?
    As in other notification systems, events are the key point.
    I think, for monitoring purposes, "start" and "end" events must be generated for the caller on each item. Thus, there must be at least 2 calls to the anonymous callback function: one at the beginning and one at the end. The intermediate "transmitting" state might not be very useful for tiny files, and whether we get to see it at all depends on how io.Copy acts on tiny/huge files.

    If we are going to provide an approach for monitoring, WHAT and WHEN the caller gets notified must be declared and guaranteed in advance.
    For example, you cannot say: the caller will be notified twice or more during transmission, depending on the size of the file.
    You should say: the caller will be notified at least twice (at the beginning and the end), plus several "transmitting" events carrying the count of bytes already transmitted.
    And, to make it better, we can mandatorily split the file into chunks using io.CopyN, which lets us generate "transmitting" events for the caller at a relatively fixed rate, no matter how big or small the file is (see the sketch after this list).

  • Performance.
    Both your approach and mine are synchronous: the next chunk won't be written to the wire until the callback returns. If the user does something slow in the callback (e.g. drawing a progress bar on screen), it will significantly slow down the transmission.
    This might not matter much, since performance is not our focus, but there should at least be a warning about the performance degradation such usage causes.
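
As referenced in the third point above, here is a sketch of the io.CopyN idea. The emit helper, the metaWith helper, and the event names are hypothetical, for discussion only:

// Copy in fixed-size chunks so "transmitting" events fire at a predictable
// rate regardless of file size.
const chunkSize = 1 << 20 // 1 MiB per event; an arbitrary choice

emit(EventStart, meta)
var done int64
for {
    n, err := io.CopyN(dst, src, chunkSize)
    done += n // partial chunks still count toward progress, per the definition above
    if n > 0 {
        emit(EventTransmitting, metaWith(done))
    }
    if err == io.EOF {
        break // source exhausted; the copy is complete
    }
    if err != nil {
        emit(EventEnd, metaWith(done)) // the end event still reports the real progress
        return err
    }
}
emit(EventEnd, metaWith(done))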

That's a lot of discussion above. Let me file a checklist, if you are still interested.
Assuming we are going to make file transfer observable.

  1. List the metadata that should be provided to the observer (e.g. file name/path, transmitted size, total size, etc.).
  2. Explain any definitions/terminology introduced.
  3. Declare WHEN the observer will be notified with that metadata.
  4. Describe potential side effects, or give a brief introduction to how the monitoring mechanism works.

In a nutshell, I hope it can be defined clearly and implemented without confusion or undefined behavior.

Thank you again for your patience and devotion.

@NachE (Author) commented Aug 8, 2024

No worries at all about the delay! I completely understand that work can get busy, and I appreciate you taking the time to provide such a detailed response.

It’s clear that we’re both aligned on the complexity and importance of solving this problem in a robust way. I appreciate your thoroughness in outlining the potential issues and the checklist for defining how progress reporting should work.

Given the points you’ve raised and the checklist you provided, I’ll take some time to consider and propose a solution that addresses these concerns. I believe this will help us continue iterating on the discussion and refine the approach to ensure we have a clear, well-defined, and reliable implementation.

@NachE (Author) commented Aug 9, 2024

I wanted to let you know that I've updated the pull request with a new proposal for the code.

In this proposal, I've introduced a new function copyWithCallback(dst io.Writer, src io.Reader, buf []byte, cb func(n int64)) (written int64, err error). It is based on io.copyBuffer, but accepts an anonymous callback function that is called for each chunk written. It is used for both uploads and downloads within the new methods I've added to sessionStream, called get(w io.Writer, ti *streamTransferInfo) (n int64, err error) and put(r io.Reader, ti *streamTransferInfo) (n int64, err error).
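
A sketch of what such a helper might look like, modeled on io.copyBuffer (whether cb receives the chunk size or a cumulative total is the PR's decision; per-chunk is assumed here):

// copyWithCallback copies src to dst through buf, invoking cb after every
// written chunk. Modeled on io.copyBuffer; a sketch, not the PR's exact code.
func copyWithCallback(dst io.Writer, src io.Reader, buf []byte, cb func(n int64)) (written int64, err error) {
    if buf == nil {
        buf = make([]byte, 32*1024)
    }
    for {
        nr, er := src.Read(buf)
        if nr > 0 {
            nw, ew := dst.Write(buf[:nr])
            written += int64(nw)
            if cb != nil {
                cb(int64(nw)) // report this chunk to the observer
            }
            if ew != nil {
                err = ew
                break
            }
            if nr != nw {
                err = io.ErrShortWrite
                break
            }
        }
        if er != nil {
            if er != io.EOF {
                err = er
            }
            break
        }
    }
    return written, err
}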

The get and put methods are responsible for sending the start, tick, and end events to the anonymous function the user configures in the DirTransferOption object: ObserverCallback func(scp.ObserveEventType, scp.TransferInfo). This callback receives the event type as its first parameter and the transfer information as its second, allowing the user to track the number of bytes transmitted.
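
A hypothetical usage sketch, with the names taken from the description above (the exact shape of scp.TransferInfo depends on the final PR):

opt := &scp.DirTransferOption{
    // ObserverCallback receives one of the start/tick/end events plus the
    // current transfer information for the file being copied.
    ObserverCallback: func(ev scp.ObserveEventType, info scp.TransferInfo) {
        log.Printf("event=%v info=%+v", ev, info)
    },
}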

Additionally, the calls to ObserverCallback are executed in separate goroutines, so they won't affect the transfer performance.

As a small improvement, I've also increased the size of the transfer buffer to try and enhance the transfer speed for small files.

I've made an effort to include detailed comments in the code to ensure it is clear and easy to understand.

Of course, I’m open to any changes or suggestions you may have. We can take this code as a starting point to align our positions and continue refining the solution.

Thank you for your time and feedback!

Implement callback mechanism for file transfer events and enhance TransferInfo interface with detailed transfer data
@povsister (Owner)

Thank you for the detailed explanation!

I've noticed a lot of changes since last time. These will need more time for review and feedback.

@NachE (Author) commented Aug 16, 2024

I've just made a new commit fixing a concurrency issue with the start/tick/end events. Now, each file will send the start, tick, and end events at least once.

@NachE (Author) commented Aug 16, 2024

I've tested the code in a real-world environment, and the proposed solution is actually faster than the original scp command. I've attached an image to show the results.

[Image: transfer speed comparison against the original scp command]
