Mk 17 move le to scheduler #198

Open: wants to merge 10 commits into base: master
158 changes: 151 additions & 7 deletions README.md
@@ -20,6 +20,7 @@ For issues https://github.com/mesos/kafka/issues
* [Passing multiple options](#passing-multiple-options)
* [Broker metrics](#broker-metrics)
* [Rolling restart](#rolling-restart)
* [Pinning controller to a broker](#pinning-controller-to-a-broker)


[Navigating the CLI](#navigating-the-cli)
@@ -130,8 +131,24 @@ Starting and using 1 broker

First let's start up and use 1 broker with the default settings. Further in the readme you can see how to change these from the defaults.

Each broker must belong to a cluster. Adding clusters lets you logically separate Kafka nodes. A kafka-mesos cluster has a unique id
(specified on cluster creation) and a Zookeeper connection string - a Zookeeper url (with an optional chroot at the end) used to store all Kafka metadata.
The Zookeeper connection string specified at the cluster level is inherited by brokers as the `zk.connect` config setting.
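A Zookeeper connection string is a comma-separated list of `host:port` pairs with an optional chroot path appended. A minimal sketch of how such a string splits into hosts and chroot (illustrative Python, not the scheduler's actual parsing code):

```python
def parse_zk_connect(connect):
    """Split a Zookeeper connection string into (hosts, chroot).

    E.g. "master:2181,node1:2181/kafka-1" -> (["master:2181", "node1:2181"], "/kafka-1")
    """
    hosts_part, sep, chroot = connect.partition("/")
    hosts = hosts_part.split(",")
    # A chroot is present only if a "/" appeared in the string
    return hosts, ("/" + chroot) if sep else ""

hosts, chroot = parse_zk_connect("master:2181/kafka-1")
# hosts == ["master:2181"], chroot == "/kafka-1"
```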

To add a cluster execute:

```
# ./kafka-mesos.sh cluster add kafka-cluster-1 --zk-connect master:2181/kafka-1
cluster added:
id: kafka-cluster-1
zk connection string: master:2181/kafka-1
```
(You can also update and remove clusters; those commands follow the same pattern as the `broker` command line interface.)

Now you can add a broker:

```
# ./kafka-mesos.sh broker add 0 --cluster kafka-cluster-1
broker added:
id: 0
active: false
@@ -305,10 +322,10 @@ current limit is 100Kb no matter how many lines being requested.

High Availability Scheduler State
-------------------------
The scheduler supports storing the cluster state in Zookeeper. To enable this, set the following in the properties file:

```
clusterStorage=zk:master:2181/kafka-mesos
```

Failed Broker Recovery
@@ -518,9 +535,136 @@ or for `start`
Error: broker 1 timeout on start
```

Pinning controller to a broker
-----------------------------

The cluster-level option `controller` lets you pin the controller to a particular broker. Only the specified broker
is eligible for leader election and thus always wins Zookeeper leader election rounds. Whenever the controller
is (re-)elected, the chosen broker, if alive, becomes the controller.

Note that everything else in Kafka's algorithms and procedures remains the same: if you pin the controller to a broker that is
**not** active, your Kafka cluster may not operate correctly. In essence, all routines involving the controller will be broken
(e.g. adding a new topic - brokers simply will not receive a metadata update from the controller if it is not alive).
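The effect on leader election can be modeled as follows (an illustrative Python sketch, not the actual patch from KAFKA-2310): while the pinned broker is alive it wins every election round, and no controller is elected otherwise:

```python
def elect_controller(alive_brokers, pinned=None):
    """Pick a controller id from the set of alive broker ids.

    With a pinned controller only that broker is eligible; without
    pinning, any alive broker may win (lowest id here, for determinism).
    """
    if pinned is not None:
        # If the pinned broker is down, no controller exists and
        # controller-driven routines stop working.
        return pinned if pinned in alive_brokers else None
    return min(alive_brokers) if alive_brokers else None

elect_controller({0, 1, 2}, pinned=1)  # broker 1 always wins
elect_controller({0, 2}, pinned=1)     # None: cluster degraded
```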

To get this feature into your Kafka cluster you first need to patch and build the Kafka distribution. The patch is stored [here](https://issues.apache.org/jira/browse/KAFKA-2310)
and is available for Kafka 0.8.1, 0.8.2 and 0.9. To apply the patch, e.g. on branch 0.8.2:

```
# cd path-to-cloned-kafka
# git checkout 0.8.2
# git am KAFKA-2310_0.8.2.patch
```

To build the Kafka distribution:

```
# cd path-to-patched-kafka && ./gradlew releaseTarGz
```

To enable this functionality for a particular Kafka cluster in kafka-mesos:

```
# ./kafka-mesos.sh cluster add c1 --zk-connect master:2181 --controller 1
cluster added:
id: c1
zk connection string: master:2181
controller: 1
```

Now you can add brokers to the cluster, but only the broker with id `1` will become the controller.

Also, when a cluster has a pinned controller, the default topic creation logic (`topic add`) is slightly different. If you don't
specify the `broker` parameter when creating a topic, the controller broker is removed from the pool of available brokers -
that is, no partitions will be assigned to it:

```
# ./kafka-mesos.sh topic add t1 --cluster c1 --partitions 5
topic added:
name: t1
partitions: 0:[0], 1:[0], 2:[0], 3:[0], 4:[0]
```

To get an even partition assignment across all brokers, use the `broker` option:

```
# ./kafka-mesos.sh topic add t2 --cluster c1 --partitions 5 --broker "*"
topic added:
name: t2
partitions: 0:[0], 1:[1], 2:[0], 3:[1], 4:[0]
```
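The two assignments above can be modeled as a simple round-robin over the eligible brokers, with the pinned controller dropped from the pool unless `--broker "*"` is passed (an illustrative sketch, not kafka-mesos's actual assignment code):

```python
def assign_partitions(partitions, brokers, controller=None, include_controller=False):
    """Round-robin partitions over brokers, optionally excluding the controller."""
    pool = [b for b in brokers if include_controller or b != controller]
    # Each partition gets a single replica, cycling through the pool
    return {p: [pool[p % len(pool)]] for p in range(partitions)}

assign_partitions(5, [0, 1], controller=1)
# {0: [0], 1: [0], 2: [0], 3: [0], 4: [0]}  -- controller excluded, as for t1
assign_partitions(5, [0, 1], controller=1, include_controller=True)
# {0: [0], 1: [1], 2: [0], 3: [1], 4: [0]}  -- matches --broker "*", as for t2
```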

Navigating the CLI
==================

Adding a cluster
-------------------------------

```
# ./kafka-mesos.sh help cluster add
Add cluster
Usage: cluster add <cluster-id> [options]

Option Description
------ -----------
--zk-connect REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:
2181/kafka1

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000
```

Updating a cluster
-------------------------------

```
# ./kafka-mesos.sh help cluster update
Update cluster
Usage: cluster update <cluster-id> [options]

Option Description
------ -----------
--zk-connect REQUIRED. Connection string to Kafka Zookeeper cluster. E.g.: 192.168.0.1:2181,192.168.0.2:
2181/kafka1

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000
```

Removing a cluster
-------------------------------

```
# ./kafka-mesos.sh help cluster remove
Remove cluster
Usage: cluster remove <cluster-id>

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000

```

Listing clusters
-------------------------------

```
# ./kafka-mesos.sh help cluster list
List clusters
Usage: cluster list

Generic Options
Option Description
------ -----------
--api Api url. Example: http://master:7000

```


Adding brokers to the cluster
-------------------------------

@@ -774,7 +918,7 @@ Listing Topics
```
#./kafka-mesos.sh help topic list
List topics
Usage: topic list [<topic-expr>] --cluster <cluster-id>

Generic Options
Option Description
@@ -793,7 +937,7 @@ Adding Topic
```
#./kafka-mesos.sh help topic add
Add topic
Usage: topic add <topic-expr> [options] --cluster <cluster-id>

Option Description
------ -----------
@@ -830,7 +974,7 @@ Updating Topic
```
#./kafka-mesos.sh help topic update
Update topic
Usage: topic update <topic-expr> [options] --cluster <cluster-id>

Option Description
------ -----------
@@ -853,7 +997,7 @@ Rebalancing topics
```
#./kafka-mesos.sh help topic rebalance
Rebalance topics
Usage: topic rebalance <topic-expr>|status [options] --cluster <cluster-id>

Option Description
------ -----------
4 changes: 1 addition & 3 deletions kafka-mesos.properties
@@ -3,12 +3,10 @@ debug=true

user=vagrant

storage=zk:master:2181

master=master:5050

#for testing on the vagrant master via ./kafka-mesos.sh scheduler
#you will eventually want to run this on a scheduler i.e marathon
#change the IP to what is service discoverable & routable for your setup
25 changes: 18 additions & 7 deletions src/scala/ly/stealth/mesos/kafka/Broker.scala
@@ -23,19 +23,18 @@ import org.apache.mesos.Protos.Resource.DiskInfo.Persistence
import org.apache.mesos.Protos.Volume.Mode

import scala.collection.JavaConversions._
import scala.collection
import org.apache.mesos.Protos.{Volume, Value, Resource, Offer}
import java.util.{TimeZone, Collections, UUID, Date}
import ly.stealth.mesos.kafka.Broker.{Metrics, Stickiness, Failover}
import ly.stealth.mesos.kafka.Util.BindAddress
import net.elodina.mesos.util.{Strings, Period, Range, Repr}
import java.text.SimpleDateFormat
import scala.collection.Map
import scala.util.parsing.json.JSONObject

class Broker(_id: String = "0") {
var id: String = _id
var cluster: Cluster = null

@volatile var active: Boolean = false

var cpus: Double = 1
@@ -58,6 +57,16 @@ class Broker(_id: String = "0") {
// broker has been modified while being in non stopped state, once stopped or before task launch becomes false
var needsRestart: Boolean = false

  def this(id: String, cluster: Cluster) {
    this(id)
    this.cluster = cluster
  }

  def this(json: Map[String, Any], expanded: Boolean = false) = {
    this()
    fromJson(json, expanded)
  }

def options(defaults: util.Map[String, String] = null): util.Map[String, String] = {
val result = new util.LinkedHashMap[String, String]()
if (defaults != null) result.putAll(defaults)
@@ -244,8 +253,9 @@ class Broker(_id: String = "0") {
matches
}

def fromJson(node: Map[String, Any], expanded: Boolean = false): Unit = {
id = node("id").asInstanceOf[String]
cluster = if (expanded) new Cluster(node("cluster").asInstanceOf[Map[String, Any]]) else Nodes.getCluster(node("cluster").asInstanceOf[String])
active = node("active").asInstanceOf[Boolean]

cpus = node("cpus").asInstanceOf[Number].doubleValue()
@@ -277,9 +287,10 @@ class Broker(_id: String = "0") {
if (node.contains("needsRestart")) needsRestart = node("needsRestart").asInstanceOf[Boolean]
}

def toJson(expanded: Boolean = false): JSONObject = {
val obj = new collection.mutable.LinkedHashMap[String, Any]()
obj("id") = id
obj("cluster") = if (expanded) cluster.toJson else cluster.id
obj("active") = active

obj("cpus") = cpus
@@ -316,7 +327,7 @@ object Broker {

def idFromExecutorId(executorId: String): String = idFromTaskId(executorId)

def isOptionOverridable(name: String): Boolean = !scala.List("broker.id", "port", "zookeeper.connect").contains(name)

class Stickiness(_period: Period = new Period("10m")) {
var period: Period = _period