diff --git a/.gitignore b/.gitignore
index bc64a02e..f8a4177c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,6 +50,9 @@ cover/
 
 # Testing artifacts
 *query.csv
+dsi_parquet_driver_output.ipynb
+*.ipynb_checkpoints
+parquet.data
 
 # Misc
 .vscode
diff --git a/docs/core.rst b/docs/core.rst
index e05294ad..cc64db60 100644
--- a/docs/core.rst
+++ b/docs/core.rst
@@ -2,30 +2,93 @@ Core
 ===================
 The DSI Core middleware defines the Terminal concept. An instantiated Terminal is the human/machine DSI interface. The person setting up a Core Terminal only needs to know how they want to ask questions, and what metadata they want to ask questions about. If they don’t see an option to ask questions the way they like, or they don’t see the metadata they want to ask questions about, then they should ask a Driver Contributor or a Plugin Contributor, respectively.
 
-A Core Terminal is a home for Plugins, and an interface for Drivers. A Core Terminal is instantiated with a set of default Plugins and Drivers, but they must be loaded before a user query is attempted::
+A Core Terminal is a home for Plugins, and an interface for Drivers. A Core Terminal is instantiated with a set of default Plugins and Drivers, but they must be loaded before a user query is attempted. Here's an example of how you might work with DSI using an interactive Python interpreter for your data science workflows::
 
 >>> from dsi.core import Terminal
->>> a = Terminal()
+>>> a=Terminal()
 >>> a.list_available_modules('plugin')
->>> # [ Hostname, SystemKernel, Bueno ]
->>> a.list_available_modules('driver')
->>> # [ Gufi, Sqlite ]
->>> a.load_module('plugin', 'Bueno','consumer')
+>>> # ['Bueno', 'Hostname', 'SystemKernel']
+>>> a.load_module('plugin','Bueno','consumer',filename='./data/bueno.data')
 >>> # Bueno plugin consumer loaded successfully.
->>> a.load_module('driver', 'Sqlite','front-end', filename='bogus') # Filename param requirement will be removed.
->>> # Sqlite driver front-end loaded successfully.
->>> a.load_module('driver', 'Sqlite','back-end',filename='yadda')
->>> # Sqlite driver back-end loaded successfully.
+>>> a.load_module('plugin','Hostname','producer')
+>>> # Hostname plugin producer loaded successfully.
+>>> a.list_loaded_modules()
+>>> # {'producer': [<dsi.plugins.env.Hostname object at 0x...>],
+>>> #  'consumer': [<dsi.plugins.env.Bueno object at 0x...>],
+>>> #  'front-end': [],
+>>> #  'back-end': []}
+
+
+At this point, you might decide that you are ready to collect data for inspection. It is possible to use DSI Drivers to load additional metadata to supplement your Plugin metadata, but you can also sample Plugin data and search it directly.
+
+
+The process of transforming a set of Plugin producers and consumers into a queryable format is called transloading. A DSI Core Terminal has a ``transload()`` method which may be called to execute all Plugins at once::
+
+>>> a.transload()
+>>> a.active_metadata
+>>> # OrderedDict([('uid', [1000]), ('effective_gid', [1000]), ('moniker', ['qwofford'])...
+
+Once a Core Terminal has been transloaded, no further Plugins may be added. However, the ``transload()`` method can be called to take additional samples from each Plugin as many times as you like::
+
+>>> a.transload()
+>>> a.transload()
+>>> a.transload()
+>>> a.active_metadata
+>>> # OrderedDict([('uid', [1000, 1000, 1000, 1000]), ('effective_gid', [1000, 1000, 1000...
+
+If you perform data science tasks using Python, it is not necessary to create a DSI Core Terminal front-end because the data is already in a Python data structure.
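+
+For example, the active metadata can be handed straight to your own analysis tools. A minimal sketch, assuming the ``pandas`` package is available in your environment (it is not a DSI requirement)::
+
+>>> import pandas as pd
+>>> df = pd.DataFrame(a.active_metadata)
+>>> df.describe()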
+
+If your data science tasks can be completed in one session, it is not required to interact with DSI Drivers. However, if you do want to save your work, you can load a DSI Driver with a back-end function::
+
+>>> a.list_available_modules('driver')
+>>> # ['Gufi', 'Sqlite', 'Parquet']
+>>> a.load_module('driver','Parquet','back-end',filename='parquet.data')
+>>> # Parquet driver back-end loaded successfully.
+>>> a.list_loaded_modules()
+>>> # {'producer': [<dsi.plugins.env.Hostname object at 0x...>],
+>>> #  'consumer': [<dsi.plugins.env.Bueno object at 0x...>],
+>>> #  'front-end': [],
+>>> #  'back-end': [<dsi.drivers.parquet.Parquet object at 0x...>]}
+>>> a.artifact_handler(interaction_type='put')
+
+The contents of the active DSI Core Terminal metadata storage will be saved to a Parquet object at the path you provided at module loading time.
+
+You may prefer to perform data science tasks at a higher level of abstraction than Python itself. This is the purpose of the DSI Driver front-end functionality. Unlike Plugins, Drivers can be added after the initial ``transload()`` operation has been performed::
+
+>>> a.load_module('driver','Parquet','front-end',filename='parquet.data')
+>>> # Parquet driver front-end loaded successfully.
+>>> a.list_loaded_modules()
+>>> # {'producer': [<dsi.plugins.env.Hostname object at 0x...>],
+>>> #  'consumer': [<dsi.plugins.env.Bueno object at 0x...>],
+>>> #  'front-end': [<dsi.drivers.parquet.Parquet object at 0x...>],
+>>> #  'back-end': [<dsi.drivers.parquet.Parquet object at 0x...>]}
+
+Any front-end may be used, but in this case the Parquet driver has a front-end implementation which builds a Jupyter notebook from scratch that loads your metadata collection into a Pandas DataFrame. The Parquet front-end will then launch the Jupyter notebook to support an interactive data science workflow::
+
+>>> a.artifact_handler(interaction_type='inspect')
+>>> # Writing Jupyter notebook...
+>>> # Opening Jupyter notebook...
+
+.. image:: jupyter_frontend.png
+   :scale: 33%
+
+You can then close your Jupyter notebook, call ``transload()`` again to increase your sample size, and use the interface to explore more data.
+
+Although this demonstration only loaded one module per module function, any number of Plugins and Drivers can be loaded to collect an arbitrary amount of queryable metadata. Here, a second Parquet front-end is loaded::
+
+>>> a.load_module('driver','Parquet','front-end',filename='parquet.data')
+>>> # Parquet driver front-end loaded successfully.
 >>> a.list_loaded_modules()
->>> {
->>> 'producer': [],
->>> 'consumer': [],
->>> 'front-end': [],
->>> 'back-end': []
->>> }
->>> a.execute() # Not yet implemented
-
-It is the Plugin contributor's responsibility to make sure consumer or producer functions succeed or report ``NotImplementedError``. It is the Driver contributor's responsiblity to make sure the front-end or back-end functions succeed or report ``NotImplementedError``.
+>>> # {'producer': [<dsi.plugins.env.Hostname object at 0x...>],
+>>> #  'consumer': [<dsi.plugins.env.Bueno object at 0x...>],
+>>> #  'front-end': [<dsi.drivers.parquet.Parquet object at 0x...>, <dsi.drivers.parquet.Parquet object at 0x...>],
+>>> #  'back-end': [<dsi.drivers.parquet.Parquet object at 0x...>]}
 
 .. automodule:: dsi.core
    :members:
diff --git a/docs/drivers.rst b/docs/drivers.rst
index 2f2d3363..d59d0f42 100644
--- a/docs/drivers.rst
+++ b/docs/drivers.rst
@@ -15,3 +15,13 @@ Drivers have front-end and back-end functions. Drivers connect users to DSI Core
 
 .. automodule:: dsi.drivers.gufi
    :members:
+.. automodule:: dsi.drivers.parquet
+   :members:
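+
+By way of illustration, the back-end half of this driver can also be exercised directly. This is a hedged sketch: the ``Parquet`` constructor keywords are assumed from the Core documentation example, and ``my_metadata`` stands in for any mapping of column names to value lists::
+
+>>> from dsi.drivers.parquet import Parquet
+>>> driver = Parquet(filename='parquet.data')
+>>> driver.put_artifacts(collection=my_metadata)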
+
+
diff --git a/docs/index.rst b/docs/index.rst
index e0c96cbd..5b3df85d 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -13,7 +13,6 @@ The Data Science Infrastructure Project (DSI)
 
    introduction
    installation
-   quickstart
    core
    plugins
    drivers
diff --git a/docs/jupyter_frontend.png b/docs/jupyter_frontend.png
new file mode 100644
index 00000000..11eadba6
Binary files /dev/null and b/docs/jupyter_frontend.png differ
diff --git a/docs/quickstart.rst b/docs/quickstart.rst
deleted file mode 100644
index bd891748..00000000
--- a/docs/quickstart.rst
+++ /dev/null
@@ -1,3 +0,0 @@
-Quickstart
-================
-Just a little quickstart document
diff --git a/dsi/core.py b/dsi/core.py
index bbf27344..a8d5f783 100644
--- a/dsi/core.py
+++ b/dsi/core.py
@@ -19,7 +19,7 @@ class Terminal():
     VALID_DRIVERS = ['Gufi','Sqlite','Parquet']
     VALID_MODULES = VALID_PLUGINS + VALID_DRIVERS
     VALID_MODULE_FUNCTIONS = {'plugin':['producer','consumer'],'driver':['front-end','back-end']}
-    VALID_ARTIFACT_INTERACTION_TYPES = ['get','set','put']
+    VALID_ARTIFACT_INTERACTION_TYPES = ['get','set','put','inspect']
 
     def __init__(self):
         # Helper function to get parent module names.
@@ -121,17 +121,7 @@ def transload(self, **kwargs):
                 self.active_metadata[col_name] = col_metadata
         self.transload_lock=True
-
-    def user_handler(self, **kwargs):
-        """
-        The user_handler prepares a human/machine interface.
-
-        A Driver implements the user_handler experience, defining how the user interacts with the
-        `self.active_metadata` currently loaded in the DSI Core Terminal.
-        """
-        pass
-
-    def artifact_handler(self,interaction_type,**kwargs):
+    def artifact_handler(self,interaction_type, **kwargs):
         """
         Store or retrieve using all loaded DSI Drivers with back-end functionality.
 
@@ -142,15 +132,31 @@ def artifact_handler(self,interaction_type, **kwargs):
         if interaction_type not in self.VALID_ARTIFACT_INTERACTION_TYPES:
             print('Hint: Did you declare your artifact interaction type in the Terminal Global vars?')
             raise NotImplementedError
+        operation_success = False
+        # Perform artifact movement first, because the inspect implementation may rely on
+        # self.active_metadata or some stored artifact.
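+        # For example (mirroring docs/core.rst): interaction_type='put' stores
+        # self.active_metadata through every loaded back-end, while
+        # interaction_type='inspect' stores it and then opens a view of the stored artifact.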
         selected_function_modules = dict((k,self.active_modules[k]) for k in (['back-end']))
         for module_type, objs in selected_function_modules.items():
             for obj in objs:
                 if interaction_type=='put' or interaction_type=='set':
                     obj.put_artifacts(collection=self.active_metadata, **kwargs)
+                    operation_success = True
                 elif interaction_type=='get':
                     self.active_metadata=obj.get_artifacts(**kwargs)
-                else:
-                    print('Hint: Did you implement a case for your artifact interaction in the artifact_handler loop?')
-                    raise NotImplementedError
+                    operation_success = True
+        if interaction_type=='inspect':
+            for module_type, objs in selected_function_modules.items():
+                for obj in objs:
+                    obj.put_artifacts(collection=self.active_metadata, **kwargs)
+                    obj.inspect_artifacts(collection=self.active_metadata, **kwargs)
+                    operation_success = True
+        if operation_success:
+            return
+        else:
+            print('Hint: Did you implement a case for your artifact interaction in the artifact_handler loop?')
+            raise NotImplementedError
diff --git a/dsi/drivers/filesystem.py b/dsi/drivers/filesystem.py
index f788929c..84151134 100644
--- a/dsi/drivers/filesystem.py
+++ b/dsi/drivers/filesystem.py
@@ -20,9 +20,13 @@ def put_artifacts(self, artifacts, kwargs) -> None:
     @abstractmethod
     def get_artifacts(self, query):
         pass
+
+    @abstractmethod
+    def inspect_artifacts(self, collection):
+        pass
 
 class Filesystem(Driver):
-    git_commit_sha='c8d495cd58ac9abf9f43f5c89f1ea84a89699bf4'
+    git_commit_sha='349706c7208d7ae6685262b77989002e434cd0b3'
     # Declare named types
     DOUBLE = "DOUBLE"
     STRING = "VARCHAR"
@@ -47,3 +51,6 @@ def put_artifacts(self, artifacts, kwargs) -> None:
 
     def get_artifacts(self, query):
         pass
+
+    def inspect_artifacts(self, collection):
+        pass
diff --git a/dsi/drivers/parquet.py b/dsi/drivers/parquet.py
index 067baf17..925dc692 100644
--- a/dsi/drivers/parquet.py
+++ b/dsi/drivers/parquet.py
@@ -1,5 +1,7 @@
 import pyarrow as pa
 from pyarrow import parquet as pq
+import nbformat as nbf
+import subprocess
 
 from dsi.drivers.filesystem import Filesystem
@@ -29,4 +31,54 @@
         table = pa.table(collection)
         pq.write_table(table, self.filename, compression=self.compression)
 
+    @staticmethod
+    def get_cmd_output(cmd: list) -> str:
+        """
+        Runs a given command and returns the stdout if successful.
+        If stderr is not empty, an exception is raised with the stderr text.
+        """
+        proc = subprocess.run(cmd, capture_output=True, shell=True)
+        if proc.stderr != b"":
+            raise Exception(proc.stderr)
+        return proc.stdout.strip().decode("utf-8")
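+
+    # Implementation note: with shell=True, only the first element of a list cmd is
+    # read as the shell command line (POSIX), which is why inspect_artifacts below
+    # passes a single-element list. Output on stderr raises even on a zero exit code.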
+ """ + code1 = """\ + import pandas as pd + df = pd.read_parquet('{}') + df.head() + """.format(self.filename) + + code2 = """\ + df.info() + """.format(self.filename) + + code3 = """\ + df.describe() + """.format(self.filename) + + nb['cells'] = [nbf.v4.new_markdown_cell(text), + nbf.v4.new_code_cell(code1), + nbf.v4.new_code_cell(code2), + nbf.v4.new_code_cell(code3)] + + fname = 'dsi_parquet_driver_output.ipynb' + + print('Writing Jupyter notebook...') + with open(fname, 'w') as fh: + nbf.write(nb, fh) + + print('Opening Jupyter notebook...') + self.get_cmd_output(cmd=['jupyter-lab ./dsi_parquet_driver_output.ipynb']) diff --git a/dsi/plugins/env.py b/dsi/plugins/env.py index 0565f20f..b8500726 100644 --- a/dsi/plugins/env.py +++ b/dsi/plugins/env.py @@ -56,10 +56,10 @@ class Bueno(Environment): are delimited by ``:``. Keyval pairs are delimited by ``\\n``. """ - def __init__(self, path, **kwargs) -> None: + def __init__(self, filename, **kwargs) -> None: super().__init__() self.bueno_data = OrderedDict() - self.path = path + self.filename = filename def pack_header(self) -> None: """Set schema with POSIX and Bueno data.""" @@ -69,7 +69,7 @@ def pack_header(self) -> None: def add_row(self) -> None: """Parses environment provenance data and adds the row.""" if not self.schema_is_set(): - with open(self.path,'r') as fh: + with open(self.filename,'r') as fh: file_content=(fh.read()) rows = file_content.split('\n') drop_rows = [row for row in rows if row != ''] diff --git a/dsi/plugins/metadata.py b/dsi/plugins/metadata.py index 31aef2f8..c7a79953 100644 --- a/dsi/plugins/metadata.py +++ b/dsi/plugins/metadata.py @@ -29,7 +29,7 @@ def add_to_output(self, path): class StructuredMetadata(Plugin): """ plugin superclass that provides handy methods for structured data """ - git_commit_sha='c8d495cd58ac9abf9f43f5c89f1ea84a89699bf4' + git_commit_sha='349706c7208d7ae6685262b77989002e434cd0b3' def __init__(self): """ diff --git a/dsi/plugins/tests/test_env.py b/dsi/plugins/tests/test_env.py index 65217ae8..18047da6 100644 --- a/dsi/plugins/tests/test_env.py +++ b/dsi/plugins/tests/test_env.py @@ -45,16 +45,16 @@ def test_envprov_plugin_adds_rows(): assert len(plug.output_collector.keys()) > 100 # should def have more than 100 columns def test_bueno_plugin_type(): - plug = Bueno() path = '/'.join([get_git_root('.'),'dsi/data','bueno.data']) - plug.add_row(filename=path) + plug = Bueno(filename=path) + plug.add_row() assert type(plug.output_collector) == collections.OrderedDict def test_bueno_plugin_adds_rows(): - plug = Bueno() path = '/'.join([get_git_root('.'),'dsi/data','bueno.data']) - plug.add_row(path) - plug.add_row(path) + plug = Bueno(filename=path) + plug.add_row() + plug.add_row() for key, val in plug.output_collector.items(): assert len(val) == 2