-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
3 changed files
with
221 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
|
||
pyca_conf: {} | ||
|
||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
from ansible.errors import AnsibleFilterError | ||
import functools | ||
import traceback | ||
|
||
|
||
# For details on the format see | ||
# https://configobj.readthedocs.io/en/latest/configobj.html#the-config-file-format | ||
|
||
|
||
DEFAULTS = { | ||
'agent': {'database': 'sqlite:///var/lib/pyca/pyca.db'}, | ||
'capture': {'directory': '/var/lib/pyca/recordings'} | ||
} | ||
|
||
|
||
def ansibleify_exceptions(f): | ||
def g(*args, **kwargs): | ||
try: | ||
return f(*args, **kwargs) | ||
except AnsibleFilterError: | ||
raise | ||
except Exception as e: | ||
raise AnsibleFilterError( | ||
'pyca-ansible: got exception:\n' + | ||
''.join(traceback.format_exception(e)).rstrip()) | ||
return g | ||
|
||
|
||
@ansibleify_exceptions | ||
def dict_to_configobj(d, defaults=DEFAULTS): | ||
''' | ||
Convert a dict d to a str (the contents of a configobj file). Values not | ||
present in d will be filled in from defaults. | ||
''' | ||
d = convert_and_quote_dict(d) | ||
defaults = convert_and_quote_dict(defaults) | ||
# Update d with defaults | ||
for section, kvs in defaults.items(): | ||
if section not in d: | ||
d[section] = kvs | ||
else: | ||
for k, v in kvs.items(): | ||
d[section].setdefault(k, v) | ||
# Write out the configobj file | ||
r = [] | ||
for section, kvs in sorted(d.items()): | ||
r.append(f'[{section}]\n') | ||
for k, v in sorted(kvs.items()): | ||
r.append(f'{k} = {v}\n') | ||
return ''.join(r) | ||
|
||
|
||
def convert_and_quote_dict(d): | ||
''' | ||
d must be of the form {str: {str: T/[T]/(T,)}}, where T can be str, int, | ||
float, or bool. The top-level keys are interpreted as section names of a | ||
configobj file, and the associated dicts as keys and values therein. This | ||
function converts all non-str objects to str and properly quotes them (note | ||
that how an object can be quoted depends on whether it is a section name, | ||
key, or value). It may be impossible to quote a value, in which case an | ||
exception is raised. | ||
bytes is accepted as an alternative to str and converted with .decode(). | ||
''' | ||
if not isinstance(d, dict): | ||
raise AnsibleFilterError('pyca-ansible: pyca_conf must be dict') | ||
r = {} | ||
for section, kvs in d.items(): | ||
quoted_section = quote_section_name( | ||
convert_bytes(section, 'keys of pyca_conf dict')) | ||
if not isinstance(kvs, dict): | ||
raise AnsibleFilterError( | ||
'pyca-ansible: values of pyca_conf dict must be dict') | ||
if not kvs: | ||
continue | ||
s = {} | ||
for k, v in kvs.items(): | ||
quoted_k = quote_keyword( | ||
convert_bytes(k, 'keys of nested dicts of pyca_conf')) | ||
quoted_v = convert_bytes_num_iter( | ||
v, 'values of nested dicts of pyca_conf', | ||
then_quote_and_join=True) | ||
s[quoted_k] = quoted_v | ||
r[quoted_section] = s | ||
return r | ||
|
||
|
||
def convert_bytes(v, entity, additional_types=''): | ||
''' | ||
Convert bytes to str. Leave str unchanged. | ||
If v is neither, entity and additional_types are used to build the error | ||
message (see the code). | ||
''' | ||
if isinstance(v, str): | ||
return v | ||
if isinstance(v, bytes): | ||
try: | ||
return v.decode() | ||
except UnicodeError: | ||
raise AnsibleFilterError( | ||
f'pyca-ansible: could not decode bytes: {v!r}') | ||
raise AnsibleFilterError( | ||
f'pyca-ansible: {entity} must be one of: str, bytes{additional_types}') | ||
|
||
|
||
def convert_bytes_num(v, entity, additional_types=''): | ||
''' | ||
Convert int, bool, and float to str. Fall back on convert_bytes for | ||
bytes and str. | ||
''' | ||
if isinstance(v, (int, float)): # Note that int includes bool | ||
return str(v) | ||
return convert_bytes(v, entity, ', bool, int, float' + additional_types) | ||
|
||
|
||
def convert_bytes_num_iter( | ||
v, entity, additional_types='', *, then_quote_and_join=False | ||
): | ||
''' | ||
Convert the elements of a list or tuple to str using convert_bytes_num, | ||
returning an iterable of the converted values. Fall back on | ||
convert_bytes_num if v is not a list or tuple. (I.e., in this case, the | ||
returned value is a bare string.) | ||
If then_quote_and_join is True, each value in the iterable or the bare | ||
value is quoted properly (with quote_list_value for iterable items or | ||
quote_scalar_value for the bare value), and in case of an iterable, the | ||
quoted values are then joined together using join_config_list, so that the | ||
function always returns a single string. | ||
''' | ||
if isinstance(v, (list, tuple)): | ||
r = map(functools.partial( | ||
convert_bytes_num, entity='items of ' + entity), v) | ||
if then_quote_and_join: | ||
r = join_config_list(map(quote_list_value, r)) | ||
else: | ||
r = convert_bytes_num(v, entity, ', list, tuple' + additional_types) | ||
if then_quote_and_join: | ||
r = quote_scalar_value(r) | ||
return r | ||
|
||
|
||
def join_config_list(it): | ||
if not isinstance(it, (list, tuple)): | ||
it = tuple(it) | ||
if len(it) == 0: | ||
return ',' | ||
if len(it) == 1: | ||
return it[0] + ',' | ||
return ', '.join(it) | ||
|
||
|
||
# Order of preference for the different kinds of quotes: | ||
# (unquoted), ', ", ''', """ | ||
sort_quotes = functools.partial( | ||
sorted, key=lambda s: (-len(s), s), reverse=True) | ||
|
||
|
||
def quote_function(entity, metachars, triple_quotes, allow_empty): | ||
''' | ||
Create a function that will quote a string as per the rules described by | ||
the arguments: | ||
- metachars is a list of characters that require wrapping the string in | ||
quotes if one is present. | ||
- triple_quotes specifies whether triple quotes are allowed. | ||
- allow_empty specifies whether empty strings are allowed. | ||
entity is used in the error message if a value cannot be quoted. | ||
''' | ||
def r(s): | ||
if s == '': | ||
if allow_empty: | ||
return "''" | ||
raise AnsibleFilterError(f'pyca-ansible: {entity} cannot be empty') | ||
maybe_triple_quotes = {"'''", '"""'} if triple_quotes else set() | ||
if '\n' in s: # Newlines can only be triple-quoted | ||
quotes = maybe_triple_quotes | ||
else: | ||
quotes = {'', "'", '"'} | maybe_triple_quotes | ||
if s[0].isspace() or s[-1].isspace(): | ||
quotes.discard('') | ||
else: | ||
for c in ["'", '"'] + metachars: | ||
if c in s: | ||
quotes.discard('') | ||
break | ||
for q in ["'", '"', "'''", '"""']: | ||
if q in s: | ||
quotes.discard(q) | ||
if not quotes: | ||
raise AnsibleFilterError( | ||
f'pyca-ansible: {entity} cannot be quoted properly: {s!r}') | ||
q = sort_quotes(quotes)[0] | ||
return q + s + q | ||
return r | ||
|
||
|
||
quote_section_name = quote_function('section name', [']'], False, False) | ||
quote_keyword = quote_function('keyword', ['=', ':'], False, True) | ||
quote_scalar_value = quote_function('value', [',', '#'], True, True) | ||
quote_list_value = quote_function('list value', [',', '#'], False, True) | ||
|
||
|
||
class FilterModule: | ||
def filters(self): | ||
return {'dict_to_configobj': dict_to_configobj} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters