Skip to content

Commit

Permalink
Merge pull request #1072 from jpohlmeyer/fix-handle-mismatchedinputex…
Browse files Browse the repository at this point in the history
…ception

Fix MismatchedInputException in Ollama streaming
  • Loading branch information
geoand authored Nov 12, 2024
2 parents 29d197e + 5c3742b commit 3b82fb6
Showing 1 changed file with 34 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.client.ClientRequestContext;
Expand Down Expand Up @@ -85,40 +86,43 @@ class OllamaRestApiReaderInterceptor implements ReaderInterceptor {
public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException {
try {
return context.proceed();
} catch (ClientWebApplicationException e) {
Throwable cause = e.getCause();
if ((cause instanceof JsonParseException) || (cause instanceof MismatchedInputException)) {
if (e.getResponse().getStatus() == 200) {
Object invokedMethod = context.getProperty("org.eclipse.microprofile.rest.client.invokedMethod");
if ((invokedMethod != null) && invokedMethod.toString().contains("OllamaRestApi.streamingChat")) {
InputStream is = context.getInputStream();
if (is instanceof ByteArrayInputStream bis) {
bis.reset();
String chunk = new String(bis.readAllBytes());
final var ctx = Vertx.currentContext();
if (ctx == null) {
throw e;
}
} catch (ClientWebApplicationException | ProcessingException e) {
// Depending on the Quarkus version MismatchedInputException could be wrapped in ProcessingException
// or in WebApplicationException with Status 400.
if ((e instanceof ProcessingException pe && pe.getCause() instanceof MismatchedInputException) ||
(e instanceof WebApplicationException wae
&& ((wae.getCause() instanceof JsonParseException && wae.getResponse().getStatus() == 200) ||
(wae.getCause() instanceof MismatchedInputException
&& wae.getResponse().getStatus() == 400)))) {
Object invokedMethod = context.getProperty("org.eclipse.microprofile.rest.client.invokedMethod");
if ((invokedMethod != null) && invokedMethod.toString().contains("OllamaRestApi.streamingChat")) {
InputStream is = context.getInputStream();
if (is instanceof ByteArrayInputStream bis) {
bis.reset();
String chunk = new String(bis.readAllBytes());
final var ctx = Vertx.currentContext();
if (ctx == null) {
throw e;
}

// This piece of code deals with is the case where a message from Ollama is not received as an entire line
// but in pieces (my guess is that it is a Vertx bug).
// There is nothing we can do in this case except for returning empty responses and in the meantime buffer the pieces
// by storing them in the Vertx Duplicated Context
String existingBuffer = ctx.getLocal("buffer");
if ((existingBuffer != null) && !existingBuffer.isEmpty()) {
if (chunk.endsWith("}")) {
ctx.putLocal("buffer", "");
String entireLine = existingBuffer + chunk;
return QuarkusJsonCodecFactory.SnakeCaseObjectMapperHolder.MAPPER.readValue(entireLine,
ChatResponse.class);
} else {
ctx.putLocal("buffer", existingBuffer + chunk);
return ChatResponse.emptyNotDone();
}
// This piece of code deals with is the case where a message from Ollama is not received as an entire line
// but in pieces (my guess is that it is a Vertx bug).
// There is nothing we can do in this case except for returning empty responses and in the meantime buffer the pieces
// by storing them in the Vertx Duplicated Context
String existingBuffer = ctx.getLocal("buffer");
if ((existingBuffer != null) && !existingBuffer.isEmpty()) {
if (chunk.endsWith("}")) {
ctx.putLocal("buffer", "");
String entireLine = existingBuffer + chunk;
return QuarkusJsonCodecFactory.SnakeCaseObjectMapperHolder.MAPPER.readValue(entireLine,
ChatResponse.class);
} else {
ctx.putLocal("buffer", chunk);
ctx.putLocal("buffer", existingBuffer + chunk);
return ChatResponse.emptyNotDone();
}
} else {
ctx.putLocal("buffer", chunk);
return ChatResponse.emptyNotDone();
}
}
}
Expand Down

0 comments on commit 3b82fb6

Please sign in to comment.