forked from databricks-industry-solutions/omop-cdm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
00-setup.py
106 lines (83 loc) · 3.37 KB
/
00-setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Databricks notebook source
# MAGIC %md
# MAGIC You may find this series of notebooks at https://github.com/databricks-industry-solutions/omop-cdm. For more information about this solution accelerator, visit https://www.databricks.com/blog/2021/07/19/unlocking-the-power-of-health-data-with-a-modern-data-lakehouse.html.
# COMMAND ----------
# DBTITLE 0,add widgets
import mlflow
# Selects which entry of project_data_paths (below) supplies the raw data,
# and names the per-user MLflow experiment created by SolAccUtil.
project_name='omop-cdm-100K'
# COMMAND ----------
# DBTITLE 1,specify path to raw data for each project
# Public S3 locations of the raw synthetic datasets, keyed by project name.
# SolAccUtil.__init__ looks the current project up here when no explicit
# data_path is passed.
project_data_paths = {'omop-cdm-100K':"s3://hls-eng-data-public/data/rwe/all-states-90K/","omop-cdm-10K":"s3://hls-eng-data-public/data/synthea/",'psm':"s3://hls-eng-data-public/data/rwe/dbx-covid-sim/"}
# COMMAND ----------
# DBTITLE 1,class for project setup
import mlflow


class SolAccUtil:
    """Project-setup helper for Databricks solution-accelerator notebooks.

    Resolves per-project storage paths, creates (or re-attaches to) an MLflow
    experiment named ``/Users/<user>/<project_name>``, and records everything
    in ``self.settings`` for downstream notebooks to consume.
    """

    def __init__(self, project_name, data_path=None, base_path=None):
        # Databricks-only API: the current workspace user, used to namespace
        # both the storage paths and the MLflow experiment.
        user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
        project_name = project_name.strip().replace(' ', '-')
        self.settings = {}

        if base_path is None:
            base_path = f'/home/{user}/health-lakehouse'
        if data_path is None:
            # A KeyError here means the project has no registered raw-data
            # location in the module-level project_data_paths mapping.
            data_path = project_data_paths[project_name]

        dbutils.fs.mkdirs(base_path)
        delta_path = f'{base_path}/{project_name}/delta'

        experiment_name = f'/Users/{user}/{project_name}'
        # Create the experiment only on first run; otherwise re-attach.
        if not mlflow.get_experiment_by_name(experiment_name):
            experiment_id = mlflow.create_experiment(experiment_name)
            experiment = mlflow.get_experiment(experiment_id)
        else:
            experiment = mlflow.get_experiment_by_name(experiment_name)

        # BUGFIX: load_remote_data() and display_data() read self.data_path,
        # which the original __init__ never assigned (only the settings dict
        # was populated), causing an AttributeError at first use. Store the
        # resolved paths as attributes as well — additive, so the settings
        # dict remains unchanged for existing callers.
        self.base_path = base_path
        self.delta_path = delta_path
        self.data_path = data_path

        self.settings['base_path'] = base_path
        self.settings['delta_path'] = delta_path
        self.settings['data_path'] = data_path
        self.settings['experiment_name'] = experiment.name
        self.settings['experiment_id'] = experiment.experiment_id
        self.settings['artifact_location'] = experiment.artifact_location
        self.settings['tags'] = experiment.tags

    def load_remote_data(self, url, unpack=False):
        """Download ``url`` into the project data path; optionally un-tar it.

        Raises requests.HTTPError when the server responds with an error
        status instead of silently writing the error body to disk.
        """
        import requests
        fname = url.split('/')[-1]
        r = requests.get(url)
        # BUGFIX: without this, a 404/500 response body was written to disk
        # as if it were the requested file.
        r.raise_for_status()
        print('*' * 100)
        print(f'downloading file {fname} to {self.data_path}')
        print('*' * 100)
        # Context managers guarantee the handles close even on error
        # (originals leaked the file object and the tarfile handle).
        with open(f'/dbfs{self.data_path}/{fname}', 'wb') as out:
            out.write(r.content)
        if unpack:
            print(f'unpacking file {fname} into {self.data_path}')
            import tarfile
            # NOTE(review): extractall on an untrusted archive can write
            # outside the target directory (path traversal); acceptable here
            # only because the download URLs are project-controlled.
            with tarfile.open(f'/dbfs{self.data_path}/{fname}') as archive:
                archive.extractall(f'/dbfs{self.data_path}')

    def print_info(self):
        """Render the settings dict as an HTML table in the notebook output."""
        _html = '<p>'
        for key, val in self.settings.items():
            _html += f'<b>{key}</b> = <i>{val}</i><br>'
        _html += '</p>'
        displayHTML(_html)

    def display_data(self):
        """List the files currently present under the project data path."""
        files = dbutils.fs.ls(f'{self.data_path}')
        if not files:
            print('no data available, please run load_remote_data(<url for the data>)')
        else:
            print('*' * 100)
            print(f'data available in {self.data_path} are:')
            print('*' * 100)
            display(files)
# COMMAND ----------
# DBTITLE 1,define project settings
# Instantiate the setup helper for the selected project: resolves base/delta/
# data paths and creates or re-attaches the per-user MLflow experiment.
project_settings = SolAccUtil(project_name=project_name)
# COMMAND ----------
# DBTITLE 1,write configurations for later access
import json

# Persist the resolved settings so downstream notebooks can read them back
# without re-running the setup logic. json.dump writes straight to the file
# handle; the original's explicit f.close() inside the with-block was
# redundant (the context manager already closes the file on exit).
with open(f'/tmp/{project_name}_configs.json', 'w') as f:
    json.dump(project_settings.settings, f, indent=4)
# COMMAND ----------
# DBTITLE 1,display project settings
# Show the resolved paths and MLflow experiment info in the notebook output.
project_settings.print_info()