Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposed changes to api.py and updated congestion, rating and traffic image classes. #8

Closed
wants to merge 11 commits into from
16 changes: 16 additions & 0 deletions pipeline/TrafficImage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import APIClient

class TrafficImage:
def __init__(self, image=None,processed=False, congestion_rating=None, camera_id="Not Set!", longitude="Longitude not set", latitude="Latitude not set"):
self.image = image #numpy array
self.processed = processed
self.conestion_rating = congestion_rating
self.camera_id = camera_id
self.longitude = longitude
self.latitude = latitude

def processed(self, congestion_rating):
self.processed = True
self.conestion_rating = congestion_rating


85 changes: 27 additions & 58 deletions pipeline/api.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,39 @@
#
# Flowmotion
# Pipeline
# Traffic Images API Client
#

import asyncio
import requests

import httpx
API_URL = "https://api.data.gov.sg/v1/transport/traffic-images"

from model import Camera, Location
class APIClient():
def __init__(self, url="No API URL"):
self.url = url
self.timestamp = None
self.api_status = "Unverified"
self.metadata = None

# Get API response
response = requests.get(self.url)
response_json = response.json()
self.metadata = response_json

class TrafficImageAPI:
"""Data.gov.sg Traffic Images API Client."""
# Get and set API status
self.api_status = self.metadata["api_info"]["status"]

API_URL = "https://api.data.gov.sg/v1/transport/traffic-images"
# Get and set timestamp
self.timestamp = self.metadata["items"][0]["timestamp"]

def __init__(self):
self._sync = httpx.Client()
self._async = httpx.AsyncClient()
print(f"The API status is: {self.api_status}")
print(f"The API was called at: {self.timestamp}")

def get_cameras(self) -> list[Camera]:
"""Get Traffic Camera metadata from traffic images API endpoint.
def extract_image(self, camera_id):
# Loop through the items and cameras to find the correct camera_id
for item in self.metadata["items"]:
for camera in item["cameras"]:
if camera["camera_id"] == str(camera_id):
return camera["image"] # Return the image URL if the camera ID matches
# If camera ID is not found
return f"Camera ID {camera_id} not found."

Returns:
Parsed traffic camera metadata.
"""
# fetch traffic-images api endpoint
response = self._sync.get(TrafficImageAPI.API_URL)
response.raise_for_status()
meta = response.json()
return parse_cameras(meta)

# parse traffic camera metadata


def get_images(self, cameras: list[Camera]) -> list[bytes]:
"""Get Traffic Camera images from given Traffic Cameras.

Args:
cameras:
List of traffic cameras to retrieve traffic images from.
Returns:
List of JPEG image bytes camera captured by each given Camera.
"""

async def fetch():
responses = [self._async.get(camera.image_url) for camera in cameras]
images = [(await r).aread() for r in responses]
return await asyncio.gather(*images)

return asyncio.run(fetch())


def parse_cameras(meta: dict) -> list[Camera]:
meta = meta["items"][0]
retrieved_on = meta["timestamp"]
return [
Camera(
camera_id=c["camera_id"],
retrieved_on=retrieved_on,
captured_on=c["timestamp"],
image_url=c["image"],
location=Location(
longitude=c["location"]["longitude"],
latitude=c["location"]["latitude"],
),
)
for c in meta["cameras"]
]
8 changes: 6 additions & 2 deletions pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,9 @@
#

if __name__ == "__main__":
# TODO: pipeline code starts here
pass
# call api and populate TrafficImage objects.
# create an array of TrafficImage objects
# Pass arrray through the ML model
#-> ensure model calls .processed()


Loading