Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated to new kafka client version with an option to connect to secured kafka cluster #30

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions kaas_local_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="xyz"
password="xyz";
};

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xyz"
password="xyz";
};
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.2</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.6</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
Expand Down Expand Up @@ -197,7 +209,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
Expand Down
62 changes: 44 additions & 18 deletions src/main/java/com/homeadvisor/kafdrop/KafDrop.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Throwables;
import com.homeadvisor.kafdrop.config.ini.IniFilePropertySource;
import com.homeadvisor.kafdrop.config.ini.IniFileReader;
import joptsimple.internal.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.Banner;
Expand All @@ -36,7 +37,12 @@
import org.springframework.web.servlet.config.annotation.ContentNegotiationConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;

import java.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Objects;
import java.util.stream.Stream;

Expand Down Expand Up @@ -83,17 +89,15 @@ public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event)
catch (Exception ex)
{
System.err.println("Unable to set up logging.dir from logging.file " + loggingFile + ": " +
Throwables.getStackTraceAsString(ex));
Throwables.getStackTraceAsString(ex));
}
}
if (environment.containsProperty("debug") &&
!"false".equalsIgnoreCase(environment.getProperty("debug", String.class)))
!"false".equalsIgnoreCase(environment.getProperty("debug", String.class)))
{
System.setProperty(PROP_SPRING_BOOT_LOG_LEVEL, "DEBUG");
}

}

}

private static class EnvironmentSetupListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered
Expand All @@ -107,19 +111,41 @@ public int getOrder()
return Ordered.HIGHEST_PRECEDENCE + 10;
}

@Override
public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event)
{
final ConfigurableEnvironment environment = event.getEnvironment();
if (environment.containsProperty(SM_CONFIG_DIR))
{
Stream.of("kafdrop", "global")
.map(name -> readProperties(environment, name))
.filter(Objects::nonNull)
.forEach(iniPropSource -> environment.getPropertySources()
.addBefore("applicationConfigurationProperties", iniPropSource));
}
}
@Override
public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event)
{
final ConfigurableEnvironment environment = event.getEnvironment();

LOG.info("Initializing jaas config");
String env = environment.getProperty("kafka.env");
Boolean isSecured = environment.getProperty("kafka.isSecured", Boolean.class);
LOG.info("env: {} .Issecured kafka: {}", env, isSecured);
if (isSecured && Strings.isNullOrEmpty(env)) {
throw new RuntimeException("'env' cannot be null if connecting to secured kafka.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like IllegalArgumentException or IllegalStateException would be a more appropriate exception here.

}

LOG.info("ENV: {}", env);
String path;

if (isSecured) {
if ((env.equalsIgnoreCase("stage") || env.equalsIgnoreCase("prod") || env.equalsIgnoreCase("local"))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there something special about these particular environment names? Would rather not dictate how folks organize their environments. There's already a check for a non-null environment, so couldn't this just use the environment name as provided?

path = environment.getProperty("user.dir") + "/kaas_" + env.toLowerCase() + "_jaas.conf";
LOG.info("PATH: {}", path);
System.setProperty("java.security.auth.login.config", path);
}
else {
throw new RuntimeException("unable to identify env. set 'evn' variable either to 'stage' or 'prod' or local");
}
}

if (environment.containsProperty(SM_CONFIG_DIR)) {
Stream.of("kafdrop", "global")
.map(name -> readProperties(environment, name))
.filter(Objects::nonNull)
.forEach(iniPropSource -> environment.getPropertySources()
.addBefore("applicationConfigurationProperties", iniPropSource));
}
}

private IniFilePropertySource readProperties(Environment environment, String name)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.homeadvisor.kafdrop.config;

import lombok.*;
import org.springframework.boot.context.properties.*;
import org.springframework.stereotype.*;

/**
* Created by Satendra Sahu on 9/26/18
*/
@Component
@ConfigurationProperties(prefix = "kafka")
@Data
public class KafkaConfiguration
{
private String env = "local";
private String brokerConnect;
private Boolean isSecured = false;
private String keyDeserializer;
private String valueDeserializer;
private String saslMechanism;
private String securityProtocol;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -112,4 +118,10 @@ public static class ClusterInfoVO
public List<BrokerVO> brokers;
public List<TopicVO> topics;
}

@ResponseStatus(HttpStatus.OK)
@RequestMapping("/health_check")
public void healthCheck()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spring Boot already provides a health status endpoint. What is this endpoint needed for?

{
}
}
53 changes: 39 additions & 14 deletions src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,88 +18,113 @@

package com.homeadvisor.kafdrop.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Date;

@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerVO
{
private int id;
private String host;
private String[] endpoints;
private int port;
private int jmxPort;
private int version;
private boolean controller;
private Date timestamp;

public void setEndpoints(String[] endpoints)
{
this.endpoints = endpoints;
if (host == null)
{
String[] hostPort = endpoints[0].split("://")[1].split(":");
this.host = hostPort[0];
this.port = Integer.parseInt(hostPort[1]);
}
}

public String[] getEndpoints()
{
return this.endpoints;
}

public int getId()
{
return id;
return id;
}

public void setId(int id)
{
this.id = id;
this.id = id;
}

public String getHost()
{
return host;
return host;
}

public void setHost(String host)
{
this.host = host;
if (host != null)
{
this.host = host;
}
}

public int getPort()
{
return port;
return port;
}

public void setPort(int port)
{
this.port = port;
if (port > 0)
{
this.port = port;
}
}

public int getJmxPort()
{
return jmxPort;
return jmxPort;
}

@JsonProperty("jmx_port")
public void setJmxPort(int jmxPort)
{
this.jmxPort = jmxPort;
this.jmxPort = jmxPort;
}

public int getVersion()
{
return version;
return version;
}

public void setVersion(int version)
{
this.version = version;
this.version = version;
}

public Date getTimestamp()
{
return timestamp;
return timestamp;
}

public void setTimestamp(Date timestamp)
{
this.timestamp = timestamp;
this.timestamp = timestamp;
}

public boolean isController()
{
return controller;
return controller;
}

public void setController(boolean controller)
{
this.controller = controller;
this.controller = controller;
}
}
Loading