@@ -155,6 +155,21 @@ impl<'a> Fileio<'a> {
155
155
/// This operation is NOT atomic. If the write fails and `filename` existed, it is gone forever.
156
156
/// For more context see: https://github.com/swift-nav/console_pp/pull/72#discussion_r654751414
157
157
pub fn overwrite ( & mut self , filename : String , data : impl Read ) -> Result < ( ) > {
158
+ self . overwrite_with_progress ( filename, data, |_| ( ) )
159
+ }
160
+
161
+ /// Deletes `filename` on the remote device (if it exists) and writes the contents of `data` to the file.
162
+ /// This operation is NOT atomic. If the write fails and `filename` existed, it is gone forever.
163
+ /// For more context see: https://github.com/swift-nav/console_pp/pull/72#discussion_r654751414
164
+ pub fn overwrite_with_progress < ' b , F > (
165
+ & mut self ,
166
+ filename : String ,
167
+ data : impl Read ,
168
+ mut on_progress : F ,
169
+ ) -> Result < ( ) >
170
+ where
171
+ F : FnMut ( usize ) + ' b ,
172
+ {
158
173
self . remove ( filename. clone ( ) ) ?;
159
174
160
175
let mut data = BufReader :: new ( data) ;
@@ -166,14 +181,22 @@ impl<'a> Fileio<'a> {
166
181
if bytes_read == 0 {
167
182
break ;
168
183
}
169
- state = self . write_slice ( state, buf) ?;
184
+ state = self . write_slice ( state, buf, & mut on_progress ) ?;
170
185
data. consume ( bytes_read) ;
171
186
}
172
187
173
188
Ok ( ( ) )
174
189
}
175
190
176
- fn write_slice ( & mut self , mut state : WriteState , data : & [ u8 ] ) -> Result < WriteState > {
191
+ fn write_slice < ' b , F > (
192
+ & mut self ,
193
+ mut state : WriteState ,
194
+ data : & [ u8 ] ,
195
+ on_progress : & mut F ,
196
+ ) -> Result < WriteState >
197
+ where
198
+ F : FnMut ( usize ) + ' b ,
199
+ {
177
200
let config = self . fetch_config ( ) ;
178
201
179
202
let ( req_tx, req_rx) = channel:: unbounded ( ) ;
@@ -234,9 +257,11 @@ impl<'a> Fileio<'a> {
234
257
} ,
235
258
recv( res_rx) -> msg => {
236
259
let msg = msg?;
237
- if pending. remove( & msg. sequence) . is_none( ) {
238
- continue
239
- }
260
+ let req = match pending. remove( & msg. sequence) {
261
+ Some ( ( _, req) ) => req,
262
+ _ => continue ,
263
+ } ;
264
+ on_progress( req. end_offset - req. offset) ;
240
265
open_requests. fetch_sub( 1 ) ;
241
266
if last_sent && open_requests. load( ) == 0 {
242
267
break ;
0 commit comments