Status quo caching extension #53

Merged

36 commits
9215d64
added storage object to represent caches and associated readout funct…
tfesenbecker Oct 31, 2019
0a46da7
extended CLI to support storage files
tfesenbecker Oct 31, 2019
e727b69
extended simulator to support storage files
tfesenbecker Oct 31, 2019
d64954d
added new drone attribute sitename connecting drones and storage elem…
tfesenbecker Oct 31, 2019
5597434
added file provider object connecting storage objects and jobs
tfesenbecker Oct 31, 2019
bb1dcbe
added different caching/cache cleaning/walltime recalculation algorithms
tfesenbecker Oct 31, 2019
8d6db96
renamed storage readout
tfesenbecker Oct 31, 2019
fb150db
fixed debug output
tfesenbecker Nov 1, 2019
69072ae
renamed storage input reader
tfesenbecker Nov 1, 2019
53ebec5
updated Job class
tfesenbecker Nov 2, 2019
f997223
replaced function modifying walltime by function with property decorator
tfesenbecker Nov 2, 2019
2e2c06f
Revert "replaced function modifying walltime by function with propert…
tfesenbecker Nov 2, 2019
110b3e9
replaced function modifying walltime by function with property decorator
tfesenbecker Nov 2, 2019
b032a0d
resolving PEP8 issues
tfesenbecker Nov 2, 2019
7753d0d
Merge branch 'master' of https://github.com/MatterMiners/lapis into c…
tfesenbecker Nov 2, 2019
5123034
fixed file provider bug (wrong inputfiles dictionary)
tfesenbecker Nov 2, 2019
1c2fe9f
Update lapis/cli/simulate.py
tfesenbecker Nov 4, 2019
8739ce9
renamed function get_used_storage to _calculate_used_storage
tfesenbecker Nov 4, 2019
0b5a922
Merge branch 'cachingextension' of https://github.com/tfesenbecker/la…
tfesenbecker Nov 4, 2019
855242a
attached fileprovider to drone instead of job and passed it via make_…
tfesenbecker Nov 4, 2019
bfadacb
reworked file coverage function to return a score
tfesenbecker Nov 4, 2019
3f30c58
added proper __repr__ function
tfesenbecker Nov 4, 2019
2b214aa
added file classes
tfesenbecker Nov 7, 2019
2bd91d7
moved caching algorithm and associated cache cleanup to its own class
tfesenbecker Nov 7, 2019
29576eb
Redesign of the storage class and associated changes
tfesenbecker Nov 7, 2019
146fbe3
put walltime getter and walltime recalculation back in separate methods
tfesenbecker Nov 7, 2019
7ef8dd9
added parallel treatment of jobs input files in file provider
tfesenbecker Nov 7, 2019
b94ab82
fixed failed unit tests that were caused by Drone without file provide…
tfesenbecker Nov 7, 2019
1e9e795
Merge branch 'master' of https://github.com/MatterMiners/lapis into c…
tfesenbecker Nov 7, 2019
191df2b
changed scoring to take filesizes into consideration
tfesenbecker Nov 8, 2019
a635318
Merge branch 'cachingextension' into feature/storageimprovement
tfesenbecker Nov 8, 2019
6f7ace1
Merge pull request #1 from tfesenbecker/feature/storageimprovement
tfesenbecker Nov 8, 2019
75165ad
fixed bug from merge
tfesenbecker Nov 8, 2019
d943ed6
Merge branch 'cachingextension' of https://github.com/tfesenbecker/la…
tfesenbecker Nov 8, 2019
9453632
Merge pull request #2 from tfesenbecker/feature/storageimprovement
tfesenbecker Nov 8, 2019
32faa38
removed debug output to fix unit test
tfesenbecker Nov 8, 2019
20 changes: 19 additions & 1 deletion lapis/cli/simulate.py
@@ -9,6 +9,7 @@
from lapis.pool import StaticPool, Pool
from lapis.pool_io.htcondor import htcondor_pool_reader
from lapis.job_io.swf import swf_job_reader
from lapis.storage_io.storage import storage_reader

from lapis.scheduler import CondorJobScheduler
from lapis.simulator import Simulator
@@ -25,6 +26,8 @@

pool_import_mapper = {"htcondor": htcondor_pool_reader}

storage_import_mapper = {"standard": storage_reader}


@click.group()
@click.option("--seed", type=int, default=1234)
@@ -71,8 +74,17 @@ def cli(ctx, seed, until, log_tcp, log_file, log_telegraf):
    type=(click.File("r"), click.Choice(list(pool_import_mapper.keys()))),
    multiple=True,
)
@click.option(
    "--storage-files",
    "storage_files",
    type=(
        click.File("r"),
        click.File("r"),
        click.Choice(list(storage_import_mapper.keys())),
    ),
)
@click.pass_context
def static(ctx, job_file, pool_file):
def static(ctx, job_file, pool_file, storage_files):
    click.echo("starting static environment")
    simulator = Simulator(seed=ctx.obj["seed"])
    file, file_type = job_file
@@ -87,6 +99,12 @@ def static(ctx, job_file, pool_file):
        pool_reader=pool_import_mapper[pool_file_type],
        pool_type=StaticPool,
    )
    storage_file, storage_content_file, storage_type = storage_files
    simulator.create_storage(
        storage_input=storage_file,
        storage_content_input=storage_content_file,
        storage_reader=storage_import_mapper[storage_type],
    )
    simulator.run(until=ctx.obj["until"])

7 changes: 7 additions & 0 deletions lapis/drone.py
@@ -15,6 +15,7 @@ def __init__(
        pool_resources: dict,
        scheduling_duration: float,
        ignore_resources: list = None,
        sitename: str = None,
Member: Could you please reason about including sitename in the drone? From my point of view, this might be too specific. I currently also don't see the advantage of having something like sitename. But I bet you do have some very good reasons! :)

Member: I'd be careful about using sitename -- it is a very specific jargon/feature (CMS) and may not properly reflect how we need to model caches. E.g. what we've seen is that for a single "site" there may be different cache/storage requirements -- see Chris' results on benchmarking TSystems. If we need to identify the "site", that is basically the Pool of the drone -- which can already be used as a dict key etc.

Is there some advantage to using a string identifier?

Contributor Author: sitename is just a quick workaround for the mapping of pools/drones and storage elements. I want to replace this as soon as I understand what information/means of identification will be available once other changes to the overall system (adding more of the job's ClassAd structure, ...) are done. Passing this information directly to the drone was the option that needed the fewest changes to existing code.

@maxfischer2781 What do you mean by "which can already be used as a dict key etc."?

Member: A Pool instance can be used as a key in a mapping -- e.g. Dict[Pool, Storage] should make it possible to map from pool (site) to storage and vice versa.

Which of course isn't of much use to you if you want to define this outside of the simulation, e.g. in a JSON/YAML configuration file...

    ):
        """
        :param scheduler:
@@ -23,6 +24,7 @@
        """
        super(Drone, self).__init__()
        self.scheduler = scheduler
        self.sitename = sitename
        self.pool_resources = pool_resources
        self.resources = Capacities(**pool_resources)
        # shadowing requested resources to determine jobs to be killed
@@ -140,6 +142,11 @@ async def start_job(self, job: Job, kill: bool = False):
                    pass
            self.scheduler.update_drone(self)
            await job_execution.done
            print(
                "finished job {} on drone {} @ {}".format(
                    repr(job), repr(self), time.now
                )
            )
        except ResourcesUnavailable:
            await instant
            job_execution.cancel()
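Following up on the review discussion above about keying storage by the drone's Pool rather than a free-form sitename string, a minimal sketch of that idea might look as follows. This is hypothetical and not part of this PR; it assumes Pool instances are hashable and that a drone keeps a reference to its pool (the current Drone class has no such attribute).

from typing import Dict, List

from lapis.drone import Drone
from lapis.pool import Pool
from lapis.storage import Storage

# Hypothetical registry mapping each pool ("site") to its storage elements.
pool_storage: Dict[Pool, List[Storage]] = {}


def register_storage(pool: Pool, storage: Storage) -> None:
    # Group storage elements by the pool they belong to, instead of a sitename string.
    pool_storage.setdefault(pool, []).append(storage)


def storages_for_drone(drone: Drone) -> List[Storage]:
    # Assumes a hypothetical drone.pool attribute pointing back to its Pool.
    return pool_storage.get(drone.pool, [])

As the reviewer notes, this only works inside the simulation; a JSON/YAML configuration file would still need some serializable identifier.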
37 changes: 37 additions & 0 deletions lapis/file_provider.py
@@ -0,0 +1,37 @@
from lapis.storage import Storage


class FileProvider(object):

    __slots__ = ("storages",)

    def __init__(self):
        self.storages = dict()

    def add_storage_element(self, storage_element: Storage):
        try:
            self.storages[storage_element.sitename].append(storage_element)
        except KeyError:
            self.storages[storage_element.sitename] = [storage_element]

    def provides_all_files(self, job):
        """
        Dummy implementation, to be replaced: returns True if every input file of
        the job is provided (at least in part) by a storage element located on the
        same site as the drone the job is running on.
        :param job:
        :return:
        """
        provided_storages = self.storages.get(job.drone.sitename, None)
        if provided_storages:
            for inputfilename, inputfilespecs in job.used_inputfiles.items():
                provides_inputfile = 0
                for storage in provided_storages:
                    provides_inputfile += storage.provides_file(
                        (inputfilename, inputfilespecs)
                    )
                if not provides_inputfile:
                    return False
            return True
        else:
            return False
20 changes: 18 additions & 2 deletions lapis/job.py
@@ -5,6 +5,7 @@
from usim import CancelTask

from lapis.monitor import sampling_required
from lapis.utilities.walltime_models import walltime_models

if TYPE_CHECKING:
    from lapis.drone import Drone
@@ -14,7 +15,7 @@ class Job(object):
    __slots__ = (
        "resources",
        "used_resources",
        "walltime",
        "_walltime",
        "requested_walltime",
        "queue_date",
        "requested_inputfiles",
@@ -23,6 +24,7 @@
        "in_queue_until",
        "_name",
        "drone",
        "fileprovider",
        "_success",
    )

@@ -59,7 +61,7 @@ def __init__(
                    self.used_resources[key],
                )
                self.resources[key] = self.used_resources[key]
        self.walltime = used_resources.pop("walltime")
        self._walltime = used_resources.pop("walltime")
        self.requested_walltime = resources.pop("walltime", None)
        self.requested_inputfiles = resources.pop("inputfiles", None)
        self.used_inputfiles = used_resources.pop("inputfiles", None)
@@ -68,6 +70,7 @@
        self.in_queue_since = in_queue_since
        self.in_queue_until = None
        self.drone = drone
        self.fileprovider = None
        self._name = name
        self._success: Optional[bool] = None

@@ -91,10 +94,23 @@ def waiting_time(self) -> float:
            return self.in_queue_until - self.in_queue_since
        return float("Inf")

    @property
    def walltime(self):
        if self.fileprovider and self.fileprovider.provides_all_files(self):
            return walltime_models["maxeff"](self, self._walltime)
        else:
            return self._walltime

    async def run(self):
        self.in_queue_until = time.now
        self._success = None
        await sampling_required.put(self)
        if self.drone:
            print(
                "running job {} on site {} in drone {}".format(
                    repr(self), self.drone.sitename, repr(self.drone)
                )
            )
        try:
            await (time + self.walltime)
        except CancelTask:
1 change: 1 addition & 0 deletions lapis/pool_io/htcondor.py
@@ -48,5 +48,6 @@ def htcondor_pool_reader(
                    for key, value in resource_name_mapping.items()
                },
                ignore_resources=["disk"],
                sitename=row.get("sitename", None),
            ),
        )
11 changes: 9 additions & 2 deletions lapis/scheduler.py
@@ -1,5 +1,5 @@
from typing import Dict
from usim import Scope, interval
from usim import Scope, interval, time

from lapis.drone import Drone
from lapis.monitor import sampling_required
@@ -26,9 +26,10 @@ class CondorJobScheduler(object):
    :return:
    """

    def __init__(self, job_queue):
    def __init__(self, job_queue, fileprovider):
        self._stream_queue = job_queue
        self.drone_cluster = []
        self.fileprovider = fileprovider
        self.interval = 60
        self.job_queue = JobQueue()
        self._collecting = True
@@ -90,6 +91,12 @@ async def run(self):
                for job in self.job_queue:
                    best_match = self._schedule_job(job)
                    if best_match:
                        job.fileprovider = self.fileprovider
                        print(
                            "start job {} on drone {} @ {}".format(
                                repr(job), repr(best_match), time.now
                            )
                        )
                        scope.do(best_match.start_job(job))
                        self.job_queue.remove(job)
                        await sampling_required.put(self.job_queue)
14 changes: 13 additions & 1 deletion lapis/simulator.py
@@ -6,6 +6,7 @@

from lapis.drone import Drone
from lapis.job import job_to_queue_scheduler
from lapis.file_provider import FileProvider
from lapis.monitor.general import (
    user_demand,
    job_statistics,
@@ -26,6 +27,8 @@ def __init__(self, seed=1234):
        random.seed(seed)
        self.job_queue = Queue()
        self.pools = []
        self.storage_list = []
        self.fileprovider = FileProvider()
        self.controllers = []
        self.job_scheduler = None
        self.job_generator = None
@@ -59,8 +62,17 @@ def create_pools(self, pool_input, pool_reader, pool_type, controller=None):
            if controller:
                self.controllers.append(controller(target=pool, rate=1))

    def create_storage(self, storage_input, storage_content_input, storage_reader):
        for storage in storage_reader(
            storage=storage_input, storage_content=storage_content_input
        ):
            self.storage_list.append(storage)
            self.fileprovider.add_storage_element(storage)

    def create_scheduler(self, scheduler_type):
        self.job_scheduler = scheduler_type(job_queue=self.job_queue)
        self.job_scheduler = scheduler_type(
            job_queue=self.job_queue, fileprovider=self.fileprovider
        )

    def run(self, until=None):
        print(f"running until {until}")
81 changes: 81 additions & 0 deletions lapis/storage.py
@@ -0,0 +1,81 @@
from usim import time

from typing import Optional

from lapis.utilities.cache_algorithm_implementations import cache_algorithm
from lapis.utilities.cache_cleanup_implementations import cache_cleanup


class Storage(object):

    __slots__ = ("name", "sitename", "storagesize", "usedstorage", "content")

    def __init__(
        self, name: str, sitename: str, storagesize: int, content: Optional[dict] = None
    ):
        self.name = name
        self.sitename = sitename
        self.storagesize = storagesize
        self.content = content
        self.usedstorage = self.get_used_storage()
        self.describe_state()

    def get_used_storage(self):
        return sum(subdict["usedsize"] for subdict in self.content.values())

    def free_space(self):
        return self.storagesize - self.usedstorage

    def place_new_file(self, filerequest: tuple):
        filename, filespecs = filerequest
        if self.free_space() - filespecs["usedsize"] < 0:
            self.make_room(self.free_space() - filespecs["usedsize"])
        self.content.update({filename: filespecs})
        self.content[filename].update(
            cachedsince=time.now, lastaccessed=time.now, numberofaccesses=0
        )
        self.usedstorage = self.get_used_storage()

    def update_file(self, filerequest: tuple):
        filename, filespecs = filerequest
        requested_file = filename
        filesize_difference = (
            filespecs["usedsize"] - self.content[requested_file]["usedsize"]
        )
        if filesize_difference > 0:
            self.make_room(filesize_difference)
            self.content[requested_file]["usedsize"] += filesize_difference
        self.content[requested_file]["lastaccessed"] = time.now
        self.content[requested_file]["numberofaccesses"] += 1
        self.usedstorage = self.get_used_storage()

    def make_room(self, filesize_difference: int):
        if self.free_space() - filesize_difference < 0:
            cache_cleanup["fifo"](filesize_difference, self)

    def provides_file(self, filerequest: dict):
        filename, filespecs = filerequest
        if filename in self.content.keys():
            self.update_file(filerequest)
            return True
        else:
            if self.cache_file():
                self.place_new_file(filerequest)
            return False

    def cache_file(self):
        # cache everything, test different implementations
        return cache_algorithm["standard"]()

    def describe_state(self):
        print(
            "{name} on site {site}: {used}MB of {tot}MB used ({div} %), contains "
            "files {filelist}".format(
                name=self.name,
                site=self.sitename,
                used=self.usedstorage,
                tot=self.storagesize,
                div=100.0 * self.usedstorage / self.storagesize,
                filelist=", ".join(self.content.keys()),
            )
        )
Empty file added lapis/storage_io/__init__.py
34 changes: 34 additions & 0 deletions lapis/storage_io/storage.py
@@ -0,0 +1,34 @@
import csv
from lapis.storage import Storage


def storage_reader(storage, storage_content):
    storage_content = storage_content_reader(storage_content)
    reader = csv.DictReader(storage, delimiter=" ", quotechar="'")
    for row in reader:
        yield Storage(
            name=row["name"],
            sitename=row["sitename"],
            storagesize=int(row["cachesizeMB"]),
            content=storage_content[row["name"]],
        )


def storage_content_reader(file_name):
    reader = csv.DictReader(file_name, delimiter=" ", quotechar="'")
    cache_information = dict()
    for row in reader:
        if row["cachename"] not in cache_information.keys():
            cache_information[row["cachename"]] = dict()
        cache_information[row["cachename"]][row["filename"]] = dict()
        for key in [
            "filesize",
            "usedsize",
            "cachedsince",
            "lastaccessed",
            "numberofaccesses",
        ]:
            cache_information[row["cachename"]][row["filename"]][key] = int(row[key])
    if not cache_information:
        cache_information = None
    return cache_information
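The two readers above expect space-delimited input quoted with single quotes. The column names below come directly from the code; the values and file contents are invented. A minimal sketch of feeding them in-memory data:

import io

from lapis.storage_io.storage import storage_reader

# Hypothetical storage description: one cache per row.
storage_csv = io.StringIO(
    "name sitename cachesizeMB\n"
    "cache1 site1 1000\n"
)
# Hypothetical cache content: one pre-cached file per row.
content_csv = io.StringIO(
    "cachename filename filesize usedsize cachedsince lastaccessed numberofaccesses\n"
    "cache1 /store/file1 150 150 0 0 1\n"
)

for storage in storage_reader(storage=storage_csv, storage_content=content_csv):
    print(storage.name, storage.sitename, storage.free_space())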
5 changes: 5 additions & 0 deletions lapis/utilities/cache_algorithm_implementations.py
@@ -0,0 +1,5 @@
def cache_all():
    return True


cache_algorithm = {"standard": cache_all}