@@ -17,6 +17,7 @@ use sbp::messages::{
17
17
SBP ,
18
18
} ;
19
19
20
+ use crate :: errors:: FILEIO_CHANNEL_SEND_FAILURE ;
20
21
use crate :: types:: { MsgSender , Result } ;
21
22
22
23
const MAX_RETRIES : usize = 20 ;
@@ -88,7 +89,9 @@ impl<'a> Fileio<'a> {
88
89
backoff. snooze ( ) ;
89
90
}
90
91
send_msg ( sequence, offset) ?;
91
- req_tx. send ( ( sequence, ReadReq :: new ( offset) ) ) . unwrap ( ) ;
92
+ req_tx
93
+ . send ( ( sequence, ReadReq :: new ( offset) ) )
94
+ . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
92
95
offset += READ_CHUNK_SIZE as u32 ;
93
96
sequence += 1 ;
94
97
open_requests. fetch_add ( 1 ) ;
@@ -98,7 +101,7 @@ impl<'a> Fileio<'a> {
98
101
} ) ;
99
102
100
103
let key = self . link . register ( move |msg : MsgFileioReadResp | {
101
- res_tx. send ( msg) . unwrap ( ) ;
104
+ res_tx. send ( msg) . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
102
105
} ) ;
103
106
104
107
loop {
@@ -127,7 +130,7 @@ impl<'a> Fileio<'a> {
127
130
open_requests. fetch_sub( 1 ) ;
128
131
if !last_sent && bytes_read != READ_CHUNK_SIZE as usize {
129
132
last_sent = true ;
130
- stop_req_tx. send( true ) . unwrap ( ) ;
133
+ stop_req_tx. send( true ) . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
131
134
}
132
135
if last_sent && open_requests. load( ) == 0 {
133
136
break
@@ -230,7 +233,9 @@ impl<'a> Fileio<'a> {
230
233
let chunk_len = std:: cmp:: min ( state. chunk_size , data_len - slice_offset) ;
231
234
let req = WriteReq :: new ( slice_offset, end_offset, chunk_len < state. chunk_size ) ;
232
235
send_msg ( & state, & req) ?;
233
- req_tx. send ( ( state. clone ( ) , req) ) . unwrap ( ) ;
236
+ req_tx
237
+ . send ( ( state. clone ( ) , req) )
238
+ . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
234
239
state. update ( chunk_len) ;
235
240
slice_offset += chunk_len;
236
241
open_requests. fetch_add ( 1 ) ;
@@ -240,7 +245,7 @@ impl<'a> Fileio<'a> {
240
245
} ) ;
241
246
242
247
let key = self . link . register ( move |msg : MsgFileioWriteResp | {
243
- res_tx. send ( msg) . unwrap ( ) ;
248
+ res_tx. send ( msg) . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
244
249
} ) ;
245
250
246
251
let mut pending: HashMap < u32 , ( WriteState , WriteReq ) > = HashMap :: new ( ) ;
@@ -294,7 +299,7 @@ impl<'a> Fileio<'a> {
294
299
let ( tx, rx) = channel:: unbounded ( ) ;
295
300
296
301
let key = self . link . register ( move |msg : MsgFileioReadDirResp | {
297
- tx. send ( msg) . unwrap ( ) ;
302
+ tx. send ( msg) . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
298
303
} ) ;
299
304
300
305
self . sender . send ( SBP :: from ( MsgFileioReadDirReq {
@@ -358,10 +363,11 @@ impl<'a> Fileio<'a> {
358
363
let sequence = new_sequence ( ) ;
359
364
let ( stop_tx, stop_rx) = channel:: bounded ( 0 ) ;
360
365
let ( tx, rx) = channel:: bounded ( 1 ) ;
361
-
366
+ let stop_tx_clone = stop_tx . clone ( ) ;
362
367
let key = self . link . register ( move |msg : MsgFileioConfigResp | {
363
- tx. send ( FileioConfig :: new ( msg) ) . unwrap ( ) ;
364
- stop_tx. send ( true ) . unwrap ( ) ;
368
+ tx. send ( FileioConfig :: new ( msg) )
369
+ . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
370
+ stop_tx_clone. send ( true ) . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
365
371
} ) ;
366
372
367
373
let sender = & self . sender ;
@@ -376,10 +382,12 @@ impl<'a> Fileio<'a> {
376
382
}
377
383
} ) ;
378
384
379
- match rx. recv_timeout ( CONFIG_REQ_TIMEOUT ) {
385
+ let res = match rx. recv_timeout ( CONFIG_REQ_TIMEOUT ) {
380
386
Ok ( config) => config,
381
387
Err ( _) => Default :: default ( ) ,
382
- }
388
+ } ;
389
+ stop_tx. send ( true ) . expect ( FILEIO_CHANNEL_SEND_FAILURE ) ;
390
+ res
383
391
} )
384
392
. unwrap ( ) ;
385
393
0 commit comments