Skip to content

Commit

Permalink
Extend _examples-header, fix path in transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
burnash committed Jan 16, 2024
1 parent 5e2f473 commit ecfb139
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 0 deletions.
Empty file.
63 changes: 63 additions & 0 deletions docs/examples/pdf_to_weaviate/pdf_to_weaviate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os

import dlt
from dlt.destinations.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
raise NotImplementedError()
# extract data from PDF page by page
reader = PdfReader(file_item["file_path"])
for page_no in range(len(reader.pages)):
# add page content to file item
page_item = dict(file_item)
page_item["text"] = reader.pages[page_no].extract_text()
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)

# set the name of the destination table to receive pages
# NOTE: Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)

assert_load_info(load_info)

import weaviate

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
6 changes: 6 additions & 0 deletions docs/website/docs/examples/chess_production/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ import Header from '../_examples-header.md';
<Header
intro="In this tutorial, you will learn how to investigate, track, retry and test your loads."
slug="chess_production"
<<<<<<< HEAD
run_file="chess"
destination="duckdb" />
=======
install_comment="with duckdb"
install_packages='"dlt[duckdb]"'
run_file="chess" />
>>>>>>> d3a8e29b (Extend _examples-header, fix path in transformers)
## Run chess pipeline in production

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from tests.pipeline.utils import assert_load_info

def pdf_to_weaviate_snippet() -> None:
# @@@DLT_SNIPPET_START example
# @@@DLT_SNIPPET_START pdf_to_weaviate
import os

import dlt
from dlt.destinations.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
raise NotImplementedError()
# extract data from PDF page by page
reader = PdfReader(file_item["file_path"])
for page_no in range(len(reader.pages)):
# add page content to file item
page_item = dict(file_item)
page_item["text"] = reader.pages[page_no].extract_text()
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)

# set the name of the destination table to receive pages
# NOTE: Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
# @@@DLT_SNIPPET_END pdf_to_weaviate

assert_load_info(load_info)

# @@@DLT_SNIPPET_START pdf_to_weaviate_read
import weaviate

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
# @@@DLT_SNIPPET_END pdf_to_weaviate_read
# @@@DLT_SNIPPET_END example
115 changes: 115 additions & 0 deletions docs/website/docs/examples/pdf_to_weaviate/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
---
title: Load PDFs to Weaviate
description: Extract text from PDF and load it into a vector database
keywords: [pdf, weaviate, vector store, vector database, ]
---

import Header from '../_examples-header.md';

<Header
intro="This example demonstrates how to extract text from PDF files and load it into Weaviate vector database."
slug="pdf_to_weaviate"
install_comment="with weaviate and PyPDF2"
install_packages='PyPDF2 "dlt[weaviate]"'
run_file="pdf_to_weaviate" />

## Example code

<!--@@@DLT_SNIPPET_START ./code/pdf_to_weaviate-snippets.py::pdf_to_weaviate -->
```py
import os

import dlt
from dlt.destinations.weaviate import weaviate_adapter
from PyPDF2 import PdfReader


@dlt.resource(selected=False)
def list_files(folder_path: str):
folder_path = os.path.abspath(folder_path)
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
yield {
"file_name": filename,
"file_path": file_path,
"mtime": os.path.getmtime(file_path)
}


@dlt.transformer(primary_key="page_id", write_disposition="merge")
def pdf_to_text(file_item, separate_pages: bool = False):
if not separate_pages:
raise NotImplementedError()
# extract data from PDF page by page
reader = PdfReader(file_item["file_path"])
for page_no in range(len(reader.pages)):
# add page content to file item
page_item = dict(file_item)
page_item["text"] = reader.pages[page_no].extract_text()
page_item["page_id"] = file_item["file_name"] + "_" + str(page_no)
yield page_item

pipeline = dlt.pipeline(
pipeline_name='pdf_to_text',
destination='weaviate'
)

# this constructs a simple pipeline that: (1) reads files from "invoices" folder (2) filters only those ending with ".pdf"
# (3) sends them to pdf_to_text transformer with pipe (|) operator
pdf_pipeline = list_files("assets/invoices").add_filter(
lambda item: item["file_name"].endswith(".pdf")
) | pdf_to_text(separate_pages=True)

# set the name of the destination table to receive pages
# NOTE: Weaviate, dlt's tables are mapped to classes
pdf_pipeline.table_name = "InvoiceText"

# use weaviate_adapter to tell destination to vectorize "text" column
load_info = pipeline.run(
weaviate_adapter(pdf_pipeline, vectorize="text")
)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
```
<!--@@@DLT_SNIPPET_END ./code/pdf_to_weaviate-snippets.py::pdf_to_weaviate -->

We start with a simple resource that lists files in specified folder. To that we add a **filter** function that removes all files that are not pdfs.

To parse PDFs we use [PyPDF](https://pypdf2.readthedocs.io/en/3.0.0/user/extract-text.html) and return each page from a given PDF as separate data item.

Parsing happens in `@dlt.transformer` which receives data from `list_files` resource. It splits PDF into pages, extracts text and yields pages separately
so each PDF will correspond to many items in Weaviate `InvoiceText` class. We set the primary key and use merge disposition so if the same PDF comes twice
we'll just update the vectors, and not duplicate.

Look how we pipe data from `list_files` resource (note that resource is deselected so we do not load raw file items to destination) into `pdf_to_text` using **|** operator.

Just before load, the `weaviate_adapter` is used to tell `weaviate` destination which fields to vectorize.

Now it is time to query our documents.
<!--@@@DLT_SNIPPET_START ./code/pdf_to_weaviate-snippets.py::pdf_to_weaviate_read-->
```py
import weaviate

client = weaviate.Client("http://localhost:8080")
# get text of all the invoices in InvoiceText class we just created above
print(client.query.get("InvoiceText", ["text", "file_name", "mtime", "page_id"]).do())
```
<!--@@@DLT_SNIPPET_END ./code/pdf_to_weaviate-snippets.py::pdf_to_weaviate_read-->

Above we provide URL to local cluster. We also use `contextionary` to vectorize data. You may find information on our setup in links below.

:::tip

Change the destination to `duckdb` if you do not have access to Weaviate cluster or not able to run it locally.

:::

Learn more:

- [Setup Weaviate destination - local or cluster](dlt-ecosystem/destinations/weaviate.md).
- [Connect the transformers to the resources](general-usage/resource#feeding-data-from-one-resource-into-another)
to load additional data or enrich it.
- [Transform your data before loading](general-usage/resource#customize-resources) and see some
[examples of customizations like column renames and anonymization](general-usage/customising-pipelines/renaming_columns).
1 change: 1 addition & 0 deletions docs/website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ const sidebars = {
'examples/nested_data/index',
'examples/qdrant_zendesk/index',
'examples/google_sheets/index'
'examples/pdf_to_weaviate/index'
],
},
{
Expand Down

0 comments on commit ecfb139

Please sign in to comment.