# keywords.py
  1. from core.moderation.base import Moderation, ModerationAction, ModerationInputsResult, ModerationOutputsResult
  class KeywordsModeration(Moderation):
      """Moderation that flags content containing any configured keyword.

      Keywords are read from ``self.config["keywords"]``, a single string with
      one keyword per line. Matching is a case-insensitive substring test.
      """

      name: str = "keywords"

      @classmethod
      def validate_config(cls, tenant_id: str, config: dict) -> None:
          """
          Validate the incoming form config data.

          :param tenant_id: the id of workspace
          :param config: the form config data
          :return:
          :raises ValueError: if ``keywords`` is missing or empty, is longer
              than 10000 characters, or contains more than 100 rows
          """
          cls._validate_inputs_and_outputs_config(config, True)

          keywords = config.get("keywords")
          if not keywords:
              raise ValueError("keywords is required")

          if len(keywords) > 10000:
              raise ValueError("keywords length must be less than 10000")

          # Rows are counted before empty lines are filtered, so blank lines
          # count towards the limit.
          keyword_rows = keywords.split("\n")
          if len(keyword_rows) > 100:
              raise ValueError("the number of rows for the keywords must be less than 100")

      def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
          """
          Check the input values (and the query, when given) against the keywords.

          :param inputs: the input values to check; this dict is not modified
          :param query: optional query text, checked together with the inputs
          :return: result whose ``flagged`` is True when inputs moderation is
              enabled and any value contains a keyword
          """
          flagged = False
          preset_response = ""

          inputs_config = self.config["inputs_config"]
          if inputs_config["enabled"]:
              preset_response = inputs_config["preset_response"]

              # Check the query alongside the inputs using a copy, so the
              # synthetic "query__" key never leaks into the caller's dict.
              values_to_check = {**inputs, "query__": query} if query else inputs

              flagged = self._is_violated(values_to_check, self._get_keywords_list())

          return ModerationInputsResult(
              flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
          )

      def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
          """
          Check the output text against the keywords.

          :param text: the output text to check
          :return: result whose ``flagged`` is True when outputs moderation is
              enabled and the text contains a keyword
          """
          flagged = False
          preset_response = ""

          outputs_config = self.config["outputs_config"]
          if outputs_config["enabled"]:
              flagged = self._is_violated({"text": text}, self._get_keywords_list())
              preset_response = outputs_config["preset_response"]

          return ModerationOutputsResult(
              flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
          )

      def _get_keywords_list(self) -> list:
          """Split the configured keywords into a list, dropping empty lines."""
          return [keyword for keyword in self.config["keywords"].split("\n") if keyword]

      def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
          """Return True if any value of ``inputs`` contains one of the keywords."""
          return any(self._check_keywords_in_value(keywords_list, value) for value in inputs.values())

      def _check_keywords_in_value(self, keywords_list, value) -> bool:
          """
          Return True if ``value`` contains any keyword, ignoring case.

          :param keywords_list: the keywords to look for
          :param value: the value to search; non-string values are converted
              with ``str()`` and ``None`` is treated as containing no text
          """
          if value is None:
              # A missing value has no text; converting it would produce the
              # literal "None" and could match keywords by accident.
              return False

          # Input values are not guaranteed to be strings, so convert before
          # lowering, and lower once rather than once per keyword.
          haystack = str(value).lower()
          return any(keyword.lower() in haystack for keyword in keywords_list)