Skip to content

Commit

Permalink
feat: add pearson_vue module and rti backend (#152)
Browse files Browse the repository at this point in the history
* feat: add pearson_vue module and rti backend

* test: add pipeline_index unit test
  • Loading branch information
andrey-canon authored May 17, 2024
1 parent 9f32722 commit 4dc2fe1
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 0 deletions.
Empty file.
49 changes: 49 additions & 0 deletions eox_nelp/pearson_vue/rti_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
This module provides the RealTimeImport class, which is responsible for orchestrating the RTI pipeline
and executing various processes related to rti.
Classes:
RealTimeImport: Class for managing RTI operations and executing the pipeline.
"""


class RealTimeImport:
"""
Class for managing RTI (Real Time Import) operations and executing the pipeline.
Attributes:
backend_data (dict): A dictionary containing backend-specific data.
Methods:
run_pipeline(): Executes the RTI pipeline by iterating through the pipeline functions.
get_pipeline(): Returns the RTI pipeline, which is a list of functions to be executed.
"""

def __init__(self, **kwargs):
"""
Initializes the RealTimeImport instance with the provided keyword arguments.
Args:
**kwargs: Additional keyword arguments to configure the RealTimeImport instance.
"""
self.backend_data = kwargs.copy()

def run_pipeline(self):
"""
Executes the RTI pipeline by iterating through the pipeline functions.
"""
pipeline = self.get_pipeline()
pipeline_index = self.backend_data.get("pipeline_index", 0)

for idx, func in enumerate(pipeline[pipeline_index:]):
self.backend_data["pipeline_index"] = pipeline_index + idx
result = func(**self.backend_data) or {}
self.backend_data.update(result)

def get_pipeline(self):
"""
Returns the RTI pipeline, which is a list of functions to be executed.
"""
return [
# Add RTI pipeline functions here
]
30 changes: 30 additions & 0 deletions eox_nelp/pearson_vue/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
This module contains functions and classes for making asynchronous calls to Pearson VUE's RTI services.
Functions:
real_time_import_task(data: dict) -> None: Performs an asynchronous call to the RTI service.
"""

from celery import shared_task

from eox_nelp.pearson_vue.rti_backend import RealTimeImport


@shared_task(bind=True)
def real_time_import_task(self, pipeline_index=0, **kwargs):
"""
Performs an asynchronous call to Pearson VUE's RTI (Real Time Import) service.
This task initiates the real-time import process using the provided pipeline index and optional keyword arguments.
Args:
self: The Celery task instance.
pipeline_index (int): The index of the pipeline to be executed (default is 0).
**kwargs: Additional keyword arguments to configure the RTI service.
"""
rti = RealTimeImport(pipeline_index=pipeline_index, **kwargs.copy())

try:
rti.run_pipeline()
except Exception as exc: # pylint: disable=broad-exception-caught
self.retry(exc=exc, kwargs=rti.backend_data)
Empty file.
111 changes: 111 additions & 0 deletions eox_nelp/pearson_vue/tests/test_rti_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
This module contains unit tests for the RealTimeImport class and its methods in rti_backend.py.
"""
import unittest
from unittest.mock import MagicMock, call

from eox_nelp.pearson_vue.rti_backend import RealTimeImport


class TestRealTimeImport(unittest.TestCase):
"""
Unit tests for the RealTimeImport class.
"""

def setUp(self):
"""
Set up the test environment.
"""
self.backend_data = {"pipeline_index": 0}
self.rti = RealTimeImport(**self.backend_data)

def test_init(self):
"""
Test the initialization of the RealTimeImport class.
Expected behavior:
- Instance backend_data is the same as the initialization data.
"""
self.assertDictEqual(self.rti.backend_data, self.backend_data)

def test_run_pipeline(self):
"""
Test the execution of the RTI pipeline.
Expected behavior:
- Pipeline method 1 is called with the original data.
- Pipeline method 2 is called with updated data.
- backend_data attribute is the expected value.
"""
# Mock pipeline functions
func1 = MagicMock(return_value={"updated_key": "value1"})
func2 = MagicMock(return_value={"additional_key": "value2"})
self.rti.get_pipeline = MagicMock(return_value=[func1, func2])

self.rti.run_pipeline()

func1.assert_called_once_with(**self.backend_data)
func2.assert_called_once_with(**{"updated_key": "value1", "pipeline_index": 1})
self.assertDictEqual(
self.rti.backend_data,
{
"pipeline_index": len(self.rti.get_pipeline()) - 1, # includes total of pipeline methods
**func1(), # Include data from func1
**func2(), # Include data from func2
},
)

def test_pipeline_index(self):
"""
Test that the pipeline start from the pipeline_index position.
Expected behavior:
- Pipeline method 1 is called once.
- Pipeline method 2 is called once.
- Pipeline method 3 is called twice
- backend_data attribute is the expected value.
"""
# Mock pipeline functions
func1 = MagicMock(return_value={"updated_key": "value1"})
func2 = MagicMock(return_value={"additional_key": "value2"})
func3 = MagicMock()
func3_output = {"last_value": "value3"}
func3.side_effect = [Exception("Test exception"), func3_output]
rti = RealTimeImport(pipeline_index=0)
rti.get_pipeline = MagicMock(return_value=[func1, func2, func3])

with self.assertRaises(Exception):
# Running first time until the func3 raises an exception
rti.run_pipeline()

# This execution only runs the third method
rti.run_pipeline()

func1.assert_called_once_with(**{"pipeline_index": 0})
func2.assert_called_once_with(**{"updated_key": "value1", "pipeline_index": 1})
func3.assert_has_calls([
call(**{"updated_key": "value1", "additional_key": "value2", "pipeline_index": 2}),
call(**{"updated_key": "value1", "additional_key": "value2", "pipeline_index": 2}),
])
self.assertDictEqual(
rti.backend_data,
{
"pipeline_index": len(rti.get_pipeline()) - 1, # includes total of pipeline methods
**func1(), # Include data from func1
**func2(), # Include data from func2
**func3_output, # Include data from func3
},
)

def test_get_pipeline(self):
"""
Test the retrieval of the RTI pipeline.
Expected behavior:
- Method return a list instance
- All the pipeline items are callable.
"""
pipeline = self.rti.get_pipeline()

self.assertIsInstance(pipeline, list)
self.assertTrue(all(callable(func) for func in pipeline))
71 changes: 71 additions & 0 deletions eox_nelp/pearson_vue/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
This module contains unit tests for the tasks.py module and its functions.
"""
import unittest
from unittest.mock import MagicMock, call

from eox_nelp.pearson_vue.tasks import real_time_import_task # Importa tu función aquí


class TestRealTimeImportTask(unittest.TestCase):
"""
Unit tests for the real_time_import_task function.
"""

def setUp(self):
"""
Set up the test environment.
"""
self.mock_rti = MagicMock()
self.mock_rti.run_pipeline.side_effect = lambda: None
self.mock_rti_instance = MagicMock(return_value=self.mock_rti)

def test_real_time_import_task_success(self):
"""
Test real_time_import_task with successful execution of RTI pipeline.
Expected behavior:
- Task result is None.
- Instance is initialized just once.
- Method run_pipeline is called just once.
"""
input_data = {
"user_id": 5,
"course_id": "course-v1:FutureX+guide+2023"
}

with unittest.mock.patch("eox_nelp.pearson_vue.tasks.RealTimeImport", self.mock_rti_instance):
self.assertIsNone(real_time_import_task.apply(kwargs=input_data).get())

self.mock_rti_instance.assert_called_once_with(pipeline_index=0, **input_data)
self.mock_rti.run_pipeline.assert_called_once()

def test_real_time_import_task_retry(self):
"""
Test real_time_import_task with retry due to exception during RTI pipeline execution.
Expected behavior:
- Instance is initialized twice with variable data.
- Method run_pipeline is called twice
"""
input_data = {
"user_id": 5,
"course_id": "course-v1:FutureX+guide+2023"
}
second_attempt_data = {
"user_id": 5,
"course_id": "course-v1:FutureX+guide+2023",
"email": "[email protected]",
}
mock_exception = Exception("Test exception")
self.mock_rti.run_pipeline.side_effect = [mock_exception, None]
self.mock_rti.backend_data = second_attempt_data

with unittest.mock.patch("eox_nelp.pearson_vue.tasks.RealTimeImport", self.mock_rti_instance):
real_time_import_task.apply(kwargs=input_data)

self.mock_rti_instance.assert_has_calls([
call(pipeline_index=0, **input_data),
call(pipeline_index=0, **second_attempt_data),
])
self.assertEqual(self.mock_rti.run_pipeline.call_count, 2)

0 comments on commit 4dc2fe1

Please sign in to comment.