| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 | 
							- from core.moderation.base import Moderation, ModerationAction, ModerationInputsResult, ModerationOutputsResult
 
class KeywordsModeration(Moderation):
    """Moderation backend that flags content containing any configured keyword.

    Keywords are read from ``self.config["keywords"]`` as a newline-separated
    string and matched as case-insensitive substrings.
    """

    name: str = "keywords"

    @classmethod
    def validate_config(cls, tenant_id: str, config: dict) -> None:
        """
        Validate the incoming form config data.

        :param tenant_id: the id of workspace
        :param config: the form config data
        :raises ValueError: if ``keywords`` is missing or empty, its length exceeds 10000,
            or it contains more than 100 newline-separated rows
        :return:
        """
        # Inherited helper; whatever it raises propagates to the caller.
        cls._validate_inputs_and_outputs_config(config, True)

        keywords = config.get("keywords")
        if not keywords:
            raise ValueError("keywords is required")

        if len(keywords) > 10000:
            raise ValueError("keywords length must be less than 10000")

        if len(keywords.split("\n")) > 100:
            raise ValueError("the number of rows for the keywords must be less than 100")

    def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
        """
        Check the form inputs (and the query, when given) against the keywords.

        :param inputs: input values to inspect; this dict is not modified
        :param query: optional query text, checked together with the inputs
        :return: result with ``flagged`` set when any value contains a keyword; when
            input moderation is disabled the result is unflagged with an empty preset response
        """
        flagged = False
        preset_response = ""

        if self.config["inputs_config"]["enabled"]:
            preset_response = self.config["inputs_config"]["preset_response"]

            # Work on a shallow copy so the synthetic "query__" entry never leaks
            # into the caller's inputs dict.
            values_to_check = dict(inputs)
            if query:
                values_to_check["query__"] = query

            flagged = self._is_violated(values_to_check, self._get_keywords_list())

        return ModerationInputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def moderation_for_outputs(self, text: str) -> ModerationOutputsResult:
        """
        Check generated text against the keywords.

        :param text: the output text to inspect
        :return: result with ``flagged`` set when the text contains a keyword; when
            output moderation is disabled the result is unflagged with an empty preset response
        """
        flagged = False
        preset_response = ""

        if self.config["outputs_config"]["enabled"]:
            flagged = self._is_violated({"text": text}, self._get_keywords_list())
            preset_response = self.config["outputs_config"]["preset_response"]

        return ModerationOutputsResult(
            flagged=flagged, action=ModerationAction.DIRECT_OUTPUT, preset_response=preset_response
        )

    def _get_keywords_list(self) -> list:
        """Return the configured keywords, one per line, with empty lines dropped."""
        return [keyword for keyword in self.config["keywords"].split("\n") if keyword]

    def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
        """Return True if any value of ``inputs`` contains any of the keywords."""
        return any(self._check_keywords_in_value(keywords_list, value) for value in inputs.values())

    def _check_keywords_in_value(self, keywords_list, value) -> bool:
        """
        Return True if ``value`` contains any keyword, ignoring case.

        ``None`` never matches. Any other non-string value is compared through its
        ``str()`` form, so non-text inputs do not raise ``AttributeError``.
        """
        if value is None:
            return False

        # Lower-case the value once rather than once per keyword.
        haystack = str(value).lower()
        return any(keyword.lower() in haystack for keyword in keywords_list)
 
 
  |