Skip to content

Commit

Permalink
Merge pull request HomeAdvisor#33 from lcary/lcary/avro_integration
Browse files Browse the repository at this point in the history
Add Avro message deserialization capability
  • Loading branch information
dhayha authored Dec 19, 2018
2 parents 06b42d5 + 2c935cb commit 16eca46
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 51 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Kafdrop is a UI for monitoring Apache Kafka clusters. The tool displays informat
* Kafka (0.8.1 or 0.8.2 is known to work)
* Zookeeper (3.4.5 or later)

Optional, additional integration:

* Schema Registry

## Building

After cloning the repository, building should just be a matter of running a standard Maven build:
Expand All @@ -30,6 +34,17 @@ Then open a browser and navigate to http://localhost:9000. The port can be overr
--server.port=<port>
```

Additionally, you can optionally configure a schema registry connection with:
```
--schemaregistry.connect=http://localhost:8081
```

Finally, a default message format (e.g. to deserialize Avro messages) can optionally be configured as follows:
```
--message.format=AVRO
```
Valid format values are "DEFAULT" and "AVRO". This setting can also be configured at the topic level via dropdown when viewing messages.

## Running with Docker

Note for Mac Users: You need to convert newline formatting of the kafdrop.sh file *before* running this command:
Expand Down
45 changes: 37 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@
<tag>HEAD</tag>
</scm>

<repositories>
<repository>
<id>central</id>
<url>http://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
Expand Down Expand Up @@ -73,6 +84,31 @@
<artifactId>spring-retry</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>

<!-- Spring Boot -->
<dependency>
Expand Down Expand Up @@ -152,13 +188,6 @@
<version>0.7.1</version>
<scope>test</scope>
</dependency>
<!-- Right now only needed by integration tests -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down Expand Up @@ -313,7 +342,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
<version>0.10.2.2</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.homeadvisor.kafdrop.config;

import javax.annotation.PostConstruct;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import com.homeadvisor.kafdrop.util.MessageFormat;


@Configuration
public class MessageFormatConfiguration {

@Component
@ConfigurationProperties(prefix = "message")
public static class MessageFormatProperties
{

private MessageFormat format;

@PostConstruct
public void init() {
// Set a default message format if not configured.
if (format == null) {
format = MessageFormat.DEFAULT;
}
}

public MessageFormat getFormat()
{
return format;
}

public void setFormat(MessageFormat format)
{
this.format = format;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.homeadvisor.kafdrop.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


@Configuration
public class SchemaRegistryConfiguration {

@Component
@ConfigurationProperties(prefix = "schemaregistry")
public static class SchemaRegistryProperties
{

public static final Pattern CONNECT_SEPARATOR = Pattern.compile("\\s*,\\s*");

private String connect;

public String getConnect()
{
return connect;
}

public void setConnect(String connect)
{
this.connect = connect;
}

public List<String> getConnectList()
{
return CONNECT_SEPARATOR.splitAsStream(this.connect)
.map(String::trim)
.filter(s -> s.length() > 0)
.collect(Collectors.toList());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.homeadvisor.kafdrop.config.MessageFormatConfiguration;
import com.homeadvisor.kafdrop.config.SchemaRegistryConfiguration;
import com.homeadvisor.kafdrop.model.MessageVO;
import com.homeadvisor.kafdrop.model.TopicVO;
import com.homeadvisor.kafdrop.service.KafkaMonitor;
import com.homeadvisor.kafdrop.service.MessageInspector;
import com.homeadvisor.kafdrop.service.TopicNotFoundException;
import com.homeadvisor.kafdrop.util.AvroMessageDeserializer;
import com.homeadvisor.kafdrop.util.DefaultMessageDeserializer;
import com.homeadvisor.kafdrop.util.MessageDeserializer;
import com.homeadvisor.kafdrop.util.MessageFormat;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
Expand All @@ -51,6 +58,12 @@ public class MessageController
@Autowired
private MessageInspector messageInspector;

@Autowired
private MessageFormatConfiguration.MessageFormatProperties messageFormatProperties;

@Autowired
private SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties;

/**
* Human friendly view of reading messages.
* @param topicName Name of topic
Expand All @@ -65,24 +78,39 @@ public String viewMessageForm(@PathVariable("name") String topicName,
BindingResult errors,
Model model)
{
final MessageFormat defaultFormat = messageFormatProperties.getFormat();

if (messageForm.isEmpty())
{
final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo();
defaultForm.setCount(1l);

defaultForm.setCount(10l);
defaultForm.setOffset(0l);
defaultForm.setPartition(0);
defaultForm.setFormat(defaultFormat);

model.addAttribute("messageForm", defaultForm);
}

final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));
model.addAttribute("topic", topic);

model.addAttribute("defaultFormat", defaultFormat);
model.addAttribute("messageFormats", MessageFormat.values());

if (!messageForm.isEmpty() && !errors.hasErrors())
{
final MessageDeserializer deserializer = getDeserializer(
topicName, messageForm.getFormat());

model.addAttribute("messages",
messageInspector.getMessages(topicName,
messageForm.getPartition(),
messageForm.getOffset(),
messageForm.getCount()));
messageForm.getCount(),
deserializer));

}

return "message-inspector";
Expand Down Expand Up @@ -120,12 +148,16 @@ public String viewMessageForm(@PathVariable("name") String topicName,
}
else
{
// Currently, only default deserialization supported via JSON API.
final MessageDeserializer deserializer = new DefaultMessageDeserializer();

List<Object> messages = new ArrayList<>();
List<MessageVO> vos = messageInspector.getMessages(
topicName,
partition,
offset,
count);
count,
deserializer);

if(vos != null)
{
Expand All @@ -136,6 +168,19 @@ public String viewMessageForm(@PathVariable("name") String topicName,
}
}

private MessageDeserializer getDeserializer(String topicName, MessageFormat format) {
final MessageDeserializer deserializer;

if (format == MessageFormat.AVRO) {
final String schemaRegistryUrl = schemaRegistryProperties.getConnect();
deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl);
} else {
deserializer = new DefaultMessageDeserializer();
}

return deserializer;
}

/**
* Encapsulates offset data for a single partition.
*/
Expand Down Expand Up @@ -166,11 +211,19 @@ public static class PartitionOffsetInfo
@JsonProperty("lastOffset")
private Long count;

public PartitionOffsetInfo(int partition, long offset, long count)
private MessageFormat format;

public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format)
{
this.partition = partition;
this.offset = offset;
this.count = count;
this.format = format;
}

public PartitionOffsetInfo(int partition, long offset, long count)
{
this(partition, offset, count, MessageFormat.DEFAULT);
}

public PartitionOffsetInfo()
Expand Down Expand Up @@ -213,5 +266,15 @@ public void setCount(Long count)
{
this.count = count;
}

public MessageFormat getFormat()
{
return format;
}

public void setFormat(MessageFormat format)
{
this.format = format;
}
}
}
Loading

0 comments on commit 16eca46

Please sign in to comment.