
[Feature Request]: In IngestionPipeline, cache node transformations at node level #16394

Open
aloysius-lim opened this issue Oct 7, 2024 · 3 comments

@aloysius-lim

Feature Description

In IngestionPipeline, cache transformation results for individual nodes, so that a previously seen node + transformation combination can be retrieved from the cache, instead of requiring the whole list of nodes to be identical.

Reason

The hash used to key the cache is computed over the whole list of Nodes (e.g. Documents), not over individual Nodes. Even if only a single Node has changed, the transformations are executed on the whole list again, because the hash is different, as the current implementation shows:

def get_transformation_hash(
    nodes: Sequence[BaseNode], transformation: TransformComponent
) -> str:
    """Get the hash of a transformation."""
    nodes_str = "".join(
        [str(node.get_content(metadata_mode=MetadataMode.ALL)) for node in nodes]
    )
    transformation_dict = transformation.to_dict()
    transform_string = remove_unstable_values(str(transformation_dict))
    return sha256((nodes_str + transform_string).encode("utf-8")).hexdigest()


def run_transformations(
    nodes: Sequence[BaseNode],
    transformations: Sequence[TransformComponent],
    in_place: bool = True,
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:
    """
    Run a series of transformations on a set of nodes.

    Args:
        nodes: The nodes to transform.
        transformations: The transformations to apply to the nodes.

    Returns:
        The transformed nodes.
    """
    if not in_place:
        nodes = list(nodes)

    for transform in transformations:
        if cache is not None:
            hash = get_transformation_hash(nodes, transform)
            cached_nodes = cache.get(hash, collection=cache_collection)
            if cached_nodes is not None:
                nodes = cached_nodes
            else:
                nodes = transform(nodes, **kwargs)
                cache.put(hash, nodes, collection=cache_collection)
        else:
            nodes = transform(nodes, **kwargs)

    return nodes
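
A possible shape for the requested behavior is sketched below. This is an illustration only, not llama-index's API: get_node_transformation_hash and run_transformations_per_node are hypothetical names, the unstable-value stripping is omitted for brevity, and calling a transformation one node at a time would change batching behavior (e.g. for embedding models).

# Hypothetical sketch of node-level caching (not part of llama-index).
# Each node is hashed together with the transformation, so unchanged nodes
# can be served from the cache even when the surrounding batch changes.
from hashlib import sha256
from typing import Any, List, Optional, Sequence

from llama_index.core.ingestion.cache import IngestionCache
from llama_index.core.schema import BaseNode, MetadataMode, TransformComponent


def get_node_transformation_hash(
    node: BaseNode, transformation: TransformComponent
) -> str:
    """Hash a single node + transformation combination (hypothetical helper)."""
    node_str = str(node.get_content(metadata_mode=MetadataMode.ALL))
    transform_str = str(transformation.to_dict())
    return sha256((node_str + transform_str).encode("utf-8")).hexdigest()


def run_transformations_per_node(
    nodes: Sequence[BaseNode],
    transformations: Sequence[TransformComponent],
    cache: Optional[IngestionCache] = None,
    cache_collection: Optional[str] = None,
    **kwargs: Any,
) -> Sequence[BaseNode]:
    """Run transformations, caching the output of each input node separately."""
    for transform in transformations:
        if cache is None:
            nodes = transform(nodes, **kwargs)
            continue

        next_nodes: List[BaseNode] = []
        for node in nodes:
            node_hash = get_node_transformation_hash(node, transform)
            cached = cache.get(node_hash, collection=cache_collection)
            if cached is not None:
                # Unchanged node: reuse its previously transformed output.
                next_nodes.extend(cached)
            else:
                # New or changed node: transform it alone and cache the result.
                transformed = transform([node], **kwargs)
                cache.put(node_hash, transformed, collection=cache_collection)
                next_nodes.extend(transformed)
        nodes = next_nodes

    return nodes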

Code example:

import random

from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.ingestion.cache import DEFAULT_CACHE_NAME

pipeline = IngestionPipeline(
    # Use only 1 transformation for simplicity
    transformations=[SentenceSplitter(chunk_size=50, chunk_overlap=0)]
)

docs = [
    Document(id_=str(i), text=" ".join([random.choice("abcdefg") for _ in range(100)]))
    for i in range(3)
]

# Before running pipeline, cache is empty
pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME)
# {}

# Run with all 3 docs
pipeline.run(documents=docs[0:3])

# Cache contains a single entry, keyed by the hash of all 3 docs combined, with an output of 6 nodes
for k, v in pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME).items():
    print(f"Hash: {k}")
    print(f"# Output Nodes: {len(v['nodes'])}")
# Hash: af724bae923c6dc5d7f83f6874e983b2ea18b1f7a14d840d824dd1decff2e45e
# # Output Nodes: 6

# Run with doc 1 only
_ = pipeline.run(documents=docs[0:1])

# A new cache entry is added representing only doc 1, even though it has been transformed before.
for k, v in pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME).items():
    print(f"Hash: {k}")
    print(f"# Output Nodes: {len(v['nodes'])}")
# Hash: af724bae923c6dc5d7f83f6874e983b2ea18b1f7a14d840d824dd1decff2e45e
# # Output Nodes: 6
# Hash: d790f504a8904444c4bd4a10c21cabf367d6b8f4dcaa29207dda54d47887f763
# # Output Nodes: 2

# Run with docs 2 and 3 (both already transformed)
_ = pipeline.run(documents=list(docs[1:]))

# A new cache entry is added for this combination, even though both docs have already been transformed.
for k, v in pipeline.cache.cache.get_all(collection=DEFAULT_CACHE_NAME).items():
    print(f"Hash: {k}")
    print(f"# Output Nodes: {len(v['nodes'])}")
# Hash: af724bae923c6dc5d7f83f6874e983b2ea18b1f7a14d840d824dd1decff2e45e
# # Output Nodes: 6
# Hash: d790f504a8904444c4bd4a10c21cabf367d6b8f4dcaa29207dda54d47887f763
# # Output Nodes: 2
# Hash: 4c989444d6af891d03550461b21741eb80215ba4e94d80834bb6a0388d75eaa9
# # Output Nodes: 4

Value of Feature

I would like to use IngestionPipeline as part of a data preparation step in an ML pipeline. The inputs are Documents, and outputs are embeddings that are passed to an ML model. There is no need for storage or retrieval. For example:

embeddings = pipeline.run(documents=list_of_docs)
results = model.predict(embeddings)

In cases where incoming data represents a full snapshot (e.g. daily snapshots of Documents in a production system), the set of Documents comprises:

  1. New documents
  2. Updated documents
  3. Unchanged documents

It would be more efficient if the transformations could be run only on (1) and (2), retrieving cached results for (3), especially when (3) represents a large proportion of the batch.

Currently, the cache operates on the whole list of Documents passed to IngestionPipeline.run(), so even if only one Document has changed, all the other Documents are processed again.

Furthermore, the cache will become bloated over time, because each batch of data is likely to be unique (albeit with many overlapping Documents). It is extremely unlikely that the exact same list of Documents will be encountered again, which diminishes the utility of the cache.
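
Under the current behavior, a possible workaround is to run the pipeline one Document at a time, so that each cache entry covers a single Document. This is a sketch only, reusing pipeline and docs from the example above, and it gives up cross-document batching (e.g. batched embedding calls):

# Workaround sketch: per-Document runs make each cache key depend on one Document,
# so unchanged Documents hit the cache on subsequent snapshots.
all_nodes = []
for doc in docs:
    all_nodes.extend(pipeline.run(documents=[doc]))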

@aloysius-lim added the enhancement and triage labels on Oct 7, 2024

fcakyon commented Oct 27, 2024

@aloysius-lim I agree with you. Current ingestion caching is ineffective for my use cases. Node-level caching would be ideal!

@logan-markewich if you are OK with this feature, I am willing to work on a PR


shiftan commented Jan 23, 2025

@fcakyon - I just happened to run into the same issue reported by @aloysius-lim.
The current implementation contradicts the expected behavior as documented here, which states that "In an IngestionPipeline, each node + transformation combination is hashed and cached".


fcakyon commented Jan 23, 2025

@shiftan after actively using llama-index for 6-8 months, I decided that it is nothing more than a hype project. Existing features don't work or are too inefficient for production.

Based on my professional experience, it's almost always more time-efficient and cost-effective to stick to native LLM APIs rather than relying on a wrapper framework such as llama-index 👌🏻
