-
Notifications
You must be signed in to change notification settings - Fork 13
/
parse_modulemd.py
executable file
·212 lines (177 loc) · 6.69 KB
/
parse_modulemd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#!/usr/bin/python3
import click
from os.path import basename
from pdc_client import PDCClient
import yaml
from datetime import date
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
def get_modulemd(module_name, stream):
    """
    Fetch the modulemd of a module stream from the PDC server and parse it.

    The ``unreleasedvariants`` endpoint is queried for active builds of
    ``module_name`` at ``stream``, ordered by ``variant_release``. The
    ``modulemd`` field of the last result is parsed as YAML and returned.

    Raises IOError when the query itself fails or when it yields no results.
    """
    endpoint = "https://pdc.fedoraproject.org/rest_api/v1/unreleasedvariants"
    # develop=True means we do not authenticate to the server
    client = PDCClient(endpoint, ssl_verify=True, develop=True)
    query = {
        "variant_id": module_name,
        "variant_version": stream,
        "fields": "modulemd",
        "ordering": "variant_release",
        # active=True returns only successful builds
        "active": True,
    }

    try:
        response = client(**query)
    except Exception as ex:
        raise IOError("Could not query PDC server for %s (stream: %s) - %s" % (
            module_name, stream, ex))

    # A usable answer is a non-empty response carrying a non-empty
    # "results" entry; anything else means the module is not available.
    usable = (bool(response)
              and "results" in response.keys()
              and bool(response["results"]))
    if not usable:
        raise IOError("%s (stream: %s) is not available on PDC" % (
            module_name, stream))

    newest = response["results"][-1]
    return yaml.load(newest["modulemd"], Loader=Loader)
def _get_module_build_deps(name, ref, modulemd=None):
if not modulemd:
modulemd = get_modulemd(name, ref)
try:
module_deps = modulemd['data']['dependencies']['buildrequires']
except KeyError:
# This module has no dependencies
return {}
return module_deps
def _get_module_deps(name, ref, modulemd=None):
if not modulemd:
modulemd = get_modulemd(name, ref)
try:
module_deps = modulemd['data']['dependencies']['requires']
except KeyError:
# This module has no dependencies
return {}
return module_deps
def _get_recursive_module_deps(deps, name, ref):
if name in deps and deps[name] == ref:
return
if name in deps and not deps['name'] == ref:
raise TypeError("Conflicting refs for {}".format(name))
deps[name] = ref
module_deps = _get_module_deps(name, ref)
for depname, depref in module_deps.items():
_get_recursive_module_deps(deps, depname, depref)
@click.group()
@click.option('--module', default='base-runtime',
              help='The module to get the API from')
@click.option('--modulemd', default=None,
              help='Path to module metadata YAML on the local filesystem')
@click.option('--ref', default='f26',
              help='The ref of the module to retrieve')
@click.pass_context
def cli(ctx, module, modulemd, ref):
    """Inspect and update module metadata (modulemd)."""
    # Populate ctx.obj with "modulemd", "name" and "ref" for the subcommands.
    if modulemd:
        # Parse the handle opened by the with-statement so that the file is
        # opened once and closed deterministically. Passing Loader explicitly
        # matches the other yaml.load calls in this file.
        # NOTE(review): Loader is PyYAML's full loader; only use --modulemd
        # with trusted files.
        with open(modulemd, 'r') as modulemd_file:
            ctx.obj["modulemd"] = yaml.load(modulemd_file, Loader=Loader)
        try:
            ctx.obj["name"] = ctx.obj["modulemd"]['data']['name']
        except KeyError:
            # If the modulemd doesn't list the name, assume that the
            # filename (minus its last extension) does
            ctx.obj["name"] = basename(modulemd).rsplit('.', 1)[0]
    else:
        ctx.obj["modulemd"] = get_modulemd(module, ref)
        ctx.obj["name"] = module
    ctx.obj["ref"] = ref
@cli.command()
@click.pass_context
def get_api(ctx):
    # Print the RPM names listed under data/api/rpms of the loaded modulemd,
    # one per line, in sorted order.
    api_rpms = ctx.obj["modulemd"]['data']['api']['rpms']
    for rpm_name in sorted(api_rpms):
        print(rpm_name)
@cli.command()
@click.pass_context
def get_name(ctx):
    # Print the module name stored in the shared click context object.
    module_name = ctx.obj['name']
    print(module_name)
@cli.command()
@click.option('--recursive/--no-recursive', default=True,
              help='Whether to get all of the dependencies of dependencies')
@click.pass_context
def get_deps(ctx, recursive):
    """Print the module's runtime dependencies as sorted name:ref lines."""
    direct_deps = _get_module_deps(ctx.obj["name"],
                                   ctx.obj["ref"],
                                   ctx.obj["modulemd"])
    if recursive:
        # Walk into a fresh dict. The recursive helper returns immediately
        # for a module already recorded with the same ref, so seeding it
        # with the direct dependencies would make every call a no-op. A
        # separate dict also leaves the mapping inside the modulemd
        # untouched and is safe to grow while direct_deps is iterated.
        deps = {}
        for depname, depref in direct_deps.items():
            _get_recursive_module_deps(deps, depname, depref)
    else:
        deps = direct_deps

    for dep in sorted(deps):
        print("{}:{}".format(dep, deps[dep]))
@cli.command()
@click.pass_context
def get_build_deps(ctx):
    """Print the build dependencies and their runtime deps as name:ref."""
    build_deps = _get_module_build_deps(ctx.obj["name"],
                                        ctx.obj["ref"],
                                        ctx.obj["modulemd"])

    # Get all the runtime dependencies for these build-deps. The walk
    # collects into a fresh dict: the recursive helper returns immediately
    # for a module already recorded with the same ref, so seeding it with
    # the build-deps themselves would prevent any recursion. It also keeps
    # the mapping inside the modulemd unmodified.
    deps = {}
    for depname, depref in build_deps.items():
        _get_recursive_module_deps(deps, depname, depref)

    for dep in sorted(deps):
        print("{}:{}".format(dep, deps[dep]))
@cli.command()
@click.pass_context
def get_component_rpms(ctx):
    # Print each key found under data/components/rpms of the loaded
    # modulemd, one per line.
    rpm_components = ctx.obj["modulemd"]['data']['components']['rpms']
    for pkg_name in rpm_components.keys():
        print(pkg_name)
@cli.command()
@click.pass_context
@click.argument('hash-file', nargs=1, type=click.File('r'))
@click.argument('output-file', nargs=1, type=click.File('w', atomic=True))
def update_module_hashes(ctx, hash_file, output_file):
    """
    Update the hashes in a modulemd

    The HASH_FILE must be in the format <pkgname>#<dist-git hash>, one per line
    """
    datestamp = date.today().isoformat()
    # All edits are made in place on the RPM component mapping of the
    # modulemd held in the click context.
    rpms = ctx.obj['modulemd']['data']['components']['rpms']
    # A set keeps the "missing from hash-file" check below O(1) per package
    current_pkgs = set()

    for line in hash_file:
        entry = line.rstrip()
        if not entry:
            # Tolerate blank lines (e.g. a trailing empty line) instead of
            # failing on the tuple unpacking below
            continue
        (pkgname, githash) = entry.split('#')
        current_pkgs.add(pkgname)

        try:
            old_hash = rpms[pkgname]['ref']
        except KeyError as e:
            print("DEBUG: Adding {}".format(e))
            # This package doesn't exist yet. Create it.
            rpms[pkgname] = dict(
                ref=githash,
                rationale="Added by Base Runtime tools on {}".format(datestamp)
            )
            old_hash = rpms[pkgname]['ref']

        if old_hash == githash:
            # Unchanged (or just added): keep the existing rationale
            continue

        rpms[pkgname]['rationale'] = \
            "Autogenerated by Base Runtime tools on {}".format(datestamp)
        rpms[pkgname]['ref'] = githash

    # Report, but do not remove, components the hash file did not mention
    for pkg in rpms:
        if pkg not in current_pkgs:
            print("DEBUG: {} in original modulemd but not in hash-file".format(pkg))

    output_file.write(yaml.dump(ctx.obj['modulemd'],
                                Dumper=Dumper,
                                default_flow_style=False))
@cli.command()
# The callback reads its input from ``hash_file``; declare the matching
# argument so click actually supplies it. Without it, every invocation
# failed with a missing-positional-argument TypeError.
@click.argument('hash-file', nargs=1, type=click.File('r'),
                metavar='INPUT_FILE')
@click.argument('output-file', nargs=1, type=click.File('w', atomic=True))
def reflow_modulemd(hash_file, output_file):
    """
    Read in a modulemd and write it back out with standard PyYAML ordering and
    layout.

    INPUT_FILE is the modulemd YAML to read; OUTPUT_FILE is where the
    reformatted document is written.
    """
    # NOTE(review): Loader is PyYAML's full loader; only feed this command
    # trusted files.
    modulemd = yaml.load(hash_file, Loader=Loader)
    reflowed = yaml.dump(modulemd,
                         Dumper=Dumper,
                         default_flow_style=False)
    output_file.write(reflowed)
def main():
    """Run the click command group with an empty dict as context object."""
    shared_state = {}
    cli(obj=shared_state)


if __name__ == "__main__":
    main()