-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsipni_downloader_sp_old.py
124 lines (115 loc) · 5 KB
/
sipni_downloader_sp_old.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/python3
# Downloader for SI-PNI COVID-19 vaccination CSVs from opendatasus.saude.gov.br
# (old, SP-only variant of the script).
import os, sys
from datetime import date, timedelta, datetime
from lxml import html
import requests
import re
import locale
# Dates on the index pages are rendered in Portuguese (e.g. "12 de janeiro de 2022"),
# so strptime("%B") needs the pt_BR locale; raises locale.Error if it is not installed.
locale.setlocale(locale.LC_TIME, "pt_BR.UTF-8")
def get_file(download_address, output_file):
    """Stream-download *download_address* into *output_file*.

    NOTE(review): verify=False disables TLS certificate validation — kept
    for parity with the rest of this script, but it is a security risk.

    Raises requests.HTTPError on a non-2xx response instead of silently
    writing the error page to disk.
    """
    with requests.get(download_address, verify=False, allow_redirects=True,
                      stream=True, timeout=100) as r:
        # Fail loudly on HTTP errors (the original wrote the error body to the file).
        r.raise_for_status()
        size = r.headers.get('content-length')
        # content-length may be absent on chunked responses; the original
        # crashed with TypeError in that case.
        if size is not None:
            print(f"=== download size: {round(int(size) / (1024*1024))} M ===\n")
        else:
            print("=== download size: unknown ===\n")
        with open(output_file, 'wb') as f:
            chunk_size = 100 * 1024 * 1024  # 100 MiB per write
            for chunk in r.iter_content(chunk_size=chunk_size):
                f.write(chunk)
def get_UF_file(index_page_address, UF, output_file):
    """Download the CSV for state *UF* linked from *index_page_address*.

    The page splits a state's data into several "Dados UF - Parte N" links;
    the parts are concatenated into *output_file*, dropping the CSV header
    line of every part after the first.

    Returns True when at least one matching link was found and downloaded,
    False otherwise.
    """
    page = requests.get(index_page_address, verify=False, timeout=10)
    tree = html.fromstring(page.content)
    # EXSLT regex extension lets XPath match the "Parte N" suffix.
    regexpNS = "http://exslt.org/regular-expressions"
    links = tree.xpath(f'//a[re:test(., "Dados {UF} - Parte [0-9]")]',
                       namespaces={"re": regexpNS})
    if len(links) == 0:
        return False
    if len(links) == 1:
        get_file(links[0].attrib['href'], output_file)
        return True
    # Multi-part download: fetch each part to "<output>.<i>", then merge.
    for i, link in enumerate(links):
        get_file(link.attrib['href'], f'{output_file}.{i}')
    # Part 0 (with its header line) becomes the output file.
    os.replace(f'{output_file}.0', output_file)
    # Append the remaining parts minus their header line.  Done in Python
    # instead of `os.system("mv/tail ...")`: portable, error-checked, and
    # immune to shell metacharacters in the output path.
    with open(output_file, 'ab') as out:
        for i in range(1, len(links)):
            part = f'{output_file}.{i}'
            with open(part, 'rb') as pf:
                pf.readline()  # skip this part's CSV header row
                for chunk in iter(lambda: pf.read(1024 * 1024), b''):
                    out.write(chunk)
            os.remove(part)
    return True
def get_date(index_page_address):
    """Return the "Dados atualizados pela última vez" date from the index page."""
    response = requests.get(index_page_address, verify=False, timeout=10)
    doc = html.fromstring(response.content)
    header_cell = doc.xpath('.//th[text()="Dados atualizados pela última vez"]')[0]
    raw_date = header_cell.getparent().getchildren()[1].text
    # The date is written in Portuguese, e.g. "12 de janeiro de 2022";
    # parsing relies on the pt_BR locale set at module import.
    return datetime.strptime(raw_date, "%d de %B de %Y")
def check_for_new_file(index_page_address, last_date):
    """Scan the page's resource list for an SRAG file dated after *last_date*.

    Returns a (date, download_url) tuple for the first newer resource
    found, or False when none is newer than *last_date*.
    """
    response = requests.get(index_page_address, verify=False, timeout=10)
    doc = html.fromstring(response.content)
    date_pattern = re.compile(r".*SRAG (\d\d/\d\d/\d\d\d\d).*",
                              re.DOTALL | re.MULTILINE | re.IGNORECASE)
    for resource in doc.xpath('//li[@class="resource-item"]'):
        match = date_pattern.match(resource.text_content())
        if not match:
            continue
        resource_date = datetime.strptime(match.group(1), "%d/%m/%Y").date()
        if resource_date > last_date:
            link = resource.xpath('.//a[@class="resource-url-analytics"]')[0]
            return (resource_date, link.attrib['href'])
    return False
if __name__ == '__main__':
    # (index page URL, states whose data that page hosts)
    index_pages_estados = [
        ("https://opendatasus.saude.gov.br/dataset/covid-19-vacinacao/resource/5093679f-12c3-4d6b-b7bd-07694de54173", ["AC", "AL", "AM", "AP", "BA", "CE", "DF", "ES"]),
        ("https://opendatasus.saude.gov.br/dataset/covid-19-vacinacao/resource/4ae86721-1bcc-47a4-a60d-75874727439b", ["GO", "MA", "MG", "MS", "MT"]),
        ("https://opendatasus.saude.gov.br/dataset/covid-19-vacinacao/resource/10aed154-04c8-4cf4-b78a-8f0fa1bc5af4", [ "PA", "PB", "PE", "PI", "PR", "RJ", "RN", "RO"]),
        ("https://opendatasus.saude.gov.br/dataset/covid-19-vacinacao/resource/a5f0bb2a-f6c2-4f28-b3da-bc79462c3774", ["RR", "RS", "SC", "SE", "SP", "TO"]) ]
    output_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dados/')
    # All index pages must report the same "last updated" date.
    data = [get_date(index_page) for index_page, estados in index_pages_estados]
    if any(d != data[0] for d in data[1:]):
        print("Datas de atualização discordam! Saindo....")
        sys.exit(1)
    print(f'data da última atualização: {data[0].strftime("%Y-%m-%d")}')
    # Parse CLI: "-dYYYY-mm-dd" sets a reference date; anything else is a UF.
    UFs = []
    data_ant = False
    for arg in sys.argv[1:]:
        if arg[:2] == "-d":
            data_ant = datetime.strptime(arg[2:], "%Y-%m-%d")
        else:
            UFs.append(arg)
    # This old variant ignores the UFs given on the command line and iterates
    # over every state, downloading only SP (see filter below).
    UFs = sum([ index_page[1] for index_page in index_pages_estados ], [])
    for UF in UFs:
        print(UF)
        for index_page, estados in index_pages_estados:
            # BUG FIX: the original tested only `UF == "SP"` against the
            # FIRST index page and then broke out of the page loop — but
            # SP's data is hosted on the fourth page, so the download link
            # was never found.  Require the page that actually hosts the UF.
            if UF == "SP" and UF in estados:
                print(f'=== baixando base de {UF} ===\n')
                fname = f'dados_{data[0].strftime("%Y-%m-%d")}_{UF}.csv'
                output_file = os.path.join(output_folder, fname)
                print(output_file)
                get_UF_file(index_page, UF, output_file)
                break