-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
239 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
# Backend/proctor_core.py | ||
|
||
import cv2 | ||
import mediapipe as mp | ||
import numpy as np | ||
from typing import Dict, List, Tuple | ||
import logging | ||
import os | ||
|
||
# Local imports | ||
from .detection import run_detection | ||
from .head_pose import pose | ||
from .object_detection import detect_objects | ||
from .audio import process_audio | ||
from .screen_recorder import capture_screen | ||
|
||
class ProctorCore: | ||
def __init__(self): | ||
self.mp_face_detection = mp.solutions.face_detection | ||
self.mp_pose = mp.solutions.pose | ||
self.face_detection = self.mp_face_detection.FaceDetection(min_detection_confidence=0.7) | ||
self.pose_detection = self.mp_pose.Pose(min_detection_confidence=0.7) | ||
self.logger = self._setup_logger() | ||
|
||
def _setup_logger(self) -> logging.Logger: | ||
"""Configure logging for the proctoring system""" | ||
logger = logging.getLogger('ProctorCore') | ||
logger.setLevel(logging.INFO) | ||
handler = logging.StreamHandler() | ||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | ||
handler.setFormatter(formatter) | ||
logger.addHandler(handler) | ||
return logger | ||
|
||
def start_monitoring(self): | ||
"""Initialize and start all monitoring components""" | ||
try: | ||
# Start detection systems | ||
detection_result = run_detection() | ||
pose_result = pose() | ||
screen_capture = capture_screen() | ||
|
||
# Process and combine results | ||
combined_results = self._process_results( | ||
detection_result, | ||
pose_result, | ||
screen_capture | ||
) | ||
|
||
return combined_results | ||
|
||
except Exception as e: | ||
self.logger.error(f"Error in monitoring: {str(e)}") | ||
raise | ||
|
||
def _process_results(self, detection_data, pose_data, screen_data) -> Dict: | ||
"""Process and combine results from different detection systems""" | ||
results = { | ||
'timestamp': np.datetime64('now'), | ||
'detection': detection_data, | ||
'pose': pose_data, | ||
'screen': screen_data, | ||
'suspicious_level': 0.0 | ||
} | ||
|
||
# Calculate suspicious level based on combined factors | ||
suspicious_factors = [ | ||
detection_data.get('suspicious_score', 0), | ||
pose_data.get('deviation_score', 0), | ||
screen_data.get('activity_score', 0) | ||
] | ||
|
||
results['suspicious_level'] = np.mean([x for x in suspicious_factors if x is not None]) | ||
|
||
return results | ||
|
||
def save_results(self, results: Dict, output_path: str = None): | ||
"""Save monitoring results to specified location""" | ||
if output_path is None: | ||
output_path = os.path.join( | ||
os.path.dirname(__file__), | ||
'Dataset', | ||
f'proctor_results_{np.datetime64("now")}.json' | ||
) | ||
|
||
try: | ||
import json | ||
with open(output_path, 'w') as f: | ||
json.dump(results, f, indent=4, default=str) | ||
self.logger.info(f"Results saved to {output_path}") | ||
except Exception as e: | ||
self.logger.error(f"Error saving results: {str(e)}") | ||
|
||
def analyze_behavior(self, results: Dict) -> Dict: | ||
"""Analyze monitored behavior and generate insights""" | ||
analysis = { | ||
'timestamp': np.datetime64('now'), | ||
'overall_score': results.get('suspicious_level', 0), | ||
'warnings': [], | ||
'recommendations': [] | ||
} | ||
|
||
# Generate warnings based on thresholds | ||
if results.get('pose', {}).get('deviation_score', 0) > 0.7: | ||
analysis['warnings'].append('Significant head movement detected') | ||
|
||
if results.get('detection', {}).get('suspicious_score', 0) > 0.7: | ||
analysis['warnings'].append('Suspicious objects detected') | ||
|
||
if results.get('screen', {}).get('activity_score', 0) > 0.7: | ||
analysis['warnings'].append('Unusual screen activity detected') | ||
|
||
return analysis | ||
|
||
def cleanup(self): | ||
"""Cleanup resources and close connections""" | ||
try: | ||
cv2.destroyAllWindows() | ||
self.logger.info("Cleanup completed successfully") | ||
except Exception as e: | ||
self.logger.error(f"Error during cleanup: {str(e)}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,110 @@ | ||
import head_pose | ||
import detection | ||
import threading as th | ||
|
||
def run_threads(): | ||
try: | ||
# Create threads for each target function | ||
head_pose_thread = th.Thread(target=head_pose.pose) | ||
# audio_thread = th.Thread(target=audio.sound) # Uncomment if audio module is needed | ||
detection_thread = th.Thread(target=detection.run_detection) | ||
# Backend/run.py | ||
|
||
# Start the threads | ||
head_pose_thread.start() | ||
# audio_thread.start() # Uncomment to start audio thread | ||
detection_thread.start() | ||
import threading as th | ||
import logging | ||
import os | ||
from typing import Dict, List | ||
import queue | ||
from .proctor_core import ProctorCore | ||
|
||
# Wait for the threads to complete | ||
head_pose_thread.join() | ||
# audio_thread.join() # Uncomment to wait for audio thread | ||
detection_thread.join() | ||
class ProctorManager: | ||
def __init__(self): | ||
self.result_queue = queue.Queue() | ||
self.proctor = ProctorCore() | ||
self.is_running = False | ||
self.threads: List[th.Thread] = [] | ||
self.logger = self._setup_logger() | ||
|
||
def _setup_logger(self): | ||
logger = logging.getLogger('ProctorManager') | ||
logger.setLevel(logging.INFO) | ||
handler = logging.StreamHandler() | ||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | ||
handler.setFormatter(formatter) | ||
logger.addHandler(handler) | ||
return logger | ||
|
||
def _monitoring_worker(self): | ||
"""Worker function for continuous monitoring""" | ||
while self.is_running: | ||
try: | ||
results = self.proctor.start_monitoring() | ||
self.result_queue.put(results) | ||
except Exception as e: | ||
self.logger.error(f"Error in monitoring worker: {str(e)}") | ||
break | ||
|
||
def _analysis_worker(self): | ||
"""Worker function for analyzing results""" | ||
while self.is_running: | ||
try: | ||
results = self.result_queue.get(timeout=1) | ||
if results: | ||
analysis = self.proctor.analyze_behavior(results) | ||
self.proctor.save_results(analysis) | ||
except queue.Empty: | ||
continue | ||
except Exception as e: | ||
self.logger.error(f"Error in analysis worker: {str(e)}") | ||
break | ||
|
||
def start(self): | ||
"""Start the proctoring system""" | ||
try: | ||
self.is_running = True | ||
|
||
# Create worker threads | ||
monitoring_thread = th.Thread(target=self._monitoring_worker) | ||
analysis_thread = th.Thread(target=self._analysis_worker) | ||
|
||
# Start threads | ||
self.threads = [monitoring_thread, analysis_thread] | ||
for thread in self.threads: | ||
thread.start() | ||
|
||
self.logger.info("Proctoring system started successfully") | ||
|
||
except Exception as e: | ||
self.logger.error(f"Error starting proctoring system: {str(e)}") | ||
self.stop() | ||
|
||
def stop(self): | ||
"""Stop the proctoring system""" | ||
self.is_running = False | ||
|
||
# Wait for threads to complete | ||
for thread in self.threads: | ||
thread.join() | ||
|
||
# Cleanup resources | ||
self.proctor.cleanup() | ||
self.logger.info("Proctoring system stopped") | ||
|
||
def main(): | ||
# Setup logging | ||
logging.basicConfig( | ||
level=logging.INFO, | ||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | ||
) | ||
logger = logging.getLogger(__name__) | ||
|
||
try: | ||
# Initialize and start the proctoring system | ||
manager = ProctorManager() | ||
manager.start() | ||
|
||
# Keep running until interrupted | ||
while True: | ||
pass | ||
|
||
except KeyboardInterrupt: | ||
logger.info("Received shutdown signal") | ||
except Exception as e: | ||
print(f"An error occurred: {e}") | ||
logger.error(f"Unexpected error: {str(e)}") | ||
finally: | ||
print("All threads have been joined.") | ||
if 'manager' in locals(): | ||
manager.stop() | ||
logger.info("Application shutdown complete") | ||
|
||
if __name__ == "__main__": | ||
run_threads() | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters