Skip to content

Commit 2afdb40

Browse files
authored
Merge branch 'master' into staging/dump_alertario
2 parents fc9151b + fc92660 commit 2afdb40

File tree

9 files changed

+801
-2
lines changed

9 files changed

+801
-2
lines changed

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ RUN apt-get update && \
3535
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
3636
echo "deb [arch=amd64,arm64,armhf] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list && \
3737
apt-get update && \
38-
ACCEPT_EULA=Y apt-get install --no-install-recommends -y msodbcsql17 openssl unixodbc-dev && \
38+
ACCEPT_EULA=Y apt-get install --no-install-recommends -y ffmpeg libsm6 libxext6 msodbcsql17 openssl unixodbc-dev && \
3939
rm -rf /var/lib/apt/lists/* && \
4040
sh -c "echo /opt/oracle/instantclient_21_5 > /etc/ld.so.conf.d/oracle-instantclient.conf" && \
4141
ldconfig

pipelines/rj_escritorio/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pipelines.rj_escritorio.dados_mestres_dump_datario.flows import *
1010
from pipelines.rj_escritorio.data_catalog.flows import *
1111
from pipelines.rj_escritorio.dummy_predict.flows import *
12+
from pipelines.rj_escritorio.flooding_detection.flows import *
1213
from pipelines.rj_escritorio.notify_flooding.flows import *
1314
from pipelines.rj_escritorio.template_pipeline.flows import *
1415
from pipelines.rj_escritorio.tweets_flamengo.flows import *

pipelines/rj_escritorio/flooding_detection/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Flow definition for flooding detection using AI.
4+
"""
5+
from prefect import Parameter
6+
from prefect.run_configs import KubernetesRun
7+
from prefect.storage import GCS
8+
from prefect.utilities.edges import unmapped
9+
10+
from pipelines.constants import constants
11+
from pipelines.rj_escritorio.flooding_detection.schedules import (
12+
update_flooding_data_schedule,
13+
)
14+
from pipelines.rj_escritorio.flooding_detection.tasks import (
15+
get_last_update,
16+
get_openai_api_key,
17+
get_prediction,
18+
get_snapshot,
19+
pick_cameras,
20+
update_flooding_api_data,
21+
)
22+
from pipelines.utils.decorators import Flow
23+
24+
with Flow(
25+
name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API",
26+
code_owners=[
27+
"gabriel",
28+
"diego",
29+
],
30+
) as rj_escritorio__flooding_detection__flow:
31+
# Parameters
32+
cameras_geodf_url = Parameter(
33+
"cameras_geodf_url",
34+
required=True,
35+
)
36+
mocked_cameras_number = Parameter(
37+
"mocked_cameras_number",
38+
default=0,
39+
)
40+
openai_api_max_tokens = Parameter("openai_api_max_tokens", default=300)
41+
openai_api_model = Parameter("openai_api_model", default="gpt-4-vision-preview")
42+
openai_api_url = Parameter(
43+
"openai_api_url",
44+
default="https://api.openai.com/v1/chat/completions",
45+
)
46+
openai_api_key_secret_path = Parameter("openai_api_key_secret_path", required=True)
47+
openai_flooding_detection_prompt = Parameter(
48+
"openai_flooding_detection_prompt", required=True
49+
)
50+
rain_api_data_url = Parameter(
51+
"rain_api_url",
52+
default="https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
53+
)
54+
rain_api_update_url = Parameter(
55+
"rain_api_update_url",
56+
default="https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/",
57+
)
58+
redis_key_predictions_buffer = Parameter(
59+
"redis_key_predictions_buffer", default="flooding_detection_predictions_buffer"
60+
)
61+
redis_key_flooding_detection_data = Parameter(
62+
"redis_key_flooding_detection_data", default="flooding_detection_data"
63+
)
64+
redis_key_flooding_detection_last_update = Parameter(
65+
"redis_key_flooding_detection_last_update",
66+
default="flooding_detection_last_update",
67+
)
68+
69+
# Flow
70+
last_update = get_last_update(rain_api_update_url=rain_api_update_url)
71+
cameras = pick_cameras(
72+
rain_api_data_url=rain_api_data_url,
73+
cameras_data_url=cameras_geodf_url,
74+
last_update=last_update,
75+
predictions_buffer_key=redis_key_predictions_buffer,
76+
number_mock_rain_cameras=mocked_cameras_number,
77+
)
78+
openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path)
79+
images = get_snapshot.map(
80+
camera=cameras,
81+
)
82+
predictions = get_prediction.map(
83+
image=images,
84+
flooding_prompt=unmapped(openai_flooding_detection_prompt),
85+
openai_api_key=unmapped(openai_api_key),
86+
openai_api_model=unmapped(openai_api_model),
87+
openai_api_max_tokens=unmapped(openai_api_max_tokens),
88+
openai_api_url=unmapped(openai_api_url),
89+
)
90+
update_flooding_api_data(
91+
predictions=predictions,
92+
cameras=cameras,
93+
images=images,
94+
data_key=redis_key_flooding_detection_data,
95+
last_update_key=redis_key_flooding_detection_last_update,
96+
predictions_buffer_key=redis_key_predictions_buffer,
97+
)
98+
99+
100+
rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
101+
rj_escritorio__flooding_detection__flow.run_config = KubernetesRun(
102+
image=constants.DOCKER_IMAGE.value,
103+
labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
104+
)
105+
rj_escritorio__flooding_detection__flow.schedule = update_flooding_data_schedule
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Schedules for the data catalog pipeline.
4+
"""
5+
6+
from datetime import timedelta, datetime
7+
8+
from prefect.schedules import Schedule
9+
from prefect.schedules.clocks import IntervalClock
10+
import pytz
11+
12+
from pipelines.constants import constants
13+
14+
update_flooding_data_schedule = Schedule(
15+
clocks=[
16+
IntervalClock(
17+
interval=timedelta(minutes=3),
18+
start_date=datetime(2023, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
19+
labels=[
20+
constants.RJ_ESCRITORIO_AGENT_LABEL.value,
21+
],
22+
parameter_defaults={
23+
"cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min.csv",
24+
"mocked_cameras_number": 0,
25+
"openai_api_key_secret_path": "openai-api-key-flooding-detection",
26+
"openai_api_max_tokens": 300,
27+
"openai_api_model": "gpt-4-vision-preview",
28+
"openai_api_url": "https://api.openai.com/v1/chat/completions",
29+
"openai_flooding_detection_prompt": """You are an expert flooding detector. You are
30+
given a image. You must detect if there is flooding in the image. The output MUST
31+
be a JSON object with a boolean value for the key "flooding_detected". If you don't
32+
know what to anwser, you can set the key "flooding_detect" as false. Example:
33+
{
34+
"flooding_detected": true
35+
}
36+
""",
37+
"rain_api_update_url": "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/", # noqa
38+
"rain_api_url": "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
39+
"redis_key_flooding_detection_data": "flooding_detection_data",
40+
"redis_key_flooding_detection_last_update": "flooding_detection_last_update",
41+
"redis_key_predictions_buffer": "flooding_detection_predictions_buffer",
42+
},
43+
),
44+
]
45+
)

0 commit comments

Comments
 (0)