Skip to content

Commit

Permalink
main files
Browse files Browse the repository at this point in the history
  • Loading branch information
nadje committed Jul 18, 2024
1 parent 82b08e8 commit 9304aac
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 14 deletions.
4 changes: 2 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Welcome to |project| documentation!
Welcome to "Automate custom events" documentation!
===================================

.. toctree::
Expand All @@ -7,7 +7,7 @@ Welcome to |project| documentation!
history


.. automodule:: skeleton
.. automodule:: automate_custom_events
:members:
:undoc-members:
:show-inheritance:
Expand Down
24 changes: 19 additions & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[metadata]
name = pupil_labs_project_name
description = Project description
name = pupil_labs.automate_custom_events
description = Automate custom events
long_description = file: README.rst
long_description_content_type = text/x-rst
url = https://github.com/pupil-labs/python-module-skeleton
url = https://github.com/pupil-labs/automate-custom-events
author = Pupil Labs GmbH
author_email = [email protected]
license = MIT
Expand All @@ -18,12 +18,26 @@ classifiers =
[options]
packages = find_namespace:
install_requires =
importlib-metadata;python_version<"3.8"
python_requires = >=3.7
numpy
matplotlib
pandas
scipy
pupil-labs-dynamic-rim
pupil-labs-automate_custom_events
rich
openai
av
requests
opencv-python
python_requires = >=3.9
include_package_data = true
package_dir =
=src

[options.entry_points]
console_scripts =
pl-automate-custom-events = pupil_labs.automate_custom_events.__main__:run_main

[options.packages.find]
where = src
exclude =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""Top-level entry-point for the <project_name> package"""
"""Top-level entry-point for the automate_custom_events package"""

try:
from importlib.metadata import PackageNotFoundError, version
except ImportError:
import sys

if sys.version_info < (3, 8):
from importlib_metadata import PackageNotFoundError, version
else:
from importlib.metadata import PackageNotFoundError, version

try:
__version__ = version("pupil_labs.project_name")
__version__ = version("pupil_labs_automate_custom_events")
except PackageNotFoundError:
# package is not installed
pass

__all__ = ["__version__"]
__all__ = ["__version__"]
247 changes: 247 additions & 0 deletions src/pupil_labs/automate_custom_events/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import base64
import requests
import av
import base64
import io
import logging
import cv2
from openai import OpenAI
import numpy as np
from rich.progress import Progress
import pandas as pd
import re
from pathlib import Path
import os
import json
import glob
from pupil_labs.automate_custom_events.cloud_interaction import download_recording, send_event_to_cloud

def isMonotonicInc(arr):
return np.all(np.diff(arr) >= 0)

class ActivityRecognition:
def __init__(self):
self.base64Frames = []
self.client = OpenAI(api_key=OPENAI_API_KEY)
self.object = "book"
self.base_prompt = """Act as an experienced video-annotator. You are assigned a task to identify frames with pre-defined descriptions and tag them by adding labels.
Find which frame number in the length of the entire base64Frames this frame corresponds to. Ignore grey frames. Return the number of frame where an activity is found and print it.
The response format should be: 'Frame X: Description, Activity, Timestamp'.
The 'Frame' is followed by the frame number
The 'Description' can be a sentence of up to 10 words.
The 'Activity' has to be a summary of the description in 2 words separated by underscores.
The 'Timestamp' is the video_df['timestamp [s]'] that corresponds to this frame number. Find the frames where one of the following activities happen,
only the first occurrence of this activity should be returned.
You will be penalized if you return more frames of each activity:"""


self.picking_up = f"""Activity 1: Identify the frame where "Someone picks up a {self.object} that is placed on a desk".
This frame should depict the exact moment the person makes contact with the {self.object} and begins to pick it up.
Code this frame as 'picking_up'.
"""
self.opening = """
Activity 2: Identify the frame where "Someone opens a book for the first time".
This frame should depict the exact moment the book starts to open.
Code this frame as 'opens_book'.
"""
self.closing = """
Activity 3: Identify the frame where "Someone closes an open book they hold in their hands".
This frame should depict the exact moment the person begins to close the book.
Code this frame as 'closes_book'.
"""
self.putting_down = f"""
Activity 4: Identify the frame where "Someone puts down the {self.object} on the desk".
This frame should depict the exact moment the {self.object} makes contact with the desk surface.
Code this frame as 'putting_down'.
"""
self.format = 'The format should be always the same except for the variables that change (<number>, <description>, <activity_variable>, <timestamp_float_number>). Do not add any extra dots, letters, or characters. Here is the format: "Frame <frame_number>: Description - <description in a few words>, Activity - <activity_variable>, Timestamp - <timestamp_float_number>". Consider this example: "Frame 120: Description - The user picks up the book, Activity - picks_up_book, Timestamp - 4.798511111"'

def read_video_ts(self, video_path, audio=False, auto_thread_type=True):
"""
A function to read a video, extract frames, and store them as base64 encoded strings.
:param video_path: the path to the video
"""
# Read the video
with av.open(video_path) as video_container, Progress() as progress:
if audio:
stream = video_container.streams.audio[0]
else:
stream = video_container.streams.video[0]
if auto_thread_type:
stream.thread_type = "AUTO"
fps = stream.average_rate # alt base_rate or guessed_rate
nframes = stream.frames
logging.info("Extracting pts...")
pts, dts, ts = (list() for i in range(3))
decode_task = progress.add_task("👓 Decoding...", total=nframes)
for packet in video_container.demux(stream):
for frame in packet.decode():
if frame is not None and frame.pts is not None:
pts.append(frame.pts)
dts.append(frame.dts) if frame.dts is not None else logging.info(
f"Decoding timestamp is missing at frame {len(pts)}"
)
ts.append(
(
frame.pts * frame.time_base
- stream.start_time * frame.time_base
)
* 1e9
)

# Convert the frame to an image and encode it in base64
img = frame.to_ndarray(format='bgr24')
_, buffer = cv2.imencode(".jpg", img)
self.base64Frames.append(base64.b64encode(buffer).decode("utf-8"))

progress.advance(decode_task)
progress.refresh()
pts, dts, ts = (
np.array(pts, dtype=np.uint64),
np.array(dts, dtype=np.uint64),
np.array(ts, dtype=np.uint64),
)
if not isMonotonicInc(pts):
logging.warning("Pts are not monotonic increasing!.")
if np.array_equal(pts, dts):
logging.info("Pts and dts are equal, using pts")

idc = pts.argsort()
pts = pts[idc]
ts = ts[idc]

if nframes != len(pts):
nframes = len(pts)
else:
logging.info(f"Video has {nframes} frames")

timestamps_s = ts / 1e9
self.video_df = pd.DataFrame(
{
"frames": np.arange(nframes),
"pts": [int(pt) for pt in pts],
"timestamp [ns]": ts,
"timestamp [s]": timestamps_s
}
)
return self.video_df #, fps, nframes, pts, ts

def query_frame(self, index):

base64_frames_content = [{"image": self.base64Frames[index], "resize": 768}]
video_df_content = [self.video_df.iloc[index].to_dict()]

PROMPT_MESSAGES = [
{
"role": "system",
"content": (self.base_prompt + self.format),
},
{
"role": "user",
"content": f"Here are the activities: {self.picking_up} , {self.opening} {self.closing}, {self.putting_down})",
},
{
"role": "user",
"content": f"The frames are extracted from this video and the timestamps and frame numbers are stored in video_df: {json.dumps(video_df_content)}",
},
{
"role": "user",
"content": base64_frames_content},
]

params = {
"model": "gpt-4o",
"messages": PROMPT_MESSAGES,
"max_tokens": 2000,
}

result = self.client.chat.completions.create(**params)
response = result.choices[0].message.content
print("Response from OpenAI API:", response)

pattern = r'Frame\s(\d+):\sDescription\s-\s.*?,\sActivity\s-\s([^,]+),\sTimestamp\s-\s([\d.]+)(?=\s|$)'
match = re.search(pattern, response)

# if match:
if match:
frame_number = int(match.group(1))
activity = match.group(2)
timestamp = float(match.group(3))
return {
"frame_id": frame_number,
"timestamp [s]": timestamp,
"activity": activity,
}
else:
print("No match found in the response")


def binary_search(self, start, end, identified_activities):
if start >= end:
return []

mid = (start + end) // 2
print(f"Binary search range: {start}-{end}, mid: {mid}")

mid_frame_result = self.query_frame(mid)
results = []

if mid_frame_result:
activity = mid_frame_result["activity"]
if activity not in identified_activities:
identified_activities.add(activity)
results.append(mid_frame_result)
left_results = self.binary_search(start, mid, identified_activities)
results.extend(left_results)
else:
right_results = self.binary_search(mid + 1, end, identified_activities)
results.extend(right_results)
return results

def prompting(self, save_path):
identified_activities = set()
activity_data = self.binary_search(0, len(self.base64Frames),identified_activities)
print("Filtered Activity Data:", activity_data)
output_df = pd.DataFrame(activity_data)
output_df.to_csv(os.path.join(save_path, "output.csv"), index=False)
return output_df

if __name__ == "__main__":
worksp_id = "<your_workspace_id>"
rec_id = "<your_recording_id>"
cloud_api_key = "<your_cloud_API_token>"
DOWNLOAD_PATH = "<your_path>/recs.zip"
OPENAI_API_KEY = '<your_openAI_key>'

download_path = os.path.normpath(DOWNLOAD_PATH)
download_path = Path(download_path)

# Download recording
download_recording(rec_id, worksp_id, download_path, cloud_api_key)
recpath = Path(download_path / rec_id)
files = glob.glob(str(Path(recpath, "*.mp4")))
if len(files) != 1:
error = "There should be only one video in the raw folder!"
raise Exception(error)
video_path = files[0]

# Process video
activity_rec_module = ActivityRecognition()
video_df = activity_rec_module.read_video_ts(video_path)
output_df = pd.DataFrame(video_df)
output_df.to_csv(os.path.join(recpath, 'video_df.csv'), index=False)
print(len(activity_rec_module.base64Frames), "frames read.")
print(len(video_df['timestamp [s]']))

output = activity_rec_module.prompting(recpath)
if output is not None:
print(output)
else:
print("No valid data returned from the API.")

for index, row in output.iterrows():
print(f"Relative timestamp: {row['timestamp [s]']}")
print(row['activity'])
send_event_to_cloud(worksp_id, rec_id, row['activity'], row['timestamp [s]'], cloud_api_key)


48 changes: 48 additions & 0 deletions src/pupil_labs/automate_custom_events/cloud_interaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import requests
import json
import logging
import glob
import shutil
from pathlib import Path
import os

API_URL = "https://api.cloud.pupil-labs.com/v2"

def download_url(path: str, save_path: str, API_KEY, chunk_size=128) -> None:
url = f"{API_URL}/{path}"
r = requests.get(url, stream=True, headers={"api-key": API_KEY})
r.raise_for_status() # Ensure we raise an exception for bad status codes
save_path = Path(save_path)
with open(save_path, 'wb') as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)

return r.status_code

def download_recording(recording_id: str, workspace_id: str, download_path: str, API_KEY) -> None:
download_path = Path(download_path) # Ensure download_path is a Path object
download_path.mkdir(parents=True, exist_ok=True) # Create directory if it doesn't exist

save_path = download_path / f"{recording_id}.zip"
status = download_url(f"workspaces/{workspace_id}/recordings:raw-data-export?ids={recording_id}", save_path, API_KEY, chunk_size=128)
shutil.unpack_archive(save_path, download_path / recording_id)
os.remove(save_path)
for file_source in glob.glob(str(download_path / f"{recording_id}/*/*")):
file_source = Path(file_source)
file_destination = file_source.parents[1] / file_source.name
shutil.move(file_source, file_destination)
return status

def send_event_to_cloud(workspace_id, recording_id, keyword, timestamp_sec, API_KEY):
url = f"{API_URL}/workspaces/{workspace_id}/recordings/{recording_id}/events"
headers = {
"accept": "application/json",
"Content-Type": "application/json",
"api-key": API_KEY,
}
data = {"name": keyword, "offset_s": timestamp_sec}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
logging.info(f"Event sent successfully: {data}")
else:
logging.error(f"Failed to send event: {response.status_code}, {response.text}")
2 changes: 1 addition & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import pupil_labs.project_name as this_project
import pupil_labs.automate_custom_events as this_project


def test_package_metadata() -> None:
Expand Down

0 comments on commit 9304aac

Please sign in to comment.