Bladeren bron

Merge branch '1.1.3-suhh' into 1.1.3-master

‘suhuihui’ 3 maanden geleden
bovenliggende
commit
8d1f10cdc0

+ 26 - 0
api/configs/feature/__init__.py

@@ -340,6 +340,32 @@ class HttpConfig(BaseSettings):
         description="Enable or disable SSL verification for HTTP requests",
         default=True,
     )
+    INTENT_RECON_TRAIN_MAX_CONNECT_TIMEOUT: Annotated[
+        PositiveInt, Field(ge=10, description="Maximum connection timeout in seconds for intent recon train")
+    ] = 10
+
+    INTENT_RECON_TRAIN_MAX_READ_TIMEOUT: Annotated[
+        PositiveInt, Field(ge=60, description="Maximum read timeout in seconds for intent recon train")
+    ] = 60
+
+    INTENT_RECON_TRAIN_MAX_WRITE_TIMEOUT: Annotated[
+        PositiveInt, Field(ge=10, description="Maximum write timeout in seconds for intent recon train")
+    ] = 20
+
+    INTENT_RECON_TRAIN_NODE_MAX_BINARY_SIZE: PositiveInt = Field(
+        description="Maximum allowed size in bytes for binary data in intent recon train",
+        default=10 * 1024 * 1024,
+    )
+
+    INTENT_RECON_TRAIN_NODE_MAX_TEXT_SIZE: PositiveInt = Field(
+        description="Maximum allowed size in bytes for text data in intent recon train",
+        default=1 * 1024 * 1024,
+    )
+
+    INTENT_RECON_TRAIN_NODE_SSL_VERIFY: bool = Field(
+        description="Enable or disable SSL verification for intent recon train",
+        default=True,
+    )
 
     SSRF_DEFAULT_MAX_RETRIES: PositiveInt = Field(
         description="Maximum number of retries for network requests (SSRF)",

+ 1 - 1
api/controllers/console/datasets/datasets_templates.py

@@ -168,7 +168,7 @@ class DatasetTemplateApi(Resource):
         template = Template.query.filter_by(id=template_id).first()
         file_name = template.name
         # as_attachment下载作为附件下载
-        return send_file(template.file_url, as_attachment=True,download_name=file_name)
+        return send_file(template.file_url, as_attachment=True, download_name=file_name)
 
     @setup_required
     @login_required

+ 7 - 0
api/core/workflow/graph_engine/graph_engine.py

@@ -669,6 +669,13 @@ class GraphEngine:
                                     and not node_instance.should_continue_on_error
                                 ):
                                     run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
+                                if (
+                                    retries == max_retries
+                                    and node_instance.node_type == NodeType.INTENT_RECON_TRAIN
+                                    and run_result.outputs
+                                    and not node_instance.should_continue_on_error
+                                ):
+                                    run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
                                 if node_instance.should_retry and retries < max_retries:
                                     retries += 1
                                     route_node_state.node_run_result = run_result

+ 8 - 1
api/core/workflow/nodes/enums.py

@@ -24,6 +24,7 @@ class NodeType(StrEnum):
     DOCUMENT_EXTRACTOR = "document-extractor"
     LIST_OPERATOR = "list-operator"
     AGENT = "agent"
+    INTENT_RECON_TRAIN = "intent-recon-train"
 
 
 class ErrorStrategy(StrEnum):
@@ -36,5 +37,11 @@ class FailBranchSourceHandle(StrEnum):
     SUCCESS = "success-branch"
 
 
-CONTINUE_ON_ERROR_NODE_TYPE = [NodeType.LLM, NodeType.CODE, NodeType.TOOL, NodeType.HTTP_REQUEST]
+CONTINUE_ON_ERROR_NODE_TYPE = [
+    NodeType.LLM,
+    NodeType.CODE,
+    NodeType.TOOL,
+    NodeType.HTTP_REQUEST,
+    NodeType.INTENT_RECON_TRAIN,
+]
 RETRY_ON_ERROR_NODE_TYPE = CONTINUE_ON_ERROR_NODE_TYPE

+ 10 - 0
api/core/workflow/nodes/intent_recon_train/__init__.py

@@ -0,0 +1,10 @@
+from .entities import BodyData, IntentReconTrainNodeAuthorization, IntentReconTrainNodeBody, IntentReconTrainNodeData
+from .node import IntentReconTrainNode
+
+__all__ = [
+    "BodyData",
+    "IntentReconTrainNode",
+    "IntentReconTrainNodeAuthorization",
+    "IntentReconTrainNodeBody",
+    "IntentReconTrainNodeData",
+]

+ 191 - 0
api/core/workflow/nodes/intent_recon_train/entities.py

@@ -0,0 +1,191 @@
+import mimetypes
+from collections.abc import Sequence
+from email.message import Message
+from typing import Any, Literal, Optional
+
+import httpx
+from pydantic import BaseModel, Field, ValidationInfo, field_validator
+
+from configs import dify_config
+from core.workflow.nodes.base import BaseNodeData
+
+
+class IntentReconTrainNodeAuthorizationConfig(BaseModel):
+    type: Literal["basic", "bearer", "custom"]
+    api_key: str
+    header: str = ""
+
+
+class IntentReconTrainNodeAuthorization(BaseModel):
+    type: Literal["no-auth", "api-key"]
+    config: Optional[IntentReconTrainNodeAuthorizationConfig] = None
+
+    @field_validator("config", mode="before")
+    @classmethod
+    def check_config(cls, v: IntentReconTrainNodeAuthorizationConfig, values: ValidationInfo):
+        """
+        Check config, if type is no-auth, config should be None, otherwise it should be a dict.
+        """
+        if values.data["type"] == "no-auth":
+            return None
+        else:
+            if not v or not isinstance(v, dict):
+                raise ValueError("config should be a dict")
+
+            return v
+
+
+class BodyData(BaseModel):
+    key: str = ""
+    type: Literal["file", "text"]
+    value: str = ""
+    file: Sequence[str] = Field(default_factory=list)
+
+
+class IntentReconTrainNodeBody(BaseModel):
+    type: Literal["none", "form-data", "x-www-form-urlencoded", "raw-text", "json", "binary"]
+    data: Sequence[BodyData] = Field(default_factory=list)
+
+    @field_validator("data", mode="before")
+    @classmethod
+    def check_data(cls, v: Any):
+        """For compatibility, if body is not set, return empty list."""
+        if not v:
+            return []
+        if isinstance(v, str):
+            return [BodyData(key="", type="text", value=v)]
+        return v
+
+
+class IntentReconTrainNodeTimeout(BaseModel):
+    connect: int = dify_config.INTENT_RECON_TRAIN_MAX_CONNECT_TIMEOUT
+    read: int = dify_config.INTENT_RECON_TRAIN_MAX_READ_TIMEOUT
+    write: int = dify_config.INTENT_RECON_TRAIN_MAX_WRITE_TIMEOUT
+
+
+class IntentReconTrainNodeData(BaseNodeData):
+    """
+    Intent Recon Train Node Data.
+    """
+
+    method: Literal[
+        "get",
+        "post",
+        "put",
+        "patch",
+        "delete",
+        "head",
+        "options",
+        "GET",
+        "POST",
+        "PUT",
+        "PATCH",
+        "DELETE",
+        "HEAD",
+        "OPTIONS",
+    ]
+    url: str
+    authorization: IntentReconTrainNodeAuthorization
+    headers: str
+    params: str
+    body: Optional[IntentReconTrainNodeBody] = None
+    timeout: Optional[IntentReconTrainNodeTimeout] = None
+
+
+class Response:
+    headers: dict[str, str]
+    response: httpx.Response
+
+    def __init__(self, response: httpx.Response):
+        self.response = response
+        self.headers = dict(response.headers)
+
+    @property
+    def is_file(self):
+        """
+        Determine if the response contains a file by checking:
+        1. Content-Disposition header (RFC 6266)
+        2. Content characteristics
+        3. MIME type analysis
+        """
+        content_type = self.content_type.split(";")[0].strip().lower()
+        parsed_content_disposition = self.parsed_content_disposition
+
+        # Check if it's explicitly marked as an attachment
+        if parsed_content_disposition:
+            disp_type = parsed_content_disposition.get_content_disposition()  # Returns 'attachment', 'inline', or None
+            filename = parsed_content_disposition.get_filename()  # Returns filename if present, None otherwise
+            if disp_type == "attachment" or filename is not None:
+                return True
+
+        # For 'text/' types, only 'csv' should be downloaded as file
+        if content_type.startswith("text/") and "csv" not in content_type:
+            return False
+
+        # For application types, try to detect if it's a text-based format
+        if content_type.startswith("application/"):
+            # Common text-based application types
+            if any(
+                text_type in content_type
+                for text_type in ("json", "xml", "javascript", "x-www-form-urlencoded", "yaml", "graphql")
+            ):
+                return False
+
+            # Try to detect if content is text-based by sampling first few bytes
+            try:
+                # Sample first 1024 bytes for text detection
+                content_sample = self.response.content[:1024]
+                content_sample.decode("utf-8")
+                # If we can decode as UTF-8 and find common text patterns, likely not a file
+                text_markers = (b"{", b"[", b"<", b"function", b"var ", b"const ", b"let ")
+                if any(marker in content_sample for marker in text_markers):
+                    return False
+            except UnicodeDecodeError:
+                # If we can't decode as UTF-8, likely a binary file
+                return True
+
+        # For other types, use MIME type analysis
+        main_type, _ = mimetypes.guess_type("dummy" + (mimetypes.guess_extension(content_type) or ""))
+        if main_type:
+            return main_type.split("/")[0] in ("application", "image", "audio", "video")
+
+        # For unknown types, check if it's a media type
+        return any(media_type in content_type for media_type in ("image/", "audio/", "video/"))
+
+    @property
+    def content_type(self) -> str:
+        return self.headers.get("content-type", "")
+
+    @property
+    def text(self) -> str:
+        return self.response.text
+
+    @property
+    def content(self) -> bytes:
+        return self.response.content
+
+    @property
+    def status_code(self) -> int:
+        return self.response.status_code
+
+    @property
+    def size(self) -> int:
+        return len(self.content)
+
+    @property
+    def readable_size(self) -> str:
+        if self.size < 1024:
+            return f"{self.size} bytes"
+        elif self.size < 1024 * 1024:
+            return f"{(self.size / 1024):.2f} KB"
+        else:
+            return f"{(self.size / 1024 / 1024):.2f} MB"
+
+    @property
+    def parsed_content_disposition(self) -> Optional[Message]:
+        content_disposition = self.headers.get("content-disposition", "")
+        if content_disposition:
+            msg = Message()
+            msg["content-disposition"] = content_disposition
+            return msg
+        return None

+ 26 - 0
api/core/workflow/nodes/intent_recon_train/exc.py

@@ -0,0 +1,26 @@
+class IntentReconTrainNodeError(ValueError):
+    """Custom error for intent recon train node."""
+
+
+class AuthorizationConfigError(IntentReconTrainNodeError):
+    """Raised when authorization config is missing or invalid."""
+
+
+class FileFetchError(IntentReconTrainNodeError):
+    """Raised when a file cannot be fetched."""
+
+
+class InvalidHttpMethodError(IntentReconTrainNodeError):
+    """Raised when an invalid intent recon train method is used."""
+
+
+class ResponseSizeError(IntentReconTrainNodeError):
+    """Raised when the response size exceeds the allowed threshold."""
+
+
+class RequestBodyError(IntentReconTrainNodeError):
+    """Raised when the request body is invalid."""
+
+
+class InvalidURLError(IntentReconTrainNodeError):
+    """Raised when the URL is invalid."""

+ 422 - 0
api/core/workflow/nodes/intent_recon_train/executor.py

@@ -0,0 +1,422 @@
+import json
+from collections.abc import Mapping
+from copy import deepcopy
+from random import randint
+from typing import Any, Literal
+from urllib.parse import urlencode, urlparse
+
+import httpx
+
+from configs import dify_config
+from core.file import file_manager
+from core.helper import ssrf_proxy
+from core.variables.segments import ArrayFileSegment, FileSegment
+from core.workflow.entities.variable_pool import VariablePool
+
+from .entities import (
+    IntentReconTrainNodeAuthorization,
+    IntentReconTrainNodeData,
+    IntentReconTrainNodeTimeout,
+    Response,
+)
+from .exc import (
+    AuthorizationConfigError,
+    FileFetchError,
+    IntentReconTrainNodeError,
+    InvalidHttpMethodError,
+    InvalidURLError,
+    RequestBodyError,
+    ResponseSizeError,
+)
+
+BODY_TYPE_TO_CONTENT_TYPE = {
+    "json": "application/json",
+    "x-www-form-urlencoded": "application/x-www-form-urlencoded",
+    "form-data": "multipart/form-data",
+    "raw-text": "text/plain",
+}
+
+
+class Executor:
+    method: Literal[
+        "get",
+        "head",
+        "post",
+        "put",
+        "delete",
+        "patch",
+        "options",
+        "GET",
+        "POST",
+        "PUT",
+        "PATCH",
+        "DELETE",
+        "HEAD",
+        "OPTIONS",
+    ]
+    url: str
+    params: list[tuple[str, str]] | None
+    content: str | bytes | None
+    data: Mapping[str, Any] | None
+    files: list[tuple[str, tuple[str | None, bytes, str]]] | None
+    json: Any
+    headers: dict[str, str]
+    auth: IntentReconTrainNodeAuthorization
+    timeout: IntentReconTrainNodeTimeout
+    max_retries: int
+
+    boundary: str
+
+    def __init__(
+        self,
+        *,
+        node_data: IntentReconTrainNodeData,
+        timeout: IntentReconTrainNodeTimeout,
+        variable_pool: VariablePool,
+        max_retries: int = dify_config.SSRF_DEFAULT_MAX_RETRIES,
+    ):
+        # If authorization API key is present, convert the API key using the variable pool
+        if node_data.authorization.type == "api-key":
+            if node_data.authorization.config is None:
+                raise AuthorizationConfigError("authorization config is required")
+            node_data.authorization.config.api_key = variable_pool.convert_template(
+                node_data.authorization.config.api_key
+            ).text
+
+        self.url: str = node_data.url
+        self.method = node_data.method
+        self.auth = node_data.authorization
+        self.timeout = timeout
+        self.params = []
+        self.headers = {}
+        self.content = None
+        self.files = None
+        self.data = None
+        self.json = None
+        self.max_retries = max_retries
+
+        # init template
+        self.variable_pool = variable_pool
+        self.node_data = node_data
+        self._initialize()
+
+    def _initialize(self):
+        self._init_url()
+        self._init_params()
+        self._init_headers()
+        self._init_body()
+
+    def _init_url(self):
+        self.url = self.variable_pool.convert_template(self.node_data.url).text
+
+        # check if url is a valid URL
+        if not self.url:
+            raise InvalidURLError("url is required")
+        if not self.url.startswith(("http://", "https://")):
+            raise InvalidURLError("url should start with http:// or https://")
+
+    def _init_params(self):
+        """
+        Almost same as _init_headers(), difference:
+        1. response a list tuple to support same key, like 'aa=1&aa=2'
+        2. param value may have '\n', we need to splitlines then extract the variable value.
+        """
+        result = []
+        for line in self.node_data.params.splitlines():
+            if not (line := line.strip()):
+                continue
+
+            key, *value = line.split(":", 1)
+            if not (key := key.strip()):
+                continue
+
+            value_str = value[0].strip() if value else ""
+            result.append(
+                (self.variable_pool.convert_template(key).text, self.variable_pool.convert_template(value_str).text)
+            )
+
+        self.params = result
+
+    def _init_headers(self):
+        """
+        Convert the header string of frontend to a dictionary.
+
+        Each line in the header string represents a key-value pair.
+        Keys and values are separated by ':'.
+        Empty values are allowed.
+
+        Examples:
+            'aa:bb\n cc:dd'  -> {'aa': 'bb', 'cc': 'dd'}
+            'aa:\n cc:dd\n'  -> {'aa': '', 'cc': 'dd'}
+            'aa\n cc : dd'   -> {'aa': '', 'cc': 'dd'}
+
+        """
+        headers = self.variable_pool.convert_template(self.node_data.headers).text
+        self.headers = {
+            key.strip(): (value[0].strip() if value else "")
+            for line in headers.splitlines()
+            if line.strip()
+            for key, *value in [line.split(":", 1)]
+        }
+
+    def _init_body(self):
+        body = self.node_data.body
+        if body is not None:
+            data = body.data
+            match body.type:
+                case "none":
+                    self.content = ""
+                case "raw-text":
+                    if len(data) != 1:
+                        raise RequestBodyError("raw-text body type should have exactly one item")
+                    self.content = self.variable_pool.convert_template(data[0].value).text
+                case "json":
+                    if len(data) != 1:
+                        raise RequestBodyError("json body type should have exactly one item")
+                    json_string = self.variable_pool.convert_template(data[0].value).text
+                    try:
+                        json_object = json.loads(json_string, strict=False)
+                    except json.JSONDecodeError as e:
+                        raise RequestBodyError(f"Failed to parse JSON: {json_string}") from e
+                    self.json = json_object
+                    # self.json = self._parse_object_contains_variables(json_object)
+                case "binary":
+                    if len(data) != 1:
+                        raise RequestBodyError("binary body type should have exactly one item")
+                    file_selector = data[0].file
+                    file_variable = self.variable_pool.get_file(file_selector)
+                    if file_variable is None:
+                        raise FileFetchError(f"cannot fetch file with selector {file_selector}")
+                    file = file_variable.value
+                    self.content = file_manager.download(file)
+                case "x-www-form-urlencoded":
+                    form_data = {
+                        self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template(
+                            item.value
+                        ).text
+                        for item in data
+                    }
+                    self.data = form_data
+                case "form-data":
+                    form_data = {
+                        self.variable_pool.convert_template(item.key).text: self.variable_pool.convert_template(
+                            item.value
+                        ).text
+                        for item in filter(lambda item: item.type == "text", data)
+                    }
+                    file_selectors = {
+                        self.variable_pool.convert_template(item.key).text: item.file
+                        for item in filter(lambda item: item.type == "file", data)
+                    }
+
+                    # get files from file_selectors, add support for array file variables
+                    files_list = []
+                    for key, selector in file_selectors.items():
+                        segment = self.variable_pool.get(selector)
+                        if isinstance(segment, FileSegment):
+                            files_list.append((key, [segment.value]))
+                        elif isinstance(segment, ArrayFileSegment):
+                            files_list.append((key, list(segment.value)))
+
+                    # get files from file_manager
+                    files: dict[str, list[tuple[str | None, bytes, str]]] = {}
+                    for key, files_in_segment in files_list:
+                        for file in files_in_segment:
+                            if file.related_id is not None:
+                                file_tuple = (
+                                    file.filename,
+                                    file_manager.download(file),
+                                    file.mime_type or "application/octet-stream",
+                                )
+                                if key not in files:
+                                    files[key] = []
+                                files[key].append(file_tuple)
+
+                    # convert files to list for httpx request
+                    if files:
+                        self.files = []
+                        for key, file_tuples in files.items():
+                            for file_tuple in file_tuples:
+                                self.files.append((key, file_tuple))
+
+                    self.data = form_data
+
+    def _assembling_headers(self) -> dict[str, Any]:
+        authorization = deepcopy(self.auth)
+        headers = deepcopy(self.headers) or {}
+        if self.auth.type == "api-key":
+            if self.auth.config is None:
+                raise AuthorizationConfigError("self.authorization config is required")
+            if authorization.config is None:
+                raise AuthorizationConfigError("authorization config is required")
+
+            if self.auth.config.api_key is None:
+                raise AuthorizationConfigError("api_key is required")
+
+            if not authorization.config.header:
+                authorization.config.header = "Authorization"
+
+            if self.auth.config.type == "bearer":
+                headers[authorization.config.header] = f"Bearer {authorization.config.api_key}"
+            elif self.auth.config.type == "basic":
+                headers[authorization.config.header] = f"Basic {authorization.config.api_key}"
+            elif self.auth.config.type == "custom":
+                headers[authorization.config.header] = authorization.config.api_key or ""
+
+        return headers
+
+    def _validate_and_parse_response(self, response: httpx.Response) -> Response:
+        executor_response = Response(response)
+
+        threshold_size = (
+            dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE
+            if executor_response.is_file
+            else dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE
+        )
+        if executor_response.size > threshold_size:
+            raise ResponseSizeError(
+                f"{'File' if executor_response.is_file else 'Text'} size is too large,"
+                f" max size is {threshold_size / 1024 / 1024:.2f} MB,"
+                f" but current size is {executor_response.readable_size}."
+            )
+
+        return executor_response
+
+    def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response:
+        """
+        do http request depending on api bundle
+        """
+        if self.method not in {
+            "get",
+            "head",
+            "post",
+            "put",
+            "delete",
+            "patch",
+            "options",
+            "GET",
+            "POST",
+            "PUT",
+            "PATCH",
+            "DELETE",
+            "HEAD",
+            "OPTIONS",
+        }:
+            raise InvalidHttpMethodError(f"Invalid http method {self.method}")
+
+        request_args = {
+            "url": self.url,
+            "data": self.data,
+            "files": self.files,
+            "json": self.json,
+            "content": self.content,
+            "headers": headers,
+            "params": self.params,
+            "timeout": (self.timeout.connect, self.timeout.read, self.timeout.write),
+            "follow_redirects": True,
+            "max_retries": self.max_retries,
+        }
+        # request_args = {k: v for k, v in request_args.items() if v is not None}
+        try:
+            response = getattr(ssrf_proxy, self.method.lower())(**request_args)
+        except (ssrf_proxy.MaxRetriesExceededError, httpx.RequestError) as e:
+            raise IntentReconTrainNodeError(str(e))
+        # FIXME: fix type ignore, this maybe httpx type issue
+        return response  # type: ignore
+
+    def invoke(self) -> Response:
+        # assemble headers
+        headers = self._assembling_headers()
+        # do http request
+        response = self._do_http_request(headers)
+        # validate response
+        return self._validate_and_parse_response(response)
+
+    def to_log(self):
+        url_parts = urlparse(self.url)
+        path = url_parts.path or "/"
+
+        # Add query parameters
+        if self.params:
+            query_string = urlencode(self.params)
+            path += f"?{query_string}"
+        elif url_parts.query:
+            path += f"?{url_parts.query}"
+
+        raw = f"{self.method.upper()} {path} HTTP/1.1\r\n"
+        raw += f"Host: {url_parts.netloc}\r\n"
+
+        headers = self._assembling_headers()
+        body = self.node_data.body
+        boundary = f"----WebKitFormBoundary{_generate_random_string(16)}"
+        if body:
+            if "content-type" not in (k.lower() for k in self.headers) and body.type in BODY_TYPE_TO_CONTENT_TYPE:
+                headers["Content-Type"] = BODY_TYPE_TO_CONTENT_TYPE[body.type]
+            if body.type == "form-data":
+                headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"
+        for k, v in headers.items():
+            if self.auth.type == "api-key":
+                authorization_header = "Authorization"
+                if self.auth.config and self.auth.config.header:
+                    authorization_header = self.auth.config.header
+                if k.lower() == authorization_header.lower():
+                    raw += f"{k}: {'*' * len(v)}\r\n"
+                    continue
+            raw += f"{k}: {v}\r\n"
+
+        body_string = ""
+        if self.files:
+            for key, (filename, content, mime_type) in self.files:
+                body_string += f"--{boundary}\r\n"
+                body_string += f'Content-Disposition: form-data; name="{key}"\r\n\r\n'
+                # decode content
+                try:
+                    body_string += content.decode("utf-8")
+                except UnicodeDecodeError:
+                    # fix: decode binary content
+                    pass
+                body_string += "\r\n"
+            body_string += f"--{boundary}--\r\n"
+        elif self.node_data.body:
+            if self.content:
+                if isinstance(self.content, str):
+                    body_string = self.content
+                elif isinstance(self.content, bytes):
+                    body_string = self.content.decode("utf-8", errors="replace")
+            elif self.data and self.node_data.body.type == "x-www-form-urlencoded":
+                body_string = urlencode(self.data)
+            elif self.data and self.node_data.body.type == "form-data":
+                for key, value in self.data.items():
+                    body_string += f"--{boundary}\r\n"
+                    body_string += f'Content-Disposition: form-data; name="{key}"\r\n\r\n'
+                    body_string += f"{value}\r\n"
+                body_string += f"--{boundary}--\r\n"
+            elif self.json:
+                body_string = json.dumps(self.json)
+            elif self.node_data.body.type == "raw-text":
+                if len(self.node_data.body.data) != 1:
+                    raise RequestBodyError("raw-text body type should have exactly one item")
+                body_string = self.node_data.body.data[0].value
+        if body_string:
+            raw += f"Content-Length: {len(body_string)}\r\n"
+        raw += "\r\n"  # Empty line between headers and body
+        raw += body_string
+
+        return raw
+
+
+def _generate_random_string(n: int) -> str:
+    """
+    Generate a random string of lowercase ASCII letters.
+
+    Args:
+        n (int): The length of the random string to generate.
+
+    Returns:
+        str: A random string of lowercase ASCII letters with length n.
+
+    Example:
+        >>> _generate_random_string(5)
+        'abcde'
+    """
+    return "".join([chr(randint(97, 122)) for _ in range(n)])

+ 212 - 0
api/core/workflow/nodes/intent_recon_train/node.py

@@ -0,0 +1,212 @@
+import logging
+import mimetypes
+from collections.abc import Mapping, Sequence
+from typing import Any, Optional
+
+from configs import dify_config
+from core.file import File, FileTransferMethod
+from core.tools.tool_file_manager import ToolFileManager
+from core.workflow.entities.node_entities import NodeRunResult
+from core.workflow.entities.variable_entities import VariableSelector
+from core.workflow.nodes.base import BaseNode
+from core.workflow.nodes.enums import NodeType
+from core.workflow.nodes.http_request.executor import Executor
+from core.workflow.utils import variable_template_parser
+from factories import file_factory
+from models.workflow import WorkflowNodeExecutionStatus
+
+from .entities import (
+    IntentReconTrainNodeData,
+    IntentReconTrainNodeTimeout,
+    Response,
+)
+from .exc import IntentReconTrainNodeError, RequestBodyError
+
# Default HTTP timeouts for the intent-recon-train request, sourced from
# application config (see HttpConfig.INTENT_RECON_TRAIN_* settings).
INTENT_RECON_TRAIN_DEFAULT_TIMEOUT = IntentReconTrainNodeTimeout(
    connect=dify_config.INTENT_RECON_TRAIN_MAX_CONNECT_TIMEOUT,
    read=dify_config.INTENT_RECON_TRAIN_MAX_READ_TIMEOUT,
    write=dify_config.INTENT_RECON_TRAIN_MAX_WRITE_TIMEOUT,
)

# Module-level logger, one per module per logging convention.
logger = logging.getLogger(__name__)
+
+
+class IntentReconTrainNode(BaseNode[IntentReconTrainNodeData]):
+    _node_data_cls = IntentReconTrainNodeData
+    _node_type = NodeType.INTENT_RECON_TRAIN
+
+    @classmethod
+    def get_default_config(cls, filters: Optional[dict[str, Any]] = None) -> dict:
+        return {
+            "type": "intent-recon-train",
+            "config": {
+                "method": "get",
+                "authorization": {
+                    "type": "no-auth",
+                },
+                "body": {"type": "none"},
+                "timeout": {
+                    **INTENT_RECON_TRAIN_DEFAULT_TIMEOUT.model_dump(),
+                    "max_connect_timeout": dify_config.INTENT_RECON_TRAIN_MAX_CONNECT_TIMEOUT,
+                    "max_read_timeout": dify_config.INTENT_RECON_TRAIN_MAX_READ_TIMEOUT,
+                    "max_write_timeout": dify_config.INTENT_RECON_TRAIN_MAX_WRITE_TIMEOUT,
+                },
+            },
+            "retry_config": {
+                "max_retries": dify_config.SSRF_DEFAULT_MAX_RETRIES,
+                "retry_interval": 0.5 * (2**2),
+                "retry_enabled": True,
+            },
+        }
+
+    def _run(self) -> NodeRunResult:
+        process_data = {}
+        try:
+            http_executor = Executor(
+                node_data=self.node_data,
+                timeout=self._get_request_timeout(self.node_data),
+                variable_pool=self.graph_runtime_state.variable_pool,
+                max_retries=0,
+            )
+            process_data["request"] = http_executor.to_log()
+
+            response = http_executor.invoke()
+            files = self.extract_files(url=http_executor.url, response=response)
+            if not response.response.is_success and (self.should_continue_on_error or self.should_retry):
+                return NodeRunResult(
+                    status=WorkflowNodeExecutionStatus.FAILED,
+                    outputs={
+                        "status_code": response.status_code,
+                        "body": response.text if not files else "",
+                        "headers": response.headers,
+                        "files": files,
+                    },
+                    process_data={
+                        "request": http_executor.to_log(),
+                    },
+                    error=f"Request failed with status code {response.status_code}",
+                    error_type="HTTPResponseCodeError",
+                )
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.SUCCEEDED,
+                outputs={
+                    "status_code": response.status_code,
+                    "body": response.text if not files else "",
+                    "headers": response.headers,
+                    "files": files,
+                },
+                process_data={
+                    "request": http_executor.to_log(),
+                },
+            )
+        except IntentReconTrainNodeError as e:
+            logger.warning(f"http request node {self.node_id} failed to run: {e}")
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                error=str(e),
+                process_data=process_data,
+                error_type=type(e).__name__,
+            )
+
+    @staticmethod
+    def _get_request_timeout(node_data: IntentReconTrainNodeData) -> IntentReconTrainNodeTimeout:
+        timeout = node_data.timeout
+        if timeout is None:
+            return INTENT_RECON_TRAIN_DEFAULT_TIMEOUT
+
+        timeout.connect = timeout.connect or INTENT_RECON_TRAIN_DEFAULT_TIMEOUT.connect
+        timeout.read = timeout.read or INTENT_RECON_TRAIN_DEFAULT_TIMEOUT.read
+        timeout.write = timeout.write or INTENT_RECON_TRAIN_DEFAULT_TIMEOUT.write
+        return timeout
+
+    @classmethod
+    def _extract_variable_selector_to_variable_mapping(
+        cls,
+        *,
+        graph_config: Mapping[str, Any],
+        node_id: str,
+        node_data: IntentReconTrainNodeData,
+    ) -> Mapping[str, Sequence[str]]:
+        selectors: list[VariableSelector] = []
+        selectors += variable_template_parser.extract_selectors_from_template(node_data.url)
+        selectors += variable_template_parser.extract_selectors_from_template(node_data.headers)
+        selectors += variable_template_parser.extract_selectors_from_template(node_data.params)
+        if node_data.body:
+            body_type = node_data.body.type
+            data = node_data.body.data
+            match body_type:
+                case "binary":
+                    if len(data) != 1:
+                        raise RequestBodyError("invalid body data, should have only one item")
+                    selector = data[0].file
+                    selectors.append(VariableSelector(variable="#" + ".".join(selector) + "#", value_selector=selector))
+                case "json" | "raw-text":
+                    if len(data) != 1:
+                        raise RequestBodyError("invalid body data, should have only one item")
+                    selectors += variable_template_parser.extract_selectors_from_template(data[0].key)
+                    selectors += variable_template_parser.extract_selectors_from_template(data[0].value)
+                case "x-www-form-urlencoded":
+                    for item in data:
+                        selectors += variable_template_parser.extract_selectors_from_template(item.key)
+                        selectors += variable_template_parser.extract_selectors_from_template(item.value)
+                case "form-data":
+                    for item in data:
+                        selectors += variable_template_parser.extract_selectors_from_template(item.key)
+                        if item.type == "text":
+                            selectors += variable_template_parser.extract_selectors_from_template(item.value)
+                        elif item.type == "file":
+                            selectors.append(
+                                VariableSelector(variable="#" + ".".join(item.file) + "#", value_selector=item.file)
+                            )
+
+        mapping = {}
+        for selector_iter in selectors:
+            mapping[node_id + "." + selector_iter.variable] = selector_iter.value_selector
+
+        return mapping
+
+    def extract_files(self, url: str, response: Response) -> list[File]:
+        """
+        Extract files from response by checking both Content-Type header and URL
+        """
+        files: list[File] = []
+        is_file = response.is_file
+        content_type = response.content_type
+        content = response.content
+        parsed_content_disposition = response.parsed_content_disposition
+        content_disposition_type = None
+
+        if not is_file:
+            return files
+
+        if parsed_content_disposition:
+            content_disposition_filename = parsed_content_disposition.get_filename()
+            if content_disposition_filename:
+                # If filename is available from content-disposition, use it to guess the content type
+                content_disposition_type = mimetypes.guess_type(content_disposition_filename)[0]
+
+        # Guess file extension from URL or Content-Type header
+        filename = url.split("?")[0].split("/")[-1] or ""
+        mime_type = (
+            content_disposition_type or content_type or mimetypes.guess_type(filename)[0] or "application/octet-stream"
+        )
+
+        tool_file = ToolFileManager.create_file_by_raw(
+            user_id=self.user_id,
+            tenant_id=self.tenant_id,
+            conversation_id=None,
+            file_binary=content,
+            mimetype=mime_type,
+        )
+
+        mapping = {
+            "tool_file_id": tool_file.id,
+            "transfer_method": FileTransferMethod.TOOL_FILE.value,
+        }
+        file = file_factory.build_from_mapping(
+            mapping=mapping,
+            tenant_id=self.tenant_id,
+        )
+        files.append(file)
+
+        return files

+ 5 - 0
api/core/workflow/nodes/node_mapping.py

@@ -9,6 +9,7 @@ from core.workflow.nodes.end import EndNode
 from core.workflow.nodes.enums import NodeType
 from core.workflow.nodes.http_request import HttpRequestNode
 from core.workflow.nodes.if_else import IfElseNode
+from core.workflow.nodes.intent_recon_train import IntentReconTrainNode
 from core.workflow.nodes.iteration import IterationNode, IterationStartNode
 from core.workflow.nodes.knowledge_retrieval import KnowledgeRetrievalNode
 from core.workflow.nodes.list_operator import ListOperatorNode
@@ -66,6 +67,10 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[BaseNode]]] = {
         LATEST_VERSION: HttpRequestNode,
         "1": HttpRequestNode,
     },
+    NodeType.INTENT_RECON_TRAIN: {
+        LATEST_VERSION: IntentReconTrainNode,
+        "1": IntentReconTrainNode,
+    },
     NodeType.TOOL: {
         LATEST_VERSION: ToolNode,
         "1": ToolNode,

+ 4 - 4
api/services/dataset_service.py

@@ -591,7 +591,7 @@ class TemplateService:
 
         file_name = upload_file.name
         file_id = upload_file.id
-        file_url=upload_file.key
+        file_url = upload_file.key
         data_source_info = {"upload_file_id": upload_file.id}
         template = TemplateService.build_template(
             dataset,
@@ -604,7 +604,7 @@ class TemplateService:
             file_id,
             file_name,
             batch,
-            file_url
+            file_url,
         )
         db.session.add(template)
         db.session.flush()
@@ -634,7 +634,7 @@ class TemplateService:
         file_id: str,
         file_name: str,
         batch: str,
-        file_url: str
+        file_url: str,
     ):
         template = Template(
             tenant_id=dataset.tenant_id,
@@ -648,7 +648,7 @@ class TemplateService:
             created_from=created_from,
             created_by=account.id,
             file_id=file_id,
-            file_url="./storage/"+file_url
+            file_url="./storage/" + file_url,
         )
         return template