-
Notifications
You must be signed in to change notification settings - Fork 0
/
cg_elimit
executable file
·273 lines (212 loc) · 9.88 KB
/
cg_elimit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The program determines the effective value of a cgroup limit for a given cgroup or PID, meaning it returns the
value that is finally used, which might not necessarily be the one configured for the cgroup the process is assigned to.
Exit codes:
0 Everything went fine.
1 Cgroup control is not supported.
3 Given PID could not be found or its cgroup information could not be retrieved.
4 Mount data could not be retrieved.
5 Cgroup directory does not exist.
6 Reading a cgroup control file failed.
7 Comparison error.
Changelog:
2022-11-14 v0.1 First code.
2022-11-16 v0.2 First release.
"""
import os
import sys
from typing import Tuple, Union
# Module metadata; __version__ is printed at the top of the help text.
__author__ = 'Sören Schmidt'
__maintainer__ = __author__
__email__ = '[email protected]'
__license__ = 'GPL3'
__version__ = '0.2'
# Global verbosity switch: print_info() only prints while this is True.
# main() sets it when -v/--verbose is given on the command line.
VERBOSE = False
class CgroupHandler():
    """Namespace of class methods for locating cgroups and reading their limits.

    The class is never instantiated; it only bundles the tables of supported
    control files with the helpers that operate on them.
    """

    # Supported control files, grouped by controller, for cgroup v1 and v2.
    supported_controls_cgroup_v1 = {'memory': ['memory.limit_in_bytes', 'memory.memsw.limit_in_bytes', 'memory.kmem.limit_in_bytes', 'memory.kmem.tcp.limit_in_bytes']}
    supported_controls_cgroup_v2 = {'memory': ['memory.max','memory.high', 'memory.swap.high', 'memory.swap.max', 'memory.zswap.max'],
                                    'pids': ['pids.max']}

    # Values treated as "no limit" by minimum(); None stands for "nothing read yet".
    _UNLIMITED = ('max', 'infinity', None)

    @classmethod
    def is_supported(cls, control : str) -> bool:
        """Checks if the control is listed in either the v1 or the v2 table."""
        tables = (cls.supported_controls_cgroup_v1, cls.supported_controls_cgroup_v2)
        return any(control in controls for table in tables for controls in table.values())

    @classmethod
    def get_version(cls, control : str) -> int:
        """Returns the cgroup version (1 or 2) for the given control.

        Terminates with exit code 1 in case of an unsupported control.
        """
        if any(control in controls for controls in cls.supported_controls_cgroup_v1.values()):
            return 1
        if any(control in controls for controls in cls.supported_controls_cgroup_v2.values()):
            return 2
        exit_on_error(f'"{control}" is unsupported.', 1)

    @classmethod
    def get_controller(cls, control : str) -> Tuple[str, int]:
        """Returns the cgroup controller and the cgroup version for the given control.

        Terminates (via get_version) in case of an unsupported control.
        """
        cgroup_version = cls.get_version(control)
        table = cls.supported_controls_cgroup_v1 if cgroup_version == 1 else cls.supported_controls_cgroup_v2
        controller = next(name for name, controls in table.items() if control in controls)
        return controller, cgroup_version

    @classmethod
    def cgroup_of_pid(cls, pid: int, control: str) -> str:
        """Retrieves the cgroup of the given PID from the proc filesystem.

        Terminates with exit code 3 if the PID does not exist, if
        "/proc/PID/cgroup" cannot be read or if it contains no entry that
        matches the hierarchy of the given control.
        """
        if not os.path.exists(f'/proc/{pid}'):
            exit_on_error(f'Could not find a process with PID {pid}.', 3)

        try:
            with open(f'/proc/{pid}/cgroup', 'r') as f:
                content = f.readlines()
        except (OSError, UnicodeError) as err:
            exit_on_error(f'Error accessing "/proc/{pid}/cgroup": {err}', 3)

        controller, cgroup_version = cls.get_controller(control)

        # Each line has the form "hierarchy-ID:controller-list:cgroup-path".
        for line in content:
            # Split at most twice, because the cgroup path itself may contain colons.
            fields = line.rstrip('\n').split(':', 2)
            if len(fields) != 3:
                continue
            hierarchy_id, controllers, cgroup = fields
            if cgroup_version == 2:
                # The v2 hierarchy is the entry with ID 0 and an empty controller list.
                found = hierarchy_id == '0' and controllers == ''
            else:
                # Compare against the individual names of the comma-separated list;
                # a substring test would let the empty v2 controller field match.
                found = controller in controllers.split(',')
            if found:
                return cgroup.strip()

        exit_on_error(f'Could not determine the cgroup of PID {pid} for "{control}".', 3)

    @classmethod
    def get_mountpoint(cls, control : str) -> str:
        """Returns the mount point of the cgroupfs for a given control.

        Terminates with exit code 4 if "/proc/mounts" cannot be read or
        contains no matching cgroup mount.
        """
        controller, cgroup_version = cls.get_controller(control)
        cgroup_fs = 'cgroup2' if cgroup_version == 2 else 'cgroup'

        try:
            with open('/proc/mounts', 'r') as f:
                mounts = [line.split() for line in f]
        except (OSError, UnicodeError) as err:
            exit_on_error(f'Error reading "/proc/mounts": {err}', 4)

        # Field 2 is the mount point, field 3 the filesystem type.
        candidates = [fields[1] for fields in mounts if len(fields) >= 3 and fields[2] == cgroup_fs]
        if cgroup_version == 1:
            # v1 has one mount per controller (set); pick the one named after the controller.
            candidates = [mp for mp in candidates if controller in mp]
        if not candidates:
            exit_on_error(f'Error reading "/proc/mounts": no {cgroup_fs} mount found for "{control}".', 4)
        return candidates[0]

    @classmethod
    def control_value(cls, param_path: str) -> str:
        """Reads the content of the given cgroup parameter file, stripped of
        surrounding whitespace, or terminates with exit code 6 if reading failed.
        """
        try:
            with open(param_path, 'r') as f:
                return f.read().strip()
        except (OSError, UnicodeError) as err:
            exit_on_error(str(err), 6)

    @classmethod
    def minimum(cls, value1 : Union[int, str, None], value2 : Union[int, str, None]) -> Union[int, str, None]:
        """Returns the minimum of two values and understands keywords
        used by cgroups like "max" or "infinity".

        If one value is "max", "infinity" or None, the other value is
        returned unchanged. Otherwise both values are converted to int and
        the smaller one is returned as int. Terminates with exit code 7 if a
        value cannot be converted.
        """
        if value1 in cls._UNLIMITED:
            return value2
        if value2 in cls._UNLIMITED:
            return value1

        try:
            return min(int(value1), int(value2))
        except (TypeError, ValueError) as err:
            exit_on_error(str(err), 7)
def help() -> None:
    """Prints the program version, the usage synopsis and a description of
    all options and arguments to stdout.

    The synopsis lists every invocation form, including -l|--list.
    NOTE: the name shadows the builtin help(); it is kept because main() calls it.
    """
    print(f'''v{__version__}\n\nUsage: {sys.argv[0]} -h|--help\n {sys.argv[0]} -l|--list\n {sys.argv[0]} [-v|--verbose] CONTROL CGROUP|PID
-h|--help Print this help.
-v|--verbose Print intermediate steps and not only the result.
-l|--list List all supported cgroup control files.
CONTROL Name of the cgroup control (limit) which should be retrieved.
CGROUP Name of the cgroup for which the effective value of the control should be retrieved.
This should either be the absolute path (e.g. "/sys/fs/cgroup/memory/system.slice/cron.service")
or the relative path starting from the cgroup mount point (e.g. "/system.slice/cron.service").
In any case, the path has to start with a slash.
PID PID of the process for which the effective value of the control should be retrieved.
The program determines the effective value of a cgroup limit for a given cgroup or PID. This means it returns the
value that finally is used, which might not necessarily be the one configured for the cgroup the process is assigned to.
Only the retrieved value is printed to stdout. If the control is missing (no configuration), nothing is returned.
''')
def list_controls() -> None:
    """Prints every supported control file, first for cgroup v1, then for cgroup v2."""
    sections = (
        ('\ncgroup v1\n---------', CgroupHandler.supported_controls_cgroup_v1),
        ('\ncgroup v2\n---------', CgroupHandler.supported_controls_cgroup_v2),
    )
    for heading, controllers in sections:
        print(heading)
        # One print per controller, with its control files on separate lines.
        for control_files in controllers.values():
            print('\n'.join(control_files))
def exit_on_error(text: str, rc : int = 1) -> None:
    """Writes text followed by a newline to stderr, then ends the program
    with exit code rc (1 if not given).
    """
    sys.stderr.write(f'{text}\n')
    raise SystemExit(rc)
def print_info(text: str) -> None:
    """Prints text to stdout, but only while the global VERBOSE flag is set."""
    if not VERBOSE:
        return
    print(text)
def main():
    """Command-line entry point.

    Parses the arguments, determines the cgroup (directly or via a PID),
    walks from that cgroup up to the root of its cgroupfs mount and prints the
    smallest value found for the requested control. Nothing is printed if
    the control file is missing on every level.

    Always ends via sys.exit(); see the module docstring for the exit codes.
    """
    global VERBOSE

    # Let's first do some (cheap) argument parsing.

    # If help is required, we only do that.
    if '-h' in sys.argv or '--help' in sys.argv:
        help()
        sys.exit(0)

    # If listing supported controls is required, we only do that.
    if '-l' in sys.argv or '--list' in sys.argv:
        list_controls()
        sys.exit(0)

    # Enabling verbosity if wanted. Both spellings are filtered out independently,
    # so giving only "--verbose" works as well.
    verbosity_flags = ('-v', '--verbose')
    if any(flag in sys.argv for flag in verbosity_flags):
        VERBOSE = True
    args = [arg for arg in sys.argv[1:] if arg not in verbosity_flags]

    # Now only two arguments are left: control and pid/cgroup.
    if len(args) != 2:
        help()
        sys.exit(1)
    control, target = args
    if not CgroupHandler.is_supported(control):
        exit_on_error(f'"{control}" is not supported.', 1)

    # Determining cgroup either given directly or indirectly by PID.
    # Only the int() conversion decides between the two cases.
    try:
        pid = int(target)
    except ValueError:
        cgroup = target
    else:
        cgroup = CgroupHandler.cgroup_of_pid(pid, control)
        if cgroup is None:
            exit_on_error(f'Could not determine the cgroup of PID {pid}.', 3)
        print_info(f'Got cgroup for PID {pid}: {cgroup}')

    # Now we have the following variables:
    #   cgroup  : Cgroup where we start (which still might be incorrect, if provided by the user).
    #   control : Control file which we use to retrieve the effective value.

    # We need the mount point for the cgroupfs.
    mountpoint = CgroupHandler.get_mountpoint(control)
    print_info(f'Cgroupfs mounted at: {mountpoint}')

    # Creating a normalized absolute path for the cgroup and check existence.
    cgroup_fullpath = os.path.normpath(cgroup if cgroup.startswith(mountpoint) else f'{mountpoint}{cgroup}')
    print_info(f'Path to cgroup: {cgroup_fullpath}')
    if not os.path.isdir(cgroup_fullpath):
        exit_on_error(f'Cgroup directory "{cgroup_fullpath}" does not exist.', 5)

    # Traversing cgroup backwards up to (and including) the root of the mount.
    # The stop index is derived from the mount point instead of being hard-coded.
    path = cgroup_fullpath.split(os.sep)
    mount_depth = len(os.path.normpath(mountpoint).split(os.sep))
    limit = None
    for index in range(len(path), mount_depth - 1, -1):
        control_path = f'{os.sep.join(path[:index])}{os.sep}{control}'
        if os.path.exists(control_path):
            current_value = CgroupHandler.control_value(control_path)
            print_info(f'{control_path} = {current_value}')
            limit = CgroupHandler.minimum(limit, current_value)
        else:
            print_info(f'{control_path} missing')
        print_info(f'Current limit: {limit}')

    # Print the result and terminate. A limit of 0 is a valid result and
    # must be printed; only "no value found at all" stays silent.
    if limit is not None:
        print(limit)
    sys.exit(0)
# Run the command-line entry point only when executed as a script.
if __name__ == '__main__':
    main()