Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete L1 Additions #101

Merged
merged 4 commits on Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .streamlit/secrets_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ DB_USER = 'analytics'
DB_PASS = 'analytics'
DB_HOST = 'db'
DB_PORT = '5432'
DB_ENV = 'prod'

[settings]
SHOW_TESTNETS = 'false'
Expand Down
3 changes: 2 additions & 1 deletion extractors/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ duckdb==0.10.2
polars-lts-cpu==1.1.0
pandas
numpy
synthetix==0.1.13
synthetix==0.1.13
web3==6.20.2
9 changes: 5 additions & 4 deletions scheduler/dags/v3_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# environment variables
WORKING_DIR = os.getenv("WORKING_DIR")
NETWORK_RPCS = {
"eth_mainnet": "NETWORK_1_RPC",
"base_mainnet": "NETWORK_8453_RPC",
"base_sepolia": "NETWORK_84532_RPC",
"arbitrum_mainnet": "NETWORK_42161_RPC",
Expand Down Expand Up @@ -49,7 +50,7 @@ def create_docker_operator(dag, task_id, config_file, image, command, network_en
)


def create_dag(network, rpc_var, target='dev'):
def create_dag(network, rpc_var, target="dev"):
version = f"{network}_{target}"

dag = DAG(
Expand Down Expand Up @@ -78,10 +79,10 @@ def create_dag(network, rpc_var, target='dev'):
config_file=None,
image="data-transformer",
command=f"dbt test --target {target if network != 'optimism_mainnet' else target + '-op'} --select tag:{network} --profiles-dir profiles --profile synthetix",
network_env_var=rpc_var
network_env_var=rpc_var,
)

if target == 'prod':
if target == "prod":
extract_task_id = f"extract_{version}"
config_file = f"configs/{network}.yaml"
extract_task = create_docker_operator(
Expand All @@ -101,5 +102,5 @@ def create_dag(network, rpc_var, target='dev'):


for network, rpc_var in NETWORK_RPCS.items():
for target in ['dev', 'prod']:
for target in ["dev", "prod"]:
globals()[f"v3_etl_{network}_{target}"] = create_dag(network, rpc_var, target)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
) }}

WITH dim AS (

SELECT
generate_series(DATE_TRUNC('hour', MIN(t.ts)), DATE_TRUNC('hour', MAX(t.ts)), '1 hour' :: INTERVAL) AS ts,
p.pool_id,
Expand All @@ -22,9 +23,9 @@ WITH dim AS (
FROM
{{ ref('fct_pool_debt_eth_mainnet') }}
) AS p
GROUP BY
p.pool_id,
p.collateral_type
GROUP BY
p.pool_id,
p.collateral_type
),
issuance AS (
SELECT
Expand Down Expand Up @@ -72,22 +73,14 @@ ffill AS (
dim.ts,
dim.pool_id,
dim.collateral_type,
coalesce(
last(debt) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as debt,
coalesce(
last(collateral_value) over (
partition by dim.collateral_type, dim.pool_id
order by dim.ts
rows between unbounded preceding and current row
),
0
) as collateral_value
COALESCE(last(debt) over (PARTITION BY dim.collateral_type, dim.pool_id
ORDER BY
dim.ts rows BETWEEN unbounded preceding
AND CURRENT ROW), 0) AS debt,
COALESCE(last(collateral_value) over (PARTITION BY dim.collateral_type, dim.pool_id
ORDER BY
dim.ts rows BETWEEN unbounded preceding
AND CURRENT ROW), 0) AS collateral_value
FROM
dim
LEFT JOIN debt
Expand Down Expand Up @@ -121,6 +114,15 @@ hourly_rewards AS (
FROM
{{ ref('fct_pool_rewards_hourly_eth_mainnet') }}
),
hourly_migration AS (
SELECT
ts,
pool_id,
collateral_type,
hourly_debt_migrated
FROM
{{ ref('fct_core_migration_hourly_eth_mainnet') }}
),
hourly_returns AS (
SELECT
pnl.ts,
Expand All @@ -132,9 +134,16 @@ hourly_returns AS (
iss.hourly_issuance,
0
) hourly_issuance,
COALESCE(
migration.hourly_debt_migrated,
0
) AS hourly_debt_migrated,
pnl.hourly_pnl + COALESCE(
iss.hourly_issuance,
0
) + COALESCE(
migration.hourly_debt_migrated,
0
) AS hourly_pnl,
COALESCE(
rewards.rewards_usd,
Expand All @@ -149,11 +158,11 @@ hourly_returns AS (
END AS hourly_rewards_pct,
CASE
WHEN pnl.collateral_value = 0 THEN 0
ELSE (COALESCE(iss.hourly_issuance, 0) + pnl.hourly_pnl) / pnl.collateral_value
ELSE (COALESCE(iss.hourly_issuance, 0) + pnl.hourly_pnl + COALESCE(migration.hourly_debt_migrated, 0)) / pnl.collateral_value
END AS hourly_pnl_pct,
CASE
WHEN pnl.collateral_value = 0 THEN 0
ELSE (COALESCE(rewards.rewards_usd, 0) + pnl.hourly_pnl + COALESCE(iss.hourly_issuance, 0)) / pnl.collateral_value
ELSE (COALESCE(rewards.rewards_usd, 0) + pnl.hourly_pnl + COALESCE(iss.hourly_issuance, 0) + COALESCE(migration.hourly_debt_migrated, 0)) / pnl.collateral_value
END AS hourly_total_pct
FROM
hourly_pnl pnl
Expand All @@ -169,8 +178,15 @@ hourly_returns AS (
) = LOWER(
iss.collateral_type
)
LEFT JOIN hourly_migration migration
ON pnl.ts = migration.ts
AND pnl.pool_id = migration.pool_id
AND LOWER(
pnl.collateral_type
) = LOWER(
migration.collateral_type
)
)

SELECT
ts,
pool_id,
Expand All @@ -179,6 +195,7 @@ SELECT
debt,
hourly_issuance,
hourly_pnl,
hourly_debt_migrated,
rewards_usd,
hourly_pnl_pct,
hourly_rewards_pct,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
with base as (
select
WITH base AS (
SELECT
block_number,
contract_address,
chain_id,
pool_id,
collateral_type,
cast(
value_1 as numeric
) as debt
from
CAST(
value_1 AS numeric
) AS debt
FROM
{{ source(
'raw_base_mainnet',
"core_get_vault_debt"
) }}
where
value_1 is not null
WHERE
value_1 IS NOT NULL
)

select
to_timestamp(blocks.timestamp) as ts,
cast(
blocks.block_number as integer
) as block_number,
SELECT
TO_TIMESTAMP(
blocks.timestamp
) AS ts,
CAST(
blocks.block_number AS INTEGER
) AS block_number,
base.contract_address,
cast(
base.pool_id as integer
) as pool_id,
cast(
base.collateral_type as varchar
) as collateral_type,
{{ convert_wei('base.debt') }} as debt
from
CAST(
base.pool_id AS INTEGER
) AS pool_id,
CAST(
base.collateral_type AS VARCHAR
) AS collateral_type,
{{ convert_wei('base.debt') }} AS debt
FROM
base
inner join {{ source('raw_base_mainnet', 'blocks_parquet') }} as blocks
on base.block_number = blocks.block_number
INNER JOIN {{ source(
'raw_base_mainnet',
'blocks_parquet'
) }} AS blocks
ON base.block_number = blocks.block_number
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ WITH base AS (
amount IS NOT NULL
)
SELECT
blocks.ts,
TO_TIMESTAMP(
blocks.timestamp
) AS ts,
base.block_number,
base.contract_address,
CAST(
Expand All @@ -33,5 +35,8 @@ SELECT
{{ convert_wei('base.collateral_value') }} AS collateral_value
FROM
base
JOIN {{ source('raw_eth_mainnet', 'blocks_parquet') }} AS blocks
JOIN {{ source(
'raw_eth_mainnet',
'blocks_parquet'
) }} AS blocks
ON base.block_number = blocks.block_number
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ WITH base AS (
value_1 IS NOT NULL
)
SELECT
blocks.ts,
TO_TIMESTAMP(
blocks.timestamp
) AS ts,
base.block_number,
base.contract_address,
CAST(
Expand All @@ -29,5 +31,8 @@ SELECT
{{ convert_wei('base.debt') }} AS debt
FROM
base
JOIN {{ source('raw_eth_mainnet', 'blocks_parquet') }} AS blocks
JOIN {{ source(
'raw_eth_mainnet',
'blocks_parquet'
) }} AS blocks
ON base.block_number = blocks.block_number
Loading