import re
from collections.abc import Generator
from json import dumps
from time import time

# import monkeypatch
from typing import Any, Literal, Optional, Union

from openai import AzureOpenAI, OpenAI
from openai._types import NOT_GIVEN, NotGiven
from openai.resources.chat.completions import Completions
from openai.types.chat import (
    ChatCompletionChunk,
    ChatCompletionMessageParam,
    ChatCompletionMessageToolCall,
    ChatCompletionToolParam,
    completion_create_params,
)
from openai.types.chat.chat_completion import ChatCompletion as _ChatCompletion
from openai.types.chat.chat_completion import Choice as _ChatCompletionChoice
from openai.types.chat.chat_completion_chunk import (
    Choice,
    ChoiceDelta,
    ChoiceDeltaFunctionCall,
    ChoiceDeltaToolCall,
    ChoiceDeltaToolCallFunction,
)
from openai.types.chat.chat_completion_message import ChatCompletionMessage, FunctionCall
from openai.types.chat.chat_completion_message_tool_call import Function
from openai.types.completion_usage import CompletionUsage

from core.model_runtime.errors.invoke import InvokeAuthorizationError


class MockChatClass:
    @staticmethod
    def generate_function_call(
        functions: list[completion_create_params.Function] | NotGiven = NOT_GIVEN,
    ) -> Optional[FunctionCall]:
        """Build a fake call to the first function definition, filling every
        required parameter with a fixed dummy value."""
        if not functions or len(functions) == 0:
            return None
        function: completion_create_params.Function = functions[0]
        function_name = function["name"]
        function_description = function["description"]
        function_parameters = function["parameters"]
        function_parameters_type = function_parameters["type"]
        if function_parameters_type != "object":
            return None
        function_parameters_properties = function_parameters["properties"]
        function_parameters_required = function_parameters["required"]
        parameters = {}
        for parameter_name, parameter in function_parameters_properties.items():
            if parameter_name not in function_parameters_required:
                continue
            parameter_type = parameter["type"]
            if parameter_type == "string":
                if "enum" in parameter:
                    if len(parameter["enum"]) == 0:
                        continue
                    parameters[parameter_name] = parameter["enum"][0]
                else:
                    parameters[parameter_name] = "kawaii"
            elif parameter_type == "integer":
                parameters[parameter_name] = 114514
            elif parameter_type == "number":
                parameters[parameter_name] = 1919810.0
            elif parameter_type == "boolean":
                parameters[parameter_name] = True

        return FunctionCall(name=function_name, arguments=dumps(parameters))
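
    # Illustrative example (not part of the original mock): given a definition like
    #     {"name": "get_weather", "description": "Get the weather",
    #      "parameters": {"type": "object",
    #                     "properties": {"city": {"type": "string"}},
    #                     "required": ["city"]}}
    # generate_function_call returns
    #     FunctionCall(name="get_weather", arguments='{"city": "kawaii"}')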

    @staticmethod
    def generate_tool_calls(tools=NOT_GIVEN) -> Optional[list[ChatCompletionMessageToolCall]]:
        """Build a single mocked tool call from the first tool definition, or None."""
        list_tool_calls = []
        if not tools or len(tools) == 0:
            return None
        tool = tools[0]
        if "type" in tool and tool["type"] != "function":
            return None

        function = tool["function"]
        function_call = MockChatClass.generate_function_call(functions=[function])
        if function_call is None:
            return None

        list_tool_calls.append(
            ChatCompletionMessageToolCall(
                id="sakurajima-mai",
                function=Function(
                    name=function_call.name,
                    arguments=function_call.arguments,
                ),
                type="function",
            )
        )

        return list_tool_calls

    @staticmethod
    def mocked_openai_chat_create_sync(
        model: str,
        functions: list[completion_create_params.Function] | NotGiven = NOT_GIVEN,
        tools: list[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
    ) -> _ChatCompletion:
        """Return a canned, non-streaming chat completion, attaching a mocked
        function call or tool calls when definitions were supplied."""
        tool_calls = []
        function_call = MockChatClass.generate_function_call(functions=functions)
        if not function_call:
            tool_calls = MockChatClass.generate_tool_calls(tools=tools)

        return _ChatCompletion(
            id="cmpl-3QJQa5jXJ5Z5X",
            choices=[
                _ChatCompletionChoice(
                    finish_reason="content_filter",
                    index=0,
                    message=ChatCompletionMessage(
                        content="elaina", role="assistant", function_call=function_call, tool_calls=tool_calls
                    ),
                )
            ],
            created=int(time()),
            model=model,
            object="chat.completion",
            system_fingerprint="",
            usage=CompletionUsage(
                prompt_tokens=2,
                completion_tokens=1,
                total_tokens=3,
            ),
        )

    @staticmethod
    def mocked_openai_chat_create_stream(
        model: str,
        functions: list[completion_create_params.Function] | NotGiven = NOT_GIVEN,
        tools: list[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
    ) -> Generator[ChatCompletionChunk, None, None]:
        """Stream a canned response one character per chunk, then emit a final
        chunk carrying usage stats and any mocked function/tool call."""
        tool_calls = []
        function_call = MockChatClass.generate_function_call(functions=functions)
        if not function_call:
            tool_calls = MockChatClass.generate_tool_calls(tools=tools)

        full_text = "Hello, world!\n\n```python\nprint('Hello, world!')\n```"
        for i in range(0, len(full_text) + 1):
            if i == len(full_text):
                yield ChatCompletionChunk(
                    id="cmpl-3QJQa5jXJ5Z5X",
                    choices=[
                        Choice(
                            delta=ChoiceDelta(
                                content="",
                                function_call=ChoiceDeltaFunctionCall(
                                    name=function_call.name,
                                    arguments=function_call.arguments,
                                )
                                if function_call
                                else None,
                                role="assistant",
                                tool_calls=[
                                    ChoiceDeltaToolCall(
                                        index=0,
                                        id="misaka-mikoto",
                                        function=ChoiceDeltaToolCallFunction(
                                            name=tool_calls[0].function.name,
                                            arguments=tool_calls[0].function.arguments,
                                        ),
                                        type="function",
                                    )
                                ]
                                if tool_calls and len(tool_calls) > 0
                                else None,
                            ),
                            finish_reason="function_call",
                            index=0,
                        )
                    ],
                    created=int(time()),
                    model=model,
                    object="chat.completion.chunk",
                    system_fingerprint="",
                    usage=CompletionUsage(
                        prompt_tokens=2,
                        completion_tokens=17,
                        total_tokens=19,
                    ),
                )
            else:
                yield ChatCompletionChunk(
                    id="cmpl-3QJQa5jXJ5Z5X",
                    choices=[
                        Choice(
                            delta=ChoiceDelta(
                                content=full_text[i],
                                role="assistant",
                            ),
                            finish_reason="content_filter",
                            index=0,
                        )
                    ],
                    created=int(time()),
                    model=model,
                    object="chat.completion.chunk",
                    system_fingerprint="",
                )

    def chat_create(
        self: Completions,
        *,
        messages: list[ChatCompletionMessageParam],
        model: Union[
            str,
            Literal[
                "gpt-4-1106-preview",
                "gpt-4-vision-preview",
                "gpt-4",
                "gpt-4-0314",
                "gpt-4-0613",
                "gpt-4-32k",
                "gpt-4-32k-0314",
                "gpt-4-32k-0613",
                "gpt-3.5-turbo-1106",
                "gpt-3.5-turbo",
                "gpt-3.5-turbo-16k",
                "gpt-3.5-turbo-0301",
                "gpt-3.5-turbo-0613",
                "gpt-3.5-turbo-16k-0613",
            ],
        ],
        functions: list[completion_create_params.Function] | NotGiven = NOT_GIVEN,
        response_format: completion_create_params.ResponseFormat | NotGiven = NOT_GIVEN,
        stream: Optional[Literal[False]] | NotGiven = NOT_GIVEN,
        tools: list[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
        **kwargs: Any,
    ):
        """Stand-in for Completions.create: validates the client configuration,
        then returns a mocked streaming or blocking response."""
        openai_models = [
            "gpt-4-1106-preview",
            "gpt-4-vision-preview",
            "gpt-4",
            "gpt-4-0314",
            "gpt-4-0613",
            "gpt-4-32k",
            "gpt-4-32k-0314",
            "gpt-4-32k-0613",
            "gpt-3.5-turbo-1106",
            "gpt-3.5-turbo",
            "gpt-3.5-turbo-16k",
            "gpt-3.5-turbo-0301",
            "gpt-3.5-turbo-0613",
            "gpt-3.5-turbo-16k-0613",
        ]
        azure_openai_models = ["gpt35", "gpt-4v", "gpt-35-turbo"]
        if not re.match(r"^(https?):\/\/[^\s\/$.?#].[^\s]*$", str(self._client.base_url)):
            raise InvokeAuthorizationError("Invalid base url")
        if model in openai_models + azure_openai_models:
            if not re.match(r"sk-[a-zA-Z0-9]{24,}$", self._client.api_key) and type(self._client) is OpenAI:
                # Providers exposing an OpenAI-compatible API may use no API key or a
                # different key format, so the key format is only enforced for the
                # official OpenAI client and known OpenAI models.
                raise InvokeAuthorizationError("Invalid api key")
            if len(self._client.api_key) < 18 and type(self._client) is AzureOpenAI:
                raise InvokeAuthorizationError("Invalid api key")

        if stream:
            return MockChatClass.mocked_openai_chat_create_stream(model=model, functions=functions, tools=tools)

        return MockChatClass.mocked_openai_chat_create_sync(model=model, functions=functions, tools=tools)
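

# --- Illustrative usage (a minimal sketch, not part of the mock itself) -------
# The `# import monkeypatch` note above suggests MockChatClass.chat_create is
# intended to replace the real Completions.create during tests. One way this
# could be wired up with pytest's monkeypatch; the fixture name below is an
# assumption for illustration only:
#
#     import pytest
#     from openai.resources.chat.completions import Completions
#
#     @pytest.fixture
#     def setup_openai_chat_mock(monkeypatch: pytest.MonkeyPatch):
#         # Route client.chat.completions.create(...) to the deterministic mock.
#         monkeypatch.setattr(Completions, "create", MockChatClass.chat_create)
#         yield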