diff --git a/docker-compose.development.yml b/docker-compose.development.yml index 60ec380f..b4a049fd 100644 --- a/docker-compose.development.yml +++ b/docker-compose.development.yml @@ -31,8 +31,9 @@ services: - PREFETCH_RUNS_SINCE=2592000 # 30 days in seconds - PREFETCH_RUNS_LIMIT=1 # Prefetch only one run - S3_NUM_WORKERS=2 - - CACHE_ARTIFACT_MAX_ACTIONS=4 - - CACHE_DAG_MAX_ACTIONS=4 + - CACHE_ARTIFACT_MAX_ACTIONS=1 + - CACHE_DAG_MAX_ACTIONS=1 + - CACHE_LOG_MAX_ACTIONS=1 - CACHE_ARTIFACT_STORAGE_LIMIT=16000000 - CACHE_DAG_STORAGE_LIMIT=16000000 - WS_POSTPROCESS_CONCURRENCY_LIMIT=8 @@ -62,6 +63,7 @@ services: volumes: - ./services:/root/services environment: + - LOGLEVEL=WARNING - MF_METADATA_DB_HOST=db - MF_METADATA_DB_PORT=5432 - MF_METADATA_DB_USER=postgres diff --git a/services/ui_backend_service/data/cache/client/cache_server.py b/services/ui_backend_service/data/cache/client/cache_server.py index 268709d3..829c833b 100644 --- a/services/ui_backend_service/data/cache/client/cache_server.py +++ b/services/ui_backend_service/data/cache/client/cache_server.py @@ -347,7 +347,7 @@ def _error_callback(self, worker, res): help="Maximum number of concurrent cache actions.") @click.option("--max-size", default=10000, - help="Maximum amount of disk space to use in MB.") + help="Maximum amount of disk space to use in bytes.") def cli(root=None, max_actions=None, max_size=None): diff --git a/services/ui_backend_service/data/cache/client/cache_store.py b/services/ui_backend_service/data/cache/client/cache_store.py index dd73fbf1..338eb507 100644 --- a/services/ui_backend_service/data/cache/client/cache_store.py +++ b/services/ui_backend_service/data/cache/client/cache_store.py @@ -181,12 +181,17 @@ def mark_for_deletion(path, size): unmarked_size -= size def ensure_path(self, path): + "Ensures that the directory for a given path exists, creating it if missing." dirr = os.path.dirname(path) + self.ensure_dir(dirr) + + def ensure_dir(self, dirr): + "Ensures that a directory exists, creating it if missing." if not os.path.isdir(dirr): try: makedirs(dirr) except Exception as ex: - self.warn(ex, "Could not create dir: %s" % path) + self.warn(ex, "Could not create dir: %s" % dirr) def open_tempdir(self, token, action_name, stream_key): self._gc_objects() @@ -198,7 +203,9 @@ def open_tempdir(self, token, action_name, stream_key): ) try: - tmp = tempfile.mkdtemp(prefix="cache_action_%s." % token, dir=self.tmproot) + self.ensure_dir(self.tmproot) + tmp = tempfile.mkdtemp(prefix='cache_action_%s.' % token, + dir=self.tmproot) except Exception as ex: msg = "Could not create a temp directory for request %s" % token self.warn(ex, msg) diff --git a/services/ui_backend_service/data/cache/store.py b/services/ui_backend_service/data/cache/store.py index 307c7f90..538b8987 100644 --- a/services/ui_backend_service/data/cache/store.py +++ b/services/ui_backend_service/data/cache/store.py @@ -81,7 +81,7 @@ async def start_caches(self, app): async def _monitor_restart_requests(self): while True: - for _cache in [self.artifact_cache, self.dag_cache]: + for _cache in [self.artifact_cache, self.dag_cache, self.log_cache]: if await _cache.restart_requested(): cache_name = type(_cache).__name__ logger.info("[{}] restart requested...".format(cache_name))