-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvault2vault.py
447 lines (381 loc) · 16.9 KB
/
vault2vault.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
"""CLI tool for recursively rekeying ansible-vault encrypted secrets"""
import argparse
import getpass
import logging
import re
import shutil
import sys
from pathlib import Path
from typing import Any
from typing import Iterable
from typing import List
from typing import Optional
import ruamel.yaml
try:
import ansible.constants
from ansible.parsing.vault import VaultSecret
from ansible.parsing.vault import VaultLib
from ansible.parsing.vault import AnsibleVaultError
except ImportError:
print(
"FATAL: No supported version of Ansible could be imported under the current python interpreter",
file=sys.stderr,
)
sys.exit(1)
# Package metadata. ``__title__`` and ``__summary__`` feed the argument parser's
# ``prog`` and ``description``; ``__title__`` and ``__version__`` are printed by
# the ``--version`` flag.
__title__ = "vault2vault"
__summary__ = "Recursively rekey ansible-vault encrypted files and in-line variables"
__version__ = "0.1.3"
__url__ = "https://github.com/enpaul/vault2vault/"
__license__ = "MIT"
# NOTE(review): the address below looks like an obfuscation placeholder rather
# than a real e-mail address -- confirm against the upstream repository.
__authors__ = ["Ethan Paul <[email protected]>"]
# File suffixes (compared lower-cased) that mark a file as YAML to be searched
# for in-line vaulted variables.
YAML_FILE_EXTENSIONS = (".yml", ".yaml")
# Shared round-trip ("rt") YAML parser used to load candidate files.
yaml = ruamel.yaml.YAML(typ="rt")
# Teach ruamel to construct ``!vault`` tagged nodes as their raw scalar value.
# NOTE(review): this is registered on ``SafeConstructor`` while the parser above
# is a round-trip one -- confirm whether this registration is still required.
ruamel.yaml.add_constructor(
    "!vault",
    lambda loader, node: node.value,
    constructor=ruamel.yaml.SafeConstructor,
)
def rekey(
    old: VaultLib,
    new: VaultLib,
    content: bytes,
) -> bytes:
    """Re-encrypt vaulted content under a different vault password

    :param old: ``VaultLib`` object holding the vault password that ``content`` is
                currently encrypted with; used for the decryption step
    :param new: ``VaultLib`` object holding the vault password that the result should
                be encrypted with; used for the encryption step
    :param content: Vault encrypted payload to convert
    :returns: The payload of ``content``, decrypted with ``old`` and then encrypted
              again with ``new``
    """
    plaintext = old.decrypt(content)
    reencrypted = new.encrypt(plaintext)
    return reencrypted
# This whole function needs to be rebuilt from the ground up so I don't
# feel bad about disabling this warning
def _process_file(  # pylint: disable=too-many-statements
    path: Path,
    old: VaultLib,
    new: VaultLib,
    interactive: bool,
    backup: bool,
    ignore: bool,
) -> None:
    """Determine whether a filepath includes vaulted data and if so, rekey it

    Two kinds of file are handled: files that are vault encrypted as a whole, and
    YAML files (by extension) that may contain in-line ``!vault`` variables. Any
    other file is skipped. The file is only rewritten (and only backed up) when its
    content actually changed.

    :param path: Path to the file to check
    :param old: VaultLib object with the current (old) vault password encoded in it
    :param new: VaultLib object with the target (new) vault password encoded in it
    :param interactive: Whether to prompt interactively for confirmation before each
                        rekey operation
    :param backup: Whether to copy the original file to a ``.bak`` backup before
                   making any in-place changes
    :param ignore: Whether to ignore any errors that come from failing to decrypt
                   any vaulted data
    :raises RuntimeError: When vaulted data cannot be decrypted and ``ignore`` is not
                          set, or when vaulted YAML data cannot be located in the raw
                          text of the file
    """
    logger = logging.getLogger(__name__)

    logger.debug(f"Processing file {path}")

    def _process_yaml_data(  # pylint: disable=too-many-locals
        content: bytes, data: Any, ignore: bool, name: str = ""
    ):
        """Walk parsed YAML and rekey every vaulted scalar in the raw file text

        :param content: Raw bytes of the file, carrying all replacements made so far
        :param data: Parsed YAML node (mapping, sequence, or scalar) to inspect
        :param ignore: Whether undecryptable values are skipped instead of raising
        :param name: Dotted path of ``data`` within the document, used in messages
        :returns: ``content`` with the vaulted values found under ``data`` replaced
        """
        if isinstance(data, dict):
            for key, value in data.items():
                content = _process_yaml_data(
                    content, value, ignore, name=f"{name}.{key}"
                )
        elif isinstance(data, list):
            for index, item in enumerate(data):
                content = _process_yaml_data(
                    content, item, ignore, name=f"{name}.{index}"
                )
        elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted(
            data.value
        ):
            logger.info(f"Identified vaulted content in {path} at {name}")

            if interactive and not _confirm(
                f"Rekey vault encrypted variable {name} in file {path}?"
            ):
                logger.debug(
                    f"User skipped vault encrypted content in {path} at {name} via interactive mode"
                )
                return content

            try:
                new_data = rekey(old, new, data.value.encode())
            except AnsibleVaultError as err:
                msg = f"Failed to decrypt vault encrypted data in {path} at {name} with provided vault secret"
                if ignore:
                    logger.warning(msg)
                    return content
                raise RuntimeError(msg) from err

            content_decoded = content.decode("utf-8")

            # This section exists to solve one specific problem: round trip YAML
            # format preservation. Ruamel gets closest to achieving it, but there are
            # too many ways (and opinions on how) to write YAML for any dumper to
            # reproduce every input style. Dumping the parsed document would therefore
            # reformat every YAML file this tool touches, which would make it unusable.
            #
            # Instead, the target content is replaced directly in the raw text of the
            # file. That requires preserving indentation and not clobbering the
            # surrounding lines, without reimplementing a YAML formatter. The answer
            # used here is a regex. If this is too janky for you, see the estraven
            # project: https://github.com/enpaul/estraven
            #
            # 1. Take the line at index 1 of the original (unmodified) vaulted value.
            #    Presumably index 0 is the vault header line, which is shared by every
            #    secret, while index 1 is the first line of ciphertext: it exists in
            #    the raw text of the file and is pseudo-guaranteed to be unique.
            search_data = data.value.split("\n")[1]

            # 2. Find the full line of the file that holds that string. The full line
            #    includes the leading whitespace, which ruamel strips from the parsed
            #    data. The string is escaped so it is always matched literally.
            located = re.search(
                rf"\n(\s*){re.escape(search_data)}\n", content_decoded
            )
            if located is None:
                # Edge case: the vaulted content is a YAML anchor, for example when a
                # single secret is stored under several variable names. The content
                # then appears once in the file but several times in the parsed data,
                # so after the first replacement the old text is no longer present.
                if data.anchor.value:
                    logger.debug(
                        f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor"
                    )
                    return content
                raise RuntimeError(
                    f"Could not locate vault encrypted data for {name} in the raw text of {path}"
                )

            # 3. The capture group holds the leading whitespace of that line.
            padding = len(located.group(1))

            # 4. Indent every line of *both* the old and the new vaulted data by that
            #    amount, so we know exactly what to replace and what to replace it with.
            padded_old_data = "\n".join(
                [f"{' ' * padding}{item}" for item in data.value.split("\n") if item]
            )
            padded_new_data = "\n".join(
                [
                    f"{' ' * padding}{item}"
                    for item in new_data.decode("utf-8").split("\n")
                    if item
                ]
            )

            # 5. Replace the content. The count of 1 matters: if the same encrypted
            #    block appears twice in the file only the first occurrence may be
            #    replaced, otherwise the later replacement attempts would fail. The
            #    result is encoded back to bytes because all file operations with
            #    vault are done in bytes mode.
            content = content_decoded.replace(
                padded_old_data, padded_new_data, 1
            ).encode()
        return content

    with path.open("rb") as infile:
        raw = infile.read()

    # The 'is_encrypted' check doesn't rely on the vault secret in the VaultLib matching the
    # secret the data was encrypted with, it just checks that the data is encrypted with some
    # vault secret. We could use either `old` or `new` for this check, it doesn't actually matter.
    if old.is_encrypted(raw):
        logger.info(f"Identified vault encrypted file: {path}")

        if interactive and not _confirm(f"Rekey vault encrypted file {path}?"):
            logger.debug(
                f"User skipped vault encrypted file {path} via interactive mode"
            )
            return

        try:
            updated = rekey(old, new, raw)
        except AnsibleVaultError:
            msg = f"Failed to decrypt vault encrypted file {path} with provided vault secret"
            if ignore:
                logger.warning(msg)
                return
            raise RuntimeError(msg) from None
    elif path.suffix.lower() in YAML_FILE_EXTENSIONS:
        logger.debug(f"Identified YAML file: {path}")

        # Ask before parsing so that a file the user declines is never loaded
        if interactive and not _confirm(
            f"Search YAML file {path} for vault encrypted variables?"
        ):
            logger.debug(
                f"User skipped processing YAML file {path} via interactive mode"
            )
            return

        data = yaml.load(raw)
        updated = _process_yaml_data(raw, data, ignore=ignore)
    else:
        logger.debug(f"Skipping non-vault file {path}")
        return

    if updated == raw:
        logger.debug(f"No content was rekeyed in {path}, leaving the file untouched")
        return

    # The backup is a *copy* taken only once new content is ready to be written.
    # Renaming the original up front would leave nothing at ``path`` whenever the
    # rekey failed or was ignored.
    if backup:
        shutil.copy(path, f"{path}.bak")

    logger.debug(f"Writing updated file contents to {path}")
    with path.open("wb") as outfile:
        outfile.write(updated)
def _get_args() -> argparse.Namespace:
    """Build the command line interface and parse the process arguments

    :returns: Namespace holding the parsed command line options and paths
    """
    cli = argparse.ArgumentParser(prog=__title__, description=__summary__)

    # Each entry is ``(flags, options)``; entries are registered in this order
    arguments = [
        (
            ("--version",),
            {"help": "Show program version and exit", "action": "store_true"},
        ),
        (
            ("--interactive",),
            {
                "help": "Step through files and variables interactively, prompting for confirmation before making each change",
                "action": "store_true",
            },
        ),
        (
            ("-v", "--verbose"),
            {
                "help": "Increase verbosity; can be repeated",
                "action": "count",
                "default": 0,
            },
        ),
        (
            ("-b", "--backup"),
            {
                "help": "Write a backup of every file to be modified, suffixed with '.bak'",
                "action": "store_true",
            },
        ),
        (
            ("-i", "--vault-id"),
            {
                "help": "Limit rekeying to encrypted secrets with the specified Vault ID",
                "type": str,
                "default": ansible.constants.DEFAULT_VAULT_IDENTITY,
            },
        ),
        (
            ("--ignore-undecryptable",),
            {
                "help": "Ignore any file or variable that is not decryptable with the provided vault secret instead of raising an error",
                "action": "store_true",
            },
        ),
        (
            ("--old-pass-file",),
            {
                "help": "Path to a file with the old vault password to decrypt secrets with",
                "type": str,
                "dest": "old_pass_file",
            },
        ),
        (
            ("--new-pass-file",),
            {
                "help": "Path to a file with the new vault password to rekey secrets with",
                "type": str,
                "dest": "new_pass_file",
            },
        ),
        (
            ("paths",),
            {
                "help": "Paths to search for Ansible Vault encrypted content",
                "nargs": "*",
            },
        ),
    ]
    for flags, options in arguments:
        cli.add_argument(*flags, **options)

    return cli.parse_args()
def _confirm(prompt: str, default: bool = True) -> bool:
    """Ask a yes/no question on the terminal until a usable answer is given

    Surrounding whitespace and letter case in the answer are ignored, so an
    answer such as ``" Yes "`` is accepted and a whitespace-only answer selects
    the default.

    :param prompt: Question text to show to the user
    :param default: Value returned when the user submits an empty answer
    :returns: ``True`` for "yes"/"y", ``False`` for "no"/"n", ``default`` for an
              empty answer
    """
    options = "YES/no" if default else "yes/NO"
    while True:
        answer = input(f"{prompt} [{options}]: ").strip().lower()
        if not answer:
            return default
        if answer in ("yes", "y"):
            return True
        if answer in ("no", "n"):
            return False
        print("Please input one of the specified options", file=sys.stderr)
def _expand_paths(paths: Iterable[Path]) -> List[Path]:
    """Expand files and directories into a flat list of unique file paths

    Directories are searched recursively. Every path is resolved first, and each
    resolved path is visited at most once. Overlapping inputs (for example a
    directory and a file inside it) therefore never produce the same file twice,
    and symbolic link cycles cannot cause unbounded recursion.

    :param paths: Files and/or directories to expand; strings are accepted too
    :returns: Resolved paths of all regular files found, without duplicates.
              Directory entries are visited in sorted order.
    """
    logger = logging.getLogger(__name__)

    results: List[Path] = []
    visited = set()

    def _collect(candidates: Iterable[Path]) -> None:
        for candidate in candidates:
            resolved = Path(candidate).resolve()
            if resolved in visited:
                logger.debug(f"Skipping already visited path {resolved}")
                continue
            visited.add(resolved)

            if resolved.is_file():
                logger.debug(f"Including file {resolved}")
                results.append(resolved)
            elif resolved.is_dir():
                logger.debug(f"Identifying files under {resolved}")
                # Sorted so that the processing order is reproducible
                _collect(sorted(resolved.iterdir()))
            else:
                logger.debug(f"Discarding path {resolved}")

    _collect(paths)
    return results
def _load_password(
    fpath: Optional[str], desc: str = "", confirm: bool = True
) -> VaultSecret:
    """Load a password from a file or interactively

    :param fpath: Optional path to the file containing the vault password. If not provided then
                  the password will be prompted for interactively.
    :param desc: Description text to inject into the interactive password prompt. Useful when using
                 this function multiple times to identify different passwords to the user.
    :param confirm: Whether to prompt twice for the input and check that the two inputs match
    :returns: Populated vault secret object with the loaded password
    :raises RuntimeError: When the password file cannot be read, or when the two
                          interactively entered passwords do not match
    """
    logger = logging.getLogger(__name__)

    if fpath:
        try:
            with Path(fpath).resolve().open("rb") as infile:
                # Password files normally end with a newline that is not part of the
                # password. Surrounding whitespace is dropped so the secret matches
                # what Ansible derives from the same file.
                # NOTE(review): relies on Ansible stripping password files as well --
                # confirm against the supported Ansible versions.
                return VaultSecret(infile.read().strip())
        except OSError as err:
            # OSError also covers cases such as the path being a directory
            raise RuntimeError(
                f"Specified vault password file '{fpath}' does not exist or is unreadable"
            ) from err

    logger.debug("No vault password file provided, prompting for interactive input")

    password_1 = getpass.getpass(
        prompt=f"Enter {desc} Ansible Vault password: ", stream=sys.stderr
    )

    if confirm:
        password_2 = getpass.getpass(
            prompt=f"Confirm (re-enter) {desc} Ansible Vault password: ",
            stream=sys.stderr,
        )

        if password_1 != password_2:
            raise RuntimeError(f"Provided {desc} passwords do not match")

    return VaultSecret(password_1.encode("utf-8"))
def main():
    """Main program entrypoint and CLI interface

    Exits with status 0 after printing the version or when no paths were given,
    with status 1 when a ``RuntimeError`` is reported while loading passwords or
    processing files, and with status 130 when interrupted from the keyboard.
    """
    args = _get_args()

    logger = logging.getLogger(__name__)

    # Each ``-v`` lowers the threshold by one level (10), starting from WARNING
    logging.basicConfig(
        stream=sys.stderr,
        format="%(levelname)s: %(message)s",
        level=max(logging.WARNING - (args.verbose * 10), 0),
    )

    if args.version:
        print(f"{__title__} {__version__}")
        sys.exit(0)

    if not args.paths:
        logger.warning("No paths provided, nothing to do!")
        sys.exit(0)

    try:
        old_pass = _load_password(args.old_pass_file, desc="existing", confirm=False)
        new_pass = _load_password(args.new_pass_file, desc="new", confirm=True)

        in_vault = VaultLib([(args.vault_id, old_pass)])
        out_vault = VaultLib([(args.vault_id, new_pass)])
    except RuntimeError as err:
        logger.error(str(err))
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(130)

    logger.info(
        f"Identifying all files under {len(args.paths)} input paths: {', '.join(args.paths)}"
    )
    files = _expand_paths(args.paths)
    logger.info(f"Identified {len(files)} files for processing")

    # Failures while processing are reported the same way as failures while
    # loading the passwords, instead of surfacing as a traceback
    try:
        for filepath in files:
            _process_file(
                filepath,
                in_vault,
                out_vault,
                args.interactive,
                args.backup,
                args.ignore_undecryptable,
            )
    except RuntimeError as err:
        logger.error(str(err))
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(130)


if __name__ == "__main__":
    main()