Skip to content

Commit 130b2fb

Browse files
authored
Merge pull request #175 from CHESSComputing/foxden-writers
Add implementation for FoxdenMetaDataWriter and FoxdenMetaDataProcessor
2 parents 1bf6196 + 10ac55f commit 130b2fb

File tree

4 files changed

+212
-44
lines changed

4 files changed

+212
-44
lines changed

CHAP/foxden/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@
55
FoxdenProvenanceProcessor, FoxdenMetaDataProcessor
66
from CHAP.foxden.reader import \
77
FoxdenMetaDataReader, FoxdenProvenanceReader, FoxdenSpecScansReader
8-
from CHAP.foxden.writer import FoxdenWriter
8+
from CHAP.foxden.writer import FoxdenProvenanceWriter, FoxdenMetaDataWriter

CHAP/foxden/processor.py

Lines changed: 129 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,59 +9,168 @@
99

1010
# System modules
1111
from time import time
12+
import os
13+
import sys
14+
import platform
15+
import pkg_resources
16+
import subprocess
17+
import json
1218

1319
# Local modules
1420
from CHAP.processor import Processor
1521

1622
class FoxdenMetaDataProcessor(Processor):
1723
"""A Processor to communicate with FOXDEN MetaData server."""
1824

19-
def process(self, data, url, did, dryRun=False, verbose=False):
25+
def process(self, data, suffix='analysis=CHAP', verbose=False):
2026
"""FOXDEN MetaData processor
2127
2228
:param data: Input data.
2329
:type data: list[PipelineData]
24-
:param url: URL of service.
25-
:type url: str
26-
:param did: FOXDEN dataset identifier (did)
27-
:type did: string
28-
:param dryRun: `dryRun` option to verify HTTP workflow,
29-
defaults to `False`.
30-
:type dryRun: bool, optional
30+
:param suffix: did suffix to add, default 'analysis=CHAP'
31+
:type suffix: string, optional
3132
:param verbose: verbose output
3233
:type verbose: bool, optional
3334
:return: data from FOXDEN MetaData service
3435
"""
3536
t0 = time()
3637
self.logger.info(
37-
f'Executing "process" with url={url} data={data} did={did}')
38+
f'Executing "process" with data={data}')
39+
output = []
40+
for item in data:
41+
# each item in data list is a CHAP record {'name': ..., 'data': {}}
42+
for rec in item['data']: # get data part of processing item
43+
if 'did' not in rec:
44+
raise Exception('No did found in input data record')
45+
did = rec['did'] + '/' + suffix
46+
# construct analysis record
47+
rec = {'did': did, 'application': 'CHAP'}
48+
output.append(rec)
3849
self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n')
39-
return data
50+
return output
4051

4152
class FoxdenProvenanceProcessor(Processor):
4253
"""A Processor to communicate with FOXDEN provenance server."""
43-
def process(self, data, url, did, dryRun=False, verbose=False):
54+
def process(self, data, suffix='analysis=CHAP', verbose=False):
4455
"""FOXDEN Provenance processor
4556
4657
:param data: Input data.
4758
:type data: list[PipelineData]
48-
:param url: URL of service.
49-
:type url: str
50-
:param did: FOXDEN dataset identifier (did)
51-
:type did: string
52-
:param dryRun: `dryRun` option to verify HTTP workflow,
53-
defaults to `False`.
54-
:type dryRun: bool, optional
59+
:param suffix: did suffix to add, default 'analysis=CHAP'
60+
:type suffix: string, optional
5561
:param verbose: verbose output
5662
:type verbose: bool, optional
5763
:return: data from FOXDEN provenance service
5864
"""
5965
t0 = time()
6066
self.logger.info(
61-
f'Executing "process" with url={url} data={data} did={did}')
67+
f'Executing "process" with data={data}')
68+
output = []
69+
for item in data:
70+
# each item in data list is a CHAP record {'name': ..., 'data': {}}
71+
for rec in item['data']: # get data part of processing item
72+
if 'did' not in rec:
73+
raise Exception('No did found in input data record')
74+
rec['did'] = rec['did'] + '/' + suffix
75+
rec['parent_did'] = rec['did']
76+
rec['scripts'] = [{'name': 'CHAP', 'parent_script': None, 'order_idx': 1}]
77+
rec['site'] = 'Cornell'
78+
rec['osinfo'] = osinfo()
79+
rec['environments'] = environments()
80+
rec['input_files'] = inputFiles()
81+
rec['output_files'] = outputFiles()
82+
rec['processing'] = 'CHAP pipeline'
83+
output.append(rec)
6284
self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n')
63-
return data
85+
return output
6486

87+
def osinfo():
88+
"""
89+
Helper function to provide osinfo
90+
"""
91+
os_info = {
92+
"name": platform.system().lower() + "-" + platform.release(),
93+
"kernel": platform.version(),
94+
"version": platform.platform()
95+
}
96+
return os_info
97+
98+
def environments():
99+
"""
100+
Detects the current Python environment (system, virtualenv, or Conda) and
101+
collects package information. Returns a list of detected environments with
102+
installed packages.
103+
"""
104+
environments = []
105+
os_name = platform.system().lower() + "-" + platform.release()
106+
107+
# Check for Conda environment
108+
conda_env = os.getenv("CONDA_PREFIX")
109+
if conda_env:
110+
conda_env_name = os.getenv("CONDA_DEFAULT_ENV", "unknown-conda-env")
111+
try:
112+
# Fetch Conda packages
113+
conda_packages = subprocess.check_output(["conda", "list", "--json"], text=True)
114+
conda_packages = json.loads(conda_packages)
115+
packages = [{"name": pkg["name"], "version": pkg["version"]} for pkg in conda_packages]
116+
except Exception:
117+
packages = []
118+
119+
environments.append({
120+
"name": conda_env_name,
121+
"version": sys.version.split()[0],
122+
"details": "Conda environment",
123+
"parent_environment": None,
124+
"os_name": os_name,
125+
"packages": packages
126+
})
127+
128+
# Check for Virtualenv (excluding Conda)
129+
elif hasattr(sys, 'real_prefix') or os.getenv("VIRTUAL_ENV"):
130+
venv_name = os.path.basename(os.getenv("VIRTUAL_ENV", "unknown-venv"))
131+
packages = [
132+
{"name": pkg.key, "version": pkg.version}
133+
for pkg in pkg_resources.working_set
134+
]
135+
136+
environments.append({
137+
"name": venv_name,
138+
"version": sys.version.split()[0],
139+
"details": "Virtualenv environment",
140+
"parent_environment": None,
141+
"os_name": os_name,
142+
"packages": packages
143+
})
144+
145+
# System Python (not inside Conda or Virtualenv)
146+
else:
147+
packages = [
148+
{"name": pkg.key, "version": pkg.version}
149+
for pkg in pkg_resources.working_set
150+
]
151+
152+
environments.append({
153+
"name": "system-python",
154+
"version": sys.version.split()[0],
155+
"details": "System-wide Python",
156+
"parent_environment": None,
157+
"os_name": os_name,
158+
"packages": packages
159+
})
160+
161+
return environments
162+
163+
def inputFiles():
164+
"""
165+
Helper function to provide input files for FOXDEN
166+
"""
167+
return [{'name':'/tmp/file1.png'}, {'name': '/tmp/file2.png'}]
168+
169+
def outputFiles():
170+
"""
171+
Helper function to provide output files for FOXDEN
172+
"""
173+
return [{'name':'/tmp/file1.png'}]
65174

66175
if __name__ == '__main__':
67176
# Local modules

CHAP/foxden/reader.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class FoxdenMetaDataReader(PipelineItem):
2424
def read(
2525
self, url, data, did='', query='', spec=None,
2626
method='GET', headers=None,
27-
scope='read', timeout=10, dryRun=False, verbose=False):
27+
scope='read', dryRun=False, verbose=False):
2828
"""Read data from FOXDEN service
2929
3030
:param url: URL of service.
@@ -44,8 +44,6 @@ def read(
4444
:type headers: dictionary, optional
4545
:param scope: FOXDEN scope to use, e.g. read or write
4646
:type scope: string
47-
:param timeout: Timeout of HTTP request, defaults to `10`.
48-
:type timeout: str, optional
4947
:param dryRun: `dryRun` option to verify HTTP workflow,
5048
defaults to `False`.
5149
:type dryRun: bool, optional
@@ -82,7 +80,7 @@ class FoxdenProvenanceReader(PipelineItem):
8280
"""FOXDEN Provenance reader reads data from specific FOXDEN Provenance service."""
8381
def read(
8482
self, url, data, did='', method='GET', headers=None,
85-
scope='read', timeout=10, dryRun=False, verbose=False):
83+
scope='read', dryRun=False, verbose=False):
8684
"""FOXDEN Provenance processor
8785
8886
:param url: URL of service.
@@ -128,7 +126,7 @@ class FoxdenSpecScansReader(PipelineItem):
128126
def read(
129127
self, url, data, did='', query='', spec=None,
130128
method='GET', headers=None,
131-
scope='read', timeout=10, dryRun=False, verbose=False):
129+
scope='read', dryRun=False, verbose=False):
132130
"""Read data from FOXDEN service
133131
134132
:param url: URL of service.
@@ -148,8 +146,6 @@ def read(
148146
:type headers: dictionary, optional
149147
:param scope: FOXDEN scope to use, e.g. read or write
150148
:type scope: string
151-
:param timeout: Timeout of HTTP request, defaults to `10`.
152-
:type timeout: str, optional
153149
:param dryRun: `dryRun` option to verify HTTP workflow,
154150
defaults to `False`.
155151
:type dryRun: bool, optional

CHAP/foxden/writer.py

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,65 @@
1-
"""FOXDEN writer."""
1+
#!/usr/bin/env python
2+
#-*- coding: utf-8 -*-
3+
#pylint: disable=
4+
"""
5+
File : writer.py
6+
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
7+
Description: FOXDEN writers
8+
"""
29

10+
# system modules
11+
from time import time
12+
import json
13+
14+
# CHAP modules
315
from CHAP.foxden.utils import HttpRequest
16+
from CHAP.pipeline import PipelineItem
17+
18+
19+
class FoxdenMetaDataWriter(PipelineItem):
20+
"""FOXDEN writer writes data to MetaData FOXDEN service."""
21+
def write(
22+
self, url, data, method='POST', headers=None, verbose=False):
23+
"""Write data to FOXDEN Provenance service
424
25+
:param data: Input data.
26+
:type data: list[PipelineData]
27+
:param url: URL of service.
28+
:type url: str
29+
:param method: HTTP method to use, `"POST"` for creation and
30+
`"PUT"` for update, defaults to `"POST"`.
31+
:type method: str, optional
32+
:param headers: HTTP headers to use.
33+
:type headers: dictionary, optional
34+
:param verbose: verbose output
35+
:type verbose: bool, optional
36+
:return: HTTP response from FOXDEN provenance service
37+
:rtype: list with dictionary entry
38+
"""
39+
t0 = time()
40+
self.logger.info(
41+
f'Executing "process" with url={url} data={data}')
42+
# TODO: it would be useful to perform validation of data
43+
if isinstance(data, list) and len(data) == 1:
44+
data = data[0]['data'][0]
45+
if not isinstance(data, dict):
46+
raise Exception(f'Passed data={data} is not dictionary')
47+
mrec = {"Schema": "Analysis", "Record": data}
48+
payload = json.dumps(mrec)
49+
if verbose:
50+
self.logger.info(f"method=POST url={url} payload={payload}")
51+
response = HttpRequest(url, payload, method='POST', scope='write')
52+
if verbose:
53+
self.logger.info(f"code={response.status_code} data={response.text}")
54+
data = [{'code': response.status_code, 'data': response.text}]
55+
self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n')
56+
return data
557

6-
class FoxdenWriter():
7-
"""FOXDEN writer writes data to specific FOXDEN service."""
58+
class FoxdenProvenanceWriter(PipelineItem):
59+
"""FOXDEN writer writes data to provenance FOXDEN service."""
860
def write(
9-
self, url, data, method='POST', headers=None,
10-
scope='write', timeout=10, dryRun=False):
11-
"""Write data to FOXDEN
61+
self, url, data, method='POST', headers=None, verbose=False):
62+
"""Write data to FOXDEN Provenance service
1263
1364
:param data: Input data.
1465
:type data: list[PipelineData]
@@ -18,18 +69,30 @@ def write(
1869
`"PUT"` for update, defaults to `"POST"`.
1970
:type method: str, optional
2071
:param headers: HTTP headers to use.
21-
:param scope: FOXDEN scope to use, e.g. read or write
22-
:type scope: string
2372
:type headers: dictionary, optional
24-
:param timeout: Timeout of HTTP request, defaults to `10`.
25-
:type timeout: str, optional
26-
:param dryRun: `dryRun` option to verify HTTP workflow,
27-
defaults to `False`.
28-
:type dryRun: bool, optional
29-
:return: Contents of the input data.
30-
:rtype: object
73+
:param verbose: verbose output
74+
:type verbose: bool, optional
75+
:return: HTTP response from FOXDEN provenance service
76+
:rtype: list with dictionary entry
3177
"""
32-
return HttpRequest(url, data, method, headers, scope, timeout, dryrun)
78+
t0 = time()
79+
self.logger.info(
80+
f'Executing "process" with url={url} data={data}')
81+
rurl = f'{url}/dataset'
82+
# TODO: it would be useful to perform validation of data
83+
if isinstance(data, list) and len(data) == 1:
84+
data = data[0]['data'][0]
85+
if not isinstance(data, dict):
86+
raise Exception(f'Passed data={data} is not dictionary')
87+
payload = json.dumps(data)
88+
if verbose:
89+
self.logger.info(f"method=POST url={rurl} payload={payload}")
90+
response = HttpRequest(rurl, payload, method='POST', scope='write')
91+
if verbose:
92+
self.logger.info(f"code={response.status_code} data={response.text}")
93+
data = [{'code': response.status_code, 'data': response.text}]
94+
self.logger.info(f'Finished "process" in {time()-t0:.3f} seconds\n')
95+
return data
3396

3497

3598
if __name__ == '__main__':

0 commit comments

Comments
 (0)