// s3_optimized.rs
//
// An `Arc<T>` / `Weak<T>` pair backed by a single heap allocation that keeps
// two separate reference counts: one for the data and one for the allocation.
use std::mem::ManuallyDrop;
use std::cell::UnsafeCell;
use std::ops::Deref;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::fence;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::ptr::NonNull;
/// A strong, shared-ownership pointer to a heap-allocated `T`.
///
/// Every `Arc` holds a non-null pointer to the `ArcData<T>` block that stores
/// the value together with its reference counts.
pub struct Arc<T> {
    ptr: NonNull<ArcData<T>>,
}

// SAFETY: an `Arc<T>` moved or shared across threads gives those threads
// `&T` access (via `Deref`) and may cause `T` to be dropped on any of them,
// so `T` has to be both `Send` and `Sync`.
unsafe impl<T> Send for Arc<T> where T: Send + Sync {}
unsafe impl<T> Sync for Arc<T> where T: Send + Sync {}
/// A weak pointer to the same allocation an `Arc<T>` points to.
///
/// It does not give direct access to the value; call `upgrade` to try to
/// obtain an `Arc<T>`.
pub struct Weak<T> {
    ptr: NonNull<ArcData<T>>,
}

// SAFETY: a `Weak<T>` can be upgraded into an `Arc<T>` on whichever thread
// holds it, so it needs the same `Send + Sync` bound on `T` as `Arc<T>`.
unsafe impl<T> Send for Weak<T> where T: Send + Sync {}
unsafe impl<T> Sync for Weak<T> where T: Send + Sync {}
/// The heap block shared by every `Arc<T>` and `Weak<T>` pointing at one value.
struct ArcData<T> {
    /// How many `Arc`s currently exist.
    data_ref_count: AtomicUsize,
    /// How many `Weak`s currently exist, plus one more for as long as at
    /// least one `Arc` exists.
    alloc_ref_count: AtomicUsize,
    /// The stored value. It is dropped once only weak pointers remain, which
    /// is why it is wrapped in `ManuallyDrop`.
    data: UnsafeCell<ManuallyDrop<T>>,
}
impl<T> Arc<T> {
/// Moves `data` into a new heap allocation and returns the first `Arc` to it.
///
/// Both counters start at 1: one `Arc` exists, and `alloc_ref_count` holds
/// the single implicit weak reference that stands for all `Arc`s together.
/// The `Box` is leaked on purpose; the allocation is turned back into a
/// `Box` and freed in `Drop for Weak` once `alloc_ref_count` reaches zero.
pub fn new(data: T) -> Arc<T> {
Arc {
ptr: NonNull::from(Box::leak(Box::new(ArcData {
alloc_ref_count: AtomicUsize::new(1),
data_ref_count: AtomicUsize::new(1),
data: UnsafeCell::new(ManuallyDrop::new(data)),
}))),
}
}
/// Returns a shared reference to the `ArcData` block behind `ptr`.
fn data(&self) -> &ArcData<T> {
// SAFETY: `ptr` refers to the allocation leaked in `new`. While this `Arc`
// exists, the implicit weak reference keeps `alloc_ref_count` above zero,
// so the allocation has not been freed.
unsafe { self.ptr.as_ref() }
}
/// Returns a mutable reference to the value if `arc` is the only pointer to
/// the allocation, i.e. there is no other `Arc` and no `Weak`. Returns
/// `None` otherwise.
///
/// This is an associated function: call it as `Arc::get_mut(&mut a)`.
pub fn get_mut(arc: &mut Self) -> Option<&mut T> {
// Step 1: "lock" the weak count by swapping 1 -> usize::MAX. This fails
// (and we give up) if the count is not exactly 1, meaning a `Weak` exists.
// While the count is usize::MAX, `downgrade` spins instead of creating
// a new `Weak`.
// Acquire matches Weak::drop's Release decrement, to make sure any
// upgraded pointers are visible in the next data_ref_count.load.
if arc.data().alloc_ref_count.compare_exchange(
1, usize::MAX, Acquire, Relaxed
).is_err() {
return None;
}
// Step 2: with no `Weak`s and none being created, check that this is the
// only `Arc`.
let is_unique = arc.data().data_ref_count.load(Relaxed) == 1;
// Step 3: unlock the weak count again, whatever the outcome.
// Release matches Acquire increment in `downgrade`, to make sure any
// changes to the data_ref_count that come after `downgrade` don't
// change the is_unique result above.
arc.data().alloc_ref_count.store(1, Release);
if !is_unique {
return None;
}
// Acquire to match Arc::drop's Release decrement, to make sure nothing
// else is accessing the data.
fence(Acquire);
// SAFETY: the checks above established that `arc` is the only pointer to
// the allocation, and the caller holds `&mut` to it, so nothing else can
// reach the data while the returned reference is alive.
unsafe { Some(&mut *arc.data().data.get()) }
}
/// Creates a new `Weak` pointer to the same allocation as `arc` by
/// incrementing `alloc_ref_count`.
///
/// This is an associated function: call it as `Arc::downgrade(&a)`.
///
/// # Panics
///
/// Panics if the observed weak count is greater than `usize::MAX / 2`
/// (other than the `usize::MAX` lock value, which is waited out instead).
pub fn downgrade(arc: &Self) -> Weak<T> {
let mut n = arc.data().alloc_ref_count.load(Relaxed);
loop {
// usize::MAX is the temporary "locked" marker written by `get_mut`;
// spin until it has been restored, then re-read the count.
if n == usize::MAX {
std::hint::spin_loop();
n = arc.data().alloc_ref_count.load(Relaxed);
continue;
}
// Overflow guard, checked before the increment is attempted.
assert!(n <= usize::MAX / 2);
// Acquire synchronises with get_mut's release-store.
if let Err(e) =
arc.data()
.alloc_ref_count
.compare_exchange_weak(n, n + 1, Acquire, Relaxed)
{
// The exchange failed (value changed, or spurious failure of the
// weak variant): retry with the value that was actually observed.
n = e;
continue;
}
return Weak { ptr: arc.ptr };
}
}
}
impl<T> Deref for Arc<T> {
    type Target = T;

    /// Gives shared access to the value stored in the allocation.
    fn deref(&self) -> &Self::Target {
        let payload = self.data().data.get();
        // SAFETY: this `Arc` exists, so the data reference count is non-zero
        // and the value has not been dropped; handing out shared references
        // to it is fine.
        unsafe { &*payload }
    }
}
impl<T> Weak<T> {
    /// Returns a shared reference to the `ArcData` block behind `ptr`.
    fn data(&self) -> &ArcData<T> {
        // SAFETY: this `Weak` is counted in `alloc_ref_count`, and the
        // allocation is only freed when that count drops to zero, so the
        // pointer is still valid here.
        unsafe { &*self.ptr.as_ptr() }
    }

    /// Tries to obtain an `Arc` to the value.
    ///
    /// Returns `None` when the data reference count is already zero, i.e. no
    /// `Arc` is left. Otherwise the count is incremented and a new `Arc` is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if the observed data reference count exceeds `usize::MAX / 2`.
    pub fn upgrade(&self) -> Option<Arc<T>> {
        // `fetch_update` performs the load / compare_exchange_weak retry
        // loop; returning `None` from the closure stops without writing.
        let outcome = self
            .data()
            .data_ref_count
            .fetch_update(Relaxed, Relaxed, |count| {
                if count == 0 {
                    return None;
                }
                assert!(count <= usize::MAX / 2);
                Some(count + 1)
            });
        outcome.ok().map(|_| Arc { ptr: self.ptr })
    }
}
impl<T> Clone for Weak<T> {
fn clone(&self) -> Self {
if self.data().alloc_ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
std::process::abort();
}
Weak { ptr: self.ptr }
}
}
impl<T> Drop for Weak<T> {
    /// Gives up one allocation reference and frees the allocation if this
    /// was the last one.
    fn drop(&mut self) {
        let was_last = self.data().alloc_ref_count.fetch_sub(1, Release) == 1;
        if !was_last {
            return;
        }
        // Pairs with the Release decrements of the other owners before the
        // allocation is torn down.
        fence(Acquire);
        // SAFETY: the allocation reference count just reached zero, so no
        // other `Arc` or `Weak` refers to the allocation any more and it can
        // be handed back to a `Box` to be freed.
        let allocation = unsafe { Box::from_raw(self.ptr.as_ptr()) };
        drop(allocation);
    }
}
impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
if self.data().data_ref_count.fetch_add(1, Relaxed) > usize::MAX / 2 {
std::process::abort();
}
Arc { ptr: self.ptr }
}
}
impl<T> Drop for Arc<T> {
    /// Gives up one data reference. When the last `Arc` goes away, the value
    /// is dropped and the implicit weak reference held on behalf of all
    /// `Arc`s is released.
    fn drop(&mut self) {
        let previous = self.data().data_ref_count.fetch_sub(1, Release);
        if previous != 1 {
            return;
        }
        fence(Acquire);
        let payload = self.data().data.get();
        // SAFETY: the data reference count is now zero, so nothing will
        // access the value any more.
        unsafe { ManuallyDrop::drop(&mut *payload) };
        // No `Arc<T>` is left: release the implicit weak pointer that
        // represented all of them (this may free the allocation).
        let implicit_weak = Weak { ptr: self.ptr };
        drop(implicit_weak);
    }
}
#[test]
fn test() {
    // Incremented every time a `DetectDrop` value is dropped.
    static NUM_DROPS: AtomicUsize = AtomicUsize::new(0);

    struct DetectDrop;

    impl Drop for DetectDrop {
        fn drop(&mut self) {
            NUM_DROPS.fetch_add(1, Relaxed);
        }
    }

    // One strong pointer plus two weak pointers to the same value.
    let strong = Arc::new(("hello", DetectDrop));
    let weak_for_thread = Arc::downgrade(&strong);
    let weak_local = Arc::downgrade(&strong);

    let worker = std::thread::spawn(move || {
        // The strong pointer is still alive, so the upgrade must succeed.
        let upgraded = weak_for_thread.upgrade().unwrap();
        assert_eq!(upgraded.0, "hello");
    });
    assert_eq!(strong.0, "hello");
    worker.join().unwrap();

    // Nothing has been dropped so far, and upgrading still works.
    assert_eq!(NUM_DROPS.load(Relaxed), 0);
    assert!(weak_local.upgrade().is_some());

    drop(strong);

    // With the last strong pointer gone, the value has been dropped and the
    // remaining weak pointer can no longer be upgraded.
    assert_eq!(NUM_DROPS.load(Relaxed), 1);
    assert!(weak_local.upgrade().is_none());
}