-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Version3 package refactor #54
Open
jasonyunicorn
wants to merge
110
commits into
master
Choose a base branch
from
version3_package_refactor
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
General: * Factored out a logging and utils package in attempt to reduce circular dependencies. Listener: * Pulled out non-shard operations from listener (CreateStream, DeleteStream, etc). This is primarily because it is a much safer to have the shard set and immutable at listener creation. Operations like SetShard in previous implementation were dangerous if the shard iterators have already been retrieved. Non-shard operations are now moved to the Kinetic object. * Use an embedded struct (listenerConfig) to share common fields in Listener and its Config. * Added a FromKinetic to Listener's Config to import any AWS/relevant config from an existing Kinetic object. * Fixed the logLevel implementationt to be a bit more sane. * Separated out testing of Listener's Config to config_test.go Added Kinetic: * Added a Kinetic object that can be used to create Kinesis streams and Firehose Delivery Streams. Note that since the APIs for Kinesis and Firehose differ, they can co-exist in Kinetic comfortably. General Notes: The new Kinetic struct is completely optional, but may be useful in situations where you do not know what your ShardId is. For example, you may be spawning off multiple Listeners to handle all shards, or you may have an external mechanism to determine which ShardId to listen to. The Kinetic object can be instantiated and the GetShards method can be used to obtain a list of all shards. Once you have figured out the ShardId, you can construct a Listener object using: import "github.com/rewardStyle/kinetic" import "github.com/rewardStyle/kinetic/listener" k := kinetic.New(kinetic.NewConfig(). WithCredentials(accessKey, secretKe, securityToken). WithRegion("us-east-1")) l := listener.NewListener( listener.NewConfig(stream, shard). FromKinetic(k)) This allows you to use the same config from your Kinetic object to instantiate the Listener. Alternatively, you can opt to create a Listener directly: l := listener.NewListener( listener.NewConfig(stream, shard). WithCredentials(accessKey, secretKey, securityToken). WithRegion("us-east")) I'm not very satisfied with the amount of code duplication between the Listener's Config and Kinetic's Config. But, with the prototypal pattern With*, I could not figure out an easy way to maintain the chaining of the Config using embedded structs. Furthermore, I'm not to thrilled at the dependency implementation. I think it would be easier to allow something like this: import "github.com/rewardStyle/kinetic" k := kinetic.New(kinetic.NewConfig(). WithCredentials(accessKey, secretKe, securityToken). WithRegion("us-east-1")) l := k.NewListener(stream, shard) Where: func (k *Kinetic) NewListener(stream, shard string) (*Listener, error) { return listener.NewListener( listener.NewConfig(stream, shard). FromKinetic(k)) } But, this would require the kinetic package to import the listener package. Unfortunately, the listener package also needs to import the kinetic package. Moving logging and utils out to their separate packages has helped minimize the circular dependencies, but the listener test cases still rely directly on the kinetic package. Finally, the tests do require unexported fields. Despite that being a smell, I am torn between several forces: * Several of the unexported variables carry important state that really should not be tampered with. * I don't like the idea of adding a bunch of accessors for internal state variables simply for the purpose of testing. Furthermore, this actually just makes the public API more unstable, as it couples the API to the internal implementation of the packages. * The tests really need access to the unexported fields. TODOS: * We need to implement checkpointing such that we don't always start at TRIM_HORIZON. This would probably be done via dependency injection of an interface, similar to the StatsListener. * We need to add support for merging and splitting straem. At minimum, the Kinetic library should provide a mechanism for the calling application to decide what to do when a stream closes.
* Use a simpler (more DRY) method for configuring Kinetic and Listener objects. TODO: * There should probably be a recover() inside the New functions to handle any panics inside the supplied configuration function. * Should re-evaluate whether embedding should be done by pointer. Could the caller retain a pointer to the structs and therefore gain access to internal state? Do we care? * Should re-evaluate whether much of the config should be exported or unexported. I've been told recently that exported/unexported isn't really about encapsulation, but rather more about stable vs unstable API. I'm not 100% convinced.
* There is probably a race condition in one of the tests. I've disabled it for now. It may not be worth adding complexity to fix the test.
- Use a timeout (bleh!) of 10s and a polling rate of 1s to give consume a chance to see the last planet.
… Introduced first draft of KclReader. Code cleanup based on code review.
…s processRecord message
Version3 listener unstable
… kinesis stream using the kinetic object with a producer and a listener
…eckpointer. Removed node expirary on checkpointer. Added checkpoint stats in consumer's stats collector. Added periodic update of checkpointer size in KclReader.
…t (instead of a binary search tree). Modified markDone so delete elements from the list upon forming a chain of sequence numbers marked done in order to address the potential for unchecked memory growth. Updated all unit tests to reflect the new expected behavior.
… the main (public) objects instead of the respective (private) option configuration structs.
…ency by adding a new concurrency field to the kinesisReaderOptions struct. Also added a new function method option to be able to adjust this parameter.
…ical bug in the producer's doWork exit condition. Added read response timeout to FirehoseWriter.
… Simplified the GetRecord / GetRecords API to only return an error (instead of returning count and size as well). Updated documentation on the Consumer / KinesisReader around rate limiting.
…th the count/size rate limiter.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This is the one. Release candidate v3.0.0-rc1