18
18
19
19
#include < list>
20
20
21
+ #include " app/memory/shared_ptr.h"
21
22
#include " app/src/log.h"
22
23
#include " app/src/mutex.h"
23
24
#include " app/src/semaphore.h"
@@ -32,7 +33,7 @@ namespace callback {
32
33
33
34
class CallbackEntry ;
34
35
35
- class CallbackQueue : public std ::list<CallbackEntry* > {
36
+ class CallbackQueue : public std ::list<SharedPtr< CallbackEntry> > {
36
37
public:
37
38
CallbackQueue () {}
38
39
~CallbackQueue () {}
@@ -51,7 +52,7 @@ class CallbackEntry {
51
52
// callback_mutex_ is used to enforce a critical section for callback
52
53
// execution and destruction.
53
54
CallbackEntry (Callback* callback, Mutex* callback_mutex)
54
- : callback_(callback), mutex_(callback_mutex) {}
55
+ : callback_(callback), mutex_(callback_mutex), executing_( false ) {}
55
56
56
57
// Destroy the callback. This blocks if the callback is currently
57
58
// executing.
@@ -61,35 +62,47 @@ class CallbackEntry {
61
62
// Returns true if a callback was associated with this entry and was executed,
62
63
// false otherwise.
63
64
bool Execute () {
64
- bool executed = false ;
65
- MutexLock lock (*mutex_);
66
- if (callback_) {
67
- callback_->Run ();
68
- // Note: The implementation of BlockingCallback below relies on the
69
- // callback being disabled after being run. If that changes, please
70
- // make sure to also update BlockingCallback.
71
- DisableCallback ();
72
- executed = true ;
65
+ {
66
+ MutexLock lock (*mutex_);
67
+ if (!callback_) return false ;
68
+ executing_ = true ;
73
69
}
74
- return executed;
70
+
71
+ callback_->Run ();
72
+
73
+ {
74
+ MutexLock lock (*mutex_);
75
+ executing_ = false ;
76
+ }
77
+
78
+ // Note: The implementation of BlockingCallback below relies on the
79
+ // callback being disabled after being run. If that changes, please
80
+ // make sure to also update BlockingCallback.
81
+ DisableCallback ();
82
+
83
+ return true ;
75
84
}
76
85
77
86
// Remove the callback method from this entry.
78
87
bool DisableCallback () {
79
- MutexLock lock (*mutex_);
80
- if (callback_) {
81
- delete callback_;
88
+ Callback* callback_to_delete = nullptr ;
89
+ {
90
+ MutexLock lock (*mutex_);
91
+ if (executing_ || !callback_) return false ;
92
+ callback_to_delete = callback_;
82
93
callback_ = nullptr ;
83
- return true ;
84
94
}
85
- return false ;
95
+ delete callback_to_delete;
96
+ return true ;
86
97
}
87
98
88
99
private:
89
100
// Callback to call from PollCallbacks().
90
101
Callback* callback_;
91
- // Mutex that is held when modifying callback_.
102
+ // Mutex that is held when modifying callback_ and executing_ .
92
103
Mutex* mutex_;
104
+ // A flag set to true when callback_ is about to be called.
105
+ bool executing_;
93
106
};
94
107
95
108
// Dispatches a queue of callbacks.
@@ -106,18 +119,18 @@ class CallbackDispatcher {
106
119
remaining_callbacks);
107
120
}
108
121
while (!queue_.empty ()) {
109
- delete queue_.back ();
122
+ queue_.back (). reset ();
110
123
queue_.pop_back ();
111
124
}
112
125
}
113
126
114
127
// Add a callback to the dispatch queue returning a reference
115
128
// to the entry which can be optionally be removed prior to dispatch.
116
129
void * AddCallback (Callback* callback) {
117
- CallbackEntry* entry = new CallbackEntry (callback, &execution_mutex_);
130
+ auto entry = MakeShared< CallbackEntry> (callback, &execution_mutex_);
118
131
MutexLock lock (*queue_.mutex ());
119
132
queue_.push_back (entry);
120
- return entry;
133
+ return entry. get () ;
121
134
}
122
135
123
136
// Remove the callback reference from the specified entry.
@@ -137,16 +150,19 @@ class CallbackDispatcher {
137
150
int DispatchCallbacks () {
138
151
int dispatched = 0 ;
139
152
Mutex* queue_mutex = queue_.mutex ();
140
- MutexLock lock (* queue_mutex);
153
+ queue_mutex-> Acquire ( );
141
154
while (!queue_.empty ()) {
142
- CallbackEntry* callback_entry = queue_.front ();
155
+ // Make a copy of the SharedPtr in case FlushCallbacks() is called
156
+ // currently.
157
+ SharedPtr<CallbackEntry> callback_entry = queue_.front ();
143
158
queue_.pop_front ();
144
159
queue_mutex->Release ();
145
160
callback_entry->Execute ();
146
161
dispatched++;
147
162
queue_mutex->Acquire ();
148
- delete callback_entry;
163
+ callback_entry. reset () ;
149
164
}
165
+ queue_mutex->Release ();
150
166
return dispatched;
151
167
}
152
168
@@ -155,7 +171,7 @@ class CallbackDispatcher {
155
171
int flushed = 0 ;
156
172
MutexLock lock (*queue_.mutex ());
157
173
while (!queue_.empty ()) {
158
- delete queue_.front ();
174
+ queue_.front (). reset ();
159
175
queue_.pop_front ();
160
176
flushed++;
161
177
}
@@ -299,7 +315,7 @@ void RemoveCallback(void* callback_reference) {
299
315
// remove the CallbackEntry from the queue so we don't need an additional
300
316
// Terminate() here to decrement the reference count that was added by
301
317
// AddCallback().
302
- static_cast <CallbackEntry*>(callback_reference) ->DisableCallback ();
318
+ g_callback_dispatcher ->DisableCallback (callback_reference );
303
319
Terminate (false );
304
320
}
305
321
}
0 commit comments