updated
dat-a-man committed Jan 17, 2024
1 parent 20c615e commit e21e7de
Showing 1 changed file with 111 additions and 106 deletions.
@@ -4,7 +4,7 @@ description: Enriching the user-agent device data with average device price.
keywords: [data enrichment, user-agent data, device enrichment]
---

# Data Enrichment Part One: User-agent device data enrichment
# Data enrichment part one: User-agent device data enrichment

Data enrichment enhances raw data with valuable information from multiple sources, increasing its
analytical and decision-making value.
@@ -28,21 +28,23 @@ plan.


## Creating data enrichment pipeline
You can either follow this documentation to build a data enrichment pipeline or use the provided Colab notebook.
You can either follow the example in the linked Colab notebook or follow this documentation to
create the user-agent device data enrichment pipeline.

### A. Colab notebook
The Colab notebook combines three data enrichment processes for a sample dataset, starting with the first
enrichment of user-agent device data.
The Colab notebook combines three data enrichment processes for a sample dataset, starting with "Data
enrichment part one: User-agent device data".

The first step is to register on [SerpApi](https://serpapi.com/) and obtain the
API key. To set up credentials in Colab secrets:
1. In Colab notebook, click 'Colab secrets' on the left.
2. Add `serp_api_key` as an environment variable with your API key as its value.
1. In the Colab notebook, click on "Colab Secrets" in the left sidebar.
2. Configure the variable `serp_api_key` with the API key you obtained.

Here's the link to the notebook:
**[Colab Notebook](https://colab.research.google.com/drive/1ZKEkf1LRSld7CWQFS36fUXjhJKPAon7P?usp=sharing).**

### B. Create a pipeline
Alternatively, to create a data enrichment pipeline, follow these steps:
You can start by creating the following directory structure:

```python
@@ -98,7 +100,9 @@ def tracked_data():
> specifics of what data is collected and how it's used can vary significantly among different
> tracking services.
Here's a breakdown of each element: `user_id`: Web trackers typically assign a unique ID to each user for
Here's a breakdown of each element:

`user_id`: Web trackers typically assign a unique ID to each user for
tracking their journeys and interactions over time.

`device_name`: User device information helps in understanding the devices used by the user base.
@@ -125,129 +129,130 @@ The first step is to register on [SerpApi](https://serpapi.com/) and obtain the

1. Replace the value of the `api_key`.

1. Create the `fetch_average_price()` function as follows:
   ```python
   import datetime

   import dlt
   import requests

   # Uncomment transformer function if it is to be used as a transformer,
   # otherwise it is being used with the `add_map` functionality.

   # @dlt.transformer(data_from=tracked_data)
   def fetch_average_price(user_tracked_data):
       """
       Fetches the average price of a device from an external API and updates the
       user_tracked_data dictionary.

       This function retrieves the average price of a device specified in the
       user_tracked_data dictionary by making an API request. The price data is cached
       in the device_info state to reduce API calls. If the data for the device is older
       than 180 days, a new API request is made.

       Args:
           user_tracked_data (dict): A dictionary containing user data, including the device name.

       Returns:
           dict: The updated user_tracked_data dictionary with added device price and
               updated timestamp, or None if the API request fails.
       """

       # Retrieve the API key from dlt secrets
       api_key = dlt.secrets.get("sources.api_key")

       # Get the current resource state for device information
       device_info = dlt.current.resource_state().setdefault("devices", {})

       # Current timestamp for checking the last update
       current_timestamp = datetime.datetime.now()

       # Print the current device information
       # print(device_info) # if you need to check state

       # Extract the device name from user data
       device = user_tracked_data['device_name']
       device_data = device_info.get(device, {})

       # Calculate the time since the last update
       last_updated = (current_timestamp - device_data.get('timestamp', datetime.datetime.min))

       # Check if the device is not in state or data is older than 180 days
       if device not in device_info or last_updated > datetime.timedelta(days=180):
           try:
               # Make an API request to fetch device prices
               response = requests.get("https://serpapi.com/search", params={
                   "engine": "google_shopping", "q": device,
                   "api_key": api_key, "num": 10
               })
           except requests.RequestException as e:
               print(f"Request failed: {e}")
               return None

           if response.status_code != 200:
               print(f"Failed to retrieve data: {response.status_code}")
               return None

           # Process the response to extract prices
           results = response.json().get("shopping_results", [])
           prices = []
           for r in results:
               if r.get("price"):
                   # Split the price string and convert each part to float
                   price_parts = r.get("price").replace('$', '').replace(',', '').split()
                   for part in price_parts:
                       try:
                           prices.append(float(part))
                       except ValueError:
                           pass  # Ignore parts that can't be converted to float

           # Calculate the average price and update the device_info
           device_price = round(sum(prices) / len(prices), 2) if prices else None
           device_info[device] = {'timestamp': current_timestamp, 'price': device_price}

           # Add the device price and timestamp to the user data
           user_tracked_data['device_price_USD'] = device_price
           user_tracked_data['price_updated_at'] = current_timestamp

       else:
           # Use cached price data if available and not outdated
           user_tracked_data['device_price_USD'] = device_data.get('price')
           user_tracked_data['price_updated_at'] = device_data.get('timestamp')

       return user_tracked_data
   ```
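
The price-averaging and cache-expiry logic described in the docstring above can be tried out without calling SerpApi. The following is a minimal sketch using only the standard library; the helper names `average_price` and `is_stale` and the sample price strings are hypothetical and are not part of the documented pipeline.

```python
import datetime


def average_price(price_strings):
    """Average every numeric token found in a list of price strings.

    Mirrors the parsing approach of fetch_average_price: strip '$' and ',',
    split on whitespace, and ignore tokens that are not numbers.
    (Hypothetical helper, for illustration only.)
    """
    prices = []
    for raw in price_strings:
        for part in raw.replace("$", "").replace(",", "").split():
            try:
                prices.append(float(part))
            except ValueError:
                pass  # Non-numeric tokens such as "used" are skipped
    return round(sum(prices) / len(prices), 2) if prices else None


def is_stale(cached_entry, now, max_age_days=180):
    """Return True when a cached entry is missing a timestamp or is too old."""
    last_update = cached_entry.get("timestamp", datetime.datetime.min)
    return (now - last_update) > datetime.timedelta(days=max_age_days)


# Made-up sample data, not real SerpApi output
sample_prices = ["$1,200.00", "$1,000.50 used", "Call for price"]
avg = average_price(sample_prices)

now = datetime.datetime(2024, 1, 17)
old_entry = {"timestamp": datetime.datetime(2023, 1, 1), "price": 999.0}
fresh_entry = {"timestamp": datetime.datetime(2024, 1, 1), "price": 999.0}

print(avg)                         # 1100.25
print(is_stale(old_entry, now))    # True
print(is_stale(fresh_entry, now))  # False
print(is_stale({}, now))           # True: no timestamp means "never fetched"
```

Note that every numeric token in a price string is averaged, so a listing that shows two prices contributes both values.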

### 3. Create your pipeline

### Create your pipeline

1. When creating the pipeline, `fetch_average_price` can be used in either of the following ways:


- Add map function
- Transformer function

:::note The `dlt` library's `transformer` and `add_map` functions serve distinct purposes in data

The `dlt` library's `transformer` and `add_map` functions serve distinct purposes in data
processing.

`Transformers`, used to process a resource, are ideal for post-load data transformations in a
pipeline, compatible with tools like dbt, the `dlt` SQL client, or Pandas for intricate data
manipulation. To read more:
[Click here.](https://dlthub.com/docs/general-usage/resource#process-resources-with-dlttransformer)
[Click here.](../../general-usage/resource#process-resources-with-dlttransformer)

Conversely, `add_map`, used to customize a resource, applies transformations at an item level
within a resource. It's useful for tasks like anonymizing individual data records. More on this
can be found under
[Customize resources](https://dlthub.com/docs/general-usage/resource#customize-resources) in the
documentation. :::
[Customize resources](../../general-usage/resource#customize-resources) in the
documentation.


1. Here, we create the pipeline and use the `add_map` functionality to enrich the data.
1. Here, we create the pipeline and use the `add_map` functionality:

```python
# Create the pipeline
pipeline = dlt.pipeline(
pipeline_name="Data_enrichment_One",
pipeline_name="data_enrichment_one",
destination="duckdb",
dataset_name="data_enrichment_part_1",
full_refresh = True
dataset_name="user_device_enrichment",
)

# Run the pipeline with the transformed source
@@ -291,7 +296,7 @@ def fetch_average_price(user_tracked_data):
dlt pipeline <pipeline_name> show
```

For example, the pipeline_name for the above pipeline example is 'Data_enrichment_One'; you can use
For example, the pipeline_name for the above pipeline example is 'data_enrichment_one'; you can use
any custom name instead.

