diff --git a/${output_directory}/xchembku_dataface.sqlite b/${output_directory}/xchembku_dataface.sqlite deleted file mode 100644 index a5b0cb6..0000000 Binary files a/${output_directory}/xchembku_dataface.sqlite and /dev/null differ diff --git a/src/xchembku_api/datafaces/context.py b/src/xchembku_api/datafaces/context.py index 0a52b49..1367c62 100644 --- a/src/xchembku_api/datafaces/context.py +++ b/src/xchembku_api/datafaces/context.py @@ -35,6 +35,9 @@ async def aenter(self): # Open client session to the service or direct connection. await self.interface.open_client_session() + # For convenience, return the object which is the client interface. + return self.interface + # ---------------------------------------------------------------------------------------- async def aexit(self): """ """ diff --git a/src/xchembku_cli/subcommands/base.py b/src/xchembku_cli/subcommands/base.py index f252d00..37625ed 100644 --- a/src/xchembku_cli/subcommands/base.py +++ b/src/xchembku_cli/subcommands/base.py @@ -49,6 +49,7 @@ def get_multiconf(self, args_dict: dict): "USER": os.environ.get("USER", "USER"), "PATH": os.environ.get("PATH", "PATH"), "PYTHONPATH": os.environ.get("PYTHONPATH", "PYTHONPATH"), + "output_directory": os.environ.get("output_directory", "output_directory"), } if hasattr(self._args, "visit") and self._args.visit != "VISIT": diff --git a/src/xchembku_lib/datafaces/aiohttp.py b/src/xchembku_lib/datafaces/aiohttp.py index 5bcf47b..11c529c 100644 --- a/src/xchembku_lib/datafaces/aiohttp.py +++ b/src/xchembku_lib/datafaces/aiohttp.py @@ -40,7 +40,7 @@ def __init__(self, specification=None): specification["type_specific_tbd"]["aiohttp_specification"], ) - self.__actual_xchembku_dataface = None + self.__actual_dataface = None # ---------------------------------------------------------------------------------------- def callsign(self): @@ -85,7 +85,7 @@ async def activate_coro(self): route_tuples = [] # Build a local xchembku_dataface for our back-end. - self.__actual_xchembku_dataface = Datafaces().build_object( + self.__actual_dataface = Datafaces().build_object( self.specification()["type_specific_tbd"][ "actual_xchembku_dataface_specification" ] @@ -96,7 +96,7 @@ async def activate_coro(self): self.__transaction_lock = asyncio.Lock() # Get the local implementation started. - await self.__actual_xchembku_dataface.start() + await self.__actual_dataface.start() await self.activate_coro_base(route_tuples) @@ -108,7 +108,7 @@ async def direct_shutdown(self): """""" try: # Disconnect our local dataface connection, i.e. the one which holds the database connection. - await self.__actual_xchembku_dataface.disconnect() + await self.__actual_dataface.disconnect() except Exception as exception: raise RuntimeError( @@ -130,7 +130,7 @@ async def __do_actually(self, function, args, kwargs): # logger.info(describe("kwargs", kwargs)) # Get the function which the caller wants executed. - function = getattr(self.__actual_xchembku_dataface, function) + function = getattr(self.__actual_dataface, function) # Caller wants the function wrapped in a transaction? if "as_transaction" in kwargs: @@ -142,16 +142,16 @@ async def __do_actually(self, function, args, kwargs): if as_transaction: # Make sure we have an actual connection. - await self.__actual_xchembku_dataface.establish_database_connection() + await self.__actual_dataface.establish_database_connection() # Lock out all other requests from running their own transaction. async with self.__transaction_lock: try: - await self.__actual_xchembku_dataface.begin() + await self.__actual_dataface.begin() response = await function(*args, **kwargs) - await self.__actual_xchembku_dataface.commit() + await self.__actual_dataface.commit() except Exception: - await self.__actual_xchembku_dataface.rollback() + await self.__actual_dataface.rollback() raise else: response = await function(*args, **kwargs) diff --git a/tests/test_cli.py b/tests/test_cli1.py similarity index 80% rename from tests/test_cli.py rename to tests/test_cli1.py index 27824ef..810be6a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli1.py @@ -12,7 +12,6 @@ from xchembku_api.datafaces.context import Context as ClientContext # Object managing datafaces. -from xchembku_api.datafaces.datafaces import xchembku_datafaces_get_default from xchembku_api.models.crystal_plate_filter_model import CrystalPlateFilterModel from xchembku_api.models.crystal_plate_model import CrystalPlateModel @@ -20,7 +19,7 @@ # ---------------------------------------------------------------------------------------- -class TestCliSqlite: +class TestCli1Sqlite: """ Test that we can start the service (which uses sqlite) via the command line and talk to it. """ @@ -28,7 +27,7 @@ class TestCliSqlite: def test(self, constants, logging_setup, output_directory): configuration_file = "tests/configurations/service_sqlite.yaml" - CliTester(configuration_file).main( + Cli1Tester(configuration_file).main( constants, configuration_file, output_directory, @@ -36,7 +35,7 @@ def test(self, constants, logging_setup, output_directory): # ---------------------------------------------------------------------------------------- -class TestCliMysql: +class TestCli1Mysql: """ Test that we can start the service (which uses mysql) via the command line and talk to it. """ @@ -44,7 +43,7 @@ class TestCliMysql: def test(self, constants, logging_setup, output_directory): configuration_file = "tests/configurations/service_mysql.yaml" - CliTester(configuration_file).main( + Cli1Tester(configuration_file).main( constants, configuration_file, output_directory, @@ -52,7 +51,7 @@ def test(self, constants, logging_setup, output_directory): # ---------------------------------------------------------------------------------------- -class CliTester(Base): +class Cli1Tester(Base): """ Class to test the dataface. """ @@ -64,7 +63,6 @@ def __init__(self, configuration_file): async def _main_coroutine(self, constants, output_directory): """ """ - logger.debug("in CliTester") # Command to run the service. xchembku_server_cli = [ @@ -81,7 +79,7 @@ async def _main_coroutine(self, constants, output_directory): os.environ["output_directory"] = output_directory # Launch the service as a process. - logger.debug(f"launching {' '.join(xchembku_server_cli)}") + logger.debug(f"launching subprocess {' '.join(xchembku_server_cli)}") process = subprocess.Popen( xchembku_server_cli, stdout=subprocess.PIPE, @@ -89,22 +87,20 @@ async def _main_coroutine(self, constants, output_directory): ) try: - # Read the configuration. + # Read the same configuration which the service process reads. multiconf_object = self.get_multiconf() multiconf_dict = await multiconf_object.load() # Get a client context to the server in the process we just started. xchembku_specification = multiconf_dict["xchembku_dataface_specification"] xchembku_client_context = ClientContext(xchembku_specification) - async with xchembku_client_context: - # Client to the dataface. - dataface = xchembku_datafaces_get_default() - - # Wait for process is able to give a health report. + async with xchembku_client_context as xchembku_client: + # Wait until process is able to give a non-exceptional health report. start_time = time.time() max_seconds = 5.0 while True: - health = await dataface.client_report_health() + # Try to check the health. + health = await xchembku_client.client_report_health() # Check if health report contains an exception. exception = health.get("exception") @@ -153,17 +149,17 @@ async def _main_coroutine(self, constants, output_directory): ) ) - await dataface.upsert_crystal_plates(models) + await xchembku_client.upsert_crystal_plates(models) # Check the filtered queries. await self.__check( - dataface, + xchembku_client, CrystalPlateFilterModel(), 3, "all", ) - await dataface.client_shutdown() + await xchembku_client.client_shutdown() finally: try: # Wait for the process to finish and get the output. @@ -177,13 +173,16 @@ async def _main_coroutine(self, constants, output_directory): return_code = process.returncode logger.debug(f"server return_code is {return_code}") - logger.debug( - f"================================== server stderr is:\n{stderr_bytes.decode()}" - ) - logger.debug( - f"================================== server stdout is:\n{stdout_bytes.decode()}" - ) - logger.debug("==================================") + if len(stderr_bytes) > 0: + logger.debug( + f"================================== server stderr is:\n{stderr_bytes.decode()}" + ) + if len(stdout_bytes) > 0: + logger.debug( + f"================================== server stdout is:\n{stdout_bytes.decode()}" + ) + if len(stderr_bytes) > 0 or len(stdout_bytes) > 0: + logger.debug("================================== end of server output") assert return_code == 0 diff --git a/tests/test_cli2.py b/tests/test_cli2.py new file mode 100644 index 0000000..e1f8c34 --- /dev/null +++ b/tests/test_cli2.py @@ -0,0 +1,217 @@ +import asyncio +import logging +import os +import subprocess +import time +from typing import Optional + +from dls_utilpack.describe import describe + +# Base class for the tester. +from tests.base import Base +from xchembku_api.datafaces.context import Context as ClientContext + +# Object managing datafaces. +from xchembku_api.models.crystal_plate_filter_model import CrystalPlateFilterModel +from xchembku_api.models.crystal_plate_model import CrystalPlateModel + +logger = logging.getLogger(__name__) + + +# ---------------------------------------------------------------------------------------- +class TestCli2Sqlite: + """ + Test that we can start the service (which uses sqlite) via the command line and talk to it. + """ + + def test(self, constants, logging_setup, output_directory): + + configuration_file = "tests/configurations/service_sqlite.yaml" + Cli2Tester(configuration_file).main( + constants, + configuration_file, + output_directory, + ) + + +# ---------------------------------------------------------------------------------------- +class TestCli2Mysql: + """ + Test that we can start the service (which uses mysql) via the command line and talk to it. + """ + + def test(self, constants, logging_setup, output_directory): + + configuration_file = "tests/configurations/service_mysql.yaml" + Cli2Tester(configuration_file).main( + constants, + configuration_file, + output_directory, + ) + + +# ---------------------------------------------------------------------------------------- +class Cli2Tester(Base): + """ + Class to test the dataface. + """ + + def __init__(self, configuration_file): + Base.__init__(self) + + self.__configuration_file = configuration_file + + async def _main_coroutine(self, constants, output_directory): + """ """ + + # Command to run the service. + xchembku_server_cli = [ + "python", + "-m", + "xchembku_cli.main", + "service", + "--verbose", + "--configuration", + self.__configuration_file, + ] + + # Let the output_directory symbol be replaced in the multiconf. + os.environ["output_directory"] = output_directory + + # Launch the service as a process. + logger.debug(f"launching subprocess {' '.join(xchembku_server_cli)}") + process = subprocess.Popen( + xchembku_server_cli, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + try: + # Read the same configuration which the service process reads. + multiconf_object = self.get_multiconf() + multiconf_dict = await multiconf_object.load() + + # Get a client context to the server in the process we just started. + xchembku_specification = multiconf_dict["xchembku_dataface_specification"] + xchembku_client1_context = ClientContext(xchembku_specification) + async with xchembku_client1_context as dataface1: + + # Wait until process is able to give a non-exceptional health report. + start_time = time.time() + max_seconds = 5.0 + while True: + health = await dataface1.client_report_health() + + # Check if health report contains an exception. + exception = health.get("exception") + if exception is None: + logger.debug(describe("health", health)) + # Continue with test if no exception. + break + + logger.debug(f"[CONNRETRY] retrying after {exception}") + + if process.poll() is not None: + raise RuntimeError( + "server apprently died without being able to give a health check" + ) + + # Too much time has elapsed? + if time.time() - start_time > max_seconds: + raise RuntimeError( + f"server not answering within {max_seconds} seconds" + ) + + await asyncio.sleep(1.0) + + # Interact with the server now that it's up. + visit = "cm00001-1" + models = [] + models.append( + CrystalPlateModel( + formulatrix__plate__id=10, + barcode="xyz1", + visit=visit, + ) + ) + models.append( + CrystalPlateModel( + formulatrix__plate__id=20, + barcode="xyz2", + visit=visit, + ) + ) + models.append( + CrystalPlateModel( + formulatrix__plate__id=30, + barcode="xyz3", + visit=visit, + ) + ) + + # Insert all three plates at once. + await dataface1.upsert_crystal_plates(models) + + # Start a second concurrent client connection. + xchembku_client2_context = ClientContext(xchembku_specification) + async with xchembku_client2_context as dataface2: + + # Check the filtered queries from the second connection. + # This verifies the previous upserts have been done. + await self.__check( + dataface2, + CrystalPlateFilterModel(), + 3, + "all", + ) + + # Second client requests the shutdown. + await dataface2.client_shutdown() + finally: + try: + # Wait for the process to finish and get the output. + stdout_bytes, stderr_bytes = process.communicate(timeout=5) + except subprocess.TimeoutExpired: + # Timeout happens when client dies but server hasn't been told to shutdown. + process.kill() + stdout_bytes, stderr_bytes = process.communicate() + + # Get the return code of the process + return_code = process.returncode + logger.debug(f"server return_code is {return_code}") + + if len(stderr_bytes) > 0: + logger.debug( + f"================================== server stderr is:\n{stderr_bytes.decode()}" + ) + if len(stdout_bytes) > 0: + logger.debug( + f"================================== server stdout is:\n{stdout_bytes.decode()}" + ) + if len(stderr_bytes) > 0 or len(stdout_bytes) > 0: + logger.debug("================================== end of server output") + + assert return_code == 0 + + # ---------------------------------------------------------------------------------------- + + async def __check( + self, + dataface, + filter: CrystalPlateFilterModel, + expected: int, + note: str, + formulatrix__plate__id: Optional[int] = None, + ): + """ """ + + crystal_plate_models = await dataface.fetch_crystal_plates(filter) + + assert len(crystal_plate_models) == expected, note + + if formulatrix__plate__id is not None: + assert ( + crystal_plate_models[0].formulatrix__plate__id == formulatrix__plate__id + ), f"{note} formulatrix__plate__id" + + return crystal_plate_models