Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
HannahShiSFB committed Apr 3, 2023
1 parent 3cc8a72 commit 22c9f36
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 18 deletions.
12 changes: 6 additions & 6 deletions src/core/lib/event_engine/cf_engine/cf_engine.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The gRPC Authors
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,7 @@ struct CFEventEngine::Closure final : public EventEngine::Closure {
GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p executing callback:%s", engine,
HandleToString(handle).c_str());
{
grpc_core::MutexLock lock(&engine->mu_);
grpc_core::MutexLock lock(&engine->task_mu_);
engine->known_handles_.erase(handle);
}
cb();
Expand All @@ -52,7 +52,7 @@ CFEventEngine::CFEventEngine()

CFEventEngine::~CFEventEngine() {
{
grpc_core::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&task_mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
for (auto handle : known_handles_) {
gpr_log(GPR_ERROR,
Expand Down Expand Up @@ -98,7 +98,7 @@ CFEventEngine::ConnectionHandle CFEventEngine::Connect(

auto on_connect2 =
[that = std::static_pointer_cast<CFEventEngine>(shared_from_this()),
deadline_timer = std::move(deadline_timer), handle,
deadline_timer, handle,
on_connect = std::move(on_connect)](absl::Status status) mutable {
// best effort canceling deadline timer
that->Cancel(deadline_timer);
Expand Down Expand Up @@ -174,7 +174,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfter(
}

bool CFEventEngine::Cancel(TaskHandle handle) {
grpc_core::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&task_mu_);
if (!known_handles_.contains(handle)) return false;
auto* cd = reinterpret_cast<Closure*>(handle.keys[0]);
bool r = timer_manager_.TimerCancel(&cd->timer);
Expand All @@ -191,7 +191,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfterInternal(
cd->engine = this;
EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
aba_token_.fetch_add(1)};
grpc_core::MutexLock lock(&mu_);
grpc_core::MutexLock lock(&task_mu_);
known_handles_.insert(handle);
cd->handle = handle;
GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p scheduling callback:%s", this,
Expand Down
6 changes: 3 additions & 3 deletions src/core/lib/event_engine/cf_engine/cf_engine.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The gRPC Authors
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,8 +67,8 @@ class CFEventEngine : public EventEngine,

bool CancelConnectInternal(ConnectionHandle handle, absl::Status status);

grpc_core::Mutex mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_);
grpc_core::Mutex task_mu_;
TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_);
std::atomic<intptr_t> aba_token_{0};

grpc_core::Mutex conn_mu_;
Expand Down
12 changes: 7 additions & 5 deletions src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The gRPC Authors
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@ namespace experimental {

namespace {

int kDefaultReadBufferSize = 8192;

absl::Status CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error) {
CFErrorDomain cf_domain = CFErrorGetDomain((cf_error));
CFIndex code = CFErrorGetCode((cf_error));
Expand Down Expand Up @@ -152,6 +154,7 @@ void CFStreamEndpoint::Connect(

switch (type) {
case kCFStreamEventOpenCompleted:
// wait for write stream open completed to signal connection ready
break;
case kCFStreamEventHasBytesAvailable:
ABSL_FALLTHROUGH_INTENDED;
Expand Down Expand Up @@ -250,15 +253,14 @@ void CFStreamEndpoint::DoRead(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpoint::DoRead, this: %p", this);

int buffer_size = 8192;
auto buffer_index =
buffer->AppendIndexed(Slice(memory_allocator_.MakeSlice(buffer_size)));
auto buffer_index = buffer->AppendIndexed(
Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize)));

CFIndex read_size = CFReadStreamRead(
cf_read_stream_,
internal::SliceCast<MutableSlice>(buffer->MutableSliceAt(buffer_index))
.begin(),
buffer_size);
kDefaultReadBufferSize);

if (read_size < 0) {
auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/event_engine/cf_engine/cfstream_endpoint.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The gRPC Authors
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
5 changes: 4 additions & 1 deletion src/core/lib/event_engine/cf_engine/cftype_unique_ref.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The gRPC Authors
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,6 +57,9 @@ class CFTypeUniqueRef {
}

void reset(T other = nullptr) {
if (cf_type_ref_ == other) {
return;
}
T old = cf_type_ref_;
cf_type_ref_ = other;
if (old) {
Expand Down
2 changes: 1 addition & 1 deletion test/core/event_engine/cf/cf_engine_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ int main(int argc, char** argv) {

#else // not GPR_APPLE
int main(int /* argc */, char** /* argv */) { return 0; }
#endif
#endif
2 changes: 1 addition & 1 deletion test/core/event_engine/test_suite/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ grpc_cc_test(
"no_linux",
"no_windows",
],
uses_polling = False,
uses_polling = True,
deps = [
"//src/core:cf_event_engine",
"//test/core/event_engine/test_suite/posix:oracle_event_engine_posix",
Expand Down

0 comments on commit 22c9f36

Please sign in to comment.