Skip to content

langchain-elasticsearch

PyPI - Version PyPI - License PyPI - Downloads

AsyncElasticsearchEmbeddings

Bases: Embeddings

Elasticsearch embedding models.

This class provides an interface to generate embeddings using a model deployed in an Elasticsearch cluster. It requires an Elasticsearch connection and the model_id of the model deployed in the cluster.

In Elasticsearch you need to have an embedding model loaded and deployed. - https://www.elastic.co/guide/en/elasticsearch/reference/current/infer-trained-model.html - https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-deploy-models.html

Setup

Install langchain_elasticsearch and start Elasticsearch locally using the start-local script.

pip install -qU langchain_elasticsearch
curl -fsSL https://elastic.co/start-local | sh

This will create an elastic-start-local folder. To start Elasticsearch and Kibana:

cd elastic-start-local
./start.sh

Elasticsearch will be available at http://localhost:9200. The password for the elastic user and API key are stored in the .env file in the elastic-start-local folder.

Key init args

model_id: str The model_id of the model deployed in the Elasticsearch cluster. input_field: str The name of the key for the input text field in the document. Defaults to 'text_field'.

Key init args — client params: client: Optional[AsyncElasticsearch or Elasticsearch] Pre-existing Elasticsearch connection. Either provide this OR credentials. es_url: Optional[str] URL of the Elasticsearch instance to connect to. es_cloud_id: Optional[str] Cloud ID of the Elasticsearch instance to connect to. es_user: Optional[str] Username to use when connecting to Elasticsearch. es_api_key: Optional[str] API key to use when connecting to Elasticsearch. es_password: Optional[str] Password to use when connecting to Elasticsearch.

Instantiate
from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_url="http://localhost:9200"
)

Instantiate with API key (URL):

from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_url="http://localhost:9200",
    es_api_key="your-api-key"
)

Instantiate with username/password (URL):

from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_url="http://localhost:9200",
    es_user="elastic",
    es_password="password"
)

If you want to use a cloud hosted Elasticsearch instance, you can pass in the es_cloud_id argument instead of the es_url argument.

Instantiate from cloud (with username/password):

from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_cloud_id="<cloud_id>",
    es_user="elastic",
    es_password="<password>"
)

Instantiate from cloud (with API key):

from langchain_elasticsearch import ElasticsearchEmbeddings

embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key"
)

You can also connect to an existing Elasticsearch instance by passing in a pre-existing Elasticsearch connection via the client argument.

Instantiate from existing connection
from langchain_elasticsearch import ElasticsearchEmbeddings
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
embeddings = ElasticsearchEmbeddings(
    model_id="your_model_id",
    client=client
)
Generate embeddings
documents = [
    "This is an example document.",
    "Another example document to generate embeddings for.",
]
embeddings_list = embeddings.embed_documents(documents)
Generate query embedding
query_embedding = embeddings.embed_query("What is this about?")

For synchronous applications, use the ElasticsearchEmbeddings class. For asynchronous applications, use the AsyncElasticsearchEmbeddings class.

METHOD DESCRIPTION
__init__

Initialize the ElasticsearchEmbeddings instance.

aembed_documents

Generate embeddings for a list of documents.

aembed_query

Generate an embedding for a single query text.

embed_documents

Embed search docs.

embed_query

Embed query text.

__init__

__init__(
    model_id: str,
    *,
    input_field: str = "text_field",
    client: AsyncElasticsearch | None = None,
    es_url: str | None = None,
    es_cloud_id: str | None = None,
    es_user: str | None = None,
    es_api_key: str | None = None,
    es_password: str | None = None,
)

Initialize the ElasticsearchEmbeddings instance.

PARAMETER DESCRIPTION
model_id

The model_id of the model deployed in the Elasticsearch cluster.

TYPE: str

input_field

The name of the key for the input text field in the document. Defaults to 'text_field'.

TYPE: str DEFAULT: 'text_field'

client

Pre-existing Elasticsearch connection. Either provide this OR credentials.

TYPE: AsyncElasticsearch or Elasticsearch DEFAULT: None

es_url

URL of the Elasticsearch instance to connect to.

TYPE: str DEFAULT: None

es_cloud_id

Cloud ID of the Elasticsearch instance.

TYPE: str DEFAULT: None

es_user

Username to use when connecting to Elasticsearch.

TYPE: str DEFAULT: None

es_api_key

API key to use when connecting to Elasticsearch.

TYPE: str DEFAULT: None

es_password

Password to use when connecting to Elasticsearch.

TYPE: str DEFAULT: None

aembed_documents async

aembed_documents(texts: list[str]) -> list[list[float]]

Generate embeddings for a list of documents.

PARAMETER DESCRIPTION
texts

A list of document text strings to generate embeddings for.

TYPE: list[str]

RETURNS DESCRIPTION
list[list[float]]

List[List[float]]: A list of embeddings, one for each document in the input list.

aembed_query async

aembed_query(text: str) -> list[float]

Generate an embedding for a single query text.

PARAMETER DESCRIPTION
text

The query text to generate an embedding for.

TYPE: str

RETURNS DESCRIPTION
list[float]

List[float]: The embedding for the input query text.

embed_documents abstractmethod

embed_documents(texts: list[str]) -> list[list[float]]

Embed search docs.

PARAMETER DESCRIPTION
texts

List of text to embed.

TYPE: list[str]

RETURNS DESCRIPTION
list[list[float]]

List of embeddings.

embed_query abstractmethod

embed_query(text: str) -> list[float]

Embed query text.

PARAMETER DESCRIPTION
text

Text to embed.

TYPE: str

RETURNS DESCRIPTION
list[float]

Embedding.

AsyncElasticsearchStore

Bases: VectorStore

Elasticsearch vector store.

Setup

Install langchain_elasticsearch and start Elasticsearch locally using the start-local script.

pip install -qU langchain_elasticsearch
curl -fsSL https://elastic.co/start-local | sh

This will create an elastic-start-local folder. To start Elasticsearch and Kibana:

cd elastic-start-local
./start.sh

Elasticsearch will be available at http://localhost:9200. The password for the elastic user and API key are stored in the .env file in the elastic-start-local folder.

Key init args — indexing params: index_name: str Name of the index to create. embedding: Embeddings Embedding function to use. custom_index_settings: Optional[Dict[str, Any]] A dictionary of custom settings for the index. This can include configurations like the number of shards, number of replicas, analysis settings, and other index-specific settings. If not provided, default settings will be used. Note that if the same setting is provided by both the user and the strategy, will raise an error.

Key init args — client params: client: Optional[Elasticsearch or AsyncElasticsearch] Pre-existing Elasticsearch connection. Either provide this OR credentials. es_url: Optional[str] URL of the Elasticsearch instance to connect to. es_cloud_id: Optional[str] Cloud ID of the Elasticsearch instance to connect to. es_user: Optional[str] Username to use when connecting to Elasticsearch. es_password: Optional[str] Password to use when connecting to Elasticsearch. es_api_key: Optional[str] API key to use when connecting to Elasticsearch. es_params: Optional[Dict[str, Any]] Additional parameters for the Elasticsearch client.

Instantiate
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

vector_store = ElasticsearchStore(
    index_name="langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
)

Instantiate with API key (URL):

from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    index_name="langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
    es_api_key="your-api-key"
)

Instantiate with username/password (URL):

from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    index_name="langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
    es_user="elastic",
    es_password="password"
)

If you want to use a cloud hosted Elasticsearch instance, you can pass in the cloud_id argument instead of the es_url argument.

Instantiate from cloud (with username/password):

from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    es_cloud_id="<cloud_id>",
    es_user="elastic",
    es_password="<password>"
)

Instantiate from cloud (with API key):

from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key"
)

You can also connect to an existing Elasticsearch instance by passing in a pre-existing Elasticsearch connection via the client argument.

Instantiate from existing connection
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")

store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    client=client
)

Class methods (afrom_texts, afrom_documents) accept the same connection options:

Instantiate from texts with credentials
from langchain_elasticsearch import ElasticsearchStore

store = await ElasticsearchStore.afrom_texts(
    texts=["text1", "text2"],
    index_name="langchain-demo",
    es_url="http://localhost:9200"
)
Instantiate from texts with client
from langchain_elasticsearch import ElasticsearchStore
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
store = await ElasticsearchStore.afrom_texts(
    texts=["text1", "text2"],
    index_name="langchain-demo",
    client=client
)
Add Documents
from langchain_core.documents import Document

document_1 = Document(page_content="foo", metadata={"baz": "bar"})
document_2 = Document(page_content="thud", metadata={"bar": "baz"})
document_3 = Document(page_content="i will be deleted :(")

documents = [document_1, document_2, document_3]
ids = ["1", "2", "3"]
vector_store.add_documents(documents=documents, ids=ids)
Delete Documents
vector_store.delete(ids=["3"])
Search with filter
results = vector_store.similarity_search(query="thud",k=1,filter=[{"term": {"metadata.bar.keyword": "baz"}}])
for doc in results:
    print(f"* {doc.page_content} [{doc.metadata}]")
* thud [{'bar': 'baz'}]
Search with score
results = vector_store.similarity_search_with_score(query="qux",k=1)
for doc, score in results:
    print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.916092] foo [{'baz': 'bar'}]
Async
from langchain_elasticsearch import AsyncElasticsearchStore

vector_store = AsyncElasticsearchStore(...)

# add documents
await vector_store.aadd_documents(documents=documents, ids=ids)

# delete documents
await vector_store.adelete(ids=["3"])

# search
results = vector_store.asimilarity_search(query="thud",k=1)

# search with score
results = await vector_store.asimilarity_search_with_score(query="qux",k=1)
for doc,score in results:
    print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.916092] foo [{'baz': 'bar'}]

Use as Retriever:

```bash
pip install "elasticsearch[vectorstore_mmr]"
```

```python
retriever = vector_store.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 1, "fetch_k": 2, "lambda_mult": 0.5},
)
retriever.invoke("thud")
```

```python
[Document(metadata={'bar': 'baz'}, page_content='thud')]
```

Advanced Uses:

ElasticsearchStore by default uses the ApproxRetrievalStrategy, which uses the HNSW algorithm to perform approximate nearest neighbor search. This is the fastest and most memory efficient algorithm.

If you want to use the Brute force / Exact strategy for searching vectors, you can pass in the ExactRetrievalStrategy to the ElasticsearchStore constructor.

Use ExactRetrievalStrategy
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

store = ElasticsearchStore(
    embedding=OpenAIEmbeddings(),
    index_name="langchain-demo",
    es_url="http://localhost:9200",
    strategy=ElasticsearchStore.ExactRetrievalStrategy()
)

Both strategies require that you know the similarity metric you want to use when creating the index. The default is cosine similarity, but you can also use dot product or euclidean distance.

Use dot product similarity
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
from langchain_elasticsearch import DistanceStrategy

store = ElasticsearchStore(
    "langchain-demo",
    embedding=OpenAIEmbeddings(),
    es_url="http://localhost:9200",
    distance_strategy="DOT_PRODUCT"
)
METHOD DESCRIPTION
asimilarity_search

Return Elasticsearch documents most similar to query.

amax_marginal_relevance_search

Return docs selected using the maximal marginal relevance.

asimilarity_search_with_score

Return Elasticsearch documents most similar to query, along with scores.

asimilarity_search_by_vector_with_relevance_scores

Return Elasticsearch documents most similar to query, along with scores.

adelete

Delete documents from the Elasticsearch index.

aadd_texts

Run more texts through the embeddings and add to the store.

aadd_embeddings

Add the given texts and embeddings to the store.

afrom_texts

Construct ElasticsearchStore wrapper from raw documents.

afrom_documents

Construct ElasticsearchStore wrapper from documents.

ExactRetrievalStrategy

Used to perform brute force / exact

ApproxRetrievalStrategy

Used to perform approximate nearest neighbor search

SparseVectorRetrievalStrategy

Used to perform sparse vector search via text_expansion.

BM25RetrievalStrategy

Used to apply BM25 without vector search.

add_texts

Run more texts through the embeddings and add to the VectorStore.

delete

Delete by vector ID or other criteria.

get_by_ids

Get documents by their IDs.

aget_by_ids

Async get documents by their IDs.

add_documents

Add or update documents in the VectorStore.

aadd_documents

Async run more documents through the embeddings and add to the VectorStore.

search

Return docs most similar to query using a specified search type.

asearch

Async return docs most similar to query using a specified search type.

similarity_search

Return docs most similar to query.

similarity_search_with_score

Run similarity search with distance.

similarity_search_with_relevance_scores

Return docs and relevance scores in the range [0, 1].

asimilarity_search_with_relevance_scores

Async return docs and relevance scores in the range [0, 1].

similarity_search_by_vector

Return docs most similar to embedding vector.

asimilarity_search_by_vector

Async return docs most similar to embedding vector.

max_marginal_relevance_search

Return docs selected using the maximal marginal relevance.

max_marginal_relevance_search_by_vector

Return docs selected using the maximal marginal relevance.

amax_marginal_relevance_search_by_vector

Async return docs selected using the maximal marginal relevance.

from_documents

Return VectorStore initialized from documents and embeddings.

from_texts

Return VectorStore initialized from texts and embeddings.

as_retriever

Return VectorStoreRetriever initialized from this VectorStore.

embeddings property

embeddings: Embeddings | None

Access the query embedding object if available.

asimilarity_search(
    query: str,
    k: int = 4,
    fetch_k: int = 50,
    filter: list[dict] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[Document]

Return Elasticsearch documents most similar to query.

PARAMETER DESCRIPTION
query

Text to look up documents similar to.

TYPE: str

k

Number of Documents to return. Defaults to 4.

TYPE: int DEFAULT: 4

fetch_k

Number of Documents to fetch to pass to knn num_candidates.

TYPE: int DEFAULT: 50

filter

Array of Elasticsearch filter clauses to apply to the query.

TYPE: list[dict] | None DEFAULT: None

RETURNS DESCRIPTION
list[Document]

List of Documents most similar to the query,

list[Document]

in descending order of similarity.

amax_marginal_relevance_search(
    query: str,
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    fields: list[str] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[Document]

Return docs selected using the maximal marginal relevance.

Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.

PARAMETER DESCRIPTION
query

Text to look up documents similar to.

TYPE: str

k

Number of Documents to return. Defaults to 4.

TYPE: int DEFAULT: 4

fetch_k

Number of Documents to fetch to pass to MMR algorithm.

TYPE: int DEFAULT: 20

lambda_mult

Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.

TYPE: float DEFAULT: 0.5

fields

Other fields to get from elasticsearch source. These fields will be added to the document metadata.

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
list[Document]

List[Document]: A list of Documents selected by maximal marginal relevance.

asimilarity_search_with_score async

asimilarity_search_with_score(
    query: str,
    k: int = 4,
    filter: list[dict] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[tuple[Document, float]]

Return Elasticsearch documents most similar to query, along with scores.

PARAMETER DESCRIPTION
query

Text to look up documents similar to.

TYPE: str

k

Number of Documents to return. Defaults to 4.

TYPE: int DEFAULT: 4

filter

Array of Elasticsearch filter clauses to apply to the query.

TYPE: list[dict] | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[Document, float]]

List of Documents most similar to the query and score for each

asimilarity_search_by_vector_with_relevance_scores async

asimilarity_search_by_vector_with_relevance_scores(
    embedding: list[float],
    k: int = 4,
    filter: list[dict] | None = None,
    *,
    custom_query: Callable[[Dict[str, Any], Optional[str]], dict[str, Any]]
    | None = None,
    doc_builder: Callable[[Dict], Document] | None = None,
    **kwargs: Any,
) -> list[tuple[Document, float]]

Return Elasticsearch documents most similar to query, along with scores.

PARAMETER DESCRIPTION
embedding

Embedding to look up documents similar to.

TYPE: list[float]

k

Number of Documents to return. Defaults to 4.

TYPE: int DEFAULT: 4

filter

Array of Elasticsearch filter clauses to apply to the query.

TYPE: list[dict] | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[Document, float]]

List of Documents most similar to the embedding and score for each

adelete async

adelete(
    ids: list[str] | None = None, refresh_indices: bool | None = True, **kwargs: Any
) -> bool | None

Delete documents from the Elasticsearch index.

PARAMETER DESCRIPTION
ids

List of ids of documents to delete.

TYPE: list[str] | None DEFAULT: None

refresh_indices

Whether to refresh the index after deleting documents. Defaults to True.

TYPE: bool | None DEFAULT: True

aadd_texts async

aadd_texts(
    texts: Iterable[str],
    metadatas: list[dict[Any, Any]] | None = None,
    ids: list[str] | None = None,
    refresh_indices: bool = True,
    create_index_if_not_exists: bool = True,
    bulk_kwargs: dict | None = None,
    **kwargs: Any,
) -> list[str]

Run more texts through the embeddings and add to the store.

PARAMETER DESCRIPTION
texts

Iterable of strings to add to the store.

TYPE: Iterable[str]

metadatas

Optional list of metadatas associated with the texts.

TYPE: list[dict[Any, Any]] | None DEFAULT: None

ids

Optional list of ids to associate with the texts.

TYPE: list[str] | None DEFAULT: None

refresh_indices

Whether to refresh the Elasticsearch indices after adding the texts.

TYPE: bool DEFAULT: True

create_index_if_not_exists

Whether to create the Elasticsearch index if it doesn't already exist.

TYPE: bool DEFAULT: True

*bulk_kwargs

Additional arguments to pass to Elasticsearch bulk. - chunk_size: Optional. Number of texts to add to the index at a time. Defaults to 500.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
list[str]

List of ids from adding the texts into the store.

aadd_embeddings async

aadd_embeddings(
    text_embeddings: Iterable[tuple[str, list[float]]],
    metadatas: list[dict] | None = None,
    ids: list[str] | None = None,
    refresh_indices: bool = True,
    create_index_if_not_exists: bool = True,
    bulk_kwargs: dict | None = None,
    **kwargs: Any,
) -> list[str]

Add the given texts and embeddings to the store.

PARAMETER DESCRIPTION
text_embeddings

Iterable pairs of string and embedding to add to the store.

TYPE: Iterable[tuple[str, list[float]]]

metadatas

Optional list of metadatas associated with the texts.

TYPE: list[dict] | None DEFAULT: None

ids

Optional list of unique IDs.

TYPE: list[str] | None DEFAULT: None

refresh_indices

Whether to refresh the Elasticsearch indices after adding the texts.

TYPE: bool DEFAULT: True

create_index_if_not_exists

Whether to create the Elasticsearch index if it doesn't already exist.

TYPE: bool DEFAULT: True

*bulk_kwargs

Additional arguments to pass to Elasticsearch bulk. - chunk_size: Optional. Number of texts to add to the index at a time. Defaults to 500.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
list[str]

List of ids from adding the texts into the store.

afrom_texts async classmethod

afrom_texts(
    texts: list[str],
    embedding: Embeddings | None = None,
    metadatas: list[dict[str, Any]] | None = None,
    bulk_kwargs: dict | None = None,
    client: AsyncElasticsearch | None = None,
    **kwargs: Any,
) -> AsyncElasticsearchStore

Construct ElasticsearchStore wrapper from raw documents.

Example
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

db = ElasticsearchStore.from_texts(
    texts,
    // embeddings optional if using
    // a strategy that doesn't require inference
    embeddings,
    index_name="langchain-demo",
    es_url="http://localhost:9200"
)
PARAMETER DESCRIPTION
texts

List of texts to add to the Elasticsearch index.

TYPE: list[str]

embedding

Embedding function to use to embed the texts.

TYPE: Embeddings | None DEFAULT: None

metadatas

Optional list of metadatas associated with the texts.

TYPE: list[dict[str, Any]] | None DEFAULT: None

bulk_kwargs

Optional. Additional arguments to pass to Elasticsearch bulk.

TYPE: dict | None DEFAULT: None

client

Optional pre-existing client connection.

Alternatively, provide credentials (es_url, es_cloud_id, etc.).

TYPE: AsyncElasticsearch | None DEFAULT: None

**kwargs

Additional keyword arguments passed to the constructor.

See AsyncElasticsearchStore for supported options including index_name, es_url, cloud_id, es_user, es_password, es_api_key, vector_query_field, query_field, and distance_strategy.

TYPE: Any DEFAULT: {}

afrom_documents async classmethod

afrom_documents(
    documents: list[Document],
    embedding: Embeddings | None = None,
    bulk_kwargs: dict | None = None,
    client: AsyncElasticsearch | None = None,
    **kwargs: Any,
) -> AsyncElasticsearchStore

Construct ElasticsearchStore wrapper from documents.

Example
from langchain_elasticsearch.vectorstores import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings

db = ElasticsearchStore.from_documents(
    texts,
    embeddings,
    index_name="langchain-demo",
    es_url="http://localhost:9200"
)
PARAMETER DESCRIPTION
documents

List of documents to add to the Elasticsearch index.

TYPE: list[Document]

embedding

Embedding function to use to embed the texts. Do not provide if using a strategy that doesn't require inference.

TYPE: Embeddings | None DEFAULT: None

bulk_kwargs

Optional. Additional arguments to pass to Elasticsearch bulk.

TYPE: dict | None DEFAULT: None

client

Optional pre-existing client connection.

Alternatively, provide credentials (es_url, es_cloud_id, etc.).

TYPE: AsyncElasticsearch | None DEFAULT: None

**kwargs

Additional keyword arguments passed to the constructor.

See AsyncElasticsearchStore for supported options including index_name, es_url, cloud_id, es_user, es_password, es_api_key, vector_query_field, and query_field.

TYPE: Any DEFAULT: {}

ExactRetrievalStrategy staticmethod

ExactRetrievalStrategy() -> ExactRetrievalStrategy

Used to perform brute force / exact nearest neighbor search via script_score.

ApproxRetrievalStrategy staticmethod

ApproxRetrievalStrategy(
    query_model_id: str | None = None,
    hybrid: bool | None = False,
    rrf: dict | bool | None = True,
) -> ApproxRetrievalStrategy

Used to perform approximate nearest neighbor search using the HNSW algorithm.

At build index time, this strategy will create a dense vector field in the index and store the embedding vectors in the index.

At query time, the text will either be embedded using the provided embedding function or the query_model_id will be used to embed the text using the model deployed to Elasticsearch.

if query_model_id is used, do not provide an embedding function.

PARAMETER DESCRIPTION
query_model_id

Optional. ID of the model to use to embed the query text within the stack. Requires embedding model to be deployed to Elasticsearch.

TYPE: str | None DEFAULT: None

hybrid

Optional. If True, will perform a hybrid search using both the knn query and a text query. Defaults to False.

TYPE: bool | None DEFAULT: False

rrf

Optional. rrf is Reciprocal Rank Fusion. When hybrid is True, and rrf is True, then rrf: {}. and rrf is False, then rrf is omitted. and isinstance(rrf, dict) is True, then pass in the dict values. rrf could be passed for adjusting 'rank_constant' and 'window_size'.

TYPE: dict | bool | None DEFAULT: True

SparseVectorRetrievalStrategy staticmethod

SparseVectorRetrievalStrategy(model_id: str | None = None) -> SparseRetrievalStrategy

Used to perform sparse vector search via text_expansion. Used for when you want to use ELSER model to perform document search.

At build index time, this strategy will create a pipeline that will embed the text using the ELSER model and store the resulting tokens in the index.

At query time, the text will be embedded using the ELSER model and the resulting tokens will be used to perform a text_expansion query.

PARAMETER DESCRIPTION
model_id

Optional. Default is ".elser_model_1". ID of the model to use to embed the query text within the stack. Requires embedding model to be deployed to Elasticsearch.

TYPE: str | None DEFAULT: None

BM25RetrievalStrategy staticmethod

BM25RetrievalStrategy(
    k1: float | None = None, b: float | None = None
) -> BM25RetrievalStrategy

Used to apply BM25 without vector search.

PARAMETER DESCRIPTION
k1

Optional. This corresponds to the BM25 parameter, k1. Default is None, which uses the default setting of Elasticsearch.

TYPE: float | None DEFAULT: None

b

Optional. This corresponds to the BM25 parameter, b. Default is None, which uses the default setting of Elasticsearch.

TYPE: float | None DEFAULT: None

add_texts

add_texts(
    texts: Iterable[str],
    metadatas: list[dict] | None = None,
    *,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> list[str]

Run more texts through the embeddings and add to the VectorStore.

PARAMETER DESCRIPTION
texts

Iterable of strings to add to the VectorStore.

TYPE: Iterable[str]

metadatas

Optional list of metadatas associated with the texts.

TYPE: list[dict] | None DEFAULT: None

ids

Optional list of IDs associated with the texts.

TYPE: list[str] | None DEFAULT: None

**kwargs

VectorStore specific parameters. One of the kwargs should be ids which is a list of ids associated with the texts.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[str]

List of IDs from adding the texts into the VectorStore.

RAISES DESCRIPTION
ValueError

If the number of metadatas does not match the number of texts.

ValueError

If the number of IDs does not match the number of texts.

delete

delete(ids: list[str] | None = None, **kwargs: Any) -> bool | None

Delete by vector ID or other criteria.

PARAMETER DESCRIPTION
ids

List of IDs to delete. If None, delete all.

TYPE: list[str] | None DEFAULT: None

**kwargs

Other keyword arguments that subclasses might use.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
bool | None

True if deletion is successful, False otherwise, None if not implemented.

get_by_ids

get_by_ids(ids: Sequence[str]) -> list[Document]

Get documents by their IDs.

The returned documents are expected to have the ID field set to the ID of the document in the vector store.

Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.

Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.

This method should NOT raise exceptions if no documents are found for some IDs.

PARAMETER DESCRIPTION
ids

List of IDs to retrieve.

TYPE: Sequence[str]

RETURNS DESCRIPTION
list[Document]

List of Document objects.

aget_by_ids async

aget_by_ids(ids: Sequence[str]) -> list[Document]

Async get documents by their IDs.

The returned documents are expected to have the ID field set to the ID of the document in the vector store.

Fewer documents may be returned than requested if some IDs are not found or if there are duplicated IDs.

Users should not assume that the order of the returned documents matches the order of the input IDs. Instead, users should rely on the ID field of the returned documents.

This method should NOT raise exceptions if no documents are found for some IDs.

PARAMETER DESCRIPTION
ids

List of IDs to retrieve.

TYPE: Sequence[str]

RETURNS DESCRIPTION
list[Document]

List of Document objects.

add_documents

add_documents(documents: list[Document], **kwargs: Any) -> list[str]

Add or update documents in the VectorStore.

PARAMETER DESCRIPTION
documents

Documents to add to the VectorStore.

TYPE: list[Document]

**kwargs

Additional keyword arguments.

If kwargs contains IDs and documents contain ids, the IDs in the kwargs will receive precedence.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[str]

List of IDs of the added texts.

aadd_documents async

aadd_documents(documents: list[Document], **kwargs: Any) -> list[str]

Async run more documents through the embeddings and add to the VectorStore.

PARAMETER DESCRIPTION
documents

Documents to add to the VectorStore.

TYPE: list[Document]

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[str]

List of IDs of the added texts.

search

search(query: str, search_type: str, **kwargs: Any) -> list[Document]

Return docs most similar to query using a specified search type.

PARAMETER DESCRIPTION
query

Input text.

TYPE: str

search_type

Type of search to perform.

Can be 'similarity', 'mmr', or 'similarity_score_threshold'.

TYPE: str

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects most similar to the query.

RAISES DESCRIPTION
ValueError

If search_type is not one of 'similarity', 'mmr', or 'similarity_score_threshold'.

asearch async

asearch(query: str, search_type: str, **kwargs: Any) -> list[Document]

Async return docs most similar to query using a specified search type.

PARAMETER DESCRIPTION
query

Input text.

TYPE: str

search_type

Type of search to perform.

Can be 'similarity', 'mmr', or 'similarity_score_threshold'.

TYPE: str

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects most similar to the query.

RAISES DESCRIPTION
ValueError

If search_type is not one of 'similarity', 'mmr', or 'similarity_score_threshold'.

similarity_search(query: str, k: int = 4, **kwargs: Any) -> list[Document]

Return docs most similar to query.

PARAMETER DESCRIPTION
query

Input text.

TYPE: str

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects most similar to the query.

similarity_search_with_score

similarity_search_with_score(*args: Any, **kwargs: Any) -> list[tuple[Document, float]]

Run similarity search with distance.

PARAMETER DESCRIPTION
*args

Arguments to pass to the search method.

TYPE: Any DEFAULT: ()

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[tuple[Document, float]]

List of tuples of (doc, similarity_score).

similarity_search_with_relevance_scores

similarity_search_with_relevance_scores(
    query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]

Return docs and relevance scores in the range [0, 1].

0 is dissimilar, 1 is most similar.

PARAMETER DESCRIPTION
query

Input text.

TYPE: str

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

**kwargs

Kwargs to be passed to similarity search.

Should include score_threshold, an optional floating point value between 0 to 1 to filter the resulting set of retrieved docs.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[tuple[Document, float]]

List of tuples of (doc, similarity_score).

asimilarity_search_with_relevance_scores async

asimilarity_search_with_relevance_scores(
    query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]

Async return docs and relevance scores in the range [0, 1].

0 is dissimilar, 1 is most similar.

PARAMETER DESCRIPTION
query

Input text.

TYPE: str

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

**kwargs

Kwargs to be passed to similarity search.

Should include score_threshold, an optional floating point value between 0 to 1 to filter the resulting set of retrieved docs.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[tuple[Document, float]]

List of tuples of (doc, similarity_score)

similarity_search_by_vector

similarity_search_by_vector(
    embedding: list[float], k: int = 4, **kwargs: Any
) -> list[Document]

Return docs most similar to embedding vector.

PARAMETER DESCRIPTION
embedding

Embedding to look up documents similar to.

TYPE: list[float]

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects most similar to the query vector.

asimilarity_search_by_vector async

asimilarity_search_by_vector(
    embedding: list[float], k: int = 4, **kwargs: Any
) -> list[Document]

Async return docs most similar to embedding vector.

PARAMETER DESCRIPTION
embedding

Embedding to look up documents similar to.

TYPE: list[float]

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects most similar to the query vector.

max_marginal_relevance_search(
    query: str, k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any
) -> list[Document]

Return docs selected using the maximal marginal relevance.

Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.

PARAMETER DESCRIPTION
query

Text to look up documents similar to.

TYPE: str

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

fetch_k

Number of Document objects to fetch to pass to MMR algorithm.

TYPE: int DEFAULT: 20

lambda_mult

Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity.

TYPE: float DEFAULT: 0.5

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects selected by maximal marginal relevance.

max_marginal_relevance_search_by_vector

max_marginal_relevance_search_by_vector(
    embedding: list[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    **kwargs: Any,
) -> list[Document]

Return docs selected using the maximal marginal relevance.

Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.

PARAMETER DESCRIPTION
embedding

Embedding to look up documents similar to.

TYPE: list[float]

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

fetch_k

Number of Document objects to fetch to pass to MMR algorithm.

TYPE: int DEFAULT: 20

lambda_mult

Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity.

TYPE: float DEFAULT: 0.5

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects selected by maximal marginal relevance.

amax_marginal_relevance_search_by_vector async

amax_marginal_relevance_search_by_vector(
    embedding: list[float],
    k: int = 4,
    fetch_k: int = 20,
    lambda_mult: float = 0.5,
    **kwargs: Any,
) -> list[Document]

Async return docs selected using the maximal marginal relevance.

Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.

PARAMETER DESCRIPTION
embedding

Embedding to look up documents similar to.

TYPE: list[float]

k

Number of Document objects to return.

TYPE: int DEFAULT: 4

fetch_k

Number of Document objects to fetch to pass to MMR algorithm.

TYPE: int DEFAULT: 20

lambda_mult

Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity.

TYPE: float DEFAULT: 0.5

**kwargs

Arguments to pass to the search method.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of Document objects selected by maximal marginal relevance.

from_documents classmethod

from_documents(documents: list[Document], embedding: Embeddings, **kwargs: Any) -> Self

Return VectorStore initialized from documents and embeddings.

PARAMETER DESCRIPTION
documents

List of Document objects to add to the VectorStore.

TYPE: list[Document]

embedding

Embedding function to use.

TYPE: Embeddings

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Self

VectorStore initialized from documents and embeddings.

from_texts abstractmethod classmethod

from_texts(
    texts: list[str],
    embedding: Embeddings,
    metadatas: list[dict] | None = None,
    *,
    ids: list[str] | None = None,
    **kwargs: Any,
) -> VST

Return VectorStore initialized from texts and embeddings.

PARAMETER DESCRIPTION
texts

Texts to add to the VectorStore.

TYPE: list[str]

embedding

Embedding function to use.

TYPE: Embeddings

metadatas

Optional list of metadatas associated with the texts.

TYPE: list[dict] | None DEFAULT: None

ids

Optional list of IDs associated with the texts.

TYPE: list[str] | None DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
VST

VectorStore initialized from texts and embeddings.

as_retriever

as_retriever(**kwargs: Any) -> VectorStoreRetriever

Return VectorStoreRetriever initialized from this VectorStore.

PARAMETER DESCRIPTION
**kwargs

Keyword arguments to pass to the search function.

Can include:

  • search_type: Defines the type of search that the Retriever should perform. Can be 'similarity' (default), 'mmr', or 'similarity_score_threshold'.
  • search_kwargs: Keyword arguments to pass to the search function.

    Can include things like:

    • k: Amount of documents to return (Default: 4)
    • score_threshold: Minimum relevance threshold for similarity_score_threshold
    • fetch_k: Amount of documents to pass to MMR algorithm (Default: 20)
    • lambda_mult: Diversity of results returned by MMR; 1 for minimum diversity and 0 for maximum. (Default: 0.5)
    • filter: Filter by document metadata

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
VectorStoreRetriever

Retriever class for VectorStore.

Examples:

# Retrieve more documents with higher diversity
# Useful if your dataset has many similar documents
docsearch.as_retriever(
    search_type="mmr", search_kwargs={"k": 6, "lambda_mult": 0.25}
)

# Fetch more documents for the MMR algorithm to consider
# But only return the top 5
docsearch.as_retriever(search_type="mmr", search_kwargs={"k": 5, "fetch_k": 50})

# Only retrieve documents that have a relevance score
# Above a certain threshold
docsearch.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"score_threshold": 0.8},
)

# Only get the single most similar document from the dataset
docsearch.as_retriever(search_kwargs={"k": 1})

# Use a filter to only retrieve documents from a specific paper
docsearch.as_retriever(
    search_kwargs={"filter": {"paper_title": "GPT-4 Technical Report"}}
)

AsyncElasticsearchRetriever

Bases: BaseRetriever

Elasticsearch retriever.

Setup:
    Install `langchain_elasticsearch` and start Elasticsearch locally using
    the start-local script.

    ```bash
    pip install -qU langchain_elasticsearch
    curl -fsSL https://elastic.co/start-local | sh
    ```

    This will create an `elastic-start-local` folder. To start Elasticsearch
    and Kibana:
    ```bash
    cd elastic-start-local
    ./start.sh
    ```

    Elasticsearch will be available at `http://localhost:9200`. The password
    for the `elastic` user and API key are stored in the `.env` file in the
    `elastic-start-local` folder.

Key init args — query params:
    index_name: Union[str, Sequence[str]]
        The name of the index to query. Can also be a list of names.
    body_func: Callable[[str], Dict]
        Function to create an Elasticsearch DSL query body from a search string.
        The returned query body must fit what you would normally send in a POST
        request to the _search endpoint. If applicable, it also includes parameters
        like the `size` parameter etc.
    content_field: Optional[Union[str, Mapping[str, str]]]
        The document field name that contains the page content. If multiple indices
        are queried, specify a dict {index_name: field_name} here.
    document_mapper: Optional[Callable[[Mapping], Document]]
        Function to map Elasticsearch hits to LangChain Documents. If not provided,
        will be automatically created based on content_field.

Key init args — client params:
    client: Optional[AsyncElasticsearch or Elasticsearch]
        Pre-existing Elasticsearch connection. Either provide this OR credentials.
    es_url: Optional[str]
        URL of the Elasticsearch instance to connect to.
    es_cloud_id: Optional[str]
        Cloud ID of the Elasticsearch instance to connect to.
    es_user: Optional[str]
        Username to use when connecting to Elasticsearch.
    es_api_key: Optional[str]
        API key to use when connecting to Elasticsearch.
    es_password: Optional[str]
        Password to use when connecting to Elasticsearch.

Instantiate:
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        content_field="text",
        es_url="http://localhost:9200",
    )
    ```

Instantiate with API key (URL):
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        content_field="text",
        es_url="http://localhost:9200",
        es_api_key="your-api-key"
    )
    ```

Instantiate with username/password (URL):
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        content_field="text",
        es_url="http://localhost:9200",
        es_user="elastic",
        es_password="password"
    )
    ```

If you want to use a cloud hosted Elasticsearch instance, you can pass in the
es_cloud_id argument instead of the es_url argument.

Instantiate from cloud (with username/password):
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        content_field="text",
        es_cloud_id="<cloud_id>",
        es_user="elastic",
        es_password="<password>"
    )
    ```

Instantiate from cloud (with API key):
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        content_field="text",
        es_cloud_id="<cloud_id>",
        es_api_key="your-api-key"
    )
    ```

You can also connect to an existing Elasticsearch instance by passing in a
pre-existing Elasticsearch connection via the client argument.

Instantiate from existing connection:
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever
    from elasticsearch import Elasticsearch

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    client = Elasticsearch("http://localhost:9200")
    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        content_field="text",
        client=client
    )
    ```

Retrieve documents:
    Note: Use `invoke()` or `ainvoke()` instead of the deprecated
    `get_relevant_documents()` or `aget_relevant_documents()` methods.

    First, index some documents:
    ```python
    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")

    # Index sample documents
    client.index(
        index="some-index",
        document={"text": "The quick brown fox jumps over the lazy dog"},
        id="1",
        refresh=True
    )
    client.index(
        index="some-index",
        document={"text": "Python is a popular programming language"},
        id="2",
        refresh=True
    )
    client.index(
        index="some-index",
        document={"text": "Elasticsearch is a search engine"},
        id="3",
        refresh=True
    )
    ```

    Then retrieve documents:
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever

    def body_func(query: str) -> dict:
        return {"query": {"match": {"text": {"query": query}}}}

    retriever = ElasticsearchRetriever(
        index_name="some-index",
        body_func=body_func,
        content_field="text",
        es_url="http://localhost:9200"
    )

    # Retrieve documents
    documents = retriever.invoke("Python")
    for doc in documents:
        print(f"* {doc.page_content}")
    ```
    ```python
    * Python is a popular programming language
    ```



Use custom document mapper:
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever
    from langchain_core.documents import Document
    from elasticsearch import Elasticsearch
    from typing import Mapping, Any

    def body_func(query: str) -> dict:
        return {"query": {"match": {"custom_field": {"query": query}}}}

    def custom_mapper(hit: Mapping[str, Any]) -> Document:
        # Custom logic to extract content and metadata
        return Document(
            page_content=hit["_source"]["custom_field"],
            metadata={"score": hit["_score"]}
        )

    client = Elasticsearch("http://localhost:9200")
    retriever = ElasticsearchRetriever(
        index_name="langchain-demo",
        body_func=body_func,
        document_mapper=custom_mapper,
        client=client
    )
    ```

Use with multiple indices:
    ```python
    from langchain_elasticsearch import ElasticsearchRetriever
    from elasticsearch import Elasticsearch

    def body_func(query: str) -> dict:
        return {
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["text_field_1", "text_field_2"]
                }
            }
        }

    client = Elasticsearch("http://localhost:9200")
    retriever = ElasticsearchRetriever(
        index_name=["index1", "index2"],
        body_func=body_func,
        content_field={
            "index1": "text_field_1",
            "index2": "text_field_2"
        },
        client=client
    )
    ```

Use as LangChain retriever in chains:
    Note: Before running this example, ensure you have indexed documents
    in your Elasticsearch index. The retriever will search this index
    for relevant documents to use as context.

    ```python
    from langchain_elasticsearch import ElasticsearchRetriever
    from langchain_core.runnables import RunnablePassthrough
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_ollama import ChatOllama

    # ElasticsearchRetriever is already a BaseRetriever
    retriever = ElasticsearchRetriever(
        index_name="some-index",
        body_func=lambda q: {"query": {"match": {"text": {"query": q}}}},
        content_field="text",
        es_url="http://localhost:9200"
    )

    llm = ChatOllama(model="llama3", temperature=0)

    # Create a chain that retrieves documents and then generates a response
    def format_docs(docs):
        # Format documents for the prompt
        return "

".join(doc.page_content for doc in docs)

    system_prompt = (
        "You are an assistant for question-answering tasks. "
        "Use the following pieces of retrieved context to answer "
        "the question. If you don't know the answer, say that you "
        "don't know. Use three sentences maximum and keep the "
        "answer concise."
        "

" "Context: {context}" )

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{question}"),
    ])

    chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
    )

    result = chain.invoke("what is the answer to this question?")
    ```

For synchronous applications, use the `ElasticsearchRetriever` class.
For asynchronous applications, use the `AsyncElasticsearchRetriever` class.
METHOD DESCRIPTION
__init__
get_name

Get the name of the Runnable.

get_input_schema

Get a Pydantic model that can be used to validate input to the Runnable.

get_input_jsonschema

Get a JSON schema that represents the input to the Runnable.

get_output_schema

Get a Pydantic model that can be used to validate output to the Runnable.

get_output_jsonschema

Get a JSON schema that represents the output of the Runnable.

config_schema

The type of config this Runnable accepts specified as a Pydantic model.

get_config_jsonschema

Get a JSON schema that represents the config of the Runnable.

get_graph

Return a graph representation of this Runnable.

get_prompts

Return a list of prompts used by this Runnable.

__or__

Runnable "or" operator.

__ror__

Runnable "reverse-or" operator.

pipe

Pipe Runnable objects.

pick

Pick keys from the output dict of this Runnable.

assign

Assigns new fields to the dict output of this Runnable.

invoke

Invoke the retriever to get relevant documents.

ainvoke

Asynchronously invoke the retriever to get relevant documents.

batch

Default implementation runs invoke in parallel using a thread pool executor.

batch_as_completed

Run invoke in parallel on a list of inputs.

abatch

Default implementation runs ainvoke in parallel using asyncio.gather.

abatch_as_completed

Run ainvoke in parallel on a list of inputs.

stream

Default implementation of stream, which calls invoke.

astream

Default implementation of astream, which calls ainvoke.

astream_log

Stream all output from a Runnable, as reported to the callback system.

astream_events

Generate a stream of events.

transform

Transform inputs to outputs.

atransform

Transform inputs to outputs.

bind

Bind arguments to a Runnable, returning a new Runnable.

with_config

Bind config to a Runnable, returning a new Runnable.

with_listeners

Bind lifecycle listeners to a Runnable, returning a new Runnable.

with_alisteners

Bind async lifecycle listeners to a Runnable.

with_types

Bind input and output types to a Runnable, returning a new Runnable.

with_retry

Create a new Runnable that retries the original Runnable on exceptions.

map

Return a new Runnable that maps a list of inputs to a list of outputs.

with_fallbacks

Add fallbacks to a Runnable, returning a new Runnable.

as_tool

Create a BaseTool from a Runnable.

is_lc_serializable

Is this class serializable?

get_lc_namespace

Get the namespace of the LangChain object.

lc_id

Return a unique identifier for this class for serialization purposes.

to_json

Serialize the Runnable to JSON.

to_json_not_implemented

Serialize a "not implemented" object.

configurable_fields

Configure particular Runnable fields at runtime.

configurable_alternatives

Configure alternatives for Runnable objects that can be set at runtime.

name class-attribute instance-attribute

name: str | None = None

The name of the Runnable. Used for debugging and tracing.

InputType property

InputType: type[Input]

Input type.

The type of input this Runnable accepts specified as a type annotation.

RAISES DESCRIPTION
TypeError

If the input type cannot be inferred.

OutputType property

OutputType: type[Output]

Output Type.

The type of output this Runnable produces specified as a type annotation.

RAISES DESCRIPTION
TypeError

If the output type cannot be inferred.

input_schema property

input_schema: type[BaseModel]

The type of input this Runnable accepts specified as a Pydantic model.

output_schema property

output_schema: type[BaseModel]

Output schema.

The type of output this Runnable produces specified as a Pydantic model.

config_specs property

config_specs: list[ConfigurableFieldSpec]

List configurable fields for this Runnable.

lc_secrets property

lc_secrets: dict[str, str]

A map of constructor argument names to secret ids.

For example, {"openai_api_key": "OPENAI_API_KEY"}

lc_attributes property

lc_attributes: dict

List of attribute names that should be included in the serialized kwargs.

These attributes must be accepted by the constructor.

Default is an empty dictionary.

tags class-attribute instance-attribute

tags: list[str] | None = None

Optional list of tags associated with the retriever.

These tags will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks.

You can use these to eg identify a specific instance of a retriever with its use case.

metadata class-attribute instance-attribute

metadata: dict[str, Any] | None = None

Optional metadata associated with the retriever.

This metadata will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks.

You can use these to eg identify a specific instance of a retriever with its use case.

__init__

__init__(
    index_name: str | Sequence[str],
    body_func: Callable[[str], dict],
    *,
    content_field: str | Mapping[str, str] | None = None,
    document_mapper: Callable[[Mapping], Document] | None = None,
    client: AsyncElasticsearch | None = None,
    es_url: str | None = None,
    es_cloud_id: str | None = None,
    es_user: str | None = None,
    es_api_key: str | None = None,
    es_password: str | None = None,
) -> None

get_name

get_name(suffix: str | None = None, *, name: str | None = None) -> str

Get the name of the Runnable.

PARAMETER DESCRIPTION
suffix

An optional suffix to append to the name.

TYPE: str | None DEFAULT: None

name

An optional name to use instead of the Runnable's name.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
str

The name of the Runnable.

get_input_schema

get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]

Get a Pydantic model that can be used to validate input to the Runnable.

Runnable objects that leverage the configurable_fields and configurable_alternatives methods will have a dynamic input schema that depends on which configuration the Runnable is invoked with.

This method allows to get an input schema for a specific configuration.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
type[BaseModel]

A Pydantic model that can be used to validate input.

get_input_jsonschema

get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]

Get a JSON schema that represents the input to the Runnable.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

A JSON schema that represents the input to the Runnable.

Example
from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


runnable = RunnableLambda(add_one)

print(runnable.get_input_jsonschema())

Added in langchain-core 0.3.0

get_output_schema

get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]

Get a Pydantic model that can be used to validate output to the Runnable.

Runnable objects that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the Runnable is invoked with.

This method allows to get an output schema for a specific configuration.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
type[BaseModel]

A Pydantic model that can be used to validate output.

get_output_jsonschema

get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]

Get a JSON schema that represents the output of the Runnable.

PARAMETER DESCRIPTION
config

A config to use when generating the schema.

TYPE: RunnableConfig | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

A JSON schema that represents the output of the Runnable.

Example
from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


runnable = RunnableLambda(add_one)

print(runnable.get_output_jsonschema())

Added in langchain-core 0.3.0

config_schema

config_schema(*, include: Sequence[str] | None = None) -> type[BaseModel]

The type of config this Runnable accepts specified as a Pydantic model.

To mark a field as configurable, see the configurable_fields and configurable_alternatives methods.

PARAMETER DESCRIPTION
include

A list of fields to include in the config schema.

TYPE: Sequence[str] | None DEFAULT: None

RETURNS DESCRIPTION
type[BaseModel]

A Pydantic model that can be used to validate config.

get_config_jsonschema

get_config_jsonschema(*, include: Sequence[str] | None = None) -> dict[str, Any]

Get a JSON schema that represents the config of the Runnable.

PARAMETER DESCRIPTION
include

A list of fields to include in the config schema.

TYPE: Sequence[str] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

A JSON schema that represents the config of the Runnable.

Added in langchain-core 0.3.0

get_graph

get_graph(config: RunnableConfig | None = None) -> Graph

Return a graph representation of this Runnable.

get_prompts

get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]

Return a list of prompts used by this Runnable.

__or__

__or__(
    other: Runnable[Any, Other]
    | Callable[[Iterator[Any]], Iterator[Other]]
    | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
    | Callable[[Any], Other]
    | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]

Runnable "or" operator.

Compose this Runnable with another object to create a RunnableSequence.

PARAMETER DESCRIPTION
other

Another Runnable or a Runnable-like object.

TYPE: Runnable[Any, Other] | Callable[[Iterator[Any]], Iterator[Other]] | Callable[[AsyncIterator[Any]], AsyncIterator[Other]] | Callable[[Any], Other] | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any]

RETURNS DESCRIPTION
RunnableSerializable[Input, Other]

A new Runnable.

__ror__

__ror__(
    other: Runnable[Other, Any]
    | Callable[[Iterator[Other]], Iterator[Any]]
    | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
    | Callable[[Other], Any]
    | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]

Runnable "reverse-or" operator.

Compose this Runnable with another object to create a RunnableSequence.

PARAMETER DESCRIPTION
other

Another Runnable or a Runnable-like object.

TYPE: Runnable[Other, Any] | Callable[[Iterator[Other]], Iterator[Any]] | Callable[[AsyncIterator[Other]], AsyncIterator[Any]] | Callable[[Other], Any] | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any]

RETURNS DESCRIPTION
RunnableSerializable[Other, Output]

A new Runnable.

pipe

pipe(
    *others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]

Pipe Runnable objects.

Compose this Runnable with Runnable-like objects to make a RunnableSequence.

Equivalent to RunnableSequence(self, *others) or self | others[0] | ...

Example
from langchain_core.runnables import RunnableLambda


def add_one(x: int) -> int:
    return x + 1


def mul_two(x: int) -> int:
    return x * 2


runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
PARAMETER DESCRIPTION
*others

Other Runnable or Runnable-like objects to compose

TYPE: Runnable[Any, Other] | Callable[[Any], Other] DEFAULT: ()

name

An optional name for the resulting RunnableSequence.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableSerializable[Input, Other]

A new Runnable.

pick

pick(keys: str | list[str]) -> RunnableSerializable[Any, Any]

Pick keys from the output dict of this Runnable.

Pick a single key

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]

Pick a list of keys

from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)


def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")


chain = RunnableMap(
    str=as_str, json=as_json, bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
PARAMETER DESCRIPTION
keys

A key or list of keys to pick from the output dict.

TYPE: str | list[str]

RETURNS DESCRIPTION
RunnableSerializable[Any, Any]

a new Runnable.

assign

Assigns new fields to the dict output of this Runnable.

from langchain_core.language_models.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | model | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | model)

print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
PARAMETER DESCRIPTION
**kwargs

A mapping of keys to Runnable or Runnable-like objects that will be invoked with the entire output dict of this Runnable.

TYPE: Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any] | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]] DEFAULT: {}

RETURNS DESCRIPTION
RunnableSerializable[Any, Any]

A new Runnable.

invoke

invoke(
    input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]

Invoke the retriever to get relevant documents.

Main entry point for synchronous retriever invocations.

PARAMETER DESCRIPTION
input

The query string.

TYPE: str

config

Configuration for the retriever.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional arguments to pass to the retriever.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of relevant documents.

Examples:

retriever.invoke("query")

ainvoke async

ainvoke(
    input: str, config: RunnableConfig | None = None, **kwargs: Any
) -> list[Document]

Asynchronously invoke the retriever to get relevant documents.

Main entry point for asynchronous retriever invocations.

PARAMETER DESCRIPTION
input

The query string.

TYPE: str

config

Configuration for the retriever.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional arguments to pass to the retriever.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
list[Document]

List of relevant documents.

Examples:

await retriever.ainvoke("query")

batch

batch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]

Default implementation runs invoke in parallel using a thread pool executor.

The default implementation of batch works well for IO bound runnables.

Subclasses must override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: list[Input]

config

A config to use when invoking the Runnable. The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys.

Please refer to RunnableConfig for more details.

TYPE: RunnableConfig | list[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

RETURNS DESCRIPTION
list[Output]

A list of outputs from the Runnable.

batch_as_completed

batch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]

Run invoke in parallel on a list of inputs.

Yields results as they complete.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: Sequence[Input]

config

A config to use when invoking the Runnable.

The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys.

Please refer to RunnableConfig for more details.

TYPE: RunnableConfig | Sequence[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
tuple[int, Output | Exception]

Tuples of the index of the input and the output from the Runnable.

abatch async

abatch(
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> list[Output]

Default implementation runs ainvoke in parallel using asyncio.gather.

The default implementation of batch works well for IO bound runnables.

Subclasses must override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: list[Input]

config

A config to use when invoking the Runnable.

The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys.

Please refer to RunnableConfig for more details.

TYPE: RunnableConfig | list[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

RETURNS DESCRIPTION
list[Output]

A list of outputs from the Runnable.

abatch_as_completed async

abatch_as_completed(
    inputs: Sequence[Input],
    config: RunnableConfig | Sequence[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]

Run ainvoke in parallel on a list of inputs.

Yields results as they complete.

PARAMETER DESCRIPTION
inputs

A list of inputs to the Runnable.

TYPE: Sequence[Input]

config

A config to use when invoking the Runnable.

The config supports standard keys like 'tags', 'metadata' for tracing purposes, 'max_concurrency' for controlling how much work to do in parallel, and other keys.

Please refer to RunnableConfig for more details.

TYPE: RunnableConfig | Sequence[RunnableConfig] | None DEFAULT: None

return_exceptions

Whether to return exceptions instead of raising them.

TYPE: bool DEFAULT: False

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[tuple[int, Output | Exception]]

A tuple of the index of the input and the output from the Runnable.

stream

stream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]

Default implementation of stream, which calls invoke.

Subclasses must override this method if they support streaming output.

PARAMETER DESCRIPTION
input

The input to the Runnable.

TYPE: Input

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
Output

The output of the Runnable.

astream async

astream(
    input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]

Default implementation of astream, which calls ainvoke.

Subclasses must override this method if they support streaming output.

PARAMETER DESCRIPTION
input

The input to the Runnable.

TYPE: Input

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[Output]

The output of the Runnable.

astream_log async

astream_log(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    diff: bool = True,
    with_streamed_output_list: bool = True,
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]

Stream all output from a Runnable, as reported to the callback system.

This includes all inner runs of LLMs, Retrievers, Tools, etc.

Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.

The Jsonpatch ops can be applied in order to construct state.

PARAMETER DESCRIPTION
input

The input to the Runnable.

TYPE: Any

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

diff

Whether to yield diffs between each step or the current state.

TYPE: bool DEFAULT: True

with_streamed_output_list

Whether to yield the streamed_output list.

TYPE: bool DEFAULT: True

include_names

Only include logs with these names.

TYPE: Sequence[str] | None DEFAULT: None

include_types

Only include logs with these types.

TYPE: Sequence[str] | None DEFAULT: None

include_tags

Only include logs with these tags.

TYPE: Sequence[str] | None DEFAULT: None

exclude_names

Exclude logs with these names.

TYPE: Sequence[str] | None DEFAULT: None

exclude_types

Exclude logs with these types.

TYPE: Sequence[str] | None DEFAULT: None

exclude_tags

Exclude logs with these tags.

TYPE: Sequence[str] | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]

A RunLogPatch or RunLog object.

astream_events async

astream_events(
    input: Any,
    config: RunnableConfig | None = None,
    *,
    version: Literal["v1", "v2"] = "v2",
    include_names: Sequence[str] | None = None,
    include_types: Sequence[str] | None = None,
    include_tags: Sequence[str] | None = None,
    exclude_names: Sequence[str] | None = None,
    exclude_types: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]

Generate a stream of events.

Use to create an iterator over StreamEvent that provide real-time information about the progress of the Runnable, including StreamEvent from intermediate results.

A StreamEvent is a dictionary with the following schema:

  • event: Event names are of the format: on_[runnable_type]_(start|stream|end).
  • name: The name of the Runnable that generated the event.
  • run_id: Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
  • parent_ids: The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
  • tags: The tags of the Runnable that generated the event.
  • metadata: The metadata of the Runnable that generated the event.
  • data: The data associated with the event. The contents of this field depend on the type of event. See the table below for more details.

Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.

Note

This reference table is for the v2 version of the schema.

event name chunk input output
on_chat_model_start '[model name]' {"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_stream '[model name]' AIMessageChunk(content="hello")
on_chat_model_end '[model name]' {"messages": [[SystemMessage, HumanMessage]]} AIMessageChunk(content="hello world")
on_llm_start '[model name]' {'input': 'hello'}
on_llm_stream '[model name]' 'Hello'
on_llm_end '[model name]' 'Hello human!'
on_chain_start 'format_docs'
on_chain_stream 'format_docs' 'hello world!, goodbye world!'
on_chain_end 'format_docs' [Document(...)] 'hello world!, goodbye world!'
on_tool_start 'some_tool' {"x": 1, "y": "2"}
on_tool_end 'some_tool' {"x": 1, "y": "2"}
on_retriever_start '[retriever name]' {"query": "hello"}
on_retriever_end '[retriever name]' {"query": "hello"} [Document(...), ..]
on_prompt_start '[template_name]' {"question": "hello"}
on_prompt_end '[template_name]' {"question": "hello"} ChatPromptValue(messages: [SystemMessage, ...])

In addition to the standard events, users can also dispatch custom events (see example below).

Custom events will be only be surfaced with in the v2 version of the API!

A custom event has following format:

Attribute Type Description
name str A user defined name for the event.
data Any The data associated with the event. This can be anything, though we suggest making it JSON serializable.

Here are declarations associated with the standard events shown above:

format_docs:

def format_docs(docs: list[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])


format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are Cat Agent 007"),
        ("human", "{question}"),
    ]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

Example

from langchain_core.runnables import RunnableLambda


async def reverse(s: str) -> str:
    return s[::-1]


chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]
Dispatch custom event
from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
PARAMETER DESCRIPTION
input

The input to the Runnable.

TYPE: Any

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

version

The version of the schema to use, either 'v2' or 'v1'.

Users should use 'v2'.

'v1' is for backwards compatibility and will be deprecated in 0.4.0.

No default will be assigned until the API is stabilized. custom events will only be surfaced in 'v2'.

TYPE: Literal['v1', 'v2'] DEFAULT: 'v2'

include_names

Only include events from Runnable objects with matching names.

TYPE: Sequence[str] | None DEFAULT: None

include_types

Only include events from Runnable objects with matching types.

TYPE: Sequence[str] | None DEFAULT: None

include_tags

Only include events from Runnable objects with matching tags.

TYPE: Sequence[str] | None DEFAULT: None

exclude_names

Exclude events from Runnable objects with matching names.

TYPE: Sequence[str] | None DEFAULT: None

exclude_types

Exclude events from Runnable objects with matching types.

TYPE: Sequence[str] | None DEFAULT: None

exclude_tags

Exclude events from Runnable objects with matching tags.

TYPE: Sequence[str] | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

These will be passed to astream_log as this implementation of astream_events is built on top of astream_log.

TYPE: Any DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[StreamEvent]

An async stream of StreamEvent.

RAISES DESCRIPTION
NotImplementedError

If the version is not 'v1' or 'v2'.

transform

transform(
    input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]

Transform inputs to outputs.

Default implementation of transform, which buffers input and calls astream.

Subclasses must override this method if they can start producing output while input is still being generated.

PARAMETER DESCRIPTION
input

An iterator of inputs to the Runnable.

TYPE: Iterator[Input]

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
Output

The output of the Runnable.

atransform async

atransform(
    input: AsyncIterator[Input],
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> AsyncIterator[Output]

Transform inputs to outputs.

Default implementation of atransform, which buffers input and calls astream.

Subclasses must override this method if they can start producing output while input is still being generated.

PARAMETER DESCRIPTION
input

An async iterator of inputs to the Runnable.

TYPE: AsyncIterator[Input]

config

The config to use for the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any | None DEFAULT: {}

YIELDS DESCRIPTION
AsyncIterator[Output]

The output of the Runnable.

bind

bind(**kwargs: Any) -> Runnable[Input, Output]

Bind arguments to a Runnable, returning a new Runnable.

Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.

PARAMETER DESCRIPTION
**kwargs

The arguments to bind to the Runnable.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the arguments bound.

Example
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser

model = ChatOllama(model="llama3.1")

# Without bind
chain = model | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'

with_config

with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]

Bind config to a Runnable, returning a new Runnable.

PARAMETER DESCRIPTION
config

The config to bind to the Runnable.

TYPE: RunnableConfig | None DEFAULT: None

**kwargs

Additional keyword arguments to pass to the Runnable.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the config bound.

with_listeners

with_listeners(
    *,
    on_start: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
    on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
    on_error: Callable[[Run], None]
    | Callable[[Run, RunnableConfig], None]
    | None = None,
) -> Runnable[Input, Output]

Bind lifecycle listeners to a Runnable, returning a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

PARAMETER DESCRIPTION
on_start

Called before the Runnable starts running, with the Run object.

TYPE: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None DEFAULT: None

on_end

Called after the Runnable finishes running, with the Run object.

TYPE: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None DEFAULT: None

on_error

Called if the Runnable throws an error, with the Run object.

TYPE: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the listeners bound.

Example
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time


def test_runnable(time_to_sleep: int):
    time.sleep(time_to_sleep)


def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)


def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)


chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start, on_end=fn_end
)
chain.invoke(2)

with_alisteners

with_alisteners(
    *,
    on_start: AsyncListener | None = None,
    on_end: AsyncListener | None = None,
    on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]

Bind async lifecycle listeners to a Runnable.

Returns a new Runnable.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

PARAMETER DESCRIPTION
on_start

Called asynchronously before the Runnable starts running, with the Run object.

TYPE: AsyncListener | None DEFAULT: None

on_end

Called asynchronously after the Runnable finishes running, with the Run object.

TYPE: AsyncListener | None DEFAULT: None

on_error

Called asynchronously if the Runnable throws an error, with the Run object.

TYPE: AsyncListener | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the listeners bound.

Example
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio


def format_t(timestamp: float) -> str:
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


async def test_runnable(time_to_sleep: int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")


async def fn_start(run_obj: Runnable):
    print(f"on start callback starts at {format_t(time.time())}")
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")


async def fn_end(run_obj: Runnable):
    print(f"on end callback starts at {format_t(time.time())}")
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")


runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start, on_end=fn_end
)


async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))


asyncio.run(concurrent_runs())
# Result:
# on start callback starts at 2025-03-01T07:05:22.875378+00:00
# on start callback starts at 2025-03-01T07:05:22.875495+00:00
# on start callback ends at 2025-03-01T07:05:25.878862+00:00
# on start callback ends at 2025-03-01T07:05:25.878947+00:00
# Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
# Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
# Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
# on end callback starts at 2025-03-01T07:05:27.882360+00:00
# Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
# on end callback starts at 2025-03-01T07:05:28.882428+00:00
# on end callback ends at 2025-03-01T07:05:29.883893+00:00
# on end callback ends at 2025-03-01T07:05:30.884831+00:00

with_types

with_types(
    *, input_type: type[Input] | None = None, output_type: type[Output] | None = None
) -> Runnable[Input, Output]

Bind input and output types to a Runnable, returning a new Runnable.

PARAMETER DESCRIPTION
input_type

The input type to bind to the Runnable.

TYPE: type[Input] | None DEFAULT: None

output_type

The output type to bind to the Runnable.

TYPE: type[Output] | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable with the types bound.

with_retry

with_retry(
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    exponential_jitter_params: ExponentialJitterParams | None = None,
    stop_after_attempt: int = 3,
) -> Runnable[Input, Output]

Create a new Runnable that retries the original Runnable on exceptions.

PARAMETER DESCRIPTION
retry_if_exception_type

A tuple of exception types to retry on.

TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,)

wait_exponential_jitter

Whether to add jitter to the wait time between retries.

TYPE: bool DEFAULT: True

stop_after_attempt

The maximum number of attempts to make before giving up.

TYPE: int DEFAULT: 3

exponential_jitter_params

Parameters for tenacity.wait_exponential_jitter. Namely: initial, max, exp_base, and jitter (all float values).

TYPE: ExponentialJitterParams | None DEFAULT: None

RETURNS DESCRIPTION
Runnable[Input, Output]

A new Runnable that retries the original Runnable on exceptions.

Example
from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
        pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert count == 2

map

map() -> Runnable[list[Input], list[Output]]

Return a new Runnable that maps a list of inputs to a list of outputs.

Calls invoke with each input.

RETURNS DESCRIPTION
Runnable[list[Input], list[Output]]

A new Runnable that maps a list of inputs to a list of outputs.

Example
from langchain_core.runnables import RunnableLambda


def _lambda(x: int) -> int:
    return x + 1


runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3]))  # [2, 3, 4]

with_fallbacks

with_fallbacks(
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
    exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]

Add fallbacks to a Runnable, returning a new Runnable.

The new Runnable will try the original Runnable, and then each fallback in order, upon failures.

PARAMETER DESCRIPTION
fallbacks

A sequence of runnables to try if the original Runnable fails.

TYPE: Sequence[Runnable[Input, Output]]

exceptions_to_handle

A tuple of exception types to handle.

TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,)

exception_key

If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key.

If None, exceptions will not be passed to fallbacks.

If used, the base Runnable and its fallbacks must accept a dictionary as input.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableWithFallbacks[Input, Output]

A new Runnable that will try the original Runnable, and then each Fallback in order, upon failures.

Example
from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
)
print("".join(runnable.stream({})))  # foo bar
PARAMETER DESCRIPTION
fallbacks

A sequence of runnables to try if the original Runnable fails.

TYPE: Sequence[Runnable[Input, Output]]

exceptions_to_handle

A tuple of exception types to handle.

TYPE: tuple[type[BaseException], ...] DEFAULT: (Exception,)

exception_key

If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key.

If None, exceptions will not be passed to fallbacks.

If used, the base Runnable and its fallbacks must accept a dictionary as input.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
RunnableWithFallbacks[Input, Output]

A new Runnable that will try the original Runnable, and then each Fallback in order, upon failures.

as_tool

as_tool(
    args_schema: type[BaseModel] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_types: dict[str, type] | None = None,
) -> BaseTool

Create a BaseTool from a Runnable.

as_tool will instantiate a BaseTool with a name, description, and args_schema from a Runnable. Where possible, schemas are inferred from runnable.get_input_schema.

Alternatively (e.g., if the Runnable takes a dict as input and the specific dict keys are not typed), the schema can be specified directly with args_schema.

You can also pass arg_types to just specify the required arguments and their types.

PARAMETER DESCRIPTION
args_schema

The schema for the tool.

TYPE: type[BaseModel] | None DEFAULT: None

name

The name of the tool.

TYPE: str | None DEFAULT: None

description

The description of the tool.

TYPE: str | None DEFAULT: None

arg_types

A dictionary of argument names to types.

TYPE: dict[str, type] | None DEFAULT: None

RETURNS DESCRIPTION
BaseTool

A BaseTool instance.

TypedDict input

from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda


class Args(TypedDict):
    a: int
    b: list[int]


def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict input, specifying schema via args_schema

from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: list[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict input, specifying schema via arg_types

from typing import Any
from langchain_core.runnables import RunnableLambda


def f(x: dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))


runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

str input

from langchain_core.runnables import RunnableLambda


def f(x: str) -> str:
    return x + "a"


def g(x: str) -> str:
    return x + "z"


runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

is_lc_serializable classmethod

is_lc_serializable() -> bool

Is this class serializable?

By design, even if a class inherits from Serializable, it is not serializable by default. This is to prevent accidental serialization of objects that should not be serialized.

RETURNS DESCRIPTION
bool

Whether the class is serializable. Default is False.

get_lc_namespace classmethod

get_lc_namespace() -> list[str]

Get the namespace of the LangChain object.

For example, if the class is langchain.llms.openai.OpenAI, then the namespace is ["langchain", "llms", "openai"]

RETURNS DESCRIPTION
list[str]

The namespace.

lc_id classmethod

lc_id() -> list[str]

Return a unique identifier for this class for serialization purposes.

The unique identifier is a list of strings that describes the path to the object.

For example, for the class langchain.llms.openai.OpenAI, the id is ["langchain", "llms", "openai", "OpenAI"].

to_json

to_json() -> SerializedConstructor | SerializedNotImplemented

Serialize the Runnable to JSON.

RETURNS DESCRIPTION
SerializedConstructor | SerializedNotImplemented

A JSON-serializable representation of the Runnable.

to_json_not_implemented

to_json_not_implemented() -> SerializedNotImplemented

Serialize a "not implemented" object.

RETURNS DESCRIPTION
SerializedNotImplemented

SerializedNotImplemented.

configurable_fields

configurable_fields(
    **kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]

Configure particular Runnable fields at runtime.

PARAMETER DESCRIPTION
**kwargs

A dictionary of ConfigurableField instances to configure.

TYPE: AnyConfigurableField DEFAULT: {}

RAISES DESCRIPTION
ValueError

If a configuration key is not found in the Runnable.

RETURNS DESCRIPTION
RunnableSerializable[Input, Output]

A new Runnable with the fields configured.

Example

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ", model.invoke("tell me something about chess").content
)

# max_tokens = 200
print(
    "max_tokens_200: ",
    model.with_config(configurable={"output_token_number": 200})
    .invoke("tell me something about chess")
    .content,
)

configurable_alternatives

configurable_alternatives(
    which: ConfigurableField,
    *,
    default_key: str = "default",
    prefix_keys: bool = False,
    **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]

Configure alternatives for Runnable objects that can be set at runtime.

PARAMETER DESCRIPTION
which

The ConfigurableField instance that will be used to select the alternative.

TYPE: ConfigurableField

default_key

The default key to use if no alternative is selected.

TYPE: str DEFAULT: 'default'

prefix_keys

Whether to prefix the keys with the ConfigurableField id.

TYPE: bool DEFAULT: False

**kwargs

A dictionary of keys to Runnable instances or callables that return Runnable instances.

TYPE: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]] DEFAULT: {}

RETURNS DESCRIPTION
RunnableSerializable[Input, Output]

A new Runnable with the alternatives configured.

Example

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI(),
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(configurable={"llm": "openai"})
    .invoke("which organization created you?")
    .content
)

AsyncElasticsearchCache

Bases: BaseCache

Elasticsearch LLM cache.

Caches LLM responses in Elasticsearch to avoid repeated calls for identical prompts.

Setup

Install langchain_elasticsearch and start Elasticsearch locally using the start-local script.

pip install -qU langchain_elasticsearch
curl -fsSL https://elastic.co/start-local | sh

This will create an elastic-start-local folder. To start Elasticsearch and Kibana:

cd elastic-start-local
./start.sh

Elasticsearch will be available at http://localhost:9200. The password for the elastic user and API key are stored in the .env file in the elastic-start-local folder.

Key init args

index_name: str The name of the index or alias to use for the cache. store_input: bool Whether to store the LLM input (prompt) in the cache. Default True. store_input_params: bool Whether to store the LLM parameters in the cache. Default True. metadata: Optional[Dict[str, Any]] Additional metadata to store in the cache for filtering.

Key init args — client params: client: Optional[AsyncElasticsearch or Elasticsearch] Pre-existing Elasticsearch connection. Either provide this OR credentials. es_url: Optional[str] URL of the Elasticsearch instance to connect to. es_cloud_id: Optional[str] Cloud ID of the Elasticsearch instance to connect to. es_user: Optional[str] Username to use when connecting to Elasticsearch. es_api_key: Optional[str] API key to use when connecting to Elasticsearch. es_password: Optional[str] Password to use when connecting to Elasticsearch.

Instantiate
from langchain_elasticsearch import ElasticsearchCache

cache = ElasticsearchCache(
    index_name="llm-cache",
    es_url="http://localhost:9200"
)
Instantiate with API key
from langchain_elasticsearch import ElasticsearchCache

cache = ElasticsearchCache(
    index_name="llm-cache",
    es_url="http://localhost:9200",
    es_api_key="your-api-key"
)
Instantiate from cloud
from langchain_elasticsearch import ElasticsearchCache

cache = ElasticsearchCache(
    index_name="llm-cache",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key"
)
Instantiate from existing connection
from langchain_elasticsearch import ElasticsearchCache
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
cache = ElasticsearchCache(
    index_name="llm-cache",
    client=client
)
Use with LangChain
from langchain.globals import set_llm_cache
from langchain_elasticsearch import ElasticsearchCache

set_llm_cache(ElasticsearchCache(
    index_name="llm-cache",
    es_url="http://localhost:9200"
))

For synchronous applications, use the ElasticsearchCache class. For asynchronous applications, use the AsyncElasticsearchCache class.

METHOD DESCRIPTION
__init__

Initialize the Elasticsearch cache store.

alookup

Look up based on prompt and llm_string.

build_document

Build the Elasticsearch document for storing a single LLM interaction

aupdate

Update based on prompt and llm_string.

aclear

Clear cache.

lookup

Look up based on prompt and llm_string.

update

Update cache based on prompt and llm_string.

clear

Clear cache that can take additional keyword arguments.

mapping cached property

mapping: dict[str, Any]

Get the default mapping for the index.

__init__

__init__(
    index_name: str,
    *,
    store_input: bool = True,
    store_input_params: bool = True,
    metadata: dict[str, Any] | None = None,
    client: AsyncElasticsearch | None = None,
    es_url: str | None = None,
    es_cloud_id: str | None = None,
    es_user: str | None = None,
    es_api_key: str | None = None,
    es_password: str | None = None,
)

Initialize the Elasticsearch cache store.

PARAMETER DESCRIPTION
index_name

The name of the index or alias to use for the cache. If it doesn't exist, an index is created according to the default mapping.

TYPE: str

store_input

Whether to store the LLM input (prompt) in the cache. Default True.

TYPE: bool DEFAULT: True

store_input_params

Whether to store the LLM parameters in the cache. Default True.

TYPE: bool DEFAULT: True

metadata

Additional metadata to store in the cache for filtering. Must be JSON serializable.

TYPE: dict DEFAULT: None

client

Pre-existing Elasticsearch connection. Either provide this OR credentials.

TYPE: AsyncElasticsearch DEFAULT: None

es_url

URL of the Elasticsearch instance.

TYPE: str DEFAULT: None

es_cloud_id

Cloud ID of the Elasticsearch instance.

TYPE: str DEFAULT: None

es_user

Username for Elasticsearch.

TYPE: str DEFAULT: None

es_api_key

API key for Elasticsearch.

TYPE: str DEFAULT: None

es_password

Password for Elasticsearch.

TYPE: str DEFAULT: None

alookup async

alookup(prompt: str, llm_string: str) -> RETURN_VAL_TYPE | None

Look up based on prompt and llm_string.

build_document

build_document(
    prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
) -> dict[str, Any]

Build the Elasticsearch document for storing a single LLM interaction

aupdate async

aupdate(prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None

Update based on prompt and llm_string.

aclear async

aclear(**kwargs: Any) -> None

Clear cache.

lookup abstractmethod

lookup(prompt: str, llm_string: str) -> RETURN_VAL_TYPE | None

Look up based on prompt and llm_string.

A cache implementation is expected to generate a key from the 2-tuple of prompt and llm_string (e.g., by concatenating them with a delimiter).

PARAMETER DESCRIPTION
prompt

A string representation of the prompt. In the case of a chat model, the prompt is a non-trivial serialization of the prompt into the language model.

TYPE: str

llm_string

A string representation of the LLM configuration.

This is used to capture the invocation parameters of the LLM (e.g., model name, temperature, stop tokens, max tokens, etc.).

These invocation parameters are serialized into a string representation.

TYPE: str

RETURNS DESCRIPTION
RETURN_VAL_TYPE | None

On a cache miss, return None. On a cache hit, return the cached value.

RETURN_VAL_TYPE | None

The cached value is a list of Generation (or subclasses).

update abstractmethod

update(prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None

Update cache based on prompt and llm_string.

The prompt and llm_string are used to generate a key for the cache. The key should match that of the lookup method.

PARAMETER DESCRIPTION
prompt

A string representation of the prompt. In the case of a chat model, the prompt is a non-trivial serialization of the prompt into the language model.

TYPE: str

llm_string

A string representation of the LLM configuration.

This is used to capture the invocation parameters of the LLM (e.g., model name, temperature, stop tokens, max tokens, etc.).

These invocation parameters are serialized into a string representation.

TYPE: str

return_val

The value to be cached. The value is a list of Generation (or subclasses).

TYPE: RETURN_VAL_TYPE

clear abstractmethod

clear(**kwargs: Any) -> None

Clear cache that can take additional keyword arguments.

AsyncElasticsearchEmbeddingsCache

Bases: ByteStore

Elasticsearch embeddings cache.

Caches embeddings in Elasticsearch to avoid repeated embedding computations.

Setup

Install langchain_elasticsearch and start Elasticsearch locally using the start-local script.

pip install -qU langchain_elasticsearch
curl -fsSL https://elastic.co/start-local | sh

This will create an elastic-start-local folder. To start Elasticsearch and Kibana:

cd elastic-start-local
./start.sh

Elasticsearch will be available at http://localhost:9200. The password for the elastic user and API key are stored in the .env file in the elastic-start-local folder.

Key init args

index_name: str The name of the index or alias to use for the cache. store_input: bool Whether to store the input text in the cache. Default True. metadata: Optional[Dict[str, Any]] Additional metadata to store in the cache for filtering. namespace: Optional[str] A namespace to organize the cache. maximum_duplicates_allowed: int Maximum duplicate keys permitted when using aliases. Default 1.

Key init args — client params: client: Optional[AsyncElasticsearch or Elasticsearch] Pre-existing Elasticsearch connection. Either provide this OR credentials. es_url: Optional[str] URL of the Elasticsearch instance to connect to. es_cloud_id: Optional[str] Cloud ID of the Elasticsearch instance to connect to. es_user: Optional[str] Username to use when connecting to Elasticsearch. es_api_key: Optional[str] API key to use when connecting to Elasticsearch. es_password: Optional[str] Password to use when connecting to Elasticsearch.

Instantiate
from langchain_elasticsearch import ElasticsearchEmbeddingsCache

cache = ElasticsearchEmbeddingsCache(
    index_name="embeddings-cache",
    es_url="http://localhost:9200"
)
Instantiate with API key
from langchain_elasticsearch import ElasticsearchEmbeddingsCache

cache = ElasticsearchEmbeddingsCache(
    index_name="embeddings-cache",
    es_url="http://localhost:9200",
    es_api_key="your-api-key"
)
Instantiate from cloud
from langchain_elasticsearch import ElasticsearchEmbeddingsCache

cache = ElasticsearchEmbeddingsCache(
    index_name="embeddings-cache",
    es_cloud_id="<cloud_id>",
    es_api_key="your-api-key"
)
Instantiate from existing connection
from langchain_elasticsearch import ElasticsearchEmbeddingsCache
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
cache = ElasticsearchEmbeddingsCache(
    index_name="embeddings-cache",
    client=client
)
Use with CacheBackedEmbeddings
from langchain.embeddings import CacheBackedEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_elasticsearch import ElasticsearchEmbeddingsCache

underlying_embeddings = OpenAIEmbeddings()
cache = ElasticsearchEmbeddingsCache(
    index_name="embeddings-cache",
    es_url="http://localhost:9200"
)
cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
    underlying_embeddings,
    cache,
    namespace=underlying_embeddings.model
)

For synchronous applications, use the ElasticsearchEmbeddingsCache class. For asynchronous applications, use the AsyncElasticsearchEmbeddingsCache class.

METHOD DESCRIPTION
__init__

Initialize the Elasticsearch embeddings cache store.

encode_vector

Encode the vector data as bytes to as a base64 string.

decode_vector

Decode the base64 string to vector data as bytes.

amget

Get the values associated with the given keys.

build_document

Build the Elasticsearch document for storing a single embedding

amset

Set the values for the given keys.

amdelete

Delete the given keys and their associated values.

ayield_keys

Get an iterator over keys that match the given prefix.

mapping cached property

mapping: dict[str, Any]

Get the default mapping for the index.

__init__

__init__(
    index_name: str,
    *,
    store_input: bool = True,
    metadata: dict[str, Any] | None = None,
    namespace: str | None = None,
    maximum_duplicates_allowed: int = 1,
    client: AsyncElasticsearch | None = None,
    es_url: str | None = None,
    es_cloud_id: str | None = None,
    es_user: str | None = None,
    es_api_key: str | None = None,
    es_password: str | None = None,
)

Initialize the Elasticsearch embeddings cache store.

PARAMETER DESCRIPTION
index_name

The name of the index or alias to use for the cache. If it doesn't exist, an index is created according to the default mapping.

TYPE: str

store_input

Whether to store the input text in the cache. Default True.

TYPE: bool DEFAULT: True

metadata

Additional metadata to store in the cache for filtering. Must be JSON serializable.

TYPE: dict DEFAULT: None

namespace

A namespace to organize the cache.

TYPE: str DEFAULT: None

maximum_duplicates_allowed

Maximum duplicate keys permitted when using aliases across multiple indices. Default 1.

TYPE: int DEFAULT: 1

client

Pre-existing Elasticsearch connection. Either provide this OR credentials.

TYPE: AsyncElasticsearch DEFAULT: None

es_url

URL of the Elasticsearch instance.

TYPE: str DEFAULT: None

es_cloud_id

Cloud ID of the Elasticsearch instance.

TYPE: str DEFAULT: None

es_user

Username for Elasticsearch.

TYPE: str DEFAULT: None

es_api_key

API key for Elasticsearch.

TYPE: str DEFAULT: None

es_password

Password for Elasticsearch.

TYPE: str DEFAULT: None

encode_vector staticmethod

encode_vector(data: bytes) -> str

Encode the vector data as bytes to as a base64 string.

decode_vector staticmethod

decode_vector(data: str) -> bytes

Decode the base64 string to vector data as bytes.

amget async

amget(keys: Sequence[str]) -> list[bytes | None]

Get the values associated with the given keys.

build_document

build_document(text_input: str, vector: bytes) -> dict[str, Any]

Build the Elasticsearch document for storing a single embedding

amset async

amset(key_value_pairs: Sequence[tuple[str, bytes]]) -> None

Set the values for the given keys.

amdelete async

amdelete(keys: Sequence[str]) -> None

Delete the given keys and their associated values.

ayield_keys async

ayield_keys(*, prefix: str | None = None) -> AsyncIterator[str]

Get an iterator over keys that match the given prefix.