What is EventMachine and how we are using it
Disclaimer: This wiki talks about DEAs, but we no longer support DEAs. The examples in this doc are outdated, but we are not going to update them because we are looking into potentially moving off of EventMachine now that we don't need to use it. https://www.pivotaltracker.com/story/show/148164693
EventMachine is a gem that allows us to handle a large number of concurrent requests.
EventMachine is designed to simultaneously meet two key needs:

- Extremely high scalability, performance and stability for the most demanding production environments.
- An API that eliminates the complexities of high-performance threaded network programming, allowing engineers to concentrate on their application logic.
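Code that uses EM must run inside an `EventMachine.run` block. Inside that block you can call EM methods such as `start_server`:

```ruby
require 'eventmachine'
# EchoServer is a connection-handler module (as in the EventMachine README); not defined here.
EventMachine.run {
  EventMachine.start_server "127.0.0.1", 8081, EchoServer
}
```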
We have two general usages of EventMachine.
In the first usage, we run EventMachine in a new thread that runs concurrently with the main thread. This is necessary in the context of a Rails application because `EM.run` blocks the thread that calls it until the reactor stops, so EM would take over the entire app if it did not have its own thread. Putting EM on its own thread lets the reactor poll for new events to react to while the main process keeps running. We do this in several places, such as the `rake db`, `rake clock`, `rake buildpack`, and `rake jobs` tasks. For example, if the DEA client is called in one of these tasks, EM picks up that work the next time it polls for new events.
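The thread-per-reactor pattern can be illustrated without EventMachine. The sketch below is a stdlib-only toy, not EM itself: `ToyReactor` is a hypothetical class whose loop drains a `Queue` on a dedicated thread, roughly the way `EM.run` keeps polling for events. Because the loop has its own thread, the calling thread (in our case, the rake task) keeps running and can hand work to the reactor at any time.

```ruby
# ToyReactor is a hypothetical stand-in for an EM reactor, not EventMachine's API.
class ToyReactor
  attr_reader :thread

  def initialize
    @events = Queue.new
  end

  # Run the event loop on its own thread, as background_job_environment does
  # by wrapping EM.run in Thread.new, so the calling thread is not blocked.
  def start_in_background
    @thread = Thread.new do
      loop do
        event = @events.pop        # wait for the next event
        break if event == :stop    # sentinel that ends the loop
        event.call                 # handle the event on the reactor thread
      end
    end
    self
  end

  # May be called from any thread; the handler runs later on the reactor thread.
  def schedule(&handler)
    @events << handler
  end

  # Ask the loop to finish after the events already queued, then wait for it.
  def stop
    @events << :stop
    @thread.join
  end
end

reactor = ToyReactor.new.start_in_background
handled_on = nil
reactor.schedule { handled_on = Thread.current }
# The main thread (for example, a rake task) keeps running here.
reactor.stop
puts handled_on == reactor.thread # prints true
```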
lib/cloud_controller/background_job_environment.rb:
```ruby
def setup_environment
  Thread.new do
    EM.run do
      message_bus = MessageBus::Configurer.new(
        servers: @config[:message_bus_servers],
        logger: Steno.logger('cc.message_bus')).go

      # The AppObserver needs no knowledge of the DEA or stager pools
      # so we are passing in no-op objects for these arguments
      no_op_dea_pool = Object.new

      runners = VCAP::CloudController::Runners.new(@config, message_bus, no_op_dea_pool)
      CloudController::DependencyLocator.instance.register(:runners, runners)

      stagers = VCAP::CloudController::Stagers.new(@config, message_bus, no_op_dea_pool)
      CloudController::DependencyLocator.instance.register(:stagers, stagers)

      VCAP::CloudController::AppObserver.configure(stagers, runners)

      blobstore_url_generator = CloudController::DependencyLocator.instance.blobstore_url_generator
      VCAP::CloudController::Dea::Client.configure(@config, message_bus, no_op_dea_pool, blobstore_url_generator)
    end
  end

  if block_given?
    yield
    stop
  end
end
```
This snippet initializes objects in the setup block that is passed to `EM.run`. In this setup block we create a message bus, which uses NATS, which in turn relies on EventMachine. The message bus can only execute code in the EM context, because it needs an EM reactor loop to be listening in order to schedule its IO operations.
The other objects here rely on the message bus, so they are also created in the setup block. Once this setup is done, the reactor loop takes over the newly spawned EventMachine thread and polls for events.
In the `background_job_environment` snippet, the `block_given?` section stops EM. Some jobs pass a block to this method to run as a one-off task, and want EM and its thread to be shut down afterwards. Other jobs keep EM running as a long-lived thread and do not pass a block, so EventMachine is never stopped.
There is a potential, uninvestigated race condition between the block that is yielded to and the setup block passed to `EM.run`: `Thread.new` returns immediately, so `yield` can start on the calling thread before the setup block on the EM thread has created the message bus and registered `runners` and `stagers`.
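That race can be modeled with plain Ruby threads. In the sketch below, `toy_setup_environment` and `TOY_REGISTRY` are hypothetical stand-ins for `setup_environment` and `CloudController::DependencyLocator`, and a `sleep` simulates slow setup on the EM thread. Waiting on a `Queue` that the setup block pushes to once it finishes is one possible mitigation; it is our assumption of how the race could be closed, not something the Cloud Controller code does today.

```ruby
# Hypothetical model of setup_environment; TOY_REGISTRY stands in for
# CloudController::DependencyLocator and is not a real Cloud Controller object.
TOY_REGISTRY = {}

def toy_setup_environment(wait_for_setup:)
  TOY_REGISTRY.clear
  setup_done = Queue.new

  em_thread = Thread.new do
    sleep 0.05                          # simulate slow setup (e.g. connecting to NATS)
    TOY_REGISTRY[:runners] = :runners   # like DependencyLocator.register(:runners, ...)
    setup_done << true                  # tell the caller that setup has finished
    # A real reactor would keep polling for events here until stopped.
  end

  # Possible mitigation (an assumption, not current CC code): wait for setup.
  setup_done.pop if wait_for_setup

  if block_given?
    yield
    em_thread.join                      # stand-in for `stop`
  end
end

# Without waiting, the one-off block can run before setup registers :runners.
racy = :unset
toy_setup_environment(wait_for_setup: false) { racy = TOY_REGISTRY[:runners] }

# With waiting, the block always sees the registered object.
safe = :unset
toy_setup_environment(wait_for_setup: true) { safe = TOY_REGISTRY[:runners] }
puts "racy: #{racy.inspect}, safe: #{safe.inspect}"
```

In the racy call, `racy` is usually `nil` because the block runs before the simulated setup finishes; the outcome depends on thread scheduling, which is what makes this kind of race easy to miss.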
While the EventMachine reactor loop is running, objects can hand work to it through EM functions such as `defer` and `schedule`. Below is an example in which our DEA client uses these functions to exchange staging information over the message bus.
lib/cloud_controller/dea/app_stager_task.rb:
```ruby
module VCAP::CloudController
  module Dea
    class AppStagerTask
      def stage_with_nats(msg)
        subject = "staging.#{@stager_id}.start"
        @multi_message_bus_request = MultiResponseMessageBusRequest.new(@message_bus, subject)

        staging_result = EM.schedule_sync do |promise|
          # First response is blocking stage_app.
          @multi_message_bus_request.on_response(staging_timeout) do |response, error|
            logger.info('staging.first-response', app_guid: @app.guid, response: response, error: error)
            handle_first_response(response, error, promise)
          end

          # Second message is received after app staging finished and
          # droplet was uploaded to the CC.
          # Second response does NOT block stage_app
          @multi_message_bus_request.on_response(staging_timeout) do |response, error|
            logger.info('staging.second-response', app_guid: @app.guid, response: response, error: error)
            handle_second_response(response, error)
          end

          @multi_message_bus_request.request(msg)
        end

        staging_result
      end

      def process_nats_response(response)
        # Defer potentially expensive operation
        # to avoid executing on reactor thread
        EM.defer do
          begin
            staging_nats_completion(StagingResponse.new(response))
          rescue => e
            Loggregator.emit_error(@app.guid, "Encountered error: #{e.message}")
            logger.error "Encountered error on stager with id #{@stager_id}: #{e}\n#{e.backtrace.join("\n")}"
          end
        end
      end
    end
  end
end
```
The DEA app stager uses the message bus and expects to run inside a Ruby process where an EventMachine reactor is running. It also registers work directly with the reactor, using `EM.schedule_sync` in `stage_with_nats` and `EM.defer` in `process_nats_response`.
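The two EM calls in `AppStagerTask` follow different patterns, which the stdlib-only sketch below imitates with a hypothetical `ToyReactor` (not EventMachine). `schedule_sync` runs a block on the reactor thread and blocks the caller until the block delivers a result; we are assuming this matches what `EM.schedule_sync` does in `stage_with_nats`, since as far as we know `schedule_sync` is not part of EventMachine's core API. `defer` mirrors the shape of `EM.defer(operation, callback)`: the operation runs on another thread and its result is handed to the callback on the reactor thread.

```ruby
# ToyReactor is a hypothetical model, not EventMachine. It mimics two patterns
# from AppStagerTask:
#   schedule_sync - run a block on the reactor thread and block the caller
#                   until the block delivers a result through a promise
#   defer         - run slow work on a worker thread, then hand the result to
#                   a callback on the reactor thread
class ToyReactor
  attr_reader :thread

  def initialize
    @events = Queue.new
    @thread = Thread.new do
      while (event = @events.pop) != :stop
        event.call
      end
    end
  end

  def schedule(&block)
    @events << block
  end

  # A Queue acts as the promise: pop blocks until the handler pushes a value.
  def schedule_sync(&block)
    promise = Queue.new
    schedule { block.call(promise) }
    promise.pop
  end

  # Shaped like EM.defer(operation, callback); the toy spawns one thread per call.
  def defer(operation, callback)
    Thread.new do
      result = operation.call
      schedule { callback.call(result) }
    end
  end

  def stop
    @events << :stop
    @thread.join
  end
end

reactor = ToyReactor.new

# Like stage_with_nats: the caller waits until the handler delivers a result.
result = reactor.schedule_sync { |promise| promise << "staging started" }

# Like process_nats_response: the expensive part stays off the reactor thread.
done = Queue.new
worker = nil
reactor.defer(
  -> { worker = Thread.current; 21 * 2 },
  ->(value) { done << [value, Thread.current] }
)
value, callback_thread = done.pop
reactor.stop
```

One difference to keep in mind: calling this toy `schedule_sync` from the reactor thread itself would deadlock, because the queued block cannot run while the reactor is blocked waiting on the promise. EventMachine's `EM.schedule` runs the block immediately when it is called on the reactor thread, and `EM.defer` draws from a thread pool rather than spawning a thread per call.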
We also start an `EM.run` loop when we start our CloudController in `Runner`.
Below is the EM loop that starts our Thin server (`start_thin_server`).
lib/cloud_controller/runner.rb:
```ruby
EM.run do
  begin
    message_bus = MessageBus::Configurer.new(servers: @config[:message_bus_servers], logger: logger).go
    start_cloud_controller(message_bus)
    Dea::SubSystem.setup!(message_bus)
    VCAP::Component.varz.threadsafe! # initialize varz

    request_metrics = VCAP::CloudController::Metrics::RequestMetrics.new(statsd_client)
    gather_periodic_metrics(message_bus)

    builder = RackAppBuilder.new
    app = builder.build(@config, request_metrics)
    start_thin_server(app)
  rescue => e
    logger.error "Encountered error: #{e}\n#{e.backtrace.join("\n")}"
    raise e
  end
end
```
One notable element of this `EM.run` block is that we start a Thin server inside the setup block passed to EventMachine, even though Thin runs EM itself. We think the outer block exists to set up objects that only work in a context with a running EventMachine, in this case the message bus and `request_metrics`. For example, request metrics are updated through the `PeriodicUpdater`, which registers events with the EventMachine reactor loop so that they occur periodically.
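A periodic updater only does anything while a reactor is running, because it is the loop that checks which timers are due and fires their callbacks. The sketch below models that with a hypothetical `ToyTimerLoop` driven by a simulated clock, so it is deterministic and needs no EventMachine. The real EventMachine call for this is `EM.add_periodic_timer(interval) { ... }`; we are assuming `PeriodicUpdater` registers its updates in a similar way. The `:request_metrics` and `:user_count` labels and the intervals are illustrative only.

```ruby
# ToyTimerLoop is a hypothetical single-threaded timer loop with a simulated
# clock; it is not EventMachine and not Cloud Controller's PeriodicUpdater.
class ToyTimerLoop
  Timer = Struct.new(:interval, :next_fire_at, :callback)

  def initialize
    @now = 0
    @timers = []
  end

  # Analogue of EM.add_periodic_timer: run the callback every `interval` seconds.
  def add_periodic_timer(interval, &callback)
    @timers << Timer.new(interval, @now + interval, callback)
  end

  # Drive the loop forward, firing due timers in time order.
  def advance(seconds)
    target = @now + seconds
    loop do
      due = @timers.select { |t| t.next_fire_at <= target }.min_by(&:next_fire_at)
      break unless due
      @now = due.next_fire_at
      due.callback.call(@now)
      due.next_fire_at += due.interval   # reschedule for the next period
    end
    @now = target
  end
end

updates = []
timers = ToyTimerLoop.new
timers.add_periodic_timer(30) { |t| updates << [:request_metrics, t] }
timers.add_periodic_timer(45) { |t| updates << [:user_count, t] }
before_advance = updates.dup   # registered, but nothing has fired yet
timers.advance(90)
p updates
```

Nothing fires until `advance` drives the loop; in the same way, periodic updates need a running reactor to trigger them.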
When Thin runs inside an existing `EM.run` block, the Thin server does not own the EM loop and cannot stop it (see this issue and this commit).
We noticed that EM methods are not called throughout the CloudController codebase; our visible usages lie mostly in these two files. These two `EM.run` calls wrap most of the code that we run. However, we don't seem to take full advantage of EM's request-handling capabilities, because the rest of the codebase does not use EM defers and callbacks.
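The EM reactor is a global singleton: there is one reactor per Ruby process, and all EM calls in the process are dispatched to it. Calling `EM.run` while the reactor is already running does not start a second loop; it runs the given block inside the existing one.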
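The sketch below models the one-reactor-per-process rule with a hypothetical stdlib-only module, `ToyEM` (not the EventMachine source). As in EventMachine, a nested `run` call joins the reactor that is already running instead of starting its own, which is why Thin, started inside `Runner`'s `EM.run` block, does not own the loop. The toy has no real event loop and shuts down as soon as the outer block returns.

```ruby
# ToyEM is a hypothetical model of a process-wide reactor, not EventMachine.
module ToyEM
  @running = false
  @owner = nil

  class << self
    attr_reader :owner

    def running?
      @running
    end

    # Start the reactor only if none is running; otherwise run the block inside
    # the existing reactor, so the nested caller never becomes the owner.
    def run(caller_name)
      if @running
        yield
        return :joined_existing_reactor
      end

      @running = true
      @owner = caller_name
      begin
        yield
        :started_reactor
      ensure
        # A real reactor would keep looping until stopped; the toy ends here.
        @running = false
        @owner = nil
      end
    end
  end
end

inner = owner_seen_by_thin = nil
outer = ToyEM.run(:runner) do
  # Like Thin calling EM.run from inside Runner's EM.run block.
  inner = ToyEM.run(:thin) { owner_seen_by_thin = ToyEM.owner }
end
puts [outer, inner, owner_seen_by_thin].inspect
# prints [:started_reactor, :joined_existing_reactor, :runner]
```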
We use the message bus, which uses NATS behind the scenes. The message bus uses EM to perform its operations.
Reference links:
https://github.com/eventmachine/eventmachine
http://code.macournoyer.com/thin/