diff --git a/docs/molgenis/use_usingpyclient.md b/docs/molgenis/use_usingpyclient.md index 1c03f15cbd..b52b14a1a8 100644 --- a/docs/molgenis/use_usingpyclient.md +++ b/docs/molgenis/use_usingpyclient.md @@ -64,28 +64,30 @@ The user roles on MOLGENIS EMX2 and their respective permissions are described i ### schema_names ```python -client.schema_names +Client().schema_names ``` A property that returns a list of the names of the schemas for which the user has at least _viewer_ permissions. ### status ```python -client.status +Client().status ``` A property that returns a string with information, including the server URL, the user (if applicable), the sign-in status, the version of MOLGENIS EMX2 running on the server, and the result of `schema_names`. ### get_schemas ```python -client.get_schemas() +def get_schemas(self) -> list[Schema]: + ... ``` Retrieves the schemas for which the user has at least _viewer_ permissions as a list of dictionaries containing for each schema the id, name, label and description. This method accepts no arguments. ### set_schema ```python -client.set_schema('My Schema') +def set_schema(self, name: str) -> str: + ... ``` Sets the default schema for the server in the property `default_schema`. Throws the `NoSuchSchemaException` if the user does not have at least _viewer_ permissions or if the schema does not exist. @@ -96,7 +98,8 @@ Throws the `NoSuchSchemaException` if the user does not have at least _viewer_ p ### set_token ```python -client.set_token(token='***************') +def set_token(self, token: str): + ... ``` Sets the client's token in case no token was supplied in the initialization. Raises the `TokenSigninException` when the client is already signed in with a username/password combination. @@ -108,12 +111,12 @@ Raises the `TokenSigninException` when the client is already signed in with a us ### get ```python -names = ['Alice', 'Benjamin'] -countries = ['Spain', 'Sweden'] -age_interval = [18, 75] -client.get(table='Data Table', schema='My Schema', query_filter=f"name != {names}" - f" and country.name == {countries}" - f" and age between {age_interval}", as_df=True) +def get(self, + table: str, + query_filter: str = None, + schema: str = None, + as_df: bool = False) -> list | pandas.DataFrame: + ... ``` Retrieves data from a table on a schema and returns the result either as a list of dictionaries or as a pandas DataFrame. Use the `query_filter` parameter to filter the results based on filters applied to the columns. @@ -138,7 +141,8 @@ Throws the `NoSuchColumnException` if the query filter contains a column id that ### get_schema_metadata ```python -client.get_schema_metadata(name='My Schema') +def get_schema_metadata(self, name: str = None) -> Schema: + ... ``` Retrieves the metadata of a schema and returns it in the _metadata.Schema_ format. See the description of the [Schema](use_usingpyclient.md#schema) metadata object below. @@ -151,9 +155,13 @@ See the description of the [Schema](use_usingpyclient.md#schema) metadata object ### export ```python -client.export(schema='My Schema', table='Data Table', fmt='csv') +async def export(self, + schema: str = None, + table: str = None, + fmt: OutputFormat = 'csv'): + ... ``` -Exports data from a schema to a file in the desired format. +Asynchronously exports data from a schema to a file in the desired format. If the table is specified, only data from that table is exported, otherwise the export contains all tables on the schema. If all tables from a schema are exported with given format `csv`, the data is exported as a zip file containing a csv file of each table. Throws the `NoSuchSchemaException` if the user does not have at least _viewer_ permissions or if the schema does not exist. @@ -167,8 +175,12 @@ Throws the `NoSuchSchemaException` if the user does not have at least _viewer_ p ### save_schema ```python -client.save_schema(table='Data Table', name='My Schema', - file='location/of/data/file.csv', data=[{'col1': value1, ...}, {'col2': value2, ...}, ...]) +def save_schema(self, + table: str, + name: str = None, + file: str = None, + data: list | pandas.DataFrame = None): + ``` Imports or updates records in a table of a named schema. The data either originates from a file on the disk, or is supplied by the user after, for example, preprocessing. @@ -186,7 +198,8 @@ Throws the `NoSuchSchemaException` if the schema is not found on the server. ### upload_file ```python -client.upload_file(file_path='location/of/data/file.zip', schema='My Schema') +async def upload_file(self, file_path: str | pathlib.Path, schema: str = None): + ... ``` Imports table data and/or metadata to a schema from a file on the disk. This method supports `zip`, `xlsx`, and `csv` files. @@ -202,8 +215,12 @@ Throws the `NoSuchSchemaException` if the schema is not found on the server. ### delete_records ```python -client.delete_records(table='Data Table', schema='My Schema', - file='location/of/data/file.csv', data=[{'col1': value1, ...}, {'col2': value2, ...}, ...]) +def delete_records(self, + table: str, + schema: str = None, + file: str = None, + data: list | pandas.DataFrame = None): + ... ``` Deletes data records from a table. As in the `save_schema` method, the data either originates from disk or the program. @@ -222,7 +239,12 @@ Throws the `NoSuchSchemaException` if the schema is not found on the server. ### create_schema ```python -client.create_schema(name='New Schema', description='My new schema!', template='DATA_CATALOGUE', include_demo_data=True) +async def create_schema(self, + name: str = None, + description: str = None, + template: str = None, + include_demo_data: bool = False): + ... ``` Creates a new schema on the server. If no template is selected, an empty schema is created. @@ -270,7 +292,8 @@ Only users with _admin_ privileges are able to perform this action. ### delete_schema ```python -client.delete_schema(name='Old Schema') +async def delete_schema(self, name: str = None): + ... ``` Deletes a schema from the server. @@ -286,7 +309,8 @@ Throws the `NoSuchSchemaException` if the schema is not found on the server. ### update_schema ```python -client.update_schema(name='My Schema', description='The new description of this schema.') +def update_schema(self, name: str = None, description: str = None): + ... ``` Updates a schema's description. Only users with _admin_ privileges are able to perform this action. @@ -300,8 +324,12 @@ Throws the `NoSuchSchemaException` if the schema is not found on the server. ### recreate_schema ```python -client.recreate_schema(name='My Schema', description='Updated description', - template='DATA_CATALOGUE', include_demo_data=False) +async def recreate_schema(self, + name: str = None, + description: str = None, + template: str = None, + include_demo_data: bool = None): + ... ``` Recreates a schema on the EMX2 server by deleting and subsequently creating it without data on the EMX2 server. diff --git a/tools/pyclient/dev/dev.py b/tools/pyclient/dev/dev.py index 99d88dab79..4ced069491 100644 --- a/tools/pyclient/dev/dev.py +++ b/tools/pyclient/dev/dev.py @@ -2,7 +2,7 @@ # FILE: dev.py # AUTHOR: David Ruvolo, Ype Zijlstra # CREATED: 2023-05-22 -# MODIFIED: 2024-07-16 +# MODIFIED: 2024-08-22 # PURPOSE: development script for initial testing of the py-client # STATUS: ongoing # PACKAGES: pandas, python-dotenv @@ -39,15 +39,15 @@ async def main(): async with Client('https://emx2.dev.molgenis.org/', schema='catalogue') as client: participant_range = [10_000, 20_000.5] - big_data = client.get(table='Subcohorts', + big_data = client.get(table='Collection subcohorts', query_filter=f'`numberOfParticipants` between {participant_range}', as_df=True) print(big_data.head().to_string()) - countries = ["Denmark", "France"] - cohorts = client.get(table='Cohorts', - query_filter=f'subcohorts.countries.name != {countries}', + excluded_countries = ["Denmark", "France"] + collections = client.get(table='Collections', + query_filter=f'subcohorts.countries.name != {excluded_countries}', as_df=True) - print(cohorts.to_string()) + print(collections.head().to_string()) var_values = client.get(table='Variable values', query_filter='label != No and value != 1', as_df=True) @@ -58,7 +58,7 @@ async def main(): async with Client('https://emx2.dev.molgenis.org/', token=token) as client: # Check sign in status print(client.__repr__()) - print(client.status) + # print(client.status) # Retrieve data from a table in a schema on the server using the 'get' method # Passing non-existing schema name yields a NoSuchSchemaException @@ -76,16 +76,16 @@ async def main(): print(e) # Export the entire 'pet store' schema to a .xlsx file - # and export the 'Cohorts' table from schema 'catalogue' to a .csv file - client.export(schema='pet store', fmt='xlsx') - client.export(schema='catalogue-demo', table='Cohorts', fmt='csv') + # and export the 'Collections' table from schema 'catalogue' to a .csv file + await client.export(schema='pet store', fmt='xlsx') + await client.export(schema='catalogue', table='Collections', fmt='csv') # Connect to server with a default schema specified with Client('https://emx2.dev.molgenis.org/', schema='pet store', token=token) as client: print(client.__repr__()) - client.export(fmt='csv') - client.export(table='Pet', fmt='csv') - client.export(table='Pet', fmt='xlsx') + await client.export(fmt='csv') + await client.export(table='Pet', fmt='csv') + await client.export(table='Pet', fmt='xlsx') # Retrieving data from table Pet as a list data = client.get(table='Pet') # get Pets @@ -217,13 +217,14 @@ async def main(): with Client('https://emx2.dev.molgenis.org/', token=token) as client: # Create a schema try: - client.create_schema(name='myNewSchema') + schema_create = asyncio.create_task(client.create_schema(name='myNewSchema')) print(client.schema_names) except (GraphQLException, PermissionDeniedException) as e: print(e) # Update the description try: + await schema_create client.update_schema(name='myNewSchema', description='I forgot the description') print(client.schema_names) print(client.schemas) @@ -232,14 +233,15 @@ async def main(): # Recreate the schema: delete and create try: - client.recreate_schema(name='myNewSchema') + schema_recreate = asyncio.create_task(client.recreate_schema(name='myNewSchema')) print(client.schema_names) except (GraphQLException, NoSuchSchemaException) as e: print(e) # Delete the schema try: - client.delete_schema(name='myNewSchema') + await schema_create + await asyncio.create_task(client.delete_schema(name='myNewSchema')) print(client.schema_names) except (GraphQLException, NoSuchSchemaException) as e: print(e) @@ -250,24 +252,24 @@ async def main(): catalogue_schema = Client('https://emx2.dev.molgenis.org/').get_schema_metadata('catalogue') # Find the tables inheriting from the 'Resources' table - resource_children = catalogue_schema.get_tables(by='inheritName', value='Resources') + resource_children = catalogue_schema.get_tables(by='inheritName', value='Collections') - print("Tables in the schema inheriting from the 'Resources' table.") + print("Tables in the schema inheriting from the 'Collections' table.") for res_chi in resource_children: print(f"{res_chi!s}\n{res_chi!r}") print("\n") - # Find the Cohorts table - cohorts = catalogue_schema.get_table(by='name', value='Cohorts') + # Find the table + collections = catalogue_schema.get_table(by='name', value='Collections') - # Find the columns in the Cohorts table referencing the Organisations table - orgs_refs = cohorts.get_columns(by='refTableName', value='Organisations') + # Find the columns in the Collections table referencing the Organisations table + orgs_refs = collections.get_columns(by='refTableName', value='Organisations') - # Find the columns in the Cohorts table referencing the Organisations table in a reference array - orgs_array_refs = cohorts.get_columns(by=['columnType', 'refTableName'], value=['REF_ARRAY', 'Organisations']) + # Find the columns in the Collections table referencing the Organisations table in a reference array + orgs_array_refs = collections.get_columns(by=['columnType', 'refTableName'], value=['REF_ARRAY', 'Collection organisations']) # Print the __str__ and __repr__ representations of these columns - print("Columns in the Cohorts table referencing the Organisations table in an array.") + print("Columns in the Collections table referencing the Collection organisations table in an array.") for orgs_ref in orgs_array_refs: print(f"{orgs_ref!s}\n{orgs_ref!r}\n") diff --git a/tools/pyclient/src/molgenis_emx2_pyclient/client.py b/tools/pyclient/src/molgenis_emx2_pyclient/client.py index bd539cb22f..b8753411a5 100644 --- a/tools/pyclient/src/molgenis_emx2_pyclient/client.py +++ b/tools/pyclient/src/molgenis_emx2_pyclient/client.py @@ -247,7 +247,7 @@ def save_schema(self, table: str, name: str = None, file: str = None, data: list log.error("Failed to import data into %s::%s\n%s", current_schema, table, errors) raise PyclientException(errors) - def upload_file(self, file_path: str | pathlib.Path, schema: str = None): + async def upload_file(self, file_path: str | pathlib.Path, schema: str = None): """Uploads a file to a database on the EMX2 server. :param file_path: the path where the file is located. @@ -294,49 +294,9 @@ def upload_file(self, file_path: str | pathlib.Path, schema: str = None): # Catch process URL process_id = response.json().get('id') - # Report subtask progress - p_response = self.session.post( - url=self.api_graphql, - json={'query': queries.task_status(process_id)} - ) - if p_response.status_code != 200: - raise PyclientException("Error uploading file") + # Report on task progress + await self._report_task_progress(process_id) - reported_tasks = [] - task = p_response.json().get('data').get('_tasks')[0] - while (status := task.get('status')) != 'COMPLETED': - if status == 'ERROR': - # TODO improve error handling - raise PyclientException(f"Error uploading file: {task.get('description')}") - subtasks = task.get('subTasks', []) - for st in subtasks: - if st['id'] not in reported_tasks and st['status'] == 'RUNNING': - log.info(f"Subtask: {st['description']}") - reported_tasks.append(st['id']) - if st['id'] not in reported_tasks and st['status'] == 'SKIPPED': - log.warning(f" Subtask: {st['description']}") - reported_tasks.append(st['id']) - for sst in st.get('subTasks', []): - if sst['id'] not in reported_tasks and sst['status'] == 'COMPLETED': - log.info(f" Subsubtask: {sst['description']}") - reported_tasks.append(sst['id']) - if sst['id'] not in reported_tasks and sst['status'] == 'SKIPPED': - log.warning(f" Subsubtask: {sst['description']}") - reported_tasks.append(sst['id']) - try: - p_response = self.session.post( - url=self.api_graphql, - json={'query': queries.task_status(process_id)} - ) - task = p_response.json().get('data').get('_tasks')[0] - except AttributeError: - time.sleep(1) - p_response = self.session.post( - url=self.api_graphql, - json={'query': queries.task_status(process_id)} - ) - task = p_response.json().get('data').get('_tasks')[0] - log.info(f"Completed task: {task.get('description')}") def _upload_csv(self, file_path: pathlib.Path, schema: str) -> str: """Uploads the CSV file from the filename to the schema. Returns the success or error message.""" @@ -450,7 +410,7 @@ def get(self, table: str, query_filter: str = None, schema: str = None, as_df: b return response_data.to_dict('records') return response_data - def export(self, schema: str = None, table: str = None, fmt: OutputFormat = 'csv'): + async def export(self, schema: str = None, table: str = None, fmt: OutputFormat = 'csv'): """Exports data from a schema to a file in the desired format. :param schema: the name of the schema @@ -515,10 +475,10 @@ def export(self, schema: str = None, table: str = None, fmt: OutputFormat = 'csv file.write(response.content) log.info("Exported data from table %s in schema %s to '%s'.", table, current_schema, filename) - def create_schema(self, name: str = None, + async def create_schema(self, name: str = None, description: str = None, template: str = None, - include_demo_data: bool = None): + include_demo_data: bool = False): """Creates a new schema on the EMX2 server. :param name: the name of the new schema @@ -550,10 +510,18 @@ def create_schema(self, name: str = None, mutation='createSchema', fallback_error_message=f"Failed to create schema {name!r}" ) + + # Catch process URL + process_id = response.json().get('data').get('createSchema').get('taskId') + + if process_id: + # Report on task progress + await self._report_task_progress(process_id) + self.schemas = self.get_schemas() log.info(f"Created schema {name!r}") - def delete_schema(self, name: str = None): + async def delete_schema(self, name: str = None): """Deletes a schema from the EMX2 server. :param name: the name of the new schema @@ -612,7 +580,7 @@ def update_schema(self, name: str = None, description: str = None): ) self.schemas = self.get_schemas() - def recreate_schema(self, name: str = None, + async def recreate_schema(self, name: str = None, description: str = None, template: str = None, include_demo_data: bool = None): @@ -640,8 +608,8 @@ def recreate_schema(self, name: str = None, schema_description = description if description else schema_meta.get('description', None) try: - self.delete_schema(name=current_schema) - self.create_schema( + await self.delete_schema(name=current_schema) + await self.create_schema( name=current_schema, description=schema_description, template=template, @@ -877,6 +845,59 @@ def set_schema(self, name: str) -> str: return name + async def _report_task_progress(self, process_id: int | str): + """Reports on the progress of a task and its subtasks.""" + + # Report subtask progress + p_response = self.session.post( + url=self.api_graphql, + json={'query': queries.task_status(process_id)} + ) + + reported_tasks = [] + task = p_response.json().get('data').get('_tasks')[0] + while (status := task.get('status')) != 'COMPLETED': + if status == 'ERROR': + raise PyclientException(f"Error uploading file: {task.get('description')}") + subtasks = task.get('subTasks', []) + for st in subtasks: + if st['id'] not in reported_tasks and st['status'] == 'RUNNING': + log.info(f"{st['description']}") + reported_tasks.append(st['id']) + if st['id'] not in reported_tasks and st['status'] == 'SKIPPED': + log.warning(f" {st['description']}") + reported_tasks.append(st['id']) + for sst in st.get('subTasks', []): + if sst['id'] not in reported_tasks and sst['status'] == 'COMPLETED': + log.info(f" {sst['description']}") + reported_tasks.append(sst['id']) + if sst['id'] not in reported_tasks and sst['status'] == 'SKIPPED': + log.warning(f" {sst['description']}") + reported_tasks.append(sst['id']) + for ssst in sst.get('subTasks', []): + if ssst['id'] not in reported_tasks and ssst['status'] == 'COMPLETED': + log.info(f" {ssst['description']}") + reported_tasks.append(ssst['id']) + if ssst['id'] not in reported_tasks and ssst['status'] == 'SKIPPED': + log.warning(f" {ssst['description']}") + reported_tasks.append(ssst['id']) + try: + p_response = self.session.post( + url=self.api_graphql, + json={'query': queries.task_status(process_id)} + ) + task = p_response.json().get('data').get('_tasks')[0] + except AttributeError as ae: + log.debug(ae) + time.sleep(1) + p_response = self.session.post( + url=self.api_graphql, + json={'query': queries.task_status(process_id)} + ) + task = p_response.json().get('data').get('_tasks')[0] + log.info(f"Completed task: {task.get('description')}") + + def _validate_graphql_response(self, response: Response, mutation: str = None, fallback_error_message: str = None): """Validates a GraphQL response and prints the appropriate message. @@ -902,7 +923,10 @@ def _validate_graphql_response(self, response: Response, mutation: str = None, f raise PermissionDeniedException(f"Transaction failed: permission denied.") if 'Graphql API error' in response.text: msg = response.json().get("errors", [])[0].get('message') + log.error(msg) raise GraphQLException(msg) + msg = response.json().get("errors", [])[0].get('message', '') + log.error(msg) raise PyclientException("An unknown error occurred when trying to reach this server.") if response.request.method == 'GET': @@ -981,4 +1005,3 @@ def _validate_url(self): except requests.exceptions.MissingSchema: raise ServerNotFoundError(f"Invalid URL {self.url!r}. " f"Perhaps you meant 'https://{self.url}'?") -