diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index ac99dc05..d5badb4b 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -268,6 +268,191 @@ def getClassNames(self, className=None, recursive=False, qualified=False, sort=F return self._ask(question='getClassNames', opt=opt) +class OMCPathReal(pathlib.PurePosixPath): + """ + Implementation of a basic Path object which uses OMC as backend. The connection to OMC is provided via a + OMCSessionZMQ session object. + """ + + def __init__(self, *path, session: OMCSessionZMQ) -> None: + super().__init__(*path) + self._session = session + + def with_segments(self, *pathsegments): + """ + Create a new OMCPath object with the given path segments. + + The original definition of Path is overridden to ensure session is set. + """ + return type(self)(*pathsegments, session=self._session) + + def is_file(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a regular file. + """ + return self._session.sendExpression(f'regularFileExists("{self.as_posix()}")') + + def is_dir(self, *, follow_symlinks=True) -> bool: + """ + Check if the path is a directory. + """ + return self._session.sendExpression(f'directoryExists("{self.as_posix()}")') + + def read_text(self, encoding=None, errors=None, newline=None) -> str: + """ + Read the content of the file represented by this path as text. + + The additional arguments `encoding`, `errors` and `newline` are only defined for compatibility with Path() + definition. + """ + return self._session.sendExpression(f'readFile("{self.as_posix()}")') + + def write_text(self, data: str, encoding=None, errors=None, newline=None): + """ + Write text data to the file represented by this path. + + The additional arguments `encoding`, `errors`, and `newline` are only defined for compatibility with Path() + definitions. + """ + if not isinstance(data, str): + raise TypeError('data must be str, not %s' % + data.__class__.__name__) + + return self._session.sendExpression(f'writeFile("{self.as_posix()}", "{data}", false)') + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a directory at the path represented by this OMCPath object. + + The additional arguments `mode`, and `parents` are only defined for compatibility with Path() definitions. + """ + if self.is_dir() and not exist_ok: + raise FileExistsError(f"Directory {self.as_posix()} already exists!") + + return self._session.sendExpression(f'mkdir("{self.as_posix()}")') + + def cwd(self): + """ + Returns the current working directory as an OMCPath object. + """ + cwd_str = self._session.sendExpression('cd()') + return OMCPath(cwd_str, session=self._session) + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + res = self._session.sendExpression(f'deleteFile("{self.as_posix()}")') + if not res and not missing_ok: + raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") + + def resolve(self, strict: bool = False): + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + if strict and not (self.is_file() or self.is_dir()): + raise OMCSessionException(f"Path {self.as_posix()} does not exist!") + + if self.is_file(): + omcpath = self._omc_resolve(self.parent.as_posix()) / self.name + elif self.is_dir(): + omcpath = self._omc_resolve(self.as_posix()) + else: + raise OMCSessionException(f"Path {self.as_posix()} is neither a file nor a directory!") + + return omcpath + + def _omc_resolve(self, pathstr: str): + """ + Internal function to resolve the path of the OMCPath object using OMC functions *WITHOUT* changing the cwd + within OMC. + """ + expression = ('omcpath_cwd := cd(); ' + f'omcpath_check := cd("{pathstr}"); ' # check requested pathstring + 'cd(omcpath_cwd)') + + try: + result = self._session.sendExpression(command=expression, parsed=False) + result_parts = result.split('\n') + pathstr_resolved = result_parts[1] + pathstr_resolved = pathstr_resolved[1:-1] # remove quotes + + omcpath_resolved = self._session.omcpath(pathstr_resolved) + except OMCSessionException as ex: + raise OMCSessionException(f"OMCPath resolve failed for {pathstr}!") from ex + + if not omcpath_resolved.is_file() and not omcpath_resolved.is_dir(): + raise OMCSessionException(f"OMCPath resolve failed for {pathstr} - path does not exist!") + + return omcpath_resolved + + def absolute(self): + """ + Resolve the path to an absolute path. This is done by calling resolve() as it is the best we can do + using OMC functions. + """ + return self.resolve(strict=True) + + def exists(self, follow_symlinks=True) -> bool: + """ + Semi replacement for pathlib.Path.exists(). + """ + return self.is_file() or self.is_dir() + + def size(self) -> int: + """ + Get the size of the file in bytes - this is an extra function and the best we can do using OMC. + """ + if not self.is_file(): + raise OMCSessionException(f"Path {self.as_posix()} is not a file!") + + res = self._session.sendExpression(f'stat("{self.as_posix()}")') + if res[0]: + return int(res[1]) + + raise OMCSessionException(f"Error reading file size for path {self.as_posix()}!") + + +if sys.version_info < (3, 12): + + class OMCPathCompatibility(pathlib.Path): + """ + Compatibility class for OMCPath in Python < 3.12. This allows to run all code which uses OMCPath (mainly + ModelicaSystem) on these Python versions. There is one remaining limitation: only OMCProcessLocal will work as + OMCPathCompatibility is based on the standard pathlib.Path implementation. + """ + + # modified copy of pathlib.Path.__new__() definition + def __new__(cls, *args, **kwargs): + logger.warning("Python < 3.12 - using a version of class OMCPath " + "based on pathlib.Path for local usage only.") + + if cls is OMCPathCompatibility: + cls = OMCPathCompatibilityWindows if os.name == 'nt' else OMCPathCompatibilityPosix + self = cls._from_parts(args) + if not self._flavour.is_supported: + raise NotImplementedError("cannot instantiate %r on your system" + % (cls.__name__,)) + return self + + def size(self) -> int: + """ + Needed compatibility function to have the same interface as OMCPathReal + """ + return self.stat().st_size + + class OMCPathCompatibilityPosix(pathlib.PosixPath, OMCPathCompatibility): + pass + + class OMCPathCompatibilityWindows(pathlib.WindowsPath, OMCPathCompatibility): + pass + + OMCPath = OMCPathCompatibility + +else: + OMCPath = OMCPathReal + + class OMCSessionZMQ: def __init__( @@ -322,6 +507,52 @@ def __del__(self): self.omc_zmq = None + def omcpath(self, *path) -> OMCPath: + """ + Create an OMCPath object based on the given path segments and the current OMC session. + """ + + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + if isinstance(self.omc_process, OMCProcessLocal): + # noinspection PyArgumentList + return OMCPath(*path) + else: + raise OMCSessionException("OMCPath is supported for Python < 3.12 only if OMCProcessLocal is used!") + else: + return OMCPath(*path, session=self) + + def omcpath_tempdir(self, tempdir_base: Optional[OMCPath] = None) -> OMCPath: + """ + Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all + filesystem related access. + """ + names = [str(uuid.uuid4()) for _ in range(100)] + + if tempdir_base is None: + # fallback solution for Python < 3.12; a modified pathlib.Path object is used as OMCPath replacement + if sys.version_info < (3, 12): + tempdir_str = tempfile.gettempdir() + else: + tempdir_str = self.sendExpression("getTempDirectoryPath()") + tempdir_base = self.omcpath(tempdir_str) + + tempdir: Optional[OMCPath] = None + for name in names: + # create a unique temporary directory name + tempdir = tempdir_base / name + + if tempdir.exists(): + continue + + tempdir.mkdir(parents=True, exist_ok=False) + break + + if tempdir is None or not tempdir.is_dir(): + raise OMCSessionException("Cannot create a temporary directory!") + + return tempdir + def execute(self, command: str): warnings.warn("This function is depreciated and will be removed in future versions; " "please use sendExpression() instead", DeprecationWarning, stacklevel=2) diff --git a/tests/test_OMCPath.py b/tests/test_OMCPath.py new file mode 100644 index 00000000..b8e937f3 --- /dev/null +++ b/tests/test_OMCPath.py @@ -0,0 +1,78 @@ +import sys +import OMPython +import pytest + +skip_on_windows = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="OpenModelica Docker image is Linux-only; skipping on Windows.", +) + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath(non-local) only working for Python >= 3.12.", +) + + +def test_OMCPath_OMCSessionZMQ(): + om = OMPython.OMCSessionZMQ() + + _run_OMCPath_checks(om) + + del om + + +def test_OMCPath_OMCProcessLocal(): + omp = OMPython.OMCProcessLocal() + om = OMPython.OMCSessionZMQ(omc_process=omp) + + _run_OMCPath_checks(om) + + del om + + +@skip_on_windows +@skip_python_older_312 +def test_OMCPath_OMCProcessDocker(): + omcp = OMPython.OMCProcessDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") + om = OMPython.OMCSessionZMQ(omc_process=omcp) + assert om.sendExpression("getVersion()") == "OpenModelica 1.25.0" + + _run_OMCPath_checks(om) + + del omcp + del om + + +@pytest.mark.skip(reason="Not able to run WSL on github") +@skip_python_older_312 +def test_OMCPath_OMCProcessWSL(): + omcp = OMPython.OMCProcessWSL( + wsl_omc='omc', + wsl_user='omc', + timeout=30.0, + ) + om = OMPython.OMCSessionZMQ(omc_process=omcp) + + _run_OMCPath_checks(om) + + del omcp + del om + + +def _run_OMCPath_checks(om: OMPython.OMCSessionZMQ): + p1 = om.omcpath_tempdir() + p2 = p1 / 'test' + p2.mkdir() + assert p2.is_dir() + p3 = p2 / '..' / p2.name / 'test.txt' + assert p3.is_file() is False + assert p3.write_text('test') + assert p3.is_file() + assert p3.size() > 0 + p3 = p3.resolve().absolute() + assert str(p3) == str((p2 / 'test.txt').resolve().absolute()) + assert p3.read_text() == "test" + assert p3.is_file() + assert p3.parent.is_dir() + p3.unlink() + assert p3.is_file() is False