From 67cc3f859c28a6a9d71780be8718bbcd5c5f7b40 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Mon, 4 Dec 2023 17:17:56 +0000 Subject: [PATCH 1/3] Feat: Move logging to ZIO's official logging library Signed-off-by: Jordan Hall --- build.sbt | 47 ++- .../src/main/resources/logback.xml | 113 ------- .../com/mwam/kafkakewl/deploy/Main.scala | 27 +- .../src/main/resources/logback.xml | 113 ------- .../com/mwam/kafkakewl/metrics/Main.scala | 25 +- .../kafkakewl/utils/logging/JsonLayout.java | 287 ------------------ .../kafkakewl/utils/logging/LogFilter.scala | 39 --- .../kafkakewl/utils/logging/Logging.scala | 41 +++ 8 files changed, 86 insertions(+), 606 deletions(-) delete mode 100644 kafkakewl-deploy/src/main/resources/logback.xml delete mode 100644 kafkakewl-metrics/src/main/resources/logback.xml delete mode 100644 kafkakewl-utils/src/main/java/com/mwam/kafkakewl/utils/logging/JsonLayout.java delete mode 100644 kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala create mode 100644 kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala diff --git a/build.sbt b/build.sbt index 1ba74b8..d91e7e4 100644 --- a/build.sbt +++ b/build.sbt @@ -1,27 +1,29 @@ val orgName = "Marshall Wace" val year = 2023 -ThisBuild / scalaVersion := "3.3.1" -ThisBuild / version := "0.1.0-SNAPSHOT" -ThisBuild / organization := "com.mwam.kafkakewl" -ThisBuild / organizationName := orgName -ThisBuild / startYear := Some(year) - -ThisBuild / scalacOptions ++= Seq( - "-Xmax-inlines", "64", +ThisBuild / scalaVersion := "3.3.1" +ThisBuild / version := "0.1.0-SNAPSHOT" +ThisBuild / organization := "com.mwam.kafkakewl" +ThisBuild / organizationName := orgName +ThisBuild / startYear := Some(year) + +ThisBuild / scalacOptions ++= Seq( + "-Xmax-inlines", + "64", "-Yretain-trees", // so that zio-json supports default values - "-Wunused:imports", "-Wunused:params", "-deprecation", "-feature" + "-Wunused:imports", + "-Wunused:params", + "-deprecation", + "-feature" ) // Have to do this for the root project and enable it for the sub-projects as well as setting the headerLicense for them. disablePlugins(HeaderPlugin) -val license = Some(HeaderLicense.Custom( - s"""SPDX-FileCopyrightText: $year $orgName +val license = Some(HeaderLicense.Custom(s"""SPDX-FileCopyrightText: $year $orgName | |SPDX-License-Identifier: Apache-2.0 - |""".stripMargin -)) + |""".stripMargin)) val logbackVersion = "1.4.11" val logbackContribJsonVersion = "0.1.5" @@ -44,7 +46,7 @@ val tapir = Seq( "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % tapirVersion, "com.softwaremill.sttp.tapir" %% "tapir-json-zio" % tapirVersion, "com.softwaremill.sttp.tapir" %% "tapir-swagger-ui-bundle" % tapirVersion, - "com.softwaremill.sttp.tapir" %% "tapir-zio-metrics" % tapirVersion, + "com.softwaremill.sttp.tapir" %% "tapir-zio-metrics" % tapirVersion ) val tapirCore = Seq( @@ -63,7 +65,7 @@ val zio = Seq( "dev.zio" %% "zio-test-junit" % zioVersion % Test, "dev.zio" %% "zio-json" % zioJsonVersion, "dev.zio" %% "zio-logging" % zioLoggingVersion, - "dev.zio" %% "zio-logging-slf4j" % zioLoggingVersion, + "dev.zio" %% "zio-logging-slf4j2-bridge" % zioLoggingVersion, "dev.zio" %% "zio-kafka" % zioKafkaVersion, "com.softwaremill.sttp.client3" %% "zio-json" % tapirZioJsonVersion % Test ) @@ -75,18 +77,13 @@ val config = Seq( ) val circeYaml = Seq( - "io.circe" %% "circe-yaml" % circeYamlVersion + "io.circe" %% "circe-yaml" % circeYamlVersion ) val kafkaClient = Seq( "org.apache.kafka" % "kafka-clients" % kafkaClientVersion ) -val logging = Seq( - "ch.qos.logback" % "logback-classic" % logbackVersion, - "ch.qos.logback.contrib" % "logback-jackson" % logbackContribJsonVersion -) - val telemetry = Seq( "dev.zio" %% "zio-opentelemetry" % zioTelemetryVersion, "io.opentelemetry" % "opentelemetry-api" % openTelemetryVersion, @@ -95,7 +92,7 @@ val telemetry = Seq( "io.opentelemetry" % "opentelemetry-extension-trace-propagators" % openTelemetryVersion, "io.opentelemetry" % "opentelemetry-semconv" % s"$openTelemetryVersion-alpha", "io.opentelemetry" % "opentelemetry-sdk-extension-autoconfigure" % openTelemetryVersion, - "io.grpc" % "grpc-netty-shaded" % openTelemetryGrpcVersion, + "io.grpc" % "grpc-netty-shaded" % openTelemetryGrpcVersion ) val tests = Seq( @@ -108,7 +105,7 @@ lazy val utils = project .settings( name := "kafkakewl-utils", headerLicense := license, - libraryDependencies ++= logging ++ zio ++ config + libraryDependencies ++= zio ++ config ) lazy val domain = project @@ -145,7 +142,7 @@ lazy val deploy = project // disabling scalaDoc fixes it (it's needed because stage wants to generate scalaDoc) packageDoc / publishArtifact := false, Compile / mainClass := Some("com.mwam.kafkakewl.deploy.Main"), - libraryDependencies ++= tapir ++ tapirCore ++ config ++ zio ++ tests ++ logging, + libraryDependencies ++= tapir ++ tapirCore ++ config ++ zio ++ tests, testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")) ) @@ -162,6 +159,6 @@ lazy val metrics = project // disabling scalaDoc fixes it (it's needed because stage wants to generate scalaDoc) packageDoc / publishArtifact := false, Compile / mainClass := Some("com.mwam.kafkakewl.metrics.Main"), - libraryDependencies ++= tapir ++ tapirCore ++ config ++ zio ++ tests ++ logging, + libraryDependencies ++= tapir ++ tapirCore ++ config ++ zio ++ tests, testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")) ) diff --git a/kafkakewl-deploy/src/main/resources/logback.xml b/kafkakewl-deploy/src/main/resources/logback.xml deleted file mode 100644 index 9245a74..0000000 --- a/kafkakewl-deploy/src/main/resources/logback.xml +++ /dev/null @@ -1,113 +0,0 @@ - - - - - - - - - - - - - - System.out - - %date{ISO8601} [%X{userName}] [%X{correlationId}] [%X{dryRun}] [%X{command}] [%X{topologyId}] [%-5level] %logger{0}: %msg %ex%n - - - - - - - - yyyy-MM-dd'T'HH:mm:ss.SSS - UTC - true - kafkakewl-deploy - ${MW_APP_TAG_VERSION:-0.0.0} - ${LOGBACK_INSTANCE} - - - env - ${LOGBACK_ENV} - - - userName - %X{userName} - false - - - correlationId - %X{correlationId} - false - - - dryRun - %X{dryRun} - false - - - command - %X{command} - false - - - change - %X{change} - false - - - topologyId - %X{topologyId} - false - - - stacktrace - %ex - false - - - logger - %logger{0} - false - - - class - %class - false - - - package - %.-18logger - false - - - method - %method - false - - - - - - - - ${LOGBACK_LOCALFILE_ROOT}/${LOGBACK_LOCALFILE_NAME}.log - - ${LOGBACK_LOCALFILE_ROOT}/${LOGBACK_LOCALFILE_NAME}-%d{yyyy-MM-dd}.log - 7 - - - %date{ISO8601} [%X{userName}] [%X{correlationId}] [%X{dryRun}] [%X{command}] [%X{topologyId}] [%-5level] %logger{0}: %msg %ex%n - - - - - - - - - - - diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala index 5a4c8de..8d7cf9f 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala @@ -14,30 +14,27 @@ import com.mwam.kafkakewl.deploy.services.{TopologyDeploymentsService, TopologyD import com.mwam.kafkakewl.domain.config.KafkaClientConfig import sttp.tapir.server.metrics.zio.ZioMetrics import sttp.tapir.server.ziohttp.{ZioHttpInterpreter, ZioHttpServerOptions} -import zio.* import zio.http.Server -import zio.logging.LogFormat -import zio.logging.backend.SLF4J import zio.metrics.connectors.prometheus import zio.metrics.jvm.DefaultJvmMetrics import zio.telemetry.opentelemetry.context.ContextStorage import zio.telemetry.opentelemetry.tracing.Tracing +import com.mwam.kafkakewl.utils.logging.Logging.{deployLogger, localLogger} +import zio._ object Main extends ZIOAppDefault { override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = - Runtime.removeDefaultLoggers >>> - // TODO play with this more so that we support structured logging properly - SLF4J.slf4j( - LogFormat.make { - // For now we just append the message and the cause to the output (which will be the sl4fj message) - (builder, _, _, _, message, cause, _, _, _) => - { - builder.appendText(message()) - builder.appendCause(cause) - } - } - ) + ZLayer.fromZIO { + ZIOAppArgs.getArgs + .map(args => + if (args.mkString == "local") { + localLogger + } else { + deployLogger + } + ) + }.flatten override def run: ZIO[ZIOAppArgs with Scope, Any, Any] = { val options: ZioHttpServerOptions[Any] = ZioHttpServerOptions.customiseInterceptors diff --git a/kafkakewl-metrics/src/main/resources/logback.xml b/kafkakewl-metrics/src/main/resources/logback.xml deleted file mode 100644 index e66d3d3..0000000 --- a/kafkakewl-metrics/src/main/resources/logback.xml +++ /dev/null @@ -1,113 +0,0 @@ - - - - - - - - - - - - - - System.out - - %date{ISO8601} [%X{userName}] [%X{correlationId}] [%X{dryRun}] [%X{command}] [%X{topologyId}] [%-5level] %logger{0}: %msg %ex%n - - - - - - - - yyyy-MM-dd'T'HH:mm:ss.SSS - UTC - true - kafkakewl-metrics - ${MW_APP_TAG_VERSION:-0.0.0} - ${LOGBACK_INSTANCE} - - - env - ${LOGBACK_ENV} - - - userName - %X{userName} - false - - - correlationId - %X{correlationId} - false - - - dryRun - %X{dryRun} - false - - - command - %X{command} - false - - - change - %X{change} - false - - - topologyId - %X{topologyId} - false - - - stacktrace - %ex - false - - - logger - %logger{0} - false - - - class - %class - false - - - package - %.-18logger - false - - - method - %method - false - - - - - - - - ${LOGBACK_LOCALFILE_ROOT}/${LOGBACK_LOCALFILE_NAME}.log - - ${LOGBACK_LOCALFILE_ROOT}/${LOGBACK_LOCALFILE_NAME}-%d{yyyy-MM-dd}.log - 7 - - - %date{ISO8601} [%X{userName}] [%X{correlationId}] [%X{dryRun}] [%X{command}] [%X{topologyId}] [%-5level] %logger{0}: %msg %ex%n - - - - - - - - - - - diff --git a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/Main.scala b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/Main.scala index 65ae3bf..f744421 100644 --- a/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/Main.scala +++ b/kafkakewl-metrics/src/main/scala/com/mwam/kafkakewl/metrics/Main.scala @@ -12,13 +12,12 @@ import com.mwam.kafkakewl.common.telemetry.GlobalTracer import com.mwam.kafkakewl.domain.config.KafkaClientConfig import com.mwam.kafkakewl.metrics.endpoints.* import com.mwam.kafkakewl.metrics.services.* +import com.mwam.kafkakewl.utils.logging.Logging.{deployLogger, localLogger} import io.opentelemetry.api.trace.Tracer import sttp.tapir.server.metrics.zio.ZioMetrics import sttp.tapir.server.ziohttp.{ZioHttpInterpreter, ZioHttpServerOptions} import zio.* import zio.http.Server -import zio.logging.LogFormat -import zio.logging.backend.SLF4J import zio.metrics.connectors.prometheus import zio.metrics.jvm.DefaultJvmMetrics import zio.telemetry.opentelemetry.context.ContextStorage @@ -27,18 +26,16 @@ import zio.telemetry.opentelemetry.tracing.Tracing object Main extends ZIOAppDefault { override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = - Runtime.removeDefaultLoggers >>> - // TODO play with this more so that we support structured logging properly - SLF4J.slf4j( - LogFormat.make { - // For now we just append the message and the cause to the output (which will be the sl4fj message) - (builder, _, _, _, message, cause, _, _, _) => - { - builder.appendText(message()) - builder.appendCause(cause) - } - } - ) + ZLayer.fromZIO { + ZIOAppArgs.getArgs + .map(args => + if (args.mkString == "local") { + localLogger + } else { + deployLogger + } + ) + }.flatten override def run: ZIO[ZIOAppArgs with Scope, Any, Any] = { val options: ZioHttpServerOptions[Any] = ZioHttpServerOptions.customiseInterceptors diff --git a/kafkakewl-utils/src/main/java/com/mwam/kafkakewl/utils/logging/JsonLayout.java b/kafkakewl-utils/src/main/java/com/mwam/kafkakewl/utils/logging/JsonLayout.java deleted file mode 100644 index d506c46..0000000 --- a/kafkakewl-utils/src/main/java/com/mwam/kafkakewl/utils/logging/JsonLayout.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2023 Marshall Wace - * - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.mwam.kafkakewl.utils.logging; - -import ch.qos.logback.classic.PatternLayout; -import ch.qos.logback.classic.pattern.ThrowableHandlingConverter; -import ch.qos.logback.classic.pattern.ThrowableProxyConverter; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.classic.spi.IThrowableProxy; -import ch.qos.logback.contrib.json.JsonLayoutBase; -import ch.qos.logback.core.Context; -import ch.qos.logback.core.pattern.PatternLayoutBase; - -import java.util.*; - -/** - * Custom JSON Layout for Logback - */ -public class JsonLayout extends JsonLayoutBase { - - private static final String TIMESTAMP_ATTR_NAME = "timestampUtc"; - private static final String LEVEL_ATTR_NAME = "level"; - private static final String THREAD_ATTR_NAME = "thread"; - private static final String LOGGER_ATTR_NAME = "logger"; - private static final String FORMATTED_MESSAGE_ATTR_NAME = "message"; - private static final String EXCEPTION_ATTR_NAME = "exception"; - private static final String VERSION_ATTR_NAME = "version"; - private static final String APPLICATION_ATTR_NAME = "application"; - private static final String INSTANCE_ATTR_NAME = "instance"; - private static final String HOSTNAME_ATTR_NAME = "hostname"; - private static final String USERNAME_ATTR_NAME = "username"; - - // used to format exceptions - private ThrowableHandlingConverter throwableProxyConverter; - - // static application informations to set in the application's code - private static String APPLICATION_VALUE = null; - private static String VERSION_VALUE = null; - private static String INSTANCE_VALUE = null; - - // application informations set via logback.xml (will take precedence over the static values above) - private Fields metadata; - private Fields custom; - private String application; - private String version; - private String instance; - - // a field definition in logback.xml - public static class Field { - private String name; - private String value; - private boolean allowEmpty; - - private PatternLayoutBase layout; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - } - - public boolean getAllowEmpty() { - return this.allowEmpty; - } - - public void setAllowEmpty(boolean value) { - this.allowEmpty = value; - } - - public void startLayout(Context context) { - this.layout = new PatternLayout(); - this.layout.setContext(context); - this.layout.setPattern(getValue()); - this.layout.setPostCompileProcessor(null); - this.layout.start(); - } - - public void stopLayout() { - this.layout.stop(); - } - - public String doLayout(ILoggingEvent event) { - return this.layout.doLayout(event); - } - } - - - // a fields list definition in logback.xml - public static class Fields { - private List fields; - private String fieldName = "metadata"; - - public Fields() { - this.fields = new ArrayList<>(); - } - - public List getFields() { - return fields; - } - - public void addField(Field field) { - fields.add(field); - } - - public String getFieldName() { - return fieldName; - } - - public void setFieldName(String fieldName) { - this.fieldName = fieldName; - } - - public void startLayout(Context context) { - for (Field field: fields) { - field.startLayout(context); - } - } - - public void stopLayout() { - for (Field field: fields) { - field.stopLayout(); - } - } - } - - // set static information about the application. To use in the application's code. Logback - // will use these values unless they are set in logback.xml (which will take priority). - public static void setApplication(String application, String instance, String version) { - APPLICATION_VALUE = application; - VERSION_VALUE = version; - INSTANCE_VALUE = instance; - } - - // constructor - public JsonLayout() { - super(); - this.throwableProxyConverter = new ThrowableProxyConverter(); - } - - // Setter for parameter in logback.xml - public void setMetadata(Fields metadata) { - this.metadata = metadata; - this.custom.setFieldName("metadata"); - } - - // Setter for parameter in logback.xml - public void setCustomFields(Fields custom) { - this.custom = custom; - this.custom.setFieldName("customFields"); - } - - // Setter for parameter in logback.xml - public void setApplication(String application) { - this.application = application; - } - - // Setter for parameter in logback.xml - public void setVersion(String version) { - this.version = version; - } - - // Setter for parameter in logback.xml - public void setInstance(String instance) { - this.instance = instance; - } - - // support for exception formatting - @Override - public void start() { - this.throwableProxyConverter.start(); - this.custom.startLayout(super.getContext()); - super.start(); - } - - // support for exception formatting - @Override - public void stop() { - super.stop(); - this.custom.stopLayout(); - this.throwableProxyConverter.stop(); - } - - // the formatting logic - @Override - protected Map toJsonMap(ILoggingEvent event) { - Map map = new LinkedHashMap<>(); - if (event == null) return map; - - // timestamp formatted using the options of ch.qos.logback.contrib.json.classic.JsonLayout - String timeString = formatTimestamp(event.getTimeStamp()); - map.put(TIMESTAMP_ATTR_NAME, timeString); - - // use either instance values set via logback.xml or static values initialised with setApplication - if (application != null) - map.put(APPLICATION_ATTR_NAME, application); - else if (APPLICATION_VALUE != null) - map.put(APPLICATION_ATTR_NAME, APPLICATION_VALUE); - if (version != null) - map.put(VERSION_ATTR_NAME, version); - else if (VERSION_VALUE != null) - map.put(VERSION_ATTR_NAME, VERSION_VALUE); - if (instance != null) - map.put(INSTANCE_ATTR_NAME, instance); - else if (INSTANCE_VALUE != null) - map.put(INSTANCE_ATTR_NAME, INSTANCE_VALUE); - - // hostname and username - try { - map.put(HOSTNAME_ATTR_NAME, java.net.InetAddress.getLocalHost().getHostName()); - } catch (Exception ignore) { - } - map.put(USERNAME_ATTR_NAME, System.getProperty("user.name")); - - // metadata - try { - if (metadata != null) { - Map metaMap = new HashMap<>(); - for (Field f : metadata.getFields()) { - metaMap.put(f.getName(), f.getValue()); - } - map.put(metadata.getFieldName(), metaMap); - } - } catch (Exception ignore) { - } - - // custom fields - try { - if (custom != null) { - for (Field f : custom.getFields()) { - String fieldValue = f.doLayout(event); - if (!fieldValue.isEmpty() || f.getAllowEmpty()) { - map.put(f.getName(), fieldValue); - } - } - } - } catch (Exception ignore) { - } - - // standard logging fields - map.put(LEVEL_ATTR_NAME, String.valueOf(event.getLevel())); - map.put(THREAD_ATTR_NAME, event.getThreadName()); - map.put(LOGGER_ATTR_NAME, event.getLoggerName()); - map.put(FORMATTED_MESSAGE_ATTR_NAME, event.getFormattedMessage()); - - // exception - try { - IThrowableProxy throwableProxy = event.getThrowableProxy(); - if (throwableProxy != null) { - String ex = throwableProxyConverter.convert(event); - if (ex != null && !ex.equals("")) { - map.put(EXCEPTION_ATTR_NAME, ex); - } - } - } catch (Exception ignore) { - } - - // extra key/values passed as AbstractMap.SimpleEntry. - // They may have been included with {} in the formatted message, or not. - try { - if (event.getArgumentArray() != null) { - for (Object o : event.getArgumentArray()) { - if (o instanceof Map.Entry) { - Map.Entry e = (Map.Entry) o; - map.put(e.getKey().toString(), e.getValue()); - } - } - } - } catch (Exception ignore) { - } - - return map; - } -} \ No newline at end of file diff --git a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala deleted file mode 100644 index 98b6120..0000000 --- a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/LogFilter.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2023 Marshall Wace - * - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.mwam.kafkakewl.utils.logging - -import ch.qos.logback.classic.turbo.TurboFilter -import ch.qos.logback.classic.{Level, Logger} -import ch.qos.logback.core.spi.FilterReply -import org.slf4j.Marker - -class LogFilter extends TurboFilter { - override def decide(marker: Marker, logger: Logger, level: Level, format: String, params: Array[AnyRef], t: Throwable): FilterReply = { - (level, logger.getName) match { - case (Level.WARN, "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator") if format.contains("Offset commit failed on partition") => - // offset commit failed can be info - logger.info(marker, format, params) - FilterReply.DENY - case (Level.WARN, "org.apache.kafka.clients.admin.AdminClientConfig" | "org.apache.kafka.clients.consumer.ConsumerConfig") - if format.contains("supplied but isn't a known config.") => - // "The configuration 'sasl.kerberos.kinit.cmd' was supplied but isn't a known config." can be info - logger.info(marker, format, params) - FilterReply.DENY - case (Level.WARN, "org.apache.kafka.common.security.kerberos.KerberosLogin") - if format.contains("TGT renewal thread has been interrupted and will exit") => - // TGT renewal can be info - logger.info(marker, format, params) - FilterReply.DENY - case (Level.ERROR, "org.apache.curator.ConnectionState") if format.contains("Authentication failed") => - // this curator error can be warning, because we can still talk to ZK un-authenticated - logger.warn(marker, format, params) - FilterReply.DENY - case _ => - FilterReply.NEUTRAL - } - } -} diff --git a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala new file mode 100644 index 0000000..67f70fb --- /dev/null +++ b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala @@ -0,0 +1,41 @@ +/* + * SPDX-FileCopyrightText: 2023 Marshall Wace + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.mwam.kafkakewl.utils.logging + +import zio.{Config, Runtime, ZLayer} +import zio.logging.slf4j.bridge.Slf4jBridge +import zio.logging.{ConsoleLoggerConfig, FileLoggerConfig, LogFilter, LogFormat} + +import java.nio.file.Paths + +object Logging { + + private def format = LogFormat.default + + private def filter = + LogFilter.acceptAll + + private def fileLogger = + zio.logging.fileJsonLogger( + new FileLoggerConfig( + Paths.get("logs/kafkakewl-deploy.log"), + format, + filter, + rollingPolicy = Some(FileLoggerConfig.FileRollingPolicy.TimeBasedRollingPolicy) + ) + ) + + private def consoleLogger = + zio.logging.consoleLogger(ConsoleLoggerConfig(format, filter)) + + private def consoleJsonLogger = zio.logging.consoleJsonLogger() + + def localLogger: ZLayer[Any, Config.Error, Unit] = Runtime.removeDefaultLoggers >>> (fileLogger ++ consoleLogger) >+> Slf4jBridge.initialize + + def deployLogger: ZLayer[Any, Config.Error, Unit] = Runtime.removeDefaultLoggers >>> consoleJsonLogger >+> Slf4jBridge.initialize + +} From c130da7158005dccaae50c41aae380ad78abad64 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Tue, 5 Dec 2023 11:40:57 +0000 Subject: [PATCH 2/3] Added log map for demoting certain log messages Signed-off-by: Jordan Hall --- .../kafkakewl/utils/logging/Logging.scala | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala index 67f70fb..e69f4cf 100644 --- a/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala +++ b/kafkakewl-utils/src/main/scala/com/mwam/kafkakewl/utils/logging/Logging.scala @@ -6,15 +6,46 @@ package com.mwam.kafkakewl.utils.logging +import zio.* +import zio.logging.LogFormat.{quoted, space, *} import zio.{Config, Runtime, ZLayer} import zio.logging.slf4j.bridge.Slf4jBridge -import zio.logging.{ConsoleLoggerConfig, FileLoggerConfig, LogFilter, LogFormat} +import zio.logging.{ConsoleLoggerConfig, FileLoggerConfig, LogColor, LogFilter, LogFormat, LoggerNameExtractor} import java.nio.file.Paths object Logging { - private def format = LogFormat.default + private def levelMapper: LogFormat = + LogFormat.make { (builder, trace, _, logLevel, message, _, fibreRefs, _, annotations) => + { + val loggerName = LoggerNameExtractor.loggerNameAnnotationOrTrace(trace, fibreRefs, annotations).getOrElse("zio-logger") + + val newLogLevel = (logLevel, loggerName) match { + case (LogLevel.Warning, "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator") + if message().contains("Offset commit failed on partition") => + LogLevel.Info + case (LogLevel.Warning, "org.apache.kafka.clients.admin.AdminClientConfig" | "org.apache.kafka.clients.consumer.ConsumerConfig") + if message().contains("supplied but isn't a known config.") => + LogLevel.Info + case (LogLevel.Warning, "org.apache.kafka.common.security.kerberos.KerberosLogin") + if message().contains("TGT renewal thread has been interrupted and will exit") => + LogLevel.Info + case (LogLevel.Error, "org.apache.curator.ConnectionState") if message().contains("Authentication failed") => + LogLevel.Warning + case _ => logLevel + } + + builder.appendText(newLogLevel.label) + + } + } + + private def format = label("timestamp", timestamp.fixed(32)).color(LogColor.BLUE) |-| + label("level", levelMapper).highlight |-| + label("thread", fiberId).color(LogColor.WHITE) |-| + label("message", quoted(line)).highlight + + (space + label("cause", cause).highlight).filter(LogFilter.causeNonEmpty) |-| allAnnotations private def filter = LogFilter.acceptAll From 1970a0aca7a834bd81695d1375eeff27159b9fc3 Mon Sep 17 00:00:00 2001 From: Jordan Lloyd Hall <135843610+j-hall-mwam@users.noreply.github.com> Date: Mon, 11 Dec 2023 14:12:00 +0000 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Mohsin Niazi Signed-off-by: Jordan Lloyd Hall <135843610+j-hall-mwam@users.noreply.github.com> --- .../src/main/scala/com/mwam/kafkakewl/deploy/Main.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala index 8d7cf9f..6c9a548 100644 --- a/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala +++ b/kafkakewl-deploy/src/main/scala/com/mwam/kafkakewl/deploy/Main.scala @@ -20,7 +20,7 @@ import zio.metrics.jvm.DefaultJvmMetrics import zio.telemetry.opentelemetry.context.ContextStorage import zio.telemetry.opentelemetry.tracing.Tracing import com.mwam.kafkakewl.utils.logging.Logging.{deployLogger, localLogger} -import zio._ +import zio.* object Main extends ZIOAppDefault {