This project shows how to track events in Minecraft, including player movement, via Apache Kafka and Quarkus.
The Minecraft mod kafkamod might be a bit too chatty, but that is handy during development.
Tip: Start Kafka before starting the Minecraft server.
Spinning up the web app
Use Kafka to track players in Minecraft
The kafkamod mod connects to one or more Apache Kafka brokers on startup. It creates the topics if needed, using configuration from server.properties:
- kafka-mod-player-event-enable | default: true
- kafka-mod-entity-event-enable | default: true
- kafka-mod-server-chat-event-enable | default: true
- kafka-mod-topic-num-partitions | default: 1
- kafka-mod-topic-replication-factor | default: 1
- kafka-mod-kafka-brokers | default: localhost:9092
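Assuming the mod reads these keys directly from the server's server.properties, a configuration spelling out the defaults might look like this:

```properties
# kafkamod settings in server.properties (values shown are the defaults)
kafka-mod-player-event-enable=true
kafka-mod-entity-event-enable=true
kafka-mod-server-chat-event-enable=true
kafka-mod-topic-num-partitions=1
kafka-mod-topic-replication-factor=1
kafka-mod-kafka-brokers=localhost:9092
```

Leaving a key out falls back to the default, so an empty file behaves the same as the one above.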
Warning: Security - we connect via PLAINTEXT.
The kafkamod mod uses Apache Kafka to send events about:

- Chat
- Entities entering or leaving the world
- Entities crafted or picked up by a player
- Players

The handled events and their destinations:

- PlayerEvent.PlayerLoggedInEvent → Apache Kafka
- PlayerEvent.PlayerLoggedOutEvent → Apache Kafka
- PlayerEvent.PlayerChangedDimensionEvent → All Players - client message
- PlayerEvent.PlayerRespawnEvent → All Players - client message
- PlayerEvent.ItemCraftedEvent → Apache Kafka
- PlayerEvent.ItemPickupEvent → Apache Kafka
- net.minecraftforge.event.ServerChatEvent → Apache Kafka
- EntityJoinLevelEvent → Apache Kafka
- EntityLeaveLevelEvent → Apache Kafka
@SubscribeEvent
public void onEntityJoinLevelEvent(EntityJoinLevelEvent event) {
    EntityRecord entityRecord = new EntityRecord(event);
    String key = entityRecord.getName();
    KafkaMod.addRecordToTopic(
        key,
        entityRecord.toJsonNode(),
        KafkaProperties.KAFKA_MOD_ENTITY_EVENT);
}
Warning: The method addRecordToTopic sends one record at a time to Apache Kafka; batching is still on the to-do list.
public static void addRecordToTopic(String key, JsonNode record, String topic) {
    LOGGER.debug("Producing record: %s\t%s%n", key, record);
    getProducer().ifPresent(
        producer -> {
            producer.send(new ProducerRecord<String, JsonNode>(topic, key, record),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata m, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            LOGGER.debug(
                                "Produced record to topic %s partition [%d] @ offset %d%n",
                                m.topic(),
                                m.partition(),
                                m.offset());
                        }
                    }
                });
        });
}
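Because each record is sent with the entity name as its key, all events for the same entity end up in the same partition and keep their order. A dependency-free sketch of keyed partitioning (note: Kafka's real default partitioner hashes the serialized key with murmur2, not hashCode(); this is only an illustration of the idea):

```java
// Simplified illustration of keyed partitioning. Kafka's default
// partitioner actually uses murmur2 on the serialized key bytes;
// hashCode() is used here only to keep the sketch dependency-free.
public class KeyedPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is never negative,
        // then map the key hash onto the available partitions.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // With the mod's default of one partition, every key maps to partition 0.
        System.out.println(partitionFor("Creeper", 1)); // 0
        // The same key always lands in the same partition, preserving per-entity order.
        System.out.println(partitionFor("Creeper", 3) == partitionFor("Creeper", 3)); // true
    }
}
```

This is why bumping kafka-mod-topic-num-partitions does not break per-entity ordering: ordering is only guaranteed within a partition, and a fixed key always maps to the same partition.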
All entities can map to JsonNode like this:

public JsonNode toJsonNode() {
    return KafkaMod.objectMapper.valueToTree(this);
}
Building the kafkamod mod is done with:

cd minecraft-kafka-mod
./gradlew build
Packaging the kafkamod mod is done with shadowJar in build.gradle.
The Gradle build uses `shadowJar` to include all dependencies in our `kafka-1.0.8.jar` file, including `org.apache.kafka:kafka-clients:3.8.0` and `com.fasterxml.jackson.core:jackson-databind:2.17.2`.
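For illustration, a minimal build.gradle wiring for shadowJar might look like this (the plugin id and version are assumptions, not copied from the project):

```groovy
plugins {
    // the Shadow plugin provides the shadowJar task
    id 'com.github.johnrengelman.shadow' version '8.1.1'
}

dependencies {
    // dependencies bundled into the fat jar
    implementation 'org.apache.kafka:kafka-clients:3.8.0'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.2'
}

shadowJar {
    // drop the "-all" classifier so the shadowed jar replaces the plain one
    archiveClassifier.set('')
}
```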
An alternative to shadowJar is to add the dependencies (five jar files) manually, one by one, to the -DlegacyClassPath= section in the file
/opt/minecraft/forge/.minecraft_1.21.4-54.0.26/libraries/net/minecraftforge/forge/1.21.4-54.0.26/unix_args.txt
I use minecraft-kafka-mod/build_and_deploy.sh to deploy the mod to the Forge server. The script builds the jar and deploys it to $FORGE_SERVER_LOCATION/mods/kafka-1.0.8.jar - there is no housekeeping when bumping the version.
The app quarkus-kafka is created to display the data we write to and read from Apache Kafka in a nice, human-readable way.
# Location of a Kafka broker (default is broker1.jarry.dk:9192)
kafka.bootstrap.servers=broker1.jarry.dk:9192

# Configure the incoming `kafka-mod-item-stack` Kafka topic
mp.messaging.incoming.kafka-mod-item-stack.topic=kafka-mod-item-stack
mp.messaging.incoming.kafka-mod-item-stack.auto.offset.reset=earliest
The class ItemStackProcessor gets records from Apache Kafka, extracts the player, and sends it to players.
import com.fasterxml.jackson.databind.JsonNode;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@Incoming("item-stack")
@Outgoing("players")
public Player process(String itemStack) throws InterruptedException {
    Player player = null;
    try {
        // objectMapper is a Jackson ObjectMapper held by the processor
        JsonNode itemStackObj = objectMapper.readTree(itemStack);
        JsonNode playerObj = itemStackObj.get("player");
        player = new Player(playerObj);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return player;
}
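The extraction step amounts to pulling the player object out of each item-stack record. A dependency-free sketch of that idea (the real processor uses Jackson's readTree and a Player class; the regex and the field names here are illustrative only and handle just flat, well-formed input):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration of the data flow in ItemStackProcessor: given the JSON of an
// item-stack record, extract the player's name. Purely a sketch; real code
// should parse the JSON properly with Jackson.
public class PlayerExtractSketch {

    // Matches "player": { ... "name": "<value>" ... } in a flat JSON object.
    private static final Pattern NAME =
        Pattern.compile("\"player\"\\s*:\\s*\\{[^}]*\"name\"\\s*:\\s*\"([^\"]+)\"");

    static String extractPlayerName(String itemStackJson) {
        Matcher m = NAME.matcher(itemStackJson);
        return m.find() ? m.group(1) : null; // null mirrors the processor's error path
    }

    public static void main(String[] args) {
        String record = "{\"item\":\"minecraft:oak_log\",\"player\":{\"name\":\"Steve\",\"dimension\":\"overworld\"}}";
        System.out.println(extractPlayerName(record)); // Steve
    }
}
```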
Note: We do not need to know whether the outgoing players channel is internal or external; in this case it is internal.
The class PlayerResource picks up the Player and exposes it as a text/event-stream endpoint for all updates to players.
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Multi;

@Path("/players")
public class PlayerResource {

    @Channel("players")
    Multi<Player> players;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<Player> stream() {
        return players;
    }
}
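A text/event-stream endpoint emits standard Server-Sent Events frames: lines of the form data: <payload> separated by blank lines. A hypothetical client-side helper (not part of the project) that picks the payloads out of such a stream; browsers get this for free via EventSource:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of parsing an SSE stream: collect the payload of every
// "data: ..." line. Real SSE also supports "event:", "id:" and multi-line
// data fields, which this sketch deliberately ignores.
public class SsePayloadSketch {

    static List<String> payloads(String stream) {
        List<String> out = new ArrayList<>();
        for (String line : stream.split("\n")) {
            if (line.startsWith("data: ")) {
                out.add(line.substring("data: ".length()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two SSE frames as the /players endpoint might emit them.
        String stream = "data: {\"name\":\"Steve\"}\n\ndata: {\"name\":\"Alex\"}\n\n";
        System.out.println(payloads(stream)); // [{"name":"Steve"}, {"name":"Alex"}]
    }
}
```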
Starting the app in dev mode, we can now use http://localhost:8080/players.html to see updates to players.
The Kafka broker is configured with three listeners:

- PLAINTEXT : 9192
- SSL : 9193
- INTERNAL_PLAINTEXT : 9194

Overwrite `inter.broker.listener.name` with `INTERNAL_PLAINTEXT`.
#!/bin/bash
KAFKA_ZOOKEEPER_CONNECT="zookeeper.jarry.dk:2181"
KAFKA_LISTENERS="PLAINTEXT://:9192,SSL://:9193,INTERNAL_PLAINTEXT://:9194"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="PLAINTEXT:PLAINTEXT,INTERNAL_PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL"
KAFKA_INTER_BROKER_LISTENER_NAME="INTERNAL_PLAINTEXT"
KAFKA_SSL_TRUSTSTORE_LOCATION="/opt/apache/kafka/jarry_dk/kafka.server.truststore.jks"
KAFKA_SSL_TRUSTSTORE_PASSWORD="password1234"
KAFKA_SSL_KEYSTORE_LOCATION="/opt/apache/kafka/jarry_dk/broker1.jarry.dk.keystore.jks"
KAFKA_SSL_KEYSTORE_PASSWORD="password1234"
KAFKA_SSL_CLIENT_AUTH="requested"
KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://broker1.jarry.dk:9192,SSL://broker1.jarry.dk:9193,INTERNAL_PLAINTEXT://broker1.jarry.dk:9194"
KAFKA_BROKER_ID="1"
KAFKA_LOG_DIRS="/tmp/kafka-logs-broker-1"

$KAFKA_HOME/bin/kafka-server-start.sh \
  $KAFKA_HOME/config/server.properties \
  --override zookeeper.connect=$KAFKA_ZOOKEEPER_CONNECT \
  --override listeners=$KAFKA_LISTENERS \
  --override listener.security.protocol.map=$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP \
  --override inter.broker.listener.name=$KAFKA_INTER_BROKER_LISTENER_NAME \
  --override ssl.truststore.location=$KAFKA_SSL_TRUSTSTORE_LOCATION \
  --override ssl.truststore.password=$KAFKA_SSL_TRUSTSTORE_PASSWORD \
  --override ssl.keystore.location=$KAFKA_SSL_KEYSTORE_LOCATION \
  --override ssl.keystore.password=$KAFKA_SSL_KEYSTORE_PASSWORD \
  --override ssl.client.auth=$KAFKA_SSL_CLIENT_AUTH \
  --override advertised.listeners=$KAFKA_ADVERTISED_LISTENERS \
  --override broker.id=$KAFKA_BROKER_ID \
  --override log.dirs=$KAFKA_LOG_DIRS
Follow the Apache Kafka Quickstart to install ZooKeeper and the server.
Do the steps in the folder /opt/apache/kafka and you will have an Apache Kafka installation in the folder /opt/apache/kafka/kafka_2.13-3.8.0
export KAFKA_HOME=/opt/apache/kafka/kafka_2.13-3.8.0
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
Tip: Add ` | jq` to get the JSON from the topic in a nice format. See How to install jq.
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-chat \
--from-beginning | jq
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-item-stack \
--from-beginning | jq
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--from-beginning | jq
$KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-player-event \
--from-beginning | jq
$KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--create
$KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--replica-assignment 0:1:2,0:1:2,0:1:2 \
--create
$KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--replication-factor 2 \
--create
$KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--describe
$KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--partitions 3 \
--alter
Note
|
This can be done with kafka-reassign-partitions.sh too. |
$KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--topic kafka-mod-entity-event \
--delete
cat > increase-replication-factor.json << EOF
{
  "version": 1,
  "partitions": [
    {
      "topic": "kafka-mod-entity-event",
      "partition": 0,
      "replicas": [0, 1]
    }
  ]
}
EOF
$KAFKA_HOME/bin/kafka-reassign-partitions.sh \
--bootstrap-server broker1.jarry.dk:9192 \
--reassignment-json-file increase-replication-factor.json \
--execute
- https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
- https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
- https://medium.com/@_amanarora/replication-in-kafka-58b39e91b64e
- https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
- https://kafka.apache.org/documentation/#basic_ops_increase_replication_factor