forked from ethyca/fidesops
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_test_data.py
147 lines (131 loc) · 4.29 KB
/
create_test_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Script to create test data for the Admin UI"""
import string
from datetime import datetime, timedelta
from uuid import uuid4
from sqlalchemy import orm
from fidesops.core.config import config
from fidesops.db.database import init_db
from fidesops.db.session import get_db_session
from fidesops.models.client import ClientDetail
from fidesops.models.fidesops_user import FidesopsUser
from fidesops.models.policy import ActionType, Policy, Rule, RuleTarget
from fidesops.models.privacy_request import PrivacyRequest, PrivacyRequestStatus
from fidesops.models.storage import ResponseFormat, StorageConfig
from fidesops.schemas.storage.storage import FileNaming, StorageDetails, StorageType
from fidesops.util.data_category import DataCategory
def _create_policy(
    db: orm.Session,
    action_type: str,
    client_id: str,
    policy_key: str,
) -> Policy:
    """
    Fetch or create the Policy keyed by `policy_key`, giving a newly created
    policy one Rule and one RuleTarget.

    `policy_key` is used as both the policy's name and its key. A policy that
    already exists is returned as-is, without adding a rule or target. For a
    new policy, the erasure action type yields a rule with the "null_rewrite"
    masking strategy, and the access action type yields a rule pointing at a
    fetched-or-created S3 StorageConfig. The rule then receives a single
    target for the "user.provided.identifiable.name" data category.
    """
    policy_fields = {"name": policy_key, "key": policy_key, "client_id": client_id}
    is_new, policy = Policy.get_or_create(db=db, data=policy_fields)
    if not is_new:
        # The policy already exists, so no rule or target is added
        return policy

    # Despite what the name might suggest elsewhere, this suffix is fixed:
    # it is always the first five lowercase ASCII letters.
    suffix = string.ascii_lowercase[:5]
    rule_name = f"{action_type} Rule {suffix}"

    # Stays empty when the action type is neither erasure nor access
    rule_fields = {}
    if action_type == ActionType.erasure.value:
        rule_fields.update(
            {
                "action_type": action_type,
                "name": rule_name,
                "policy_id": policy.id,
                "masking_strategy": {
                    "strategy": "null_rewrite",
                    "configuration": {},
                },
                "client_id": client_id,
            }
        )
    elif action_type == ActionType.access.value:
        storage_fields = {
            "name": "test storage config",
            "type": StorageType.s3,
            "details": {
                StorageDetails.NAMING.value: FileNaming.request_id.value,
                StorageDetails.BUCKET.value: "test_bucket",
            },
            "key": f"storage_config_for_{policy_key}",
            "format": ResponseFormat.json,
        }
        _, storage_config = StorageConfig.get_or_create(db=db, data=storage_fields)
        rule_fields.update(
            {
                "action_type": action_type,
                "name": rule_name,
                "policy_id": policy.id,
                "storage_destination_id": storage_config.id,
                "client_id": client_id,
            }
        )

    rule = Rule.create(db=db, data=rule_fields)

    target_fields = {
        "data_category": DataCategory("user.provided.identifiable.name").value,
        "rule_id": rule.id,
        "client_id": client_id,
    }
    RuleTarget.create(db=db, data=target_fields)
    return policy
def create_test_data(db: orm.Session) -> None:
    """
    Seed the database with privacy requests for the Admin UI.

    Fetches or creates the "ci_create_test_data" client, fetches or creates
    the "delete" (erasure) and "download" (access) policies, then creates one
    PrivacyRequest per policy for every member of PrivacyRequestStatus. The
    privacy requests are created unconditionally, so each call adds a new set.

    Nothing is returned; the function is called for its database writes and
    its progress messages on stdout.
    """
    print("Seeding database with privacy requests")
    _, client = ClientDetail.get_or_create(
        db=db,
        data={
            "fides_key": "ci_create_test_data",
            "hashed_secret": "autoseededdata",
            "salt": "autoseededdata",
            "scopes": [],
        },
    )

    # (action type, policy key) for each policy the seeded requests hang off
    policy_specs = (
        (ActionType.erasure.value, "delete"),
        (ActionType.access.value, "download"),
    )
    policies = [
        _create_policy(
            db=db,
            action_type=action_type,
            client_id=client.id,
            policy_key=policy_key,
        )
        for action_type, policy_key in policy_specs
    ]

    for policy in policies:
        # __members__ is used (rather than iterating the enum) so that any
        # aliased status names are included as well.
        for status in PrivacyRequestStatus.__members__.values():
            # Read the clock once so requested_at is exactly one day earlier
            processing_started = datetime.utcnow()
            PrivacyRequest.create(
                db=db,
                data={
                    "external_id": f"ext-{uuid4()}",
                    "started_processing_at": processing_started,
                    "requested_at": processing_started - timedelta(days=1),
                    "status": status,
                    "origin": f"https://example.com/{status.value}/",
                    "policy_id": policy.id,
                    "client_id": policy.client_id,
                },
            )
    print("Data seeding complete!")
if __name__ == "__main__":
    # Script entry point: initialise the database at the configured URI, then
    # seed it using a session obtained from the session factory.
    database_uri = config.database.SQLALCHEMY_DATABASE_URI
    init_db(database_uri)
    make_session = get_db_session()
    with make_session() as db_session:
        create_test_data(db_session)