diff --git a/pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb b/pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb new file mode 100644 index 000000000..e1eb02e14 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb @@ -0,0 +1,1233 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + ">>>>>>>>>date_save 2022\n", + ">>>>>>>>> hour_utc 0300\n", + ">>>>>>>>>julian_day 300\n" + ] + } + ], + "source": [ + "filename = \"OR_ABI-L2-TPWF-M6_G16_s20223000300207_e20223000309515_c20223000311343.nc\"\n", + "\n", + "start = filename[filename.find(\"_s\") + 2 : filename.find(\"_e\")]\n", + "# Get year\n", + "year = int(start[0:4])\n", + "# Get julian day\n", + "julian_day = int(start[4:7])\n", + "\n", + "# Time (UTC) as string\n", + "hour_utc = (start[7:11])\n", + "\n", + "print(f\">>>>>>>>>date_save {year}\")\n", + "print(f\">>>>>>>>> hour_utc {hour_utc}\")\n", + "print(f\">>>>>>>>>julian_day {julian_day}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "ename": "ValueError", + "evalue": "hour must be in 0..23", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)", + "\u001b[1;32m/home/patricia/Documentos/escritorio_dados/prefeitura-rio/pipelines/pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb Cell 1\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[39mimport\u001b[39;00m \u001b[39mpendulum\u001b[39;00m\n\u001b[1;32m 2\u001b[0m datetime_save \u001b[39m=\u001b[39m \u001b[39m\"\u001b[39m\u001b[39m20221027 300\u001b[39m\u001b[39m\"\u001b[39m\n\u001b[0;32m----> 3\u001b[0m pendulum\u001b[39m.\u001b[39;49mfrom_format(datetime_save, \u001b[39m\"\u001b[39;49m\u001b[39mYYYYMMDD HHmm\u001b[39;49m\u001b[39m\"\u001b[39;49m)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/pendulum/__init__.py:263\u001b[0m, in \u001b[0;36mfrom_format\u001b[0;34m(string, fmt, tz, locale)\u001b[0m\n\u001b[1;32m 260\u001b[0m \u001b[39mif\u001b[39;00m parts[\u001b[39m\"\u001b[39m\u001b[39mtz\u001b[39m\u001b[39m\"\u001b[39m] \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m:\n\u001b[1;32m 261\u001b[0m parts[\u001b[39m\"\u001b[39m\u001b[39mtz\u001b[39m\u001b[39m\"\u001b[39m] \u001b[39m=\u001b[39m tz\n\u001b[0;32m--> 263\u001b[0m \u001b[39mreturn\u001b[39;00m datetime(\u001b[39m*\u001b[39;49m\u001b[39m*\u001b[39;49mparts)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/pendulum/__init__.py:116\u001b[0m, in \u001b[0;36mdatetime\u001b[0;34m(year, month, day, hour, minute, second, microsecond, tz, dst_rule)\u001b[0m\n\u001b[1;32m 114\u001b[0m dt \u001b[39m=\u001b[39m naive(year, month, day, hour, minute, second, microsecond)\n\u001b[1;32m 115\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[0;32m--> 116\u001b[0m dt \u001b[39m=\u001b[39m _datetime\u001b[39m.\u001b[39;49mdatetime(year, month, day, hour, minute, second, microsecond)\n\u001b[1;32m 117\u001b[0m \u001b[39mif\u001b[39;00m tz \u001b[39mis\u001b[39;00m \u001b[39mnot\u001b[39;00m \u001b[39mNone\u001b[39;00m:\n\u001b[1;32m 118\u001b[0m dt \u001b[39m=\u001b[39m tz\u001b[39m.\u001b[39mconvert(dt, dst_rule\u001b[39m=\u001b[39mdst_rule)\n", + "\u001b[0;31mValueError\u001b[0m: 
hour must be in 0..23" + ] + } + ], + "source": [ + "import pendulum\n", + "datetime_save = \"20221027 300\"\n", + "pendulum.from_format(datetime_save, \"YYYYMMDD HHmm\")" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "# https://github.com/prefeitura-rio/pipelines/blob/master/pipelines/utils/utils.py#L369\n", + "import basedosdados as bd\n", + "import numpy as np\n", + "import pandas as pd\n", + "import pendulum\n", + "import s3fs\n", + "\n", + "\n", + "from satelite.constants import (\n", + " constants as satelite_constants,\n", + ") #pipelines.rj_cor.meteorologia." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "def get_path_files_s3(variabel):\n", + " s3_fs = s3fs.S3FileSystem(anon=True)\n", + "\n", + " # Get all files of GOES-16 data (multiband format) at this hour\n", + " path_files = s3_fs.find(f\"noaa-goes16/ABI-L2-{variabel}/\")\n", + " return path_files\n", + "\n", + "variavel_rr = satelite_constants.VARIAVEL_RR.value\n", + "variavel_tpw = satelite_constants.VARIAVEL_TPW.value\n", + "variavel_cmip = satelite_constants.VARIAVEL_cmip.value\n", + "# cmip_files = get_all_files_s3(variavel=variavel_rr)\n", + "# cmip_files" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "RRQPEF\n", + "TPWF\n", + "CMIPF\n" + ] + }, + { + "ename": "EndpointConnectionError", + "evalue": "Could not connect to the endpoint URL: \"https://noaa-goes16.s3.amazonaws.com/?list-type=2&prefix=ABI-L2-CMIPF%2F&delimiter=&continuation-token=1C32vNDW3YKleSwOEY6pMb6AdRaTOeDflzk3oEuqx9UQmxGoJBK0IjXgpr6FVJLBymddxEV1GLxiD4mOSq7ETyfLUChUlCCQyiFHF5SH49a7l3KOotsF05Nk5IBV5XZnTdslDA3f9B0RP4z0%2Fn4zAxSFBcTai19LMxaLSHx3QLmc%3D&encoding-type=url\"", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mgaierror\u001b[0m Traceback (most recent call last)", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/connector.py:1154\u001b[0m, in \u001b[0;36mTCPConnector._create_direct_connection\u001b[0;34m(self, req, traces, timeout, client_error)\u001b[0m\n\u001b[1;32m 1150\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[1;32m 1151\u001b[0m \u001b[39m# Cancelling this lookup should not cancel the underlying lookup\u001b[39;00m\n\u001b[1;32m 1152\u001b[0m \u001b[39m# or else the cancel event will get broadcast to all the waiters\u001b[39;00m\n\u001b[1;32m 1153\u001b[0m \u001b[39m# across all connections.\u001b[39;00m\n\u001b[0;32m-> 1154\u001b[0m hosts \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m asyncio\u001b[39m.\u001b[39mshield(host_resolved)\n\u001b[1;32m 1155\u001b[0m \u001b[39mexcept\u001b[39;00m asyncio\u001b[39m.\u001b[39mCancelledError:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/connector.py:880\u001b[0m, in \u001b[0;36mTCPConnector._resolve_host\u001b[0;34m(self, host, port, traces)\u001b[0m\n\u001b[1;32m 878\u001b[0m \u001b[39mawait\u001b[39;00m trace\u001b[39m.\u001b[39msend_dns_resolvehost_start(host)\n\u001b[0;32m--> 880\u001b[0m addrs \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_resolver\u001b[39m.\u001b[39mresolve(host, port, family\u001b[39m=\u001b[39m\u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_family)\n\u001b[1;32m 881\u001b[0m 
\u001b[39mif\u001b[39;00m traces:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/resolver.py:33\u001b[0m, in \u001b[0;36mThreadedResolver.resolve\u001b[0;34m(self, hostname, port, family)\u001b[0m\n\u001b[1;32m 30\u001b[0m \u001b[39masync\u001b[39;00m \u001b[39mdef\u001b[39;00m \u001b[39mresolve\u001b[39m(\n\u001b[1;32m 31\u001b[0m \u001b[39mself\u001b[39m, hostname: \u001b[39mstr\u001b[39m, port: \u001b[39mint\u001b[39m \u001b[39m=\u001b[39m \u001b[39m0\u001b[39m, family: \u001b[39mint\u001b[39m \u001b[39m=\u001b[39m socket\u001b[39m.\u001b[39mAF_INET\n\u001b[1;32m 32\u001b[0m ) \u001b[39m-\u001b[39m\u001b[39m>\u001b[39m List[Dict[\u001b[39mstr\u001b[39m, Any]]:\n\u001b[0;32m---> 33\u001b[0m infos \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_loop\u001b[39m.\u001b[39mgetaddrinfo(\n\u001b[1;32m 34\u001b[0m hostname,\n\u001b[1;32m 35\u001b[0m port,\n\u001b[1;32m 36\u001b[0m \u001b[39mtype\u001b[39m\u001b[39m=\u001b[39msocket\u001b[39m.\u001b[39mSOCK_STREAM,\n\u001b[1;32m 37\u001b[0m family\u001b[39m=\u001b[39mfamily,\n\u001b[1;32m 38\u001b[0m flags\u001b[39m=\u001b[39msocket\u001b[39m.\u001b[39mAI_ADDRCONFIG,\n\u001b[1;32m 39\u001b[0m )\n\u001b[1;32m 41\u001b[0m hosts \u001b[39m=\u001b[39m []\n", + "File \u001b[0;32m/usr/lib/python3.9/asyncio/base_events.py:856\u001b[0m, in \u001b[0;36mBaseEventLoop.getaddrinfo\u001b[0;34m(self, host, port, family, type, proto, flags)\u001b[0m\n\u001b[1;32m 854\u001b[0m getaddr_func \u001b[39m=\u001b[39m socket\u001b[39m.\u001b[39mgetaddrinfo\n\u001b[0;32m--> 856\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mrun_in_executor(\n\u001b[1;32m 857\u001b[0m \u001b[39mNone\u001b[39;00m, getaddr_func, host, port, family, \u001b[39mtype\u001b[39m, proto, flags)\n", + "File \u001b[0;32m/usr/lib/python3.9/concurrent/futures/thread.py:52\u001b[0m, in \u001b[0;36m_WorkItem.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 51\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m---> 52\u001b[0m result \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mfn(\u001b[39m*\u001b[39;49m\u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49margs, \u001b[39m*\u001b[39;49m\u001b[39m*\u001b[39;49m\u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mkwargs)\n\u001b[1;32m 53\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mBaseException\u001b[39;00m \u001b[39mas\u001b[39;00m exc:\n", + "File \u001b[0;32m/usr/lib/python3.9/socket.py:953\u001b[0m, in \u001b[0;36mgetaddrinfo\u001b[0;34m(host, port, family, type, proto, flags)\u001b[0m\n\u001b[1;32m 952\u001b[0m addrlist \u001b[39m=\u001b[39m []\n\u001b[0;32m--> 953\u001b[0m \u001b[39mfor\u001b[39;00m res \u001b[39min\u001b[39;00m _socket\u001b[39m.\u001b[39;49mgetaddrinfo(host, port, family, \u001b[39mtype\u001b[39;49m, proto, flags):\n\u001b[1;32m 954\u001b[0m af, socktype, proto, canonname, sa \u001b[39m=\u001b[39m res\n", + "\u001b[0;31mgaierror\u001b[0m: [Errno -3] Temporary failure in name resolution", + "\nThe above exception was the direct cause of the following exception:\n", + "\u001b[0;31mClientConnectorError\u001b[0m Traceback (most recent call last)", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/httpsession.py:172\u001b[0m, in \u001b[0;36mAIOHTTPSession.send\u001b[0;34m(self, request)\u001b[0m\n\u001b[1;32m 171\u001b[0m url \u001b[39m=\u001b[39m URL(url, 
encoded\u001b[39m=\u001b[39m\u001b[39mTrue\u001b[39;00m)\n\u001b[0;32m--> 172\u001b[0m resp \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_session\u001b[39m.\u001b[39mrequest(\n\u001b[1;32m 173\u001b[0m request\u001b[39m.\u001b[39mmethod, url\u001b[39m=\u001b[39murl, headers\u001b[39m=\u001b[39mheaders_, data\u001b[39m=\u001b[39mdata, proxy\u001b[39m=\u001b[39mproxy_url,\n\u001b[1;32m 174\u001b[0m proxy_headers\u001b[39m=\u001b[39mproxy_headers\n\u001b[1;32m 175\u001b[0m )\n\u001b[1;32m 177\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mnot\u001b[39;00m request\u001b[39m.\u001b[39mstream_output:\n\u001b[1;32m 178\u001b[0m \u001b[39m# Cause the raw stream to be exhausted immediately. We do it\u001b[39;00m\n\u001b[1;32m 179\u001b[0m \u001b[39m# this way instead of using preload_content because\u001b[39;00m\n\u001b[1;32m 180\u001b[0m \u001b[39m# preload_content will never buffer chunked responses\u001b[39;00m\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/client.py:535\u001b[0m, in \u001b[0;36mClientSession._request\u001b[0;34m(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx, read_bufsize)\u001b[0m\n\u001b[1;32m 534\u001b[0m \u001b[39massert\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_connector \u001b[39mis\u001b[39;00m \u001b[39mnot\u001b[39;00m \u001b[39mNone\u001b[39;00m\n\u001b[0;32m--> 535\u001b[0m conn \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_connector\u001b[39m.\u001b[39mconnect(\n\u001b[1;32m 536\u001b[0m req, traces\u001b[39m=\u001b[39mtraces, timeout\u001b[39m=\u001b[39mreal_timeout\n\u001b[1;32m 537\u001b[0m )\n\u001b[1;32m 538\u001b[0m \u001b[39mexcept\u001b[39;00m asyncio\u001b[39m.\u001b[39mTimeoutError \u001b[39mas\u001b[39;00m exc:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/connector.py:542\u001b[0m, in \u001b[0;36mBaseConnector.connect\u001b[0;34m(self, req, traces, timeout)\u001b[0m\n\u001b[1;32m 541\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m--> 542\u001b[0m proto \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_create_connection(req, traces, timeout)\n\u001b[1;32m 543\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_closed:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/connector.py:907\u001b[0m, in \u001b[0;36mTCPConnector._create_connection\u001b[0;34m(self, req, traces, timeout)\u001b[0m\n\u001b[1;32m 906\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[0;32m--> 907\u001b[0m _, proto \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_create_direct_connection(req, traces, timeout)\n\u001b[1;32m 909\u001b[0m \u001b[39mreturn\u001b[39;00m proto\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiohttp/connector.py:1166\u001b[0m, in \u001b[0;36mTCPConnector._create_direct_connection\u001b[0;34m(self, req, traces, timeout, client_error)\u001b[0m\n\u001b[1;32m 1163\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mOSError\u001b[39;00m \u001b[39mas\u001b[39;00m exc:\n\u001b[1;32m 1164\u001b[0m \u001b[39m# in case of 
proxy it is not ClientProxyConnectionError\u001b[39;00m\n\u001b[1;32m 1165\u001b[0m \u001b[39m# it is problem of resolving proxy ip itself\u001b[39;00m\n\u001b[0;32m-> 1166\u001b[0m \u001b[39mraise\u001b[39;00m ClientConnectorError(req\u001b[39m.\u001b[39mconnection_key, exc) \u001b[39mfrom\u001b[39;00m \u001b[39mexc\u001b[39;00m\n\u001b[1;32m 1168\u001b[0m last_exc \u001b[39m=\u001b[39m \u001b[39mNone\u001b[39;00m \u001b[39m# type: Optional[Exception]\u001b[39;00m\n", + "\u001b[0;31mClientConnectorError\u001b[0m: Cannot connect to host noaa-goes16.s3.amazonaws.com:443 ssl:default [Temporary failure in name resolution]", + "\nDuring handling of the above exception, another exception occurred:\n", + "\u001b[0;31mEndpointConnectionError\u001b[0m Traceback (most recent call last)", + "\u001b[1;32m/home/patricia/Documentos/escritorio_dados/prefeitura-rio/pipelines/pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb Cell 3\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[39mfor\u001b[39;00m var \u001b[39min\u001b[39;00m [variavel_rr, variavel_tpw, variavel_cmip]:\n\u001b[1;32m 4\u001b[0m \u001b[39mprint\u001b[39m(var)\n\u001b[0;32m----> 5\u001b[0m all_files \u001b[39m=\u001b[39m all_files \u001b[39m+\u001b[39m get_path_files_s3(var)\n\u001b[1;32m 7\u001b[0m \u001b[39m# keep only the start scan datetime\u001b[39;00m\n\u001b[1;32m 8\u001b[0m \u001b[39mprint\u001b[39m(\u001b[39mlen\u001b[39m(all_files))\n", + "\u001b[1;32m/home/patricia/Documentos/escritorio_dados/prefeitura-rio/pipelines/pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb Cell 3\u001b[0m in \u001b[0;36mget_path_files_s3\u001b[0;34m(variabel)\u001b[0m\n\u001b[1;32m 2\u001b[0m s3_fs \u001b[39m=\u001b[39m s3fs\u001b[39m.\u001b[39mS3FileSystem(anon\u001b[39m=\u001b[39m\u001b[39mTrue\u001b[39;00m)\n\u001b[1;32m 4\u001b[0m \u001b[39m# Get all files of GOES-16 data (multiband format) at this hour\u001b[39;00m\n\u001b[0;32m----> 5\u001b[0m path_files \u001b[39m=\u001b[39m s3_fs\u001b[39m.\u001b[39;49mfind(\u001b[39mf\u001b[39;49m\u001b[39m\"\u001b[39;49m\u001b[39mnoaa-goes16/ABI-L2-\u001b[39;49m\u001b[39m{\u001b[39;49;00mvariabel\u001b[39m}\u001b[39;49;00m\u001b[39m/\u001b[39;49m\u001b[39m\"\u001b[39;49m)\n\u001b[1;32m 6\u001b[0m \u001b[39mreturn\u001b[39;00m path_files\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/fsspec/asyn.py:85\u001b[0m, in \u001b[0;36msync_wrapper..wrapper\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 82\u001b[0m \u001b[39m@functools\u001b[39m\u001b[39m.\u001b[39mwraps(func)\n\u001b[1;32m 83\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39mwrapper\u001b[39m(\u001b[39m*\u001b[39margs, \u001b[39m*\u001b[39m\u001b[39m*\u001b[39mkwargs):\n\u001b[1;32m 84\u001b[0m \u001b[39mself\u001b[39m \u001b[39m=\u001b[39m obj \u001b[39mor\u001b[39;00m args[\u001b[39m0\u001b[39m]\n\u001b[0;32m---> 85\u001b[0m \u001b[39mreturn\u001b[39;00m sync(\u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49mloop, func, \u001b[39m*\u001b[39;49margs, \u001b[39m*\u001b[39;49m\u001b[39m*\u001b[39;49mkwargs)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/fsspec/asyn.py:65\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 63\u001b[0m \u001b[39mraise\u001b[39;00m FSTimeoutError \u001b[39mfrom\u001b[39;00m \u001b[39mreturn_result\u001b[39;00m\n\u001b[1;32m 64\u001b[0m \u001b[39melif\u001b[39;00m \u001b[39misinstance\u001b[39m(return_result, 
\u001b[39mBaseException\u001b[39;00m):\n\u001b[0;32m---> 65\u001b[0m \u001b[39mraise\u001b[39;00m return_result\n\u001b[1;32m 66\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[1;32m 67\u001b[0m \u001b[39mreturn\u001b[39;00m return_result\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/fsspec/asyn.py:25\u001b[0m, in \u001b[0;36m_runner\u001b[0;34m(event, coro, result, timeout)\u001b[0m\n\u001b[1;32m 23\u001b[0m coro \u001b[39m=\u001b[39m asyncio\u001b[39m.\u001b[39mwait_for(coro, timeout\u001b[39m=\u001b[39mtimeout)\n\u001b[1;32m 24\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m---> 25\u001b[0m result[\u001b[39m0\u001b[39m] \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m coro\n\u001b[1;32m 26\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mException\u001b[39;00m \u001b[39mas\u001b[39;00m ex:\n\u001b[1;32m 27\u001b[0m result[\u001b[39m0\u001b[39m] \u001b[39m=\u001b[39m ex\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/s3fs/core.py:686\u001b[0m, in \u001b[0;36mS3FileSystem._find\u001b[0;34m(self, path, maxdepth, withdirs, detail, prefix)\u001b[0m\n\u001b[1;32m 674\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mawait\u001b[39;00m \u001b[39msuper\u001b[39m()\u001b[39m.\u001b[39m_find(\n\u001b[1;32m 675\u001b[0m bucket \u001b[39m+\u001b[39m \u001b[39m\"\u001b[39m\u001b[39m/\u001b[39m\u001b[39m\"\u001b[39m \u001b[39m+\u001b[39m key, maxdepth\u001b[39m=\u001b[39mmaxdepth, withdirs\u001b[39m=\u001b[39mwithdirs, detail\u001b[39m=\u001b[39mdetail\n\u001b[1;32m 676\u001b[0m )\n\u001b[1;32m 677\u001b[0m \u001b[39m# TODO: implement find from dircache, if all listings are present\u001b[39;00m\n\u001b[1;32m 678\u001b[0m \u001b[39m# if refresh is False:\u001b[39;00m\n\u001b[1;32m 679\u001b[0m \u001b[39m# out = incomplete_tree_dirs(self.dircache, path)\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 684\u001b[0m \u001b[39m# return super().find(path)\u001b[39;00m\n\u001b[1;32m 685\u001b[0m \u001b[39m# # else: we refresh anyway, having at least two missing trees\u001b[39;00m\n\u001b[0;32m--> 686\u001b[0m out \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_lsdir(path, delimiter\u001b[39m=\u001b[39m\u001b[39m\"\u001b[39m\u001b[39m\"\u001b[39m, prefix\u001b[39m=\u001b[39mprefix)\n\u001b[1;32m 687\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mnot\u001b[39;00m out \u001b[39mand\u001b[39;00m key:\n\u001b[1;32m 688\u001b[0m \u001b[39mtry\u001b[39;00m:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/s3fs/core.py:614\u001b[0m, in \u001b[0;36mS3FileSystem._lsdir\u001b[0;34m(self, path, refresh, max_items, delimiter, prefix)\u001b[0m\n\u001b[1;32m 612\u001b[0m files \u001b[39m=\u001b[39m []\n\u001b[1;32m 613\u001b[0m dircache \u001b[39m=\u001b[39m []\n\u001b[0;32m--> 614\u001b[0m \u001b[39masync\u001b[39;00m \u001b[39mfor\u001b[39;00m i \u001b[39min\u001b[39;00m it:\n\u001b[1;32m 615\u001b[0m dircache\u001b[39m.\u001b[39mextend(i\u001b[39m.\u001b[39mget(\u001b[39m\"\u001b[39m\u001b[39mCommonPrefixes\u001b[39m\u001b[39m\"\u001b[39m, []))\n\u001b[1;32m 616\u001b[0m \u001b[39mfor\u001b[39;00m c \u001b[39min\u001b[39;00m i\u001b[39m.\u001b[39mget(\u001b[39m\"\u001b[39m\u001b[39mContents\u001b[39m\u001b[39m\"\u001b[39m, []):\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/paginate.py:32\u001b[0m, in \u001b[0;36mAioPageIterator.__anext__\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 29\u001b[0m 
\u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_inject_starting_params(current_kwargs)\n\u001b[1;32m 31\u001b[0m \u001b[39mwhile\u001b[39;00m \u001b[39mTrue\u001b[39;00m:\n\u001b[0;32m---> 32\u001b[0m response \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_make_request(current_kwargs)\n\u001b[1;32m 33\u001b[0m parsed \u001b[39m=\u001b[39m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_extract_parsed_response(response)\n\u001b[1;32m 34\u001b[0m \u001b[39mif\u001b[39;00m first_request:\n\u001b[1;32m 35\u001b[0m \u001b[39m# The first request is handled differently. We could\u001b[39;00m\n\u001b[1;32m 36\u001b[0m \u001b[39m# possibly have a resume/starting token that tells us where\u001b[39;00m\n\u001b[1;32m 37\u001b[0m \u001b[39m# to index into the retrieved page.\u001b[39;00m\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/client.py:211\u001b[0m, in \u001b[0;36mAioBaseClient._make_api_call\u001b[0;34m(self, operation_name, api_params)\u001b[0m\n\u001b[1;32m 209\u001b[0m http, parsed_response \u001b[39m=\u001b[39m event_response\n\u001b[1;32m 210\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[0;32m--> 211\u001b[0m http, parsed_response \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_make_request(\n\u001b[1;32m 212\u001b[0m operation_model, request_dict, request_context)\n\u001b[1;32m 214\u001b[0m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mmeta\u001b[39m.\u001b[39mevents\u001b[39m.\u001b[39memit(\n\u001b[1;32m 215\u001b[0m \u001b[39m'\u001b[39m\u001b[39mafter-call.\u001b[39m\u001b[39m{service_id}\u001b[39;00m\u001b[39m.\u001b[39m\u001b[39m{operation_name}\u001b[39;00m\u001b[39m'\u001b[39m\u001b[39m.\u001b[39mformat(\n\u001b[1;32m 216\u001b[0m service_id\u001b[39m=\u001b[39mservice_id,\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 219\u001b[0m model\u001b[39m=\u001b[39moperation_model, context\u001b[39m=\u001b[39mrequest_context\n\u001b[1;32m 220\u001b[0m )\n\u001b[1;32m 222\u001b[0m \u001b[39mif\u001b[39;00m http\u001b[39m.\u001b[39mstatus_code \u001b[39m>\u001b[39m\u001b[39m=\u001b[39m \u001b[39m300\u001b[39m:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/client.py:231\u001b[0m, in \u001b[0;36mAioBaseClient._make_request\u001b[0;34m(self, operation_model, request_dict, request_context)\u001b[0m\n\u001b[1;32m 229\u001b[0m \u001b[39masync\u001b[39;00m \u001b[39mdef\u001b[39;00m \u001b[39m_make_request\u001b[39m(\u001b[39mself\u001b[39m, operation_model, request_dict, request_context):\n\u001b[1;32m 230\u001b[0m \u001b[39mtry\u001b[39;00m:\n\u001b[0;32m--> 231\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_endpoint\u001b[39m.\u001b[39mmake_request(operation_model, request_dict)\n\u001b[1;32m 232\u001b[0m \u001b[39mexcept\u001b[39;00m \u001b[39mException\u001b[39;00m \u001b[39mas\u001b[39;00m e:\n\u001b[1;32m 233\u001b[0m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mmeta\u001b[39m.\u001b[39mevents\u001b[39m.\u001b[39memit(\n\u001b[1;32m 234\u001b[0m \u001b[39m'\u001b[39m\u001b[39mafter-call-error.\u001b[39m\u001b[39m{service_id}\u001b[39;00m\u001b[39m.\u001b[39m\u001b[39m{operation_name}\u001b[39;00m\u001b[39m'\u001b[39m\u001b[39m.\u001b[39mformat(\n\u001b[1;32m 235\u001b[0m 
service_id\u001b[39m=\u001b[39m\u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_service_model\u001b[39m.\u001b[39mservice_id\u001b[39m.\u001b[39mhyphenize(),\n\u001b[1;32m 236\u001b[0m operation_name\u001b[39m=\u001b[39moperation_model\u001b[39m.\u001b[39mname),\n\u001b[1;32m 237\u001b[0m exception\u001b[39m=\u001b[39me, context\u001b[39m=\u001b[39mrequest_context\n\u001b[1;32m 238\u001b[0m )\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py:81\u001b[0m, in \u001b[0;36mAioEndpoint._send_request\u001b[0;34m(self, request_dict, operation_model)\u001b[0m\n\u001b[1;32m 78\u001b[0m context \u001b[39m=\u001b[39m request_dict[\u001b[39m'\u001b[39m\u001b[39mcontext\u001b[39m\u001b[39m'\u001b[39m]\n\u001b[1;32m 79\u001b[0m success_response, exception \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_get_response(\n\u001b[1;32m 80\u001b[0m request, operation_model, context)\n\u001b[0;32m---> 81\u001b[0m \u001b[39mwhile\u001b[39;00m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_needs_retry(attempts, operation_model,\n\u001b[1;32m 82\u001b[0m request_dict, success_response,\n\u001b[1;32m 83\u001b[0m exception):\n\u001b[1;32m 84\u001b[0m attempts \u001b[39m+\u001b[39m\u001b[39m=\u001b[39m \u001b[39m1\u001b[39m\n\u001b[1;32m 85\u001b[0m \u001b[39m# If there is a stream associated with the request, we need\u001b[39;00m\n\u001b[1;32m 86\u001b[0m \u001b[39m# to reset it before attempting to send the request again.\u001b[39;00m\n\u001b[1;32m 87\u001b[0m \u001b[39m# This will ensure that we resend the entire contents of the\u001b[39;00m\n\u001b[1;32m 88\u001b[0m \u001b[39m# body.\u001b[39;00m\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py:213\u001b[0m, in \u001b[0;36mAioEndpoint._needs_retry\u001b[0;34m(self, attempts, operation_model, request_dict, response, caught_exception)\u001b[0m\n\u001b[1;32m 209\u001b[0m service_id \u001b[39m=\u001b[39m operation_model\u001b[39m.\u001b[39mservice_model\u001b[39m.\u001b[39mservice_id\u001b[39m.\u001b[39mhyphenize()\n\u001b[1;32m 210\u001b[0m event_name \u001b[39m=\u001b[39m \u001b[39m'\u001b[39m\u001b[39mneeds-retry.\u001b[39m\u001b[39m%s\u001b[39;00m\u001b[39m.\u001b[39m\u001b[39m%s\u001b[39;00m\u001b[39m'\u001b[39m \u001b[39m%\u001b[39m (\n\u001b[1;32m 211\u001b[0m service_id,\n\u001b[1;32m 212\u001b[0m operation_model\u001b[39m.\u001b[39mname)\n\u001b[0;32m--> 213\u001b[0m responses \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_event_emitter\u001b[39m.\u001b[39memit(\n\u001b[1;32m 214\u001b[0m event_name, response\u001b[39m=\u001b[39mresponse, endpoint\u001b[39m=\u001b[39m\u001b[39mself\u001b[39m,\n\u001b[1;32m 215\u001b[0m operation\u001b[39m=\u001b[39moperation_model, attempts\u001b[39m=\u001b[39mattempts,\n\u001b[1;32m 216\u001b[0m caught_exception\u001b[39m=\u001b[39mcaught_exception, request_dict\u001b[39m=\u001b[39mrequest_dict)\n\u001b[1;32m 217\u001b[0m handler_response \u001b[39m=\u001b[39m first_non_none_response(responses)\n\u001b[1;32m 218\u001b[0m \u001b[39mif\u001b[39;00m handler_response \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/hooks.py:29\u001b[0m, in \u001b[0;36mAioHierarchicalEmitter._emit\u001b[0;34m(self, event_name, kwargs, stop_on_response)\u001b[0m\n\u001b[1;32m 27\u001b[0m response 
\u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m handler(\u001b[39m*\u001b[39m\u001b[39m*\u001b[39mkwargs)\n\u001b[1;32m 28\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[0;32m---> 29\u001b[0m response \u001b[39m=\u001b[39m handler(\u001b[39m*\u001b[39;49m\u001b[39m*\u001b[39;49mkwargs)\n\u001b[1;32m 31\u001b[0m responses\u001b[39m.\u001b[39mappend((handler, response))\n\u001b[1;32m 32\u001b[0m \u001b[39mif\u001b[39;00m stop_on_response \u001b[39mand\u001b[39;00m response \u001b[39mis\u001b[39;00m \u001b[39mnot\u001b[39;00m \u001b[39mNone\u001b[39;00m:\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/botocore/retryhandler.py:183\u001b[0m, in \u001b[0;36mRetryHandler.__call__\u001b[0;34m(self, attempts, response, caught_exception, **kwargs)\u001b[0m\n\u001b[1;32m 176\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39m__call__\u001b[39m(\u001b[39mself\u001b[39m, attempts, response, caught_exception, \u001b[39m*\u001b[39m\u001b[39m*\u001b[39mkwargs):\n\u001b[1;32m 177\u001b[0m \u001b[39m\"\"\"Handler for a retry.\u001b[39;00m\n\u001b[1;32m 178\u001b[0m \n\u001b[1;32m 179\u001b[0m \u001b[39m Intended to be hooked up to an event handler (hence the **kwargs),\u001b[39;00m\n\u001b[1;32m 180\u001b[0m \u001b[39m this will process retries appropriately.\u001b[39;00m\n\u001b[1;32m 181\u001b[0m \n\u001b[1;32m 182\u001b[0m \u001b[39m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 183\u001b[0m \u001b[39mif\u001b[39;00m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_checker(attempts, response, caught_exception):\n\u001b[1;32m 184\u001b[0m result \u001b[39m=\u001b[39m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_action(attempts\u001b[39m=\u001b[39mattempts)\n\u001b[1;32m 185\u001b[0m logger\u001b[39m.\u001b[39mdebug(\u001b[39m\"\u001b[39m\u001b[39mRetry needed, action of: \u001b[39m\u001b[39m%s\u001b[39;00m\u001b[39m\"\u001b[39m, result)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/botocore/retryhandler.py:250\u001b[0m, in \u001b[0;36mMaxAttemptsDecorator.__call__\u001b[0;34m(self, attempt_number, response, caught_exception)\u001b[0m\n\u001b[1;32m 249\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39m__call__\u001b[39m(\u001b[39mself\u001b[39m, attempt_number, response, caught_exception):\n\u001b[0;32m--> 250\u001b[0m should_retry \u001b[39m=\u001b[39m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_should_retry(attempt_number, response,\n\u001b[1;32m 251\u001b[0m caught_exception)\n\u001b[1;32m 252\u001b[0m \u001b[39mif\u001b[39;00m should_retry:\n\u001b[1;32m 253\u001b[0m \u001b[39mif\u001b[39;00m attempt_number \u001b[39m>\u001b[39m\u001b[39m=\u001b[39m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_max_attempts:\n\u001b[1;32m 254\u001b[0m \u001b[39m# explicitly set MaxAttemptsReached\u001b[39;00m\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/botocore/retryhandler.py:277\u001b[0m, in \u001b[0;36mMaxAttemptsDecorator._should_retry\u001b[0;34m(self, attempt_number, response, caught_exception)\u001b[0m\n\u001b[1;32m 273\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mTrue\u001b[39;00m\n\u001b[1;32m 274\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[1;32m 275\u001b[0m \u001b[39m# If we've exceeded the max attempts we just let the exception\u001b[39;00m\n\u001b[1;32m 276\u001b[0m \u001b[39m# propogate if one has occurred.\u001b[39;00m\n\u001b[0;32m--> 277\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_checker(attempt_number, response, 
caught_exception)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/botocore/retryhandler.py:316\u001b[0m, in \u001b[0;36mMultiChecker.__call__\u001b[0;34m(self, attempt_number, response, caught_exception)\u001b[0m\n\u001b[1;32m 314\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39m__call__\u001b[39m(\u001b[39mself\u001b[39m, attempt_number, response, caught_exception):\n\u001b[1;32m 315\u001b[0m \u001b[39mfor\u001b[39;00m checker \u001b[39min\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_checkers:\n\u001b[0;32m--> 316\u001b[0m checker_response \u001b[39m=\u001b[39m checker(attempt_number, response,\n\u001b[1;32m 317\u001b[0m caught_exception)\n\u001b[1;32m 318\u001b[0m \u001b[39mif\u001b[39;00m checker_response:\n\u001b[1;32m 319\u001b[0m \u001b[39mreturn\u001b[39;00m checker_response\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/botocore/retryhandler.py:222\u001b[0m, in \u001b[0;36mBaseChecker.__call__\u001b[0;34m(self, attempt_number, response, caught_exception)\u001b[0m\n\u001b[1;32m 220\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_check_response(attempt_number, response)\n\u001b[1;32m 221\u001b[0m \u001b[39melif\u001b[39;00m caught_exception \u001b[39mis\u001b[39;00m \u001b[39mnot\u001b[39;00m \u001b[39mNone\u001b[39;00m:\n\u001b[0;32m--> 222\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mself\u001b[39;49m\u001b[39m.\u001b[39;49m_check_caught_exception(\n\u001b[1;32m 223\u001b[0m attempt_number, caught_exception)\n\u001b[1;32m 224\u001b[0m \u001b[39melse\u001b[39;00m:\n\u001b[1;32m 225\u001b[0m \u001b[39mraise\u001b[39;00m \u001b[39mValueError\u001b[39;00m(\u001b[39m\"\u001b[39m\u001b[39mBoth response and caught_exception are None.\u001b[39m\u001b[39m\"\u001b[39m)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/botocore/retryhandler.py:359\u001b[0m, in \u001b[0;36mExceptionRaiser._check_caught_exception\u001b[0;34m(self, attempt_number, caught_exception)\u001b[0m\n\u001b[1;32m 351\u001b[0m \u001b[39mdef\u001b[39;00m \u001b[39m_check_caught_exception\u001b[39m(\u001b[39mself\u001b[39m, attempt_number, caught_exception):\n\u001b[1;32m 352\u001b[0m \u001b[39m# This is implementation specific, but this class is useful by\u001b[39;00m\n\u001b[1;32m 353\u001b[0m \u001b[39m# coordinating with the MaxAttemptsDecorator.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 357\u001b[0m \u001b[39m# the MaxAttemptsDecorator is not interested in retrying the exception\u001b[39;00m\n\u001b[1;32m 358\u001b[0m \u001b[39m# then this exception just propogates out past the retry code.\u001b[39;00m\n\u001b[0;32m--> 359\u001b[0m \u001b[39mraise\u001b[39;00m caught_exception\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py:147\u001b[0m, in \u001b[0;36mAioEndpoint._do_get_response\u001b[0;34m(self, request, operation_model)\u001b[0m\n\u001b[1;32m 145\u001b[0m http_response \u001b[39m=\u001b[39m first_non_none_response(responses)\n\u001b[1;32m 146\u001b[0m \u001b[39mif\u001b[39;00m http_response \u001b[39mis\u001b[39;00m \u001b[39mNone\u001b[39;00m:\n\u001b[0;32m--> 147\u001b[0m http_response \u001b[39m=\u001b[39m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39m_send(request)\n\u001b[1;32m 148\u001b[0m \u001b[39mexcept\u001b[39;00m aiohttp\u001b[39m.\u001b[39mClientConnectionError \u001b[39mas\u001b[39;00m e:\n\u001b[1;32m 149\u001b[0m 
e\u001b[39m.\u001b[39mrequest \u001b[39m=\u001b[39m request \u001b[39m# botocore expects the request property\u001b[39;00m\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/endpoint.py:229\u001b[0m, in \u001b[0;36mAioEndpoint._send\u001b[0;34m(self, request)\u001b[0m\n\u001b[1;32m 228\u001b[0m \u001b[39masync\u001b[39;00m \u001b[39mdef\u001b[39;00m \u001b[39m_send\u001b[39m(\u001b[39mself\u001b[39m, request):\n\u001b[0;32m--> 229\u001b[0m \u001b[39mreturn\u001b[39;00m \u001b[39mawait\u001b[39;00m \u001b[39mself\u001b[39m\u001b[39m.\u001b[39mhttp_session\u001b[39m.\u001b[39msend(request)\n", + "File \u001b[0;32m~/Documentos/escritorio_dados/venv/lib/python3.9/site-packages/aiobotocore/httpsession.py:193\u001b[0m, in \u001b[0;36mAIOHTTPSession.send\u001b[0;34m(self, request)\u001b[0m\n\u001b[1;32m 190\u001b[0m \u001b[39mif\u001b[39;00m DEPRECATED_1_4_0_APIS:\n\u001b[1;32m 191\u001b[0m \u001b[39mraise\u001b[39;00m\n\u001b[0;32m--> 193\u001b[0m \u001b[39mraise\u001b[39;00m EndpointConnectionError(endpoint_url\u001b[39m=\u001b[39mrequest\u001b[39m.\u001b[39murl, error\u001b[39m=\u001b[39me)\n\u001b[1;32m 194\u001b[0m \u001b[39mexcept\u001b[39;00m (ClientProxyConnectionError, ClientHttpProxyError) \u001b[39mas\u001b[39;00m e:\n\u001b[1;32m 195\u001b[0m \u001b[39mif\u001b[39;00m DEPRECATED_1_4_0_APIS:\n", + "\u001b[0;31mEndpointConnectionError\u001b[0m: Could not connect to the endpoint URL: \"https://noaa-goes16.s3.amazonaws.com/?list-type=2&prefix=ABI-L2-CMIPF%2F&delimiter=&continuation-token=1C32vNDW3YKleSwOEY6pMb6AdRaTOeDflzk3oEuqx9UQmxGoJBK0IjXgpr6FVJLBymddxEV1GLxiD4mOSq7ETyfLUChUlCCQyiFHF5SH49a7l3KOotsF05Nk5IBV5XZnTdslDA3f9B0RP4z0%2Fn4zAxSFBcTai19LMxaLSHx3QLmc%3D&encoding-type=url\"" + ] + } + ], + "source": [ + "# get_all_path_files():\n", + "all_files = []\n", + "for var in [variavel_rr, variavel_tpw, variavel_cmip]:\n", + " print(var)\n", + " all_files = all_files + get_path_files_s3(var)\n", + "\n", + "# keep only the start scan datetime\n", + "print(len(all_files))\n", + "ref_dates = list(set([file[file.find(\"_s\") + 1: file.find(\"_e\")] for file in all_files]))\n", + "print(len(ref_dates))" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['noaa-goes16/ABI-L2-RRQPEF/2019',\n", + " 'noaa-goes16/ABI-L2-RRQPEF/2020',\n", + " 'noaa-goes16/ABI-L2-RRQPEF/2021',\n", + " 'noaa-goes16/ABI-L2-RRQPEF/2022',\n", + " 'noaa-goes16/ABI-L2-TPWF/2019',\n", + " 'noaa-goes16/ABI-L2-TPWF/2020',\n", + " 'noaa-goes16/ABI-L2-TPWF/2021',\n", + " 'noaa-goes16/ABI-L2-TPWF/2022',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2000',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2017',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2018',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2019',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2020',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2021',\n", + " 'noaa-goes16/ABI-L2-CMIPF/2022']" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "all_files" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "https://stackoverflow.com/questions/45429556/how-list-amazon-s3-bucket-contents-by-modified-date" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "\n", + "\n", + "def get_storage_blobs(dataset_id: str, table_id: str) -> list:\n", + " \"\"\"\n", + " Get all blobs from a table in a dataset.\n", + " \"\"\"\n", + "\n", + " storage = 
bd.Storage(dataset_id=dataset_id, table_id=table_id)\n", + "\n", + " return list(\n", + " storage.client[\"storage_staging\"]\n", + " .bucket(storage.bucket_name)\n", + " .list_blobs(prefix=f\"staging/{storage.dataset_id}/{storage.table_id}/\")\n", + " )\n", + "\n", + "def get_storage_blobs_adaptado(bucket_name: str, dataset_id: str, table_id: str) -> list:\n", + " \"\"\"\n", + " Get all blobs from a table in a dataset.\n", + " \"\"\"\n", + "\n", + " storage = bd.Storage(dataset_id=dataset_id, table_id=table_id)\n", + "\n", + " return list(\n", + " storage.client[\"storage_staging\"]\n", + " .bucket(bucket_name)\n", + " .list_blobs(prefix=f\"staging/{dataset_id}/{table_id}/\")\n", + " )\n", + "\n", + "\n", + "def parser_blobs_to_partition_dict(blobs: list) -> dict:\n", + " \"\"\"\n", + " Extracts the partition information from the blobs.\n", + " \"\"\"\n", + "\n", + " partitions_dict = {}\n", + " partitions_list = []\n", + "\n", + " for blob in blobs:\n", + " for folder in blob.name.split(\"/\"):\n", + " if \"=\" in folder:\n", + " key = folder.split(\"=\")[0]\n", + " value = folder.split(\"=\")[1]\n", + " try:\n", + " partitions_dict[key].append(value)\n", + " except KeyError:\n", + " partitions_dict[key] = [value]\n", + " return partitions_dict, partitions_list\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 52, + "metadata": {}, + "outputs": [], + "source": [ + "def obter_datas_particoes(list_storage_blobs):\n", + " # For the satellite data, which is partitioned by year, month, day and hour\n", + " dates_blob = []\n", + " for blob in list_storage_blobs:\n", + " for folder in blob.name.split(\"/\"):\n", + " #print(folder)\n", + " if \"=\" in folder:\n", + " key = folder.split(\"=\")[0]\n", + " value = folder.split(\"=\")[1]\n", + " if key=='ano_particao':\n", + " ano = value\n", + " elif key=='mes_particao':\n", + " mes = value.zfill(2)\n", + " elif key=='data_particao':\n", + " dia = value[-2:].zfill(2)\n", + " elif key=='hora':\n", + " hora = value.zfill(2)\n", + " data = ano + '-' + mes + '-' + dia + ' ' + hora + ':00:00'\n", + " dates_blob.append(data)\n", + "\n", + " return dates_blob\n", + "\n", + "\n", + "\n", + "def datas_para_mover_datas_faltantes(bucket_name, bucket_name_antigo, table_id, table_id_antigo, dataset_id, dates_range):\n", + " # selects both the dates missing because something went wrong while moving from one bucket to the other\n", + " # and the dates missing because of a problem in the backfill\n", + "\n", + " list_storage_blobs_antigo = get_storage_blobs_adaptado(bucket_name=bucket_name_antigo, dataset_id=dataset_id, table_id=table_id_antigo)\n", + " list_storage_blobs = get_storage_blobs_adaptado(bucket_name=bucket_name, dataset_id=dataset_id, table_id=table_id)\n", + "\n", + " bucket_antigo = obter_datas_particoes(list_storage_blobs_antigo)\n", + " bucket_novo = obter_datas_particoes(list_storage_blobs)\n", + "\n", + " print(len(bucket_antigo), len(bucket_novo))\n", + "\n", + " # take every date that is in the old bucket but not in the new one, so we can move them\n", + " mover = [i for i in bucket_antigo if i not in bucket_novo]\n", + " print('quantos mover', len(mover))\n", + "\n", + " # dates missing due to the backfill\n", + " missing_dates = [i for i in dates_range if i not in bucket_novo]\n", + " print('quantas datas faltantes', len(missing_dates))\n", + "\n", + " return mover, missing_dates\n", + "\n", + "\n", + "\n", + "def datas_faltantes(bucket_name, table_id, dataset_id, dates_range):\n", + " # selects the dates missing because of a problem in the 
backfill\n", + "\n", + " list_storage_blobs = get_storage_blobs_adaptado(bucket_name=bucket_name, dataset_id=dataset_id, table_id=table_id)\n", + "\n", + " bucket_novo = obter_datas_particoes(list_storage_blobs)\n", + "\n", + " print(len(bucket_novo))\n", + "\n", + " # datas faltantes devido ao backfill\n", + " missing_dates = [i for i in dates_range if i not in bucket_novo]\n", + " print('quantas datas faltantes', len(missing_dates))\n", + "\n", + " return bucket_novo, missing_dates" + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "17665" + ] + }, + "execution_count": 54, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# range de datas que deveriam estar preenchidas\n", + "dates_range = list(pd.date_range(start='1/1/2020', end='1/6/2022', freq='H').astype(str))\n", + "len(dates_range)" + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "19054\n", + "quantas datas faltantes 1560\n" + ] + } + ], + "source": [ + "# bucket_name_antigo = 'rj-escritorio-dev'\n", + "# table_id_antigo = 'satelite_quantidade_agua_precipitavel'\n", + "bucket_name = 'rj-cor'\n", + "table_id = 'quantidade_agua_precipitavel_satelite'\n", + "dataset_id = 'meio_ambiente_clima'\n", + "\n", + "# mover_qt, missing_dates_qt = datas_para_mover_datas_faltantes(bucket_name, bucket_name_antigo, table_id, table_id_antigo, dataset_id, dates_range)\n", + "bucket_qt, missing_dates_qt = datas_faltantes(bucket_name, table_id, dataset_id, dates_range)" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['2020-12-31 22:00:00',\n", + " '2020-12-31 23:00:00',\n", + " '2021-01-01 00:00:00',\n", + " '2021-01-01 01:00:00',\n", + " '2021-01-01 10:00:00',\n", + " '2021-01-01 11:00:00',\n", + " '2021-01-01 12:00:00',\n", + " '2021-01-01 13:00:00',\n", + " '2021-01-01 14:00:00',\n", + " '2021-01-01 15:00:00']" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "bucket_qt[:10]" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['2020-01-01 00:00:00',\n", + " '2020-01-01 01:00:00',\n", + " '2020-01-01 02:00:00',\n", + " '2020-01-01 03:00:00',\n", + " '2020-01-01 04:00:00']" + ] + }, + "execution_count": 56, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dates_range[:5]" + ] + }, + { + "cell_type": "code", + "execution_count": 57, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "21057\n", + "quantas datas faltantes 110\n" + ] + } + ], + "source": [ + "# bucket_name_antigo = 'rj-escritorio-dev'\n", + "# table_id_antigo = 'satelite_taxa_precipitacao'\n", + "bucket_name = 'rj-cor'\n", + "table_id = 'taxa_precipitacao_satelite'\n", + "dataset_id = 'meio_ambiente_clima'\n", + "\n", + "# mover_tx, missing_dates_tx = datas_para_mover_datas_faltantes(bucket_name, bucket_name_antigo, table_id, table_id_antigo, dataset_id, dates_range)\n", + "bucket_tx, missing_dates_tx = datas_faltantes(bucket_name, table_id, dataset_id, dates_range)" + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "108" + ] + }, + "execution_count": 58, + "metadata": {}, + "output_type": 
"execute_result" + } + ], + "source": [ + "# seleciona quais datas faltantes são comuns a ambos\n", + "missing_dates_comum = [i for i in missing_dates_tx if i in missing_dates_qt]\n", + "len(missing_dates_comum)" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\n", + "1452\n" + ] + } + ], + "source": [ + "# datas faltantes que não são comuns\n", + "missing_dates_tx_so = [i for i in missing_dates_tx if i not in missing_dates_comum]\n", + "print(len(missing_dates_tx_so))\n", + "\n", + "missing_dates_qt_so = [i for i in missing_dates_qt if i not in missing_dates_comum]\n", + "print(len(missing_dates_qt_so))" + ] + }, + { + "cell_type": "code", + "execution_count": 60, + "metadata": {}, + "outputs": [], + "source": [ + "# como as datas vieram das partições elas estão no timezone br, precisamos converter pro tz UTC para rodar no backfill\n", + "def converte_tz(data):\n", + " return pendulum.from_format(data, 'YYYY-MM-DD HH:mm:SS', tz='America/Sao_Paulo').in_tz('UTC').to_datetime_string()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "metadata": {}, + "outputs": [], + "source": [ + "missing_dates_qt_so_tz = [converte_tz(i) for i in missing_dates_qt_so]\n", + "missing_dates_tx_so_tz = [converte_tz(i) for i in missing_dates_tx_so]\n", + "missing_dates_comum_tz = [converte_tz(i) for i in missing_dates_comum]\n", + "\n", + "# print(missing_dates_tx_so_tz)\n", + "# print(missing_dates_qt_so_tz)" + ] + }, + { + "cell_type": "code", + "execution_count": 62, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "108" + ] + }, + "execution_count": 62, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(missing_dates_comum_tz)" + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['2020-06-03 17:00:00',\n", + " '2020-06-03 18:00:00',\n", + " '2020-06-03 19:00:00',\n", + " '2020-06-03 20:00:00',\n", + " '2020-06-03 21:00:00',\n", + " '2020-06-03 22:00:00',\n", + " '2020-06-03 23:00:00',\n", + " '2020-06-04 00:00:00',\n", + " '2020-07-26 21:00:00',\n", + " '2020-07-26 22:00:00',\n", + " '2020-07-26 23:00:00',\n", + " '2020-07-27 00:00:00',\n", + " '2020-07-27 01:00:00',\n", + " '2020-07-27 02:00:00',\n", + " '2020-08-11 16:00:00',\n", + " '2020-08-11 17:00:00',\n", + " '2020-08-11 18:00:00',\n", + " '2021-01-21 20:00:00',\n", + " '2021-01-21 21:00:00',\n", + " '2021-03-16 15:00:00',\n", + " '2021-03-23 14:00:00',\n", + " '2021-03-23 15:00:00',\n", + " '2021-03-23 16:00:00',\n", + " '2021-03-23 17:00:00',\n", + " '2021-03-23 18:00:00',\n", + " '2021-03-23 19:00:00',\n", + " '2021-03-23 20:00:00',\n", + " '2021-04-29 21:00:00',\n", + " '2021-04-29 22:00:00',\n", + " '2021-06-15 12:00:00',\n", + " '2021-06-29 12:00:00',\n", + " '2021-06-29 13:00:00',\n", + " '2021-06-29 14:00:00',\n", + " '2021-06-29 15:00:00',\n", + " '2021-06-29 16:00:00',\n", + " '2021-06-29 17:00:00',\n", + " '2021-06-29 18:00:00',\n", + " '2021-07-21 17:00:00',\n", + " '2021-09-18 03:00:00',\n", + " '2021-09-18 04:00:00',\n", + " '2021-09-18 05:00:00',\n", + " '2021-09-18 06:00:00',\n", + " '2021-09-18 07:00:00',\n", + " '2021-09-18 08:00:00',\n", + " '2021-09-18 09:00:00',\n", + " '2021-09-18 10:00:00',\n", + " '2021-09-18 11:00:00',\n", + " '2021-09-18 12:00:00',\n", + " '2021-09-18 13:00:00',\n", + " '2021-09-18 14:00:00',\n", + " '2021-09-18 15:00:00',\n", + " '2021-09-18 
16:00:00',\n", + " '2021-09-18 17:00:00',\n", + " '2021-09-18 18:00:00',\n", + " '2021-09-18 19:00:00',\n", + " '2021-10-02 03:00:00',\n", + " '2021-10-02 04:00:00',\n", + " '2021-10-02 05:00:00',\n", + " '2021-10-02 06:00:00',\n", + " '2021-10-02 07:00:00',\n", + " '2021-10-02 08:00:00',\n", + " '2021-10-02 09:00:00',\n", + " '2021-10-02 10:00:00',\n", + " '2021-10-16 03:00:00',\n", + " '2021-10-16 04:00:00',\n", + " '2021-10-16 05:00:00',\n", + " '2021-10-16 06:00:00',\n", + " '2021-10-16 07:00:00',\n", + " '2021-10-16 08:00:00',\n", + " '2021-10-16 09:00:00',\n", + " '2021-10-30 03:00:00',\n", + " '2021-10-30 04:00:00',\n", + " '2021-10-30 05:00:00',\n", + " '2021-10-30 06:00:00',\n", + " '2021-10-30 07:00:00',\n", + " '2021-10-30 08:00:00',\n", + " '2021-10-30 09:00:00',\n", + " '2021-11-13 03:00:00',\n", + " '2021-11-13 04:00:00',\n", + " '2021-11-13 05:00:00',\n", + " '2021-11-13 06:00:00',\n", + " '2021-11-13 07:00:00',\n", + " '2021-11-13 08:00:00',\n", + " '2021-11-13 09:00:00',\n", + " '2021-12-11 03:00:00',\n", + " '2021-12-11 04:00:00',\n", + " '2021-12-11 05:00:00',\n", + " '2021-12-11 06:00:00',\n", + " '2021-12-11 07:00:00',\n", + " '2021-12-11 08:00:00',\n", + " '2021-12-11 09:00:00',\n", + " '2021-12-11 10:00:00',\n", + " '2021-12-11 11:00:00',\n", + " '2021-12-11 12:00:00',\n", + " '2021-12-11 13:00:00',\n", + " '2021-12-11 14:00:00',\n", + " '2021-12-11 15:00:00',\n", + " '2021-12-11 16:00:00',\n", + " '2021-12-11 17:00:00',\n", + " '2021-12-11 18:00:00',\n", + " '2021-12-11 19:00:00',\n", + " '2021-12-25 04:00:00',\n", + " '2021-12-25 05:00:00',\n", + " '2021-12-25 06:00:00',\n", + " '2021-12-25 07:00:00',\n", + " '2021-12-25 08:00:00',\n", + " '2021-12-25 09:00:00',\n", + " '2021-12-25 10:00:00']" + ] + }, + "execution_count": 67, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "missing_dates_comum_tz" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['2020-09-11 10:00:00',\n", + " '2020-09-11 16:00:00',\n", + " '2020-09-11 17:00:00',\n", + " '2020-09-12 00:00:00',\n", + " '2020-09-12 10:00:00',\n", + " '2020-09-12 16:00:00',\n", + " '2020-09-12 17:00:00',\n", + " '2020-09-12 18:00:00',\n", + " '2020-09-13 06:00:00',\n", + " '2020-09-13 14:00:00',\n", + " '2020-09-13 19:00:00',\n", + " '2020-09-14 13:00:00',\n", + " '2020-09-14 17:00:00',\n", + " '2020-09-14 18:00:00',\n", + " '2020-09-14 19:00:00',\n", + " '2020-09-15 01:00:00',\n", + " '2020-09-15 07:00:00',\n", + " '2020-09-15 09:00:00',\n", + " '2020-09-15 13:00:00',\n", + " '2020-09-15 14:00:00',\n", + " '2020-09-15 15:00:00',\n", + " '2020-09-15 20:00:00',\n", + " '2020-09-16 09:00:00',\n", + " '2020-09-16 11:00:00',\n", + " '2020-09-16 16:00:00',\n", + " '2020-09-16 21:00:00',\n", + " '2020-09-17 09:00:00',\n", + " '2020-09-17 13:00:00',\n", + " '2020-09-17 14:00:00',\n", + " '2020-09-17 21:00:00',\n", + " '2020-09-18 02:00:00',\n", + " '2020-09-18 11:00:00',\n", + " '2020-09-18 20:00:00',\n", + " '2020-09-18 21:00:00',\n", + " '2020-09-19 11:00:00',\n", + " '2020-09-19 15:00:00',\n", + " '2020-09-20 02:00:00',\n", + " '2020-09-20 07:00:00',\n", + " '2020-09-20 08:00:00',\n", + " '2020-09-20 13:00:00',\n", + " '2020-09-20 17:00:00',\n", + " '2020-09-20 22:00:00',\n", + " '2020-09-21 05:00:00',\n", + " '2020-09-21 14:00:00',\n", + " '2020-09-21 21:00:00',\n", + " '2020-09-22 07:00:00',\n", + " '2020-09-22 08:00:00',\n", + " '2020-09-22 09:00:00',\n", + " '2020-09-23 05:00:00',\n", + " '2020-09-23 
12:00:00',\n", + " '2020-09-23 13:00:00',\n", + " '2020-09-23 20:00:00',\n", + " '2020-09-24 01:00:00',\n", + " '2020-09-24 05:00:00',\n", + " '2020-09-24 06:00:00',\n", + " '2020-09-24 10:00:00',\n", + " '2020-09-24 19:00:00',\n", + " '2020-09-25 01:00:00',\n", + " '2020-09-25 02:00:00',\n", + " '2020-09-25 07:00:00',\n", + " '2020-09-25 08:00:00',\n", + " '2020-09-25 09:00:00',\n", + " '2020-09-25 17:00:00',\n", + " '2020-09-25 19:00:00',\n", + " '2020-09-26 10:00:00',\n", + " '2020-09-26 11:00:00',\n", + " '2020-09-27 03:00:00',\n", + " '2020-09-27 04:00:00',\n", + " '2020-09-27 08:00:00',\n", + " '2020-09-27 18:00:00',\n", + " '2020-09-27 19:00:00',\n", + " '2020-09-27 23:00:00',\n", + " '2020-09-28 10:00:00',\n", + " '2020-09-28 20:00:00',\n", + " '2020-09-28 22:00:00',\n", + " '2020-09-29 00:00:00',\n", + " '2020-09-29 05:00:00',\n", + " '2020-09-29 17:00:00',\n", + " '2020-09-29 19:00:00',\n", + " '2020-09-30 03:00:00',\n", + " '2020-09-30 08:00:00',\n", + " '2020-09-30 14:00:00',\n", + " '2020-09-30 19:00:00',\n", + " '2020-10-01 01:00:00',\n", + " '2020-10-01 02:00:00',\n", + " '2020-10-01 08:00:00',\n", + " '2020-10-01 09:00:00',\n", + " '2020-10-01 17:00:00',\n", + " '2020-10-01 18:00:00',\n", + " '2020-10-02 03:00:00',\n", + " '2020-10-02 15:00:00',\n", + " '2020-10-02 16:00:00',\n", + " '2020-10-02 22:00:00',\n", + " '2020-10-03 03:00:00',\n", + " '2020-10-03 04:00:00',\n", + " '2020-10-03 13:00:00',\n", + " '2020-10-03 23:00:00',\n", + " '2020-10-04 00:00:00',\n", + " '2020-10-04 06:00:00',\n", + " '2020-10-04 17:00:00',\n", + " '2020-10-04 18:00:00',\n", + " '2020-10-04 19:00:00',\n", + " '2020-10-05 16:00:00',\n", + " '2020-10-05 22:00:00',\n", + " '2020-10-06 08:00:00',\n", + " '2020-10-06 10:00:00',\n", + " '2020-10-07 03:00:00',\n", + " '2020-10-07 07:00:00',\n", + " '2020-10-07 11:00:00',\n", + " '2020-10-07 12:00:00',\n", + " '2020-10-07 17:00:00',\n", + " '2020-10-08 07:00:00',\n", + " '2020-10-08 13:00:00',\n", + " '2020-10-08 22:00:00',\n", + " '2020-10-08 23:00:00',\n", + " '2020-10-09 05:00:00',\n", + " '2020-10-09 06:00:00',\n", + " '2020-10-09 11:00:00',\n", + " '2020-10-10 03:00:00',\n", + " '2020-10-10 04:00:00',\n", + " '2020-10-10 10:00:00',\n", + " '2020-10-10 12:00:00',\n", + " '2020-10-10 14:00:00',\n", + " '2020-10-10 21:00:00',\n", + " '2020-10-10 22:00:00',\n", + " '2020-10-11 02:00:00',\n", + " '2020-10-11 03:00:00',\n", + " '2020-10-11 08:00:00',\n", + " '2020-10-11 13:00:00',\n", + " '2020-10-11 14:00:00',\n", + " '2020-10-12 09:00:00',\n", + " '2020-10-12 10:00:00',\n", + " '2020-10-12 11:00:00',\n", + " '2020-10-12 23:00:00',\n", + " '2020-10-13 00:00:00',\n", + " '2020-10-13 06:00:00',\n", + " '2020-10-13 07:00:00',\n", + " '2020-10-13 14:00:00',\n", + " '2020-10-13 16:00:00',\n", + " '2020-10-13 22:00:00',\n", + " '2020-10-14 04:00:00',\n", + " '2020-10-14 10:00:00',\n", + " '2020-10-14 11:00:00',\n", + " '2020-10-14 12:00:00',\n", + " '2020-10-15 01:00:00',\n", + " '2020-10-15 13:00:00',\n", + " '2020-10-16 05:00:00',\n", + " '2020-10-16 09:00:00',\n", + " '2020-10-16 13:00:00',\n", + " '2020-10-16 14:00:00',\n", + " '2020-10-16 15:00:00',\n", + " '2020-10-16 22:00:00',\n", + " '2020-10-16 23:00:00',\n", + " '2020-10-17 00:00:00',\n", + " '2020-10-17 14:00:00',\n", + " '2020-10-17 15:00:00',\n", + " '2020-10-17 16:00:00',\n", + " '2020-10-18 03:00:00',\n", + " '2020-10-18 13:00:00',\n", + " '2020-10-18 14:00:00',\n", + " '2020-10-19 00:00:00',\n", + " '2020-10-19 12:00:00',\n", + " '2020-10-19 19:00:00',\n", + " '2020-10-19 20:00:00',\n", + " 
'2020-10-20 04:00:00',\n", + " '2020-10-20 17:00:00',\n", + " '2020-10-20 18:00:00',\n", + " '2020-10-20 19:00:00',\n", + " '2020-10-21 01:00:00',\n", + " '2020-10-21 09:00:00',\n", + " '2020-10-21 10:00:00',\n", + " '2020-10-21 11:00:00',\n", + " '2020-10-21 19:00:00',\n", + " '2020-10-22 05:00:00',\n", + " '2020-10-22 06:00:00',\n", + " '2020-10-22 07:00:00',\n", + " '2020-10-22 14:00:00',\n", + " '2020-10-22 18:00:00',\n", + " '2020-10-22 23:00:00',\n", + " '2020-10-23 13:00:00',\n", + " '2020-10-24 01:00:00',\n", + " '2020-10-24 02:00:00',\n", + " '2020-10-24 03:00:00',\n", + " '2020-10-24 11:00:00',\n", + " '2020-10-24 12:00:00',\n", + " '2020-10-25 05:00:00',\n", + " '2020-10-25 11:00:00',\n", + " '2020-10-25 19:00:00',\n", + " '2020-10-25 20:00:00',\n", + " '2020-10-26 03:00:00',\n", + " '2020-10-26 05:00:00',\n", + " '2020-10-26 10:00:00',\n", + " '2020-10-26 14:00:00',\n", + " '2020-10-26 20:00:00',\n", + " '2020-10-27 00:00:00',\n", + " '2020-10-27 04:00:00',\n", + " '2020-10-27 05:00:00',\n", + " '2020-10-27 23:00:00',\n", + " '2020-10-28 10:00:00',\n", + " '2020-10-28 11:00:00',\n", + " '2020-10-28 12:00:00',\n", + " '2020-10-28 22:00:00',\n", + " '2020-10-28 23:00:00',\n", + " '2020-10-29 05:00:00',\n", + " '2020-10-29 12:00:00',\n", + " '2020-10-29 17:00:00',\n", + " '2020-10-29 18:00:00',\n", + " '2020-10-29 19:00:00',\n", + " '2020-10-30 00:00:00',\n", + " '2020-10-30 01:00:00',\n", + " '2020-10-30 19:00:00',\n", + " '2020-10-30 20:00:00',\n", + " '2020-10-30 21:00:00',\n", + " '2020-10-31 14:00:00',\n", + " '2020-10-31 15:00:00',\n", + " '2020-11-01 04:00:00',\n", + " '2020-11-01 13:00:00',\n", + " '2020-11-01 14:00:00',\n", + " '2020-11-01 23:00:00',\n", + " '2020-11-02 11:00:00',\n", + " '2020-11-03 00:00:00',\n", + " '2020-11-03 02:00:00',\n", + " '2020-11-03 07:00:00',\n", + " '2020-11-03 18:00:00',\n", + " '2020-11-04 01:00:00',\n", + " '2020-11-04 06:00:00',\n", + " '2020-11-04 17:00:00',\n", + " '2020-11-04 18:00:00',\n", + " '2020-11-05 01:00:00',\n", + " '2020-11-06 05:00:00',\n", + " '2020-11-06 11:00:00',\n", + " '2020-11-06 15:00:00',\n", + " '2020-11-07 00:00:00',\n", + " '2020-11-07 11:00:00',\n", + " '2020-11-07 12:00:00',\n", + " '2020-11-07 16:00:00',\n", + " '2020-11-07 23:00:00',\n", + " '2020-11-08 01:00:00',\n", + " '2020-11-08 07:00:00',\n", + " '2020-11-08 20:00:00',\n", + " '2020-11-09 02:00:00',\n", + " '2020-11-09 22:00:00',\n", + " '2020-11-09 23:00:00',\n", + " '2020-11-10 18:00:00',\n", + " '2020-11-10 19:00:00',\n", + " '2020-11-10 20:00:00',\n", + " '2020-11-11 06:00:00',\n", + " '2020-11-11 11:00:00',\n", + " '2020-11-11 16:00:00',\n", + " '2020-11-11 18:00:00',\n", + " '2020-11-12 00:00:00',\n", + " '2020-11-12 04:00:00',\n", + " '2020-11-12 11:00:00',\n", + " '2020-11-12 12:00:00',\n", + " '2020-11-12 21:00:00',\n", + " '2020-11-13 09:00:00',\n", + " '2020-11-13 13:00:00',\n", + " '2020-11-13 15:00:00',\n", + " '2020-11-13 20:00:00',\n", + " '2020-11-13 21:00:00',\n", + " '2020-11-14 09:00:00',\n", + " '2020-11-14 13:00:00',\n", + " '2020-11-14 18:00:00',\n", + " '2020-11-14 19:00:00',\n", + " '2020-11-14 23:00:00',\n", + " '2020-11-15 00:00:00',\n", + " '2020-11-15 04:00:00',\n", + " '2020-11-15 06:00:00',\n", + " '2020-11-15 10:00:00',\n", + " '2020-11-15 11:00:00',\n", + " '2020-11-15 12:00:00',\n", + " '2020-11-15 22:00:00',\n", + " '2020-11-16 00:00:00',\n", + " '2020-11-16 04:00:00',\n", + " '2020-11-16 05:00:00',\n", + " '2020-11-16 06:00:00',\n", + " '2020-11-16 13:00:00',\n", + " '2020-11-16 20:00:00',\n", + " '2020-11-17 
04:00:00',\n", + " '2020-11-17 06:00:00',\n", + " '2020-11-17 17:00:00',\n", + " '2020-11-17 18:00:00',\n", + " '2020-11-18 01:00:00',\n", + " '2020-11-18 03:00:00',\n", + " '2020-11-18 10:00:00',\n", + " '2020-11-18 15:00:00',\n", + " '2020-11-18 16:00:00',\n", + " '2020-11-18 17:00:00',\n", + " '2020-11-19 02:00:00',\n", + " '2020-11-19 12:00:00',\n", + " '2020-11-19 13:00:00',\n", + " '2020-11-19 18:00:00',\n", + " '2020-11-19 22:00:00',\n", + " '2020-11-19 23:00:00',\n", + " '2020-11-20 09:00:00',\n", + " '2020-11-20 13:00:00',\n", + " '2020-11-20 14:00:00',\n", + " '2020-11-21 09:00:00',\n", + " '2020-11-21 10:00:00',\n", + " '2020-11-21 11:00:00',\n", + " '2020-11-21 20:00:00',\n", + " '2020-11-23 04:00:00',\n", + " '2020-11-23 10:00:00',\n", + " '2020-11-23 11:00:00',\n", + " '2020-11-23 18:00:00',\n", + " '2020-11-23 19:00:00',\n", + " '2020-11-23 20:00:00',\n", + " '2020-11-24 04:00:00',\n", + " '2020-11-24 15:00:00',\n", + " '2020-11-24 16:00:00',\n", + " '2020-11-24 20:00:00',\n", + " '2020-11-25 00:00:00',\n", + " '2020-12-03 22:00:00',\n", + " '2020-12-04 00:00:00',\n", + " '2020-12-04 05:00:00',\n", + " '2020-12-04 06:00:00',\n", + " '2020-12-04 18:00:00',\n", + " '2020-12-04 20:00:00',\n", + " '2020-12-05 01:00:00',\n", + " '2020-12-05 05:00:00',\n", + " '2020-12-05 07:00:00',\n", + " '2020-12-05 18:00:00',\n", + " '2020-12-05 23:00:00',\n", + " '2020-12-06 00:00:00',\n", + " '2020-12-06 01:00:00',\n", + " '2020-12-06 05:00:00',\n", + " '2020-12-06 06:00:00',\n", + " '2020-12-06 07:00:00',\n", + " '2020-12-06 18:00:00',\n", + " '2020-12-07 03:00:00',\n", + " '2020-12-07 04:00:00',\n", + " '2020-12-07 14:00:00',\n", + " '2020-12-07 15:00:00',\n", + " '2020-12-07 16:00:00',\n", + " '2020-12-08 00:00:00',\n", + " '2020-12-08 05:00:00',\n", + " '2020-12-08 06:00:00',\n", + " '2020-12-09 08:00:00',\n", + " '2020-12-09 10:00:00',\n", + " '2020-12-09 19:00:00',\n", + " '2020-12-10 04:00:00',\n", + " '2020-12-10 06:00:00',\n", + " '2020-12-10 19:00:00',\n", + " '2020-12-11 01:00:00',\n", + " '2020-12-11 12:00:00',\n", + " '2020-12-11 14:00:00',\n", + " '2020-12-11 21:00:00',\n", + " '2020-12-11 23:00:00',\n", + " '2020-12-12 15:00:00',\n", + " '2020-12-13 01:00:00',\n", + " '2020-12-13 02:00:00',\n", + " '2020-12-13 08:00:00',\n", + " '2020-12-13 12:00:00',\n", + " '2020-12-13 14:00:00',\n", + " '2020-12-13 16:00:00',\n", + " '2020-12-13 20:00:00',\n", + " '2020-12-13 21:00:00',\n", + " '2020-12-14 08:00:00',\n", + " '2020-12-14 10:00:00',\n", + " '2020-12-14 12:00:00',\n", + " '2020-12-14 16:00:00',\n", + " '2020-12-14 23:00:00',\n", + " '2020-12-15 08:00:00',\n", + " '2020-12-15 09:00:00',\n", + " '2020-12-15 17:00:00',\n", + " '2020-12-15 23:00:00',\n", + " '2020-12-16 09:00:00',\n", + " '2020-12-16 18:00:00',\n", + " '2020-12-17 02:00:00',\n", + " '2020-12-17 17:00:00',\n", + " '2020-12-18 09:00:00',\n", + " '2020-12-18 16:00:00',\n", + " '2020-12-18 18:00:00',\n", + " '2020-12-19 12:00:00',\n", + " '2020-12-19 13:00:00',\n", + " '2020-12-19 19:00:00',\n", + " '2020-12-19 21:00:00',\n", + " '2020-12-20 09:00:00',\n", + " '2020-12-20 10:00:00',\n", + " '2020-12-20 20:00:00',\n", + " '2020-12-21 00:00:00',\n", + " '2020-12-21 13:00:00',\n", + " '2020-12-21 21:00:00',\n", + " '2020-12-21 22:00:00',\n", + " '2020-12-22 09:00:00',\n", + " '2020-12-22 10:00:00',\n", + " '2020-12-23 07:00:00',\n", + " '2020-12-23 08:00:00',\n", + " '2020-12-23 15:00:00',\n", + " '2020-12-23 17:00:00',\n", + " '2020-12-24 03:00:00',\n", + " '2020-12-24 08:00:00',\n", + " '2020-12-24 10:00:00',\n", + " 
'2020-12-24 16:00:00',\n", + " '2020-12-24 21:00:00',\n", + " '2020-12-24 23:00:00',\n", + " '2020-12-25 04:00:00',\n", + " '2020-12-25 06:00:00',\n", + " '2020-12-25 14:00:00',\n", + " '2020-12-25 21:00:00',\n", + " '2020-12-25 23:00:00',\n", + " '2020-12-26 04:00:00',\n", + " '2020-12-26 05:00:00',\n", + " '2020-12-26 06:00:00',\n", + " '2020-12-26 14:00:00',\n", + " '2020-12-26 15:00:00',\n", + " '2020-12-27 02:00:00',\n", + " '2020-12-27 07:00:00',\n", + " '2020-12-27 12:00:00',\n", + " '2020-12-27 14:00:00',\n", + " '2020-12-27 21:00:00',\n", + " '2020-12-27 23:00:00',\n", + " '2020-12-28 10:00:00',\n", + " '2020-12-28 11:00:00',\n", + " '2020-12-29 06:00:00',\n", + " '2020-12-29 10:00:00',\n", + " '2020-12-29 11:00:00',\n", + " '2020-12-29 15:00:00',\n", + " '2020-12-30 00:00:00',\n", + " '2020-12-30 01:00:00',\n", + " '2020-12-30 05:00:00',\n", + " '2020-12-30 06:00:00',\n", + " '2020-12-30 07:00:00',\n", + " '2020-12-30 22:00:00',\n", + " '2020-12-31 02:00:00',\n", + " '2020-12-31 03:00:00',\n", + " '2020-12-31 08:00:00',\n", + " '2020-12-31 16:00:00',\n", + " '2020-12-31 21:00:00',\n", + " '2020-12-31 22:00:00',\n", + " '2020-12-31 23:00:00',\n", + " '2021-06-29 09:00:00',\n", + " '2021-06-29 10:00:00',\n", + " '2021-06-29 11:00:00',\n", + " '2021-06-29 21:00:00',\n", + " '2021-06-29 22:00:00',\n", + " '2021-06-30 03:00:00',\n", + " '2021-06-30 07:00:00',\n", + " '2021-06-30 08:00:00',\n", + " '2021-06-30 15:00:00',\n", + " '2021-06-30 16:00:00',\n", + " '2021-06-30 17:00:00',\n", + " '2021-06-30 21:00:00',\n", + " '2021-07-21 18:00:00',\n", + " '2021-12-30 12:00:00',\n", + " '2021-12-30 17:00:00',\n", + " '2021-12-30 23:00:00',\n", + " '2021-12-31 03:00:00',\n", + " '2021-12-31 09:00:00',\n", + " '2021-12-31 21:00:00',\n", + " '2021-12-31 22:00:00',\n", + " '2021-12-31 23:00:00']" + ] + }, + "execution_count": 66, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "missing_dates_qt_so_tz[1000:]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "missing_dates_comum_tz[2000:]" + ] + }, + { + "cell_type": "code", + "execution_count": 99, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'155'" + ] + }, + "execution_count": 99, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import datetime as dt\n", + "current_time = '2021-07-21 18:00:00'\n", + "current_time = '2020-03-20 16:00:00'\n", + "current_time = '2020-06-03 23:00:00'\n", + "# current_time = '2020-03-21 11:00:00'\n", + "dt.datetime.strptime(current_time, \"%Y-%m-%d %H:%M:%S\").strftime(\"%j\")" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [], + "source": [ + "pd.DataFrame(missing_dates_comum_tz).to_csv(\"teste_apagar*.csv\", index=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "interpreter": { + "hash": "5ca4efd453f89b69dfc80cbb5ef222b3d24fc169b1efe916ea8f932329929361" + }, + "kernelspec": { + "display_name": "Python 3.9.5 ('venv': venv)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git 
a/pipelines/rj_cor/meteorologia/satelite/flows.py b/pipelines/rj_cor/meteorologia/satelite/flows.py index 3a01f75ba..448028dff 100644 --- a/pipelines/rj_cor/meteorologia/satelite/flows.py +++ b/pipelines/rj_cor/meteorologia/satelite/flows.py @@ -93,6 +93,7 @@ table_id_rr, "prod", redis_files_rr_updated, + keep_last=200, wait=path_rr, ) @@ -130,6 +131,7 @@ table_id_tpw, "prod", redis_files_tpw_updated, + keep_last=200, wait=path_tpw, ) @@ -169,6 +171,7 @@ table_id_cmip, "prod", redis_files_cmip_updated, + keep_last=200, wait=path_cmip, ) diff --git a/pipelines/rj_cor/meteorologia/satelite/remap.py b/pipelines/rj_cor/meteorologia/satelite/remap.py index 95c91d22e..cdcd21189 100644 --- a/pipelines/rj_cor/meteorologia/satelite/remap.py +++ b/pipelines/rj_cor/meteorologia/satelite/remap.py @@ -3,7 +3,6 @@ """ Converte coordenada X,Y para latlon """ -import time as t import netCDF4 as nc import numpy as np @@ -106,9 +105,9 @@ def remap( # Perform the projection/resampling - print("Remapping", path) + # print("Remapping", path) - start = t.time() + # start = t.time() gdal.ReprojectImage( raw, @@ -119,7 +118,7 @@ def remap( options=["NUM_THREADS=ALL_CPUS"], ) - print("- finished! Time:", t.time() - start, "seconds") + # print("- finished! Time:", t.time() - start, "seconds") # Close file raw = None diff --git a/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py b/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py index 86f4606af..f56cbb928 100644 --- a/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py +++ b/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# pylint: disable=too-many-locals +# pylint: disable=too-many-locals,R0913 +# flake8: noqa """ Funções úteis no tratamento de dados de satélite """ @@ -64,6 +65,7 @@ import datetime import os from pathlib import Path +import re from typing import Tuple, Union from google.cloud import storage @@ -111,6 +113,7 @@ def download_blob( credentials = get_credentials_from_env(mode=mode) storage_client = storage.Client(credentials=credentials) + # storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) @@ -128,10 +131,11 @@ def converte_timezone(datetime_save: str) -> str: Recebe o formato de data hora em 'YYYYMMDD HHmm' no UTC e retorna no mesmo formato no horário São Paulo """ - log(f">>>>>>> {datetime_save}") - datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmm") + log(f">>>>>>> datetime_save {datetime_save}") + datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmmss") + log(f">>>>>>> datahora {datahora}") datahora = datahora.in_tz("America/Sao_Paulo") - return datahora.format("YYYYMMDD HHmm") + return datahora.format("YYYYMMDD HHmmss") def extract_julian_day_and_hour_from_filename(filename: str): @@ -154,7 +158,7 @@ def extract_julian_day_and_hour_from_filename(filename: str): julian_day = int(start[4:7]) # Time (UTC) as string - hour_utc = start[7:11] + hour_utc = start[7:13] # Time of the start of the Scan # time = start[7:9] + ":" + start[9:11] + ":" + start[11:13] + " UTC" @@ -207,7 +211,7 @@ def get_info(path: str) -> Tuple[dict, str]: if procura_m == -1: procura_m = path.find("-M4") product = path[path.find("L2-") + 3 : procura_m] - print(product) + # print(product) # Nem todos os produtos foram adicionados no dicionário de características # dos produtos. 
Olhar arquivo original caso o produto não estaja aqui @@ -367,8 +371,10 @@ def get_info(path: str) -> Tuple[dict, str]: if variable == "CMI": # Search for the GOES-16 channel in the file name + regex = "-M\\dC\\d" # noqa: W605 + find_expression = re.findall(regex, path)[0] product_caracteristics["band"] = int( - (path[path.find("M6C") + 3 : path.find("_G16")]) + (path[path.find(find_expression) + 4 : path.find("_G16")]) ) else: product_caracteristics["band"] = np.nan @@ -402,6 +408,7 @@ def remap_g16( resolution: int, variable: str, datetime_save: str, + mode_redis: str = "prod", ): """ the GOES-16 image is reprojected to the rectangular projection in the extent region @@ -436,7 +443,7 @@ def remap_g16( ) tif_path = os.path.join( - os.getcwd(), "data", "satelite", variable, "temp", partitions + os.getcwd(), mode_redis, "data", "satelite", variable, "temp", partitions ) if not os.path.exists(tif_path): @@ -519,7 +526,7 @@ def treat_data( def save_data_in_file( - variable: str, datetime_save: str, file_path: str + variable: str, datetime_save: str, file_path: str, mode_redis: str = "prod" ) -> Union[str, Path]: """ Save data in parquet @@ -539,7 +546,14 @@ def save_data_in_file( ) tif_data = os.path.join( - os.getcwd(), "data", "satelite", variable, "temp", partitions, "dados.tif" + os.getcwd(), + mode_redis, + "data", + "satelite", + variable, + "temp", + partitions, + "dados.tif", ) data = xr.open_dataset(tif_data, engine="rasterio") @@ -561,14 +575,20 @@ def save_data_in_file( ) # cria pasta de partições se elas não existem - output_path = os.path.join(os.getcwd(), "data", "satelite", variable, "output") + output_path = os.path.join( + os.getcwd(), mode_redis, "data", "satelite", variable, "output" + ) parquet_path = os.path.join(output_path, partitions) if not os.path.exists(parquet_path): os.makedirs(parquet_path) + data["horario"] = pendulum.from_format( + datetime_save, "YYYYMMDD HHmmss" + ).to_time_string() + # log(f">>>>> data head {data.head()}") # Fixa ordem das colunas - data = data[["longitude", "latitude", variable.lower()]] + data = data[["longitude", "latitude", "horario", variable.lower()]] # salva em csv filename = file_path.split("/")[-1].replace(".nc", "") @@ -579,7 +599,7 @@ def save_data_in_file( return output_path -def main(path: Union[str, Path]): +def main(path: Union[str, Path], mode_redis: str = "prod"): """ Função principal para converter dados x,y em lon,lat """ @@ -620,6 +640,7 @@ def main(path: Union[str, Path]): resolution, product_caracteristics["variable"], datetime_save, + mode_redis, ) info = { diff --git a/pipelines/rj_cor/meteorologia/satelite/tasks.py b/pipelines/rj_cor/meteorologia/satelite/tasks.py index 280f7e172..4446f806c 100644 --- a/pipelines/rj_cor/meteorologia/satelite/tasks.py +++ b/pipelines/rj_cor/meteorologia/satelite/tasks.py @@ -80,6 +80,7 @@ def download( ref_filename: str = None, redis_files: list = [], wait=None, + mode_redis: str = "prod", ) -> Union[str, Path]: """ Acessa o S3 e faz o download do primeiro arquivo da data-hora especificada @@ -121,14 +122,16 @@ def download( skip = Skipped("No available files on API") raise ENDRUN(state=skip) - base_path = os.path.join(os.getcwd(), "data", "satelite", variavel[:-1], "input") + base_path = os.path.join( + os.getcwd(), mode_redis, "data", "satelite", variavel[:-1], "input" + ) if not os.path.exists(base_path): os.makedirs(base_path) # Seleciona primeiro arquivo que não tem o nome salvo no redis - log(f"\n\n[DEBUG]: available files on API: {path_files}") - log(f"\n\n[DEBUG]: filenames 
already saved on redis_files: {redis_files}") + # log(f"\n\n[DEBUG]: available files on API: {path_files}") + # log(f"\n\n[DEBUG]: filenames already saved on redis_files: {redis_files}") log(f"\n\n[DEBUG]: ref_filename: {ref_filename}") # keep only ref_filename if it exists @@ -145,12 +148,13 @@ def download( download_file = None for path_file in path_files: filename = path_file.split("/")[-1] + log(f"\n\n[DEBUG]: {filename} checking if it is in redis") if filename not in redis_files: log(f"\n\n[DEBUG]: {filename} not in redis") redis_files.append(filename) path_filename = os.path.join(base_path, filename) download_file = path_file - log(f"[DEBUG]: filename to be append on redis_files: {redis_files}") + # log(f"[DEBUG]: filename to be append on redis_files: {redis_files}") break # Skip task if there is no new file @@ -174,18 +178,18 @@ def download( @task -def tratar_dados(filename: str) -> dict: +def tratar_dados(filename: str, mode_redis: str = "prod") -> dict: """ Converte coordenadas X, Y para latlon e recorta área """ log(f"\n>>>> Started treating file: {filename}") - grid, goes16_extent, info = main(filename) + grid, goes16_extent, info = main(filename, mode_redis) del grid, goes16_extent return info @task -def save_data(info: dict, file_path: str) -> Union[str, Path]: +def save_data(info: dict, file_path: str, mode_redis: str = "prod") -> Union[str, Path]: """ Convert tif data to csv """ @@ -193,5 +197,5 @@ def save_data(info: dict, file_path: str) -> Union[str, Path]: variable = info["variable"] datetime_save = info["datetime_save"] print(f"Saving {variable} in parquet") - output_path = save_data_in_file(variable, datetime_save, file_path) + output_path = save_data_in_file(variable, datetime_save, file_path, mode_redis) return output_path diff --git a/pipelines/rj_cor/meteorologia/satelite_backfill/__init__.py b/pipelines/rj_cor/meteorologia/satelite_backfill/__init__.py new file mode 100644 index 000000000..6198e4ad6 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/satelite_backfill/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +""" +Prefect flows for the satellite backfill pipeline +""" +############################################################################### +# Automatically managed, please do not touch +############################################################################### +from .flows import * diff --git a/pipelines/rj_cor/meteorologia/satelite_backfill/flows.py b/pipelines/rj_cor/meteorologia/satelite_backfill/flows.py new file mode 100644 index 000000000..4c964a75b --- /dev/null +++ b/pipelines/rj_cor/meteorologia/satelite_backfill/flows.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103,E1101 +""" +Flows for the GOES-16 satellite backfill +""" + +from prefect import Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS + +# from prefect.tasks.prefect import create_flow_run, wait_for_flow_run + +from pipelines.constants import constants +from pipelines.rj_cor.meteorologia.satelite.constants import ( + constants as satelite_constants, +) + +# from pipelines.utils.constants import constants as utils_constants +from pipelines.rj_cor.meteorologia.satelite.tasks import ( + get_dates, + slice_data, + download, + tratar_dados, + save_data, +) +from pipelines.rj_cor.tasks import ( + get_on_redis, + save_on_redis, +) + +# from pipelines.rj_cor.meteorologia.satelite.schedules import hour_schedule +from pipelines.rj_cor.meteorologia.satelite_backfill.tasks import delete_files +from pipelines.utils.decorators import Flow + +from pipelines.utils.tasks
import ( + create_table_and_upload_to_gcs, + # get_current_flow_labels, +) + + +with Flow( + "satelite_goes_16_backfill", + code_owners=[ + "paty", + ], +) as satelite_goes_16_backfill: + + # Materialization parameters + materialize_after_dump = Parameter( + "materialize_after_dump", default=False, required=False + ) + materialize_to_datario = Parameter( + "materialize_to_datario", default=False, required=False + ) + materialization_mode = Parameter("mode", default="dev", required=False) + + # Other parameters + dataset_id = satelite_constants.DATASET_ID.value + dump_mode = "append" + ref_filename = Parameter("ref_filename", default=None, required=False) + current_time = Parameter("current_time", default=None, required=False) + mode_redis = Parameter("mode_redis", default="prod", required=False) + current_time = get_dates(current_time) + + date_hour_info = slice_data(current_time=current_time, ref_filename=ref_filename) + + delete_folder = delete_files(mode_redis) + + # # Para taxa de precipitação + # variavel_rr = satelite_constants.VARIAVEL_RR.value + # table_id_rr = satelite_constants.TABLE_ID_RR.value + + # # Get filenames that were already treated on redis + # redis_files_rr = get_on_redis(dataset_id, table_id_rr, mode=mode_redis) + + # redis_files_rr.set_upstream(delete_folder) + + # # Download raw data from API + # filename_rr, redis_files_rr_updated = download( + # variavel=variavel_rr, + # date_hour_info=date_hour_info, + # redis_files=redis_files_rr, + # ref_filename=ref_filename, + # wait=redis_files_rr, + # mode_redis=mode_redis, + # ) + + # # Start data treatment if there are new files + # info_rr = tratar_dados(filename=filename_rr, mode_redis=mode_redis) + # path_rr = save_data(info=info_rr, file_path=filename_rr, mode_redis=mode_redis) + + # # Create table in BigQuery + # upload_table_rr = create_table_and_upload_to_gcs( + # data_path=path_rr, + # dataset_id=dataset_id, + # table_id=table_id_rr, + # dump_mode=dump_mode, + # wait=path_rr, + # ) + + # # Save new filenames on redis + # save_on_redis( + # dataset_id, + # table_id_rr, + # mode=mode_redis, + # files=redis_files_rr_updated, + # keep_last=2000, + # wait=path_rr, + # ) + + # Para quantidade de água precipitável + variavel_tpw = satelite_constants.VARIAVEL_TPW.value + table_id_tpw = satelite_constants.TABLE_ID_TPW.value + + # Get filenames that were already treated on redis + redis_files_tpw = get_on_redis(dataset_id, table_id_tpw, mode=mode_redis) + + redis_files_tpw.set_upstream(delete_folder) + + # Download raw data from API + filename_tpw, redis_files_tpw_updated = download( + variavel=variavel_tpw, + date_hour_info=date_hour_info, + redis_files=redis_files_tpw, + ref_filename=ref_filename, + wait=redis_files_tpw, + mode_redis=mode_redis, + ) + + # Start data treatment if there are new files + info_tpw = tratar_dados(filename=filename_tpw, mode_redis=mode_redis) + path_tpw = save_data(info=info_tpw, file_path=filename_tpw, mode_redis=mode_redis) + + upload_table_tpw = create_table_and_upload_to_gcs( + data_path=path_tpw, + dataset_id=dataset_id, + table_id=table_id_tpw, + dump_mode=dump_mode, + wait=path_tpw, + ) + + # Save new filenames on redis + save_on_redis( + dataset_id, + table_id_tpw, + mode_redis, + redis_files_tpw_updated, + keep_last=2000, + wait=path_tpw, + ) + + # Para clean_ir_longwave_window (band 13) CMIPF + variavel_cmip = satelite_constants.VARIAVEL_cmip.value + table_id_cmip = satelite_constants.TABLE_ID_cmip.value + + # Get filenames that were already treated on redis + redis_files_cmip = 
get_on_redis(dataset_id, table_id_cmip, mode=mode_redis) + + redis_files_cmip.set_upstream(delete_folder) + + # Download raw data from API + filename_cmip, redis_files_cmip_updated = download( + variavel=variavel_cmip, + date_hour_info=date_hour_info, + band="13", + redis_files=redis_files_cmip, + ref_filename=ref_filename, + wait=redis_files_cmip, + mode_redis=mode_redis, + ) + + # Start data treatment if there are new files + info_cmip = tratar_dados(filename=filename_cmip, mode_redis=mode_redis) + path_cmip = save_data( + info=info_cmip, file_path=filename_cmip, mode_redis=mode_redis + ) + + # Create table in BigQuery + upload_table_cmip = create_table_and_upload_to_gcs( + data_path=path_cmip, + dataset_id=dataset_id, + table_id=table_id_cmip, + dump_mode=dump_mode, + wait=path_cmip, + ) + + # Save new filenames on redis + save_on_redis( + dataset_id, + table_id_cmip, + mode_redis, + redis_files_cmip_updated, + keep_last=2000, + wait=path_cmip, + ) + + # # Trigger DBT flow run + # with case(materialize_after_dump, True): + # current_flow_labels = get_current_flow_labels() + + # # Materializar RR + # materialization_flow_rr = create_flow_run( + # flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + # project_name=constants.PREFECT_DEFAULT_PROJECT.value, + # parameters={ + # "dataset_id": dataset_id, + # "table_id": table_id_rr, + # "mode": materialization_mode, + # "materialize_to_datario": materialize_to_datario, + # }, + # labels=current_flow_labels, + # run_name=f"Materialize {dataset_id}.{table_id_rr}", + # ) + + # materialization_flow_rr.set_upstream(upload_table_rr) + + # wait_for_materialization_rr = wait_for_flow_run( + # materialization_flow_rr, + # stream_states=True, + # stream_logs=True, + # raise_final_state=True, + # ) + + # # Materializar TPW + # materialization_flow_tpw = create_flow_run( + # flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + # project_name=constants.PREFECT_DEFAULT_PROJECT.value, + # parameters={ + # "dataset_id": dataset_id, + # "table_id": table_id_tpw, + # "mode": materialization_mode, + # "materialize_to_datario": materialize_to_datario, + # }, + # labels=current_flow_labels, + # run_name=f"Materialize {dataset_id}.{table_id_tpw}", + # ) + + # materialization_flow_tpw.set_upstream(upload_table_tpw) + + # wait_for_materialization_tpw = wait_for_flow_run( + # materialization_flow_tpw, + # stream_states=True, + # stream_logs=True, + # raise_final_state=True, + # ) + + # # Materializar CMIP + # materialization_flow_cmip = create_flow_run( + # flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + # project_name=constants.PREFECT_DEFAULT_PROJECT.value, + # parameters={ + # "dataset_id": dataset_id, + # "table_id": table_id_cmip, + # "mode": materialization_mode, + # "materialize_to_datario": materialize_to_datario, + # }, + # labels=current_flow_labels, + # run_name=f"Materialize {dataset_id}.{table_id_cmip}", + # ) + + # materialization_flow_cmip.set_upstream(upload_table_cmip) + + # wait_for_materialization_cmip = wait_for_flow_run( + # materialization_flow_cmip, + # stream_states=True, + # stream_logs=True, + # raise_final_state=True, + # ) + + ######### + # VARIAVEL_RR = "RRQPEF" + # DATASET_ID_RR = "meio_ambiente_clima" + # TABLE_ID_RR = "taxa_precipitacao_satelite" + + # filename_rr = download( + # variavel=VARIAVEL_RR, ano=ano, dia_juliano=dia_juliano, hora=hora + # ) + + # info_rr = tratar_dados(filename=filename_rr) + # path_rr, file_rr = salvar_parquet(info=info_rr) + + # waiting_rr = create_table_and_upload_to_gcs( 
+ # data_path=path_rr, + # dataset_id=DATASET_ID_RR, + # table_id=TABLE_ID_RR, + # dump_mode=DUMP_MODE, + # wait=path_rr, + # ) + # # Edge(upstream_task = create_table_and_upload_to_gcs, + # # downstream_task = delete_files) + # delete_files(filename=file_rr, input_filename=filename_rr, wait=waiting_rr) + +# para rodar na cloud +satelite_goes_16_backfill.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +satelite_goes_16_backfill.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_COR_AGENT_LABEL.value], +) +# satelite_goes_16_backfill.add_edge(upstream_task = create_table_and_upload_to_gcs, +# downstream_task = delete_files) diff --git a/pipelines/rj_cor/meteorologia/satelite_backfill/tasks.py b/pipelines/rj_cor/meteorologia/satelite_backfill/tasks.py new file mode 100644 index 000000000..45ec497ee --- /dev/null +++ b/pipelines/rj_cor/meteorologia/satelite_backfill/tasks.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# flake8: noqa: E501 +""" +Tasks for backfill pipeline. +""" + +import os +import shutil +from prefect import task + + +@task +def delete_files( + mode_redis: str, + wait=None, # pylint: disable=unused-argument +): + """ + Deletar tanto os arquivos raw, quanto os temporários e de output + """ + path = f"/home/patricia/Documentos/escritorio_dados/prefeitura-rio/pipelines/{mode_redis}/data/satelite/" + if os.path.exists(path): + print("\n\n\n\n path exists\n\n\n") + shutil.rmtree(path) + + # filename_temp = filename.replace("output", "temp").replace(".csv", ".tif") + # files = [filename, filename_temp, input_filename] + + # [remove(f) for f in files] + + # get_dir = filename.split("input")[0] + # print(f"Deleting files from {get_dir}") + # [f.unlink() for f in Path(r"{}".format(get_dir)).glob("**/*") if f.is_file()] diff --git a/pipelines/rj_cor/meteorologia/satelite_backfill/utils.py b/pipelines/rj_cor/meteorologia/satelite_backfill/utils.py new file mode 100644 index 000000000..ed2f6897c --- /dev/null +++ b/pipelines/rj_cor/meteorologia/satelite_backfill/utils.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# pylint: disable=W0102 +""" +General utilities for backfill pipelines. +""" + +from typing import Any, Dict + +import pendulum +import prefect + +from pipelines.utils.utils import log + + +def run_local_backfill( + flow: prefect.Flow, + parameters: Dict[str, Any] = {}, + backfill_parameters: Dict[str, Any] = None, +): + """ + Runs backfill flow locally. + It runs backwards in time, so it runs the most recent dates first. + + Mandatory backfill_parameters: + * start_date: Start date for backfill + * end_date: End date for backfill + * format_date: Format in which dates were passed + * interval: Number of time intervals to step by + * interval_period: If the step is in minutes, hours, days or weeks + Note: keep the same format for start_date and end_date; mode_redis ('prod' or 'dev') is also required + + Example: + backfill_parameters = { + 'start_date': '2022-03-01 00:20:00', + 'end_date': '2022-03-01 04:20:00', + 'format_date': 'YYYY-MM-DD HH:mm:ss', + 'interval': '1', + 'interval_period': 'hours', 'mode_redis': 'prod' + } + run_local_backfill(flow, backfill_parameters=backfill_parameters) + """ + # Setup for local run + flow.storage = None + flow.run_config = None + flow.schedule = None + + if sorted(backfill_parameters.keys()) != [ + "end_date", + "format_date", + "interval", + "interval_period", + "mode_redis", + "start_date", + ]: + log( + "Your parameter input has missing information. 
Check if you have all of these\ + parameters: start_date, end_date, format_date, interval, interval_period, mode_redis" + ) + return + + start_date = backfill_parameters["start_date"] + end_date = backfill_parameters["end_date"] + format_date = backfill_parameters["format_date"] + interval = int(backfill_parameters["interval"]) + interval_period = backfill_parameters["interval_period"] + mode_redis = backfill_parameters["mode_redis"] + + if interval_period not in ("minutes", "hours", "days", "weeks"): + log( + "interval_period only accepts minutes, hours, days, weeks. Change the code if necessary" + ) + return + + start_date = pendulum.from_format(start_date, format_date) + end_date = pendulum.from_format(end_date, format_date) + + while start_date < end_date: + # Run flow + parameters["current_time"] = end_date.to_datetime_string() + parameters["mode_redis"] = mode_redis + flow.run(parameters=parameters) + # Update end_date backwards + if interval_period == "minutes": + end_date = end_date.subtract(minutes=interval) + elif interval_period == "hours": + end_date = end_date.subtract(hours=interval) + elif interval_period == "days": + end_date = end_date.subtract(days=interval) + elif interval_period == "weeks": + end_date = end_date.subtract(weeks=interval) + + +def run_local_missing( + flow: prefect.Flow, + parameters: Dict[str, Any] = {}, + backfill_parameters: Dict[str, Any] = None, +): + """ + Runs the flow locally for an explicit list of dates + (for example, dates found to be missing from the final table). + + Mandatory backfill_parameters: + * date_list: List of datetime strings to run the flow for; + one flow run is triggered per date, in list order. + Each date must be in the format expected by the flow's + current_time parameter, e.g. '2021-07-21 18:00:00'. + Note: dates are popped from the front of date_list as they run, + so pass a copy if you need to keep the original list. + + Example: + backfill_parameters = { + 'date_list': [ + '2021-07-21 18:00:00', + '2021-12-30 12:00:00', + '2021-12-31 23:00:00', + ] + } + run_local_missing(flow, backfill_parameters=backfill_parameters) + """ + # Setup for local run + flow.storage = None + flow.run_config = None + flow.schedule = None + + # if sorted(backfill_parameters.keys()) != ['end_date', 'format_date', + # 'interval', 'interval_period', 'start_date']: + # log("Your parameter input has missing information. Check if you have all of this\ + # parameters: start_date, end_date, format_date, interval, interval_period") + # return + + date_list = backfill_parameters["date_list"] + # save_removed_dates = [] + + while len(date_list): + # Run flow + parameters["current_time"] = date_list[0] + flow.run(parameters=parameters) + # Move to the next date in the list + date_list.pop(0) + # removed = date_list.pop(0) + # save_removed_dates.append(removed) + + # print(f"Items removed: {save_removed_dates}") diff --git a/pipelines/rj_cor/tasks.py b/pipelines/rj_cor/tasks.py index ecc8c40a5..b5d4fc804 100644 --- a/pipelines/rj_cor/tasks.py +++ b/pipelines/rj_cor/tasks.py @@ -21,12 +21,15 @@ def get_on_redis( """ Set the last updated time on Redis. 
""" - redis_client = get_redis_client() + redis_client = get_redis_client(host="127.0.0.1") key = build_redis_key(dataset_id, table_id, "files", mode) + # print(f"\n\n get from key {key} \n\n") files_on_redis = redis_client.get(key) + # print("\n\n files saved on redis: ", files_on_redis, "\n\n") files_on_redis = [] if files_on_redis is None else files_on_redis files_on_redis = list(set(files_on_redis)) files_on_redis.sort() + # print("\n\n files saved on redis: ", files_on_redis, "\n\n") return files_on_redis @@ -42,10 +45,12 @@ def save_on_redis( """ Set the last updated time on Redis. """ - redis_client = get_redis_client() + redis_client = get_redis_client(host="127.0.0.1") key = build_redis_key(dataset_id, table_id, "files", mode) + # print(f"\n\n save on key {key} \n\n") files = list(set(files)) - print(">>>> save on redis files ", files) - files.sort() + files.sort(reverse=True) # no backfill que roda contrário tem que ter o reverse + # print("\n\n all files to save on redis: ", files, "\n\n") files = files[-keep_last:] + # print("\n\n files to save on redis: ", files, "\n\n") redis_client.set(key, files) diff --git a/pipelines/rj_cor/utils.py b/pipelines/rj_cor/utils.py index 55b78f570..907631753 100644 --- a/pipelines/rj_cor/utils.py +++ b/pipelines/rj_cor/utils.py @@ -9,6 +9,6 @@ def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod Helper function for building a key where to store the last updated time """ key = dataset_id + "." + table_id + "." + name - if mode == "dev": + if mode != "prod": key = f"{mode}.{key}" return key diff --git a/pipelines/utils/__init__.py b/pipelines/utils/__init__.py index c9ba741d3..25e53807a 100644 --- a/pipelines/utils/__init__.py +++ b/pipelines/utils/__init__.py @@ -3,11 +3,13 @@ Helper flows that could fit any pipeline. """ from pipelines.utils.backfill_flow.flows import * -from pipelines.utils.dump_datario.flows import * + +# from pipelines.utils.dump_datario.flows import * from pipelines.utils.dump_db.flows import * from pipelines.utils.dump_to_gcs.flows import * from pipelines.utils.dump_url.flows import * from pipelines.utils.execute_dbt_model.flows import * -from pipelines.utils.georeference.flows import * + +# from pipelines.utils.georeference.flows import * from pipelines.utils.predict_flow.flows import * from pipelines.utils.whatsapp_bot.flows import *