Skip to content

Commit

Permalink
Update ElasticSearchDocumentStore to use latest haystack-ai versi…
Browse files Browse the repository at this point in the history
…on (#63)

* Update haystack version

* Update imports

* Support new filters

* Update ElasticSearchDocumentStore

* Update tests

* Fix corner cases when filter value is None

* Convert legacy filters if used

* Fix linting
  • Loading branch information
silvanocerza authored Nov 29, 2023
1 parent 1591c01 commit be89825
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 503 deletions.
4 changes: 1 addition & 3 deletions integrations/elasticsearch/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
# we distribute the preview version of Haystack 2.0 under the package "haystack-ai"
"haystack-ai==0.143.0",
"haystack-ai",
"elasticsearch>=8,<9",
"typing_extensions", # This is not a direct dependency, but `haystack-ai` is missing it cause `canals` is missing it
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Dict, List, Optional

from haystack.preview import component, default_from_dict, default_to_dict
from haystack.preview.dataclasses import Document
from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses import Document

from elasticsearch_haystack.document_store import ElasticsearchDocumentStore

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
# There are no import stubs for elastic_transport and elasticsearch so mypy fails
from elastic_transport import NodeConfig # type: ignore[import-not-found]
from elasticsearch import Elasticsearch, helpers # type: ignore[import-not-found]
from haystack.preview import default_from_dict, default_to_dict
from haystack.preview.dataclasses import Document
from haystack.preview.document_stores.decorator import document_store
from haystack.preview.document_stores.errors import DocumentStoreError, DuplicateDocumentError
from haystack.preview.document_stores.protocols import DuplicatePolicy
from haystack import default_from_dict, default_to_dict
from haystack.dataclasses import Document
from haystack.document_stores import DocumentStoreError, DuplicateDocumentError, DuplicatePolicy, document_store
from haystack.utils.filters import convert

from elasticsearch_haystack.filters import _normalize_filters

Expand Down Expand Up @@ -130,103 +129,29 @@ def _search_documents(self, **kwargs) -> List[Document]:
return documents

def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
"""
Returns the documents that match the filters provided.
Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator (`"$and"`,
`"$or"`, `"$not"`), a comparison operator (`"$eq"`, `$ne`, `"$in"`, `$nin`, `"$gt"`, `"$gte"`, `"$lt"`,
`"$lte"`) or a metadata field name.
Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata
field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or
(in case of `"$in"`) a list of values as value. If no logical operator is provided, `"$and"` is used as default
operation. If no comparison operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used
as default operation.
Example:
```python
filters = {
"$and": {
"type": {"$eq": "article"},
"date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
"rating": {"$gte": 3},
"$or": {
"genre": {"$in": ["economy", "politics"]},
"publisher": {"$eq": "nytimes"}
}
}
}
# or simpler using default operators
filters = {
"type": "article",
"date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
"rating": {"$gte": 3},
"$or": {
"genre": ["economy", "politics"],
"publisher": "nytimes"
}
}
```
To use the same logical operator multiple times on the same level, logical operators can take a list of
dictionaries as value.
Example:
```python
filters = {
"$or": [
{
"$and": {
"Type": "News Paper",
"Date": {
"$lt": "2019-01-01"
}
}
},
{
"$and": {
"Type": "Blog Post",
"Date": {
"$gte": "2019-01-01"
}
}
}
]
}
```
if filters and "operator" not in filters and "conditions" not in filters:
filters = convert(filters)

:param filters: the filters to apply to the document list.
:return: a list of Documents that match the given filters.
"""
query = {"bool": {"filter": _normalize_filters(filters)}} if filters else None
documents = self._search_documents(query=query)
return documents

def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> None:
def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int:
"""
Writes (or overwrites) documents into the store.
:param documents: a list of documents.
:param policy: documents with the same ID count as duplicates. When duplicates are met,
the store can:
- skip: keep the existing document and ignore the new one.
- overwrite: remove the old document and write the new one.
- fail: an error is raised
:raises ValueError: if 'documents' parameter is not a list of Document objects
:raises DuplicateDocumentError: Exception trigger on duplicate document if `policy=DuplicatePolicy.FAIL`
:raises DocumentStoreError: Exception trigger on any other error when writing documents
:return: None
Writes Documents to Elasticsearch.
If policy is not specified or set to DuplicatePolicy.NONE, it will raise an exception if a document with the
same ID already exists in the document store.
"""
if len(documents) > 0:
if not isinstance(documents[0], Document):
msg = "param 'documents' must contain a list of objects of type Document"
raise ValueError(msg)

if policy == DuplicatePolicy.NONE:
policy = DuplicatePolicy.FAIL

action = "index" if policy == DuplicatePolicy.OVERWRITE else "create"
_, errors = helpers.bulk(
documents_written, errors = helpers.bulk(
client=self._client,
actions=(
{
Expand Down Expand Up @@ -262,6 +187,8 @@ def write_documents(self, documents: List[Document], policy: DuplicatePolicy = D
msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}"
raise DocumentStoreError(msg)

return documents_written

def _deserialize_document(self, hit: Dict[str, Any]) -> Document:
"""
Creates a Document from the search hit provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Dict, List, Optional

from haystack.preview import component, default_from_dict, default_to_dict
from haystack.preview.dataclasses import Document
from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses import Document

from elasticsearch_haystack.document_store import ElasticsearchDocumentStore

Expand Down
Loading

0 comments on commit be89825

Please sign in to comment.