Skip to content

Commit

Permalink
Merge pull request #22 from facebookincubator/multiservice_endpoints
Browse files Browse the repository at this point in the history
Allow for multiple services per endoint
  • Loading branch information
hunterjackson authored Jan 18, 2024
2 parents c9f66d9 + 107f90c commit a9ad255
Show file tree
Hide file tree
Showing 18 changed files with 670 additions and 249 deletions.
40 changes: 33 additions & 7 deletions src/main/java/com/meta/cp4m/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import com.meta.cp4m.llm.LLMPlugin;
import com.meta.cp4m.message.Message;
import com.meta.cp4m.message.MessageHandler;
import com.meta.cp4m.message.RequestProcessor;
import com.meta.cp4m.message.ThreadState;
import com.meta.cp4m.routing.Route;
import com.meta.cp4m.store.ChatStore;
import io.javalin.Javalin;
import io.javalin.http.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
Expand All @@ -40,19 +42,25 @@ public Service(
this.path = path;
}

void handle(Context ctx) {
List<T> messages = handler.processRequest(ctx);
<IN> void handler(Context ctx, IN in, RequestProcessor<IN, T> processor) {
List<T> messages = null;
try {
messages = processor.process(ctx, in);
} catch (Exception e) {
LOGGER
.atError()
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.setMessage("unable to process request")
.log();
}
// TODO: once we have a non-volatile store, on startup send stored but not replied to messages
for (T m : messages) {
ThreadState<T> thread = store.add(m);
executorService.submit(() -> execute(thread));
}
}

public void register(Javalin app) {
handler.handlers().forEach(m -> app.addHandler(m, path, this::handle));
}

public String path() {
return path;
}
Expand All @@ -79,4 +87,22 @@ private void execute(ThreadState<T> thread) {
LOGGER.error("an error occurred while attempting to respond", e);
}
}

private <E> Route<E> toRoute(MessageHandler.RouteDetails<E, T> routeDetails) {
return new Route<>(
path,
routeDetails.handlerType(),
routeDetails.acceptor(),
(ctx, in) -> handler(ctx, in, routeDetails.requestProcessor()));
}

List<Route<?>> routes() {
List<MessageHandler.RouteDetails<?, T>> routeDetails = handler.routeDetails();
List<Route<?>> routes = new ArrayList<>(routeDetails.size());
for (MessageHandler.RouteDetails<?, T> routeDetail : routeDetails) {
Route<?> route = toRoute(routeDetail);
routes.add(route);
}
return routes;
}
}
72 changes: 64 additions & 8 deletions src/main/java/com/meta/cp4m/ServicesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@
package com.meta.cp4m;

import com.google.common.base.Preconditions;
import com.meta.cp4m.routing.Route;
import io.javalin.Javalin;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import io.javalin.http.BadRequestResponse;
import io.javalin.http.Context;
import io.javalin.http.HandlerType;
import java.util.*;
import org.checkerframework.common.returnsreceiver.qual.This;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServicesRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(ServicesRunner.class);
private final Javalin app = Javalin.create();
private final Set<Service<?>> services = new HashSet<>();
private final Set<Service<?>> services = new LinkedHashSet<>();
private boolean started = false;
private int port = 8080;

Expand All @@ -28,7 +33,59 @@ public static ServicesRunner newInstance() {
return new ServicesRunner();
}

private <T> boolean didAcceptAndHandle(Context ctx, Route<T> route) {
Optional<T> acceptorOutput = route.acceptor().accept(ctx);
if (acceptorOutput.isPresent()) {
try {
route.handler().handle(ctx, acceptorOutput.get());
} catch (Exception e) {
throw new BadRequestResponse("Unable to process request");
}
return true;
}
return false;
}

/**
* Find the first route that will accept this payload and then handle the payload
*
* @param ctx context from Javalin
* @param routes the routes to check for acceptability and process if accepted
*/
private void routeSelectorAndHandler(Context ctx, List<Route<?>> routes) {
for (Route<?> route : routes) {
if (didAcceptAndHandle(ctx, route)) {
return;
}
}
LOGGER
.atError()
.setMessage("Unable to handle incoming webhook")
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.log();
throw new BadRequestResponse("unable to handle webhook");
}

public @This ServicesRunner start() {
record RouteGroup(String path, HandlerType handlerType) {}
Map<RouteGroup, List<Route<?>>> routeGroups = new HashMap<>();
for (Service<?> s : services) { // this is not a stream because order matters here
s.routes()
.forEach(
r ->
routeGroups
.computeIfAbsent(
new RouteGroup(r.path(), r.handlerType()), k -> new ArrayList<>())
.add(r));
}
routeGroups.forEach(
(routeGroup, routes) ->
app.addHandler(
routeGroup.handlerType(),
routeGroup.path(),
ctx -> this.routeSelectorAndHandler(ctx, routes)));

if (!started) {
started = true;
app.start(port);
Expand All @@ -38,9 +95,8 @@ public static ServicesRunner newInstance() {

public @This ServicesRunner service(Service<?> service) {
Preconditions.checkState(!started, "cannot add service, server already started");
if (services.add(service)) {
service.register(app);
}

services.add(service);
return this;
}

Expand Down
89 changes: 31 additions & 58 deletions src/main/java/com/meta/cp4m/message/FBMessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,66 +91,13 @@ public FBMessageHandler(String verifyToken, String pageAccessToken, String appSe
: null;
}

@Override
public List<FBMessage> processRequest(Context ctx) {
try {
switch (ctx.handlerType()) {
case GET -> {
return getHandler(ctx);
}
case POST -> {
return postHandler(ctx);
}
}
} catch (JsonProcessingException | NullPointerException e) {
LOGGER
.atWarn()
.setMessage("Unable to parse message from Meta webhook")
.setCause(e)
.addKeyValue("body", ctx.body())
.addKeyValue("headers", ctx.headerMap())
.log();
throw new BadRequestResponse("Invalid body");
} catch (RuntimeException e) {
LOGGER.error(e.getMessage(), e);
throw e;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new RuntimeException(e);
}
throw new UnsupportedOperationException("Only accepting get and post methods");
}

private List<FBMessage> getHandler(Context ctx) {
MetaHandlerUtils.subscriptionVerification(ctx, verifyToken);
LOGGER.debug("Meta verified callback url successfully");
return Collections.emptyList();
}

@TestOnly
String hmac(String body) {
// TODO: refactor test so we don't need this
return MetaHandlerUtils.hmac(body, appSecret);
}

private List<FBMessage> postHandler(Context ctx) throws JsonProcessingException {
MetaHandlerUtils.postHeaderValidator(ctx, appSecret);

String bodyString = ctx.body();
JsonNode body = MAPPER.readTree(bodyString);
String object = body.get("object").textValue();
if (!object.equals("page") && !object.equals("instagram")) {
LOGGER
.atWarn()
.setMessage(
"received body with value of "
+ object
+ " for 'object', expected 'page' or 'instagram'")
.addKeyValue("body", bodyString)
.log();
return Collections.emptyList();
}
// TODO: need better validation
private List<FBMessage> postHandler(Context ctx, JsonNode body) {
JsonNode entries = body.get("entry");
ArrayList<FBMessage> output = new ArrayList<>();
for (JsonNode entry : entries) {
Expand Down Expand Up @@ -191,15 +138,15 @@ private List<FBMessage> postHandler(Context ctx) throws JsonProcessingException
LOGGER
.atWarn()
.setMessage("received message without text, unable to handle this")
.addKeyValue("body", bodyString)
.addKeyValue("body", body)
.log();
}
} else {
LOGGER
.atWarn()
.setMessage(
"received a message without a 'message' key, unable to handle this message type")
.addKeyValue("body", bodyString)
.addKeyValue("body", body)
.log();
}
}
Expand Down Expand Up @@ -263,7 +210,33 @@ private void send(String message, Identifier recipient, Identifier sender) throw
}

@Override
public Collection<HandlerType> handlers() {
return List.of(HandlerType.GET, HandlerType.POST);
public List<RouteDetails<?, FBMessage>> routeDetails() {
RouteDetails<JsonNode, FBMessage> postDetails =
new RouteDetails<>(
HandlerType.POST,
ctx -> {
@Nullable String contentType = ctx.contentType();
if (contentType != null
&& ContentType.parse(contentType).isSameMimeType(ContentType.APPLICATION_JSON)
&& MetaHandlerUtils.postHeaderValid(ctx, appSecret)) {
JsonNode body;
try {
body = MAPPER.readTree(ctx.body());
} catch (JsonProcessingException e) {
throw new BadRequestResponse("unable to parse body");
}
// TODO: need better validation
String expectedObjectValue =
connectedFacebookPageForInstagram == null ? "page" : "instagram";
@Nullable JsonNode objectNode = body.get("object");
if (objectNode != null && objectNode.textValue().equals(expectedObjectValue)) {
return Optional.of(body);
}
}
return Optional.empty();
},
this::postHandler);

return List.of(MetaHandlerUtils.subscriptionVerificationRouteDetails(verifyToken), postDetails);
}
}
18 changes: 4 additions & 14 deletions src/main/java/com/meta/cp4m/message/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,14 @@

package com.meta.cp4m.message;

import io.javalin.http.Context;
import com.meta.cp4m.routing.Acceptor;
import io.javalin.http.HandlerType;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

public interface MessageHandler<T extends Message> {

/**
* Process incoming requests from the messaging service, including messages from the user.
*
* @param ctx the context corresponding to an incoming request
* @return return a {@link Message} object if appropriate
*/
List<T> processRequest(Context ctx);
record RouteDetails<IN, OUT extends Message>(
HandlerType handlerType, Acceptor<IN> acceptor, RequestProcessor<IN, OUT> requestProcessor) {}

/**
* The method needed to respond to a message from a user
Expand All @@ -31,8 +24,5 @@ public interface MessageHandler<T extends Message> {
*/
void respond(T message) throws IOException;

/**
* @return The different {@link HandlerType}s that this handler expects to receive
*/
Collection<HandlerType> handlers();
List<RouteDetails<?, T>> routeDetails();
}
40 changes: 40 additions & 0 deletions src/main/java/com/meta/cp4m/message/MetaHandlerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@

import io.javalin.http.Context;
import io.javalin.http.ForbiddenResponse;
import io.javalin.http.HandlerType;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hc.client5.http.utils.Hex;
import org.checkerframework.checker.nullness.qual.Nullable;

class MetaHandlerUtils {
static void subscriptionVerification(Context ctx, String verifyToken) {
Expand All @@ -27,6 +32,26 @@ static void subscriptionVerification(Context ctx, String verifyToken) {
ctx.result(String.valueOf(challenge));
}

static <T extends Message>
MessageHandler.RouteDetails<Integer, T> subscriptionVerificationRouteDetails(
String verifyToken) {
return new MessageHandler.RouteDetails<>(
HandlerType.GET,
ctx ->
// validateSubscription handles putting challenge into context response if it succeeds
{
if (Objects.equals(ctx.queryParam("hub.mode"), "subscribe")
&& Objects.equals(ctx.queryParam("hub.verify_token"), verifyToken)) {
return Optional.of(ctx.queryParamAsClass("hub.challenge", Integer.class).get());
}
return Optional.empty();
},
(ctx, challenge) -> {
ctx.result(String.valueOf(challenge));
return List.of();
});
}

static String hmac(String body, String appSecret) {
Mac sha256HMAC;
SecretKeySpec secretKey;
Expand Down Expand Up @@ -65,4 +90,19 @@ static void postHeaderValidator(Context ctx, String appSecret) {
"X-Hub-Signature-256 could not be validated")
.getOrThrow(ignored -> new ForbiddenResponse("X-Hub-Signature-256 could not be validated"));
}

static boolean postHeaderValid(Context ctx, String appSecret) {
@Nullable String sig = ctx.headerMap().get("X-Hub-Signature-256");
if (sig == null) {
return false;
}

String[] hashParts = sig.strip().split("=");
if (hashParts.length != 2) {
return false;
}

String calculatedHmac = hmac(ctx.body(), appSecret);
return hashParts[1].equals(calculatedHmac);
}
}
Loading

0 comments on commit a9ad255

Please sign in to comment.