Skip to content
This repository has been archived by the owner on Jun 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #19 from heetch/bootstap-documentation
Browse files Browse the repository at this point in the history
Bootstrap documentation
  • Loading branch information
tealeg authored Nov 13, 2018
2 parents 4c461b5 + d1e7c9a commit fd06df4
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 17 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ Felice is very much a work in progress. We expect to move forward quickly, but
## Why "Felice"?
Felice Bauer was, at one time, Franz Kafka's fiance. He wrote her many messages, which she faithfuly kept, and later published.

## Where should I start?
If you wish to send messages via Kafka, you should start by reading
the documentation for the `producer` package. If you wish to consume
messages from Kafka, you should start by reading the documentation for
the `consumer` package. The `message` package contains the `Message` type that is
shared by both `consumer` and `producer` code.

12 changes: 10 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type clusterConsumer interface {
Close() error
}

// Consumer is a Kafka consumer.
// Consumer is the structure used to consume messages from Kafka.
// Having constructed a Conusmer you should use its Handle method to
// register per-topic handlers and, finally, call it's Serve method to
// begin consuming messages.
type Consumer struct {
RetryInterval time.Duration
Metrics MetricsReporter
Expand Down Expand Up @@ -81,7 +84,12 @@ func newClusterConfig(clientID string) *cluster.Config {
return c
}

// Serve runs the consumer and listens for new messages on the given topics.
// Serve runs the consumer and listens for new messages on the given
// topics. You must provide it with unique clientID and the address
// of one or more Kafka brokers. Serve will block until it is
// instructed to stop, which you can achieve by calling Consumer.Stop.
// When Serve terminates it will return an Error or nil to indicate
// that it excited without error.
func (c *Consumer) Serve(clientID string, addrs ...string) error {
c.setup()

Expand Down
2 changes: 1 addition & 1 deletion consumer/consumer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ type testHandler struct {

// HandleMessage will keep a count of how many times it is called and,
// if a testCase is set on the testHandler, it will run it with the
// message that HandleMessage recieved, allowing us to make assertions
// message that HandleMessage received, allowing us to make assertions
// about the nature of that message.
func (h *testHandler) HandleMessage(m *message.Message) error {
h.CallCount++
Expand Down
55 changes: 55 additions & 0 deletions consumer/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Consumer is Felice's primary entrance point for receiving messages
// from a Kafka cluster.
//
// There is no special construction function for the Consumer
// structure as all of its public members are optional, and we shall
// discuss them below. Thus you construct a Consumer by the normal Go
// means:
//
// c := felice.Consumer{}
//
// Once you've constructed a consumer you must add message handlers to
// it. This is done by calling the Consumer.Handle method. Each time
// you call Handle you'll pass a topic name and a type that implements
// the handler.Handler interface. Their can only ever be one handler
// associated with a topic so, if you call Handle multiple times with
// the same topic, they will update the handler registered for the
// topic, and only the final one will count. A typical call to Handle
// looks like this:
//
// c.Handle("testmsg", handler.HandlerFunc(func(m *message.Message) error {
// // Do something of your choice here!
// return nil // .. or return an actual error.
// }))
//
// Once you've registered all your handlers you may call
// Consumer.Serve. Serve requires a client ID and a slice of strings,
// each of which is the address of a Kafka broker to attempt to
// communicate with. Serve will start a go routine for each partition
// to consume messages and pass them to their per-topic
// handlers. Serve itself will block until Consumer.Stop is called.
// When Serve terminates it will return an error, which will be nil
// under normal circumstances.
//
// Note that any calls to Consumer.Handle after
// Consumer.Serve has been called will have no effect.
//
// Tweaking the consumer
// --------------------------
// The first public member is the RetryInterval, a time.Duration that
// controls how long the Felice consumer will wait before trying to
// consume a message from Kafka that failed the first time around.
// The default value if 1 second.
//
// The second public member is Metrics. Metrics stores a type that
// implements the felice.MetricsReporter interface. If you provide an
// implementation, then it's Report function will be called every time
// a message is successfully handled. The Report function will
// receive a copy of the message.Message that was handled, along with
// a map[string]string containing metrics about the handling of the
// message. Currently we pass the following metrics: "attempts" (the
// number of attempts it took before the message was handled);
// "msgOffset" (the marked offset for the message on the topic); and
// "remainingOffset" (the difference between the high water mark and
// the current offset).
package consumer
4 changes: 2 additions & 2 deletions consumer/handler/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (h *Collection) Topics() []string {
}

// Set associates the given Handler to the given Topic within the
// collection. If a Handler was already associated with the Topic,
// then that association will be lost and replaced by the new one. It
// collection. If a Handler was already associated with the Topic,
// then that association will be lost and replaced by the new one. It
// is safe to use Set from concurrent code.
func (h *Collection) Set(topic string, handler Handler) {
h.Lock()
Expand Down
56 changes: 56 additions & 0 deletions consumer/handler/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// The handler package defines the mechanisms by which a received
// message can be handled, and how handlers themselves can be managed.
// The most obvious way to use the handler package is via the consumer
// package, but this isn't an absolute requirement. We do however
// require that the messages handled are felice's message.Message
// type.
//
// The Handler interface defines the signature for all felice
// Handlers. There are two common ways to comply with this interface.
// The first is simply to create a type with the HandleMessage
// function:
//
// type MyFooHandler struct { }
//
// func (mfh MyFooHandler) HandleMessage(msg *message.Message) error {
// fmt.Printf("%+v", *msg)
// }
//
// This approach has the advantage of not actually requiring you to
// import the handler package when defining handlers for use with the
// consumer.Consumer.
//
// The second approach is to cast a function to the HandlerFunc type
// defined in this package:
//
// h := handler.HandlerFunc(func(msg *message.Message) error {
// fmt.Printf("%+v", *msg)
// })
//
// Handlers can also be managed in Collections. The Collection struct
// will allow exactly one handler to be associated with each topic,
// and will, on demand return a list of all topics for which a handler
// has been registered. For example:
//
// col := handler.Collection{}
//
// col.Set("my-topic", handler.HandlerFunc(func(msg *message.Message) error) {
// fmt.Printf("Got message on my-topic: %+v", *msg)
// return nil
// })
//
// col.Set("your-topic", handler.HandlerFunc(func(msg *message.Message) error) {
// fmt.Printf("Got message on your-topic: %+v", msg)
// return nil
// })
//
// yourHandler, ok := col.Get("your-topic")
// if !ok {
// fmt.Println("Couldn't find a handler for your-topic")
// }
//
// for _, t := range col.Topics {
// fmt.Printf("We have a handler for: %s", t)
// }
//
package handler
8 changes: 5 additions & 3 deletions consumer/handler/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"github.com/heetch/felice/message"
)

// A HandlerFunc is a function type that mimics the interface of the
// HandleMessage function in the Handler interface. If you cast a
// function to this type, it will comply with the Handler interface.
// A HandlerFunc is a function that supports the Handler interface.
// Calls to Handler.HandleMessage made against any function cast to
// this type will result in the function itself being called.
type HandlerFunc func(*message.Message) error

// HandleMessage implements the Handler interface by calling the
// HandlerFunc to which it is associated.
func (h HandlerFunc) HandleMessage(msg *message.Message) error {
return h(msg)
}
17 changes: 16 additions & 1 deletion consumer/handler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,22 @@ package handler

import "github.com/heetch/felice/message"

// A Handler handles a Message.
// Handler is the interface for handling consumed messages. You can
// either add a compliant HandleMessage function to some type by hand,
// or make use of the HandlerFunc. Note that felice Handlers receive
// felice Message types.
type Handler interface {
// HandleMessage functions will receive a *message.Message
// type, and may do with it what they wish. However, some
// caution should be used when returning an error from a
// HandleMessage function. Errors are considered an
// indication that the message has not been handled. After
// waiting a respectable amount of time, the HandleMessage
// function will be called again with the same message. This
// will continue until on of the following conditions is met:
//
// - 1. HandleMessage returns nil, instead of an error.
// - 2. A system administrator intervenes.
// - 3. The Earth is consumed by the sun.
HandleMessage(*message.Message) error
}
27 changes: 27 additions & 0 deletions message/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// The message package contains the Message type. When using
// the felice Producer, Message will be the type you send.
// When using the felice Consumer, you will register handlers that
// receive the Message type.
//
// You can create a new Message by calling New:
//
// msg := New("my-topic", "simple string message")
//
// The value passed as the 2nd argument can be any Go type that can be
// marshalled with encoding/json. Two default headers are added to
// all Messages:
//
// - Message-Id : a universally unique ID for the message
// - Produced-At : the current time in the UTC timezone.
//
// New can also be passed zero, one or many additional Options. An
// Option is a function that receives a pointer to the Message and can
// modify it directly prior to it being returned by New. Two
// predefined Options exists in felice: Header and Key.
//
// For example, if you want to create a Message with a custom header
// field you could request it as follows:
//
// msg := New("my-topic", "cold potatoes ain't hot!", Header("subject", "potatoes"))
//
package message
25 changes: 20 additions & 5 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ import (
uuid "github.com/satori/go.uuid"
)

// Message contains informations about the message to be sent to Kafka.
// Message represents a message to be sent via Kafka, or received from
// it. When using Felice's Consumer, any Handlers that you register
// will receive Messages as they're arguments. When using the Felice
// Producer, you will be sending Messages. When making a Message to
// be sent it is essential that you use the New function to do so.
type Message struct {
// Kafka topic.
// The Kafka topic this Message applies to.
Topic string

// If specified, messages with the same key will be sent to the same Kafka partition.
Key string

// Body of the Kafka message.
// Body of the Kafka message. For now this will always be a
// JSON marshaled form of whatever was passed to New.
Body []byte

// The time at which this Message was produced.
ProducedAt time.Time

// Partition where this publication was stored.
Expand All @@ -31,11 +37,20 @@ type Message struct {
// Headers of the message.
Headers map[string]string

// Unique id of the message.
// Unique ID of the message.
ID string
}

// New creates a new configured message.
// New creates a new Message, correctly configured for use with the
// felice Producer. You should not attempt to create Message types by
// hand (unless you really know what you're doing! - I'm just some
// documentation, not the police).
//
// Two headers will be added to all Messages:
//
// - Message-Id : a universally unique ID for the message
// - Produced-At : the current time in the UTC timezone.
//
func New(topic string, value interface{}, opts ...Option) (*Message, error) {
if topic == "" {
return nil, fmt.Errorf("messages require a non-empty topic")
Expand Down
16 changes: 13 additions & 3 deletions message/option.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
package message

// Option is used to customize a message before sending it.
// Option is a function type that receives a pointer to a Message and
// modifies it in place. Options are intended to customize a message
// before sending it. You can do this either by passing them as
// parameters to the New function, or by calling them directly against
// a Message.
type Option func(*Message)

// Header option adds a header to the message.
// Header is an Option that adds a custom header to the message. You
// may pass as many Header options to New as you wish. If multiple
// Header's are defined for the same key, the value of the last one
// past to New will be the value that appears on the Message.
func Header(k, v string) Option {
return func(m *Message) {
m.Headers[k] = v
}
}

// Key option specifies a key for the message.
// Key is an Option that specifies a key for the message. You should
// only pass this once to the New function, but if you pass it multiple
// times, the value set by the final one you pass will be what is set
// on the Message when it is returned by New.
func Key(key string) Option {
return func(m *Message) {
m.Key = key
Expand Down

0 comments on commit fd06df4

Please sign in to comment.