-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathusb_sync.rs
161 lines (147 loc) · 5.11 KB
/
usb_sync.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Related issue: <https://github.com/kevinmehall/nusb/issues/4>.
use crate::Error;
use jni_min_helper::block_for_timeout;
use futures_lite::future::block_on;
use std::{io::ErrorKind, time::Duration};
use nusb::transfer::{Queue, RequestBuffer, TransferError};
/// Queue type wrapped by `SyncReader` (IN transfers requested via `RequestBuffer`).
type ReadQueue = Queue<RequestBuffer>;
/// Queue type wrapped by `SyncWriter` (OUT transfers submitted as `Vec<u8>` payloads).
type WriteQueue = Queue<Vec<u8>>;
/// Blocking front end for a `nusb` IN transfer queue.
///
/// Each call to [`SyncReader::read`] submits one transfer and waits for it,
/// so callers never have to drive the asynchronous queue themselves.
pub struct SyncReader {
    /// The wrapped asynchronous transfer queue.
    queue: ReadQueue,
    /// Allocation recycled from one read to the next. It is taken out while a
    /// transfer is being processed and put back before `read` returns.
    buf: Option<Vec<u8>>,
}
impl SyncReader {
/// Wraps the asynchronous queue.
pub fn new(queue: ReadQueue) -> Self {
Self {
queue,
buf: Some(Vec::new()),
}
}
/// It is similar to `read()` in the standard `Read` trait, requiring timeout parameter.
pub fn read(&mut self, buf: &mut [u8], timeout: Duration) -> std::io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let buf_async = self.buf.take().unwrap();
// Safety: `RequestBuffer::reuse()` may reserve larger capacity to reach buf.len()
let req = nusb::transfer::RequestBuffer::reuse(buf_async, buf.len());
self.queue.submit(req);
let fut = self.queue.next_complete();
let comp = {
let mut maybe_comp = block_for_timeout(fut, timeout);
if maybe_comp.is_none() {
self.queue.cancel_all(); // the only one
if self.queue.pending() == 0 {
self.buf.replace(Vec::new());
return Err(Error::other("Unable to get the transfer result"));
}
let comp = block_on(self.queue.next_complete());
maybe_comp.replace(comp);
}
maybe_comp.unwrap()
};
let len_reveived = comp.data.len();
let result = match comp.status {
Ok(()) => {
buf[..len_reveived].copy_from_slice(&comp.data);
Ok(len_reveived)
}
Err(TransferError::Cancelled) => {
if len_reveived > 0 {
buf[..len_reveived].copy_from_slice(&comp.data);
Ok(len_reveived)
} else {
Err(Error::from(ErrorKind::TimedOut))
}
}
Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
Err(TransferError::Stall) => {
let _ = self.queue.clear_halt();
Err(Error::other(TransferError::Stall))
}
Err(e) => Err(Error::other(e)),
};
self.buf.replace(comp.data);
result
}
}
impl From<ReadQueue> for SyncReader {
fn from(value: ReadQueue) -> Self {
Self::new(value)
}
}
impl From<SyncReader> for ReadQueue {
fn from(value: SyncReader) -> Self {
value.queue
}
}
/// Blocking front end for a `nusb` OUT transfer queue.
///
/// Each call to [`SyncWriter::write`] submits one transfer and waits for it,
/// so callers never have to drive the asynchronous queue themselves.
pub struct SyncWriter {
    /// The wrapped asynchronous transfer queue.
    queue: WriteQueue,
    /// Allocation recycled from one write to the next. It is taken out while a
    /// transfer is being processed and put back before `write` returns.
    buf: Option<Vec<u8>>,
}
impl SyncWriter {
/// Wraps the asynchronous queue.
pub fn new(queue: WriteQueue) -> Self {
Self {
queue,
buf: Some(Vec::new()),
}
}
/// It is similar to `write()` in the standard `Write` trait, requiring timeout parameter.
/// It is always synchronous, and `flush()` is not needed.
pub fn write(&mut self, buf: &[u8], timeout: Duration) -> std::io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let mut buf_async = self.buf.take().unwrap();
buf_async.clear(); // it has no effect on the allocated capacity
buf_async.extend_from_slice(buf);
self.queue.submit(buf_async);
let fut = self.queue.next_complete();
let comp = {
let mut maybe_comp = block_for_timeout(fut, timeout);
if maybe_comp.is_none() {
self.queue.cancel_all(); // the only one
if self.queue.pending() == 0 {
self.buf.replace(Vec::new());
return Err(Error::other("Unable to get the transfer result"));
}
let comp = block_on(self.queue.next_complete());
maybe_comp.replace(comp);
}
maybe_comp.unwrap()
};
let len_sent = comp.data.actual_length();
let result = match comp.status {
Ok(()) => Ok(len_sent),
Err(TransferError::Cancelled) => {
if len_sent > 0 {
Ok(len_sent)
} else {
Err(Error::from(ErrorKind::TimedOut))
}
}
Err(TransferError::Disconnected) => Err(Error::from(ErrorKind::NotConnected)),
Err(TransferError::Stall) => {
let _ = self.queue.clear_halt();
Err(Error::other(TransferError::Stall))
}
Err(e) => Err(Error::other(e)),
};
self.buf.replace(comp.data.reuse());
result
}
}
impl From<WriteQueue> for SyncWriter {
fn from(value: WriteQueue) -> Self {
Self::new(value)
}
}
impl From<SyncWriter> for WriteQueue {
fn from(value: SyncWriter) -> Self {
value.queue
}
}