Skip to content

Commit

Permalink
Merge pull request #55 from Tapjoy/update_aws_gem
Browse files Browse the repository at this point in the history
OPER-5336 Update AWS SDK library
  • Loading branch information
alanbrent authored Sep 14, 2020
2 parents d616ab0 + 4bdde71 commit 44844c5
Show file tree
Hide file tree
Showing 38 changed files with 751 additions and 521 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,26 @@
**Cleanups**
- N/A

## [v4.0.0](https://github.com/Tapjoy/chore/tree/v4.0.0)

**Features**

- AWS SDK library has been updated to support AWS authentication via
[Web Federated Identity](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/loading-browser-credentials-federated-id.html)
- The API has jumped ahead 2 major versions, so some of the internals (e.g.
[`Chore::UnitOfWork`](lib/chore/unit_of_work.rb)) had to be changed to accommodate its changes. In the end, however,
this update is designed to function as a drop-in replacement for earlier versions of Chore

**Fixed bugs**

- Some of the SQS specs were not actually testing output values

**Cleanups**

- Many more YARD docs
- Documented the release process
- Mild overhaul of the base README

## [v3.1.0](https://github.com/Tapjoy/chore/tree/v3.1.0) (2017-09-15)

**Features**
Expand Down
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2014, Tapjoy, Inc.
Copyright (c) 2020, Tapjoy, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down
323 changes: 170 additions & 153 deletions README.md

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions chore-core.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ Gem::Specification.new do |s|
s.summary = "Job processing... for the future!"

s.add_runtime_dependency(%q<json>, [">= 0"])
s.add_runtime_dependency(%q<aws-sdk-v1>, ["~> 1.56", ">= 1.56.0"])
s.add_runtime_dependency(%q<aws-sdk-sqs>, ["~> 1"])
s.add_runtime_dependency(%q<thread>, ["~> 0.1.3"])
s.add_runtime_dependency('get_process_mem', ["~> 0.2.0"])
s.add_development_dependency(%q<rspec>, ["~> 3.3.0"])
s.add_development_dependency(%q<rspec>, ["~> 3.3"])
s.add_development_dependency(%q<rdoc>, ["~> 3.12"])
s.add_development_dependency(%q<bundler>, [">= 0"])
end

5 changes: 2 additions & 3 deletions chore.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ Gem::Specification.new do |s|
s.summary = "Job processing... for the future!"

s.add_runtime_dependency(%q<json>, [">= 0"])
s.add_runtime_dependency(%q<aws-sdk-v1>, ["~> 1.56", ">= 1.56.0"])
s.add_runtime_dependency(%q<aws-sdk-sqs>, ["~> 1"])
s.add_runtime_dependency(%q<thread>, ["~> 0.1.3"])
s.add_development_dependency(%q<rspec>, ["~> 2.12.0"])
s.add_development_dependency(%q<rspec>, ["~> 3.3"])
s.add_development_dependency(%q<rdoc>, ["~> 3.12"])
s.add_development_dependency(%q<bundler>, [">= 0"])
end

30 changes: 15 additions & 15 deletions docs/Delayed Jobs.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
# Delayed Jobs, Backoffs and SQS

We handle delays in SQS by changing the `VisiblityTimeout` of the in flight message. This operation is *additive*,
meaning if you have a Queue with a default `VisiblityTimeout` of 5 minutes, the message is processed then fails 1
minute after the message is sent, and the step function returns 60 seconds, the new `VisiblityTimeout` of the message
will be at 5 minutes.
We handle delays in SQS by changing the `VisibilityTimeout` of the in-flight message. This operation is *additive*,
meaning a queue has a default `VisibilityTimeout` of 5 minutes, the message is processed then fails 1 minute after the
message is sent, and the step function returns 60 seconds, the new `VisibilityTimeout` of the message will be at 5
(rather than 4) minutes.

This means small steps will not behave as you might expect, unless you have a `VisiblityTimeout` of 0. However, a
`VisiblityTimeout` of 0 would cause the message to never remain in flight and would likely break the entire world. You
want the default `VisiblityTimeout` to be as low as possible, but not low enough that it could be waiting in a Chore
batch when the timeout hits 0 (causing the job to process twice!).
This means small steps will not behave as one might expect, unless `VisibilityTimeout` is set to 0. However, a
`VisibilityTimeout` of 0 would cause the message to never remain in flight and would likely break the entire world
when every worker tries to work a copy of the same job. The default `VisibilityTimeout` should be set as low as possible
given the expected execution duration of the job code, but not so low that the message might be still be waiting to be
scheduled on a Chore worker when the timeout hits 0 (causing the job to process twice!).

Something else to consider is the `RetentionPeriod` of a queue. If the `RetentionPeriod` is 1 hour, and an initial
message continues to retry through that hour, it will suddenly disappear! That is to say: the `Retentionperiod` starts
message continues to retry through that hour, it will suddenly disappear! That is to say: the `RetentionPeriod` starts
counting down from the moment the message is *first* sent, and does not reset.

And finally, if the queue has a `DeliveryDelay` configured any delay applied by the backoff function will be *in
addition* to the `DeliveryDelay` when the message is removed from its "in flight" status.
Finally, if the queue has a `DeliveryDelay` configured any delay applied by the backoff function will be *in addition*
to the `DeliveryDelay` when the message is removed from its "in flight" status.

All of this should be taken into consideration when using this feature. In general, any queue requiring this will likely
need to run as its own Facet and not share workers with other queues.

### Further Reading

* [Expanded Documentation](docs/Delayed Jobs.md)
* [SQS VisiblityTimeout](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html)
* [SQS Delay Queues](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html)
* [SQS Message Lifecycle](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/MessageLifecycle.html)
* [SQS Message Lifecycle](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-basic-architecture.html)
* [SQS VisibilityTimeout](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
* [SQS Delay Queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html)
39 changes: 39 additions & 0 deletions docs/Releasing Chore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# How To Release Chore

## Non-Tapjoy Contributors

1. Open a PR against the main branch (currently `master`) with the changes in it
- make sure to update the [version](../lib/chore/version.rb)!

1. Tag (at least) a Tapjoy engineer for review when the PR is ready
(see [Contributors](https://github.com/Tapjoy/chore/graphs/contributors) page)

1. Assuming the PR is accepted, a Tapjoy engineer will cut a release with the changes.

## Tapjoy Engineers

Once a PR has been reviewed and accepted for release

1. Merge the PR into the main branch

1. [Create a new GitHub Release](https://github.com/Tapjoy/chore/releases/new) based on the updated main branch and tag
it with the appropriate version (e.g. v4.0.0). Make sure to provide a useful title (PR title, for example) and
description.

A new release should subsequently show up in the GitHub [Releases](https://github.com/Tapjoy/chore/releases) page

1. Generate and publish YARD documentation

```
bundle exec rake yard
bundle exec rake yard:publish
```
The [Chore Github Pages](https://tapjoy.github.io/chore) site should be updated with the updated documentation.
1. Build and publish the new gem to RubyGems
```
gem build chore-core.gemspec
gem push chore-core-${GEM_VERSION}.gem
```
20 changes: 20 additions & 0 deletions lib/chore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ def self.logger
end
end

def self.log_level_to_sym
return self.config[:log_level] if self.config[:log_level].is_a?(Symbol)
case self.config[:log_level]
when 0
:debug
when 1
:info
when 2
:warn
when 3
:error
when 4
:fatal
else
:unknown
end
end

# Reopens any open files. This will match any logfile that was opened by Chore,
# Rails, or any other library.
def self.reopen_logs
Expand Down Expand Up @@ -218,6 +236,8 @@ def self.configuring=(value)
end

# List of queue_names as configured via Chore::Job including their prefix, if set.
#
# @return [Array<String>]
def self.prefixed_queue_names
Chore::Job.job_classes.collect {|klass| c = constantize(klass); c.prefixed_queue_name}
end
Expand Down
3 changes: 1 addition & 2 deletions lib/chore/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def parse(args=ARGV) #:nodoc:
validate_strategy!
end


private

def setup_options #:nodoc:
register_option "queues", "-q", "--queues QUEUE1,QUEUE2", "Names of queues to process (default: all known)" do |arg|
# This will remove duplicates. We ultimately force this to be a Set further below
Expand Down Expand Up @@ -289,4 +289,3 @@ def validate_strategy!
end
end
end

2 changes: 1 addition & 1 deletion lib/chore/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Chore
# Wrapper around an OpenStruct to define configuration data
# (TODO): Add required opts, and validate that they're set
# TODO: Add required opts, and validate that they're set
class Configuration < OpenStruct
# Helper method to make merging Hashes into OpenStructs easier
def merge_hash(hsh={})
Expand Down
50 changes: 41 additions & 9 deletions lib/chore/consumer.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
module Chore
# Raised when Chore is booting up, but encounters a set of configuration that is impossible to boot from. Typically
# you'll find additional information around the cause of the exception by examining the logfiles
# you'll find additional information around the cause of the exception by examining the logfiles.
# You can raise this exception if your queue is in a terrible state and must shut down.
class TerribleMistake < Exception
# You can raise this exception if your queue is in a terrible state and must shut down
end

# Base class for a Chore Consumer. Provides the basic interface to adhere to for building custom
# Chore Consumers.
# Base class for a Chore Consumer. Provides the interface that a Chore::Consumer implementation should adhere to.
class Consumer

attr_accessor :queue_name

# @param [String] queue_name Name of queue to be consumed from
# @param [Hash] opts
def initialize(queue_name, opts={})
@queue_name = queue_name
@running = true
Expand All @@ -24,18 +25,25 @@ def self.reset_connection!
# Consume takes a block with an arity of two. The two params are
# |message_id,message_body| where message_id is any object that the
# consumer will need to be able to act on a message later (reject, complete, etc)
def consume(&block)
#
# @param [Block] &handler Message handler, used by the calling context (worker) to create & assigns a UnitOfWork
def consume(&handler)
raise NotImplementedError
end

# Reject should put a message back on a queue to be processed again later. It takes
# a message_id as returned via consume.
#
# @param [String] message_id Unique ID of the message
def reject(message_id)
raise NotImplementedError
end

# Complete should mark a message as finished. It takes a message_id as returned via consume
def complete(message_id)
# Complete should mark a message as finished.
#
# @param [String] message_id Unique ID of the message
# @param [Hash] receipt_handle Unique ID of the consuming transaction in non-filesystem implementations
def complete(message_id, receipt_handle)
raise NotImplementedError
end

Expand All @@ -45,23 +53,47 @@ def stop
end

# Returns true if the Consumer is currently running
#
# @return [TrueClass, FalseClass]
def running?
@running
end

# returns up to n work
# Returns up to n work
#
# @param n
def provide_work(n)
raise NotImplementedError
end

# now, given an arbitrary key and klass, have we seen the key already?
# Determine whether or not we have already seen this message
#
# @param [String] dedupe_key
# @param [Class] klass
# @param [Integer] queue_timeout
#
# @return [TrueClass, FalseClass]
def duplicate_message?(dedupe_key, klass, queue_timeout)
dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout)
end

# Instance of duplicate detection implementation class
#
# @return [DuplicateDetector]
def dupe_detector
@dupes ||= DuplicateDetector.new({:servers => Chore.config.dedupe_servers,
:dupe_on_cache_failure => false})
end

private

# Gets messages from queue implementation and invokes the provided block over each one. Afterwards, the :on_fetch
# hook will be invoked per message. This block call provides data necessary for the worker (calling context) to
# populate a UnitOfWork struct.
#
# @param [Block] &handler Message handler, passed along by #consume
def handle_messages(&handler)
raise NotImplementedError
end
end
end
2 changes: 2 additions & 0 deletions lib/chore/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ def job_hash(job_params)
end

# The name of the configured queue, combined with an optional prefix
#
# @return [String]
def prefixed_queue_name
"#{Chore.config.queue_prefix}#{self.options[:name]}"
end
Expand Down
20 changes: 18 additions & 2 deletions lib/chore/publisher.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,42 @@
module Chore
# Base class for Chore Publishers. Provides the bare interface one needs to adhere to when writing custom publishers
# Base class for a Chore Publisher. Provides the interface that a Chore::Publisher implementation should adhere to.
class Publisher
DEFAULT_OPTIONS = { :encoder => Encoder::JsonEncoder }

attr_accessor :options

# @param [Hash] opts
def initialize(opts={})
self.options = DEFAULT_OPTIONS.merge(opts)
end

# Publishes the provided +job+ to the queue identified by the +queue_name+. Not designed to be used directly, this
# method ferries to the publish method on an instance of your configured Publisher.
#
# @param [String] queue_name Name of queue to be consumed from
# @param [Hash] job Job instance definition, will be encoded to JSON
def self.publish(queue_name,job)
self.new.publish(queue_name,job)
end

# Raises a NotImplementedError. This method should be overridden in your descendent, custom publisher class
# Publishes a message to queue
#
# @param [String] queue_name Name of the SQS queue
# @param [Hash] job Job instance definition, will be encoded to JSON
def publish(queue_name,job)
raise NotImplementedError
end

# Sets a flag that instructs the publisher to reset the connection the next time it's used.
# Should be overriden in publishers (but is not required)
def self.reset_connection!
end

protected

# Encodes the job class to format provided by endoder implementation
#
# @param [Any] job
def encode_job(job)
options[:encoder].encode(job)
end
Expand Down
Loading

0 comments on commit 44844c5

Please sign in to comment.