Skip to content

Commit

Permalink
Merge pull request #60 from facebookincubator/hunter/logging_configur…
Browse files Browse the repository at this point in the history
…able

Make logging configurable and add debug logging to s3 preprocessor
  • Loading branch information
hunterjackson authored Jul 16, 2024
2 parents 9fa2174 + c96e9b0 commit 217f2d1
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 53 deletions.
48 changes: 24 additions & 24 deletions src/main/java/com/meta/cp4m/S3PreProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.meta.cp4m.message.ThreadState;
import java.time.Instant;
import java.util.Objects;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,11 +35,11 @@ public class S3PreProcessor<T extends Message> implements PreProcessor<T> {
private final AwsCredentialsProvider credentials;

public S3PreProcessor(
String awsAccessKeyID,
String awsSecretAccessKey,
String region,
String bucket,
@Nullable String textMessageAddition) {
@Nullable String awsAccessKeyID,
@Nullable String awsSecretAccessKey,
String region,
String bucket,
@Nullable String textMessageAddition) {
this.awsAccessKeyID = awsAccessKeyID;
this.awsSecretAccessKey = awsSecretAccessKey;
this.region = region;
Expand All @@ -50,57 +49,58 @@ public S3PreProcessor(
@Nullable StaticCredentialsProvider staticCredentials;
if (!this.awsAccessKeyID.isEmpty() && !this.awsSecretAccessKey.isEmpty()) {
AwsSessionCredentials sessionCredentials =
AwsSessionCredentials.create(this.awsAccessKeyID, this.awsSecretAccessKey, "");
AwsSessionCredentials.create(this.awsAccessKeyID, this.awsSecretAccessKey, "");
staticCredentials = StaticCredentialsProvider.create(sessionCredentials);
} else {
staticCredentials = null;
}

this.credentials = Objects.requireNonNullElse(staticCredentials, DefaultCredentialsProvider.create());
this.credentials =
Objects.requireNonNullElse(staticCredentials, DefaultCredentialsProvider.create());
}

@Override
public ThreadState<T> run(ThreadState<T> in) {

switch (in.tail().payload()) {
case Payload.Image i -> {
this.sendRequest(i.value(), in.userId().toString(), i.extension());
LOGGER.atDebug().addKeyValue("payload", i).log("Received image payload");
this.sendRequest(i.value(), in.userId().toString(), i.extension(), i.mimeType());
}
case Payload.Document i -> {
this.sendRequest(i.value(), in.userId().toString(), i.extension());
LOGGER.atDebug().addKeyValue("payload", i).log("Received document payload");
this.sendRequest(i.value(), in.userId().toString(), i.extension(), i.mimeType());
}
default -> {
LOGGER.debug("Received text payload");
return in;
}
}

return textMessageAddition == null
? in
: in.with(
in.newMessageFromUser(
Instant.now(),
textMessageAddition,
Identifier.random())); // TODO: remove last message
: in.with(in.newMessageFromUser(Instant.now(), textMessageAddition, Identifier.random()));
}

public void sendRequest(byte[] media, String senderID, String extension) {
public void sendRequest(byte[] media, String senderID, String extension, String mimeType) {
String key = senderID + '_' + Instant.now().toEpochMilli() + '.' + extension;
LOGGER.debug("attempting to upload \"" + key + "\" file to AWS S3");
try (S3Client s3Client =
S3Client.builder()
.region(Region.of(this.region))
.credentialsProvider(this.credentials)
.build()) {

PutObjectRequest request =
PutObjectRequest.builder()
.bucket(this.bucket)
.key(key)
.contentType("application/" + extension)
.build();
s3Client.putObject(request, RequestBody.fromBytes(media));
PutObjectRequest.builder().bucket(this.bucket).key(key).contentType(mimeType).build();
PutObjectResponse response = s3Client.putObject(request, RequestBody.fromBytes(media));
LOGGER
.atDebug()
.addKeyValue("response", response)
.addKeyValue("file", key)
.log("AWS S3 response received on successful upload");
LOGGER.info("Media upload to AWS S3 successful");
} catch (Exception e) {
LOGGER.warn("Media upload to AWS S3 failed, {e}", e);
LOGGER.error("Media upload to AWS S3 failed, {e}", e);
}
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/meta/cp4m/S3PreProcessorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

public record S3PreProcessorConfig(
String name,
@Nullable String awsAccessKeyId,
@Nullable String awsSecretAccessKey,
String awsAccessKeyId,
String awsSecretAccessKey,
String region,
String bucket,
@Nullable String textMessageAddition)
Expand Down
28 changes: 10 additions & 18 deletions src/main/java/com/meta/cp4m/ServicesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class ServicesRunner implements AutoCloseable {
private final Javalin app = Javalin.create();
private final Set<Service<?>> services = new LinkedHashSet<>();

private boolean logWebhooks = false;
private String heartbeatPath = "/heartbeat";
private boolean started = false;
private int port = 8080;
Expand Down Expand Up @@ -75,17 +74,16 @@ private void routeSelectorAndHandler(Context ctx, List<Route<?>> routes) {
return this;
}

if (logWebhooks) {
app.before(
ctx ->
LOGGER
.atInfo()
.addKeyValue("headers", ctx.headerMap())
.addKeyValue("body", ctx.body())
.addKeyValue("path", ctx.path())
.addKeyValue("request_type", ctx.handlerType())
.log("received webhook"));
}
app.before(
ctx ->
LOGGER
.atDebug()
.addKeyValue("headers", ctx.headerMap())
.addKeyValue("body", ctx.body())
.addKeyValue("path", ctx.path())
.addKeyValue("request_method", ctx.method())
.log("received webhook"));

app.addHttpHandler(HandlerType.GET, heartbeatPath, ctx -> {});
record RouteGroup(String path, HandlerType handlerType) {}
Map<RouteGroup, List<Route<?>>> routeGroups = new HashMap<>();
Expand Down Expand Up @@ -152,10 +150,4 @@ public int port() {
public void close() {
app.stop();
}

public @This ServicesRunner logWebhooks(boolean logWebhooks) {
Preconditions.checkState(!started, "cannot adjust logging, server already started");
this.logWebhooks = logWebhooks;
return this;
}
}
28 changes: 20 additions & 8 deletions src/main/java/com/meta/cp4m/configuration/RootConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.checkerframework.checker.nullness.qual.Nullable;

public class RootConfiguration {
Expand All @@ -34,7 +39,7 @@ public class RootConfiguration {

private final int port;
private final String heartbeatPath;
private final boolean logWebhooks;
private final Level logLevel;

@JsonCreator
RootConfiguration(
Expand All @@ -45,10 +50,17 @@ public class RootConfiguration {
@JsonProperty("services") Collection<ServiceConfiguration> services,
@JsonProperty("port") @Nullable Integer port,
@JsonProperty("heartbeat_path") @Nullable String heartbeatPath,
@JsonProperty("log_webhooks") @Nullable Boolean logWebhooks) {
@JsonProperty("log_level") @Nullable Level logLevel) {

LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
Configuration config = ctx.getConfiguration();
LoggerConfig loggerConfig = config.getLoggerConfig(LogManager.ROOT_LOGGER_NAME);
loggerConfig.setLevel(logLevel);
ctx.updateLoggers();

this.port = port == null ? 8080 : port;
this.heartbeatPath = heartbeatPath == null ? "/heartbeat" : heartbeatPath;
this.logWebhooks = Objects.requireNonNullElse(logWebhooks, false);
this.logLevel = Objects.requireNonNullElse(logLevel, Level.INFO);
stores = stores == null ? Collections.emptyList() : stores;
preProcessors = preProcessors == null ? Collections.emptyList() : preProcessors;
Preconditions.checkArgument(
Expand Down Expand Up @@ -167,15 +179,15 @@ private <T extends Message> Service<T> createService(
}

public ServicesRunner toServicesRunner() {
ServicesRunner runner =
ServicesRunner.newInstance()
.port(port)
.heartbeatPath(heartbeatPath)
.logWebhooks(logWebhooks);
ServicesRunner runner = ServicesRunner.newInstance().port(port).heartbeatPath(heartbeatPath);
for (ServiceConfiguration service : services) {
MessageHandler<?> handler = handlers.get(service.handler()).toMessageHandler();
runner.service(createService(handler, service));
}
return runner;
}

public Level logLevel() {
return logLevel;
}
}
2 changes: 1 addition & 1 deletion src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<Loggers>
<Root level="info">
<AppenderRef ref="console" level="info"/>
<AppenderRef ref="console"/>
</Root>
</Loggers>
</Configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@
import java.nio.file.Path;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.logging.log4j.Level;
import org.checkerframework.common.returnsreceiver.qual.This;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.LoggerFactory;

class RootConfigurationTest {
private static final String TOML =
"""
port = 8081
log_level = "INFO"
[[plugins]]
name = "openai_test"
Expand Down Expand Up @@ -162,6 +165,9 @@ void valid(@TempDir Path dir) throws IOException {
Files.writeString(configFile, TOML);
RootConfiguration config =
ConfigurationUtils.tomlMapper().readValue(configFile.toFile(), RootConfiguration.class);
assertThat(config.logLevel()).isEqualTo(Level.INFO);
LoggerFactory.getLogger(RootConfigurationTest.class).atDebug().log("debug test");
LoggerFactory.getLogger(RootConfigurationTest.class).atInfo().log("info test");
assertThat(config.plugins())
.hasSize(1)
.allSatisfy(p -> assertThat(p).isInstanceOf(OpenAIConfig.class));
Expand Down

0 comments on commit 217f2d1

Please sign in to comment.