moderation.py

import logging
import random
from typing import cast

from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.entities import DEFAULT_PLUGIN_ID
from core.model_runtime.entities.model_entities import ModelType
from core.model_runtime.errors.invoke import InvokeBadRequestError
from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
from core.model_runtime.model_providers.model_provider_factory import ModelProviderFactory
from extensions.ext_hosting_provider import hosting_configuration
from models.provider import ProviderType

logger = logging.getLogger(__name__)
def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEntity, text: str) -> bool:
    """Moderate a randomly sampled chunk of `text` via the hosted OpenAI moderation model.

    Returns True if the sampled chunk is flagged; returns False when hosted
    moderation is disabled, not applicable to this provider, or the chunk passes.
    """
    moderation_config = hosting_configuration.moderation_config
    openai_provider_name = f"{DEFAULT_PLUGIN_ID}/openai/openai"
    if (
        moderation_config
        and moderation_config.enabled is True
        and openai_provider_name in hosting_configuration.provider_map
        and hosting_configuration.provider_map[openai_provider_name].enabled is True
    ):
        using_provider_type = model_config.provider_model_bundle.configuration.using_provider_type
        provider_name = model_config.provider
        if using_provider_type == ProviderType.SYSTEM and provider_name in moderation_config.providers:
            hosting_openai_config = hosting_configuration.provider_map[openai_provider_name]

            if hosting_openai_config.credentials is None:
                return False

            # Split the text into chunks of at most 2000 characters
            length = 2000
            text_chunks = [text[i : i + length] for i in range(0, len(text), length)]

            # No chunks to sample (empty text); avoid random.choice on an empty list
            if len(text_chunks) == 0:
                return True

            # Sample a single chunk at random rather than moderating the full text
            text_chunk = random.choice(text_chunks)

            try:
                model_provider_factory = ModelProviderFactory(tenant_id)

                # Get the hosted moderation model instance
                model_type_instance = model_provider_factory.get_model_type_instance(
                    provider=openai_provider_name, model_type=ModelType.MODERATION
                )
                model_type_instance = cast(ModerationModel, model_type_instance)
                moderation_result = model_type_instance.invoke(
                    model="omni-moderation-latest", credentials=hosting_openai_config.credentials, text=text_chunk
                )

                if moderation_result is True:
                    return True
            except Exception:
                # Any invocation failure is surfaced to the caller as a rate-limit style error
                logger.exception("Failed to check moderation, provider_name: %s", provider_name)
                raise InvokeBadRequestError("Rate limit exceeded, please try again later.")

    return False
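
For context, a minimal caller sketch (not part of moderation.py): one way a request handler might gate a user query on this check, assuming the module is importable as core.helper.moderation. The names `is_query_flagged` and `query`, and the choice to treat a failed check as flagged, are illustrative assumptions, not part of the codebase.

# Hypothetical usage sketch; assumes this file lives at core/helper/moderation.py.
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.helper.moderation import check_moderation
from core.model_runtime.errors.invoke import InvokeBadRequestError


def is_query_flagged(tenant_id: str, model_config: ModelConfigWithCredentialsEntity, query: str) -> bool:
    try:
        # True means the randomly sampled chunk was flagged by the moderation model.
        return check_moderation(tenant_id, model_config, query)
    except InvokeBadRequestError:
        # check_moderation re-raises any invocation failure as a rate-limit style
        # error; this sketch conservatively treats that as flagged.
        return True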