Streaming Function Filters? #7006
I'm trying to emulate some of the behavior of the Python guidance library around constraining LLM outputs, but for the .NET Semantic Kernel SDK. I'd like to create a pluggable component that is idiomatic with the design of SK rather than shimming behavior around it, but I'm struggling to find the appropriate extension points. Ideally, I'd have a StreamingFunctionInvocationFilter that I could use to intercept and cancel the output stream from OpenAI, but I don't see anything that looks like this. The closest approximation I can find is something like this:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SemanticKernel;

// Snapshot handed to the predicate for each streamed item.
public record struct FunctionStreamInvocationContext(
    FunctionInvocationContext Context,
    int TotalTokenCount,
    string ResultAggregate,
    string CurrentToken
);

public class ConstraintFunctionFilter(
    Func<FunctionStreamInvocationContext, bool> streamPredicate)
    : IFunctionInvocationFilter
{
    public async Task OnFunctionInvocationAsync(
        FunctionInvocationContext context,
        Func<FunctionInvocationContext, Task> next)
    {
        using CancellationTokenSource tokenSource = new();
        var token = tokenSource.Token;
        var tokenCount = 0;
        var resultAggregate = string.Empty;

        // Run the function first; for streaming functions the result holds the stream.
        await next(context);

        var result = context.Result;
        switch (result.GetValue<object>())
        {
            case IAsyncEnumerable<string> stream:
                await foreach (var item in stream.WithCancellation(token))
                {
                    resultAggregate += item;
                    var shouldContinue = streamPredicate(new(context, ++tokenCount, resultAggregate, item));
                    if (!shouldContinue)
                    {
                        // As observed below, this only stops my enumeration, not the upstream request.
                        tokenSource.Cancel();
                        break;
                    }
                }
                // Replace the stream with the aggregated text so later filters don't re-enumerate it.
                context.Result = new(context.Function, resultAggregate);
                break;
        }
    }
}
```

The idea here is to grab the stream, cancel it if the criteria are met, and override the stream with a different result so that subsequent filters don't continue enumerating the original OpenAI request. There are two problems. First, I don't seem to be able to get the byte data for tokens using this code. Second, I don't seem to be able to cancel the actual request: the stream continues generating tokens, since the cancellation appears to apply only to my enumeration of the stream and not to the stream itself. Any tips for intercepting this correctly in an idiomatic way?
Replies: 2 comments 3 replies
Hi @Tyler-R-Kendrick, I think your approach is correct. Here is an example of how to use filters in a streaming scenario: […]

But if you want to get the byte data for tokens or cancel the actual HTTP request, you can define your interception and cancellation logic by extending […]. You can then pass your […].

I think at some point we can add […].
Just so I'm sure I understand the approach here: I would capture the cancellation token from the client request and request cancellation in the function filter. Is that correct? I think that should be a sufficient workaround in the interim, but it seems that the cancellation token really should be passed down through function invocation. Should I make a feature request and PR for this behavior? Or is there something already in the pipeline for it?
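For illustration, here is a minimal sketch of the "capture the token and cancel it from the consumer" idea, using only BCL types. It assumes the producer is started with the same token the consumer later cancels. `StreamCancellationSketch`, `FakeTokenStreamAsync`, and `ReadUntilAsync` are hypothetical names made up for this example and are not Semantic Kernel APIs; the fake stream merely stands in for a streaming model call.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class StreamCancellationSketch
{
    // Hypothetical stand-in for a streaming model call: yields items until cancelled.
    public static async IAsyncEnumerable<string> FakeTokenStreamAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            // The producer observes the token it was started with.
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return $"tok{i} ";
        }
    }

    // Starts the stream with a token linked to the caller's token, so that
    // cancelling from the consumer side also signals the producer.
    public static async Task<string> ReadUntilAsync(
        Func<CancellationToken, IAsyncEnumerable<string>> startStream,
        Func<string, bool> shouldContinue,
        CancellationToken callerToken = default)
    {
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(callerToken);
        var aggregate = new StringBuilder();

        await foreach (var item in startStream(linked.Token))
        {
            aggregate.Append(item);
            if (!shouldContinue(aggregate.ToString()))
            {
                // Signal the shared token, then stop enumerating.
                linked.Cancel();
                break;
            }
        }

        return aggregate.ToString();
    }
}
```

Calling `ReadUntilAsync(ct => StreamCancellationSketch.FakeTokenStreamAsync(ct), agg => !agg.Contains("tok2"))` stops after the third item. The difference from the filter in the question is that the token is handed to the producer when the stream is created, rather than attached afterwards with `WithCancellation`; whether a given connector forwards that token to its HTTP request is something to verify against the connector itself.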
Example referenced in the first reply: semantic-kernel/dotnet/samples/Concepts/Filtering/FunctionInvocationFiltering.cs, lines 167 to 187 in fe38a5f