-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Revert "feat(pipeline): Refactor APIClient, Add TrafficImage & Model …
- Loading branch information
Showing
9 changed files
with
129 additions
and
198 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,51 +1,70 @@ | ||
import requests | ||
|
||
|
||
class APIClient: | ||
def __init__(self, url="https://api.data.gov.sg/v1/transport/traffic-images"): | ||
self.url = url | ||
self.timestamp = None | ||
self.api_status = "Unverified" | ||
self.camera_id_array = [] | ||
|
||
# Get API response | ||
response = requests.get(self.url) | ||
response_json = response.json() | ||
self.metadata = response_json | ||
|
||
# Get and set API status | ||
self.api_status = self.metadata["api_info"]["status"] | ||
|
||
# Get and set timestamp | ||
self.timestamp = self.metadata["items"][0]["timestamp"] | ||
|
||
print(f"The API status is: {self.api_status}") | ||
print(f"The API was called at: {self.timestamp}") | ||
|
||
for item in self.metadata["items"]: | ||
for camera in item["cameras"]: | ||
self.camera_id_array.append(camera["camera_id"]) | ||
|
||
def extract_image(self, camera_id): | ||
# Loop through the items and cameras to find the correct camera_id | ||
for item in self.metadata["items"]: | ||
for camera in item["cameras"]: | ||
if camera["camera_id"] == str(camera_id): | ||
return camera[ | ||
"image" | ||
] # Return the image URL if the camera ID matches | ||
# If camera ID is not found | ||
raise RuntimeError(f"Camera ID {camera_id} not found.") | ||
|
||
def extract_latlon(self, camera_id): | ||
for item in self.metadata["items"]: | ||
for camera in item["cameras"]: | ||
if camera["camera_id"] == str(camera_id): | ||
longitude = camera["location"]["longitude"] | ||
latitude = camera["location"]["latitude"] | ||
return ( | ||
longitude, | ||
latitude, | ||
) # Return both longitude and latitude as a tuple | ||
# If camera ID is not found | ||
raise RuntimeError(f"Camera ID {camera_id} not found.") | ||
# | ||
# Flowmotion | ||
# Pipeline | ||
# Traffic Images API Client | ||
# | ||
|
||
import asyncio | ||
|
||
import httpx | ||
|
||
from model import Camera, Location | ||
|
||
|
||
class TrafficImageAPI: | ||
"""Data.gov.sg Traffic Images API Client.""" | ||
|
||
API_URL = "https://api.data.gov.sg/v1/transport/traffic-images" | ||
|
||
def __init__(self): | ||
self._sync = httpx.Client() | ||
self._async = httpx.AsyncClient() | ||
|
||
def get_cameras(self) -> list[Camera]: | ||
"""Get Traffic Camera metadata from traffic images API endpoint. | ||
Returns: | ||
Parsed traffic camera metadata. | ||
""" | ||
# fetch traffic-images api endpoint | ||
response = self._sync.get(TrafficImageAPI.API_URL) | ||
response.raise_for_status() | ||
meta = response.json() | ||
return parse_cameras(meta) | ||
|
||
# parse traffic camera metadata | ||
|
||
def get_images(self, cameras: list[Camera]) -> list[bytes]: | ||
"""Get Traffic Camera images from given Traffic Cameras. | ||
Args: | ||
cameras: | ||
List of traffic cameras to retrieve traffic images from. | ||
Returns: | ||
List of JPEG image bytes camera captured by each given Camera. | ||
""" | ||
|
||
async def fetch(): | ||
responses = [self._async.get(camera.image_url) for camera in cameras] | ||
images = [(await r).aread() for r in responses] | ||
return await asyncio.gather(*images) | ||
|
||
return asyncio.run(fetch()) | ||
|
||
|
||
def parse_cameras(meta: dict) -> list[Camera]: | ||
meta = meta["items"][0] | ||
retrieved_on = meta["timestamp"] | ||
return [ | ||
Camera( | ||
id=c["camera_id"], | ||
retrieved_on=retrieved_on, | ||
captured_on=c["timestamp"], | ||
image_url=c["image"], | ||
location=Location( | ||
longitude=c["location"]["longitude"], | ||
latitude=c["location"]["latitude"], | ||
), | ||
) | ||
for c in meta["cameras"] | ||
] |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,48 @@ | ||
# | ||
# Flowmotion | ||
# Pipeline | ||
# ML Model | ||
# Models | ||
# | ||
|
||
from TrafficImage import TrafficImage | ||
import json | ||
from datetime import datetime | ||
|
||
from pydantic import BaseModel | ||
|
||
class Model: | ||
def __init__(self): | ||
"""Initialise ML model for traffic congestion inference""" | ||
# TODO: initialise model | ||
pass | ||
|
||
def predict(self, images: list[TrafficImage]): | ||
"""Predict the traffic congestion rating (0.0-1.0) in the given images. | ||
class Location(BaseModel): | ||
"""Geolocation consisting of longitude and latitude.""" | ||
|
||
Set the predicted congestion rating for each TrafficImage using TrafficImage.set_processed() | ||
""" | ||
# TODO: make prediction and TrafficImage.set_processed() | ||
pass | ||
longitude: float | ||
latitude: float | ||
|
||
|
||
class Rating(BaseModel): | ||
"""Traffic Congestion rating performed by a model""" | ||
|
||
rated_on: datetime | ||
model_id: str | ||
value: float | ||
|
||
|
||
class Camera(BaseModel): | ||
"""Traffic Camera capturing traffic images.""" | ||
|
||
id: str | ||
image_url: str | ||
captured_on: datetime | ||
retrieved_on: datetime | ||
location: Location | ||
|
||
|
||
class Congestion(BaseModel): | ||
"""Traffic Congestion data.""" | ||
|
||
camera: Camera | ||
rating: Rating | ||
updated_on: datetime | ||
|
||
|
||
def to_json_dict(model: BaseModel): | ||
"""Convert given pydantic model into the its JSON dict representation""" | ||
return json.loads(model.model_dump_json()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,8 @@ | ||
from api import APIClient | ||
from TrafficImage import TrafficImage | ||
# | ||
# Flowmotion | ||
# Pipeline | ||
# | ||
|
||
if __name__ == "__main__": | ||
apiclient = APIClient("https://api.data.gov.sg/v1/transport/traffic-images") | ||
active_cameraIDs = apiclient.camera_id_array | ||
current_traffic_camera_objects = [] # array of TrafficImage objects | ||
|
||
# populating array of TrafficImage objects | ||
for camera_id in active_cameraIDs: | ||
image_url = apiclient.extract_image(camera_id) | ||
longitude, latitude = apiclient.extract_latlon(camera_id) | ||
traffic_camera_obj = TrafficImage( | ||
camera_id=camera_id, image=image_url, longitude=longitude, latitude=latitude | ||
) | ||
current_traffic_camera_objects.append(traffic_camera_obj) | ||
|
||
# MLmodel(current_traffic_camera_objects) | ||
# TODO: pipeline code starts here | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters