
Enable Azure OpenAI function calling via Azure Functions #1133

Open · wants to merge 8 commits into base: main
app.py (196 additions, 29 deletions)

@@ -35,6 +35,7 @@
convert_to_pf_format,
format_pf_non_streaming_response,
)
import requests

bp = Blueprint("routes", __name__, static_folder="static", template_folder="static")

@@ -111,6 +112,9 @@ async def assets(path):
MS_DEFENDER_ENABLED = os.environ.get("MS_DEFENDER_ENABLED", "true").lower() == "true"


azure_openai_tools = []
azure_openai_available_tools = []

# Initialize Azure OpenAI Client
async def init_openai_client():
azure_openai_client = None
@@ -159,6 +163,19 @@ async def init_openai_client():
# Default Headers
default_headers = {"x-ms-useragent": USER_AGENT}

# Remote function calls
if app_settings.azure_openai.function_call_azure_functions_enabled:
azure_functions_tools_url = f"{app_settings.azure_openai.function_call_azure_functions_tools_base_url}?code={app_settings.azure_openai.function_call_azure_functions_tools_key}"
response = requests.get(azure_functions_tools_url)
Contributor comment: let's use an async call here
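A minimal async sketch of that suggestion (assuming httpx is available, as this app's promptflow path appears to use it; aiohttp would work the same way — illustrative only, not part of the diff):

import httpx

async def fetch_azure_functions_tools():
    # Build the tools-metadata URL exactly as the PR does
    azure_functions_tools_url = (
        f"{app_settings.azure_openai.function_call_azure_functions_tools_base_url}"
        f"?code={app_settings.azure_openai.function_call_azure_functions_tools_key}"
    )
    # Non-blocking GET instead of requests.get
    async with httpx.AsyncClient() as client:
        response = await client.get(azure_functions_tools_url)
    if response.status_code == 200:
        azure_openai_tools.extend(response.json())
        for tool in azure_openai_tools:
            azure_openai_available_tools.append(tool["function"]["name"])
    else:
        logging.error(f"An error occurred while getting OpenAI Function Call tools metadata: {response.status_code}")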

response_status_code = response.status_code
if response_status_code == requests.codes.ok:
azure_openai_tools.extend(json.loads(response.text))
for tool in azure_openai_tools:
azure_openai_available_tools.append(tool["function"]["name"])
else:
logging.error(f"An error occurred while getting OpenAI Function Call tools metadata: {response.status_code}")


azure_openai_client = AsyncAzureOpenAI(
api_version=app_settings.azure_openai.preview_api_version,
api_key=aoai_api_key,
@@ -173,6 +190,20 @@ async def init_openai_client():
azure_openai_client = None
raise e

def openai_remote_azure_function_call(function_name, function_args):
if app_settings.azure_openai.function_call_azure_functions_enabled is not True:
return

azure_functions_tool_url = f"{app_settings.azure_openai.function_call_azure_functions_tool_base_url}?code={app_settings.azure_openai.function_call_azure_functions_tool_key}"
headers = {'content-type': 'application/json'}
body = {
"tool_name": function_name,
"tool_arguments": json.loads(function_args)
}
response = requests.post(azure_functions_tool_url, data=json.dumps(body), headers=headers)
Contributor comment: let's make this async as well
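The same pattern would apply here (sketch, same httpx assumption as above; callers such as process_function_call would then need to await it):

import httpx

async def openai_remote_azure_function_call(function_name, function_args):
    if app_settings.azure_openai.function_call_azure_functions_enabled is not True:
        return

    azure_functions_tool_url = (
        f"{app_settings.azure_openai.function_call_azure_functions_tool_base_url}"
        f"?code={app_settings.azure_openai.function_call_azure_functions_tool_key}"
    )
    body = {
        "tool_name": function_name,
        "tool_arguments": json.loads(function_args),
    }
    # Non-blocking POST; httpx serializes the body via the json= parameter
    async with httpx.AsyncClient() as client:
        response = await client.post(azure_functions_tool_url, json=body)
    response.raise_for_status()
    return response.text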

response.raise_for_status()

return response.text
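For reference, a hypothetical Azure Function satisfying this {"tool_name", "tool_arguments"} contract might look like the following (illustrative only; the function app itself is outside this PR, and get_stock_price is a made-up tool name):

import json
import azure.functions as func

def main(req: func.HttpRequest) -> func.HttpResponse:
    payload = req.get_json()
    tool_name = payload["tool_name"]
    tool_arguments = payload["tool_arguments"]  # already a parsed dict

    # Dispatch to the requested tool (hypothetical example)
    if tool_name == "get_stock_price":
        # A real implementation would call a data source here
        result = {"symbol": tool_arguments["symbol"], "price": 415.2}
    else:
        return func.HttpResponse(f"Unknown tool: {tool_name}", status_code=400)

    return func.HttpResponse(json.dumps(result), mimetype="application/json")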

async def init_cosmosdb_client():
cosmos_conversation_client = None
@@ -219,22 +250,28 @@ def prepare_model_args(request_body, request_headers):

for message in request_messages:
if message:
if message["role"] == "assistant" and "context" in message:
context_obj = json.loads(message["context"])
messages.append(
{
"role": message["role"],
"content": message["content"],
"context": context_obj
}
)
else:
messages.append(
{
"role": message["role"],
"content": message["content"]
}
)
match message["role"]:
case "user":
messages.append(
{
"role": message["role"],
"content": message["content"]
}
)
case "assistant" | "function" | "tool":
messages_helper = {}
messages_helper["role"] = message["role"]
if "name" in message:
messages_helper["name"] = message["name"]
if "function_call" in message:
messages_helper["function_call"] = message["function_call"]
messages_helper["content"] = message["content"]
if "context" in message:
context_obj = json.loads(message["context"])
messages_helper["context"] = context_obj

messages.append(messages_helper)


user_json = None
if (MS_DEFENDER_ENABLED):
@@ -254,14 +291,18 @@ def prepare_model_args(request_body, request_headers):
"user": user_json
}

if app_settings.datasource:
model_args["extra_body"] = {
"data_sources": [
app_settings.datasource.construct_payload_configuration(
request=request
)
]
}
if messages[-1]["role"] == "user":
Contributor comment (@sarah-widder, Jan 6, 2025): a safety check here that len(messages) > 0 would be good
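The suggested guard could read (sketch, not part of the diff):

if len(messages) > 0 and messages[-1]["role"] == "user":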

if app_settings.azure_openai.function_call_azure_functions_enabled and len(azure_openai_tools) > 0:
model_args["tools"] = azure_openai_tools

if app_settings.datasource:
model_args["extra_body"] = {
"data_sources": [
app_settings.datasource.construct_payload_configuration(
request=request
)
]
}

model_args_clean = copy.deepcopy(model_args)
if model_args_clean.get("extra_body"):
@@ -335,6 +376,43 @@ async def promptflow_request(request):
logging.error(f"An error occurred while making promptflow_request: {e}")


def process_function_call(response):
response_message = response.choices[0].message
messages = []

if response_message.tool_calls:
for tool_call in response_message.tool_calls:
# Check if function exists
if tool_call.function.name not in azure_openai_available_tools:
continue

function_response = openai_remote_azure_function_call(tool_call.function.name, tool_call.function.arguments)

# adding assistant response to messages
messages.append(
{
"role": response_message.role,
"function_call": {
"name": tool_call.function.name,
"arguments": tool_call.function.arguments,
},
"content": None,
}
)

# adding function response to messages
messages.append(
{
"role": "function",
"name": tool_call.function.name,
"content": function_response,
}
) # extend conversation with function response

return messages

return None
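For illustration, with the hypothetical get_stock_price tool from earlier, the list returned by process_function_call would look like this (made-up values); these messages then flow back through the "assistant" | "function" | "tool" branch in prepare_model_args on the second pass:

[
    {
        "role": "assistant",
        "function_call": {
            "name": "get_stock_price",
            "arguments": '{"symbol": "MSFT"}',
        },
        "content": None,
    },
    {
        "role": "function",
        "name": "get_stock_price",
        "content": '{"symbol": "MSFT", "price": 415.2}',
    },
]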

async def send_chat_request(request_body, request_headers):
filtered_messages = []
messages = request_body.get("messages", [])
@@ -370,18 +448,107 @@ async def complete_chat_request(request_body, request_headers):
else:
response, apim_request_id = await send_chat_request(request_body, request_headers)
history_metadata = request_body.get("history_metadata", {})
return format_non_streaming_response(response, history_metadata, apim_request_id)
non_streaming_response = format_non_streaming_response(response, history_metadata, apim_request_id)

if app_settings.azure_openai.function_call_azure_functions_enabled:
function_response = process_function_call(response)

if function_response:
request_body["messages"].extend(function_response)

response, apim_request_id = await send_chat_request(request_body, request_headers)
history_metadata = request_body.get("history_metadata", {})
non_streaming_response = format_non_streaming_response(response, history_metadata, apim_request_id)

return non_streaming_response


async def stream_chat_request(request_body, request_headers):
response, apim_request_id = await send_chat_request(request_body, request_headers)
history_metadata = request_body.get("history_metadata", {})

async def generate():
async for completionChunk in response:
yield format_stream_response(completionChunk, history_metadata, apim_request_id)
messages = []

async def generate(apim_request_id, history_metadata):
tool_calls = []
current_tool_call = None
tool_arguments_stream = ""
function_messages = []
tool_name = ""
tool_call_streaming_state = "INITIAL"

return generate()
async for completionChunk in response:
if app_settings.azure_openai.function_call_azure_functions_enabled:
if hasattr(completionChunk, "choices") and len(completionChunk.choices) > 0:
response_message = completionChunk.choices[0].delta

# Function calling stream processing
if response_message.tool_calls and tool_call_streaming_state in ["INITIAL", "STREAMING"]:
tool_call_streaming_state = "STREAMING"
for tool_call_chunk in response_message.tool_calls:
# New tool call
if tool_call_chunk.id:
if current_tool_call:
tool_arguments_stream += tool_call_chunk.function.arguments if tool_call_chunk.function.arguments else ""
current_tool_call["tool_arguments"] = tool_arguments_stream
tool_arguments_stream = ""
tool_name = ""
tool_calls.append(current_tool_call)

current_tool_call = {
"tool_id": tool_call_chunk.id,
"tool_name": tool_call_chunk.function.name if tool_name == "" else tool_name
}
else:
tool_arguments_stream += tool_call_chunk.function.arguments if tool_call_chunk.function.arguments else ""

# Function call - Streaming completed
elif response_message.tool_calls is None and tool_call_streaming_state == "STREAMING":
current_tool_call["tool_arguments"] = tool_arguments_stream
tool_calls.append(current_tool_call)

for tool_call in tool_calls:
tool_response = openai_remote_azure_function_call(tool_call["tool_name"], tool_call["tool_arguments"])

function_messages.append({
"role": "assistant",
"function_call": {
"name" : tool_call["tool_name"],
"arguments": tool_call["tool_arguments"]
},
"content": None
})
function_messages.append({
"tool_call_id": tool_call["tool_id"],
"role": "function",
"name": tool_call["tool_name"],
"content": tool_response,
})

# Reset for the next tool call
messages = function_messages
function_messages = []
tool_calls = []
current_tool_call = None
tool_arguments_stream = ""
tool_name = ""
tool_id = None
tool_call_streaming_state = "COMPLETED"

request_body["messages"].extend(messages)

function_response, apim_request_id = await send_chat_request(request_body, request_headers)

async for functionCompletionChunk in function_response:
yield format_stream_response(functionCompletionChunk, history_metadata, apim_request_id)

else:
# No function call, assistant response
yield format_stream_response(completionChunk, history_metadata, apim_request_id)

else:
yield format_stream_response(completionChunk, history_metadata, apim_request_id)
return generate(apim_request_id=apim_request_id, history_metadata=history_metadata)
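For context on the accumulation above: in the OpenAI streaming API a tool call is split across chunks — the first delta for a call carries the id and function name, subsequent deltas stream the arguments as string fragments, and the code flushes the accumulated call when a delta arrives with no tool_calls. Roughly (illustrative values):

# chunk 1: delta.tool_calls[0] -> id="call_abc", function.name="get_stock_price", function.arguments=""
# chunk 2: delta.tool_calls[0] -> id=None, function.name=None, function.arguments='{"sym'
# chunk 3: delta.tool_calls[0] -> id=None, function.name=None, function.arguments='bol": "MSFT"}'
# chunk 4: delta.tool_calls is None -> arguments complete; the tool can be invoked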


async def conversation_internal(request_body, request_headers):
backend/settings.py (5 additions, 0 deletions)

@@ -123,6 +123,11 @@ class _AzureOpenAISettings(BaseSettings):
embedding_endpoint: Optional[str] = None
embedding_key: Optional[str] = None
embedding_name: Optional[str] = None
function_call_azure_functions_enabled: bool = False
Contributor comment: can we make this Optional[bool] for consistency?
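As suggested, the field would then read (sketch):

function_call_azure_functions_enabled: Optional[bool] = False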

function_call_azure_functions_tools_key: Optional[str] = None
function_call_azure_functions_tools_base_url: Optional[str] = None
function_call_azure_functions_tool_key: Optional[str] = None
function_call_azure_functions_tool_base_url: Optional[str] = None
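Assuming these fields follow the same environment-variable mapping as the rest of _AzureOpenAISettings (an AZURE_OPENAI_ prefix is the convention in this codebase), a deployment would set something like the following (hypothetical URLs and keys):

# AZURE_OPENAI_FUNCTION_CALL_AZURE_FUNCTIONS_ENABLED=true
# AZURE_OPENAI_FUNCTION_CALL_AZURE_FUNCTIONS_TOOLS_BASE_URL=https://<function-app>.azurewebsites.net/api/tools
# AZURE_OPENAI_FUNCTION_CALL_AZURE_FUNCTIONS_TOOLS_KEY=<function key>
# AZURE_OPENAI_FUNCTION_CALL_AZURE_FUNCTIONS_TOOL_BASE_URL=https://<function-app>.azurewebsites.net/api/tool
# AZURE_OPENAI_FUNCTION_CALL_AZURE_FUNCTIONS_TOOL_KEY=<function key>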

@field_validator('tools', mode='before')
@classmethod
backend/utils.py (16 additions, 0 deletions)

@@ -131,6 +131,22 @@ def format_stream_response(chatCompletionChunk, history_metadata, apim_request_id
}
response_obj["choices"][0]["messages"].append(messageObj)
return response_obj
if delta.tool_calls:
messageObj = {
"role": "tool",
"tool_calls": {
"id": delta.tool_calls[0].id,
"function": {
"name" : delta.tool_calls[0].function.name,
"arguments": delta.tool_calls[0].function.arguments
},
"type": delta.tool_calls[0].type
}
}
if hasattr(delta, "context"):
messageObj["context"] = json.dumps(delta.context)
response_obj["choices"][0]["messages"].append(messageObj)
return response_obj
else:
if delta.content:
messageObj = {
Expand Down