Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
dat-a-man committed Jan 25, 2024
1 parent aaf50c6 commit c23b85c
Showing 1 changed file with 78 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ Often, you have a data set of monetary value in one currency. It would be best t
in a different currency for different reasons (like reporting, analysis, or operations in a global
context).

## Setup Guide
## Currency conversion process

To enable this is currency conversion data enrichment. A few important steps are involved:
To enable currency conversion data enrichment, there are a few important steps involved:

1. Define base and target currencies. e.g., USD (base) to EUR (target).
1. Obtain current exchange rates from a reliable source like a financial data API.
Expand Down Expand Up @@ -59,8 +59,8 @@ currency_conversion_enrichment/
`dlt` works on the principle of [sources](../../general-usage/source.md) and
[resources.](../../general-usage/resource.md)

1. The last part of our data enrichment (part one) involved enriching the data with user-agent
device data. This included adding two new columns to the dataset as folows:
1. The last part of our data enrichment ([part one](../../general-usage/data-enrichments/user_agent_device_data_enrichment.md))
involved enriching the data with user-agent device data. This included adding two new columns to the dataset as folows:

- `device_price_usd`: average price of the device in USD.

Expand All @@ -82,10 +82,13 @@ currency_conversion_enrichment/
@dlt.resource()
def enriched_data_part_two():
data_enrichment_part_one = [
{"user_id": 1, "device_name": "Sony Experia XZ",
"page_referer": "https://b2venture.lightning.force.com/",
"device_price_usd": 313.01,
"price_updated_at": "2024-01-15 04:08:45.088499+00:00"},
{
"user_id": 1,
"device_name": "Sony Experia XZ",
"page_referer": "https://b2venture.lightning.force.com/",
"device_price_usd": 313.01,
"price_updated_at": "2024-01-15 04:08:45.088499+00:00"
},
]
"""
Similar data for the other users.
Expand Down Expand Up @@ -119,71 +122,69 @@ API token.
1. Create the `converted_amount` function as follows:

```python
def converted_amount(record):
"""
Converts an amount from base currency to target currency using the latest exchange rate.
This function retrieves the current exchange rate from an external API and
applies it to the specified amount in the record. It handles updates to the exchange rate
if the current rate is over 12 hours.
Args:
record (dict): A dictionary containing the 'amount' key with the value to be converted.
Yields:
dict: A dictionary containing the original amount in USD, converted amount in EUR,
the exchange rate, and the last update time of the rate.
Note:
The base currency (USD) and target currency (EUR) are hard coded in this function,
that can be changed
The API key is retrieved from the `dlt` secrets.
"""

# Hardcoded base and target currencies
base_currency = "USD"
target_currency = "EUR"

# Retrieve the API key from DLT secrets
api_key = dlt.secrets.get("sources.api_key")

# Initialize or retrieve the state for currency rates
rates_state = dlt.current.resource_state().setdefault("rates", {})
currency_pair_key = f"{base_currency}-{target_currency}"
currency_pair_state = rates_state.setdefault(currency_pair_key, {
"last_update": datetime.min,
"rate": None
})

# Update the exchange rate if it's older than 12 hours
if (
currency_pair_state.get("rate") is None
or (datetime.utcnow() - currency_pair_state["last_update"] >= timedelta(hours=12))):
url = f"https://v6.exchangerate-api.com/v6/{api_key}/pair/{base_currency}/{target_currency}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
currency_pair_state.update({
"rate": data.get("conversion_rate"),
"last_update": datetime.fromtimestamp(data.get("time_last_update_unix"))
})
print(f"The latest rate of {data.get('conversion_rate')} for the currency pair {currency_pair_key} is fetched and updated.")
else:
raise Exception(f"Error fetching the exchange rate: HTTP {response.status_code}")

# Convert the amount using the retrieved or stored exchange rate
amount = record['device_price_usd']
rate = currency_pair_state["rate"]
yield {
"actual_amount": amount,
"base_currency": base_currency,
"converted_amount": round(amount * rate, 2),
"target_currency": target_currency,
"rate": rate,
"rate_last_updated": currency_pair_state["last_update"],
}
```

# @transformer(data_from=enriched_data_part_two)
def converted_amount(record):
"""
Converts an amount from base currency to target currency using the latest exchange rate.
This function retrieves the current exchange rate from an external API and
applies it to the specified amount in the record. It handles updates to the exchange rate
if the current rate is over 12 hours old.
Args:
record (dict): A dictionary containing the 'amount' key with the value to be converted.
Yields:
dict: A dictionary containing the original amount in USD, converted amount in EUR,
the exchange rate, and the last update time of the rate.
Note:
The base currency (USD) and target currency (EUR) are hard coded in this function,
but that can be changed.
The API key is retrieved from the `dlt` secrets.
"""
# Hardcoded base and target currencies
base_currency = "USD"
target_currency = "EUR"

# Retrieve the API key from DLT secrets
api_key = dlt.secrets.get("sources.api_key")

# Initialize or retrieve the state for currency rates
rates_state = dlt.current.resource_state().setdefault("rates", {})
currency_pair_key = f"{base_currency}-{target_currency}"
currency_pair_state = rates_state.setdefault(currency_pair_key, {
"last_update": datetime.min,
"rate": None
})

# Update the exchange rate if it's older than 12 hours
if (currency_pair_state.get("rate") is None or
(datetime.utcnow() - currency_pair_state["last_update"] >= timedelta(hours=12))):
url = f"https://v6.exchangerate-api.com/v6/{api_key}/pair/{base_currency}/{target_currency}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
currency_pair_state.update({
"rate": data.get("conversion_rate"),
"last_update": datetime.fromtimestamp(data.get("time_last_update_unix"))
})
print(f"The latest rate of {data.get('conversion_rate')} for the currency pair {currency_pair_key} is fetched and updated.")
else:
raise Exception(f"Error fetching the exchange rate: HTTP {response.status_code}")

# Convert the amount using the retrieved or stored exchange rate
amount = record['device_price_usd'] # Assuming the key is 'amount' as per the function documentation
rate = currency_pair_state["rate"]
yield {
"actual_amount": amount,
"base_currency": base_currency,
"converted_amount": round(amount * rate, 2),
"target_currency": target_currency,
"rate": rate,
"rate_last_updated": currency_pair_state["last_update"],
}
```
1. Next, follow the instructions in
[Destinations](../../dlt-ecosystem/destinations/duckdb.md) to add credentials for
your chosen destination. This will ensure that your data is properly routed to its final
Expand All @@ -199,9 +200,8 @@ API token.
The `dlt` library's `transformer` and `add_map` functions serve distinct purposes in data
processing.

`Transformers` used to process a resource and are ideal for post-load data transformations in a
pipeline, compatible with tools like `dbt`, the `dlt SQL client`, or Pandas for intricate data
manipulation. To read more:
`Transformers` are a form of A form of `dlt resource` that takes input from other resources
via `data_from` argument in order to enrich or transform the data.
[Click here.](../../general-usage/resource.md#process-resources-with-dlttransformer)

Conversely, `add_map` used to customize a resource applies transformations at an item level
Expand All @@ -226,8 +226,8 @@ API token.
```

:::info
Please note that the same outcome can be achieved by using the transformer function. To
do so, you need to add the transformer decorator at the top of the `converted_amount` function.
Please note that the same outcome can be achieved by using the `@dlt.transformer` decorator function.
To do so, you need to add the transformer decorator at the top of the `converted_amount` function.
For `pipeline.run`, you can use the following code:

```python
Expand Down

0 comments on commit c23b85c

Please sign in to comment.