-
Notifications
You must be signed in to change notification settings - Fork 0
/
parse_features
executable file
·198 lines (166 loc) · 6.47 KB
/
parse_features
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python3
import os
import sys
import argparse
import re
from functools import reduce
from glob import glob
import yaml
import networkx
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--feature-dir", default="features")
parser.add_argument("--features", type=lambda arg: set([f for f in arg.split(",") if f]))
parser.add_argument("--ignore", type=lambda arg: set([f for f in arg.split(",") if f]), default=set())
parser.add_argument("--cname")
parser.add_argument("--arch")
parser.add_argument("--version")
parser.add_argument("--default-arch")
parser.add_argument("--default-version")
args_type_allowed = [
"cname",
"features",
"platforms",
"flags",
"elements",
"arch",
"version"
]
parser.add_argument("type", nargs="?", choices=args_type_allowed, default="cname")
args = parser.parse_args()
assert bool(args.features) ^ bool(args.cname), "please provide either `--features` or `--cname` argument"
if args.cname:
search = re.search("^([a-z][a-zA-Z0-9_-]*?)(-(amd64|arm64)(-([a-z0-9.]+))?)?$", args.cname)
assert search, f"not a valid cname {args.cname}"
matches = search.groups()
input_cname_base = matches[0]
arch = matches[2]
version = matches[4]
input_features = reverse_cname_base(input_cname_base)
else:
input_features = args.features
if args.arch:
arch = args.arch
if args.version:
version = args.version
if not arch and (args.type == "cname" or args.type == "arch"):
assert args.default_arch, "architecture not specified and no default architecture set"
arch = args.default_arch
if not version and (args.type == "cname" or args.type == "version"):
assert args.default_version, "version not specified and no default version set"
version = args.default_version
if args.type == "arch":
print(arch)
sys.exit(0)
elif args.type == "version":
print(version)
sys.exit(0)
feature_graph = read_feature_files(args.feature_dir)
ignore_set = args.ignore
ignore_filter_func = lambda node : node not in ignore_set
feature_graph = networkx.subgraph_view(feature_graph, filter_node=ignore_filter_func)
for feature in input_features:
assert feature in feature_graph.nodes(), f"feature {feature} not found"
graph = filter_graph(feature_graph, input_features)
features = reverse_sort_nodes(graph)
features_by_type = dict()
for type in [ "platform", "element", "flag" ]:
features_by_type[type] = [feature for feature in features if get_node_type(graph.nodes[feature]) == type]
if len(features_by_type["platform"]) != 1:
print("warning: number of platforms != 1", file=sys.stderr)
sorted_features = sort_nodes(graph)
minimal_feature_set = get_minimal_feature_set(graph)
sorted_minimal_features = sort_set(minimal_feature_set, sorted_features)
cname_base = get_cname_base(sorted_minimal_features)
cname = f"{cname_base}-{arch}-{version}"
if args.type == "cname":
print(cname)
elif args.type == "features":
print(",".join(features))
elif args.type == "platforms":
print(",".join(features_by_type["platform"]))
elif args.type == "elements":
print(",".join(features_by_type["element"]))
elif args.type == "flags":
print(",".join(features_by_type["flag"]))
def get_local_bin(name):
directory = os.path.dirname(os.path.realpath(__file__))
return f"{directory}/{name}"
def popen_read(command):
f = os.popen(command)
output = f.readline().rstrip("\n")
f.close()
return output
def reverse_cname_base(cname):
cname = cname.replace("_", "-_")
return set(cname.split("-"))
def get_cname_base(sorted_features):
return reduce(lambda a, b : a + ("-" if not b.startswith("_") else "") + b, sorted_features)
def get_minimal_feature_set(graph):
return set([node for (node, degree) in graph.in_degree() if degree == 0])
def filter_graph(feature_graph, feature_set, ignore_excludes=False):
filter_set = set(feature_graph.nodes())
filter_func = lambda node : node in filter_set
graph = networkx.subgraph_view(feature_graph, filter_node=filter_func)
graph_by_edge = dict()
for attr in [ "include", "exclude" ]:
edge_filter_func = (lambda attr : lambda a, b : graph.get_edge_data(a, b)["attr"] == attr)(attr)
graph_by_edge[attr] = networkx.subgraph_view(graph, filter_edge=edge_filter_func)
while True:
include_set = feature_set.copy()
for feature in feature_set:
include_set.update(networkx.descendants(graph_by_edge["include"], feature))
filter_set = include_set
if ignore_excludes:
break
exclude_list = []
for node in networkx.lexicographical_topological_sort(graph):
for exclude in graph_by_edge["exclude"].successors(node):
exclude_list.append(exclude)
if not exclude_list:
break
exclude = exclude_list[0]
assert exclude not in feature_set, f"excluding explicitly included feature {exclude}, unsatisfiable condition"
filter_set.remove(exclude)
assert (not graph_by_edge["exclude"].edges()) or ignore_excludes
return graph
def read_feature_files(feature_dir):
feature_yaml_files = glob(f"{feature_dir}/*/info.yaml")
features = [parse_feature_yaml(i) for i in feature_yaml_files]
feature_graph = networkx.DiGraph()
for feature in features:
feature_graph.add_node(feature["name"], content=feature["content"])
for node in feature_graph.nodes():
node_features = get_node_features(feature_graph.nodes[node])
for attr in node_features:
if attr not in [ "include", "exclude" ]:
continue
for ref in node_features[attr]:
feature_graph.add_edge(node, ref, attr=attr)
assert networkx.is_directed_acyclic_graph(feature_graph)
return feature_graph
def parse_feature_yaml(feature_yaml_file):
assert os.path.basename(feature_yaml_file) == "info.yaml"
name = os.path.basename(os.path.dirname(feature_yaml_file))
content = yaml.load(open(feature_yaml_file), Loader=yaml.FullLoader)
return { "name": name, "content": content }
def sort_set(input_set, order_list):
return [item for item in order_list if item in input_set]
def sort_key(graph, node):
prefix_map = { "platform": "0", "element": "1", "flag": "2" }
node_type = get_node_type(graph.nodes.get(node, {}))
prefix = prefix_map[node_type]
return f"{prefix}-{node}"
def sort_nodes(graph):
key_lambda = lambda node : sort_key(graph, node)
return list(networkx.lexicographical_topological_sort(graph, key=key_lambda))
def reverse_sort_nodes(graph):
reverse_graph = graph.reverse()
assert networkx.is_directed_acyclic_graph(reverse_graph)
return sort_nodes(reverse_graph)
def get_node_type(node):
return node.get("content", {}).get("type")
def get_node_features(node):
return node.get("content", {}).get("features", {})
if __name__ == "__main__":
main()