Skip to content

Commit

Permalink
finish get_population
Browse files Browse the repository at this point in the history
  • Loading branch information
luabida committed Apr 16, 2024
1 parent 035c9e1 commit 2b1c9f0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 38 deletions.
39 changes: 15 additions & 24 deletions pysus/ftp/databases/ibge_datasus.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,29 @@ def get_files(
year: Optional[Union[str, int, list]] = None,
*args, **kwargs
) -> List[File]:
sources = ["POP", "censo", "POPTCU", "projpop"]
source_dir = None

for dir in self.paths:
if (
source in ["POP", "censo", "POPTCU", "projpop"]
and source in dir.path
):
if source in sources and source in dir.path:
source_dir = dir

if not source_dir:
raise ValueError(f"Unkown source {source}")
raise ValueError(f"Unkown source {source}. Options: {sources}")

files = source_dir.content

if source in ["POPTCU", "censo", "POP"]:
if year:
if isinstance(year, (str, int)):
files = [
f for f in files if
self.describe(f)["year"] == zfill_year(year)
]
elif isinstance(year, list):
files = [
f for f in files
if str(self.describe(f)["year"])
in [str(zfill_year(y)) for y in year]
]
else:
if year:
logger.warning(
f"{source} files are not arranged in years, "
"returning all files for source"
)
if year:
if isinstance(year, (str, int)):
files = [
f for f in files if
self.describe(f)["year"] == zfill_year(year)
]
elif isinstance(year, list):
files = [
f for f in files
if str(self.describe(f)["year"])
in [str(zfill_year(y)) for y in year]
]

return files
74 changes: 66 additions & 8 deletions pysus/online_data/IBGE.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""
Helper functions to download official statistics from IBGE SIDRA
"""
from typing import Literal
from typing import Literal, Optional
from pathlib import Path
from zipfile import ZipFile
from tempfile import TemporaryDirectory

import ssl # Builtin
import urllib3
import requests
import pandas as pd

from pysus.data.local import ParquetSet
from pysus.ftp.databases.ibge_datasus import IBGEDATASUS

# requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1'
Expand All @@ -16,6 +20,8 @@

# Base URL of the IBGE "servicodados" API, version 3.
APIBASE = 'https://servicodados.ibge.gov.br/api/v3/'

# Shared IBGEDATASUS handle, created and loaded once when this module is
# imported; get_population() queries it through ibge.get_files().
# NOTE(review): .load() runs at import time - presumably it reaches out to the
# DATASUS FTP server; confirm that importing this module offline is acceptable.
ibge = IBGEDATASUS().load()


def get_sidra_table(
table_id,
Expand Down Expand Up @@ -268,7 +274,7 @@ def to_dataframe(self):
"""


class CustomHttpAdapter(requests.adapters.HTTPAdapter):
class CustomHttpAdapter(requests.sessions.HTTPAdapter):
# "Transport adapter" that allows us to use custom ssl_context.

def __init__(self, ssl_context=None, **kwargs):
Expand All @@ -293,15 +299,67 @@ def get_legacy_session():


def get_population(
    year: int,
    source: Literal["POP", "censo", "POPTCU", "projpop"] = "POPTCU",
    censo_data: Literal["ALF", "ESCA", "ESCB", "IDOSO", "RENDA"] = "ALF"
) -> pd.DataFrame:
    """
    Get population data from IBGE as shared by DATASUS.

    :param year: year of the data
    :param source:
        "POP" - 1992-present: population estimates stratified by age and
            sex.
        "censo" - 1991, 2000 and 2010: Demographic Censuses.
        "POPTCU" - 1992-present: population estimates sent to the TCU,
            stratified by age and sex by MS/SGEP/Datasus.
        "projpop": preliminary estimates of the population totals for the
            intercensal years, stratified by age and sex by
            MS/SGEP/Datasus.
    :param censo_data: which census file to pick; only read when
        `source` is "censo"
        "ALF": Demographic Census
        "ESCA": School Census of Basic Education
        "ESCB": School Census of Higher Education
        "IDOSO": population of people aged 65 or older
        "RENDA": population of people according to family income
    :return: DataFrame with population data; an empty DataFrame when no
        file is available for the requested year/source/censo_data
    :raises ValueError: if `source` is "censo" and `censo_data` is not one
        of the listed options
    :raises NotImplementedError: if the downloaded file is neither a
        ParquetSet nor a ".zip" archive
    """
    files = ibge.get_files(year=int(year), source=source)

    if not files:
        return pd.DataFrame()

    if source == "censo":
        opts = ["ALF", "ESCA", "ESCB", "IDOSO", "RENDA"]
        if censo_data not in opts:
            raise ValueError(
                f"Incorrect `censo_data` parameter. Options: {opts}"
            )
        matches = [f for f in files if censo_data in f.name]
        if not matches:
            # No census file of the requested kind for this year: treat it
            # like the "no files" case above instead of an IndexError.
            return pd.DataFrame()
        downloaded = matches[0].download()
    else:
        # Only the first matching file is used for non-census sources.
        downloaded = files[0].download()

    if isinstance(downloaded, ParquetSet):
        return downloaded.to_dataframe()

    path = Path(str(downloaded))

    if path.suffix.lower() == ".zip":
        return _unzip_to_dataframe(str(path))

    raise NotImplementedError(f"Unkown file type '{path.suffix}'")


def _unzip_to_dataframe(file: str) -> pd.DataFrame:
    """
    Read the first data file found inside a zip archive into a DataFrame.

    Members are visited in archive order. The first one whose name ends in
    ".csv" is read with pandas; the first one ending in ".dbf" or ".dbc" is
    handed to ParquetSet and converted with `to_dataframe()`.

    :param file: path to the zip archive
    :return: DataFrame with the contents of the first data member
    :raises ValueError: if the archive holds no .csv, .dbf or .dbc member
    """
    # Both the archive handle and the scratch directory are released on exit,
    # including on the early returns below.
    with ZipFile(file) as zip_file, TemporaryDirectory() as tempdir:
        for member in zip_file.namelist():
            lowered = member.lower()

            if lowered.endswith(".csv"):
                # read_csv loads eagerly, so the data outlives tempdir.
                return pd.read_csv(zip_file.extract(member, tempdir))

            if lowered.endswith((".dbf", ".dbc")):
                # NOTE(review): assumes ParquetSet accepts a raw .dbf/.dbc
                # path - confirm against pysus.data.local.
                return ParquetSet(
                    zip_file.extract(member, tempdir)
                ).to_dataframe()

        # Raised while the archive is still open so its repr in the message
        # keeps showing the filename.
        raise ValueError(f"No data found in {zip_file}")
12 changes: 6 additions & 6 deletions pysus/tests/test_ibge.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ def test_FetchData(self):

@pytest.mark.timeout(120)
def test_get_population(self):
    """get_population returns a DataFrame of the expected size per source."""
    # Expected row counts are pinned to the published 2021 / 2012 files.
    pop_2021 = IBGE.get_population(year=2021, source="POP")
    self.assertIsInstance(pop_2021, pd.DataFrame)
    self.assertEqual(len(pop_2021), 5570)

    proj_2012 = IBGE.get_population(year=2012, source='projpop')
    self.assertIsInstance(proj_2012, pd.DataFrame)
    self.assertEqual(len(proj_2012), 182)


if __name__ == '__main__':
Expand Down

0 comments on commit 2b1c9f0

Please sign in to comment.