From 22c9f366cf075b23dbfccaae2f988b51c540253e Mon Sep 17 00:00:00 2001 From: Hannah Shi Date: Thu, 30 Mar 2023 16:29:51 -0700 Subject: [PATCH] address review comments --- src/core/lib/event_engine/cf_engine/cf_engine.cc | 12 ++++++------ src/core/lib/event_engine/cf_engine/cf_engine.h | 6 +++--- .../lib/event_engine/cf_engine/cfstream_endpoint.cc | 12 +++++++----- .../lib/event_engine/cf_engine/cfstream_endpoint.h | 2 +- .../lib/event_engine/cf_engine/cftype_unique_ref.h | 5 ++++- test/core/event_engine/cf/cf_engine_test.cc | 2 +- test/core/event_engine/test_suite/BUILD | 2 +- 7 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.cc b/src/core/lib/event_engine/cf_engine/cf_engine.cc index 57ad268a910de9..11d933e9223ac9 100644 --- a/src/core/lib/event_engine/cf_engine/cf_engine.cc +++ b/src/core/lib/event_engine/cf_engine/cf_engine.cc @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -39,7 +39,7 @@ struct CFEventEngine::Closure final : public EventEngine::Closure { GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p executing callback:%s", engine, HandleToString(handle).c_str()); { - grpc_core::MutexLock lock(&engine->mu_); + grpc_core::MutexLock lock(&engine->task_mu_); engine->known_handles_.erase(handle); } cb(); @@ -52,7 +52,7 @@ CFEventEngine::CFEventEngine() CFEventEngine::~CFEventEngine() { { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { for (auto handle : known_handles_) { gpr_log(GPR_ERROR, @@ -98,7 +98,7 @@ CFEventEngine::ConnectionHandle CFEventEngine::Connect( auto on_connect2 = [that = std::static_pointer_cast(shared_from_this()), - deadline_timer = std::move(deadline_timer), handle, + deadline_timer, handle, on_connect = std::move(on_connect)](absl::Status status) mutable { // best effort canceling deadline timer that->Cancel(deadline_timer); @@ -174,7 +174,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfter( } bool CFEventEngine::Cancel(TaskHandle handle) { - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); if (!known_handles_.contains(handle)) return false; auto* cd = reinterpret_cast(handle.keys[0]); bool r = timer_manager_.TimerCancel(&cd->timer); @@ -191,7 +191,7 @@ EventEngine::TaskHandle CFEventEngine::RunAfterInternal( cd->engine = this; EventEngine::TaskHandle handle{reinterpret_cast(cd), aba_token_.fetch_add(1)}; - grpc_core::MutexLock lock(&mu_); + grpc_core::MutexLock lock(&task_mu_); known_handles_.insert(handle); cd->handle = handle; GRPC_EVENT_ENGINE_TRACE("CFEventEngine:%p scheduling callback:%s", this, diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.h b/src/core/lib/event_engine/cf_engine/cf_engine.h index 3595484ed4588c..57aaa698afdd1d 100644 --- a/src/core/lib/event_engine/cf_engine/cf_engine.h +++ b/src/core/lib/event_engine/cf_engine/cf_engine.h @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -67,8 +67,8 @@ class CFEventEngine : public EventEngine, bool CancelConnectInternal(ConnectionHandle handle, absl::Status status); - grpc_core::Mutex mu_; - TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); + grpc_core::Mutex task_mu_; + TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_); std::atomic aba_token_{0}; grpc_core::Mutex conn_mu_; diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc index 0f9a21412c1058..b036b1d2d2a025 100644 --- a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.cc @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,6 +25,8 @@ namespace experimental { namespace { +int kDefaultReadBufferSize = 8192; + absl::Status CFErrorToStatus(CFTypeUniqueRef cf_error) { CFErrorDomain cf_domain = CFErrorGetDomain((cf_error)); CFIndex code = CFErrorGetCode((cf_error)); @@ -152,6 +154,7 @@ void CFStreamEndpoint::Connect( switch (type) { case kCFStreamEventOpenCompleted: + // wait for write stream open completed to signal connection ready break; case kCFStreamEventHasBytesAvailable: ABSL_FALLTHROUGH_INTENDED; @@ -250,15 +253,14 @@ void CFStreamEndpoint::DoRead(absl::AnyInvocable on_read, SliceBuffer* buffer) { GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpoint::DoRead, this: %p", this); - int buffer_size = 8192; - auto buffer_index = - buffer->AppendIndexed(Slice(memory_allocator_.MakeSlice(buffer_size))); + auto buffer_index = buffer->AppendIndexed( + Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize))); CFIndex read_size = CFReadStreamRead( cf_read_stream_, internal::SliceCast(buffer->MutableSliceAt(buffer_index)) .begin(), - buffer_size); + kDefaultReadBufferSize); if (read_size < 0) { auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); diff --git a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h index 1a6a40224099ca..aa5dc878ba2cca 100644 --- a/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h +++ b/src/core/lib/event_engine/cf_engine/cfstream_endpoint.h @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h b/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h index 7474906fb0e755..c7915170f62209 100644 --- a/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h +++ b/src/core/lib/event_engine/cf_engine/cftype_unique_ref.h @@ -1,4 +1,4 @@ -// Copyright 2022 The gRPC Authors +// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -57,6 +57,9 @@ class CFTypeUniqueRef { } void reset(T other = nullptr) { + if (cf_type_ref_ == other) { + return; + } T old = cf_type_ref_; cf_type_ref_ = other; if (old) { diff --git a/test/core/event_engine/cf/cf_engine_test.cc b/test/core/event_engine/cf/cf_engine_test.cc index 23679f60bb1942..f50b40f82fa765 100644 --- a/test/core/event_engine/cf/cf_engine_test.cc +++ b/test/core/event_engine/cf/cf_engine_test.cc @@ -93,4 +93,4 @@ int main(int argc, char** argv) { #else // not GPR_APPLE int main(int /* argc */, char** /* argv */) { return 0; } -#endif \ No newline at end of file +#endif diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index f70302c4c965f6..8e8592896cf19b 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -99,7 +99,7 @@ grpc_cc_test( "no_linux", "no_windows", ], - uses_polling = False, + uses_polling = True, deps = [ "//src/core:cf_event_engine", "//test/core/event_engine/test_suite/posix:oracle_event_engine_posix",