langchain-elasticsearch
AsyncElasticsearchEmbeddings
Bases: Embeddings
Elasticsearch embedding models.
This class provides an interface to generate embeddings using a model deployed in an Elasticsearch cluster. It requires an Elasticsearch connection and the model_id of the model deployed in the cluster.
In Elasticsearch, you need to have an embedding model loaded and deployed:
- https://www.elastic.co/guide/en/elasticsearch/reference/current/infer-trained-model.html
- https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-deploy-models.html
Setup
Install langchain_elasticsearch with `pip install -qU langchain_elasticsearch` and start Elasticsearch locally using the start-local script (`curl -fsSL https://elastic.co/start-local | sh`).
This creates an elastic-start-local folder. To start Elasticsearch and Kibana, run `./start.sh` from inside that folder.
Elasticsearch will then be available at http://localhost:9200. The password for the elastic user and the API key are stored in the .env file in the elastic-start-local folder.
Key init args
model_id: str
    The model_id of the model deployed in the Elasticsearch cluster.
input_field: str
    The name of the key for the input text field in the document. Defaults to 'text_field'.
Key init args — client params:
client: Optional[AsyncElasticsearch or Elasticsearch]
    Pre-existing Elasticsearch connection. Either provide this OR credentials.
es_url: Optional[str]
    URL of the Elasticsearch instance to connect to.
es_cloud_id: Optional[str]
    Cloud ID of the Elasticsearch instance to connect to.
es_user: Optional[str]
    Username to use when connecting to Elasticsearch.
es_api_key: Optional[str]
    API key to use when connecting to Elasticsearch.
es_password: Optional[str]
    Password to use when connecting to Elasticsearch.
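To make the role of input_field concrete, here is a minimal sketch of how texts might be wrapped before being sent to the deployed model. It assumes each text becomes a one-key document keyed by input_field; build_inference_docs is a hypothetical helper and not part of langchain_elasticsearch.

```python
from typing import Dict, List


def build_inference_docs(
    texts: List[str], input_field: str = "text_field"
) -> List[Dict[str, str]]:
    """Wrap each text in a one-key document keyed by input_field (hypothetical helper)."""
    return [{input_field: text} for text in texts]


# With the default input_field, every document uses the "text_field" key.
docs = build_inference_docs(["hello world", "goodbye"])
```

If the deployed model expects a different input key, passing that name as input_field would be the corresponding change.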
Instantiate
Instantiate with API key (URL):
```python
from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_url="http://localhost:9200",
    es_api_key="your-api-key",
)
```
Instantiate with username/password (URL):
```python
from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_url="http://localhost:9200",
    es_user="elastic",
    es_password="password",
)
```
To use a cloud-hosted Elasticsearch instance, pass the es_cloud_id argument instead of the es_url argument.
Instantiate from cloud (with username/password):
```python
from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_cloud_id="<cloud_id>",
    es_user="elastic",
    es_password="<password>",
)
```
Instantiate from cloud (with API key):
```python
from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key",
)
```
You can also connect to an existing Elasticsearch instance by passing in a pre-existing Elasticsearch connection via the client argument.
Instantiate from existing connection
For synchronous applications, use the ElasticsearchEmbeddings class.
For asynchronous applications, use the AsyncElasticsearchEmbeddings class.
| METHOD | DESCRIPTION |
|---|---|
| `__init__` | Initialize the ElasticsearchEmbeddings instance. |
| `aembed_documents` | Generate embeddings for a list of documents. |
| `aembed_query` | Generate an embedding for a single query text. |
| `embed_documents` | Embed search docs. |
| `embed_query` | Embed query text. |
__init__
```python
__init__(
    model_id: str,
    *,
    input_field: str = "text_field",
    client: AsyncElasticsearch | None = None,
    es_url: str | None = None,
    es_cloud_id: str | None = None,
    es_user: str | None = None,
    es_api_key: str | None = None,
    es_password: str | None = None,
)
```
Initialize the ElasticsearchEmbeddings instance.
| PARAMETER | DESCRIPTION |
|---|---|
| `model_id` | The model_id of the model deployed in the Elasticsearch cluster. TYPE: `str` |
| `input_field` | The name of the key for the input text field in the document. Defaults to 'text_field'. TYPE: `str` |
| `client` | Pre-existing Elasticsearch connection. Either provide this OR credentials. TYPE: `Optional[AsyncElasticsearch]` |
| `es_url` | URL of the Elasticsearch instance to connect to. TYPE: `Optional[str]` |
| `es_cloud_id` | Cloud ID of the Elasticsearch instance. TYPE: `Optional[str]` |
| `es_user` | Username to use when connecting to Elasticsearch. TYPE: `Optional[str]` |
| `es_api_key` | API key to use when connecting to Elasticsearch. TYPE: `Optional[str]` |
| `es_password` | Password to use when connecting to Elasticsearch. TYPE: `Optional[str]` |
aembed_documents
async
aembed_query
async
embed_documents
abstractmethod
AsyncElasticsearchStore
Bases: VectorStore
Elasticsearch vector store.
Setup
Install langchain_elasticsearch with `pip install -qU langchain_elasticsearch` and start Elasticsearch locally using the start-local script (`curl -fsSL https://elastic.co/start-local | sh`).
This creates an elastic-start-local folder. To start Elasticsearch and Kibana, run `./start.sh` from inside that folder.
Elasticsearch will then be available at http://localhost:9200. The password for the elastic user and the API key are stored in the .env file in the elastic-start-local folder.
Key init args — indexing params:
index_name: str
    Name of the index to create.
embedding: Embeddings
    Embedding function to use.
custom_index_settings: Optional[Dict[str, Any]]
    A dictionary of custom settings for the index. This can include configurations like the number of shards, number of replicas, analysis settings, and other index-specific settings. If not provided, default settings will be used. Note that if the same setting is provided by both the user and the strategy, an error is raised.
Key init args — client params:
client: Optional[Elasticsearch or AsyncElasticsearch]
    Pre-existing Elasticsearch connection. Either provide this OR credentials.
es_url: Optional[str]
    URL of the Elasticsearch instance to connect to.
es_cloud_id: Optional[str]
    Cloud ID of the Elasticsearch instance to connect to.
es_user: Optional[str]
    Username to use when connecting to Elasticsearch.
es_password: Optional[str]
    Password to use when connecting to Elasticsearch.
es_api_key: Optional[str]
    API key to use when connecting to Elasticsearch.
es_params: Optional[Dict[str, Any]]
    Additional parameters for the Elasticsearch client.
Instantiate
Instantiate with API key (URL):
```python
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    index_name="langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
    es_api_key="your-api-key",
)
```
Instantiate with username/password (URL):
```python
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    index_name="langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
    es_user="elastic",
    es_password="password",
)
```
To use a cloud-hosted Elasticsearch instance, pass the es_cloud_id argument instead of the es_url argument.
Instantiate from cloud (with username/password):
```python
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    es_cloud_id="<cloud_id>",
    es_user="elastic",
    es_password="<password>",
)
```
Instantiate from cloud (with API key):
```python
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key",
)
```
You can also connect to an existing Elasticsearch instance by passing in a pre-existing Elasticsearch connection via the client argument.
Instantiate from existing connection
```python
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    client=client,
)
```
Class methods (afrom_texts, afrom_documents) accept the same connection options:
Instantiate from texts with credentials
Instantiate from texts with client
Add Documents
```python
from langchain_core.documents import Document

document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"bar": "baz"})
document_3 = Document(page_content="i will be deleted :(")

documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
```
Search
Search with filter
Search with score
Async
```python
from langchain_elasticsearch import AsyncElasticsearchStore

vector_store = AsyncElasticsearchStore(...)

# add documents
await vector_store.aadd_documents(documents=documents, ids=ids)

# delete documents
await vector_store.adelete(ids=["3"])

# search (the coroutine must be awaited to get the results)
results = await vector_store.asimilarity_search(query="thud", k=1)

# search with score
results = await vector_store.asimilarity_search_with_score(query="qux", k=1)
for doc, score in results:
    print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
```
Use as Retriever:
```bash
pip install "elasticsearch[vectorstore_mmr]"
```
```python
retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
)
retriever.invoke("thud")
```
```python
[Document(metadata={'bar': 'baz'}, page_content='thud')]
```
Advanced Uses:
By default, ElasticsearchStore uses the ApproxRetrievalStrategy, which performs approximate nearest neighbor search with the HNSW algorithm. This is the fastest and most memory-efficient option.
To search vectors with the brute-force (exact) strategy instead, pass the ExactRetrievalStrategy to the ElasticsearchStore constructor.
Use ExactRetrievalStrategy
Both strategies require that you know the similarity metric you want to use when creating the index. The default is cosine similarity, but you can also use dot product or euclidean distance.
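For readers unfamiliar with the three metrics, the following standalone sketch computes each of them in plain Python. It only illustrates the math and is not how Elasticsearch evaluates them internally; the function names and sample vectors are made up for this example.

```python
import math
from typing import Sequence


def dot_product(a: Sequence[float], b: Sequence[float]) -> float:
    """Sum of element-wise products; grows with vector magnitude."""
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product of the two vectors after normalizing each to unit length."""
    norm_a = math.sqrt(dot_product(a, a))
    norm_b = math.sqrt(dot_product(b, b))
    return dot_product(a, b) / (norm_a * norm_b)


def euclidean_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Straight-line distance; smaller means more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


query = [1.0, 0.0]
doc_same_direction = [2.0, 0.0]  # same direction as the query, twice the length
doc_orthogonal = [0.0, 3.0]      # unrelated direction
```

Cosine similarity ignores vector length while the dot product rewards it, which is why dot product is typically paired with normalized embeddings.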
Use dot product similarity
```python
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
from langchain_elasticsearch import DistanceStrategy

store = ElasticsearchStore(
    "langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
    # use the imported enum member rather than a bare string
    distance_strategy=DistanceStrategy.DOT_PRODUCT,
)
```
| METHOD | DESCRIPTION |
|---|---|
| `asimilarity_search` | Return Elasticsearch documents most similar to query. |
| `amax_marginal_relevance_search` | Return docs selected using the maximal marginal relevance. |
| `asimilarity_search_with_score` | Return Elasticsearch documents most similar to query, along with scores. |
| `asimilarity_search_by_vector_with_relevance_scores` | Return Elasticsearch documents most similar to query, along with scores. |
| `adelete` | Delete documents from the Elasticsearch index. |
| `aadd_texts` | Run more texts through the embeddings and add to the store. |
| `aadd_embeddings` | Add the given texts and embeddings to the store. |
| `afrom_texts` | Construct ElasticsearchStore wrapper from raw documents. |
| `afrom_documents` | Construct ElasticsearchStore wrapper from documents. |
| `ExactRetrievalStrategy` | Used to perform brute force / exact nearest neighbor search via script_score. |
| `ApproxRetrievalStrategy` | Used to perform approximate nearest neighbor search using the HNSW algorithm. |
| `SparseVectorRetrievalStrategy` | Used to perform sparse vector search via text_expansion. |
| `BM25RetrievalStrategy` | Used to apply BM25 without vector search. |
| `add_texts` | Run more texts through the embeddings and add to the VectorStore. |
| `delete` | Delete by vector ID or other criteria. |
| `get_by_ids` | Get documents by their IDs. |
| `aget_by_ids` | Async get documents by their IDs. |
| `add_documents` | Add or update documents in the VectorStore. |
| `aadd_documents` | Async run more documents through the embeddings and add to the VectorStore. |
| `search` | Return docs most similar to query using a specified search type. |
| `asearch` | Async return docs most similar to query using a specified search type. |
| `similarity_search` | Return docs most similar to query. |
| `similarity_search_with_score` | Run similarity search with distance. |
| `similarity_search_with_relevance_scores` | Return docs and relevance scores in the range [0, 1]. |
| `asimilarity_search_with_relevance_scores` | Async return docs and relevance scores in the range [0, 1]. |
| `similarity_search_by_vector` | Return docs most similar to embedding vector. |
| `asimilarity_search_by_vector` | Async return docs most similar to embedding vector. |
| `max_marginal_relevance_search` | Return docs selected using the maximal marginal relevance. |
| `max_marginal_relevance_search_by_vector` | Return docs selected using the maximal marginal relevance. |
| `amax_marginal_relevance_search_by_vector` | Async return docs selected using the maximal marginal relevance. |
| `from_documents` | Return VectorStore initialized from documents and embeddings. |
| `from_texts` | Return VectorStore initialized from texts and embeddings. |
| `as_retriever` | Return VectorStoreRetriever initialized from this VectorStore. |
asimilarity_search
async
```python
asimilarity_search(
    query: str,
    k: int = 4,
    fetch_k: int = 50,
    filter: list[dict] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[Document]
```
Return Elasticsearch documents most similar to query.
| PARAMETER | DESCRIPTION |
|---|---|
| `query` | Text to look up documents similar to. TYPE: `str` |
| `k` | Number of Documents to return. Defaults to 4. TYPE: `int` |
| `fetch_k` | Number of Documents to fetch to pass to knn num_candidates. TYPE: `int` |
| `filter` | Array of Elasticsearch filter clauses to apply to the query. |

| RETURNS | DESCRIPTION |
|---|---|
| `list[Document]` | List of Documents most similar to the query, in descending order of similarity. |
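As a rough illustration of the filter and custom_query arguments, the sketch below builds a filter list and a callable matching the custom_query type in the signature (query body and query text in, query body out). The field name metadata.baz.keyword, the min_score tweak, and the sample generated body are assumptions made for this example and depend on your index mapping.

```python
from typing import Any, Dict, List, Optional

# Filter clauses are plain Elasticsearch query DSL dictionaries.
# "metadata.baz.keyword" is an assumed field name, not something the library guarantees.
filter_clauses: List[Dict[str, Any]] = [{"term": {"metadata.baz.keyword": "bar"}}]


def custom_query(query_body: Dict[str, Any], query: Optional[str]) -> Dict[str, Any]:
    """Receive the generated query body and the raw query text; return the body to send."""
    new_body = dict(query_body)  # shallow copy so the caller's dict is not mutated
    new_body["min_score"] = 0.5  # illustrative tweak: drop low-scoring hits
    return new_body


# Hypothetical body standing in for what the store would generate.
generated = {"knn": {"field": "vector", "k": 1, "num_candidates": 50}}
final_body = custom_query(generated, "thud")
```

Both objects could then be passed to asimilarity_search as filter=filter_clauses and custom_query=custom_query.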
amax_marginal_relevance_search
async
```python
amax_marginal_relevance_search(
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    fields: list[str] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[Document]
```
Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.
| PARAMETER | DESCRIPTION |
|---|---|
| `query` | Text to look up documents similar to. TYPE: `str` |
| `k` | Number of Documents to return. Defaults to 4. TYPE: `int` |
| `fetch_k` | Number of Documents to fetch to pass to MMR algorithm. TYPE: `int` |
| `lambda_mult` | Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. TYPE: `float` |
| `fields` | Other fields to get from elasticsearch source. These fields will be added to the document metadata. |

| RETURNS | DESCRIPTION |
|---|---|
| `list[Document]` | A list of Documents selected by maximal marginal relevance. |
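The trade-off controlled by lambda_mult can be sketched with a small greedy selection loop. This is a textbook-style MMR illustration in plain Python using cosine similarity, not the library's implementation; mmr_select and the sample vectors are hypothetical.

```python
import math
from typing import List, Sequence


def _cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def mmr_select(
    query_vec: Sequence[float],
    candidates: List[Sequence[float]],
    k: int = 4,
    lambda_mult: float = 0.5,
) -> List[int]:
    """Greedily pick up to k candidate indices, balancing relevance and diversity."""
    selected: List[int] = []
    remaining = list(range(len(candidates)))
    while remaining and len(selected) < k:
        best_idx, best_score = remaining[0], -math.inf
        for idx in remaining:
            relevance = _cosine(query_vec, candidates[idx])
            # Redundancy is the highest similarity to anything already selected.
            redundancy = max(
                (_cosine(candidates[idx], candidates[j]) for j in selected),
                default=0.0,
            )
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = idx, score
        selected.append(best_idx)
        remaining.remove(best_idx)
    return selected


query_vec = [1.0, 0.0]
candidates = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0]]
most_relevant = mmr_select(query_vec, candidates, k=2, lambda_mult=1.0)
most_diverse = mmr_select(query_vec, candidates, k=2, lambda_mult=0.0)
```

With lambda_mult=1.0 the second pick is the near-duplicate of the first; with lambda_mult=0.0 it is the candidate least similar to what was already chosen.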
asimilarity_search_with_score
async
```python
asimilarity_search_with_score(
    query: str,
    k: int = 4,
    filter: list[dict] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[tuple[Document, float]]
```
Return Elasticsearch documents most similar to query, along with scores.
| PARAMETER | DESCRIPTION |
|---|---|
| `query` | Text to look up documents similar to. TYPE: `str` |
| `k` | Number of Documents to return. Defaults to 4. TYPE: `int` |
| `filter` | Array of Elasticsearch filter clauses to apply to the query. |

| RETURNS | DESCRIPTION |
|---|---|
| `list[tuple[Document, float]]` | List of Documents most similar to the query and score for each. |
asimilarity_search_by_vector_with_relevance_scores
async
```python
asimilarity_search_by_vector_with_relevance_scores(
    embedding: list[float],
    k: int = 4,
    filter: list[dict] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[tuple[Document, float]]
```
Return Elasticsearch documents most similar to the given embedding, along with scores.
| PARAMETER | DESCRIPTION |
|---|---|
| `embedding` | Embedding to look up documents similar to. |
| `k` | Number of Documents to return. Defaults to 4. TYPE: `int` |
| `filter` | Array of Elasticsearch filter clauses to apply to the query. |

| RETURNS | DESCRIPTION |
|---|---|
| `list[tuple[Document, float]]` | List of Documents most similar to the embedding and score for each. |
adelete
async
aadd_texts
async
```python
aadd_texts(
    texts: Iterable[str],
    metadatas: list[dict[Any, Any]] | None = None,
    ids: list[str] | None = None,
    refresh_indices: bool = True,
    create_index_if_not_exists: bool = True,
    bulk_kwargs: dict | None = None,
    **kwargs: Any,
) -> list[str]
```
Run more texts through the embeddings and add to the store.
| PARAMETER | DESCRIPTION |
|---|---|
| `texts` | Iterable of strings to add to the store. |
| `metadatas` | Optional list of metadatas associated with the texts. |
| `ids` | Optional list of ids to associate with the texts. |
| `refresh_indices` | Whether to refresh the Elasticsearch indices after adding the texts. TYPE: `bool` |
| `create_index_if_not_exists` | Whether to create the Elasticsearch index if it doesn't already exist. TYPE: `bool` |
| `bulk_kwargs` | Additional arguments to pass to Elasticsearch bulk. Supports chunk_size (optional): number of texts to add to the index at a time. Defaults to 500. TYPE: `Optional[dict]` |

| RETURNS | DESCRIPTION |
|---|---|
| `list[str]` | List of ids from adding the texts into the store. |
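To picture what chunk_size means for a bulk request, the sketch below splits an iterable of texts into batches of at most chunk_size items. It only illustrates the batching idea; chunked is a hypothetical helper and the actual batching is handled by the Elasticsearch bulk helpers.

```python
from typing import Iterable, Iterator, List


def chunked(items: Iterable[str], chunk_size: int = 500) -> Iterator[List[str]]:
    """Yield successive lists of at most chunk_size items."""
    batch: List[str] = []
    for item in items:
        batch.append(item)
        if len(batch) == chunk_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


texts = [f"text {i}" for i in range(1200)]
batch_sizes = [len(batch) for batch in chunked(texts, chunk_size=500)]
```

A smaller value, for example bulk_kwargs={"chunk_size": 100}, would send more but lighter requests.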
aadd_embeddings
async
```python
aadd_embeddings(
    text_embeddings: Iterable[tuple[str, list[float]]],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    refresh_indices: bool = True,
    create_index_if_not_exists: bool = True,
    bulk_kwargs: dict | None = None,
    **kwargs: Any,
) -> list[str]
```
Add the given texts and embeddings to the store.
| PARAMETER | DESCRIPTION |
|---|---|
| `text_embeddings` | Iterable pairs of string and embedding to add to the store. |
| `metadatas` | Optional list of metadatas associated with the texts. |
| `ids` | Optional list of unique IDs. |
| `refresh_indices` | Whether to refresh the Elasticsearch indices after adding the texts. TYPE: `bool` |
| `create_index_if_not_exists` | Whether to create the Elasticsearch index if it doesn't already exist. TYPE: `bool` |
| `bulk_kwargs` | Additional arguments to pass to Elasticsearch bulk. Supports chunk_size (optional): number of texts to add to the index at a time. Defaults to 500. TYPE: `Optional[dict]` |

| RETURNS | DESCRIPTION |
|---|---|
| `list[str]` | List of ids from adding the texts into the store. |
afrom_texts
async
classmethod
¶
afrom_texts(
texts: list[str],
embedding: Embeddings | None = None,
metadatas: list[dict[str, Any]] | None = None,
bulk_kwargs: dict | None = None,
client: AsyncElasticsearch | None = None,
**kwargs: Any,
) -> AsyncElasticsearchStore
Construct ElasticsearchStore wrapper from raw documents.
Example
```python
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

texts = ["foo", "thud"]  # example input texts
embeddings = OpenAIEmbeddings()

db = ElasticsearchStore.from_texts(
    texts,
    # embeddings optional if using
    # a strategy that doesn't require inference
    embeddings,
    index_name="langchain-demo",
    es_url="http://localhost:9200",
)
```
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
List of texts to add to the Elasticsearch index. |
embedding
|
Embedding function to use to embed the texts.
TYPE:
|
metadatas
|
Optional list of metadatas associated with the texts. |
bulk_kwargs
|
Optional. Additional arguments to pass to Elasticsearch bulk.
TYPE:
|
client
|
Optional pre-existing client connection. Alternatively, provide credentials (
TYPE:
|
**kwargs
|
Additional keyword arguments passed to the constructor. See
TYPE:
|
afrom_documents
async
classmethod
¶
afrom_documents(
documents: list[Document],
embedding: Embeddings | None = None,
bulk_kwargs: dict | None = None,
client: AsyncElasticsearch | None = None,
**kwargs: Any,
) -> AsyncElasticsearchStore
Construct ElasticsearchStore wrapper from documents.
Example
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of documents to add to the Elasticsearch index. |
embedding
|
Embedding function to use to embed the texts. Do not provide if using a strategy that doesn't require inference.
TYPE:
|
bulk_kwargs
|
Optional. Additional arguments to pass to Elasticsearch bulk.
TYPE:
|
client
|
Optional pre-existing client connection. Alternatively, provide credentials (
TYPE:
|
**kwargs
|
Additional keyword arguments passed to the constructor. See
TYPE:
|
ExactRetrievalStrategy
staticmethod
ExactRetrievalStrategy() -> ExactRetrievalStrategy
Used to perform brute force / exact nearest neighbor search via script_score.
ApproxRetrievalStrategy
staticmethod
```python
ApproxRetrievalStrategy(
    query_model_id: str | None = None,
    hybrid: bool | None = False,
    rrf: dict | bool | None = True,
) -> ApproxRetrievalStrategy
```
Used to perform approximate nearest neighbor search using the HNSW algorithm.
At build index time, this strategy will create a dense vector field in the index and store the embedding vectors in the index.
At query time, the text will either be embedded using the provided embedding function or the query_model_id will be used to embed the text using the model deployed to Elasticsearch.
If query_model_id is used, do not provide an embedding function.
| PARAMETER | DESCRIPTION |
|---|---|
| `query_model_id` | Optional. ID of the model to use to embed the query text within the stack. Requires embedding model to be deployed to Elasticsearch. TYPE: `Optional[str]` |
| `hybrid` | Optional. If True, will perform a hybrid search using both the knn query and a text query. Defaults to False. TYPE: `Optional[bool]` |
| `rrf` | Optional. rrf is Reciprocal Rank Fusion. When |
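As background on Reciprocal Rank Fusion in general (not necessarily how Elasticsearch implements it internally), the sketch below fuses two ranked lists by summing 1 / (rank_constant + rank) per document. The function name, the sample rankings, and the rank_constant value of 60 are assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], rank_constant: int = 60) -> List[str]:
    """Fuse several ranked lists of document IDs into one list, best first."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Documents near the top of any list contribute a larger share.
            scores[doc_id] += 1.0 / (rank_constant + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)


knn_ranking = ["a", "b", "c"]   # hypothetical result order from the knn query
text_ranking = ["b", "d", "a"]  # hypothetical result order from the text query
fused = reciprocal_rank_fusion([knn_ranking, text_ranking])
```

Documents that appear in both lists ("a" and "b") end up ahead of documents that appear in only one, which is the point of combining a knn query with a text query in hybrid search.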
SparseVectorRetrievalStrategy
staticmethod
```python
SparseVectorRetrievalStrategy(model_id: str | None = None) -> SparseRetrievalStrategy
```
Used to perform sparse vector search via text_expansion. Use this strategy when you want the ELSER model to perform document search.
At build index time, this strategy will create a pipeline that will embed the text using the ELSER model and store the resulting tokens in the index.
At query time, the text will be embedded using the ELSER model and the resulting tokens will be used to perform a text_expansion query.
| PARAMETER | DESCRIPTION |
|---|---|
| `model_id` | Optional. Default is ".elser_model_1". ID of the model to use to embed the query text within the stack. Requires embedding model to be deployed to Elasticsearch. TYPE: `Optional[str]` |
BM25RetrievalStrategy
staticmethod
```python
BM25RetrievalStrategy(
    k1: float | None = None, b: float | None = None
) -> BM25RetrievalStrategy
```
Used to apply BM25 without vector search.
| PARAMETER | DESCRIPTION |
|---|---|
| `k1` | Optional. This corresponds to the BM25 parameter, k1. Default is None, which uses the default setting of Elasticsearch. TYPE: `Optional[float]` |
| `b` | Optional. This corresponds to the BM25 parameter, b. Default is None, which uses the default setting of Elasticsearch. TYPE: `Optional[float]` |
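To show what k1 and b influence, here is a sketch of the classic BM25 score for a single term in a single document. It follows the textbook formula rather than Elasticsearch's exact scoring code, and the defaults k1=1.2 and b=0.75 are the commonly cited values, assumed here for illustration.

```python
import math


def bm25_term_score(
    term_freq: int,
    doc_len: int,
    avg_doc_len: float,
    num_docs: int,
    docs_with_term: int,
    k1: float = 1.2,
    b: float = 0.75,
) -> float:
    """Textbook BM25 contribution of one query term to one document's score."""
    idf = math.log(1 + (num_docs - docs_with_term + 0.5) / (docs_with_term + 0.5))
    # b controls how strongly long documents are penalized (0 disables it).
    length_norm = 1 - b + b * (doc_len / avg_doc_len)
    # k1 controls how quickly repeated occurrences of the term saturate.
    return idf * (term_freq * (k1 + 1)) / (term_freq + k1 * length_norm)


short_doc = bm25_term_score(term_freq=2, doc_len=50, avg_doc_len=100, num_docs=1000, docs_with_term=10)
long_doc = bm25_term_score(term_freq=2, doc_len=400, avg_doc_len=100, num_docs=1000, docs_with_term=10)
```

With the same term frequency, the shorter document scores higher; setting b=0 removes that length effect, and lowering k1 makes extra occurrences of the term matter less.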
add_texts
```python
add_texts(
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    *,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> list[str]
```
Run more texts through the embeddings and add to the VectorStore.
| PARAMETER | DESCRIPTION |
|---|---|
| `texts` | Iterable of strings to add to the VectorStore. |
| `metadatas` | Optional list of metadatas associated with the texts. |
| `ids` | Optional list of IDs associated with the texts. |
| `**kwargs` | TYPE: `Any` |

| RETURNS | DESCRIPTION |
|---|---|
| `list[str]` | List of IDs from adding the texts into the VectorStore. |

| RAISES | DESCRIPTION |
|---|---|
| `ValueError` | If the number of metadatas does not match the number of texts. |
| `ValueError` | If the number of IDs does not match the number of texts. |
delete
¶
get_by_ids
¶
Get documents by their IDs.
The returned documents are expected to have the ID field set to the ID of the document in the vector store.
Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.
Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.
This method should NOT raise exceptions if no documents are found for some IDs.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of IDs to retrieve. |
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
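Because the order of the result is not guaranteed and missing IDs are skipped without an error, callers may want to index the result by ID. The sketch below uses a stand-in Doc dataclass instead of the real Document class, and the returned list is made up for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Doc:
    """Stand-in for langchain_core.documents.Document (only the fields used here)."""

    page_content: str
    id: Optional[str] = None


def index_by_id(docs: List[Doc]) -> Dict[str, Doc]:
    """Key documents by their ID field instead of relying on list order."""
    return {doc.id: doc for doc in docs if doc.id is not None}


requested_ids = ["1", "2", "404"]
# Hypothetical result of get_by_ids(requested_ids): out of order, "404" not found.
returned = [Doc("thud", id="2"), Doc("foo", id="1")]

by_id = index_by_id(returned)
missing = [doc_id for doc_id in requested_ids if doc_id not in by_id]
```

Looking documents up through by_id and checking missing explicitly avoids assuming a one-to-one, in-order result.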
aget_by_ids
async
¶
Async get documents by their IDs.
The returned documents are expected to have the ID field set to the ID of the document in the vector store.
Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.
Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.
This method should NOT raise exceptions if no documents are found for some IDs.
| PARAMETER | DESCRIPTION |
|---|---|
ids
|
List of IDs to retrieve. |
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
add_documents
¶
Add or update documents in the VectorStore.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
Documents to add to the |
**kwargs
|
Additional keyword arguments. If kwargs contains IDs and documents contain ids, the IDs in the kwargs will receive precedence.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[str]
|
List of IDs of the added texts. |
aadd_documents
async
¶
search
¶
Return docs most similar to query using a specified search type.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
search_type
|
Type of search to perform. Can be
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
asearch
async
¶
Async return docs most similar to query using a specified search type.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
search_type
|
Type of search to perform. Can be
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
| RAISES | DESCRIPTION |
|---|---|
ValueError
|
If |
similarity_search
abstractmethod
¶
Return docs most similar to query.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
similarity_search_with_score
¶
similarity_search_with_relevance_scores
¶
similarity_search_with_relevance_scores(
query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]
Return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
Kwargs to be passed to similarity search. Should include
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List of tuples of |
asimilarity_search_with_relevance_scores
async
¶
asimilarity_search_with_relevance_scores(
query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]
Async return docs and relevance scores in the range [0, 1].
0 is dissimilar, 1 is most similar.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Input text.
TYPE:
|
k
|
Number of
TYPE:
|
**kwargs
|
Kwargs to be passed to similarity search. Should include
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[Document, float]]
|
List of tuples of |
similarity_search_by_vector
¶
Return docs most similar to embedding vector.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
Embedding to look up documents similar to. |
k
|
Number of
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
asimilarity_search_by_vector
async
¶
Async return docs most similar to embedding vector.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
Embedding to look up documents similar to. |
k
|
Number of
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
max_marginal_relevance_search
¶
max_marginal_relevance_search(
query: str, k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any
) -> list[Document]
Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.
| PARAMETER | DESCRIPTION |
|---|---|
query
|
Text to look up documents similar to.
TYPE:
|
k
|
Number of
TYPE:
|
fetch_k
|
Number of
TYPE:
|
lambda_mult
|
Number between
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
max_marginal_relevance_search_by_vector
¶
max_marginal_relevance_search_by_vector(
embedding: list[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]
Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
Embedding to look up documents similar to. |
k
|
Number of
TYPE:
|
fetch_k
|
Number of
TYPE:
|
lambda_mult
|
Number between
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
amax_marginal_relevance_search_by_vector
async
¶
amax_marginal_relevance_search_by_vector(
embedding: list[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]
Async return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.
| PARAMETER | DESCRIPTION |
|---|---|
embedding
|
Embedding to look up documents similar to. |
k
|
Number of
TYPE:
|
fetch_k
|
Number of
TYPE:
|
lambda_mult
|
Number between
TYPE:
|
**kwargs
|
Arguments to pass to the search method.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Document]
|
List of |
from_documents
classmethod
¶
from_documents(documents: list[Document], embedding: Embeddings, **kwargs: Any) -> Self
Return VectorStore initialized from documents and embeddings.
| PARAMETER | DESCRIPTION |
|---|---|
documents
|
List of |
embedding
|
Embedding function to use.
TYPE:
|
**kwargs
|
Additional keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
Self
|
|
from_texts
abstractmethod
classmethod
¶
from_texts(
texts: list[str],
embedding: Embeddings,
metadatas: list[dict] | None = None,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> VST
Return VectorStore initialized from texts and embeddings.
| PARAMETER | DESCRIPTION |
|---|---|
texts
|
Texts to add to the |
embedding
|
Embedding function to use.
TYPE:
|
metadatas
|
Optional list of metadatas associated with the texts. |
ids
|
Optional list of IDs associated with the texts. |
**kwargs
|
Additional keyword arguments.
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
VST
|
|
as_retriever
¶
as_retriever(**kwargs: Any) -> VectorStoreRetriever
Return VectorStoreRetriever initialized from this VectorStore.
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
Keyword arguments to pass to the search function. Can include:
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
VectorStoreRetriever
|
Retriever class for |
Examples:
```python
# Retrieve more documents with higher diversity
# Useful if your dataset has many similar documents
docsearch.as_retriever(
    search_type="mmr", search_kwargs={"k": 6, "lambda_mult": 0.25}
)

# Fetch more documents for the MMR algorithm to consider
# But only return the top 5
docsearch.as_retriever(search_type="mmr", search_kwargs={"k": 5, "fetch_k": 50})

# Only retrieve documents that have a relevance score
# Above a certain threshold
docsearch.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8},
)

# Only get the single most similar document from the dataset
docsearch.as_retriever(search_kwargs={"k": 1})

# Use a filter to only retrieve documents from a specific paper
docsearch.as_retriever(
    search_kwargs={"filter": {"paper_title": "GPT-4 Technical Report"}}
)
```
AsyncElasticsearchRetriever
Bases: BaseRetriever
Elasticsearch retriever.
Setup:
Install `langchain_elasticsearch` and start Elasticsearch locally using
the start-local script.
```bash
pip install -qU langchain_elasticsearch
curl -fsSL https://elastic.co/start-local | sh
```
This will create an `elastic-start-local` folder. To start Elasticsearch
and Kibana:
```bash
cd elastic-start-local
./start.sh
```
Elasticsearch will be available at `http://localhost:9200`. The password
for the `elastic` user and API key are stored in the `.env` file in the
`elastic-start-local` folder.
Key init args — query params:
index_name: Union[str, Sequence[str]]
The name of the index to query. Can also be a list of names.
body_func: Callable[[str], Dict]
Function to create an Elasticsearch DSL query body from a search string.
The returned query body must fit what you would normally send in a POST
request to the `_search` endpoint. Where applicable, it can also include
parameters such as `size`.
content_field: Optional[Union[str, Mapping[str, str]]]
The document field name that contains the page content. If multiple indices
are queried, specify a dict {index_name: field_name} here.
document_mapper: Optional[Callable[[Mapping], Document]]
Function to map Elasticsearch hits to LangChain Documents. If not provided,
will be automatically created based on content_field.
Key init args — client params:
client: Optional[AsyncElasticsearch or Elasticsearch]
Pre-existing Elasticsearch connection. Either provide this OR credentials.
es_url: Optional[str]
URL of the Elasticsearch instance to connect to.
es_cloud_id: Optional[str]
Cloud ID of the Elasticsearch instance to connect to.
es_user: Optional[str]
Username to use when connecting to Elasticsearch.
es_api_key: Optional[str]
API key to use when connecting to Elasticsearch.
es_password: Optional[str]
Password to use when connecting to Elasticsearch.
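As a rough illustration of the `body_func` contract described above, the sketch below builds a query body that also sets `size`. The helper name `make_body_func` and the field name `text` are assumptions made for this example; only the shape of the returned dict (a standard `_search` request body) comes from the description above.

```python
from typing import Callable


def make_body_func(field: str, size: int = 5) -> Callable[[str], dict]:
    """Hypothetical helper: returns a body_func that matches on `field`."""

    def body_func(query: str) -> dict:
        # Same shape as the JSON body of a POST to the _search endpoint.
        return {
            "query": {"match": {field: {"query": query}}},
            "size": size,  # cap the number of hits returned
        }

    return body_func


body_func = make_body_func("text", size=3)
body = body_func("quick brown fox")
```

The resulting `body_func` could then be passed as the `body_func` argument when constructing the retriever.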
Instantiate:
```python
from langchain_elasticsearch import ElasticsearchRetriever

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    content_field="text",
    es_url="http://localhost:9200",
)
```
Instantiate with API key (URL):
```python
from langchain_elasticsearch import ElasticsearchRetriever

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    content_field="text",
    es_url="http://localhost:9200",
    es_api_key="your-api-key"
)
```
Instantiate with username/password (URL):
```python
from langchain_elasticsearch import ElasticsearchRetriever

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    content_field="text",
    es_url="http://localhost:9200",
    es_user="elastic",
    es_password="password"
)
```
If you want to use a cloud hosted Elasticsearch instance, you can pass in the
es_cloud_id argument instead of the es_url argument.
Instantiate from cloud (with username/password):
```python
from langchain_elasticsearch import ElasticsearchRetriever

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    content_field="text",
    es_cloud_id="<cloud_id>",
    es_user="elastic",
    es_password="<password>"
)
```
Instantiate from cloud (with API key):
```python
from langchain_elasticsearch import ElasticsearchRetriever

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    content_field="text",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key"
)
```
You can also connect to an existing Elasticsearch instance by passing in a
pre-existing Elasticsearch connection via the client argument.
Instantiate from existing connection:
```python
from langchain_elasticsearch import ElasticsearchRetriever
from elasticsearch import Elasticsearch

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

client = Elasticsearch("http://localhost:9200")
retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    content_field="text",
    client=client
)
```
Retrieve documents:
Note: Use `invoke()` or `ainvoke()` instead of the deprecated
`get_relevant_documents()` or `aget_relevant_documents()` methods.
First, index some documents:
```python
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")

# Index sample documents
client.index(
    index="some-index",
    document={"text": "The quick brown fox jumps over the lazy dog"},
    id="1",
    refresh=True
)
client.index(
    index="some-index",
    document={"text": "Python is a popular programming language"},
    id="2",
    refresh=True
)
client.index(
    index="some-index",
    document={"text": "Elasticsearch is a search engine"},
    id="3",
    refresh=True
)
```
Then retrieve documents:
```python
from langchain_elasticsearch import ElasticsearchRetriever

def body_func(query: str) -> dict:
    return {"query": {"match": {"text": {"query": query}}}}

retriever = ElasticsearchRetriever(
    index_name="some-index",
    body_func=body_func,
    content_field="text",
    es_url="http://localhost:9200"
)

# Retrieve documents
documents = retriever.invoke("Python")
for doc in documents:
    print(f"* {doc.page_content}")
```
```python
* Python is a popular programming language
```
Use custom document mapper:
```python
from langchain_elasticsearch import ElasticsearchRetriever
from langchain_core.documents import Document
from elasticsearch import Elasticsearch
from typing import Mapping, Any

def body_func(query: str) -> dict:
    return {"query": {"match": {"custom_field": {"query": query}}}}

def custom_mapper(hit: Mapping[str, Any]) -> Document:
    # Custom logic to extract content and metadata
    return Document(
        page_content=hit["_source"]["custom_field"],
        metadata={"score": hit["_score"]}
    )

client = Elasticsearch("http://localhost:9200")
retriever = ElasticsearchRetriever(
    index_name="langchain-demo",
    body_func=body_func,
    document_mapper=custom_mapper,
    client=client
)
```
Use with multiple indices:
```python
from langchain_elasticsearch import ElasticsearchRetriever
from elasticsearch import Elasticsearch

def body_func(query: str) -> dict:
    return {
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["text_field_1", "text_field_2"]
            }
        }
    }

client = Elasticsearch("http://localhost:9200")
retriever = ElasticsearchRetriever(
    index_name=["index1", "index2"],
    body_func=body_func,
    content_field={
        "index1": "text_field_1",
        "index2": "text_field_2"
    },
    client=client
)
```
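When several indices are queried, each raw search hit names the index it came from in `_index`, which is what makes a per-index `content_field` mapping workable. The sketch below shows one way such a lookup could be done on a raw hit. The function `extract_content` is hypothetical and is not the library's internal mapper; the hit shown is a hand-written stand-in for a real response.

```python
from typing import Any, Mapping, Union


def extract_content(
    hit: Mapping[str, Any],
    content_field: Union[str, Mapping[str, str]],
) -> str:
    """Hypothetical helper: pick the page content out of a raw search hit."""
    if isinstance(content_field, str):
        field = content_field
    else:
        # Look up the field configured for the index this hit came from.
        field = content_field[hit["_index"]]
    return hit["_source"][field]


fields = {"index1": "text_field_1", "index2": "text_field_2"}
hit = {"_index": "index2", "_source": {"text_field_2": "hello"}, "_score": 1.0}
content = extract_content(hit, fields)
```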
Use as LangChain retriever in chains:
Note: Before running this example, ensure you have indexed documents
in your Elasticsearch index. The retriever will search this index
for relevant documents to use as context.
```python
from langchain_elasticsearch import ElasticsearchRetriever
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama

# ElasticsearchRetriever is already a BaseRetriever
retriever = ElasticsearchRetriever(
    index_name="some-index",
    body_func=lambda q: {"query": {"match": {"text": {"query": q}}}},
    content_field="text",
    es_url="http://localhost:9200"
)
llm = ChatOllama(model="llama3", temperature=0)

# Create a chain that retrieves documents and then generates a response
def format_docs(docs):
    # Join the retrieved documents into one context string for the prompt
    return "\n\n".join(doc.page_content for doc in docs)

system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer "
    "the question. If you don't know the answer, say that you "
    "don't know. Use three sentences maximum and keep the "
    "answer concise."
    "\n\n"
    "Context: {context}"
)
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "{question}"),
])
chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
)
result = chain.invoke("what is the answer to this question?")
```
For synchronous applications, use the `ElasticsearchRetriever` class.
For asynchronous applications, use the `AsyncElasticsearchRetriever` class.
| METHOD | DESCRIPTION |
|---|---|
| __init__ | |
| get_name | Get the name of the Runnable. |
| get_input_schema | Get a Pydantic model that can be used to validate input to the Runnable. |
| get_input_jsonschema | Get a JSON schema that represents the input to the Runnable. |
| get_output_schema | Get a Pydantic model that can be used to validate output to the Runnable. |
| get_output_jsonschema | Get a JSON schema that represents the output of the Runnable. |
| config_schema | The type of config this Runnable accepts specified as a Pydantic model. |
| get_config_jsonschema | Get a JSON schema that represents the config of the Runnable. |
| get_graph | Return a graph representation of this Runnable. |
| get_prompts | Return a list of prompts used by this Runnable. |
| __or__ | Runnable "or" operator. |
| __ror__ | Runnable "reverse-or" operator. |
| pipe | Pipe Runnable objects. |
| pick | Pick keys from the output dict of this Runnable. |
| assign | Assigns new fields to the dict output of this Runnable. |
| invoke | Invoke the retriever to get relevant documents. |
| ainvoke | Asynchronously invoke the retriever to get relevant documents. |
| batch | Default implementation runs invoke in parallel using a thread pool executor. |
| batch_as_completed | Run invoke in parallel on a list of inputs. |
| abatch | Default implementation runs ainvoke in parallel using asyncio.gather. |
| abatch_as_completed | Run ainvoke in parallel on a list of inputs. |
| stream | Default implementation of stream, which calls invoke. |
| astream | Default implementation of astream, which calls ainvoke. |
| astream_log | Stream all output from a Runnable, as reported to the callback system. |
| astream_events | Generate a stream of events. |
| transform | Transform inputs to outputs. |
| atransform | Transform inputs to outputs. |
| bind | Bind arguments to a Runnable, returning a new Runnable. |
| with_config | Bind config to a Runnable. |
| with_listeners | Bind lifecycle listeners to a Runnable. |
| with_alisteners | Bind async lifecycle listeners to a Runnable. |
| with_types | Bind input and output types to a Runnable. |
| with_retry | Create a new |
| map | Return a new |
| with_fallbacks | Add fallbacks to a Runnable. |
| as_tool | Create a |
| is_lc_serializable | Is this class serializable? |
| get_lc_namespace | Get the namespace of the LangChain object. |
| lc_id | Return a unique identifier for this class for serialization purposes. |
| to_json | Serialize the |
| to_json_not_implemented | Serialize a "not implemented" object. |
| configurable_fields | Configure particular |
| configurable_alternatives | Configure alternatives for |
name
class-attribute
instance-attribute
¶
name: str | None = None
The name of the Runnable. Used for debugging and tracing.
InputType
property
¶
InputType: type[Input]
Input type.
The type of input this Runnable accepts specified as a type annotation.
| RAISES | DESCRIPTION |
|---|---|
TypeError
|
If the input type cannot be inferred. |
OutputType
property
¶
OutputType: type[Output]
Output Type.
The type of output this Runnable produces specified as a type annotation.
| RAISES | DESCRIPTION |
|---|---|
TypeError
|
If the output type cannot be inferred. |
input_schema
property
¶
The type of input this Runnable accepts specified as a Pydantic model.
output_schema
property
¶
Output schema.
The type of output this Runnable produces specified as a Pydantic model.
config_specs
property
¶
config_specs: list[ConfigurableFieldSpec]
List configurable fields for this Runnable.
lc_secrets
property
¶
A map of constructor argument names to secret ids.
For example, {"openai_api_key": "OPENAI_API_KEY"}
lc_attributes
property
¶
lc_attributes: dict
List of attribute names that should be included in the serialized kwargs.
These attributes must be accepted by the constructor.
Default is an empty dictionary.
tags
class-attribute
instance-attribute
¶
Optional list of tags associated with the retriever.
These tags will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks.
You can use these to, for example, identify a specific instance of a retriever with its use case.
metadata
class-attribute
instance-attribute
¶
Optional metadata associated with the retriever.
This metadata will be associated with each call to this retriever,
and passed as arguments to the handlers defined in callbacks.
You can use this to, for example, identify a specific instance of a retriever with its use case.
__init__
¶
__init__(
index_name: str | Sequence[str],
body_func: Callable[[str], dict],
*,
content_field: str | Mapping[str, str] | None = None,
document_mapper: Callable[[Mapping], Document] | None = None,
client: AsyncElasticsearch | None = None,
es_url: str | None = None,
es_cloud_id: str | None = None,
es_user: str | None = None,
es_api_key: str | None = None,
es_password: str | None = None,
) -> None
get_name
¶
get_input_schema
¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate input to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic input schema that
depends on which configuration the Runnable is invoked with.
This method returns the input schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate input. |
get_input_jsonschema
¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the input to the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the input to the Runnable. |
Added in langchain-core 0.3.0
get_output_schema
¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
Get a Pydantic model that can be used to validate output to the Runnable.
Runnable objects that leverage the configurable_fields and
configurable_alternatives methods will have a dynamic output schema that
depends on which configuration the Runnable is invoked with.
This method returns the output schema for a specific configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| type[BaseModel] | A Pydantic model that can be used to validate output. |
get_output_jsonschema
¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
Get a JSON schema that represents the output of the Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | A config to use when generating the schema. TYPE: RunnableConfig \| None |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON schema that represents the output of the Runnable. |
Added in langchain-core 0.3.0
config_schema
¶
The type of config this Runnable accepts specified as a Pydantic model.
To mark a field as configurable, see the configurable_fields
and configurable_alternatives methods.
| PARAMETER | DESCRIPTION |
|---|---|
include
|
A list of fields to include in the config schema. |
| RETURNS | DESCRIPTION |
|---|---|
type[BaseModel]
|
A Pydantic model that can be used to validate config. |
get_config_jsonschema
¶
get_graph
¶
get_graph(config: RunnableConfig | None = None) -> Graph
Return a graph representation of this Runnable.
get_prompts
¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
Return a list of prompts used by this Runnable.
__or__
¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
other
|
Another
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Other]
|
A new |
__ror__
¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" operator.
Compose this Runnable with another object to create a
RunnableSequence.
| PARAMETER | DESCRIPTION |
|---|---|
other
|
Another
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Other, Output]
|
A new |
pipe
¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
Pipe Runnable objects.
Compose this Runnable with Runnable-like objects to make a
RunnableSequence.
Equivalent to RunnableSequence(self, *others) or self | others[0] | ...
Example
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
    return x + 1
def mul_two(x: int) -> int:
    return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| PARAMETER | DESCRIPTION |
|---|---|
*others
|
Other
TYPE:
|
name
|
An optional name for the resulting
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Input, Other]
|
A new |
pick
¶
Pick keys from the output dict of this Runnable.
Pick a single key
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick a list of keys
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")
chain = RunnableMap(
    str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| PARAMETER | DESCRIPTION |
|---|---|
keys
|
A key or list of keys to pick from the output dict. |
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Any, Any]
|
a new |
assign
¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
Assigns new fields to the dict output of this Runnable.
from langchain_core.language_models.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
# {'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
# {'str': {'title': 'Str',
# 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| PARAMETER | DESCRIPTION |
|---|---|
**kwargs
|
A mapping of keys to
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
RunnableSerializable[Any, Any]
|
A new |
invoke
¶
invoke(
input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]
Invoke the retriever to get relevant documents.
Main entry point for synchronous retriever invocations.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The query string. TYPE: str |
| config | Configuration for the retriever. TYPE: RunnableConfig \| None |
| **kwargs | Additional arguments to pass to the retriever. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| list[Document] | List of relevant documents. |
ainvoke
async
¶
ainvoke(
input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]
Asynchronously invoke the retriever to get relevant documents.
Main entry point for asynchronous retriever invocations.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The query string. TYPE: str |
| config | Configuration for the retriever. TYPE: RunnableConfig \| None |
| **kwargs | Additional arguments to pass to the retriever. TYPE: Any |

| RETURNS | DESCRIPTION |
|---|---|
| list[Document] | List of relevant documents. |
batch
¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Output]
|
A list of outputs from the |
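To make the thread-pool behaviour described for `batch` concrete, here is a minimal standard-library sketch. It is not langchain-core's implementation: the function `batch_with_threads` and its `invoke` argument are stand-ins invented for this example, and config handling is left out entirely.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def batch_with_threads(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    return_exceptions: bool = False,
) -> List[Any]:
    """Illustrative stand-in for a thread-pool batch; not langchain-core code."""

    def call(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the exception back in place of a result
            raise

    with ThreadPoolExecutor() as pool:
        # map() preserves input order, so outputs line up with inputs.
        return list(pool.map(call, inputs))


results = batch_with_threads(lambda q: q.upper(), ["a", "b", "c"])
```

Because each call simply waits on I/O in its own thread, this pattern suits IO-bound work such as search requests, which matches the note above about IO-bound runnables.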
batch_as_completed
¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
Run invoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the The config supports standard keys like Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
tuple[int, Output | Exception]
|
Tuples of the index of the input and the output from the |
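The difference from `batch` is that results are yielded in completion order, each tagged with the index of its input. A rough standard-library sketch of that idea follows; `batch_as_completed_sketch` is a hypothetical name and the code is not taken from langchain-core.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, Sequence, Tuple


def batch_as_completed_sketch(
    invoke: Callable[[Any], Any], inputs: Sequence[Any]
) -> Iterator[Tuple[int, Any]]:
    """Illustrative only: yield (input index, output) in completion order."""
    with ThreadPoolExecutor() as pool:
        # Remember which input index each future belongs to.
        futures = {pool.submit(invoke, item): i for i, item in enumerate(inputs)}
        for future in as_completed(futures):
            yield futures[future], future.result()


pairs = list(batch_as_completed_sketch(len, ["aa", "b", "cccc"]))
by_index = dict(pairs)  # completion order varies, so re-key by input index
```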
abatch
async
¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses must override this method if they can batch more efficiently;
e.g., if the underlying Runnable uses an API which supports a batch mode.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the The config supports standard keys like Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| RETURNS | DESCRIPTION |
|---|---|
list[Output]
|
A list of outputs from the |
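The `asyncio.gather` approach mentioned for `abatch` can be sketched with the standard library alone. The names `abatch_sketch` and `fake_ainvoke` are assumptions for illustration, and the real method also handles config, which is omitted here.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def abatch_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    return_exceptions: bool = False,
) -> List[Any]:
    """Illustrative only: run one coroutine per input and gather the results."""
    coros = [ainvoke(item) for item in inputs]
    # gather() returns results in input order, regardless of finish order.
    return await asyncio.gather(*coros, return_exceptions=return_exceptions)


async def fake_ainvoke(query: str) -> str:
    await asyncio.sleep(0)  # stand-in for a network call
    return query[::-1]


results = asyncio.run(abatch_sketch(fake_ainvoke, ["abc", "xy"]))
```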
abatch_as_completed
async
¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
Run ainvoke in parallel on a list of inputs.
Yields results as they complete.
| PARAMETER | DESCRIPTION |
|---|---|
inputs
|
A list of inputs to the
TYPE:
|
config
|
A config to use when invoking the The config supports standard keys like Please refer to
TYPE:
|
return_exceptions
|
Whether to return exceptions instead of raising them.
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
A tuple of the index of the input and the output from the |
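For the async variant, a comparable sketch can be built on `asyncio.as_completed`. As before, this is an assumption-laden illustration rather than the library's code: `abatch_as_completed_sketch`, `fake_ainvoke`, and `collect` are hypothetical names.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Sequence, Tuple


async def abatch_as_completed_sketch(
    ainvoke: Callable[[Any], Awaitable[Any]], inputs: Sequence[Any]
) -> AsyncIterator[Tuple[int, Any]]:
    """Illustrative only: yield (input index, output) as each call finishes."""

    async def indexed(i: int, item: Any) -> Tuple[int, Any]:
        return i, await ainvoke(item)

    tasks = [indexed(i, item) for i, item in enumerate(inputs)]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done


async def fake_ainvoke(delay: float) -> float:
    await asyncio.sleep(delay)  # stand-in for a slow request
    return delay


async def collect() -> list:
    return [p async for p in abatch_as_completed_sketch(fake_ainvoke, [0.05, 0.0])]


pairs = asyncio.run(collect())
```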
stream
¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Default implementation of stream, which calls invoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
Output
|
The output of the |
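A default `stream` that merely calls `invoke` produces a single chunk holding the complete result. The short sketch below illustrates that fallback; `stream_sketch` and the lambda standing in for a retriever are hypothetical.

```python
from typing import Any, Callable, Iterator


def stream_sketch(invoke: Callable[[Any], Any], value: Any) -> Iterator[Any]:
    """Illustrative only: a non-streaming fallback yields one complete chunk."""
    yield invoke(value)


chunks = list(stream_sketch(lambda q: [f"doc about {q}"], "python"))
```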
astream
async
¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
Default implementation of astream, which calls ainvoke.
Subclasses must override this method if they support streaming output.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[Output]
|
The output of the |
astream_log
async
¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system.
This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
diff
|
Whether to yield diffs between each step or the current state.
TYPE:
|
with_streamed_output_list
|
Whether to yield the
TYPE:
|
include_names
|
Only include logs with these names. |
include_types
|
Only include logs with these types. |
include_tags
|
Only include logs with these tags. |
exclude_names
|
Exclude logs with these names. |
exclude_types
|
Exclude logs with these types. |
exclude_tags
|
Exclude logs with these tags. |
**kwargs
|
Additional keyword arguments to pass to the
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
A |
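Since the description above says the Jsonpatch ops can be applied in order to construct state, a small sketch of that replay may help. It handles only the `add` and `replace` operations and plain paths; the function `apply_ops` and the state keys used here are assumptions chosen for illustration, and a complete JSON Patch library would be the better choice in practice.

```python
from typing import Any, Dict, List


def apply_ops(state: Dict[str, Any], ops: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply a small subset of JSON Patch ("add" and "replace") in order."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        # "/a/b" -> ["a", "b"]; this sketch ignores "~0"/"~1" escapes.
        *parents, last = op["path"].lstrip("/").split("/")
        target: Any = state
        for key in parents:
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if last == "-":
                target.append(op["value"])  # "-" means append to the list
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return state


state = {"streamed_output": [], "final_output": None}
ops = [
    {"op": "add", "path": "/streamed_output/-", "value": "olleh"},
    {"op": "replace", "path": "/final_output", "value": "olleh"},
]
state = apply_ops(state, ops)
```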
astream_events
async
¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
Generate a stream of events.
Use to create an iterator over StreamEvent that provide real-time information
about the progress of the Runnable, including StreamEvent from intermediate
results.
A StreamEvent is a dictionary with the following schema:
- event: Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: The name of the Runnable that generated the event.
- run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
- tags: The tags of the Runnable that generated the event.
- metadata: The metadata of the Runnable that generated the event.
- data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.
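Given the naming format above, consumers of the stream often want to split an event name into its runnable type and phase. The following sketch does that with a regular expression; `parse_event_name` is a hypothetical helper, not part of langchain-core.

```python
import re
from typing import Tuple

# Mirrors the documented format: on_[runnable_type]_(start|stream|end)
EVENT_NAME = re.compile(r"^on_(?P<runnable_type>[a-z_]+)_(?P<phase>start|stream|end)$")


def parse_event_name(event: str) -> Tuple[str, str]:
    """Split a standard event name into (runnable_type, phase)."""
    match = EVENT_NAME.match(event)
    if match is None:
        raise ValueError(f"not a standard event name: {event!r}")
    return match["runnable_type"], match["phase"]


parsed = [parse_event_name(e) for e in ("on_chat_model_stream", "on_retriever_end")]
```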
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
Note
This reference table is for the v2 version of the schema.
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | '[model name]' | AIMessageChunk(content="hello") | | |
| on_chat_model_end | '[model name]' | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | '[model name]' | | {'input': 'hello'} | |
| on_llm_stream | '[model name]' | 'Hello' | | |
| on_llm_end | '[model name]' | | 'Hello human!' | |
| on_chain_start | 'format_docs' | | | |
| on_chain_stream | 'format_docs' | 'hello world!, goodbye world!' | | |
| on_chain_end | 'format_docs' | | [Document(...)] | 'hello world!, goodbye world!' |
| on_tool_start | 'some_tool' | | {"x": 1, "y": "2"} | |
| on_tool_end | 'some_tool' | | | {"x": 1, "y": "2"} |
| on_retriever_start | '[retriever name]' | | {"query": "hello"} | |
| on_retriever_end | '[retriever name]' | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | '[template_name]' | | {"question": "hello"} | |
| on_prompt_end | '[template_name]' | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will only be surfaced in the v2 version of the API!
A custom event has the following format:
| Attribute | Type | Description |
|---|---|---|
| name | str | A user-defined name for the event. |
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |
Here are declarations associated with the standard events shown above:
format_docs:
def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
Example
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
    return s[::-1]
chain = RunnableLambda(func=reverse)
events = [
    event async for event in chain.astream_events("hello", version="v2")
]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config  # Must be included for python < 3.10
    )
    await asyncio.sleep(1)  # Placeholder for some slow operation
    return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
| PARAMETER | DESCRIPTION |
|---|---|
input
|
The input to the
TYPE:
|
config
|
The config to use for the
TYPE:
|
version
|
The version of the schema to use, either 'v1' or 'v2'. Users should use 'v2'.
No default will be assigned until the API is stabilized.
Custom events will only be surfaced in 'v2'.
TYPE:
|
include_names
|
Only include events from |
include_types
|
Only include events from |
include_tags
|
Only include events from |
exclude_names
|
Exclude events from |
exclude_types
|
Exclude events from |
exclude_tags
|
Exclude events from |
**kwargs
|
Additional keyword arguments to pass to the These will be passed to
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
AsyncIterator[StreamEvent]
|
An async stream of |
| RAISES | DESCRIPTION |
|---|---|
NotImplementedError
|
If the version is not |
transform
¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
Transform inputs to outputs.
Default implementation of transform, which buffers input and calls stream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| Output | The output of the Runnable. |
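The buffering behaviour of the default implementation can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's code: `buffered_transform`, `stream_fn`, and `upper_words` are hypothetical names, and input chunks are assumed to be strings that can be merged with `+`.

```python
from collections.abc import Callable, Iterator


def buffered_transform(
    chunks: Iterator[str],
    stream_fn: Callable[[str], Iterator[str]],
) -> Iterator[str]:
    """Merge every input chunk into one value, then stream the result."""
    final = None
    for chunk in chunks:
        # Nothing is produced here: the input is only being accumulated.
        final = chunk if final is None else final + chunk
    if final is not None:
        # Output starts only after the input iterator is exhausted.
        yield from stream_fn(final)


def upper_words(text: str) -> Iterator[str]:
    """Hypothetical streaming function: yield one upper-cased word at a time."""
    for word in text.split():
        yield word.upper()


result = list(buffered_transform(iter(["hello ", "wor", "ld"]), upper_words))
print(result)
```

Because no output is produced until the input is exhausted, a subclass that can work chunk by chunk would override the method instead of relying on this pattern.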
atransform
async
¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
Transform inputs to outputs.
Default implementation of atransform, which buffers input and calls astream.
Subclasses must override this method if they can start producing output while input is still being generated.
| PARAMETER | DESCRIPTION |
|---|---|
| input | An async iterator of inputs to the Runnable. |
| config | The config to use for the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| YIELDS | DESCRIPTION |
|---|---|
| AsyncIterator[Output] | The output of the Runnable. |
bind
¶
Bind arguments to a Runnable, returning a new Runnable.
Useful when a Runnable in a chain requires an argument that is not
in the output of the previous Runnable or included in the user input.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | The arguments to bind to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the arguments bound. |
Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config
¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
Bind config to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| config | The config to bind to the Runnable. |
| **kwargs | Additional keyword arguments to pass to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the config bound. |
with_listeners
¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
Bind lifecycle listeners to a Runnable, returning a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
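| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called before the Runnable starts running, with the Run object. |
| on_end | Called after the Runnable finishes running, with the Run object. |
| on_error | Called if the Runnable throws an error, with the Run object. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |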
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners
¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
Bind async lifecycle listeners to a Runnable.
Returns a new Runnable.
The Run object contains information about the run, including its id,
type, input, output, error, start_time, end_time, and
any tags or metadata added to the run.
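| PARAMETER | DESCRIPTION |
|---|---|
| on_start | Called asynchronously before the Runnable starts running, with the Run object. |
| on_end | Called asynchronously after the Runnable finishes running, with the Run object. |
| on_error | Called asynchronously if the Runnable throws an error, with the Run object. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the listeners bound. |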
Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
from datetime import datetime, timezone
import time
import asyncio


def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")


# Listeners receive the Run object described above, not the Runnable itself.
async def fn_start(run_obj: Run):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")


async def fn_end(run_obj: Run):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")


runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start, on_end=fn_end
)


async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))


asyncio.run(concurrent_runs())
# Result:
# on start callback starts at 2025-03-01T07:05:22.875378+00:00
# on start callback starts at 2025-03-01T07:05:22.875495+00:00
# on start callback ends at 2025-03-01T07:05:25.878862+00:00
# on start callback ends at 2025-03-01T07:05:25.878947+00:00
# Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
# Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
# Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
# on end callback starts at 2025-03-01T07:05:27.882360+00:00
# Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
# on end callback starts at 2025-03-01T07:05:28.882428+00:00
# on end callback ends at 2025-03-01T07:05:29.883893+00:00
# on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types
¶
with_types(
*, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]
Bind input and output types to a Runnable, returning a new Runnable.
| PARAMETER | DESCRIPTION |
|---|---|
| input_type | The input type to bind to the Runnable. |
| output_type | The output type to bind to the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable with the types bound. |
with_retry
¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
Create a new Runnable that retries the original Runnable on exceptions.
| PARAMETER | DESCRIPTION |
|---|---|
| retry_if_exception_type | A tuple of exception types to retry on. |
| wait_exponential_jitter | Whether to add jitter to the wait time between retries. |
| stop_after_attempt | The maximum number of attempts to make before giving up. |
| exponential_jitter_params | Parameters for tenacity.wait_exponential_jitter. |

| RETURNS | DESCRIPTION |
|---|---|
| Runnable[Input, Output] | A new Runnable that retries the original Runnable on exceptions. |
Example
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2
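To make the retry parameters more concrete, here is a standard-library sketch of retrying with exponential backoff and jitter. It is an illustration only and does not reproduce how with_retry is implemented: `retry_call`, `flaky`, and the `initial`, `max_wait`, and `jitter` defaults are hypothetical.

```python
import random
import time
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


def retry_call(
    func: Callable[[], T],
    *,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
    stop_after_attempt: int = 3,
    initial: float = 0.01,  # hypothetical base delay in seconds
    max_wait: float = 0.1,  # hypothetical cap on the backoff delay
    jitter: float = 0.01,  # hypothetical upper bound on the random jitter
) -> T:
    """Call func, retrying on the given exception types with backoff."""
    for attempt in range(1, stop_after_attempt + 1):
        try:
            return func()
        except retry_on:
            if attempt == stop_after_attempt:
                raise  # out of attempts: surface the last error
            # Double the delay on each attempt, cap it, then add jitter.
            delay = min(initial * 2 ** (attempt - 1), max_wait)
            time.sleep(delay + random.uniform(0, jitter))
    raise RuntimeError("stop_after_attempt must be at least 1")


state = {"calls": 0}


def flaky() -> str:
    """Fail on the first two calls, succeed on the third."""
    state["calls"] += 1
    if state["calls"] < 3:
        raise ValueError("transient failure")
    return "ok"


outcome = retry_call(flaky, retry_on=(ValueError,), stop_after_attempt=3)
print(outcome, state["calls"])
```

The jitter spreads retries from concurrent callers over time so that they do not all hit the failing service at the same instant.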
map
¶
with_fallbacks
¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
Add fallbacks to a Runnable, returning a new Runnable.
The new Runnable will try the original Runnable, and then each fallback
in order, upon failures.
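| PARAMETER | DESCRIPTION |
|---|---|
| fallbacks | A sequence of runnables to try if the original Runnable fails. |
| exceptions_to_handle | A tuple of exception types to handle. |
| exception_key | If a string is specified, handled exceptions are passed to fallbacks as part of the input under the specified key. If None, exceptions are not passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableWithFallbacks[Input, Output] | A new Runnable that will try the original Runnable, and then each fallback in order, upon failures. |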
Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
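The "try the original, then each fallback in order" behaviour can also be sketched without the library. This is a simplified illustration under stated assumptions: `call_with_fallbacks`, `broken`, and `shout` are hypothetical names, and re-raising the first error when every candidate fails is a choice made for this sketch.

```python
from collections.abc import Callable, Sequence


def call_with_fallbacks(
    primary: Callable[[str], str],
    fallbacks: Sequence[Callable[[str], str]],
    value: str,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
) -> str:
    """Try primary, then each fallback in order, until one succeeds."""
    first_error = None
    for candidate in [primary, *fallbacks]:
        try:
            return candidate(value)
        except exceptions_to_handle as exc:
            # Remember only the first failure; later ones are discarded.
            if first_error is None:
                first_error = exc
    if first_error is None:
        raise ValueError("no candidates were provided")
    raise first_error


def broken(_: str) -> str:
    """Hypothetical primary callable that always fails."""
    raise ValueError("primary failed")


def shout(text: str) -> str:
    """Hypothetical fallback that upper-cases its input."""
    return text.upper()


answer = call_with_fallbacks(broken, [shout], "foo bar")
print(answer)
```

Exceptions outside `exceptions_to_handle` propagate immediately, so a narrow tuple keeps programming errors from being hidden by a fallback.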
as_tool
¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
Create a BaseTool from a Runnable.
as_tool will instantiate a BaseTool with a name, description, and
args_schema from a Runnable. Where possible, schemas are inferred
from runnable.get_input_schema.
Alternatively (e.g., if the Runnable takes a dict as input and the specific
dict keys are not typed), the schema can be specified directly with
args_schema.
You can also pass arg_types to just specify the required arguments and their
types.
| PARAMETER | DESCRIPTION |
|---|---|
| args_schema | The schema for the tool. |
| name | The name of the tool. |
| description | The description of the tool. |
| arg_types | A dictionary of argument names to types. |

| RETURNS | DESCRIPTION |
|---|---|
| BaseTool | A BaseTool instance. |
TypedDict input
dict input, specifying schema via args_schema
from typing import Any

from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types
is_lc_serializable
classmethod
¶
is_lc_serializable() -> bool
Is this class serializable?
By design, even if a class inherits from Serializable, it is not serializable
by default. This is to prevent accidental serialization of objects that should
not be serialized.
| RETURNS | DESCRIPTION |
|---|---|
| bool | Whether the class is serializable. Default is False. |
get_lc_namespace
classmethod
¶
lc_id
classmethod
¶
Return a unique identifier for this class for serialization purposes.
The unique identifier is a list of strings that describes the path to the object.
For example, for the class langchain.llms.openai.OpenAI, the id is
["langchain", "llms", "openai", "OpenAI"].
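The shape of that identifier can be shown with a tiny helper. `lc_id_from_path` is a hypothetical function written for this illustration; it only demonstrates the list-of-strings form and does not reproduce how the library computes the identifier.

```python
def lc_id_from_path(dotted_path: str) -> list[str]:
    """Split a dotted class path into a list of path segments."""
    return dotted_path.split(".")


class_id = lc_id_from_path("langchain.llms.openai.OpenAI")
print(class_id)
```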
to_json
¶
Serialize the Runnable to JSON.
| RETURNS | DESCRIPTION |
|---|---|
| SerializedConstructor \| SerializedNotImplemented | A JSON-serializable representation of the Runnable. |
to_json_not_implemented
¶
Serialize a "not implemented" object.
| RETURNS | DESCRIPTION |
|---|---|
| SerializedNotImplemented | The serialized "not implemented" object. |
configurable_fields
¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
Configure particular Runnable fields at runtime.
| PARAMETER | DESCRIPTION |
|---|---|
| **kwargs | A dictionary of ConfigurableField instances to configure. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If a configuration key is not found in the Runnable. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the fields configured. |
Example
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ", model.invoke("tell me something about chess").content
)

# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)
configurable_alternatives
¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
Configure alternatives for Runnable objects that can be set at runtime.
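| PARAMETER | DESCRIPTION |
|---|---|
| which | The ConfigurableField instance that will be used to select the alternative. |
| default_key | The default key to use if no alternative is selected. |
| prefix_keys | Whether to prefix the keys with the ConfigurableField id. |
| **kwargs | A dictionary of keys to Runnable instances or callables that return Runnable instances. |

| RETURNS | DESCRIPTION |
|---|---|
| RunnableSerializable[Input, Output] | A new Runnable with the alternatives configured. |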
Example
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)
AsyncElasticsearchCache
¶
Bases: BaseCache
Elasticsearch LLM cache.
Caches LLM responses in Elasticsearch to avoid repeated calls for identical prompts.
Setup
Install langchain_elasticsearch and start Elasticsearch locally using
the start-local script.
This will create an elastic-start-local folder. To start Elasticsearch
and Kibana:
Elasticsearch will be available at http://localhost:9200. The password
for the elastic user and API key are stored in the .env file in the
elastic-start-local folder.
Key init args
index_name: str
    The name of the index or alias to use for the cache.
store_input: bool
    Whether to store the LLM input (prompt) in the cache. Default True.
store_input_params: bool
    Whether to store the LLM parameters in the cache. Default True.
metadata: Optional[Dict[str, Any]]
    Additional metadata to store in the cache for filtering.

Key init args (client params):
client: Optional[AsyncElasticsearch or Elasticsearch]
    Pre-existing Elasticsearch connection. Either provide this OR credentials.
es_url: Optional[str]
    URL of the Elasticsearch instance to connect to.
es_cloud_id: Optional[str]
    Cloud ID of the Elasticsearch instance to connect to.
es_user: Optional[str]
    Username to use when connecting to Elasticsearch.
es_api_key: Optional[str]
    API key to use when connecting to Elasticsearch.
es_password: Optional[str]
    Password to use when connecting to Elasticsearch.
Instantiate
Instantiate with API key
Instantiate from cloud
Instantiate from existing connection
Use with LangChain
For synchronous applications, use the ElasticsearchCache class.
For asynchronous applications, use the AsyncElasticsearchCache class.
| METHOD | DESCRIPTION |
|---|---|
| __init__ | Initialize the Elasticsearch cache store. |
| alookup | Look up based on prompt and llm_string. |
| build_document | Build the Elasticsearch document for storing a single LLM interaction. |
| aupdate | Update based on prompt and llm_string. |
| aclear | Clear cache. |
| lookup | Look up based on prompt and llm_string. |
| update | Update cache based on prompt and llm_string. |
| clear | Clear cache that can take additional keyword arguments. |
__init__
¶
__init__(
index_name: str,
*,
store_input: bool = True,
store_input_params: bool = True,
metadata: dict[str, Any] | None = None,
client: AsyncElasticsearch | None = None,
es_url: str | None = None,
es_cloud_id: str | None = None,
es_user: str | None = None,
es_api_key: str | None = None,
es_password: str | None = None,
)
Initialize the Elasticsearch cache store.
| PARAMETER | DESCRIPTION |
|---|---|
| index_name | The name of the index or alias to use for the cache. If it doesn't exist, an index is created according to the default mapping. |
| store_input | Whether to store the LLM input (prompt) in the cache. Default True. |
| store_input_params | Whether to store the LLM parameters in the cache. Default True. |
| metadata | Additional metadata to store in the cache for filtering. Must be JSON serializable. |
| client | Pre-existing Elasticsearch connection. Either provide this OR credentials. |
| es_url | URL of the Elasticsearch instance. |
| es_cloud_id | Cloud ID of the Elasticsearch instance. |
| es_user | Username for Elasticsearch. |
| es_api_key | API key for Elasticsearch. |
| es_password | Password for Elasticsearch. |
alookup
async
¶
Look up based on prompt and llm_string.
build_document
¶
Build the Elasticsearch document for storing a single LLM interaction.
aupdate
async
¶
Update based on prompt and llm_string.
lookup
abstractmethod
¶
Look up based on prompt and llm_string.
A cache implementation is expected to generate a key from the 2-tuple
of prompt and llm_string (e.g., by concatenating them with a delimiter).
| PARAMETER | DESCRIPTION |
|---|---|
| prompt | A string representation of the prompt. In the case of a chat model, the prompt is a non-trivial serialization of the prompt into the language model. |
| llm_string | A string representation of the LLM configuration. This is used to capture the invocation parameters of the LLM (e.g., model name, temperature, stop tokens, max tokens, etc.). These invocation parameters are serialized into a string representation. |

| RETURNS | DESCRIPTION |
|---|---|
| RETURN_VAL_TYPE \| None | On a cache miss, return None. On a cache hit, return the cached value. The cached value is a list of Generation (or subclasses). |
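The idea of deriving one cache key from the (prompt, llm_string) pair can be sketched with an in-memory store. This is a hypothetical illustration, not the Elasticsearch implementation: `InMemoryCache`, the SHA-256 hash, and the NUL delimiter are all assumptions made for the example, and plain strings stand in for the cached generations.

```python
import hashlib
from typing import Optional


class InMemoryCache:
    """Hypothetical dict-backed cache keyed on (prompt, llm_string)."""

    def __init__(self) -> None:
        self._store: dict[str, list[str]] = {}

    @staticmethod
    def make_key(prompt: str, llm_string: str) -> str:
        # The delimiter keeps ("ab", "c") and ("a", "bc") from colliding.
        raw = prompt + "\x00" + llm_string
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def lookup(self, prompt: str, llm_string: str) -> Optional[list[str]]:
        """Return the cached value, or None on a cache miss."""
        return self._store.get(self.make_key(prompt, llm_string))

    def update(self, prompt: str, llm_string: str, return_val: list[str]) -> None:
        """Store return_val under the same key that lookup derives."""
        self._store[self.make_key(prompt, llm_string)] = return_val


cache = InMemoryCache()
miss = cache.lookup("What is 2 + 2?", "model=a,temperature=0")
cache.update("What is 2 + 2?", "model=a,temperature=0", ["4"])
hit = cache.lookup("What is 2 + 2?", "model=a,temperature=0")
other_config = cache.lookup("What is 2 + 2?", "model=b,temperature=0")
print(miss, hit, other_config)
```

Including llm_string in the key means the same prompt sent with different invocation parameters is cached separately.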
update
abstractmethod
¶
Update cache based on prompt and llm_string.
The prompt and llm_string are used to generate a key for the cache. The key should match that of the lookup method.
| PARAMETER | DESCRIPTION |
|---|---|
| prompt | A string representation of the prompt. In the case of a chat model, the prompt is a non-trivial serialization of the prompt into the language model. |
| llm_string | A string representation of the LLM configuration. This is used to capture the invocation parameters of the LLM (e.g., model name, temperature, stop tokens, max tokens, etc.). These invocation parameters are serialized into a string representation. |
| return_val | The value to be cached. The value is a list of Generation (or subclasses). |
AsyncElasticsearchEmbeddingsCache
¶
Bases: ByteStore
Elasticsearch embeddings cache.
Caches embeddings in Elasticsearch to avoid repeated embedding computations.
Setup
Install langchain_elasticsearch and start Elasticsearch locally using
the start-local script.
This will create an elastic-start-local folder. To start Elasticsearch
and Kibana:
Elasticsearch will be available at http://localhost:9200. The password
for the elastic user and API key are stored in the .env file in the
elastic-start-local folder.
Key init args
index_name: str
    The name of the index or alias to use for the cache.
store_input: bool
    Whether to store the input text in the cache. Default True.
metadata: Optional[Dict[str, Any]]
    Additional metadata to store in the cache for filtering.
namespace: Optional[str]
    A namespace to organize the cache.
maximum_duplicates_allowed: int
    Maximum duplicate keys permitted when using aliases. Default 1.

Key init args (client params):
client: Optional[AsyncElasticsearch or Elasticsearch]
    Pre-existing Elasticsearch connection. Either provide this OR credentials.
es_url: Optional[str]
    URL of the Elasticsearch instance to connect to.
es_cloud_id: Optional[str]
    Cloud ID of the Elasticsearch instance to connect to.
es_user: Optional[str]
    Username to use when connecting to Elasticsearch.
es_api_key: Optional[str]
    API key to use when connecting to Elasticsearch.
es_password: Optional[str]
    Password to use when connecting to Elasticsearch.
Instantiate
Instantiate with API key
Instantiate from cloud
Instantiate from existing connection
For synchronous applications, use the ElasticsearchEmbeddingsCache class.
For asynchronous applications, use the AsyncElasticsearchEmbeddingsCache
class.
| METHOD | DESCRIPTION |
|---|---|
| __init__ | Initialize the Elasticsearch embeddings cache store. |
| encode_vector | Encode the vector data (bytes) as a base64 string. |
| decode_vector | Decode the base64 string to vector data as bytes. |
| amget | Get the values associated with the given keys. |
| build_document | Build the Elasticsearch document for storing a single embedding. |
| amset | Set the values for the given keys. |
| amdelete | Delete the given keys and their associated values. |
| ayield_keys | Get an iterator over keys that match the given prefix. |
__init__
¶
__init__(
index_name: str,
*,
store_input: bool = True,
metadata: dict[str, Any] | None = None,
namespace: str | None = None,
maximum_duplicates_allowed: int = 1,
client: AsyncElasticsearch | None = None,
es_url: str | None = None,
es_cloud_id: str | None = None,
es_user: str | None = None,
es_api_key: str | None = None,
es_password: str | None = None,
)
Initialize the Elasticsearch embeddings cache store.
| PARAMETER | DESCRIPTION |
|---|---|
| index_name | The name of the index or alias to use for the cache. If it doesn't exist, an index is created according to the default mapping. |
| store_input | Whether to store the input text in the cache. Default True. |
| metadata | Additional metadata to store in the cache for filtering. Must be JSON serializable. |
| namespace | A namespace to organize the cache. |
| maximum_duplicates_allowed | Maximum duplicate keys permitted when using aliases across multiple indices. Default 1. |
| client | Pre-existing Elasticsearch connection. Either provide this OR credentials. |
| es_url | URL of the Elasticsearch instance. |
| es_cloud_id | Cloud ID of the Elasticsearch instance. |
| es_user | Username for Elasticsearch. |
| es_api_key | API key for Elasticsearch. |
| es_password | Password for Elasticsearch. |
encode_vector
staticmethod
¶
Encode the vector data (bytes) as a base64 string.
decode_vector
staticmethod
¶
Decode the base64 string to vector data as bytes.
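A bytes-to-base64 round trip of this kind can be sketched with the standard library. The two functions below are standalone stand-ins that reuse the method names for readability; they are not the class's static methods. Packing the floats as little-endian float32 with `struct` is an assumption made only to have some bytes to encode.

```python
import base64
import struct


def encode_vector(data: bytes) -> str:
    """Encode raw vector bytes as a base64 text string."""
    return base64.b64encode(data).decode("utf-8")


def decode_vector(text: str) -> bytes:
    """Decode a base64 text string back into raw vector bytes."""
    return base64.b64decode(text)


# Hypothetical embedding; these values are exactly representable as float32.
vector = [0.25, -1.5, 3.0]
raw = struct.pack(f"<{len(vector)}f", *vector)

encoded = encode_vector(raw)
restored = list(struct.unpack(f"<{len(vector)}f", decode_vector(encoded)))
print(encoded, restored)
```

Base64 turns arbitrary bytes into plain text, which is what lets binary vector data sit inside a JSON document.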
amget
async
¶
Get the values associated with the given keys.
build_document
¶
Build the Elasticsearch document for storing a single embedding.
amset
async
¶
Set the values for the given keys.
amdelete
async
¶
Delete the given keys and their associated values.
ayield_keys
async
¶
ayield_keys(*, prefix: str | None = None) -> AsyncIterator[str]
Get an iterator over keys that match the given prefix.
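Consuming an async key iterator with an optional prefix looks roughly like the sketch below. `InMemoryKeyStore` and `collect` are hypothetical stand-ins used so the example runs without an Elasticsearch cluster; only the `ayield_keys(*, prefix=...)` call shape is taken from the signature above.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Optional


class InMemoryKeyStore:
    """Hypothetical store that yields its keys asynchronously."""

    def __init__(self, keys: list[str]) -> None:
        self._keys = keys

    async def ayield_keys(self, *, prefix: Optional[str] = None) -> AsyncIterator[str]:
        for key in self._keys:
            # With no prefix every key matches; otherwise filter by prefix.
            if prefix is None or key.startswith(prefix):
                yield key


async def collect(store: InMemoryKeyStore, prefix: Optional[str]) -> list[str]:
    """Drain the async iterator into a list."""
    return [key async for key in store.ayield_keys(prefix=prefix)]


store = InMemoryKeyStore(["doc:1", "doc:2", "img:1"])
doc_keys = asyncio.run(collect(store, "doc:"))
all_keys = asyncio.run(collect(store, None))
print(doc_keys, all_keys)
```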