-
Notifications
You must be signed in to change notification settings - Fork 32
/
post_reform.py
91 lines (74 loc) · 2.42 KB
/
post_reform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import requests
import time
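# NOTE(review): this import is disabled, but get_data() below still calls
# get_formatted_reform; unless that name is provided some other way,
# get_data() raises NameError. The "None" entry is not a valid import name
# and must be corrected before this line can be re-enabled.
# from all_params_reform import get_formatted_reform, None, CG_REFORM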
# Base URLs of the taxbrain endpoint for each deployment target.
LOCAL_BASE_URL = "http://127.0.0.1:8000/taxbrain/"
TEST_BASE_URL = "http://ospc-taxes7.herokuapp.com/taxbrain/"
PROD_BASE_URL = "http://www.ospc.org/taxbrain/"

# Small sample reform: four EITC_rt_* parameters mapped to float values.
MINI_REFORM = {
    u'EITC_rt_0': 0.4,
    u'EITC_rt_1': 0.7,
    u'EITC_rt_2': 0.8,
    u'EITC_rt_3': 1.5,
}

# Base form payload. get_data() merges reform fields into this dict in place.
# 'csrfmiddlewaretoken' is None here; no code in this block fills it in.
DATA = {
    u'start_year': str(2017),
    u'csrfmiddlewaretoken': None,
    u'has_errors': [u'False'],
}
def get_session(url=LOCAL_BASE_URL):
    """Start a requests session against *url* and read its CSRF cookie.

    A fresh ``requests.Session`` issues a GET to ``url``; the value of the
    ``'csrftoken'`` cookie is then looked up in the session's cookie jar.

    Returns:
        A ``(session, csrftoken)`` tuple.
    """
    sess = requests.Session()
    # The GET response itself is discarded; it is made so the session
    # collects whatever cookies the server sets.
    sess.get(url)
    return sess, sess.cookies['csrftoken']
def get_data(reform=None):
    """Merge a taxbrain-styled reform into the shared DATA payload.

    The result of ``get_formatted_reform(reform=reform)`` is merged into the
    module-level ``DATA`` dict in place, and that same dict object is
    returned (not a copy), so entries persist across calls.

    NOTE(review): ``get_formatted_reform`` is not imported in the visible
    part of this file (its import line is commented out) -- confirm where
    it is expected to come from.
    """
    formatted = get_formatted_reform(reform=reform)
    DATA.update(formatted)
    return DATA
def post_reform(session, data, files=None, url=LOCAL_BASE_URL):
    """POST *data* (and optional *files*) to *url* through *session*.

    The response object and its final URL are printed. The status code is
    not checked here, so callers should inspect the returned response.

    Returns:
        A ``(session, pk, response)`` tuple, where ``pk`` is the last
        '/'-separated piece of the response URL after its final character
        has been dropped.
    """
    resp = session.post(url, data=data, files=files)
    print("RESPONSE", resp)
    print("URL", resp.url)
    # Drop the final character (presumably a trailing '/' -- verify against
    # the server's URLs), then keep whatever follows the last '/'.
    trimmed_url = resp.url[:-1]
    pk = trimmed_url.rsplit('/', 1)[-1]
    return session, pk, resp
def post_file(session, data, reform_path, assump_path=None, url=None):
    """Upload a reform file (and optionally an assumptions file) via POST.

    Args:
        session: Session object forwarded to ``post_reform``.
        data: Form payload forwarded to ``post_reform``.
        reform_path: Path of the reform file, sent as the ``'docfile'`` field.
        assump_path: Optional path of the assumptions file, sent as the
            ``'assumpfile'`` field when not None.
        url: Upload endpoint; defaults to ``LOCAL_BASE_URL + 'file/'``.
            Must end with ``'file/'``.

    Returns:
        Whatever ``post_reform`` returns: ``(session, pk, response)``.

    Raises:
        AssertionError: if ``url`` does not end with ``'file/'``.
    """
    if url is None:
        url = LOCAL_BASE_URL + 'file/'
    # Kept as an assert so callers see the same exception type as before.
    assert url.endswith('file/')

    files = {}
    try:
        files['docfile'] = open(reform_path, 'r')
        if assump_path is not None:
            files['assumpfile'] = open(assump_path, 'r')
        print(files)
        return post_reform(session, data, files=files, url=url)
    finally:
        # Close every handle that was actually opened, even when the second
        # open() or the POST raises; previously those paths leaked the files.
        for handle in files.values():
            handle.close()
# def wait(session, pk):
# time.sleep(10)
#
# print ('unique_url', unique_url)
# result_response = s.get(unique_url)
#
# print(result_response)
# print(dir(result_response))
# print(result_response.text)
# print(result_response.status_code)
# print(result_response.json)
# result_json = result_response.json()
# print(result_json)
#
# while result_json['status_code'] == 202:
# result_response = s.get(unique_url)
# result_json = result_response.json()
# print(result_response)
# print(result_json)
# time.sleep(5)
#
# def read_csv(session, pk):
# # time.sleep(300)
# print(result_json)
# print('now getting csv?')
# csv_url = unique_url + 'output.csv/'
# r = s.get(csv_url)
#
# print(r)
# print(r.text)