keywords.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from core.moderation.base import Moderation, ModerationAction, ModerationInputsResult, ModerationOutputsResult
  2. class KeywordsModeration(Moderation):
  3. name: str = "keywords"
  4. @classmethod
  5. def validate_config(cls, tenant_id: str, config: dict) -> None:
  6. """
  7. Validate the incoming form config data.
  8. :param tenant_id: the id of workspace
  9. :param config: the form config data
  10. :return:
  11. """
  12. cls._validate_inputs_and_outputs_config(config, True)
  13. if not config.get("keywords"):
  14. raise ValueError("keywords is required")
  15. if len(config.get("keywords")) > 1000:
  16. raise ValueError("keywords length must be less than 1000")
  17. def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
  18. flagged = False
  19. preset_response = ""
  20. if self.config["inputs_config"]["enabled"]:
  21. preset_response = self.config["inputs_config"]["preset_response"]
  22. if query:
  23. inputs["query__"] = query
  24. # Filter out empty values
  25. keywords_list = [keyword for keyword in self.config["keywords"].split("\n") if keyword]
  26. flagged = self._is_violated(inputs, keywords_list)
  27. return ModerationInputsResult(
  28. flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
  29. )
  30. def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
  31. flagged = False
  32. preset_response = ""
  33. if self.config["outputs_config"]["enabled"]:
  34. # Filter out empty values
  35. keywords_list = [keyword for keyword in self.config["keywords"].split("\n") if keyword]
  36. flagged = self._is_violated({"text": text}, keywords_list)
  37. preset_response = self.config["outputs_config"]["preset_response"]
  38. return ModerationOutputsResult(
  39. flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
  40. )
  41. def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
  42. return any(self._check_keywords_in_value(keywords_list, value) for value in inputs.values())
  43. def _check_keywords_in_value(self, keywords_list, value) -> bool:
  44. return any(keyword.lower() in value.lower() for keyword in keywords_list)