forked from CenturyLinkLabs/panamax-contest-templates
-
Notifications
You must be signed in to change notification settings - Fork 2
/
kafka_server_cluster_levidehaan.pmx
129 lines (98 loc) · 3.26 KB
/
kafka_server_cluster_levidehaan.pmx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
---
name: Kafka Server Cluster
description: A deploy of a local Kafka cluster with 2 Kafka severs.
keywords: Kafka
type: Default
documentation: |
You will need to expose port 9050 and 9051 on virtualbox in order to connect with python.
VBoxManage controlvm panamax-vm natpf1 kafka,tcp,,9050,,9051
If you're just getting started with Kafka you can try out sending massages with Kafka Python.
Here are some directions to get you started.
You'll need to use the docker image to create a topic
In your shell type:
panamax ssh
docker run -ti --link zookeeper:zk levidehaan/kafka:0.8.1 /bin/bash
/opt/kafka_2.8.0-0.8.1.1/bin/kafka-topics.sh --create --topic my_topic --partitions 4 --zookeeper $ZK_PORT_2181_TCP_ADDR --replication-factor 2
/opt/kafka_2.8.0-0.8.1.1/bin/kafka-console-consumer.sh --topic=my_topic --zookeeper=$ZK_PORT_2181_TCP_ADDR
Now you can exit, type: exit
Exit again to leave the Panamax shell
Install kafka-python
pip install kafka-python
---------------------------
Kafka Emitter:
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer
kafka = KafkaClient("localhost:9050")
producer = SimpleProducer(kafka)
producer.send_messages("my_topic", "some message")
producer.send_messages("my_topic", "this method", "is variadic")
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my_topic", "async message")
producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)
response = producer.send_messages("my_topic", "async message")
if response:
print(response[0].error)
print(response[0].offset)
producer = SimpleProducer(kafka, batch_send=True, batch_send_every_n=20, batch_send_every_t=60)
kafka.close()
-----------------------------------------------
Kafka Consumer:
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer
kafka = KafkaClient("localhost:9050")
consumer = MultiProcessConsumer(kafka, "my-group", "my_topic", num_procs=2)
consumer = MultiProcessConsumer(kafka, "my-group", "my_topic",
partitions_per_proc=2)
print("Listening for Kafka")
for message in consumer:
print(message)
for message in consumer.get_messages(count=5, block=True, timeout=4):
print(message)
images:
- name: zookeeper
source: jplock/zookeeper:latest
category: Kafka
type: Default
ports:
- host_port: '49181'
container_port: '2181'
proto: TCP
- name: kafka
source: levidehaan/kafka:0.8.1
category: Kafka
type: Default
ports:
- host_port: '9050'
container_port: '9050'
proto: TCP
links:
- service: zookeeper
alias: zk
environment:
- variable: BROKER_ID
value: 1
- variable: PORT
value: 9050
- variable: HOST_IP
value: 172.17.42.1
- name: kafka2
source: levidehaan/kafka:0.8.1
category: Kafka
type: Default
ports:
- host_port: '9051'
container_port: '9051'
proto: TCP
links:
- service: zookeeper
alias: zk
environment:
- variable: BROKER_ID
value: 2
- variable: PORT
value: 9051
- variable: HOST_IP
value: 172.17.42.1