From 9215d648d8c921d3515783d7e579e6c2ce3ecc36 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 16:54:59 +0100 Subject: [PATCH 01/29] added storage object to represent caches and associated readout functionality --- lapis/storage.py | 81 +++++++++++++++++++++++++ lapis/storage_io/__init__.py | 0 lapis/storage_io/storage_information.py | 34 +++++++++++ 3 files changed, 115 insertions(+) create mode 100644 lapis/storage.py create mode 100644 lapis/storage_io/__init__.py create mode 100644 lapis/storage_io/storage_information.py diff --git a/lapis/storage.py b/lapis/storage.py new file mode 100644 index 0000000..fd69e23 --- /dev/null +++ b/lapis/storage.py @@ -0,0 +1,81 @@ +from usim import time + +from typing import Optional + +from lapis.utilities.cache_algorithm_implementations import cache_algorithm +from lapis.utilities.cache_cleanup_implementations import cache_cleanup + + +class Storage(object): + + __slots__ = ("name", "sitename", "storagesize", "usedstorage", "content") + + def __init__( + self, name: str, sitename: str, storagesize: int, content: Optional[dict] = None + ): + self.name = name + self.sitename = sitename + self.storagesize = storagesize + self.content = content + self.usedstorage = self.get_used_storage() + self.describe_state() + + def get_used_storage(self): + return sum(subdict["usedsize"] for subdict in self.content.values()) + + def free_space(self): + return self.storagesize - self.usedstorage + + def place_new_file(self, filerequest: tuple): + filename, filespecs = filerequest + if self.free_space() - filespecs["usedsize"] < 0: + self.make_room(self.free_space() - filespecs["usedsize"]) + self.content.update({filename: filespecs}) + self.content[filename].update( + cachedsince=time.now, lastaccessed=time.now, numberofaccesses=0 + ) + self.usedstorage = self.get_used_storage() + + def update_file(self, filerequest: tuple): + filename, filespecs = filerequest + requested_file = filename + filesize_difference = ( + 
filespecs["usedsize"] - self.content[requested_file]["usedsize"] + ) + if filesize_difference > 0: + self.make_room(filesize_difference) + self.content[requested_file]["usedsize"] += filesize_difference + self.content[requested_file]["lastaccessed"] = time.now + self.content[requested_file]["numberofaccesses"] += 1 + self.usedstorage = self.get_used_storage() + + def make_room(self, filesize_difference: int): + if self.free_space() - filesize_difference < 0: + cache_cleanup["fifo"](filesize_difference, self) + + def provides_file(self, filerequest: dict): + filename, filespecs = filerequest + if filename in self.content.keys(): + self.update_file(filerequest) + return True + else: + if self.cache_file(): + self.place_new_file(filerequest) + return False + + def cache_file(self): + # cache everything, test different implementations + return cache_algorithm["standard"]() + + def describe_state(self): + print( + "{name} on site {site}: {used}MB of {tot}MB used ({div} %), contains " + "files {filelist}".format( + name=self.name, + site=self.sitename, + used=self.usedstorage, + tot=self.storagesize, + div=100.0 * self.usedstorage / self.storagesize, + filelist=", ".join(self.content.keys()), + ) + ) diff --git a/lapis/storage_io/__init__.py b/lapis/storage_io/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lapis/storage_io/storage_information.py b/lapis/storage_io/storage_information.py new file mode 100644 index 0000000..5829816 --- /dev/null +++ b/lapis/storage_io/storage_information.py @@ -0,0 +1,34 @@ +import csv +from lapis.storage import Storage + + +def storage_reader(storage, storage_content): + storage_content = storage_content_reader(storage_content) + reader = csv.DictReader(storage, delimiter=" ", quotechar="'") + for row in reader: + yield Storage( + name=row["name"], + sitename=row["sitename"], + storagesize=int(row["cachesizeMB"]), + content=storage_content[row["name"]], + ) + + +def storage_content_reader(file_name): + reader = 
csv.DictReader(file_name, delimiter=" ", quotechar="'") + cache_information = dict() + for row in reader: + if row["cachename"] not in cache_information.keys(): + cache_information[row["cachename"]] = dict() + cache_information[row["cachename"]][row["filename"]] = dict() + for key in [ + "filesize", + "usedsize", + "cachedsince", + "lastaccessed", + "numberofaccesses", + ]: + cache_information[row["cachename"]][row["filename"]][key] = int(row[key]) + if not cache_information: + cache_information = None + return cache_information From 0a46da7bae67b0d13ee3bd56021b6c29bd25e96f Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 16:56:38 +0100 Subject: [PATCH 02/29] extended CLI to support storage files --- lapis/cli/simulate.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/lapis/cli/simulate.py b/lapis/cli/simulate.py index aa176ee..a4c98c3 100644 --- a/lapis/cli/simulate.py +++ b/lapis/cli/simulate.py @@ -6,9 +6,11 @@ from lapis.controller import SimulatedLinearController from lapis.job_io.htcondor import htcondor_job_reader + from lapis.pool import StaticPool, Pool from lapis.pool_io.htcondor import htcondor_pool_reader from lapis.job_io.swf import swf_job_reader +from lapis.storage_io.storage_information import storage_reader from lapis.scheduler import CondorJobScheduler from lapis.simulator import Simulator @@ -25,6 +27,8 @@ pool_import_mapper = {"htcondor": htcondor_pool_reader} +storage_import_mapper = {"standard": storage_reader} + @click.group() @click.option("--seed", type=int, default=1234) @@ -71,8 +75,17 @@ def cli(ctx, seed, until, log_tcp, log_file, log_telegraf): type=(click.File("r"), click.Choice(list(pool_import_mapper.keys()))), multiple=True, ) +@click.option( + "--storage-files", + "storage_files", + type=( + click.File("r"), + click.File("r"), + click.Choice(list(storage_import_mapper.keys())), + ), +) @click.pass_context -def static(ctx, job_file, pool_file): +def static(ctx, job_file, 
pool_file, storage_files): click.echo("starting static environment") simulator = Simulator(seed=ctx.obj["seed"]) file, file_type = job_file @@ -87,6 +100,12 @@ def static(ctx, job_file, pool_file): pool_reader=pool_import_mapper[pool_file_type], pool_type=StaticPool, ) + storage_file, storage_content_file, storage_type = storage_files + simulator.create_storage( + storage_input=storage_file, + storage_content_input=storage_content_file, + storage_reader=storage_import_mapper[storage_type], + ) simulator.run(until=ctx.obj["until"]) From e727b690dcbc25c4007e21e49053a479a2e50c6f Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 17:31:03 +0100 Subject: [PATCH 03/29] extended simulator to support storage files --- lapis/simulator.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lapis/simulator.py b/lapis/simulator.py index 9689ae2..9b441b3 100644 --- a/lapis/simulator.py +++ b/lapis/simulator.py @@ -6,6 +6,7 @@ from lapis.drone import Drone from lapis.job import job_to_queue_scheduler +from lapis.file_provider import FileProvider from lapis.monitor.general import ( user_demand, job_statistics, @@ -26,6 +27,8 @@ def __init__(self, seed=1234): random.seed(seed) self.job_queue = Queue() self.pools = [] + self.storage_list = [] + self.fileprovider = FileProvider() self.controllers = [] self.job_scheduler = None self.job_generator = None @@ -59,8 +62,17 @@ def create_pools(self, pool_input, pool_reader, pool_type, controller=None): if controller: self.controllers.append(controller(target=pool, rate=1)) + def create_storage(self, storage_input, storage_content_input, storage_reader): + for storage in storage_reader( + storage=storage_input, storage_content=storage_content_input + ): + self.storage_list.append(storage) + self.fileprovider.add_storage_element(storage) + def create_scheduler(self, scheduler_type): - self.job_scheduler = scheduler_type(job_queue=self.job_queue) + self.job_scheduler = scheduler_type( + 
job_queue=self.job_queue, fileprovider=self.fileprovider + ) def run(self, until=None): print(f"running until {until}") From d64954d8cb185b3de7f7d06b514d827e39d9dadf Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 17:32:49 +0100 Subject: [PATCH 04/29] added new drone attribute sitename connecting drones and storage elements --- lapis/drone.py | 7 +++++++ lapis/pool_io/htcondor.py | 1 + 2 files changed, 8 insertions(+) diff --git a/lapis/drone.py b/lapis/drone.py index b6d9cab..a30c72d 100644 --- a/lapis/drone.py +++ b/lapis/drone.py @@ -15,6 +15,7 @@ def __init__( pool_resources: dict, scheduling_duration: float, ignore_resources: list = None, + sitename: str = None, ): """ :param scheduler: @@ -23,6 +24,7 @@ def __init__( """ super(Drone, self).__init__() self.scheduler = scheduler + self.sitename = sitename self.pool_resources = pool_resources self.resources = Capacities(**pool_resources) # shadowing requested resources to determine jobs to be killed @@ -140,6 +142,11 @@ async def start_job(self, job: Job, kill: bool = False): pass self.scheduler.update_drone(self) await job_execution.done + print( + "finished job {} on drone {} @ {}".format( + repr(job), repr(self), time.now + ) + ) except ResourcesUnavailable: await instant job_execution.cancel() diff --git a/lapis/pool_io/htcondor.py b/lapis/pool_io/htcondor.py index 314eb1e..9c1c23f 100644 --- a/lapis/pool_io/htcondor.py +++ b/lapis/pool_io/htcondor.py @@ -48,5 +48,6 @@ def htcondor_pool_reader( for key, value in resource_name_mapping.items() }, ignore_resources=["disk"], + sitename=row.get("sitename", None), ), ) From 55974348cae02bf3be5f731c6fb1c01a69d08c8e Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 17:35:45 +0100 Subject: [PATCH 05/29] added file provider object connecting storage objects and jobs --- lapis/file_provider.py | 37 +++++++++++++++++++++++++++++++++++++ lapis/job.py | 17 ++++++++++++++++- lapis/scheduler.py | 11 +++++++++-- 3 files changed, 62 
insertions(+), 3 deletions(-) create mode 100644 lapis/file_provider.py diff --git a/lapis/file_provider.py b/lapis/file_provider.py new file mode 100644 index 0000000..044ae6b --- /dev/null +++ b/lapis/file_provider.py @@ -0,0 +1,37 @@ +from lapis.storage import Storage + + +class FileProvider(object): + + __slots__ = ("storages",) + + def __init__(self): + self.storages = dict() + + def add_storage_element(self, storage_element: Storage): + try: + self.storages[storage_element.sitename].append(storage_element) + except KeyError: + self.storages[storage_element.sitename] = [storage_element] + + def provides_all_files(self, job): + """ + Dummy implementation, to be replaced: if a part of every inputfile of the job is + provided by a storage element located on the same site as the drone the job + is running on this function returns True + :param job: + :return: + """ + provided_storages = self.storages.get(job.drone.sitename, None) + if provided_storages: + for inputfilename, inputfilespecs in job.inputfiles.items(): + provides_inputfile = 0 + for storage in provided_storages: + provides_inputfile += storage.provides_file( + (inputfilename, inputfilespecs) + ) + if not provides_inputfile: + return False + return True + else: + return False diff --git a/lapis/job.py b/lapis/job.py index 9354288..cb22ec0 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -5,6 +5,7 @@ from usim import CancelTask from lapis.monitor import sampling_required +from lapis.utilities.walltime_models import walltime_models if TYPE_CHECKING: from lapis.drone import Drone @@ -22,6 +23,7 @@ class Job(object): "in_queue_until", "_name", "drone", + "fileprovider", "_success", ) @@ -66,6 +68,7 @@ def __init__( self.in_queue_since = in_queue_since self.in_queue_until = None self.drone = drone + self.fileprovider = None self._name = name self._success: Optional[bool] = None @@ -89,12 +92,24 @@ def waiting_time(self) -> float: return self.in_queue_until - self.in_queue_since return float("Inf") + def 
modified_walltime(self): + if self.fileprovider.provides_all_files(self): + return walltime_models["maxeff"](self) + else: + return self.walltime + async def run(self): self.in_queue_until = time.now self._success = None await sampling_required.put(self) + print( + "running job {} on site {} in drone {}".format( + repr(self), self.drone.sitename, repr(self.drone) + ) + ) + walltime = self.modified_walltime() try: - await (time + self.walltime) + await (time + walltime) except CancelTask: self._success = False except BaseException: diff --git a/lapis/scheduler.py b/lapis/scheduler.py index c3ad63e..50207d6 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -1,5 +1,5 @@ from typing import Dict -from usim import Scope, interval +from usim import Scope, interval, time from lapis.drone import Drone from lapis.monitor import sampling_required @@ -26,9 +26,10 @@ class CondorJobScheduler(object): :return: """ - def __init__(self, job_queue): + def __init__(self, job_queue, fileprovider): self._stream_queue = job_queue self.drone_cluster = [] + self.fileprovider = fileprovider self.interval = 60 self.job_queue = JobQueue() self._collecting = True @@ -90,6 +91,12 @@ async def run(self): for job in self.job_queue: best_match = self._schedule_job(job) if best_match: + job.fileprovider = self.fileprovider + print( + "start job {} on drone {} @ {}".format( + repr(job), repr(best_match), time.now + ) + ) scope.do(best_match.start_job(job)) self.job_queue.remove(job) await sampling_required.put(self.job_queue) From bb1dcbe429ce14f65c228cfa389b4a2d1334f10a Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 17:37:04 +0100 Subject: [PATCH 06/29] added different caching/cache cleaning/walltime recalculation algorithms --- .../cache_algorithm_implementations.py | 5 ++++ .../cache_cleanup_implementations.py | 25 +++++++++++++++++++ lapis/utilities/walltime_models.py | 6 +++++ 3 files changed, 36 insertions(+) create mode 100644 
lapis/utilities/cache_algorithm_implementations.py create mode 100644 lapis/utilities/cache_cleanup_implementations.py create mode 100644 lapis/utilities/walltime_models.py diff --git a/lapis/utilities/cache_algorithm_implementations.py b/lapis/utilities/cache_algorithm_implementations.py new file mode 100644 index 0000000..23b1ff0 --- /dev/null +++ b/lapis/utilities/cache_algorithm_implementations.py @@ -0,0 +1,5 @@ +def cache_all(): + return True + + +cache_algorithm = {"standard": cache_all} diff --git a/lapis/utilities/cache_cleanup_implementations.py b/lapis/utilities/cache_cleanup_implementations.py new file mode 100644 index 0000000..90c4e6e --- /dev/null +++ b/lapis/utilities/cache_cleanup_implementations.py @@ -0,0 +1,25 @@ +def fifo(size, storage): + # FIFO, test different implementations + sorted_content = sorted( + list(storage.content.items()), key=lambda x: x[1]["cachedsince"] + ) + while size < 0: + size += sorted_content[0][1]["cachedsizeMB"] + storage.content.pop(sorted_content[0][0]) + storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"] + sorted_content.pop(0) + + +def last_accessed(size, storage): + # FIFO, test different implementations + sorted_content = sorted( + list(storage.content.items()), key=lambda x: x[1]["lastaccessed"] + ) + while size < 0: + size += sorted_content[0][1]["cachedsizeMB"] + storage.content.pop(sorted_content[0][0]) + storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"] + sorted_content.pop(0) + + +cache_cleanup = {"fifo": fifo, "lastaccessed": last_accessed} diff --git a/lapis/utilities/walltime_models.py b/lapis/utilities/walltime_models.py new file mode 100644 index 0000000..d214c25 --- /dev/null +++ b/lapis/utilities/walltime_models.py @@ -0,0 +1,6 @@ +def extrapolate_walltime_to_maximal_efficiency(job, maximal_efficiency: float = 0.8): + return (job.used_resources["cores"] / maximal_efficiency) * job.walltime + + +# TODO: add models depending on fraction of cached files etc +walltime_models = 
{"maxeff": extrapolate_walltime_to_maximal_efficiency} From 8d6db96cc625cde44fd91ace5d10fd85e8cd5b93 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 31 Oct 2019 20:00:58 +0100 Subject: [PATCH 07/29] renamed storage readout --- lapis/cli/simulate.py | 2 +- lapis/storage_io/storage.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 lapis/storage_io/storage.py diff --git a/lapis/cli/simulate.py b/lapis/cli/simulate.py index a4c98c3..8c41848 100644 --- a/lapis/cli/simulate.py +++ b/lapis/cli/simulate.py @@ -10,7 +10,7 @@ from lapis.pool import StaticPool, Pool from lapis.pool_io.htcondor import htcondor_pool_reader from lapis.job_io.swf import swf_job_reader -from lapis.storage_io.storage_information import storage_reader +from lapis.storage_io.storage import storage_reader from lapis.scheduler import CondorJobScheduler from lapis.simulator import Simulator diff --git a/lapis/storage_io/storage.py b/lapis/storage_io/storage.py new file mode 100644 index 0000000..5829816 --- /dev/null +++ b/lapis/storage_io/storage.py @@ -0,0 +1,34 @@ +import csv +from lapis.storage import Storage + + +def storage_reader(storage, storage_content): + storage_content = storage_content_reader(storage_content) + reader = csv.DictReader(storage, delimiter=" ", quotechar="'") + for row in reader: + yield Storage( + name=row["name"], + sitename=row["sitename"], + storagesize=int(row["cachesizeMB"]), + content=storage_content[row["name"]], + ) + + +def storage_content_reader(file_name): + reader = csv.DictReader(file_name, delimiter=" ", quotechar="'") + cache_information = dict() + for row in reader: + if row["cachename"] not in cache_information.keys(): + cache_information[row["cachename"]] = dict() + cache_information[row["cachename"]][row["filename"]] = dict() + for key in [ + "filesize", + "usedsize", + "cachedsince", + "lastaccessed", + "numberofaccesses", + ]: + cache_information[row["cachename"]][row["filename"]][key] = 
int(row[key]) + if not cache_information: + cache_information = None + return cache_information From fb150db59e28ceb6ba6a19fe010094bc18ec6b85 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Fri, 1 Nov 2019 12:32:46 +0100 Subject: [PATCH 08/29] fixed debug output --- lapis/job.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lapis/job.py b/lapis/job.py index cb22ec0..9f1104a 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -102,11 +102,12 @@ async def run(self): self.in_queue_until = time.now self._success = None await sampling_required.put(self) - print( - "running job {} on site {} in drone {}".format( - repr(self), self.drone.sitename, repr(self.drone) + if self.drone: + print( + "running job {} on site {} in drone {}".format( + repr(self), self.drone.sitename, repr(self.drone) + ) ) - ) walltime = self.modified_walltime() try: await (time + walltime) From 69072ae7f155767949fae4cb0a7dc064e527379d Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Fri, 1 Nov 2019 12:34:18 +0100 Subject: [PATCH 09/29] renamed storage input reader --- lapis/storage_io/storage_information.py | 34 ------------------------- 1 file changed, 34 deletions(-) delete mode 100644 lapis/storage_io/storage_information.py diff --git a/lapis/storage_io/storage_information.py b/lapis/storage_io/storage_information.py deleted file mode 100644 index 5829816..0000000 --- a/lapis/storage_io/storage_information.py +++ /dev/null @@ -1,34 +0,0 @@ -import csv -from lapis.storage import Storage - - -def storage_reader(storage, storage_content): - storage_content = storage_content_reader(storage_content) - reader = csv.DictReader(storage, delimiter=" ", quotechar="'") - for row in reader: - yield Storage( - name=row["name"], - sitename=row["sitename"], - storagesize=int(row["cachesizeMB"]), - content=storage_content[row["name"]], - ) - - -def storage_content_reader(file_name): - reader = csv.DictReader(file_name, delimiter=" ", quotechar="'") - cache_information = dict() - for 
row in reader: - if row["cachename"] not in cache_information.keys(): - cache_information[row["cachename"]] = dict() - cache_information[row["cachename"]][row["filename"]] = dict() - for key in [ - "filesize", - "usedsize", - "cachedsince", - "lastaccessed", - "numberofaccesses", - ]: - cache_information[row["cachename"]][row["filename"]][key] = int(row[key]) - if not cache_information: - cache_information = None - return cache_information From 53ebec5ea004e4f031a0f70102276fe0102d62e4 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sat, 2 Nov 2019 12:49:55 +0100 Subject: [PATCH 10/29] updated Job class --- lapis/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/job.py b/lapis/job.py index 9f1104a..5803444 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -93,7 +93,7 @@ def waiting_time(self) -> float: return float("Inf") def modified_walltime(self): - if self.fileprovider.provides_all_files(self): + if self.fileprovider and self.fileprovider.provides_all_files(self): return walltime_models["maxeff"](self) else: return self.walltime From f9972234ddbcd5d073f51a085b70752146e356d1 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sat, 2 Nov 2019 16:41:21 +0100 Subject: [PATCH 11/29] replaced function modifying walltime by function with property decorator --- lapis/job.py | 14 +++++++------- lapis/utilities/walltime_models.py | 10 ++++++++-- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/lapis/job.py b/lapis/job.py index 5803444..d7c376c 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -15,7 +15,7 @@ class Job(object): __slots__ = ( "resources", "used_resources", - "walltime", + "_walltime", "requested_walltime", "queue_date", "inputfiles", @@ -60,7 +60,7 @@ def __init__( self.used_resources[key], ) self.resources[key] = self.used_resources[key] - self.walltime = used_resources.pop("walltime") + self._walltime = used_resources.pop("walltime") self.requested_walltime = resources.pop("walltime", None) self.inputfiles = 
resources.pop("inputfiles", None) self.queue_date = queue_date @@ -92,11 +92,12 @@ def waiting_time(self) -> float: return self.in_queue_until - self.in_queue_since return float("Inf") - def modified_walltime(self): + @property + def walltime(self): if self.fileprovider and self.fileprovider.provides_all_files(self): - return walltime_models["maxeff"](self) + return walltime_models["maxeff"](self, self._walltime) else: - return self.walltime + return self._walltime async def run(self): self.in_queue_until = time.now @@ -108,9 +109,8 @@ async def run(self): repr(self), self.drone.sitename, repr(self.drone) ) ) - walltime = self.modified_walltime() try: - await (time + walltime) + await (time + self.walltime) except CancelTask: self._success = False except BaseException: diff --git a/lapis/utilities/walltime_models.py b/lapis/utilities/walltime_models.py index d214c25..bd2083a 100644 --- a/lapis/utilities/walltime_models.py +++ b/lapis/utilities/walltime_models.py @@ -1,5 +1,11 @@ -def extrapolate_walltime_to_maximal_efficiency(job, maximal_efficiency: float = 0.8): - return (job.used_resources["cores"] / maximal_efficiency) * job.walltime +from lapis.job import Job + + +def extrapolate_walltime_to_maximal_efficiency( + job: Job, original_walltime, maximal_efficiency: float = 0.8 +): + + return (job.used_resources["cores"] / maximal_efficiency) * original_walltime # TODO: add models depending on fraction of cached files etc From 2e2c06fbf0856e6b9867d8097e231963bce234f5 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sat, 2 Nov 2019 16:45:25 +0100 Subject: [PATCH 12/29] Revert "replaced function modifying walltime by function with property decorator" This reverts commit f9972234ddbcd5d073f51a085b70752146e356d1. 
--- lapis/job.py | 14 +++++++------- lapis/utilities/walltime_models.py | 10 ++-------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/lapis/job.py b/lapis/job.py index d7c376c..5803444 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -15,7 +15,7 @@ class Job(object): __slots__ = ( "resources", "used_resources", - "_walltime", + "walltime", "requested_walltime", "queue_date", "inputfiles", @@ -60,7 +60,7 @@ def __init__( self.used_resources[key], ) self.resources[key] = self.used_resources[key] - self._walltime = used_resources.pop("walltime") + self.walltime = used_resources.pop("walltime") self.requested_walltime = resources.pop("walltime", None) self.inputfiles = resources.pop("inputfiles", None) self.queue_date = queue_date @@ -92,12 +92,11 @@ def waiting_time(self) -> float: return self.in_queue_until - self.in_queue_since return float("Inf") - @property - def walltime(self): + def modified_walltime(self): if self.fileprovider and self.fileprovider.provides_all_files(self): - return walltime_models["maxeff"](self, self._walltime) + return walltime_models["maxeff"](self) else: - return self._walltime + return self.walltime async def run(self): self.in_queue_until = time.now @@ -109,8 +108,9 @@ async def run(self): repr(self), self.drone.sitename, repr(self.drone) ) ) + walltime = self.modified_walltime() try: - await (time + self.walltime) + await (time + walltime) except CancelTask: self._success = False except BaseException: diff --git a/lapis/utilities/walltime_models.py b/lapis/utilities/walltime_models.py index bd2083a..d214c25 100644 --- a/lapis/utilities/walltime_models.py +++ b/lapis/utilities/walltime_models.py @@ -1,11 +1,5 @@ -from lapis.job import Job - - -def extrapolate_walltime_to_maximal_efficiency( - job: Job, original_walltime, maximal_efficiency: float = 0.8 -): - - return (job.used_resources["cores"] / maximal_efficiency) * original_walltime +def extrapolate_walltime_to_maximal_efficiency(job, maximal_efficiency: float = 0.8): 
+ return (job.used_resources["cores"] / maximal_efficiency) * job.walltime # TODO: add models depending on fraction of cached files etc From 110b3e9354529ca86b17c7cc92c9f885bc43fe97 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sat, 2 Nov 2019 17:07:31 +0100 Subject: [PATCH 13/29] replaced function modifying walltime by function with property decorator --- lapis/job.py | 14 +++++++------- lapis/utilities/walltime_models.py | 6 ++++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/lapis/job.py b/lapis/job.py index 5803444..d7c376c 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -15,7 +15,7 @@ class Job(object): __slots__ = ( "resources", "used_resources", - "walltime", + "_walltime", "requested_walltime", "queue_date", "inputfiles", @@ -60,7 +60,7 @@ def __init__( self.used_resources[key], ) self.resources[key] = self.used_resources[key] - self.walltime = used_resources.pop("walltime") + self._walltime = used_resources.pop("walltime") self.requested_walltime = resources.pop("walltime", None) self.inputfiles = resources.pop("inputfiles", None) self.queue_date = queue_date @@ -92,11 +92,12 @@ def waiting_time(self) -> float: return self.in_queue_until - self.in_queue_since return float("Inf") - def modified_walltime(self): + @property + def walltime(self): if self.fileprovider and self.fileprovider.provides_all_files(self): - return walltime_models["maxeff"](self) + return walltime_models["maxeff"](self, self._walltime) else: - return self.walltime + return self._walltime async def run(self): self.in_queue_until = time.now @@ -108,9 +109,8 @@ async def run(self): repr(self), self.drone.sitename, repr(self.drone) ) ) - walltime = self.modified_walltime() try: - await (time + walltime) + await (time + self.walltime) except CancelTask: self._success = False except BaseException: diff --git a/lapis/utilities/walltime_models.py b/lapis/utilities/walltime_models.py index d214c25..b6e5dae 100644 --- a/lapis/utilities/walltime_models.py +++ 
b/lapis/utilities/walltime_models.py @@ -1,5 +1,7 @@ -def extrapolate_walltime_to_maximal_efficiency(job, maximal_efficiency: float = 0.8): - return (job.used_resources["cores"] / maximal_efficiency) * job.walltime +def extrapolate_walltime_to_maximal_efficiency( + job, original_walltime, maximal_efficiency: float = 0.8 +): + return (job.used_resources["cores"] / maximal_efficiency) * original_walltime # TODO: add models depending on fraction of cached files etc From b032a0d5f1199d9bab26e3b9a484dcb06f8a3c45 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sat, 2 Nov 2019 17:09:56 +0100 Subject: [PATCH 14/29] resolving PEP8 issues --- lapis/utilities/walltime_models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lapis/utilities/walltime_models.py b/lapis/utilities/walltime_models.py index b6e5dae..7484b6a 100644 --- a/lapis/utilities/walltime_models.py +++ b/lapis/utilities/walltime_models.py @@ -1,3 +1,5 @@ + + def extrapolate_walltime_to_maximal_efficiency( job, original_walltime, maximal_efficiency: float = 0.8 ): From 5123034731a4d454a61dc02e04a8180c19c9236d Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Sat, 2 Nov 2019 20:56:46 +0100 Subject: [PATCH 15/29] fixed file provider bug (wrong inputfiles dictionary) --- lapis/file_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/file_provider.py b/lapis/file_provider.py index 044ae6b..49ff321 100644 --- a/lapis/file_provider.py +++ b/lapis/file_provider.py @@ -24,7 +24,7 @@ def provides_all_files(self, job): """ provided_storages = self.storages.get(job.drone.sitename, None) if provided_storages: - for inputfilename, inputfilespecs in job.inputfiles.items(): + for inputfilename, inputfilespecs in job.used_inputfiles.items(): provides_inputfile = 0 for storage in provided_storages: provides_inputfile += storage.provides_file( From 1c2fe9fb828ad445ade3da0fc91ebe022b3f4310 Mon Sep 17 00:00:00 2001 From: tfesenbecker <50665055+tfesenbecker@users.noreply.github.com> Date: Mon, 
4 Nov 2019 09:23:18 +0100 Subject: [PATCH 16/29] Update lapis/cli/simulate.py Co-Authored-By: Eileen Kuehn --- lapis/cli/simulate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lapis/cli/simulate.py b/lapis/cli/simulate.py index 8c41848..85d6854 100644 --- a/lapis/cli/simulate.py +++ b/lapis/cli/simulate.py @@ -6,7 +6,6 @@ from lapis.controller import SimulatedLinearController from lapis.job_io.htcondor import htcondor_job_reader - from lapis.pool import StaticPool, Pool from lapis.pool_io.htcondor import htcondor_pool_reader from lapis.job_io.swf import swf_job_reader From 8739ce9d3110dbaef17638efa31bb93b20c99189 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Mon, 4 Nov 2019 13:39:16 +0100 Subject: [PATCH 17/29] renamed function get_used_storage to _calculate_used_storage --- lapis/storage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lapis/storage.py b/lapis/storage.py index fd69e23..9f11442 100644 --- a/lapis/storage.py +++ b/lapis/storage.py @@ -17,10 +17,10 @@ def __init__( self.sitename = sitename self.storagesize = storagesize self.content = content - self.usedstorage = self.get_used_storage() + self.usedstorage = self._calculate_used_storage() self.describe_state() - def get_used_storage(self): + def _calculate_used_storage(self): return sum(subdict["usedsize"] for subdict in self.content.values()) def free_space(self): @@ -34,7 +34,7 @@ def place_new_file(self, filerequest: tuple): self.content[filename].update( cachedsince=time.now, lastaccessed=time.now, numberofaccesses=0 ) - self.usedstorage = self.get_used_storage() + self.usedstorage = self._calculate_used_storage() def update_file(self, filerequest: tuple): filename, filespecs = filerequest @@ -47,7 +47,7 @@ def update_file(self, filerequest: tuple): self.content[requested_file]["usedsize"] += filesize_difference self.content[requested_file]["lastaccessed"] = time.now self.content[requested_file]["numberofaccesses"] += 1 - self.usedstorage = 
self.get_used_storage() + self.usedstorage = self._calculate_used_storage() def make_room(self, filesize_difference: int): if self.free_space() - filesize_difference < 0: From 855242aca7c2daac56b79f893390c848f882f791 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Mon, 4 Nov 2019 16:52:38 +0100 Subject: [PATCH 18/29] attached fileprovider to drone instead of job and passed it via make_drone instead of scheduler --- lapis/drone.py | 2 ++ lapis/job.py | 6 +++--- lapis/scheduler.py | 4 +--- lapis/simulator.py | 8 ++------ 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/lapis/drone.py b/lapis/drone.py index a30c72d..7b9765a 100644 --- a/lapis/drone.py +++ b/lapis/drone.py @@ -12,6 +12,7 @@ class Drone(interfaces.Pool): def __init__( self, scheduler, + fileprovider, pool_resources: dict, scheduling_duration: float, ignore_resources: list = None, @@ -24,6 +25,7 @@ def __init__( """ super(Drone, self).__init__() self.scheduler = scheduler + self.fileprovider = fileprovider self.sitename = sitename self.pool_resources = pool_resources self.resources = Capacities(**pool_resources) diff --git a/lapis/job.py b/lapis/job.py index c918255..1e4f9e7 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -24,7 +24,6 @@ class Job(object): "in_queue_until", "_name", "drone", - "fileprovider", "_success", ) @@ -70,7 +69,6 @@ def __init__( self.in_queue_since = in_queue_since self.in_queue_until = None self.drone = drone - self.fileprovider = None self._name = name self._success: Optional[bool] = None @@ -96,7 +94,9 @@ def waiting_time(self) -> float: @property def walltime(self): - if self.fileprovider and self.fileprovider.provides_all_files(self): + if self.drone.fileprovider and self.drone.fileprovider.input_file_coverage( + self.drone.sitename, self.used_inputfiles + ): return walltime_models["maxeff"](self, self._walltime) else: return self._walltime diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 50207d6..1dfa6cd 100644 --- a/lapis/scheduler.py +++ 
b/lapis/scheduler.py @@ -26,10 +26,9 @@ class CondorJobScheduler(object): :return: """ - def __init__(self, job_queue, fileprovider): + def __init__(self, job_queue): self._stream_queue = job_queue self.drone_cluster = [] - self.fileprovider = fileprovider self.interval = 60 self.job_queue = JobQueue() self._collecting = True @@ -91,7 +90,6 @@ async def run(self): for job in self.job_queue: best_match = self._schedule_job(job) if best_match: - job.fileprovider = self.fileprovider print( "start job {} on drone {} @ {}".format( repr(job), repr(best_match), time.now diff --git a/lapis/simulator.py b/lapis/simulator.py index 9b441b3..4c2c4d3 100644 --- a/lapis/simulator.py +++ b/lapis/simulator.py @@ -27,7 +27,6 @@ def __init__(self, seed=1234): random.seed(seed) self.job_queue = Queue() self.pools = [] - self.storage_list = [] self.fileprovider = FileProvider() self.controllers = [] self.job_scheduler = None @@ -56,7 +55,7 @@ def create_pools(self, pool_input, pool_reader, pool_type, controller=None): for pool in pool_reader( iterable=pool_input, pool_type=pool_type, - make_drone=partial(Drone, self.job_scheduler), + make_drone=partial(Drone, self.job_scheduler, self.fileprovider), ): self.pools.append(pool) if controller: @@ -66,13 +65,10 @@ def create_storage(self, storage_input, storage_content_input, storage_reader): for storage in storage_reader( storage=storage_input, storage_content=storage_content_input ): - self.storage_list.append(storage) self.fileprovider.add_storage_element(storage) def create_scheduler(self, scheduler_type): - self.job_scheduler = scheduler_type( - job_queue=self.job_queue, fileprovider=self.fileprovider - ) + self.job_scheduler = scheduler_type(job_queue=self.job_queue) def run(self, until=None): print(f"running until {until}") From bfadacbf1c899cf75d2d4b7cdc3423536c95f53d Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Mon, 4 Nov 2019 16:56:49 +0100 Subject: [PATCH 19/29] reworked file coverage function to return a score --- 
lapis/file_provider.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/lapis/file_provider.py b/lapis/file_provider.py index 49ff321..ccbd417 100644 --- a/lapis/file_provider.py +++ b/lapis/file_provider.py @@ -1,4 +1,5 @@ from lapis.storage import Storage +from typing import Optional class FileProvider(object): @@ -14,24 +15,25 @@ def add_storage_element(self, storage_element: Storage): except KeyError: self.storages[storage_element.sitename] = [storage_element] - def provides_all_files(self, job): + def input_file_coverage( + self, dronesite: str, requested_files: Optional[dict] = None + ) -> float: """ - Dummy implementation, to be replaced: if a part of every inputfile of the job is - provided by a storage element located on the same site as the drone the job - is running on this function returns True - :param job: + Dummy implementation, to be replaced + + :param requested_files: + :param dronesite: :return: """ - provided_storages = self.storages.get(job.drone.sitename, None) + provided_storages = self.storages.get(dronesite, None) if provided_storages: - for inputfilename, inputfilespecs in job.used_inputfiles.items(): - provides_inputfile = 0 + provides_inputfile = [] + for inputfilename, inputfilespecs in requested_files.items(): + provides_inputfile.append(0) for storage in provided_storages: - provides_inputfile += storage.provides_file( + provides_inputfile[-1] += storage.provides_file( (inputfilename, inputfilespecs) ) - if not provides_inputfile: - return False - return True + return 1 - provided_storages.count(0) / len(provided_storages) else: - return False + return 0 From 3f30c58e02a56345425161e78204833bb8b33cce Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Mon, 4 Nov 2019 16:58:02 +0100 Subject: [PATCH 20/29] added proper __repr__ function --- lapis/storage.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lapis/storage.py b/lapis/storage.py index 
9f11442..4c67c48 100644 --- a/lapis/storage.py +++ b/lapis/storage.py @@ -18,7 +18,8 @@ def __init__( self.storagesize = storagesize self.content = content self.usedstorage = self._calculate_used_storage() - self.describe_state() + self.__repr__() + print(self.sitename) def _calculate_used_storage(self): return sum(subdict["usedsize"] for subdict in self.content.values()) @@ -26,19 +27,18 @@ def _calculate_used_storage(self): def free_space(self): return self.storagesize - self.usedstorage - def place_new_file(self, filerequest: tuple): - filename, filespecs = filerequest + def add_file(self, filename: str, filespecs: tuple): + assert filename not in self.content.keys() if self.free_space() - filespecs["usedsize"] < 0: self.make_room(self.free_space() - filespecs["usedsize"]) - self.content.update({filename: filespecs}) + self.content[filename] = filespecs self.content[filename].update( cachedsince=time.now, lastaccessed=time.now, numberofaccesses=0 ) self.usedstorage = self._calculate_used_storage() def update_file(self, filerequest: tuple): - filename, filespecs = filerequest - requested_file = filename + requested_file, filespecs = filerequest filesize_difference = ( filespecs["usedsize"] - self.content[requested_file]["usedsize"] ) @@ -60,15 +60,15 @@ def provides_file(self, filerequest: dict): return True else: if self.cache_file(): - self.place_new_file(filerequest) + self.add_file(filename, filespecs) return False def cache_file(self): # cache everything, test different implementations return cache_algorithm["standard"]() - def describe_state(self): - print( + def __repr__(self): + return ( "{name} on site {site}: {used}MB of {tot}MB used ({div} %), contains " "files {filelist}".format( name=self.name, From 2b214aa1959b7b8ad2294e8f07b044c355d3e415 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 7 Nov 2019 14:33:40 +0100 Subject: [PATCH 21/29] added file classes --- lapis/files.py | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 
60 insertions(+) create mode 100644 lapis/files.py diff --git a/lapis/files.py b/lapis/files.py new file mode 100644 index 0000000..0bb8cd7 --- /dev/null +++ b/lapis/files.py @@ -0,0 +1,60 @@ +from typing import Optional + + +class StoredFile(object): + def __init__(self, filename, filespecs): + self.filename = filename + self._filesize: Optional[int] = filespecs.get("filesize", None) + self._storedsize: Optional[int] = filespecs.get("storedsize", self._filesize) + self._cachedsince: Optional[int] = filespecs.get("cachedsince", None) + self._lastaccessed: Optional[int] = filespecs.get("lastaccessed", None) + self._numberofaccesses: int = filespecs.get("numberofaccesses", 0) + + @property + def filesize(self): + return self._filesize + + @property + def cachedsince(self): + return self._cachedsince + + @property + def lastaccessed(self): + return self._lastaccessed + + @property + def numberofaccesses(self): + return self._numberofaccesses + + @cachedsince.setter + def cachedsince(self, value: int): + self._cachedsince = value + + @lastaccessed.setter + def lastaccessed(self, value: int): + self._lastaccessed = value + + def increment_accesses(self): + self._numberofaccesses += 1 + + +class RequestedFile(object): + def __init__(self, filename, filespecs): + self.filename: str = filename + self._filesize: Optional[int] = filespecs.get("filesize", None) + # self._requestedsize: Optional[int] = filespecs.get("requestedsize", None) + # self._added: Optional[int] = filespecs.get("requestedsize", None) + + @property + def filesize(self): + return self._filesize + + def convert_to_stored_file(self, currenttime): + filespecs = dict( + filesize=self._filesize, + cachedsince=currenttime, + lastaccessed=currenttime, + numberofaccesses=1, + ) + print("convert file: ", filespecs) + return StoredFile(self.filename, filespecs) From 2bd91d73be6deb0c73353317f28c2783c2000203 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 7 Nov 2019 14:34:51 +0100 Subject: [PATCH 22/29] moved 
caching algorithm and associated cache cleanup to its own class --- lapis/cachealgorithm.py | 52 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 lapis/cachealgorithm.py diff --git a/lapis/cachealgorithm.py b/lapis/cachealgorithm.py new file mode 100644 index 0000000..6064de5 --- /dev/null +++ b/lapis/cachealgorithm.py @@ -0,0 +1,52 @@ +from typing import Optional, Set +from lapis.files import RequestedFile +from lapis.utilities.cache_algorithm_implementations import cache_algorithm +from lapis.utilities.cache_cleanup_implementations import sort_files_by_cachedsince + + +class CacheAlgorithm(object): + def __init__(self, storage, additional_information: Optional[str] = None): + self._storage = storage + self._additional_information = additional_information + + def _file_based_consideration(self, candidate: RequestedFile) -> bool: + """ + File based caching decision: Checks if candidate file should be cached based on + conditions that apply to + file itself without considering the caches overall state. 
+ :param candidate: + :return: + """ + if self._storage.storagesize > candidate.filesize: + return cache_algorithm["standard"](candidate) + else: + return False + + def _context_based_consideration(self, candidate: RequestedFile): + """ + Caching decision based on the the overall context + :param candidate: + :return: + """ + to_be_removed = set() + sorted_stored_files = sort_files_by_cachedsince(self._storage.files) + current_free_storage = self._storage.free_space() + for stored_file in sorted_stored_files: + if stored_file.numberofaccesses < 3: + to_be_removed.add(stored_file) + current_free_storage += stored_file.filesize + if current_free_storage >= candidate.filesize: + return to_be_removed + else: + continue + if current_free_storage >= candidate.filesize: + return {candidate} + + def consider(self, candidate: RequestedFile) -> Optional[Set[RequestedFile]]: + if self._file_based_consideration(candidate): + if self._storage.free_space() < candidate.filesize: + return self._context_based_consideration(candidate) + else: + return {} + else: + return {candidate} From 29576eb6757a3368b9d27c1bb7482fa0d03e3e70 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 7 Nov 2019 14:37:44 +0100 Subject: [PATCH 23/29] Redesign of the storage class and associated changes --- lapis/file_provider.py | 12 +- lapis/job.py | 15 +- lapis/storage.py | 134 ++++++++++++------ lapis/storage_io/storage.py | 2 +- .../cache_algorithm_implementations.py | 2 +- .../cache_cleanup_implementations.py | 58 +++++--- 6 files changed, 145 insertions(+), 78 deletions(-) diff --git a/lapis/file_provider.py b/lapis/file_provider.py index ccbd417..b66e2bc 100644 --- a/lapis/file_provider.py +++ b/lapis/file_provider.py @@ -1,4 +1,5 @@ from lapis.storage import Storage +from lapis.files import RequestedFile from typing import Optional @@ -15,8 +16,8 @@ def add_storage_element(self, storage_element: Storage): except KeyError: self.storages[storage_element.sitename] = [storage_element] - def 
input_file_coverage( - self, dronesite: str, requested_files: Optional[dict] = None + async def input_file_coverage( + self, dronesite: str, requested_files: Optional[dict] = None, job_repr=None ) -> float: """ Dummy implementation, to be replaced @@ -25,15 +26,18 @@ def input_file_coverage( :param dronesite: :return: """ + print("FILEPROVIDER hit input file coverage") + provided_storages = self.storages.get(dronesite, None) if provided_storages: provides_inputfile = [] for inputfilename, inputfilespecs in requested_files.items(): provides_inputfile.append(0) for storage in provided_storages: - provides_inputfile[-1] += storage.provides_file( - (inputfilename, inputfilespecs) + provides_inputfile[-1] += await storage.providing_file( + RequestedFile(inputfilename, inputfilespecs), job_repr ) + return 1 - provided_storages.count(0) / len(provided_storages) else: return 0 diff --git a/lapis/job.py b/lapis/job.py index 1e4f9e7..023234c 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -93,9 +93,14 @@ def waiting_time(self) -> float: return float("Inf") @property - def walltime(self): - if self.drone.fileprovider and self.drone.fileprovider.input_file_coverage( - self.drone.sitename, self.used_inputfiles + async def walltime(self): + print("DEBUG JOB hit walltime") + # TODO: reimplement that usedsize != filesize and change back to used_inputfiles + if ( + self.drone.fileprovider + and await self.drone.fileprovider.input_file_coverage( + self.drone.sitename, self.requested_inputfiles, repr(self) + ) ): return walltime_models["maxeff"](self, self._walltime) else: @@ -112,7 +117,7 @@ async def run(self): ) ) try: - await (time + self.walltime) + await (time + await self.walltime) except CancelTask: self._success = False except BaseException: @@ -123,7 +128,7 @@ async def run(self): await sampling_required.put(self) def __repr__(self): - return "<%s: %s>" % (self.__class__.__name__, self._name or id(self)) + return "<%s: %s>" % (self.__class__.__name__, self._name or id(self) 
% 100) async def job_to_queue_scheduler(job_generator, job_queue): diff --git a/lapis/storage.py b/lapis/storage.py index 4c67c48..0147811 100644 --- a/lapis/storage.py +++ b/lapis/storage.py @@ -1,72 +1,118 @@ -from usim import time +from usim import time, Resources from typing import Optional -from lapis.utilities.cache_algorithm_implementations import cache_algorithm -from lapis.utilities.cache_cleanup_implementations import cache_cleanup +from lapis.files import StoredFile, RequestedFile +from lapis.cachealgorithm import CacheAlgorithm class Storage(object): - __slots__ = ("name", "sitename", "storagesize", "usedstorage", "content") + __slots__ = ( + "name", + "sitename", + "storagesize", + "placement_duration", + "deletion_duration", + "_usedstorage", + "files", + "filenames", + "cachealgorithm", + ) def __init__( - self, name: str, sitename: str, storagesize: int, content: Optional[dict] = None + self, name: str, sitename: str, storagesize: int, files: Optional[dict] = None ): self.name = name self.sitename = sitename + self.placement_duration = 10 + self.deletion_duration = 5 self.storagesize = storagesize - self.content = content - self.usedstorage = self._calculate_used_storage() + self.files = self._dict_to_file_object(files) + self.filenames = set(file.filename for file in self.files) + self._usedstorage = Resources(**dict(usedsize=self._initial_used_storage())) + self.cachealgorithm = CacheAlgorithm(self) self.__repr__() - print(self.sitename) - def _calculate_used_storage(self): - return sum(subdict["usedsize"] for subdict in self.content.values()) + def _initial_used_storage(self): + initial_value = sum(file.filesize for file in self.files) + print("{} set initial value {}".format(self.name, initial_value)) + return initial_value + + def _dict_to_file_object(self, files): + files_set = set() + for filename, filespecs in files.items(): + files_set.add(StoredFile(filename, filespecs)) + return files_set + + @property + def usedstorage(self): + return 
dict(self._usedstorage.levels)["usedsize"] def free_space(self): return self.storagesize - self.usedstorage - def add_file(self, filename: str, filespecs: tuple): - assert filename not in self.content.keys() - if self.free_space() - filespecs["usedsize"] < 0: - self.make_room(self.free_space() - filespecs["usedsize"]) - self.content[filename] = filespecs - self.content[filename].update( - cachedsince=time.now, lastaccessed=time.now, numberofaccesses=0 + def find_file(self, filename): + for file in self.files: + if file.filename == filename: + return file + + async def remove_from_storage(self, file: StoredFile, job_repr): + print( + "REMOVE FROM STORAGE: Job {}, File {} @ {}".format( + job_repr, file.filename, time.now + ) ) - self.usedstorage = self._calculate_used_storage() + await (time + self.deletion_duration) + await self._usedstorage.decrease(**{"usedsize": file.filesize}) + self.filenames.remove(file.filename) + self.files.remove(file) - def update_file(self, filerequest: tuple): - requested_file, filespecs = filerequest - filesize_difference = ( - filespecs["usedsize"] - self.content[requested_file]["usedsize"] + async def add_to_storage(self, file: RequestedFile, job_repr): + print( + "ADD TO STORAGE: Job {}, File {} @ {}".format( + job_repr, file.filename, time.now + ) ) - if filesize_difference > 0: - self.make_room(filesize_difference) - self.content[requested_file]["usedsize"] += filesize_difference - self.content[requested_file]["lastaccessed"] = time.now - self.content[requested_file]["numberofaccesses"] += 1 - self.usedstorage = self._calculate_used_storage() - - def make_room(self, filesize_difference: int): - if self.free_space() - filesize_difference < 0: - cache_cleanup["fifo"](filesize_difference, self) - - def provides_file(self, filerequest: dict): - filename, filespecs = filerequest - if filename in self.content.keys(): - self.update_file(filerequest) + file = file.convert_to_stored_file(time.now) + await (time + self.placement_duration) + 
await self._usedstorage.increase(**{"usedsize": file.filesize}) + self.filenames.add(file.filename) + self.files.add(file) + + def update_file(self, stored_file: StoredFile, job_repr): + print("UPDATE: Job {}, File {}".format(job_repr, stored_file.filename)) + stored_file.lastaccessed = time.now + stored_file.increment_accesses() + + async def apply_caching_decision(self, requested_file: RequestedFile, job_repr): + print( + "APPLY CACHING DECISION: Job {}, File {} @ {}".format( + job_repr, requested_file.filename, time.now + ) + ) + to_be_removed = self.cachealgorithm.consider(requested_file) + if not to_be_removed: + await self.add_to_storage(requested_file, job_repr) + elif to_be_removed == {requested_file}: + # file will not be cached because it either does not match + # conditions or because there is no space in the cache + print( + "APPLY CACHING DECISION: Job {}, File {}: File wasnt " + "cached @ {}".format(job_repr, requested_file.filename, time.now) + ) + else: + for file in to_be_removed: + await self.remove_from_storage(file, job_repr) + + async def providing_file(self, requested_file: RequestedFile, job_repr): + if requested_file.filename in self.filenames: + self.update_file(self.find_file(requested_file.filename), job_repr) return True else: - if self.cache_file(): - self.add_file(filename, filespecs) + await self.apply_caching_decision(requested_file, job_repr) return False - def cache_file(self): - # cache everything, test different implementations - return cache_algorithm["standard"]() - def __repr__(self): return ( "{name} on site {site}: {used}MB of {tot}MB used ({div} %), contains " @@ -76,6 +122,6 @@ def __repr__(self): used=self.usedstorage, tot=self.storagesize, div=100.0 * self.usedstorage / self.storagesize, - filelist=", ".join(self.content.keys()), + filelist=", ".join(self.filenames), ) ) diff --git a/lapis/storage_io/storage.py b/lapis/storage_io/storage.py index 5829816..88a0ed6 100644 --- a/lapis/storage_io/storage.py +++ 
b/lapis/storage_io/storage.py @@ -10,7 +10,7 @@ def storage_reader(storage, storage_content): name=row["name"], sitename=row["sitename"], storagesize=int(row["cachesizeMB"]), - content=storage_content[row["name"]], + files=storage_content[row["name"]], ) diff --git a/lapis/utilities/cache_algorithm_implementations.py b/lapis/utilities/cache_algorithm_implementations.py index 23b1ff0..468ed0d 100644 --- a/lapis/utilities/cache_algorithm_implementations.py +++ b/lapis/utilities/cache_algorithm_implementations.py @@ -1,4 +1,4 @@ -def cache_all(): +def cache_all(*args, **kwargs): return True diff --git a/lapis/utilities/cache_cleanup_implementations.py b/lapis/utilities/cache_cleanup_implementations.py index 90c4e6e..19da640 100644 --- a/lapis/utilities/cache_cleanup_implementations.py +++ b/lapis/utilities/cache_cleanup_implementations.py @@ -1,25 +1,37 @@ -def fifo(size, storage): - # FIFO, test different implementations - sorted_content = sorted( - list(storage.content.items()), key=lambda x: x[1]["cachedsince"] - ) - while size < 0: - size += sorted_content[0][1]["cachedsizeMB"] - storage.content.pop(sorted_content[0][0]) - storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"] - sorted_content.pop(0) +def sort_files_by_cachedsince(stored_files: set): + return sorted(list(stored_files), key=lambda x: x.cachedsince) -def last_accessed(size, storage): - # FIFO, test different implementations - sorted_content = sorted( - list(storage.content.items()), key=lambda x: x[1]["lastaccessed"] - ) - while size < 0: - size += sorted_content[0][1]["cachedsizeMB"] - storage.content.pop(sorted_content[0][0]) - storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"] - sorted_content.pop(0) - - -cache_cleanup = {"fifo": fifo, "lastaccessed": last_accessed} +# async def fifo(size, storage): +# print("hit fifo") +# print(storage.files.keys()) +# # FIFO, test different implementations +# sorted_content = sorted( +# list(storage.files.items()), key=lambda x: 
x.filespecs.cachedsince +# ) +# print("sorted", sorted_content) +# while size < 0: +# print("hit while") +# size += sorted_content[0][1]["cachedsizeMB"] +# storage.files.pop(sorted_content[0][0]) +# await sleep(storage.placement_duration) +# await storage._usedstorage.decrease( +# **{"usedsize": sorted_content[0][1]["cachedsizeMB"]}) +# print(storage.usedstorage) +# sorted_content.pop(0) +# print("after fifo ", storage.files.keys()) +# +# +# def last_accessed(size, storage): +# # FIFO, test different implementations +# sorted_content = sorted( +# list(storage.content.items()), key=lambda x: x[1]["lastaccessed"] +# ) +# while size < 0: +# size += sorted_content[0][1]["cachedsizeMB"] +# storage.content.pop(sorted_content[0][0]) +# storage.usedstorage -= sorted_content[0][1]["cachedsizeMB"] +# sorted_content.pop(0) +# +# +# cache_cleanup = {"fifo": fifo, "lastaccessed": last_accessed} From 146fbe30046d5c5995ea7e3bf933a78be95ca9d6 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 7 Nov 2019 14:46:45 +0100 Subject: [PATCH 24/29] put walltime getter and walltime recalculation back in separate methods --- lapis/job.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lapis/job.py b/lapis/job.py index 1e4f9e7..5f97440 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -93,13 +93,17 @@ def waiting_time(self) -> float: return float("Inf") @property - def walltime(self): + def walltime(self) -> float: + """ + :return: Time that passes while job is running + """ + return self._walltime + + def recalculate_walltime(self): if self.drone.fileprovider and self.drone.fileprovider.input_file_coverage( self.drone.sitename, self.used_inputfiles ): - return walltime_models["maxeff"](self, self._walltime) - else: - return self._walltime + self._walltime = walltime_models["maxeff"](self, self._walltime) async def run(self): self.in_queue_until = time.now @@ -112,7 +116,8 @@ async def run(self): ) ) try: - await (time + self.walltime) + 
self.recalculate_walltime() + await (time + self._walltime) except CancelTask: self._success = False except BaseException: From 7ef8dd9e019c540f66dd2fdc69800fc8d84bdb65 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 7 Nov 2019 19:59:03 +0100 Subject: [PATCH 25/29] added parallel treatment of jobs input files in file provider --- lapis/file_provider.py | 56 ++++++++++++++++++++++++++++++++++++------ lapis/files.py | 4 +-- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/lapis/file_provider.py b/lapis/file_provider.py index b66e2bc..51133b9 100644 --- a/lapis/file_provider.py +++ b/lapis/file_provider.py @@ -1,6 +1,7 @@ from lapis.storage import Storage from lapis.files import RequestedFile from typing import Optional +from usim import Queue, Scope class FileProvider(object): @@ -11,6 +12,11 @@ def __init__(self): self.storages = dict() def add_storage_element(self, storage_element: Storage): + """ + Register storage element in FileProvider clustering storage elements by sitename + :param storage_element: + :return: + """ try: self.storages[storage_element.sitename].append(storage_element) except KeyError: @@ -30,14 +36,48 @@ async def input_file_coverage( provided_storages = self.storages.get(dronesite, None) if provided_storages: - provides_inputfile = [] - for inputfilename, inputfilespecs in requested_files.items(): - provides_inputfile.append(0) - for storage in provided_storages: - provides_inputfile[-1] += await storage.providing_file( - RequestedFile(inputfilename, inputfilespecs), job_repr + score_queue = Queue() + async with Scope() as scope: + for inputfilename, inputfilespecs in requested_files.items(): + scope.do( + self.look_file_up_in_storage( + RequestedFile(inputfilename, inputfilespecs), + provided_storages, + job_repr, + score_queue, + ) ) - - return 1 - provided_storages.count(0) / len(provided_storages) + await score_queue.close() + total_score = await self.calculate_score(score_queue) + return total_score / 
len(provided_storages) else: return 0 + + async def look_file_up_in_storage( + self, requested_file: RequestedFile, available_storages: list, job_repr, q + ): + """ + Calculates how many storages provide the requested file, puts result in queue + for readout. + :param requested_file: + :param available_storages: + :param job_repr: + :param q: + :return: + """ + file_score = sum( + [ + await storage.providing_file(requested_file, job_repr) + for storage in available_storages + ] + ) + await q.put({requested_file.filename: file_score}) + + async def calculate_score(self, queue: Queue): + """ + Reads each input files individual score from queue and returns number of input + files that are provided by a storage element. + :param queue: + :return: + """ + return sum([1 async for element in queue if list(element.values())[0] > 0]) diff --git a/lapis/files.py b/lapis/files.py index 0bb8cd7..65960f4 100644 --- a/lapis/files.py +++ b/lapis/files.py @@ -39,11 +39,9 @@ def increment_accesses(self): class RequestedFile(object): - def __init__(self, filename, filespecs): + def __init__(self, filename: str, filespecs: dict): self.filename: str = filename self._filesize: Optional[int] = filespecs.get("filesize", None) - # self._requestedsize: Optional[int] = filespecs.get("requestedsize", None) - # self._added: Optional[int] = filespecs.get("requestedsize", None) @property def filesize(self): From b94ab8220597957974dd1c461d99f3b099bc2141 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Thu, 7 Nov 2019 20:08:33 +0100 Subject: [PATCH 26/29] fixed failed unit test that were caused by Drone without file provider, one unit test fails because job has no drone, ticket already exists --- lapis/drone.py | 8 +++++--- lapis_tests/test_job.py | 2 ++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/lapis/drone.py b/lapis/drone.py index 7b9765a..87441eb 100644 --- a/lapis/drone.py +++ b/lapis/drone.py @@ -1,7 +1,9 @@ from cobald import interfaces from usim import time, Scope, 
instant, Capacities, ResourcesUnavailable +from typing import Optional from lapis.job import Job +from lapis.file_provider import FileProvider class ResourcesExceeded(Exception): @@ -12,9 +14,9 @@ class Drone(interfaces.Pool): def __init__( self, scheduler, - fileprovider, - pool_resources: dict, - scheduling_duration: float, + fileprovider: FileProvider = FileProvider(), + pool_resources: Optional[dict] = None, + scheduling_duration: Optional[float] = None, ignore_resources: list = None, sitename: str = None, ): diff --git a/lapis_tests/test_job.py b/lapis_tests/test_job.py index 13b4090..46dab2c 100644 --- a/lapis_tests/test_job.py +++ b/lapis_tests/test_job.py @@ -3,6 +3,7 @@ from lapis.drone import Drone from lapis.job import Job +from lapis.file_provider import FileProvider from lapis_tests import via_usim, DummyScheduler @@ -46,6 +47,7 @@ async def test_job_in_drone(self): scheduler=scheduler, pool_resources={"cores": 1, "memory": 1}, scheduling_duration=0, + fileprovider=FileProvider(), ) async with Scope() as scope: scope.do(drone.start_job(job=job)) From 191df2bbcfdd6caaba44f57a1298f88881755a05 Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Fri, 8 Nov 2019 09:35:44 +0100 Subject: [PATCH 27/29] changed scoring to take filesizes into consideration --- lapis/file_provider.py | 35 +++++++++++++++++++++++++++-------- lapis/files.py | 1 - lapis/job.py | 2 +- lapis/scheduler.py | 1 + lapis/storage.py | 11 ++++++++--- 5 files changed, 37 insertions(+), 13 deletions(-) diff --git a/lapis/file_provider.py b/lapis/file_provider.py index 51133b9..d2bc961 100644 --- a/lapis/file_provider.py +++ b/lapis/file_provider.py @@ -1,7 +1,7 @@ from lapis.storage import Storage from lapis.files import RequestedFile from typing import Optional -from usim import Queue, Scope +from usim import Queue, Scope, time class FileProvider(object): @@ -48,8 +48,16 @@ async def input_file_coverage( ) ) await score_queue.close() - total_score = await self.calculate_score(score_queue) - 
return total_score / len(provided_storages) + cached_size = await self.calculate_score(score_queue) + total_size = float( + sum( + [ + inputfilespecs["filesize"] + for _, inputfilespecs in requested_files.items() + ] + ) + ) + return cached_size / total_size else: return 0 @@ -65,13 +73,18 @@ async def look_file_up_in_storage( :param q: :return: """ - file_score = sum( + print( + "LOOK UP: Job {}, File {} @ {}".format( + job_repr, requested_file.filename, time.now + ) + ) + file_score = sorted( [ - await storage.providing_file(requested_file, job_repr) + int(await storage.providing_file(requested_file, job_repr)) for storage in available_storages ] - ) - await q.put({requested_file.filename: file_score}) + )[0] + await q.put({requested_file: file_score}) async def calculate_score(self, queue: Queue): """ @@ -80,4 +93,10 @@ async def calculate_score(self, queue: Queue): :param queue: :return: """ - return sum([1 async for element in queue if list(element.values())[0] > 0]) + return sum( + [ + list(element.keys())[0].filesize + async for element in queue + if list(element.values())[0] > 0 + ] + ) diff --git a/lapis/files.py b/lapis/files.py index 65960f4..15934bb 100644 --- a/lapis/files.py +++ b/lapis/files.py @@ -54,5 +54,4 @@ def convert_to_stored_file(self, currenttime): lastaccessed=currenttime, numberofaccesses=1, ) - print("convert file: ", filespecs) return StoredFile(self.filename, filespecs) diff --git a/lapis/job.py b/lapis/job.py index 023234c..b7a4711 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -94,7 +94,7 @@ def waiting_time(self) -> float: @property async def walltime(self): - print("DEBUG JOB hit walltime") + print("WALLTIME: Job {}".format(repr(self))) # TODO: reimplement that usedsize != filesize and change back to used_inputfiles if ( self.drone.fileprovider diff --git a/lapis/scheduler.py b/lapis/scheduler.py index 1dfa6cd..69197ae 100644 --- a/lapis/scheduler.py +++ b/lapis/scheduler.py @@ -87,6 +87,7 @@ async def run(self): async with 
Scope() as scope: scope.do(self._collect_jobs()) async for _ in interval(self.interval): + print("NEW SCHEDULING INTERVAL @ {}".format(time.now)) for job in self.job_queue: best_match = self._schedule_job(job) if best_match: diff --git a/lapis/storage.py b/lapis/storage.py index 0147811..4cacfd4 100644 --- a/lapis/storage.py +++ b/lapis/storage.py @@ -80,10 +80,15 @@ async def add_to_storage(self, file: RequestedFile, job_repr): self.filenames.add(file.filename) self.files.add(file) - def update_file(self, stored_file: StoredFile, job_repr): - print("UPDATE: Job {}, File {}".format(job_repr, stored_file.filename)) + async def update_file(self, stored_file: StoredFile, job_repr): + await (time + 1) stored_file.lastaccessed = time.now stored_file.increment_accesses() + print( + "UPDATE: Job {}, File {} @ {}".format( + job_repr, stored_file.filename, time.now + ) + ) async def apply_caching_decision(self, requested_file: RequestedFile, job_repr): print( @@ -107,7 +112,7 @@ async def apply_caching_decision(self, requested_file: RequestedFile, job_repr): async def providing_file(self, requested_file: RequestedFile, job_repr): if requested_file.filename in self.filenames: - self.update_file(self.find_file(requested_file.filename), job_repr) + await self.update_file(self.find_file(requested_file.filename), job_repr) return True else: await self.apply_caching_decision(requested_file, job_repr) From 75165adc9a0b73e76e695d8c653ca8e66c4f06cd Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Fri, 8 Nov 2019 09:55:59 +0100 Subject: [PATCH 28/29] fixed bug from merge --- lapis/job.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lapis/job.py b/lapis/job.py index 2dac98f..3699858 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -102,8 +102,11 @@ def walltime(self) -> float: async def recalculate_walltime(self): print("WALLTIME: Job {}".format(repr(self))) - if self.drone.fileprovider and await self.drone.fileprovider.input_file_coverage( - 
self.drone.sitename, self.used_inputfiles, repr(self) + if ( + self.drone.fileprovider + and await self.drone.fileprovider.input_file_coverage( + self.drone.sitename, self.requested_inputfiles, repr(self) + ) ): self._walltime = walltime_models["maxeff"](self, self._walltime) From 32faa38d5ee40bdb37bb846c7bcc077a099ad94d Mon Sep 17 00:00:00 2001 From: tfesenbecker Date: Fri, 8 Nov 2019 10:13:47 +0100 Subject: [PATCH 29/29] removed debug output to fix unit test --- lapis/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lapis/job.py b/lapis/job.py index 3699858..ee75afe 100644 --- a/lapis/job.py +++ b/lapis/job.py @@ -133,7 +133,7 @@ async def run(self): await sampling_required.put(self) def __repr__(self): - return "<%s: %s>" % (self.__class__.__name__, self._name or id(self) % 100) + return "<%s: %s>" % (self.__class__.__name__, self._name or id(self)) async def job_to_queue_scheduler(job_generator, job_queue):