milvus.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. """Wrapper around the Milvus vector database."""
  2. from __future__ import annotations
  3. import logging
  4. from typing import Any, Iterable, List, Optional, Tuple, Union, Sequence
  5. from uuid import uuid4
  6. import numpy as np
  7. from langchain.docstore.document import Document
  8. from langchain.embeddings.base import Embeddings
  9. from langchain.vectorstores.base import VectorStore
  10. from langchain.vectorstores.utils import maximal_marginal_relevance
  11. logger = logging.getLogger(__name__)
  12. DEFAULT_MILVUS_CONNECTION = {
  13. "host": "localhost",
  14. "port": "19530",
  15. "user": "",
  16. "password": "",
  17. "secure": False,
  18. }
  19. class Milvus(VectorStore):
  20. """Initialize wrapper around the milvus vector database.
  21. In order to use this you need to have `pymilvus` installed and a
  22. running Milvus
  23. See the following documentation for how to run a Milvus instance:
  24. https://milvus.io/docs/install_standalone-docker.md
  25. If looking for a hosted Milvus, take a look at this documentation:
  26. https://zilliz.com/cloud and make use of the Zilliz vectorstore found in
  27. this project,
  28. IF USING L2/IP metric IT IS HIGHLY SUGGESTED TO NORMALIZE YOUR DATA.
  29. Args:
  30. embedding_function (Embeddings): Function used to embed the text.
  31. collection_name (str): Which Milvus collection to use. Defaults to
  32. "LangChainCollection".
  33. connection_args (Optional[dict[str, any]]): The connection args used for
  34. this class comes in the form of a dict.
  35. consistency_level (str): The consistency level to use for a collection.
  36. Defaults to "Session".
  37. index_params (Optional[dict]): Which index params to use. Defaults to
  38. HNSW/AUTOINDEX depending on service.
  39. search_params (Optional[dict]): Which search params to use. Defaults to
  40. default of index.
  41. drop_old (Optional[bool]): Whether to drop the current collection. Defaults
  42. to False.
  43. The connection args used for this class comes in the form of a dict,
  44. here are a few of the options:
  45. address (str): The actual address of Milvus
  46. instance. Example address: "localhost:19530"
  47. uri (str): The uri of Milvus instance. Example uri:
  48. "http://randomwebsite:19530",
  49. "tcp:foobarsite:19530",
  50. "https://ok.s3.south.com:19530".
  51. host (str): The host of Milvus instance. Default at "localhost",
  52. PyMilvus will fill in the default host if only port is provided.
  53. port (str/int): The port of Milvus instance. Default at 19530, PyMilvus
  54. will fill in the default port if only host is provided.
  55. user (str): Use which user to connect to Milvus instance. If user and
  56. password are provided, we will add related header in every RPC call.
  57. password (str): Required when user is provided. The password
  58. corresponding to the user.
  59. secure (bool): Default is false. If set to true, tls will be enabled.
  60. client_key_path (str): If use tls two-way authentication, need to
  61. write the client.key path.
  62. client_pem_path (str): If use tls two-way authentication, need to
  63. write the client.pem path.
  64. ca_pem_path (str): If use tls two-way authentication, need to write
  65. the ca.pem path.
  66. server_pem_path (str): If use tls one-way authentication, need to
  67. write the server.pem path.
  68. server_name (str): If use tls, need to write the common name.
  69. Example:
  70. .. code-block:: python
  71. from langchain import Milvus
  72. from langchain.embeddings import OpenAIEmbeddings
  73. embedding = OpenAIEmbeddings()
  74. # Connect to a milvus instance on localhost
  75. milvus_store = Milvus(
  76. embedding_function = Embeddings,
  77. collection_name = "LangChainCollection",
  78. drop_old = True,
  79. )
  80. Raises:
  81. ValueError: If the pymilvus python package is not installed.
  82. """
  83. def __init__(
  84. self,
  85. embedding_function: Embeddings,
  86. collection_name: str = "LangChainCollection",
  87. connection_args: Optional[dict[str, Any]] = None,
  88. consistency_level: str = "Session",
  89. index_params: Optional[dict] = None,
  90. search_params: Optional[dict] = None,
  91. drop_old: Optional[bool] = False,
  92. ):
  93. """Initialize the Milvus vector store."""
  94. try:
  95. from pymilvus import Collection, utility
  96. except ImportError:
  97. raise ValueError(
  98. "Could not import pymilvus python package. "
  99. "Please install it with `pip install pymilvus`."
  100. )
  101. # Default search params when one is not provided.
  102. self.default_search_params = {
  103. "IVF_FLAT": {"metric_type": "L2", "params": {"nprobe": 10}},
  104. "IVF_SQ8": {"metric_type": "L2", "params": {"nprobe": 10}},
  105. "IVF_PQ": {"metric_type": "L2", "params": {"nprobe": 10}},
  106. "HNSW": {"metric_type": "L2", "params": {"ef": 10}},
  107. "RHNSW_FLAT": {"metric_type": "L2", "params": {"ef": 10}},
  108. "RHNSW_SQ": {"metric_type": "L2", "params": {"ef": 10}},
  109. "RHNSW_PQ": {"metric_type": "L2", "params": {"ef": 10}},
  110. "IVF_HNSW": {"metric_type": "L2", "params": {"nprobe": 10, "ef": 10}},
  111. "ANNOY": {"metric_type": "L2", "params": {"search_k": 10}},
  112. "AUTOINDEX": {"metric_type": "L2", "params": {}},
  113. }
  114. self.embedding_func = embedding_function
  115. self.collection_name = collection_name
  116. self.index_params = index_params
  117. self.search_params = search_params
  118. self.consistency_level = consistency_level
  119. # In order for a collection to be compatible, pk needs to be auto'id and int
  120. self._primary_field = "id"
  121. # In order for compatibility, the text field will need to be called "text"
  122. self._text_field = "page_content"
  123. # In order for compatibility, the vector field needs to be called "vector"
  124. self._vector_field = "vectors"
  125. # In order for compatibility, the metadata field will need to be called "metadata"
  126. self._metadata_field = "metadata"
  127. self.fields: list[str] = []
  128. # Create the connection to the server
  129. if connection_args is None:
  130. connection_args = DEFAULT_MILVUS_CONNECTION
  131. self.alias = self._create_connection_alias(connection_args)
  132. self.col: Optional[Collection] = None
  133. # Grab the existing collection if it exists
  134. if utility.has_collection(self.collection_name, using=self.alias):
  135. self.col = Collection(
  136. self.collection_name,
  137. using=self.alias,
  138. )
  139. # If need to drop old, drop it
  140. if drop_old and isinstance(self.col, Collection):
  141. self.col.drop()
  142. self.col = None
  143. # Initialize the vector store
  144. self._init()
  145. @property
  146. def embeddings(self) -> Embeddings:
  147. return self.embedding_func
  148. def _create_connection_alias(self, connection_args: dict) -> str:
  149. """Create the connection to the Milvus server."""
  150. from pymilvus import MilvusException, connections
  151. # Grab the connection arguments that are used for checking existing connection
  152. host: str = connection_args.get("host", None)
  153. port: Union[str, int] = connection_args.get("port", None)
  154. address: str = connection_args.get("address", None)
  155. uri: str = connection_args.get("uri", None)
  156. user = connection_args.get("user", None)
  157. # Order of use is host/port, uri, address
  158. if host is not None and port is not None:
  159. given_address = str(host) + ":" + str(port)
  160. elif uri is not None:
  161. given_address = uri.split("https://")[1]
  162. elif address is not None:
  163. given_address = address
  164. else:
  165. given_address = None
  166. logger.debug("Missing standard address type for reuse atttempt")
  167. # User defaults to empty string when getting connection info
  168. if user is not None:
  169. tmp_user = user
  170. else:
  171. tmp_user = ""
  172. # If a valid address was given, then check if a connection exists
  173. if given_address is not None:
  174. for con in connections.list_connections():
  175. addr = connections.get_connection_addr(con[0])
  176. if (
  177. con[1]
  178. and ("address" in addr)
  179. and (addr["address"] == given_address)
  180. and ("user" in addr)
  181. and (addr["user"] == tmp_user)
  182. ):
  183. logger.debug("Using previous connection: %s", con[0])
  184. return con[0]
  185. # Generate a new connection if one doesn't exist
  186. alias = uuid4().hex
  187. try:
  188. connections.connect(alias=alias, **connection_args)
  189. logger.debug("Created new connection using: %s", alias)
  190. return alias
  191. except MilvusException as e:
  192. logger.error("Failed to create new connection using: %s", alias)
  193. raise e
  194. def _init(
  195. self, embeddings: Optional[list] = None, metadatas: Optional[list[dict]] = None
  196. ) -> None:
  197. if embeddings is not None:
  198. self._create_collection(embeddings, metadatas)
  199. self._extract_fields()
  200. self._create_index()
  201. self._create_search_params()
  202. self._load()
  203. def _create_collection(
  204. self, embeddings: list, metadatas: Optional[list[dict]] = None
  205. ) -> None:
  206. from pymilvus import (
  207. Collection,
  208. CollectionSchema,
  209. DataType,
  210. FieldSchema,
  211. MilvusException,
  212. )
  213. from pymilvus.orm.types import infer_dtype_bydata
  214. # Determine embedding dim
  215. dim = len(embeddings[0])
  216. fields = []
  217. # Determine metadata schema
  218. # if metadatas:
  219. # # Create FieldSchema for each entry in metadata.
  220. # for key, value in metadatas[0].items():
  221. # # Infer the corresponding datatype of the metadata
  222. # dtype = infer_dtype_bydata(value)
  223. # # Datatype isn't compatible
  224. # if dtype == DataType.UNKNOWN or dtype == DataType.NONE:
  225. # logger.error(
  226. # "Failure to create collection, unrecognized dtype for key: %s",
  227. # key,
  228. # )
  229. # raise ValueError(f"Unrecognized datatype for {key}.")
  230. # # Dataype is a string/varchar equivalent
  231. # elif dtype == DataType.VARCHAR:
  232. # fields.append(FieldSchema(key, DataType.VARCHAR, max_length=65_535))
  233. # else:
  234. # fields.append(FieldSchema(key, dtype))
  235. if metadatas:
  236. fields.append(FieldSchema(self._metadata_field, DataType.JSON, max_length=65_535))
  237. # Create the text field
  238. fields.append(
  239. FieldSchema(self._text_field, DataType.VARCHAR, max_length=65_535)
  240. )
  241. # Create the primary key field
  242. fields.append(
  243. FieldSchema(
  244. self._primary_field, DataType.INT64, is_primary=True, auto_id=True
  245. )
  246. )
  247. # Create the vector field, supports binary or float vectors
  248. fields.append(
  249. FieldSchema(self._vector_field, infer_dtype_bydata(embeddings[0]), dim=dim)
  250. )
  251. # Create the schema for the collection
  252. schema = CollectionSchema(fields)
  253. # Create the collection
  254. try:
  255. self.col = Collection(
  256. name=self.collection_name,
  257. schema=schema,
  258. consistency_level=self.consistency_level,
  259. using=self.alias,
  260. )
  261. except MilvusException as e:
  262. logger.error(
  263. "Failed to create collection: %s error: %s", self.collection_name, e
  264. )
  265. raise e
  266. def _extract_fields(self) -> None:
  267. """Grab the existing fields from the Collection"""
  268. from pymilvus import Collection
  269. if isinstance(self.col, Collection):
  270. schema = self.col.schema
  271. for x in schema.fields:
  272. self.fields.append(x.name)
  273. # Since primary field is auto-id, no need to track it
  274. self.fields.remove(self._primary_field)
  275. def _get_index(self) -> Optional[dict[str, Any]]:
  276. """Return the vector index information if it exists"""
  277. from pymilvus import Collection
  278. if isinstance(self.col, Collection):
  279. for x in self.col.indexes:
  280. if x.field_name == self._vector_field:
  281. return x.to_dict()
  282. return None
  283. def _create_index(self) -> None:
  284. """Create a index on the collection"""
  285. from pymilvus import Collection, MilvusException
  286. if isinstance(self.col, Collection) and self._get_index() is None:
  287. try:
  288. # If no index params, use a default HNSW based one
  289. if self.index_params is None:
  290. self.index_params = {
  291. "metric_type": "IP",
  292. "index_type": "HNSW",
  293. "params": {"M": 8, "efConstruction": 64},
  294. }
  295. try:
  296. self.col.create_index(
  297. self._vector_field,
  298. index_params=self.index_params,
  299. using=self.alias,
  300. )
  301. # If default did not work, most likely on Zilliz Cloud
  302. except MilvusException:
  303. # Use AUTOINDEX based index
  304. self.index_params = {
  305. "metric_type": "L2",
  306. "index_type": "AUTOINDEX",
  307. "params": {},
  308. }
  309. self.col.create_index(
  310. self._vector_field,
  311. index_params=self.index_params,
  312. using=self.alias,
  313. )
  314. logger.debug(
  315. "Successfully created an index on collection: %s",
  316. self.collection_name,
  317. )
  318. except MilvusException as e:
  319. logger.error(
  320. "Failed to create an index on collection: %s", self.collection_name
  321. )
  322. raise e
  323. def _create_search_params(self) -> None:
  324. """Generate search params based on the current index type"""
  325. from pymilvus import Collection
  326. if isinstance(self.col, Collection) and self.search_params is None:
  327. index = self._get_index()
  328. if index is not None:
  329. index_type: str = index["index_param"]["index_type"]
  330. metric_type: str = index["index_param"]["metric_type"]
  331. self.search_params = self.default_search_params[index_type]
  332. self.search_params["metric_type"] = metric_type
  333. def _load(self) -> None:
  334. """Load the collection if available."""
  335. from pymilvus import Collection
  336. if isinstance(self.col, Collection) and self._get_index() is not None:
  337. self.col.load()
  338. def add_texts(
  339. self,
  340. texts: Iterable[str],
  341. metadatas: Optional[List[dict]] = None,
  342. timeout: Optional[int] = None,
  343. batch_size: int = 1000,
  344. **kwargs: Any,
  345. ) -> List[str]:
  346. """Insert text data into Milvus.
  347. Inserting data when the collection has not be made yet will result
  348. in creating a new Collection. The data of the first entity decides
  349. the schema of the new collection, the dim is extracted from the first
  350. embedding and the columns are decided by the first metadata dict.
  351. Metada keys will need to be present for all inserted values. At
  352. the moment there is no None equivalent in Milvus.
  353. Args:
  354. texts (Iterable[str]): The texts to embed, it is assumed
  355. that they all fit in memory.
  356. metadatas (Optional[List[dict]]): Metadata dicts attached to each of
  357. the texts. Defaults to None.
  358. timeout (Optional[int]): Timeout for each batch insert. Defaults
  359. to None.
  360. batch_size (int, optional): Batch size to use for insertion.
  361. Defaults to 1000.
  362. Raises:
  363. MilvusException: Failure to add texts
  364. Returns:
  365. List[str]: The resulting keys for each inserted element.
  366. """
  367. from pymilvus import Collection, MilvusException
  368. texts = list(texts)
  369. try:
  370. embeddings = self.embedding_func.embed_documents(texts)
  371. except NotImplementedError:
  372. embeddings = [self.embedding_func.embed_query(x) for x in texts]
  373. if len(embeddings) == 0:
  374. logger.debug("Nothing to insert, skipping.")
  375. return []
  376. # If the collection hasn't been initialized yet, perform all steps to do so
  377. if not isinstance(self.col, Collection):
  378. self._init(embeddings, metadatas)
  379. # Dict to hold all insert columns
  380. insert_dict: dict[str, list] = {
  381. self._text_field: texts,
  382. self._vector_field: embeddings,
  383. }
  384. # Collect the metadata into the insert dict.
  385. # if metadatas is not None:
  386. # for d in metadatas:
  387. # for key, value in d.items():
  388. # if key in self.fields:
  389. # insert_dict.setdefault(key, []).append(value)
  390. if metadatas is not None:
  391. for d in metadatas:
  392. insert_dict.setdefault(self._metadata_field, []).append(d)
  393. # Total insert count
  394. vectors: list = insert_dict[self._vector_field]
  395. total_count = len(vectors)
  396. pks: list[str] = []
  397. assert isinstance(self.col, Collection)
  398. for i in range(0, total_count, batch_size):
  399. # Grab end index
  400. end = min(i + batch_size, total_count)
  401. # Convert dict to list of lists batch for insertion
  402. insert_list = [insert_dict[x][i:end] for x in self.fields]
  403. # Insert into the collection.
  404. try:
  405. res: Collection
  406. res = self.col.insert(insert_list, timeout=timeout, **kwargs)
  407. pks.extend(res.primary_keys)
  408. except MilvusException as e:
  409. logger.error(
  410. "Failed to insert batch starting at entity: %s/%s", i, total_count
  411. )
  412. raise e
  413. return pks
  414. def similarity_search(
  415. self,
  416. query: str,
  417. k: int = 4,
  418. param: Optional[dict] = None,
  419. expr: Optional[str] = None,
  420. timeout: Optional[int] = None,
  421. **kwargs: Any,
  422. ) -> List[Document]:
  423. """Perform a similarity search against the query string.
  424. Args:
  425. query (str): The text to search.
  426. k (int, optional): How many results to return. Defaults to 4.
  427. param (dict, optional): The search params for the index type.
  428. Defaults to None.
  429. expr (str, optional): Filtering expression. Defaults to None.
  430. timeout (int, optional): How long to wait before timeout error.
  431. Defaults to None.
  432. kwargs: Collection.search() keyword arguments.
  433. Returns:
  434. List[Document]: Document results for search.
  435. """
  436. if self.col is None:
  437. logger.debug("No existing collection to search.")
  438. return []
  439. res = self.similarity_search_with_score(
  440. query=query, k=k, param=param, expr=expr, timeout=timeout, **kwargs
  441. )
  442. return [doc for doc, _ in res]
  443. def similarity_search_by_vector(
  444. self,
  445. embedding: List[float],
  446. k: int = 4,
  447. param: Optional[dict] = None,
  448. expr: Optional[str] = None,
  449. timeout: Optional[int] = None,
  450. **kwargs: Any,
  451. ) -> List[Document]:
  452. """Perform a similarity search against the query string.
  453. Args:
  454. embedding (List[float]): The embedding vector to search.
  455. k (int, optional): How many results to return. Defaults to 4.
  456. param (dict, optional): The search params for the index type.
  457. Defaults to None.
  458. expr (str, optional): Filtering expression. Defaults to None.
  459. timeout (int, optional): How long to wait before timeout error.
  460. Defaults to None.
  461. kwargs: Collection.search() keyword arguments.
  462. Returns:
  463. List[Document]: Document results for search.
  464. """
  465. if self.col is None:
  466. logger.debug("No existing collection to search.")
  467. return []
  468. res = self.similarity_search_with_score_by_vector(
  469. embedding=embedding, k=k, param=param, expr=expr, timeout=timeout, **kwargs
  470. )
  471. return [doc for doc, _ in res]
  472. def similarity_search_with_score(
  473. self,
  474. query: str,
  475. k: int = 4,
  476. param: Optional[dict] = None,
  477. expr: Optional[str] = None,
  478. timeout: Optional[int] = None,
  479. **kwargs: Any,
  480. ) -> List[Tuple[Document, float]]:
  481. """Perform a search on a query string and return results with score.
  482. For more information about the search parameters, take a look at the pymilvus
  483. documentation found here:
  484. https://milvus.io/api-reference/pymilvus/v2.2.6/Collection/search().md
  485. Args:
  486. query (str): The text being searched.
  487. k (int, optional): The amount of results to return. Defaults to 4.
  488. param (dict): The search params for the specified index.
  489. Defaults to None.
  490. expr (str, optional): Filtering expression. Defaults to None.
  491. timeout (int, optional): How long to wait before timeout error.
  492. Defaults to None.
  493. kwargs: Collection.search() keyword arguments.
  494. Returns:
  495. List[float], List[Tuple[Document, any, any]]:
  496. """
  497. if self.col is None:
  498. logger.debug("No existing collection to search.")
  499. return []
  500. # Embed the query text.
  501. embedding = self.embedding_func.embed_query(query)
  502. res = self.similarity_search_with_score_by_vector(
  503. embedding=embedding, k=k, param=param, expr=expr, timeout=timeout, **kwargs
  504. )
  505. return res
  506. def _similarity_search_with_relevance_scores(
  507. self,
  508. query: str,
  509. k: int = 4,
  510. **kwargs: Any,
  511. ) -> List[Tuple[Document, float]]:
  512. """Return docs and relevance scores in the range [0, 1].
  513. 0 is dissimilar, 1 is most similar.
  514. Args:
  515. query: input text
  516. k: Number of Documents to return. Defaults to 4.
  517. **kwargs: kwargs to be passed to similarity search. Should include:
  518. score_threshold: Optional, a floating point value between 0 to 1 to
  519. filter the resulting set of retrieved docs
  520. Returns:
  521. List of Tuples of (doc, similarity_score)
  522. """
  523. return self.similarity_search_with_score(query, k, **kwargs)
  524. def similarity_search_with_score_by_vector(
  525. self,
  526. embedding: List[float],
  527. k: int = 4,
  528. param: Optional[dict] = None,
  529. expr: Optional[str] = None,
  530. timeout: Optional[int] = None,
  531. **kwargs: Any,
  532. ) -> List[Tuple[Document, float]]:
  533. """Perform a search on a query string and return results with score.
  534. For more information about the search parameters, take a look at the pymilvus
  535. documentation found here:
  536. https://milvus.io/api-reference/pymilvus/v2.2.6/Collection/search().md
  537. Args:
  538. embedding (List[float]): The embedding vector being searched.
  539. k (int, optional): The amount of results to return. Defaults to 4.
  540. param (dict): The search params for the specified index.
  541. Defaults to None.
  542. expr (str, optional): Filtering expression. Defaults to None.
  543. timeout (int, optional): How long to wait before timeout error.
  544. Defaults to None.
  545. kwargs: Collection.search() keyword arguments.
  546. Returns:
  547. List[Tuple[Document, float]]: Result doc and score.
  548. """
  549. if self.col is None:
  550. logger.debug("No existing collection to search.")
  551. return []
  552. if param is None:
  553. param = self.search_params
  554. # Determine result metadata fields.
  555. output_fields = self.fields[:]
  556. output_fields.remove(self._vector_field)
  557. # Perform the search.
  558. res = self.col.search(
  559. data=[embedding],
  560. anns_field=self._vector_field,
  561. param=param,
  562. limit=k,
  563. expr=expr,
  564. output_fields=output_fields,
  565. timeout=timeout,
  566. **kwargs,
  567. )
  568. # Organize results.
  569. ret = []
  570. for result in res[0]:
  571. meta = {x: result.entity.get(x) for x in output_fields}
  572. doc = Document(page_content=meta.pop(self._text_field), metadata=meta.get('metadata'))
  573. pair = (doc, result.score)
  574. ret.append(pair)
  575. return ret
  576. def max_marginal_relevance_search(
  577. self,
  578. query: str,
  579. k: int = 4,
  580. fetch_k: int = 20,
  581. lambda_mult: float = 0.5,
  582. param: Optional[dict] = None,
  583. expr: Optional[str] = None,
  584. timeout: Optional[int] = None,
  585. **kwargs: Any,
  586. ) -> List[Document]:
  587. """Perform a search and return results that are reordered by MMR.
  588. Args:
  589. query (str): The text being searched.
  590. k (int, optional): How many results to give. Defaults to 4.
  591. fetch_k (int, optional): Total results to select k from.
  592. Defaults to 20.
  593. lambda_mult: Number between 0 and 1 that determines the degree
  594. of diversity among the results with 0 corresponding
  595. to maximum diversity and 1 to minimum diversity.
  596. Defaults to 0.5
  597. param (dict, optional): The search params for the specified index.
  598. Defaults to None.
  599. expr (str, optional): Filtering expression. Defaults to None.
  600. timeout (int, optional): How long to wait before timeout error.
  601. Defaults to None.
  602. kwargs: Collection.search() keyword arguments.
  603. Returns:
  604. List[Document]: Document results for search.
  605. """
  606. if self.col is None:
  607. logger.debug("No existing collection to search.")
  608. return []
  609. embedding = self.embedding_func.embed_query(query)
  610. return self.max_marginal_relevance_search_by_vector(
  611. embedding=embedding,
  612. k=k,
  613. fetch_k=fetch_k,
  614. lambda_mult=lambda_mult,
  615. param=param,
  616. expr=expr,
  617. timeout=timeout,
  618. **kwargs,
  619. )
  620. def max_marginal_relevance_search_by_vector(
  621. self,
  622. embedding: list[float],
  623. k: int = 4,
  624. fetch_k: int = 20,
  625. lambda_mult: float = 0.5,
  626. param: Optional[dict] = None,
  627. expr: Optional[str] = None,
  628. timeout: Optional[int] = None,
  629. **kwargs: Any,
  630. ) -> List[Document]:
  631. """Perform a search and return results that are reordered by MMR.
  632. Args:
  633. embedding (str): The embedding vector being searched.
  634. k (int, optional): How many results to give. Defaults to 4.
  635. fetch_k (int, optional): Total results to select k from.
  636. Defaults to 20.
  637. lambda_mult: Number between 0 and 1 that determines the degree
  638. of diversity among the results with 0 corresponding
  639. to maximum diversity and 1 to minimum diversity.
  640. Defaults to 0.5
  641. param (dict, optional): The search params for the specified index.
  642. Defaults to None.
  643. expr (str, optional): Filtering expression. Defaults to None.
  644. timeout (int, optional): How long to wait before timeout error.
  645. Defaults to None.
  646. kwargs: Collection.search() keyword arguments.
  647. Returns:
  648. List[Document]: Document results for search.
  649. """
  650. if self.col is None:
  651. logger.debug("No existing collection to search.")
  652. return []
  653. if param is None:
  654. param = self.search_params
  655. # Determine result metadata fields.
  656. output_fields = self.fields[:]
  657. output_fields.remove(self._vector_field)
  658. # Perform the search.
  659. res = self.col.search(
  660. data=[embedding],
  661. anns_field=self._vector_field,
  662. param=param,
  663. limit=fetch_k,
  664. expr=expr,
  665. output_fields=output_fields,
  666. timeout=timeout,
  667. **kwargs,
  668. )
  669. # Organize results.
  670. ids = []
  671. documents = []
  672. scores = []
  673. for result in res[0]:
  674. meta = {x: result.entity.get(x) for x in output_fields}
  675. doc = Document(page_content=meta.pop(self._text_field), metadata=meta)
  676. documents.append(doc)
  677. scores.append(result.score)
  678. ids.append(result.id)
  679. vectors = self.col.query(
  680. expr=f"{self._primary_field} in {ids}",
  681. output_fields=[self._primary_field, self._vector_field],
  682. timeout=timeout,
  683. )
  684. # Reorganize the results from query to match search order.
  685. vectors = {x[self._primary_field]: x[self._vector_field] for x in vectors}
  686. ordered_result_embeddings = [vectors[x] for x in ids]
  687. # Get the new order of results.
  688. new_ordering = maximal_marginal_relevance(
  689. np.array(embedding), ordered_result_embeddings, k=k, lambda_mult=lambda_mult
  690. )
  691. # Reorder the values and return.
  692. ret = []
  693. for x in new_ordering:
  694. # Function can return -1 index
  695. if x == -1:
  696. break
  697. else:
  698. ret.append(documents[x])
  699. return ret
  700. @classmethod
  701. def from_texts(
  702. cls,
  703. texts: List[str],
  704. embedding: Embeddings,
  705. metadatas: Optional[List[dict]] = None,
  706. collection_name: str = "LangChainCollection",
  707. connection_args: dict[str, Any] = DEFAULT_MILVUS_CONNECTION,
  708. consistency_level: str = "Session",
  709. index_params: Optional[dict] = None,
  710. search_params: Optional[dict] = None,
  711. drop_old: bool = False,
  712. batch_size: int = 100,
  713. ids: Optional[Sequence[str]] = None,
  714. **kwargs: Any,
  715. ) -> Milvus:
  716. """Create a Milvus collection, indexes it with HNSW, and insert data.
  717. Args:
  718. texts (List[str]): Text data.
  719. embedding (Embeddings): Embedding function.
  720. metadatas (Optional[List[dict]]): Metadata for each text if it exists.
  721. Defaults to None.
  722. collection_name (str, optional): Collection name to use. Defaults to
  723. "LangChainCollection".
  724. connection_args (dict[str, Any], optional): Connection args to use. Defaults
  725. to DEFAULT_MILVUS_CONNECTION.
  726. consistency_level (str, optional): Which consistency level to use. Defaults
  727. to "Session".
  728. index_params (Optional[dict], optional): Which index_params to use. Defaults
  729. to None.
  730. search_params (Optional[dict], optional): Which search params to use.
  731. Defaults to None.
  732. drop_old (Optional[bool], optional): Whether to drop the collection with
  733. that name if it exists. Defaults to False.
  734. batch_size:
  735. How many vectors upload per-request.
  736. Default: 100
  737. ids: Optional[Sequence[str]] = None,
  738. Returns:
  739. Milvus: Milvus Vector Store
  740. """
  741. vector_db = cls(
  742. embedding_function=embedding,
  743. collection_name=collection_name,
  744. connection_args=connection_args,
  745. consistency_level=consistency_level,
  746. index_params=index_params,
  747. search_params=search_params,
  748. drop_old=drop_old,
  749. **kwargs,
  750. )
  751. vector_db.add_texts(texts=texts, metadatas=metadatas, batch_size=batch_size)
  752. return vector_db