Skip to content

Commit

Permalink
fixed double quotes handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasbisurgi authored and MariusWirtz committed Sep 20, 2024
1 parent ece7354 commit bb94f8f
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 63 deletions.
117 changes: 55 additions & 62 deletions rushti.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from itertools import product
from logging.config import fileConfig
from pathlib import Path
from typing import List, Union, Dict, Tuple
from typing import List, Union, Dict, Tuple, Type, Any

import keyring

Expand Down Expand Up @@ -170,31 +170,66 @@ def decrypt_password(encrypted_password: str) -> str:


def extract_task_or_wait_from_line(line: str) -> Union[Task, Wait]:
""" Translate one line from txt file into arguments for execution: instance, process, parameters
:param line: Arguments for execution. E.g. instance="tm1srv01" process="Bedrock.Server.Wait" pWaitSec=2
:return: instance_name, process_name, parameters
"""
if line.strip().lower() == 'wait':
return Wait()

line_arguments = dict()
return extract_task_from_line(line, task_class=Task)

def extract_task_from_line_type_opt(line: str) -> OptimizedTask:
return extract_task_from_line(line, task_class=OptimizedTask)

def extract_task_from_line(line: str, task_class: Union[Type[Task], Type[OptimizedTask]]) -> Union[Task, OptimizedTask]:
line_arguments = parse_line_arguments(line)

if task_class == OptimizedTask:
task_id = line_arguments.pop("id")
predecessors = line_arguments.pop("predecessors", [])
require_predecessor_success = line_arguments.pop("require_predecessor_success", False)

return OptimizedTask(
task_id=task_id,
instance_name=line_arguments.pop("instance"),
process_name=line_arguments.pop("process"),
predecessors=predecessors,
require_predecessor_success=require_predecessor_success,
parameters=line_arguments)
else:
return Task(
instance_name=line_arguments.pop("instance"),
process_name=line_arguments.pop("process"),
parameters=line_arguments)

def parse_line_arguments(line: str) -> Dict[str, Any]:
line_arguments = {}
line = line.replace("\\", UNIQUE_STRING)
for pair in shlex.split(line):
param, value = pair.split("=")
param = param.replace(UNIQUE_STRING, "\\")

parts = shlex.split(line, posix=False)

for part in parts:
if '=' not in part:
continue
argument, value = part.split('=', 1)
argument = argument.replace(UNIQUE_STRING, "\\")
value = value.replace(UNIQUE_STRING, "\\")

# if instance or process, needs to be case insensitive
if param.lower() == "process" or param.lower() == "instance":
line_arguments[param.lower()] = value.strip('"').strip()
# parameters (e.g. pWaitSec) are case sensitive in TM1 REST API !
if argument.lower() in ["process", "instance", "id"]:
line_arguments[argument.lower()] = value.strip('"')
elif argument.lower() == "require_predecessor_success":
line_arguments[argument] = value.strip('"').lower() in TRUE_VALUES
elif argument.lower() == "predecessors":
predecessors = value.strip('"').split(",")
line_arguments[argument] = [] if predecessors[0] in ["", "0", 0] else predecessors
else:
line_arguments[param] = value.strip('"').strip()
return Task(
instance_name=line_arguments.pop("instance"),
process_name=line_arguments.pop("process"),
parameters=line_arguments)
# Remove surrounding double quotes if present
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
# Unescape quotes within the value
value = value.replace('\\"', '"')
# Replace double backslashes with single backslashes
value = value.replace("\\\\", "\\")
line_arguments[argument] = value

return line_arguments

def expand_task(
tm1_services: Dict[str, TM1Service],
Expand All @@ -211,8 +246,8 @@ def expand_task(
member_properties=['Name'],
parent_properties=None,
element_properties=None)
except:
raise RuntimeError(f"Failed to execute MDX '{mdx}'")
except Exception as e:
raise RuntimeError(f"Failed to execute MDX '{mdx}': {str(e)}")
list_params.append([(param[:-1], element[0]["Name"]) for element in elements])
else:
list_params.append([(param, value)])
Expand All @@ -226,48 +261,6 @@ def expand_task(
return result


def extract_task_from_line_type_opt(line: str) -> OptimizedTask:
""" Translate one line from txt file type 'opt' into arguments for execution
:param: line: Arguments for execution. E.g. id="5" predecessors="2,3" require_predecessor_success="1" instance="tm1srv01"
process="Bedrock.Server.Wait" pWaitSec=5
:return: attributes
"""
line_arguments = dict()
line = line.replace("\\", UNIQUE_STRING)
for pair in shlex.split(line):
argument, value = pair.split("=")
argument = argument.replace(UNIQUE_STRING, "\\")
value = value.replace(UNIQUE_STRING, "\\")

# if instance or process, needs to be case insensitive
if argument.lower() == "process" or argument.lower() == "instance" or argument.lower() == "id":
line_arguments[argument.lower()] = value.strip('"').strip()

elif argument.lower() == "require_predecessor_success":
line_arguments["require_predecessor_success"] = value.strip('"').strip().lower() in TRUE_VALUES

# Convert string attribute value into list
elif argument.lower() == "predecessors":
predecessors = value.strip('"').strip().split(",")
# "", "0" and 0 is understood as 'no predecessor'
if predecessors[0] in ["", "0", 0]:
line_arguments[argument] = []
else:
line_arguments[argument] = predecessors

# parameters (e.g. pWaitSec) are case sensitive in TM1 REST API !
else:
line_arguments[argument] = value.strip('"').strip()

return OptimizedTask(
task_id=line_arguments.pop("id"),
instance_name=line_arguments.pop("instance"),
process_name=line_arguments.pop("process"),
predecessors=line_arguments.pop("predecessors"),
require_predecessor_success=line_arguments.pop("require_predecessor_success", False),
parameters=line_arguments)


def extract_ordered_tasks_and_waits_from_file_type_norm(
file_path: str,
expand: bool = False,
Expand Down
63 changes: 62 additions & 1 deletion tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest

from rushti import deduce_levels_of_tasks, extract_tasks_from_file_type_opt, \
extract_ordered_tasks_and_waits_from_file_type_opt
extract_ordered_tasks_and_waits_from_file_type_opt, parse_line_arguments
from utils import OptimizedTask, Wait


Expand Down Expand Up @@ -128,3 +128,64 @@ def test_extract_lines_from_file_type_opt_multi_task_per_id(self):
self.assertEqual(expected_task.parameters, ordered_task.parameters)
self.assertEqual(expected_task.predecessors, ordered_task.predecessors)
self.assertEqual(expected_task.require_predecessor_success, ordered_task.require_predecessor_success)


class TestParseLineArguments(unittest.TestCase):

def test_basic_arguments(self):
line = 'instance=tm1 process=process1 param1="value1" param2="value 2"'
result = parse_line_arguments(line)
expected = {
'instance': 'tm1',
'process': 'process1',
'param1': 'value1',
'param2': 'value 2'
}
self.assertEqual(result, expected)

def test_nested_double_quotes(self):
line = 'instance=tm1 process=process1 param1="value with \\"quotes\\"" param2="simple"'
result = parse_line_arguments(line)
expected = {
'instance': 'tm1',
'process': 'process1',
'param1': 'value with "quotes"',
'param2': 'simple'
}
self.assertEqual(result, expected)

def test_backslashes(self):
line = r'instance=tm1 process=process1 param1="value\\with\\backslashes" param2="normal"'
result = parse_line_arguments(line)
expected = {
'instance': 'tm1',
'process': 'process1',
'param1': r'value\with\backslashes',
'param2': 'normal'
}
self.assertEqual(result, expected)

def test_complex_nested_quotes(self):
line = 'instance=tm1 process=process1 param1="outer \\"inner \\\\"deepest\\\\" inner\\" outer"'
result = parse_line_arguments(line)
expected = {
'instance': 'tm1',
'process': 'process1',
'param1': 'outer "inner \\"deepest\\" inner" outer'
}
self.assertEqual(result, expected)

def test_predecessors_and_require_predecessor_success(self):
line = 'id=1 instance=tm1 process=process1 predecessors="2,3,4" require_predecessor_success="true"'
result = parse_line_arguments(line)
expected = {
'id': '1',
'instance': 'tm1',
'process': 'process1',
'predecessors': ['2', '3', '4'],
'require_predecessor_success': True
}
self.assertEqual(result, expected)

if __name__ == '__main__':
unittest.main()

0 comments on commit bb94f8f

Please sign in to comment.