From c9bbebf94e4a67c265b6025d4437097243a297d9 Mon Sep 17 00:00:00 2001 From: Vikram Sreekanti Date: Tue, 28 Jan 2020 14:12:20 -0800 Subject: [PATCH] Renames Droplet to Cloudburst (#21) * Renames droplet to cloudburst * Adds new Python dependencies --- README.md | 22 ++++----- {droplet => cloudburst}/__init__.py | 0 {droplet => cloudburst}/client/__init__.py | 0 .../client/benchmark_trigger.py | 2 +- {droplet => cloudburst}/client/client.py | 30 ++++++------ .../client/run_benchmark.py | 26 +++++------ {droplet => cloudburst}/server/__init__.py | 0 .../server/benchmarks/__init__.py | 0 .../server/benchmarks/centr_avg.py | 16 +++---- .../server/benchmarks/composition.py | 18 ++++---- .../server/benchmarks/dist_avg.py | 18 ++++---- .../server/benchmarks/lambda_locality.py | 2 +- .../server/benchmarks/locality.py | 34 +++++++------- .../server/benchmarks/mobilenet.py | 34 +++++++------- .../server/benchmarks/predserving.py | 28 +++++------ .../server/benchmarks/scaling.py | 16 +++---- .../server/benchmarks/server.py | 30 ++++++------ .../server/benchmarks/summa.py | 30 ++++++------ .../server/benchmarks/utils.py | 0 .../server/executor/__init__.py | 0 .../server/executor/call.py | 22 ++++----- .../server/executor/pin.py | 4 +- .../server/executor/server.py | 20 ++++---- .../server/executor/user_library.py | 8 ++-- .../server/executor/utils.py | 9 ++-- .../server/scheduler/__init__.py | 0 .../server/scheduler/call.py | 18 ++++---- .../server/scheduler/create.py | 10 ++-- .../server/scheduler/policy/__init__.py | 0 .../server/scheduler/policy/base_policy.py | 4 +- .../server/scheduler/policy/default_policy.py | 20 +++++--- .../server/scheduler/server.py | 46 ++++++++++++------- .../server/scheduler/utils.py | 4 +- {droplet => cloudburst}/server/utils.py | 2 +- {droplet => cloudburst}/shared/__init__.py | 0 .../shared/anna_ipc_client.py | 8 ++-- {droplet => cloudburst}/shared/function.py | 8 ++-- {droplet => cloudburst}/shared/future.py | 2 +- .../shared/proto/__init__.py | 0 {droplet => cloudburst}/shared/reference.py | 2 +- {droplet => cloudburst}/shared/serializer.py | 32 +++++++++---- {droplet => cloudburst}/shared/utils.py | 4 +- common | 2 +- ...droplet-local.yml => cloudburst-local.yml} | 3 +- ...oplet.dockerfile => cloudburst.dockerfile} | 17 ++++--- .../{start-droplet.sh => start-cloudburst.sh} | 36 +++++++-------- docs/function-execution.md | 22 ++++----- docs/local-mode.md | 14 +++--- requirements.txt | 1 + scripts/build.sh | 26 +++++------ scripts/clean.sh | 2 +- ...let-local.sh => start-cloudburst-local.sh} | 4 +- ...plet-local.sh => stop-cloudburst-local.sh} | 0 scripts/travis/docker-build.sh | 6 +-- setup.py | 6 +-- tests/__main__.py | 26 +++++------ tests/mock/kvs_client.py | 6 +-- tests/server/executor/test_call.py | 32 ++++++------- tests/server/executor/test_pin.py | 14 +++--- tests/server/executor/test_user_library.py | 6 +-- .../scheduler/policy/test_default_policy.py | 22 ++++----- tests/server/scheduler/test_call.py | 26 +++++------ tests/server/scheduler/test_create.py | 20 ++++---- tests/server/utils.py | 6 +-- tests/shared/test_serializer.py | 16 +++---- 65 files changed, 444 insertions(+), 398 deletions(-) rename {droplet => cloudburst}/__init__.py (100%) rename {droplet => cloudburst}/client/__init__.py (100%) rename {droplet => cloudburst}/client/benchmark_trigger.py (98%) rename {droplet => cloudburst}/client/client.py (91%) rename {droplet => cloudburst}/client/run_benchmark.py (70%) rename {droplet => cloudburst}/server/__init__.py (100%) rename {droplet => 
cloudburst}/server/benchmarks/__init__.py (100%) rename {droplet => cloudburst}/server/benchmarks/centr_avg.py (80%) rename {droplet => cloudburst}/server/benchmarks/composition.py (79%) rename {droplet => cloudburst}/server/benchmarks/dist_avg.py (87%) rename {droplet => cloudburst}/server/benchmarks/lambda_locality.py (97%) rename {droplet => cloudburst}/server/benchmarks/locality.py (77%) rename {droplet => cloudburst}/server/benchmarks/mobilenet.py (79%) rename {droplet => cloudburst}/server/benchmarks/predserving.py (81%) rename {droplet => cloudburst}/server/benchmarks/scaling.py (83%) rename {droplet => cloudburst}/server/benchmarks/server.py (78%) rename {droplet => cloudburst}/server/benchmarks/summa.py (88%) rename {droplet => cloudburst}/server/benchmarks/utils.py (100%) rename {droplet => cloudburst}/server/executor/__init__.py (100%) rename {droplet => cloudburst}/server/executor/call.py (95%) rename {droplet => cloudburst}/server/executor/pin.py (96%) rename {droplet => cloudburst}/server/executor/server.py (95%) rename {droplet => cloudburst}/server/executor/user_library.py (95%) rename {droplet => cloudburst}/server/executor/utils.py (91%) rename {droplet => cloudburst}/server/scheduler/__init__.py (100%) rename {droplet => cloudburst}/server/scheduler/call.py (88%) rename {droplet => cloudburst}/server/scheduler/create.py (93%) rename {droplet => cloudburst}/server/scheduler/policy/__init__.py (100%) rename {droplet => cloudburst}/server/scheduler/policy/base_policy.py (96%) rename {droplet => cloudburst}/server/scheduler/policy/default_policy.py (95%) rename {droplet => cloudburst}/server/scheduler/server.py (88%) rename {droplet => cloudburst}/server/scheduler/utils.py (96%) rename {droplet => cloudburst}/server/utils.py (97%) rename {droplet => cloudburst}/shared/__init__.py (100%) rename {droplet => cloudburst}/shared/anna_ipc_client.py (97%) rename {droplet => cloudburst}/shared/function.py (81%) rename {droplet => cloudburst}/shared/future.py (97%) rename {droplet => cloudburst}/shared/proto/__init__.py (100%) rename {droplet => cloudburst}/shared/reference.py (96%) rename {droplet => cloudburst}/shared/serializer.py (84%) rename {droplet => cloudburst}/shared/utils.py (88%) rename conf/{droplet-local.yml => cloudburst-local.yml} (71%) rename dockerfiles/{droplet.dockerfile => cloudburst.dockerfile} (76%) rename dockerfiles/{start-droplet.sh => start-cloudburst.sh} (66%) rename scripts/{start-droplet-local.sh => start-cloudburst-local.sh} (87%) rename scripts/{stop-droplet-local.sh => stop-cloudburst-local.sh} (100%) diff --git a/README.md b/README.md index de241b58..5923fce0 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,27 @@ -# Droplet +# Cloudburst -[![Build Status](https://travis-ci.com/hydro-project/droplet.svg?branch=master)](https://travis-ci.com/hydro-project/droplet) -[![codecov](https://codecov.io/gh/hydro-project/droplet/branch/master/graph/badge.svg)](https://codecov.io/gh/hydro-project/droplet) +[![Build Status](https://travis-ci.com/hydro-project/cloudburst.svg?branch=master)](https://travis-ci.com/hydro-project/cloudburst) +[![codecov](https://codecov.io/gh/hydro-project/cloudburst/branch/master/graph/badge.svg)](https://codecov.io/gh/hydro-project/cloudburst) [![License](https://img.shields.io/badge/license-Apache--2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) -Droplet is a low-latency, stateful serverless programming framework built on top of the [Anna KVS](https://github.com/hydro-project/anna). 
Droplet enables users to execute compositions of functions at low latency, and the system builds on top of Anna in order to enable stateful computation. Droplet is co-deployed with the [Anna caching system](https://github.com/hydro-project/anna-cache) to achieve low-latency access to shared state, and the system relies on Anna's lattice data structures to resolve conflicting updates to shared state. +Cloudburst is a low-latency, stateful serverless programming framework built on top of the [Anna KVS](https://github.com/hydro-project/anna). Cloudburst enables users to execute compositions of functions at low latency, and the system builds on top of Anna in order to enable stateful computation. Cloudburst is co-deployed with the [Anna caching system](https://github.com/hydro-project/anna-cache) to achieve low-latency access to shared state, and the system relies on Anna's lattice data structures to resolve conflicting updates to shared state. ## Getting Started -You can install Droplet's dependencies with `pip` and use the bash scripts included in this repository to run the system locally. You can find the Droplet client in `droplet/client/client.py`. Full documentation on starting a cluster in local mode can be found [here](docs/local-mode.md); documentation for the Droplet client can be found [here](docs/function-execution.md). An example interaction is modeled below. +You can install Cloudburst's dependencies with `pip` and use the bash scripts included in this repository to run the system locally. You can find the Cloudburst client in `cloudburst/client/client.py`. Full documentation on starting a cluster in local mode can be found [here](docs/local-mode.md); documentation for the Cloudburst client can be found [here](docs/function-execution.md). An example interaction is modeled below. ```bash $ pip install -r requirements.txt -$ ./scripts/start-droplet-local.sh +$ ./scripts/start-cloudburst-local.sh ... -$ ./scripts/stop-droplet-local.sh +$ ./scripts/stop-cloudburst-local.sh ``` -The `DropletConnection` is the main client interface; when running in local mode, all interaction between the client and server happens on `localhost`. Users can register functions and execute them. The executions return `DropletFuture`s, which can be retrieved asynchronously via the `get` method. Users can also register DAGs (directed, acylic graphs) of functions, where results from one function will be passed to downstream functions. +The `CloudburstConnection` is the main client interface; when running in local mode, all interaction between the client and server happens on `localhost`. Users can register functions and execute them. The executions return `CloudburstFuture`s, which can be retrieved asynchronously via the `get` method. Users can also register DAGs (directed, acyclic graphs) of functions, where results from one function will be passed to downstream functions.
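[Editor's sketch] The DAG flow just described is not shown in the single-function example below; as a rough, hedged sketch (the `inc`/`double` functions and the `'chain'` DAG name are placeholders invented for illustration, the calls assume the `register_dag(name, functions, connections)` and `call_dag(dag_name, arg_map)` signatures that appear in `cloudburst/client/client.py` later in this patch, and `local_cloud` is the connection constructed in the example that follows):

```python
>>> # Every Cloudburst function receives the user library as its first argument.
>>> cloud_inc = local_cloud.register(lambda _, x: x + 1, 'inc')
>>> cloud_double = local_cloud.register(lambda _, x: 2 * x, 'double')
>>> # The ('inc', 'double') connection feeds inc's result downstream to double.
>>> success, error = local_cloud.register_dag('chain', ['inc', 'double'],
...                                           [('inc', 'double')])
>>> local_cloud.call_dag('chain', {'inc': [1]}).get()  # double(inc(1))
4
```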
```python ->>> from droplet.client.client import DropletConnection ->>> local_cloud = DropletConnection('127.0.0.1', '127.0.0.1', local=True) +>>> from cloudburst.client.client import CloudburstConnection +>>> local_cloud = CloudburstConnection('127.0.0.1', '127.0.0.1', local=True) >>> cloud_sq = local_cloud.register(lambda _, x: x * x, 'square') >>> cloud_sq(2).get() 4 @@ -30,7 +30,7 @@ The `DropletConnection` is the main client interface; when running in local mode 4 ``` -To run Anna and Droplet in cluster mode, you will need to use the cluster management setup, which can be found in the [hydro-project/cluster](https://github.com/hydro-project/cluster) repo. Instructions on how to use the cluster management tools can be found in that repo. +To run Anna and Cloudburst in cluster mode, you will need to use the cluster management setup, which can be found in the [hydro-project/cluster](https://github.com/hydro-project/cluster) repo. Instructions on how to use the cluster management tools can be found in that repo. ## License diff --git a/droplet/__init__.py b/cloudburst/__init__.py similarity index 100% rename from droplet/__init__.py rename to cloudburst/__init__.py diff --git a/droplet/client/__init__.py b/cloudburst/client/__init__.py similarity index 100% rename from droplet/client/__init__.py rename to cloudburst/client/__init__.py diff --git a/droplet/client/benchmark_trigger.py b/cloudburst/client/benchmark_trigger.py similarity index 98% rename from droplet/client/benchmark_trigger.py rename to cloudburst/client/benchmark_trigger.py index a8b41ea6..30236730 100644 --- a/droplet/client/benchmark_trigger.py +++ b/cloudburst/client/benchmark_trigger.py @@ -19,7 +19,7 @@ import cloudpickle as cp -from droplet.server.benchmarks import utils +from cloudburst.server.benchmarks import utils logging.basicConfig(filename='log_trigger.txt', level=logging.INFO, format='%(asctime)s %(message)s') diff --git a/droplet/client/client.py b/cloudburst/client/client.py similarity index 91% rename from droplet/client/client.py rename to cloudburst/client/client.py index a368240c..86d870fe 100644 --- a/droplet/client/client.py +++ b/cloudburst/client/client.py @@ -17,19 +17,19 @@ import zmq from anna.client import AnnaTcpClient -from droplet.shared.function import DropletFunction -from droplet.shared.future import DropletFuture -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.shared.function import CloudburstFunction +from cloudburst.shared.future import CloudburstFuture +from cloudburst.shared.proto.cloudburst_pb2 import ( Dag, DagCall, Function, FunctionCall, GenericResponse, - NORMAL # Droplet consistency modes + NORMAL # Cloudburst consistency modes ) -from droplet.shared.proto.shared_pb2 import StringSet -from droplet.shared.serializer import Serializer -from droplet.shared.utils import ( +from cloudburst.shared.proto.shared_pb2 import StringSet +from cloudburst.shared.serializer import Serializer +from cloudburst.shared.utils import ( CONNECT_PORT, DAG_CALL_PORT, DAG_CREATE_PORT, @@ -42,10 +42,10 @@ serializer = Serializer() -class DropletConnection(): +class CloudburstConnection(): def __init__(self, func_addr, ip, tid=0, local=False): ''' - func_addr: The address of the Droplet interface, either localhost or + func_addr: The address of the Cloudburst interface, either localhost or the address of an AWS ELB in cluster mode. ip: The IP address of the client machine -- used to send and receive responses.
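[Editor's sketch] The `func_addr`/`ip` distinction in the docstring above determines how the client is constructed in each deployment mode; a minimal sketch follows (the ELB hostname and client IP are hypothetical placeholders, not values from this patch):

```python
from cloudburst.client.client import CloudburstConnection

# Local mode: the scheduler and the client both communicate over localhost.
local_cloud = CloudburstConnection('127.0.0.1', '127.0.0.1', local=True)

# Cluster mode: func_addr is the scheduler's AWS ELB address, and ip is an
# address of this client machine that the servers can reach directly.
cluster_cloud = CloudburstConnection('cloudburst-elb.example.com', '10.0.0.5')
```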
@@ -106,7 +106,7 @@ def get_function(self, name): ''' Retrieves a handle for an individual function. Returns None if the function cannot be found in the system. The returned object can be - called like a regular Python function, which returns a DropletFuture. + called like a regular Python function, which returns a CloudburstFuture. name: The name of the function to retrieve. ''' @@ -115,12 +115,12 @@ def get_function(self, name): functions, use the `list` method.''') return None - return DropletFunction(name, self, self.kvs_client) + return CloudburstFunction(name, self, self.kvs_client) def register(self, function, name): ''' Registers a new function or class with the system. The returned object - can be called like a regular Python function, which returns a Droplet + can be called like a regular Python function, which returns a Cloudburst Future. If the input is a class, the class is expected to have a run method, which is what is invoked at runtime. @@ -138,7 +138,7 @@ def register(self, function, name): resp.ParseFromString(self.func_create_sock.recv()) if resp.success: - return DropletFunction(name, self, self.kvs_client) + return CloudburstFunction(name, self, self.kvs_client) else: print('Unexpected error while registering function: \n\t%s.' % (resp)) @@ -180,7 +180,7 @@ def register_dag(self, name, functions, connections): def call_dag(self, dname, arg_map, direct_response=False, consistency=NORMAL, output_key=None, client_id=None): ''' - Issues a new request to execute the DAG. Returns a DropletFuture that + Issues a new request to execute the DAG. Returns a CloudburstFuture that dname: The name of the DAG to execute. arg_map: A map from function names to lists of arguments for each of @@ -228,7 +228,7 @@ def call_dag(self, dname, arg_map, direct_response=False, raise e else: if r.success: - return DropletFuture(r.response_id, self.kvs_client, + return CloudburstFuture(r.response_id, self.kvs_client, serializer) else: return None diff --git a/droplet/client/run_benchmark.py b/cloudburst/client/run_benchmark.py similarity index 70% rename from droplet/client/run_benchmark.py rename to cloudburst/client/run_benchmark.py index 6430b89b..5def1fa4 100755 --- a/droplet/client/run_benchmark.py +++ b/cloudburst/client/run_benchmark.py @@ -18,8 +18,8 @@ import logging import sys -from droplet.client.client import DropletConnection -from droplet.server.benchmarks import ( +from cloudburst.client.client import CloudburstConnection +from cloudburst.server.benchmarks import ( centr_avg, composition, dist_avg, @@ -44,36 +44,36 @@ if len(sys.argv) == 5: ip = sys.argv[4] - droplet_client = DropletConnection(f_elb, ip) + cloudburst_client = CloudburstConnection(f_elb, ip) else: - droplet_client = DropletConnection(f_elb) + cloudburst_client = CloudburstConnection(f_elb) bname = sys.argv[1] if bname == 'composition': - total, scheduler, kvs, retries = composition.run(droplet_client, + total, scheduler, kvs, retries = composition.run(cloudburst_client, num_requests, None) elif bname == 'locality': - locality.run(droplet_client, num_requests, True, None) - total, scheduler, kvs, retries = locality.run(droplet_client, num_requests, + locality.run(cloudburst_client, num_requests, True, None) + total, scheduler, kvs, retries = locality.run(cloudburst_client, num_requests, False, None) elif bname == 'mobilenet': - total, scheduler, kvs, retries = mobilenet.run(droplet_client, num_requests, + total, scheduler, kvs, retries = mobilenet.run(cloudburst_client, num_requests, None) elif bname ==
'pred_serving': - total, scheduler, kvs, retries = predserving.run(droplet_client, + total, scheduler, kvs, retries = predserving.run(cloudburst_client, num_requests, None) elif bname == 'avg': - total, scheduler, kvs, retries = dist_avg.run(droplet_client, num_requests, + total, scheduler, kvs, retries = dist_avg.run(cloudburst_client, num_requests, None) elif bname == 'center_avg': - total, scheduler, kvs, retries = centr_avg.run(droplet_client, num_requests, + total, scheduler, kvs, retries = centr_avg.run(cloudburst_client, num_requests, None) elif bname == 'summa': - total, scheduler, kvs, retries = summa.run(droplet_client, num_requests, + total, scheduler, kvs, retries = summa.run(cloudburst_client, num_requests, None) elif bname == 'scaling': - total, scheduler, kvs, retries = scaling.run(droplet_client, num_requests, + total, scheduler, kvs, retries = scaling.run(cloudburst_client, num_requests, None) else: print('Unknown benchmark type: %s!' % (bname)) diff --git a/droplet/server/__init__.py b/cloudburst/server/__init__.py similarity index 100% rename from droplet/server/__init__.py rename to cloudburst/server/__init__.py diff --git a/droplet/server/benchmarks/__init__.py b/cloudburst/server/benchmarks/__init__.py similarity index 100% rename from droplet/server/benchmarks/__init__.py rename to cloudburst/server/benchmarks/__init__.py diff --git a/droplet/server/benchmarks/centr_avg.py b/cloudburst/server/benchmarks/centr_avg.py similarity index 80% rename from droplet/server/benchmarks/centr_avg.py rename to cloudburst/server/benchmarks/centr_avg.py index 1d213517..9a9d5233 100644 --- a/droplet/server/benchmarks/centr_avg.py +++ b/cloudburst/server/benchmarks/centr_avg.py @@ -19,33 +19,33 @@ import numpy as np -def run(droplet_client, num_requests, sckt): +def run(cloudburst_client, num_requests, sckt): ''' DEFINE AND REGISTER FUNCTIONS ''' - def follower(droplet, exec_id, my_id): + def follower(cloudburst, exec_id, my_id): import random val = random.randint(0, 100) key = '%s-%d' % (exec_id, my_id) - droplet.put(key, val) + cloudburst.put(key, val) return key, my_id, val - def leader(droplet, exec_id, num_execs): + def leader(cloudburst, exec_id, num_execs): values = [] for i in range(num_execs): key = '%s-%d' % (exec_id, i) - result = droplet.get(key) + result = cloudburst.get(key) while result is None: - result = droplet.get(key) + result = cloudburst.get(key) values.append(result) import numpy as np return np.mean(values) - cloud_follow = droplet_client.register(follower, 'follower') - cloud_lead = droplet_client.register(leader, 'leader') + cloud_follow = cloudburst_client.register(follower, 'follower') + cloud_lead = cloudburst_client.register(leader, 'leader') if cloud_follow and cloud_lead: print('Successfully registered follower and leader functions.') diff --git a/droplet/server/benchmarks/composition.py b/cloudburst/server/benchmarks/composition.py similarity index 79% rename from droplet/server/benchmarks/composition.py rename to cloudburst/server/benchmarks/composition.py index c5839903..36c376ea 100644 --- a/droplet/server/benchmarks/composition.py +++ b/cloudburst/server/benchmarks/composition.py @@ -18,19 +18,19 @@ import cloudpickle as cp -from droplet.shared.proto.droplet_pb2 import DropletError, DAG_ALREADY_EXISTS +from cloudburst.shared.proto.cloudburst_pb2 import CloudburstError, DAG_ALREADY_EXISTS -def run(droplet_client, num_requests, sckt): +def run(cloudburst_client, num_requests, sckt): ''' DEFINE AND REGISTER FUNCTIONS ''' - def incr(droplet, x): + def 
incr(cloudburst, x): return x + 1 - def square(droplet, x): + def square(cloudburst, x): return x * x - cloud_incr = droplet_client.register(incr, 'incr') - cloud_square = droplet_client.register(square, 'square') + cloud_incr = cloudburst_client.register(incr, 'incr') + cloud_square = cloudburst_client.register(square, 'square') if cloud_incr and cloud_square: print('Successfully registered incr and square functions.') @@ -55,11 +55,11 @@ def square(droplet, x): functions = ['incr', 'square'] connections = [('incr', 'square')] - success, error = droplet_client.register_dag(dag_name, functions, + success, error = cloudburst_client.register_dag(dag_name, functions, connections) if not success and error != DAG_ALREADY_EXISTS: - print('Failed to register DAG: %s' % (DropletError.Name(error))) + print('Failed to register DAG: %s' % (CloudburstError.Name(error))) sys.exit(1) ''' RUN DAG ''' @@ -73,7 +73,7 @@ def square(droplet, x): for _ in range(num_requests): start = time.time() - rid = droplet_client.call_dag(dag_name, arg_map) + rid = cloudburst_client.call_dag(dag_name, arg_map) end = time.time() stime = end - start diff --git a/droplet/server/benchmarks/dist_avg.py b/cloudburst/server/benchmarks/dist_avg.py similarity index 87% rename from droplet/server/benchmarks/dist_avg.py rename to cloudburst/server/benchmarks/dist_avg.py index 8eecf2d4..369ca999 100644 --- a/droplet/server/benchmarks/dist_avg.py +++ b/cloudburst/server/benchmarks/dist_avg.py @@ -19,9 +19,9 @@ import numpy as np -def run(droplet_client, num_requests, sckt): +def run(cloudburst_client, num_requests, sckt): ''' DEFINE AND REGISTER FUNCTIONS ''' - def avg(droplet, uid, eid, num_execs, val): + def avg(cloudburst, uid, eid, num_execs, val): import random import time @@ -29,9 +29,9 @@ def avg(droplet, uid, eid, num_execs, val): import cloudpickle as cp import numpy as np - myid = droplet.getid() + myid = cloudburst.getid() key = '%s:%d' % (uid, eid) - droplet.put(key, LWWPairLattice(0, cp.dumps(myid))) + cloudburst.put(key, LWWPairLattice(0, cp.dumps(myid))) procs = set() keyset = [] @@ -43,9 +43,9 @@ def avg(droplet, uid, eid, num_execs, val): key = '%s:%d' % (uid, i) keyset.append(key) - locs = droplet.get(keyset) + locs = cloudburst.get(keyset) while None in locs.values(): - locs = droplet.get(keyset) + locs = cloudburst.get(keyset) for key in locs: procs.add(cp.loads(locs[key].reveal())) @@ -64,7 +64,7 @@ def avg(droplet, uid, eid, num_execs, val): curr_weight = np.sum(weight_msgs) dst = random.sample(procs, 1)[0] - droplet.send(dst, cp.dumps((curr_val * .5, curr_weight * .5))) + cloudburst.send(dst, cp.dumps((curr_val * .5, curr_weight * .5))) val_msgs.clear() weight_msgs.clear() @@ -74,7 +74,7 @@ def avg(droplet, uid, eid, num_execs, val): start = time.time() while time.time() - start < .1: - msgs = droplet.recv() + msgs = cloudburst.recv() for msg in msgs: msg = cp.loads(msg[1]) val_msgs.append(msg[0]) @@ -87,7 +87,7 @@ def avg(droplet, uid, eid, num_execs, val): return curr_avg - cloud_avg = droplet_client.register(avg, 'avg') + cloud_avg = cloudburst_client.register(avg, 'avg') if cloud_avg: print('Successfully registered avg function.') diff --git a/droplet/server/benchmarks/lambda_locality.py b/cloudburst/server/benchmarks/lambda_locality.py similarity index 97% rename from droplet/server/benchmarks/lambda_locality.py rename to cloudburst/server/benchmarks/lambda_locality.py index 2149e6ea..1a161644 100644 --- a/droplet/server/benchmarks/lambda_locality.py +++ b/cloudburst/server/benchmarks/lambda_locality.py @@ 
-21,7 +21,7 @@ import boto3 import cloudpickle as cp -from droplet.server.benchmarks import utils +from cloudburst.server.benchmarks import utils sys_random = random.SystemRandom() diff --git a/droplet/server/benchmarks/locality.py b/cloudburst/server/benchmarks/locality.py similarity index 77% rename from droplet/server/benchmarks/locality.py rename to cloudburst/server/benchmarks/locality.py index 9c748133..dbf3207c 100644 --- a/droplet/server/benchmarks/locality.py +++ b/cloudburst/server/benchmarks/locality.py @@ -21,21 +21,21 @@ import cloudpickle as cp import numpy as np -from droplet.server.benchmarks import utils -from droplet.shared.proto.droplet_pb2 import DropletError, DAG_ALREADY_EXISTS -from droplet.shared.reference import DropletReference +from cloudburst.server.benchmarks import utils +from cloudburst.shared.proto.cloudburst_pb2 import CloudburstError, DAG_ALREADY_EXISTS +from cloudburst.shared.reference import CloudburstReference sys_random = random.SystemRandom() OSIZE = 1000000 -def run(droplet_client, num_requests, create, sckt): +def run(cloudburst_client, num_requests, create, sckt): dag_name = 'locality' kvs_key = 'LOCALITY_OIDS' if create: ''' DEFINE AND REGISTER FUNCTIONS ''' - def dot(droplet, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): + def dot(cloudburst, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): import numpy as np s1 = np.add(v1, v2) s2 = np.add(v3, v4) @@ -51,7 +51,7 @@ def dot(droplet, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): return np.average(s1) - cloud_dot = droplet_client.register(dot, 'dot') + cloud_dot = cloudburst_client.register(dot, 'dot') if cloud_dot: logging.info('Successfully registered the dot function.') @@ -63,9 +63,9 @@ def dot(droplet, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): for _ in range(10): inp = np.zeros(OSIZE) k = str(uuid.uuid4()) - droplet_client.put_object(k, inp) + cloudburst_client.put_object(k, inp) - refs += (DropletReference(k, True),) + refs += (CloudburstReference(k, True),) dot_test = cloud_dot(*refs).get() if dot_test != 0.0: @@ -77,17 +77,17 @@ def dot(droplet, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): ''' CREATE DAG ''' functions = ['dot'] connections = [] - success, error = droplet_client.register_dag(dag_name, functions, + success, error = cloudburst_client.register_dag(dag_name, functions, connections) if not success and error != DAG_ALREADY_EXISTS: - print('Failed to register DAG: %s' % (DropletError.Name(error))) + print('Failed to register DAG: %s' % (CloudburstError.Name(error))) sys.exit(1) # for the hot version oid = str(uuid.uuid4()) arr = np.random.randn(OSIZE) - droplet_client.put_object(oid, arr) - droplet_client.put_object(kvs_key, [oid]) + cloudburst_client.put_object(oid, arr) + cloudburst_client.put_object(kvs_key, [oid]) return [], [], [], 0 @@ -102,13 +102,13 @@ def dot(droplet, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): # array = np.random.rand(OSIZE) # oid = str(uuid.uuid4()) - # droplet_client.put_object(oid, array) + # cloudburst_client.put_object(oid, array) # oids.append(oid) # logging.info('Finished creating data!') # for the hot version - oids = droplet_client.get_object(kvs_key) + oids = cloudburst_client.get_object(kvs_key) total_time = [] scheduler_time = [] @@ -124,14 +124,14 @@ def dot(droplet, v1, v2, v3, v4, v5, v6, v7, v8, v9, v10): for i in range(num_requests): refs = [] # for ref in oids[(i * 10):(i * 10) + 10]: # for the cold version - # refs.append(DropletReference(ref, True)) + # refs.append(CloudburstReference(ref, True)) for _ in range(10): # for the hot version - 
refs.append(DropletReference(oids[0], True)) + refs.append(CloudburstReference(oids[0], True)) start = time.time() arg_map = {'dot': refs} - droplet_client.call_dag(dag_name, arg_map, True) + cloudburst_client.call_dag(dag_name, arg_map, True) end = time.time() epoch_total.append(end - start) diff --git a/droplet/server/benchmarks/mobilenet.py b/cloudburst/server/benchmarks/mobilenet.py similarity index 79% rename from droplet/server/benchmarks/mobilenet.py rename to cloudburst/server/benchmarks/mobilenet.py index 6cf7abed..92ff69cd 100644 --- a/droplet/server/benchmarks/mobilenet.py +++ b/cloudburst/server/benchmarks/mobilenet.py @@ -20,10 +20,10 @@ import cloudpickle as cp import numpy as np -from droplet.shared.reference import DropletReference +from cloudburst.shared.reference import CloudburstReference -def run(droplet_client, num_requests, sckt): +def run(cloudburst_client, num_requests, sckt): ''' UPLOAD THE MODEL OBJECT ''' @@ -33,34 +33,34 @@ def run(droplet_client, num_requests, sckt): with open('model/label_map.json', 'rb') as f: bts = f.read() lattice = LWWPairLattice(0, bts) - droplet_client.kvs_client.put(label_key, lattice) + cloudburst_client.kvs_client.put(label_key, lattice) with open('model/mobilenet_v2_1.4_224_frozen.pb', 'rb') as f: bts = f.read() lattice = LWWPairLattice(0, bts) - droplet_client.kvs_client.put(model_key, lattice) + cloudburst_client.kvs_client.put(model_key, lattice) ''' DEFINE AND REGISTER FUNCTIONS ''' - def preprocess(droplet, inp): + def preprocess(cloudburst, inp): from skimage import filters return filters.gaussian(inp).reshape(1, 224, 224, 3) class Mobilenet: - def __init__(self, droplet, model_key, label_map_key): + def __init__(self, cloudburst, model_key, label_map_key): import tensorflow as tf import json tf.enable_eager_execution() - self.model = droplet.get(model_key, deserialize=False) - self.label_map = json.loads(droplet.get(label_map_key, + self.model = cloudburst.get(model_key, deserialize=False) + self.label_map = json.loads(cloudburst.get(label_map_key, deserialize=False)) self.gd = tf.GraphDef.FromString(self.model) self.inp, self.predictions = tf.import_graph_def(self.gd, return_elements=['input:0', 'MobilenetV2/Predictions/Reshape_1:0']) - def run(self, droplet, img): + def run(self, cloudburst, img): # load libs import tensorflow as tf from PIL import Image @@ -78,14 +78,14 @@ def run(self, droplet, img): return x - def average(droplet, inp): + def average(cloudburst, inp): import numpy as np inp = [inp,] return np.mean(inp, axis=0) - cloud_prep = droplet_client.register(preprocess, 'preprocess') - cloud_mnet = droplet_client.register((Mobilenet, (model_key, label_key)), 'mnet') - cloud_average = droplet_client.register(average, 'average') + cloud_prep = cloudburst_client.register(preprocess, 'preprocess') + cloud_mnet = cloudburst_client.register((Mobilenet, (model_key, label_key)), 'mnet') + cloud_average = cloudburst_client.register(average, 'average') if cloud_prep and cloud_mnet and cloud_average: print('Successfully registered preprocess, mnet, and average ' @@ -121,7 +121,7 @@ def average(droplet, inp): functions = ['preprocess', 'mnet', 'average'] connections = [('preprocess', 'mnet'), ('mnet', 'average')] - success, error = droplet_client.register_dag(dag_name, functions, + success, error = cloudburst_client.register_dag(dag_name, functions, connections) if not success: @@ -138,15 +138,15 @@ def average(droplet, inp): oid = str(uuid.uuid4()) oids.append(oid) - droplet_client.put_object(oid, arr) + 
cloudburst_client.put_object(oid, arr) for i in range(num_requests): oid = oids[i] - arg_map = {'preprocess': [DropletReference(oid, True)]} + arg_map = {'preprocess': [CloudburstReference(oid, True)]} start = time.time() - droplet_client.call_dag(dag_name, arg_map, True) + cloudburst_client.call_dag(dag_name, arg_map, True) end = time.time() total_time += [end - start] diff --git a/droplet/server/benchmarks/predserving.py b/cloudburst/server/benchmarks/predserving.py similarity index 81% rename from droplet/server/benchmarks/predserving.py rename to cloudburst/server/benchmarks/predserving.py index 407669b8..7d81466d 100644 --- a/droplet/server/benchmarks/predserving.py +++ b/cloudburst/server/benchmarks/predserving.py @@ -19,32 +19,32 @@ import cloudpickle as cp import numpy as np -from droplet.shared.reference import DropletReference +from cloudburst.shared.reference import CloudburstReference -def run(droplet_client, num_requests, sckt): +def run(cloudburst_client, num_requests, sckt): ''' DEFINE AND REGISTER FUNCTIONS ''' - def preprocess(droplet, inp): + def preprocess(cloudburst, inp): from skimage import filters return filters.gaussian(inp).reshape(1, 3, 224, 224) - def sqnet(droplet, inp): + def sqnet(cloudburst, inp): import torch import torchvision model = torchvision.models.squeezenet1_1() return model(torch.tensor(inp.astype(np.float32))).detach().numpy() - def average(droplet, inp1, inp2, inp3): + def average(cloudburst, inp1, inp2, inp3): import numpy as np inp = [inp1, inp2, inp3] return np.mean(inp, axis=0) - cloud_prep = droplet_client.register(preprocess, 'preprocess') - cloud_sqnet1 = droplet_client.register(sqnet, 'sqnet1') - cloud_sqnet2 = droplet_client.register(sqnet, 'sqnet2') - cloud_sqnet3 = droplet_client.register(sqnet, 'sqnet3') - cloud_average = droplet_client.register(average, 'average') + cloud_prep = cloudburst_client.register(preprocess, 'preprocess') + cloud_sqnet1 = cloudburst_client.register(sqnet, 'sqnet1') + cloud_sqnet2 = cloudburst_client.register(sqnet, 'sqnet2') + cloud_sqnet3 = cloudburst_client.register(sqnet, 'sqnet3') + cloud_average = cloudburst_client.register(average, 'average') if cloud_prep and cloud_sqnet1 and cloud_sqnet2 and cloud_sqnet3 and \ cloud_average: @@ -95,7 +95,7 @@ def average(droplet, inp1, inp2, inp3): connections = [('preprocess', 'sqnet1'), ('preprocess', 'sqnet2'), ('preprocess', 'sqnet3'), ('sqnet1', 'average'), ('sqnet2', 'average'), ('sqnet3', 'average')] - success, error = droplet_client.register_dag(dag_name, functions, + success, error = cloudburst_client.register_dag(dag_name, functions, connections) if not success: @@ -112,15 +112,15 @@ def average(droplet, inp1, inp2, inp3): oid = str(uuid.uuid4()) oids.append(oid) - droplet_client.put(oid, arr) + cloudburst_client.put(oid, arr) for i in range(num_requests): oid = oids[i] - arg_map = {'preprocess': [DropletReference(oid, True)]} + arg_map = {'preprocess': [CloudburstReference(oid, True)]} start = time.time() - droplet_client.call_dag(dag_name, arg_map, True) + cloudburst_client.call_dag(dag_name, arg_map, True) end = time.time() total_time += [end - start] diff --git a/droplet/server/benchmarks/scaling.py b/cloudburst/server/benchmarks/scaling.py similarity index 83% rename from droplet/server/benchmarks/scaling.py rename to cloudburst/server/benchmarks/scaling.py index 4f397ca3..170ebfab 100644 --- a/droplet/server/benchmarks/scaling.py +++ b/cloudburst/server/benchmarks/scaling.py @@ -18,21 +18,21 @@ import cloudpickle as cp -from 
droplet.shared.proto.droplet_pb2 import DropletError -from droplet.server.benchmarks import utils +from cloudburst.shared.proto.cloudburst_pb2 import CloudburstError +from cloudburst.server.benchmarks import utils -def run(droplet_client, num_requests, sckt, create): +def run(cloudburst_client, num_requests, sckt, create): ''' DEFINE AND REGISTER FUNCTIONS ''' dag_name = 'scaling' if create: - def slp(droplet, x): + def slp(cloudburst, x): import time time.sleep(.050) return x - cloud_sleep = droplet_client.register(slp, 'sleep') + cloud_sleep = cloudburst_client.register(slp, 'sleep') if cloud_sleep: print('Successfully registered sleep function.') @@ -48,10 +48,10 @@ def slp(droplet, x): ''' CREATE DAG ''' functions = ['sleep'] - success, error = droplet_client.register_dag(dag_name, functions, []) + success, error = cloudburst_client.register_dag(dag_name, functions, []) if not success: - print('Failed to register DAG: %s' % (DropletError.Name(error))) + print('Failed to register DAG: %s' % (CloudburstError.Name(error))) sys.exit(1) return [], [], [], 0 @@ -67,7 +67,7 @@ def slp(droplet, x): epoch = 0 for _ in range(num_requests): start = time.time() - res = droplet_client.call_dag(dag_name, arg_map, True) + res = cloudburst_client.call_dag(dag_name, arg_map, True) end = time.time() if res is not None: diff --git a/droplet/server/benchmarks/server.py b/cloudburst/server/benchmarks/server.py similarity index 78% rename from droplet/server/benchmarks/server.py rename to cloudburst/server/benchmarks/server.py index 34038f1e..6adac345 100755 --- a/droplet/server/benchmarks/server.py +++ b/cloudburst/server/benchmarks/server.py @@ -17,8 +17,8 @@ import zmq -from droplet.client.client import DropletConnection -from droplet.server.benchmarks import ( +from cloudburst.client.client import CloudburstConnection +from cloudburst.server.benchmarks import ( composition, locality, lambda_locality, @@ -27,13 +27,13 @@ scaling, utils ) -import droplet.server.utils as sutils +import cloudburst.server.utils as sutils BENCHMARK_START_PORT = 3000 -def benchmark(ip, droplet_address, tid): - droplet = DropletConnection(droplet_address, ip, tid) +def benchmark(ip, cloudburst_address, tid): + cloudburst = CloudburstConnection(cloudburst_address, ip, tid) logging.basicConfig(filename='log_benchmark.txt', level=logging.INFO, format='%(asctime)s %(message)s') @@ -42,7 +42,7 @@ def benchmark(ip, droplet_address, tid): benchmark_start_socket = ctx.socket(zmq.PULL) benchmark_start_socket.bind('tcp://*:' + str(BENCHMARK_START_PORT + tid)) - kvs = droplet.kvs_client + kvs = cloudburst.kvs_client while True: msg = benchmark_start_socket.recv_string() @@ -58,30 +58,30 @@ def benchmark(ip, droplet_address, tid): sckt = ctx.socket(zmq.PUSH) sckt.connect('tcp://' + resp_addr + ':3000') - run_bench(bname, num_requests, droplet, kvs, sckt, create) + run_bench(bname, num_requests, cloudburst, kvs, sckt, create) -def run_bench(bname, num_requests, droplet, kvs, sckt, create=False): +def run_bench(bname, num_requests, cloudburst, kvs, sckt, create=False): logging.info('Running benchmark %s, %d requests.' 
% (bname, num_requests)) if bname == 'composition': - total, scheduler, kvs, retries = composition.run(droplet, num_requests, + total, scheduler, kvs, retries = composition.run(cloudburst, num_requests, sckt) elif bname == 'locality': - total, scheduler, kvs, retries = locality.run(droplet, num_requests, + total, scheduler, kvs, retries = locality.run(cloudburst, num_requests, create, sckt) elif bname == 'redis' or bname == 's3': total, scheduler, kvs, retries = lambda_locality.run(bname, kvs, num_requests, sckt) elif bname == 'predserving': - total, scheduler, kvs, retries = predserving.run(droplet, num_requests, + total, scheduler, kvs, retries = predserving.run(cloudburst, num_requests, sckt) elif bname == 'mobilenet': - total, scheduler, kvs, retries = mobilenet.run(droplet, num_requests, + total, scheduler, kvs, retries = mobilenet.run(cloudburst, num_requests, sckt) elif bname == 'scaling': - total, scheduler, kvs, retries = scaling.run(droplet, num_requests, + total, scheduler, kvs, retries = scaling.run(cloudburst, num_requests, sckt, create) else: logging.info('Unknown benchmark type: %s!' % (bname)) @@ -112,10 +112,10 @@ def run_bench(bname, num_requests, droplet, kvs, sckt, create=False): if len(sys.argv) > 1: conf_file = sys.argv[1] else: - conf_file = 'conf/droplet-config.yml' + conf_file = 'conf/cloudburst-config.yml' conf = sutils.load_conf(conf_file) bench_conf = conf['benchmark'] - benchmark(conf['ip'], bench_conf['droplet_address'], + benchmark(conf['ip'], bench_conf['cloudburst_address'], int(bench_conf['thread_id'])) diff --git a/droplet/server/benchmarks/summa.py b/cloudburst/server/benchmarks/summa.py similarity index 88% rename from droplet/server/benchmarks/summa.py rename to cloudburst/server/benchmarks/summa.py index 95d13b80..c9ac1244 100644 --- a/droplet/server/benchmarks/summa.py +++ b/cloudburst/server/benchmarks/summa.py @@ -19,22 +19,22 @@ import numpy as np -from droplet.shared.reference import DropletReference +from cloudburst.shared.reference import CloudburstReference -def run(droplet_client, num_requests, sckt): +def run(cloudburst_client, num_requests, sckt): ''' DEFINE AND REGISTER FUNCTIONS ''' - def summa(droplet, uid, lblock, rblock, rid, cid, numrows, numcols): + def summa(cloudburst, uid, lblock, rblock, rid, cid, numrows, numcols): import cloudpickle as cp bsize = lblock.shape[0] ssize = 100 res = np.zeros((bsize, bsize)) - myid = droplet.getid() + myid = cloudburst.getid() key = '%s: (%d, %d)' % (uid, rid, cid) - droplet.put(key, myid) + cloudburst.put(key, myid) proc_locs = {} keyset = [] @@ -55,9 +55,9 @@ def summa(droplet, uid, lblock, rblock, rid, cid, numrows, numcols): keyset.append(key) idset[key] = (rid, j) - locs = droplet.get(keyset) + locs = cloudburst.get(keyset) while None in locs.values(): - locs = droplet.get(keyset) + locs = cloudburst.get(keyset) for key in locs: loc = idset[key] @@ -73,7 +73,7 @@ def summa(droplet, uid, lblock, rblock, rid, cid, numrows, numcols): msg = cp.dumps((send_id, lblock[:, (k * ssize): ((k+1) * ssize)])) - droplet.send(dest, msg) + cloudburst.send(dest, msg) for r in range(numrows): if r == rid: @@ -85,7 +85,7 @@ def summa(droplet, uid, lblock, rblock, rid, cid, numrows, numcols): msg = cp.dumps((send_id, rblock[(k * ssize):((k+1) * ssize), :])) - droplet.send(dest, msg) + cloudburst.send(dest, msg) num_recvs = (((numrows - 1) * bsize) / ssize) * 2 recv_count = 0 @@ -101,7 +101,7 @@ def summa(droplet, uid, lblock, rblock, rid, cid, numrows, numcols): ((r+1) * ssize), :] while recv_count < num_recvs: - 
msgs = droplet.recv() + msgs = cloudburst.recv() recv_count += (len(msgs)) for msg in msgs: @@ -142,7 +142,7 @@ def summa(droplet, uid, lblock, rblock, rid, cid, numrows, numcols): res = np.add(res, np.matmul(left, right)) return res - cloud_summa = droplet_client.register(summa, 'summa') + cloud_summa = cloudburst_client.register(summa, 'summa') if cloud_summa: print('Successfully registered summa function.') @@ -181,8 +181,8 @@ def get_block(arr, row, col, bsize): id1 = str(uuid.uuid4()) id2 = str(uuid.uuid4()) - droplet_client.put_object(id1, lblock) - droplet_client.put_object(id2, rblock) + cloudburst_client.put_object(id1, lblock) + cloudburst_client.put_object(id2, rblock) left_id_map[(r, c)] = id1 right_id_map[(r, c)] = id2 @@ -190,8 +190,8 @@ def get_block(arr, row, col, bsize): start = time.time() for r in range(nr): for c in range(nc): - r1 = DropletReference(left_id_map[(r, c)], True) - r2 = DropletReference(right_id_map[(r, c)], True) + r1 = CloudburstReference(left_id_map[(r, c)], True) + r2 = CloudburstReference(right_id_map[(r, c)], True) rids[(r, c)] = cloud_summa(uid, r1, r2, r, c, nr, nc) end = time.time() diff --git a/droplet/server/benchmarks/utils.py b/cloudburst/server/benchmarks/utils.py similarity index 100% rename from droplet/server/benchmarks/utils.py rename to cloudburst/server/benchmarks/utils.py diff --git a/droplet/server/executor/__init__.py b/cloudburst/server/executor/__init__.py similarity index 100% rename from droplet/server/executor/__init__.py rename to cloudburst/server/executor/__init__.py diff --git a/droplet/server/executor/call.py b/cloudburst/server/executor/call.py similarity index 95% rename from droplet/server/executor/call.py rename to cloudburst/server/executor/call.py index 239b7ac2..972cbd3d 100644 --- a/droplet/server/executor/call.py +++ b/cloudburst/server/executor/call.py @@ -24,16 +24,16 @@ VectorClock ) -from droplet.server.executor import utils -import droplet.server.utils as sutils -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.server.executor import utils +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.cloudburst_pb2 import ( DagTrigger, FunctionCall, - NORMAL, MULTI, # Droplet's consistency modes, - EXECUTION_ERROR, FUNC_NOT_FOUND # Droplet's error types + NORMAL, MULTI, # Cloudburst's consistency modes, + EXECUTION_ERROR, FUNC_NOT_FOUND # Cloudburst's error types ) -from droplet.shared.reference import DropletReference -from droplet.shared.serializer import Serializer +from cloudburst.shared.reference import CloudburstReference +from cloudburst.shared.serializer import Serializer serializer = Serializer() @@ -87,7 +87,7 @@ def exec_function(exec_socket, kvs, user_library, cache, function_cache): def _exec_func_normal(kvs, func, args, user_lib, cache): - refs = list(filter(lambda a: isinstance(a, DropletReference), args)) + refs = list(filter(lambda a: isinstance(a, CloudburstReference), args)) if refs: refs = _resolve_ref_normal(refs, kvs, cache) @@ -97,7 +97,7 @@ def _exec_func_normal(kvs, func, args, user_lib, cache): def _exec_func_causal(kvs, func, args, user_lib, schedule=None, key_version_locations={}, dependencies={}): - refs = list(filter(lambda a: isinstance(a, DropletReference), args)) + refs = list(filter(lambda a: isinstance(a, CloudburstReference), args)) if refs: refs = _resolve_ref_causal(refs, kvs, schedule, key_version_locations, @@ -113,7 +113,7 @@ def _run_function(func, refs, args, user_lib): # If any of the arguments are references, we insert the resolved reference 
# instead of the raw value. for arg in args: - if isinstance(arg, DropletReference): + if isinstance(arg, CloudburstReference): func_args += (refs[arg.key],) else: func_args += (arg,) @@ -402,7 +402,7 @@ def _compute_children_read_set(schedule): delta = new_delta for child in children: - refs = list(filter(lambda arg: type(arg) == DropletReference, + refs = list(filter(lambda arg: type(arg) == CloudburstReference, [serializer.load(arg) for arg in schedule.arguments[child].values])) for ref in refs: diff --git a/droplet/server/executor/pin.py b/cloudburst/server/executor/pin.py similarity index 96% rename from droplet/server/executor/pin.py rename to cloudburst/server/executor/pin.py index 84de769a..1df66e91 100644 --- a/droplet/server/executor/pin.py +++ b/cloudburst/server/executor/pin.py @@ -15,8 +15,8 @@ import logging import sys -import droplet.server.utils as sutils -from droplet.server.executor import utils +import cloudburst.server.utils as sutils +from cloudburst.server.executor import utils def pin(pin_socket, pusher_cache, kvs, status, function_cache, runtimes, diff --git a/droplet/server/executor/server.py b/cloudburst/server/executor/server.py similarity index 95% rename from droplet/server/executor/server.py rename to cloudburst/server/executor/server.py index 2dd2324f..09d8b941 100644 --- a/droplet/server/executor/server.py +++ b/cloudburst/server/executor/server.py @@ -21,14 +21,14 @@ from anna.zmq_util import SocketCache import zmq -from droplet.server import utils as sutils -from droplet.server.executor import utils -from droplet.server.executor.call import exec_function, exec_dag_function -from droplet.server.executor.pin import pin, unpin -from droplet.server.executor.user_library import DropletUserLibrary -from droplet.shared.anna_ipc_client import AnnaIpcClient -from droplet.shared.proto.droplet_pb2 import DagSchedule, DagTrigger -from droplet.shared.proto.internal_pb2 import ( +from cloudburst.server import utils as sutils +from cloudburst.server.executor import utils +from cloudburst.server.executor.call import exec_function, exec_dag_function +from cloudburst.server.executor.pin import pin, unpin +from cloudburst.server.executor.user_library import CloudburstUserLibrary +from cloudburst.shared.anna_ipc_client import AnnaIpcClient +from cloudburst.shared.proto.cloudburst_pb2 import DagSchedule, DagTrigger +from cloudburst.shared.proto.internal_pb2 import ( ExecutorStatistics, ThreadStatus, ) @@ -83,7 +83,7 @@ def executor(ip, mgmt_ip, schedulers, thread_id): else: client = AnnaTcpClient('127.0.0.1', '127.0.0.1', local=True, offset=1) - user_library = DropletUserLibrary(context, pusher_cache, ip, thread_id, + user_library = CloudburstUserLibrary(context, pusher_cache, ip, thread_id, client) status = ThreadStatus() @@ -346,7 +346,7 @@ def executor(ip, mgmt_ip, schedulers, thread_id): if len(sys.argv) > 1: conf_file = sys.argv[1] else: - conf_file = 'conf/droplet-config.yml' + conf_file = 'conf/cloudburst-config.yml' conf = sutils.load_conf(conf_file) exec_conf = conf['executor'] diff --git a/droplet/server/executor/user_library.py b/cloudburst/server/executor/user_library.py similarity index 95% rename from droplet/server/executor/user_library.py rename to cloudburst/server/executor/user_library.py index acbe0d53..00e47f09 100644 --- a/droplet/server/executor/user_library.py +++ b/cloudburst/server/executor/user_library.py @@ -14,13 +14,13 @@ import zmq -import droplet.server.utils as sutils -from droplet.shared.serializer import Serializer +import 
cloudburst.server.utils as sutils +from cloudburst.shared.serializer import Serializer serializer = Serializer() -class AbstractDropletUserLibrary: +class AbstractCloudburstUserLibrary: # Stores a lattice value at ref. def put(self, ref, ltc): raise NotImplementedError @@ -41,7 +41,7 @@ def recv(self): raise NotImplementedError -class DropletUserLibrary(AbstractDropletUserLibrary): +class CloudburstUserLibrary(AbstractCloudburstUserLibrary): # ip: Executor IP. # tid: Executor thread ID. diff --git a/droplet/server/executor/utils.py b/cloudburst/server/executor/utils.py similarity index 91% rename from droplet/server/executor/utils.py rename to cloudburst/server/executor/utils.py index 12fa3641..4d1d86ce 100644 --- a/droplet/server/executor/utils.py +++ b/cloudburst/server/executor/utils.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import droplet.server.utils as sutils -from droplet.shared.proto.droplet_pb2 import NORMAL -from droplet.shared.serializer import Serializer +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.cloudburst_pb2 import NORMAL +from cloudburst.shared.serializer import Serializer serializer = Serializer() @@ -51,6 +51,9 @@ def retrieve_function(name, kvs, user_library, consistency=NORMAL): # function. if type(result) == tuple: cls = result[0] + if type(result[1]) != tuple: + result = (cls, (result[1],)) # tuples are immutable; rebuild rather than assign to result[1] + args = (user_library,) + result[1] obj = cls(*args) result = obj.run diff --git a/droplet/server/scheduler/__init__.py b/cloudburst/server/scheduler/__init__.py similarity index 100% rename from droplet/server/scheduler/__init__.py rename to cloudburst/server/scheduler/__init__.py diff --git a/droplet/server/scheduler/call.py b/cloudburst/server/scheduler/call.py similarity index 88% rename from droplet/server/scheduler/call.py rename to cloudburst/server/scheduler/call.py index 25d2273c..f5c97ed0 100644 --- a/droplet/server/scheduler/call.py +++ b/cloudburst/server/scheduler/call.py @@ -15,17 +15,17 @@ import time import uuid -from droplet.server.scheduler import utils -import droplet.server.utils as sutils -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.server.scheduler import utils +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.cloudburst_pb2 import ( DagSchedule, DagTrigger, FunctionCall, GenericResponse, - NO_RESOURCES # Droplet's error types + NO_RESOURCES # Cloudburst's error types ) -from droplet.shared.reference import DropletReference -from droplet.shared.serializer import Serializer +from cloudburst.shared.reference import CloudburstReference +from cloudburst.shared.serializer import Serializer serializer = Serializer() @@ -40,9 +40,9 @@ def call_function(func_call_socket, pusher_cache, policy): if not call.response_key: call.response_key = str(uuid.uuid4()) - # Filter the arguments for DropletReferences, and use the policy engine to + # Filter the arguments for CloudburstReferences, and use the policy engine to # pick a node for this request.
- refs = list(filter(lambda arg: type(arg) == DropletReference, + refs = list(filter(lambda arg: type(arg) == CloudburstReference, map(lambda arg: serializer.load(arg), call.arguments.values))) result = policy.pick_executor(refs) @@ -86,7 +86,7 @@ def call_dag(call, pusher_cache, dags, policy): for fname in dag.functions: args = call.function_args[fname].values - refs = list(filter(lambda arg: type(arg) == DropletReference, + refs = list(filter(lambda arg: type(arg) == CloudburstReference, map(lambda arg: serializer.load(arg), args))) result = policy.pick_executor(refs, fname) diff --git a/droplet/server/scheduler/create.py b/cloudburst/server/scheduler/create.py similarity index 93% rename from droplet/server/scheduler/create.py rename to cloudburst/server/scheduler/create.py index 17a0f710..5f04fbf0 100644 --- a/droplet/server/scheduler/create.py +++ b/cloudburst/server/scheduler/create.py @@ -21,14 +21,14 @@ SingleKeyCausalLattice ) -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.shared.proto.cloudburst_pb2 import ( Dag, Function, - NORMAL, # Droplet's consistency modes - DAG_ALREADY_EXISTS, NO_RESOURCES, NO_SUCH_DAG # Droplet's error modes + NORMAL, # Cloudburst's consistency modes + DAG_ALREADY_EXISTS, NO_RESOURCES, NO_SUCH_DAG # Cloudburst's error modes ) -import droplet.server.utils as sutils -from droplet.server.scheduler import utils +import cloudburst.server.utils as sutils +from cloudburst.server.scheduler import utils sys_random = random.SystemRandom() diff --git a/droplet/server/scheduler/policy/__init__.py b/cloudburst/server/scheduler/policy/__init__.py similarity index 100% rename from droplet/server/scheduler/policy/__init__.py rename to cloudburst/server/scheduler/policy/__init__.py diff --git a/droplet/server/scheduler/policy/base_policy.py b/cloudburst/server/scheduler/policy/base_policy.py similarity index 96% rename from droplet/server/scheduler/policy/base_policy.py rename to cloudburst/server/scheduler/policy/base_policy.py index 8b3a1668..8cb47c35 100644 --- a/droplet/server/scheduler/policy/base_policy.py +++ b/cloudburst/server/scheduler/policy/base_policy.py @@ -13,9 +13,9 @@ # limitations under the License. -class BaseDropletSchedulerPolicy(): +class BaseCloudburstSchedulerPolicy(): ''' - An abstract class for the Droplet scheduler policy. The policy is intended + An abstract class for the Cloudburst scheduler policy. The policy is intended to track its relevant metadata over time, and it can optionally choose to ignore metadata not relevant to its decisions. 
''' diff --git a/droplet/server/scheduler/policy/default_policy.py b/cloudburst/server/scheduler/policy/default_policy.py similarity index 95% rename from droplet/server/scheduler/policy/default_policy.py rename to cloudburst/server/scheduler/policy/default_policy.py index 1ac341c5..ebc79652 100644 --- a/droplet/server/scheduler/policy/default_policy.py +++ b/cloudburst/server/scheduler/policy/default_policy.py @@ -18,12 +18,12 @@ import zmq -from droplet.shared.proto.droplet_pb2 import GenericResponse -from droplet.shared.proto.shared_pb2 import StringSet -from droplet.server.scheduler.policy.base_policy import ( - BaseDropletSchedulerPolicy +from cloudburst.shared.proto.cloudburst_pb2 import GenericResponse +from cloudburst.shared.proto.shared_pb2 import StringSet +from cloudburst.server.scheduler.policy.base_policy import ( + BaseCloudburstSchedulerPolicy ) -from droplet.server.scheduler.utils import ( +from cloudburst.server.scheduler.utils import ( get_cache_ip_key, get_pin_address, get_unpin_address @@ -32,7 +32,7 @@ sys_random = random.SystemRandom() -class DefaultDropletSchedulerPolicy(BaseDropletSchedulerPolicy): +class DefaultCloudburstSchedulerPolicy(BaseCloudburstSchedulerPolicy): def __init__(self, pin_accept_socket, pusher_cache, kvs_client, ip, random_threshold=0.20): @@ -77,6 +77,13 @@ def __init__(self, pin_accept_socket, pusher_cache, kvs_client, ip, # rather than by policy. self.random_threshold = random_threshold + self.unique_executors = set() + + def get_unique_executors(self): + count = len(self.unique_executors) + self.unique_executors = set() + return count + def pick_executor(self, references, function_name=None): # Construct a map which maps from IP addresses to the number of # relevant arguments they have cached. For the time being, we will
+import json import logging import sys import time +import uuid import zmq from anna.client import AnnaTcpClient from anna.zmq_util import SocketCache +import requests -from droplet.server.scheduler.call import call_dag, call_function -from droplet.server.scheduler.create import ( +from cloudburst.server.scheduler.call import call_dag, call_function +from cloudburst.server.scheduler.create import ( create_dag, create_function, delete_dag ) -from droplet.server.scheduler.policy.default_policy import ( - DefaultDropletSchedulerPolicy +from cloudburst.server.scheduler.policy.default_policy import ( + DefaultCloudburstSchedulerPolicy ) -import droplet.server.scheduler.utils as sched_utils -import droplet.server.utils as sutils -from droplet.shared.proto.droplet_pb2 import ( +import cloudburst.server.scheduler.utils as sched_utils +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.cloudburst_pb2 import ( Dag, DagCall, GenericResponse, - NO_SUCH_DAG # Droplet's error types + NO_SUCH_DAG # Cloudburst's error types ) -from droplet.shared.proto.internal_pb2 import ( +from cloudburst.shared.proto.internal_pb2 import ( ExecutorStatistics, SchedulerStatus, ThreadStatus ) -from droplet.shared.proto.shared_pb2 import StringSet -from droplet.shared.utils import ( +from cloudburst.shared.proto.shared_pb2 import StringSet +from cloudburst.shared.utils import ( CONNECT_PORT, DAG_CALL_PORT, DAG_CREATE_PORT, @@ -60,12 +63,14 @@ format='%(asctime)s %(message)s') -def scheduler(ip, mgmt_ip, route_addr): +def scheduler(ip, mgmt_ip, route_addr, metric_addr): # If the management IP is not set, we are running in local mode. local = (mgmt_ip is None) kvs = AnnaTcpClient(route_addr, ip, local=local) + scheduler_id = str(uuid.uuid4()) + context = zmq.Context(1) # A mapping from a DAG's name to its protobuf representation. @@ -134,7 +139,7 @@ def scheduler(ip, mgmt_ip, route_addr): poller.register(sched_update_socket, zmq.POLLIN) # Start the policy engine. 
- policy = DefaultDropletSchedulerPolicy(pin_accept_socket, pusher_cache, + policy = DefaultCloudburstSchedulerPolicy(pin_accept_socket, pusher_cache, kvs, ip) policy.update() @@ -247,6 +252,12 @@ def scheduler(ip, mgmt_ip, route_addr): requestor_cache, False) if end - start > REPORT_THRESHOLD: + num_unique_executors = policy.get_unique_executors() + key = scheduler_id + ':' + str(time.time()) + data = {'key': key, 'count': num_unique_executors} + # requests.post(metric_addr, data=json.dumps(data), + # headers={'Content-Type': 'application/json'}) + status = SchedulerStatus() for name in dags.keys(): status.dags.append(name) @@ -292,8 +303,6 @@ def scheduler(ip, mgmt_ip, route_addr): sckt = pusher_cache.get(sutils.get_statistics_report_address (mgmt_ip)) sckt.send(stats.SerializeToString()) - else: - logging.info(str(stats)) start = time.time() @@ -302,9 +311,12 @@ def scheduler(ip, mgmt_ip, route_addr): if len(sys.argv) > 1: conf_file = sys.argv[1] else: - conf_file = 'conf/droplet-config.yml' + conf_file = 'conf/cloudburst-config.yml' conf = sutils.load_conf(conf_file) sched_conf = conf['scheduler'] - scheduler(conf['ip'], conf['mgmt_ip'], sched_conf['routing_address']) + metric_address = 'http://' + sched_conf['metric_address'] + ':3000/publish' + + scheduler(conf['ip'], conf['mgmt_ip'], sched_conf['routing_address'], + metric_address) diff --git a/droplet/server/scheduler/utils.py b/cloudburst/server/scheduler/utils.py similarity index 96% rename from droplet/server/scheduler/utils.py rename to cloudburst/server/scheduler/utils.py index 84e0870e..4653576f 100644 --- a/droplet/server/scheduler/utils.py +++ b/cloudburst/server/scheduler/utils.py @@ -14,8 +14,8 @@ from anna.lattices import SetLattice -import droplet.server.utils as sutils -from droplet.shared.proto.shared_pb2 import StringSet +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.shared_pb2 import StringSet FUNCOBJ = 'funcs/index-allfuncs' diff --git a/droplet/server/utils.py b/cloudburst/server/utils.py similarity index 97% rename from droplet/server/utils.py rename to cloudburst/server/utils.py index e06b6120..78f963d1 100644 --- a/droplet/server/utils.py +++ b/cloudburst/server/utils.py @@ -17,7 +17,7 @@ from anna.lattices import VectorClock, MaxIntLattice import yaml -from droplet.shared.proto.droplet_pb2 import GenericResponse +from cloudburst.shared.proto.cloudburst_pb2 import GenericResponse FUNC_PREFIX = 'funcs/' BIND_ADDR_TEMPLATE = 'tcp://*:%d' diff --git a/droplet/shared/__init__.py b/cloudburst/shared/__init__.py similarity index 100% rename from droplet/shared/__init__.py rename to cloudburst/shared/__init__.py diff --git a/droplet/shared/anna_ipc_client.py b/cloudburst/shared/anna_ipc_client.py similarity index 97% rename from droplet/shared/anna_ipc_client.py rename to cloudburst/shared/anna_ipc_client.py index 952657cd..b64867e8 100644 --- a/droplet/shared/anna_ipc_client.py +++ b/cloudburst/shared/anna_ipc_client.py @@ -17,17 +17,17 @@ from anna.base_client import BaseAnnaClient import zmq -from droplet.shared.proto.anna_pb2 import ( +from cloudburst.shared.proto.anna_pb2 import ( NONE, # The undefined lattice type NO_ERROR, KEY_DNE, # Anna's error modes KeyResponse ) -from droplet.shared.proto.causal_pb2 import ( +from cloudburst.shared.proto.causal_pb2 import ( CausalRequest, CausalResponse ) -from droplet.shared.proto.droplet_pb2 import ( - SINGLE, MULTI # Droplet's consistency modes +from cloudburst.shared.proto.cloudburst_pb2 import ( + SINGLE, MULTI # Cloudburst's consistency modes 
) GET_REQUEST_ADDR = "ipc:///requests/get" diff --git a/droplet/shared/function.py b/cloudburst/shared/function.py similarity index 81% rename from droplet/shared/function.py rename to cloudburst/shared/function.py index 125b3947..1c4dd5f4 100644 --- a/droplet/shared/function.py +++ b/cloudburst/shared/function.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from droplet.shared.future import DropletFuture -from droplet.shared.serializer import Serializer +from cloudburst.shared.future import CloudburstFuture +from cloudburst.shared.serializer import Serializer serializer = Serializer() -class DropletFunction(): +class CloudburstFunction(): def __init__(self, name, conn, kvs_client): self.name = name self._conn = conn @@ -29,4 +29,4 @@ def __call__(self, *args): if obj_id is None or len(obj_id) == 0: return None - return DropletFuture(obj_id, self._kvs_client, serializer) + return CloudburstFuture(obj_id, self._kvs_client, serializer) diff --git a/droplet/shared/future.py b/cloudburst/shared/future.py similarity index 97% rename from droplet/shared/future.py rename to cloudburst/shared/future.py index 4f4f7f5f..772bcda8 100644 --- a/droplet/shared/future.py +++ b/cloudburst/shared/future.py @@ -13,7 +13,7 @@ # limitations under the License. -class DropletFuture(): +class CloudburstFuture(): def __init__(self, obj_id, kvs_client, serializer): self.obj_id = obj_id self.kvs_client = kvs_client diff --git a/droplet/shared/proto/__init__.py b/cloudburst/shared/proto/__init__.py similarity index 100% rename from droplet/shared/proto/__init__.py rename to cloudburst/shared/proto/__init__.py diff --git a/droplet/shared/reference.py b/cloudburst/shared/reference.py similarity index 96% rename from droplet/shared/reference.py rename to cloudburst/shared/reference.py index 5a3ed0a0..2cd86758 100644 --- a/droplet/shared/reference.py +++ b/cloudburst/shared/reference.py @@ -13,7 +13,7 @@ # limitations under the License. 
-class DropletReference(): +class CloudburstReference(): def __init__(self, key, deserialize): self.key = key self.deserialize = deserialize diff --git a/droplet/shared/serializer.py b/cloudburst/shared/serializer.py similarity index 84% rename from droplet/shared/serializer.py rename to cloudburst/shared/serializer.py index 1c0675aa..a6935209 100644 --- a/droplet/shared/serializer.py +++ b/cloudburst/shared/serializer.py @@ -23,15 +23,16 @@ ) import cloudpickle as cp import numpy as np +import pandas as pd import pyarrow as pa -from droplet.server.utils import DEFAULT_VC, generate_timestamp -from droplet.shared.proto.droplet_pb2 import ( - DEFAULT, NUMPY, # Droplet's supported serializer types +from cloudburst.server.utils import DEFAULT_VC, generate_timestamp +from cloudburst.shared.proto.cloudburst_pb2 import ( + DEFAULT, NUMPY, STRING, # Cloudburst's supported serializer types Value ) -from droplet.shared.reference import DropletReference -import droplet.shared.future as future +from cloudburst.shared.reference import CloudburstReference +import cloudburst.shared.future as future class Serializer(): @@ -53,6 +54,8 @@ def load(self, data): if val.type == DEFAULT: return self._load_default(val.body) + elif val.type == STRING: + return self._load_string(val.body) elif val.type == NUMPY: return self._load_numpy(val.body) @@ -60,16 +63,23 @@ def dump(self, data, valobj=None, serialize=True): if not valobj: valobj = Value() + # If we are attempting to pass a future into another function, we # simply turn it into a reference because the runtime knows how to # automatically resolve it. - if isinstance(data, future.DropletFuture): - valobj.body = self._dump_default(DropletReference(data.obj_id, + if type(data) == bytes: + valobj.body = data + valobj.type = DEFAULT + elif isinstance(data, future.CloudburstFuture): + valobj.body = self._dump_default(CloudburstReference(data.obj_id, True)) valobj.type = DEFAULT - elif isinstance(data, np.ndarray): + elif isinstance(data, np.ndarray) or isinstance(data, pd.DataFrame): valobj.body = self._dump_numpy(data) valobj.type = NUMPY + elif isinstance(data, str): + valobj.body = self._dump_string(data) + valobj.type = STRING else: valobj.body = self._dump_default(data) valobj.type = DEFAULT @@ -155,6 +165,12 @@ def _load_default(self, msg): return cp.loads(msg) + def _dump_string(self, msg): + return bytes(msg, 'utf-8') + + def _load_string(self, msg): + return str(msg, 'utf-8') + def _dump_numpy(self, msg): return pa.serialize(msg).to_buffer().to_pybytes() diff --git a/droplet/shared/utils.py b/cloudburst/shared/utils.py similarity index 88% rename from droplet/shared/utils.py rename to cloudburst/shared/utils.py index b61220a0..42d1a919 100644 --- a/droplet/shared/utils.py +++ b/cloudburst/shared/utils.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -# The port on which clients can connect to the Droplet service. +# The port on which clients can connect to the Cloudburst service. CONNECT_PORT = 5000 -# The port on which function creation calls are sent to Droplet. +# The port on which function creation calls are sent to Cloudburst. FUNC_CREATE_PORT = 5001 # The port on which function invocation messages are sent. 
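The `cloudburst/shared/serializer.py` hunk above adds three new fast paths to `dump`: raw `bytes` pass through untouched, `str` values get a dedicated `STRING` type encoded as UTF-8, and `pandas.DataFrame` objects ride the existing `NUMPY`/Arrow path. A minimal standalone sketch of that dispatch follows; the enum numbers and the `StubValue` class here are stand-ins for the generated `cloudburst_pb2` definitions, and the Arrow path is omitted since `pyarrow.serialize` is specific to the pinned pyarrow 0.14 line:

```python3
import cloudpickle as cp

DEFAULT, STRING = 0, 1  # stand-ins for the cloudburst_pb2 enum values


class StubValue:
    '''Stand-in for the generated protobuf Value message: a (type, body) pair.'''
    def __init__(self):
        self.type = DEFAULT
        self.body = b''


def dump(data):
    val = StubValue()
    if isinstance(data, bytes):
        # Raw bytes are assumed to already be a serialized payload, so they
        # are passed through untouched under the DEFAULT type.
        val.body, val.type = data, DEFAULT
    elif isinstance(data, str):
        # The new STRING fast path: plain UTF-8, no cloudpickle overhead.
        val.body, val.type = data.encode('utf-8'), STRING
    else:
        val.body, val.type = cp.dumps(data), DEFAULT
    return val


def load(val):
    if val.type == STRING:
        return val.body.decode('utf-8')
    return cp.loads(val.body)


assert load(dump('hello')) == 'hello'
assert load(dump({'n': 42})) == {'n': 42}
```

One subtlety: the `bytes` branch stores its body raw under `DEFAULT`, and `load` runs `DEFAULT` bodies through cloudpickle, so that path only round-trips when the caller hands in an already-cloudpickled payload, which is presumably how the runtime uses it.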
diff --git a/common b/common index b576ca55..a96ed5b1 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit b576ca556c5fc2c0a648e6402ae3626316f825f1 +Subproject commit a96ed5b1235f4d9b2ff5a77328ef10af8c73bcf5 diff --git a/conf/droplet-local.yml b/conf/cloudburst-local.yml similarity index 71% rename from conf/droplet-local.yml rename to conf/cloudburst-local.yml index 19feda67..91c6fa6e 100644 --- a/conf/droplet-local.yml +++ b/conf/cloudburst-local.yml @@ -6,6 +6,7 @@ executor: thread_id: 0 scheduler: routing_address: 127.0.0.1 + metric_address: 127.0.0.1 benchmark: - droplet_address: 127.0.0.1 + cloudburst_address: 127.0.0.1 thread_id: 0 diff --git a/dockerfiles/droplet.dockerfile b/dockerfiles/cloudburst.dockerfile similarity index 76% rename from dockerfiles/droplet.dockerfile rename to dockerfiles/cloudburst.dockerfile index 4cd0ca7f..46b1390c 100644 --- a/dockerfiles/droplet.dockerfile +++ b/dockerfiles/cloudburst.dockerfile @@ -23,10 +23,10 @@ ARG build_branch=docker-build USER root # Download latest version of the code from relevant repository & branch -- if -# none are specified, we use hydro-project/droplet by default. Install the KVS +# none are specified, we use hydro-project/cloudburst by default. Install the KVS # client from the Anna project. -WORKDIR $HYDRO_HOME/droplet -RUN git remote remove origin && git remote add origin https://github.com/$repo_org/droplet +WORKDIR $HYDRO_HOME/cloudburst +RUN git remote remove origin && git remote add origin https://github.com/$repo_org/cloudburst RUN git fetch -p origin && git checkout -b $build_branch origin/$source_branch RUN pip3 install -r requirements.txt WORKDIR $HYDRO_HOME @@ -38,8 +38,13 @@ WORKDIR / # These installations are currently pipeline specific until we figure out a # better way to do package management for Python. -RUN pip3 install tensorflow scikit-image +RUN pip3 install tensorflow==1.12.0 tensorboard==1.12.2 scikit-image -COPY start-droplet.sh /start-droplet.sh +COPY start-cloudburst.sh /start-cloudburst.sh -CMD bash start-droplet.sh +RUN pip3 install pandas s3fs + +RUN touch a +RUN pip3 install --upgrade git+https://github.com/devin-petersohn/modin@engines/cloudburst_init + +CMD bash start-cloudburst.sh diff --git a/dockerfiles/start-droplet.sh b/dockerfiles/start-cloudburst.sh similarity index 66% rename from dockerfiles/start-droplet.sh rename to dockerfiles/start-cloudburst.sh index 8e79022a..3705e241 100644 --- a/dockerfiles/start-droplet.sh +++ b/dockerfiles/start-cloudburst.sh @@ -31,7 +31,7 @@ gen_yml_list() { # Download latest version of the code from relevant repository & branch -- if -# none are specified, we use hydro-project/droplet by default. Install the KVS +# none are specified, we use hydro-project/cloudburst by default. Install the KVS # client from the Anna project. cd $HYDRO_HOME/anna git remote remove origin @@ -40,7 +40,7 @@ git fetch -p origin git checkout -b brnch origin/$ANNA_REPO_BRANCH git submodule update -cd $HYDRO_HOME/droplet +cd $HYDRO_HOME/cloudburst git submodule update if [[ -z "$REPO_ORG" ]]; then REPO_ORG="hydro-project" @@ -51,46 +51,46 @@ if [[ -z "$REPO_BRANCH" ]]; then fi git remote remove origin -git remote add origin https://github.com/$REPO_ORG/droplet +git remote add origin https://github.com/$REPO_ORG/cloudburst git fetch -p origin git checkout -b brnch origin/$REPO_BRANCH # Compile protobufs and run other installation procedures before starting. 
./scripts/build.sh -touch conf/droplet-config.yml -echo "ip: $IP" >> conf/droplet-config.yml -echo "mgmt_ip: $MGMT_IP" >> conf/droplet-config.yml +touch conf/cloudburst-config.yml +echo "ip: $IP" >> conf/cloudburst-config.yml +echo "mgmt_ip: $MGMT_IP" >> conf/cloudburst-config.yml # Add the current directory to the PYTHONPATH in order to resolve imports # correctly. export PYTHONPATH=$PYTHONPATH:$(pwd) if [[ "$ROLE" = "executor" ]]; then - echo "executor:" >> conf/droplet-config.yml - echo " thread_id: $THREAD_ID" >> conf/droplet-config.yml + echo "executor:" >> conf/cloudburst-config.yml + echo " thread_id: $THREAD_ID" >> conf/cloudburst-config.yml LST=$(gen_yml_list "$SCHED_IPS") - echo " scheduler_ips:" >> conf/droplet-config.yml - echo "$LST" >> conf/droplet-config.yml + echo " scheduler_ips:" >> conf/cloudburst-config.yml + echo "$LST" >> conf/cloudburst-config.yml while true; do - python3.6 droplet/server/executor/server.py + python3.6 cloudburst/server/executor/server.py if [[ "$?" = "1" ]]; then exit 1 fi done elif [[ "$ROLE" = "scheduler" ]]; then - echo "scheduler:" >> conf/droplet-config.yml - echo " routing_address: $ROUTE_ADDR" >> conf/droplet-config.yml + echo "scheduler:" >> conf/cloudburst-config.yml + echo " routing_address: $ROUTE_ADDR" >> conf/cloudburst-config.yml - python3.6 droplet/server/scheduler/server.py + python3.6 cloudburst/server/scheduler/server.py elif [[ "$ROLE" = "benchmark" ]]; then - echo "benchmark:" >> conf/droplet-config.yml - echo " droplet_address: $FUNCTION_ADDR" >> conf/droplet-config.yml - echo " thread_id: $THREAD_ID" >> conf/droplet-config.yml + echo "benchmark:" >> conf/cloudburst-config.yml + echo " cloudburst_address: $FUNCTION_ADDR" >> conf/cloudburst-config.yml + echo " thread_id: $THREAD_ID" >> conf/cloudburst-config.yml - python3.6 droplet/server/benchmarks/server.py + python3.6 cloudburst/server/benchmarks/server.py fi diff --git a/docs/function-execution.md b/docs/function-execution.md index 69a3166a..b64d3f3f 100644 --- a/docs/function-execution.md +++ b/docs/function-execution.md @@ -1,39 +1,39 @@ -# Executing Functions in Droplet +# Executing Functions in Cloudburst -You will either need to run Droplet in [local mode](local-mode.md) or run a Hydro cluster. You can find instructions for running a Hydro cluster in the `hydro-project/cluster` repo, [here](https://github.com/hydro-project/cluster/blob/master/docs/getting-started-aws.md). Once you have either of these modes set up, you are ready to run functions in Droplet. +You will either need to run Cloudburst in [local mode](local-mode.md) or run a Hydro cluster. You can find instructions for running a Hydro cluster in the `hydro-project/cluster` repo, [here](https://github.com/hydro-project/cluster/blob/master/docs/getting-started-aws.md). Once you have either of these modes set up, you are ready to run functions in Cloudburst. 
First, we'll create two new functions: ```python3 >>> local = True # or False if you are running against a HydroCluster >>> elb_address = '127.0.0.1' # or the address of the ELB returned by the ->>> from droplet.client.client import DropletConnection ->>> droplet = DropletConnection(AWS_FUNCTION_ELB, MY_IP) +>>> from cloudburst.client.client import CloudburstConnection +>>> cloudburst = CloudburstConnection(elb_address, MY_IP, local=local) >>> incr = lambda _, a: a + 1 ->>> cloud_incr = droplet.register(incr, 'incr') +>>> cloud_incr = cloudburst.register(incr, 'incr') >>> cloud_incr(1).get() 2 >>> square = lambda _, a: a * a ->>> cloud_square = droplet.register(square, 'square') +>>> cloud_square = cloudburst.register(square, 'square') >>> cloud_square(2).get() 4 ``` -Note that every function takes a first argument that is the [Droplet User Library](#DropletUserLibrary). We ignore that variable in these functions because we do not need it; the API is fully documented below. +Note that every function takes a first argument that is the [Cloudburst User Library](#cloudburst-user-library). We ignore that variable in these functions because we do not need it; the API is fully documented below. Now we'll chain those functions together and execute them at once: ```python3 # Create a DAG with two functions, incr and square, where incr comes before square. ->>> droplet.register_dag('test_dag', ['incr', 'square'], [('incr', 'square')]) +>>> cloudburst.register_dag('test_dag', ['incr', 'square'], [('incr', 'square')]) True # returns False if registration fails, e.g., if one of the referenced functions does not exist ->>> droplet.call_dag('test_dag', { 'incr': 1 }).get() +>>> cloudburst.call_dag('test_dag', { 'incr': 1 }).get() 4 ``` * All calls to functions and DAGs are by default asynchronous. Results are stored in the key-value store, and object IDs are returned. DAG calls can optionally specify synchronous calls by setting the `direct_response` argument to `True`. * DAGs can have arbitrary branches and connections and have multiple sources, but there must be only one sink function in the DAG. The result of this sink function is what is returned to the caller. -* For those familiar with the Anna KVS, all use of lattices is abstracted away from the Droplet user. The serialization and deserialization is done automatically by the runtime, and only Python values are passed into and out of all API functions. +* For those familiar with the Anna KVS, all use of lattices is abstracted away from the Cloudburst user. Serialization and deserialization are done automatically by the runtime, and only Python values are passed into and out of all API functions. ## Registering and Executing Classes @@ -58,7 +58,7 @@ cloud.register((Expensive, init_arg), 'expensive_class') 1 Note that the benefits of using a class will not work with one-shot function execution, as the class will be reinitialized for each request. -## Droplet User Library +## Cloudburst User Library | API Name | Functionality | |-----------|---------------| diff --git a/docs/local-mode.md b/docs/local-mode.md index 3ae2ba11..010746de 100644 --- a/docs/local-mode.md +++ b/docs/local-mode.md @@ -1,15 +1,15 @@ -# Running Droplet in Local Mode -In order to run Droplet, whether in local mode or in [cluster mode](https://github.com/hydro-project/cluster/blob/master/docs/getting-started-aws.md), the Droplet runtime requires a copy of the Anna KVS to be running.
Both Droplet and Anna can be run with limited capabilities on a single machine. You can find documentation on running Anna in local mode [here](https://github.com/hydro-project/anna/blob/master/docs/local-mode.md). The rest of this document assumes that Anna is already running in local mode on your machine. +In order to run Cloudburst, whether in local mode or in [cluster mode](https://github.com/hydro-project/cluster/blob/master/docs/getting-started-aws.md), the Cloudburst runtime requires a copy of the Anna KVS to be running. Both Cloudburst and Anna can be run with limited capabilities on a single machine. You can find documentation on running Anna in local mode [here](https://github.com/hydro-project/anna/blob/master/docs/local-mode.md). The rest of this document assumes that Anna is already running in local mode on your machine. ## Prerequisites -Droplet currently only supports Python3. To install Python dependencies, simply run `pip install -r requiments.txt` from the Droplet source directory. +Cloudburst currently only supports Python 3. To install Python dependencies, simply run `pip install -r requirements.txt` from the Cloudburst source directory. -Before running Droplet, we need to compile its Protobufs locally to generate the Python dependency files. `scripts/build.sh` automatically does this for you and installs them in the correct location, but it requires having the `protoc` tool installed. If you need to remove the locally compiled protobufs, you can run `bash scripts/clean.sh`. +Before running Cloudburst, we need to compile its Protobufs locally to generate the Python dependency files. `scripts/build.sh` automatically does this for you and installs them in the correct location, but it requires having the `protoc` tool installed. If you need to remove the locally compiled protobufs, you can run `bash scripts/clean.sh`. -Finally, Droplet requires access to the Anna Python client, which is in the Anna KVS repository. A default script to clone the Anna repository and install the client (the client is not currently `pip`-installable) can be found in `scripts/install-anna.sh`. You can customize the installation location by adding the `--prefix` flag to the `setup.py` command. +Finally, Cloudburst requires access to the Anna Python client, which is in the Anna KVS repository. A default script to clone the Anna repository and install the client (the client is not currently `pip`-installable) can be found in `scripts/install-anna.sh`. You can customize the installation location by adding the `--prefix` flag to the `setup.py` command. -## Running Droplet +## Running Cloudburst -Once all the protobufs have been compiled, `scripts/start-droplet-local.sh` will start a local Droplet server. You can stop these processes with `scripts/stop-droplet-local.sh`. For more information on how to interact with Droplet once it is running, see the [function execution docs](docs/function-execution.md). \ No newline at end of file +Once all the protobufs have been compiled, `scripts/start-cloudburst-local.sh` will start a local Cloudburst server. You can stop these processes with `scripts/stop-cloudburst-local.sh`. For more information on how to interact with Cloudburst once it is running, see the [function execution docs](function-execution.md).
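As a quick smoke test once the local scheduler and executor are up, something like the following should work from a Python shell. This is a sketch, assuming the defaults in `conf/cloudburst-local.yml` and that `CloudburstConnection` accepts the `local` flag referenced in the docs above:

```python3
from cloudburst.client.client import CloudburstConnection

# In local mode everything binds to localhost (see conf/cloudburst-local.yml).
local_cloud = CloudburstConnection('127.0.0.1', '127.0.0.1', local=True)

# Every Cloudburst function receives the user library as its first argument.
double = lambda _, x: x * 2
cloud_double = local_cloud.register(double, 'double')
print(cloud_double(21).get())  # expect: 42
```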
diff --git a/requirements.txt b/requirements.txt index 563ccdba..b62b0317 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ cloudpickle==0.6.1 coverage==4.5.4 flake8==3.7.7 numpy==1.16.1 +pandas==0.25.1 protobuf==3.6.1 pyarrow==0.14.1 pycodestyle==2.5.0 diff --git a/scripts/build.sh b/scripts/build.sh index e3e8330e..8666ebe6 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -15,30 +15,30 @@ # limitations under the License. if [ -z "$(command -v protoc)" ]; then - echo "The protoc tool is required before you can run Droplet locally." + echo "The protoc tool is required before you can run Cloudburst locally." echo "Please install protoc manually, or use the scripts in" \ "hydro-project/common to install dependencies before proceeding." exit 1 fi -rm -rf droplet/shared/proto -mkdir droplet/shared/proto -touch droplet/shared/proto/__init__.py -protoc -I=common/proto --python_out=droplet/shared/proto droplet.proto shared.proto -protoc -I=common/proto --python_out=droplet/shared/proto anna.proto shared.proto causal.proto -protoc -I=proto --python_out=droplet/shared/proto internal.proto +rm -rf cloudburst/shared/proto +mkdir cloudburst/shared/proto +touch cloudburst/shared/proto/__init__.py +protoc -I=common/proto --python_out=cloudburst/shared/proto cloudburst.proto shared.proto +protoc -I=common/proto --python_out=cloudburst/shared/proto anna.proto shared.proto causal.proto +protoc -I=proto --python_out=cloudburst/shared/proto internal.proto # NOTE: This is a hack. We have to do this because the protobufs are not # packaged properly (in the protobuf definitions). This isn't an issue for C++ # builds, because all the header files are in one place, but it breaks our # Python imports. Consider how to fix this in the future. if [[ "$OSTYPE" = "darwin"* ]]; then - sed -i '' "s/import shared_pb2/from . import shared_pb2/g" $(find droplet/shared/proto | grep pb2 | grep -v pyc | grep -v internal) - sed -i '' "s/import anna_pb2/from . import anna_pb2/g" $(find droplet/shared/proto | grep pb2 | grep -v pyc | grep -v internal) - sed -i '' "s/import droplet_pb2/from . import droplet_pb2/g" $(find droplet/shared/proto | grep pb2 | grep -v pyc | grep -v internal) + sed -i '' "s/import shared_pb2/from . import shared_pb2/g" $(find cloudburst/shared/proto | grep pb2 | grep -v pyc | grep -v internal) + sed -i '' "s/import anna_pb2/from . import anna_pb2/g" $(find cloudburst/shared/proto | grep pb2 | grep -v pyc | grep -v internal) + sed -i '' "s/import cloudburst_pb2/from . import cloudburst_pb2/g" $(find cloudburst/shared/proto | grep pb2 | grep -v pyc | grep -v internal) else # We assume other linux distributions - sed -i "s|import shared_pb2|from . import shared_pb2|g" $(find droplet/shared/proto | grep pb2 | grep -v pyc | grep -v internal) - sed -i "s|import anna_pb2|from . import anna_pb2|g" $(find droplet/shared/proto | grep pb2 | grep -v pyc | grep -v internal) - sed -i "s|import droplet_pb2|from . import droplet_pb2|g" $(find droplet/shared/proto | grep pb2 | grep -v pyc | grep -v internal) + sed -i "s|import shared_pb2|from . import shared_pb2|g" $(find cloudburst/shared/proto | grep pb2 | grep -v pyc | grep -v internal) + sed -i "s|import anna_pb2|from . import anna_pb2|g" $(find cloudburst/shared/proto | grep pb2 | grep -v pyc | grep -v internal) + sed -i "s|import cloudburst_pb2|from . 
import cloudburst_pb2|g" $(find cloudburst/shared/proto | grep pb2 | grep -v pyc | grep -v internal) fi diff --git a/scripts/clean.sh b/scripts/clean.sh index 65523701..326ae886 100755 --- a/scripts/clean.sh +++ b/scripts/clean.sh @@ -14,5 +14,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -rm -rf droplet/shared/proto +rm -rf cloudburst/shared/proto find . | grep __pycache__ | xargs rm -rf diff --git a/scripts/start-droplet-local.sh b/scripts/start-cloudburst-local.sh similarity index 87% rename from scripts/start-droplet-local.sh rename to scripts/start-cloudburst-local.sh index d3a7aeca..3490c6bd 100755 --- a/scripts/start-droplet-local.sh +++ b/scripts/start-cloudburst-local.sh @@ -30,9 +30,9 @@ fi # attempting to execute the tests. export PYTHONPATH=$PYTHONPATH:$(pwd) -python3 droplet/server/scheduler/server.py conf/droplet-local.yml & +python3 cloudburst/server/scheduler/server.py conf/cloudburst-local.yml & SPID=$! -python3 droplet/server/executor/server.py conf/droplet-local.yml & +python3 cloudburst/server/executor/server.py conf/cloudburst-local.yml & EPID=$! echo $SPID > pids diff --git a/scripts/stop-droplet-local.sh b/scripts/stop-cloudburst-local.sh similarity index 100% rename from scripts/stop-droplet-local.sh rename to scripts/stop-cloudburst-local.sh diff --git a/scripts/travis/docker-build.sh b/scripts/travis/docker-build.sh index 8e6a39cc..540cb48d 100755 --- a/scripts/travis/docker-build.sh +++ b/scripts/travis/docker-build.sh @@ -18,12 +18,12 @@ # for PR builds, because we don't want to update the docker image. if [[ "$TRAVIS_BRANCH" = "master" ]] && [[ "$TRAVIS_PULL_REQUEST" = "false" ]]; then docker pull hydroproject/base - docker pull hydroproject/droplet + docker pull hydroproject/cloudburst cd dockerfiles - docker build . -f droplet.dockerfile -t hydroproject/droplet + docker build . 
-f cloudburst.dockerfile -t hydroproject/cloudburst echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - docker push hydroproject/droplet + docker push hydroproject/cloudburst fi diff --git a/setup.py b/setup.py index 00111d37..c2221961 100644 --- a/setup.py +++ b/setup.py @@ -36,15 +36,15 @@ def compile_proto(self): def cleanup(self): os.system('./scripts/clean.sh') - os.system('rm -rf Droplet.egg-info') + os.system('rm -rf Cloudburst.egg-info') setup( - name='Droplet', + name='Cloudburst', version='0.1.0', packages=find_packages(), license='Apache v2', - long_description='The Droplet Client and Server', + long_description='The Cloudburst Client and Server', install_requires=['zmq', 'protobuf', 'anna'], cmdclass={'install': InstallWrapper} ) diff --git a/tests/__main__.py b/tests/__main__.py index 03915ccd..6e16a8ee 100644 --- a/tests/__main__.py +++ b/tests/__main__.py @@ -28,37 +28,37 @@ from tests.shared import test_serializer -def droplet_test_suite(): - droplet_tests = [] +def cloudburst_test_suite(): + cloudburst_tests = [] loader = unittest.TestLoader() - # Load Droplet Executor tests - droplet_tests.append( + # Load Cloudburst Executor tests + cloudburst_tests.append( loader.loadTestsFromTestCase(test_executor_call.TestExecutorCall)) - droplet_tests.append( + cloudburst_tests.append( loader.loadTestsFromTestCase(test_pin.TestExecutorPin)) - droplet_tests.append( + cloudburst_tests.append( loader.loadTestsFromTestCase(test_user_library.TestUserLibrary)) - # Load Droplet Scheduler tests - droplet_tests.append( + # Load Cloudburst Scheduler tests + cloudburst_tests.append( loader.loadTestsFromTestCase(test_scheduler_call.TestSchedulerCall)) - droplet_tests.append( + cloudburst_tests.append( loader.loadTestsFromTestCase(test_create.TestSchedulerCreate)) - droplet_tests.append( + cloudburst_tests.append( loader.loadTestsFromTestCase( test_default_policy.TestDefaultSchedulerPolicy)) # Load miscellaneous tests - droplet_tests.append(loader.loadTestsFromTestCase( + cloudburst_tests.append(loader.loadTestsFromTestCase( test_serializer.TestSerializer)) - return unittest.TestSuite(droplet_tests) + return unittest.TestSuite(cloudburst_tests) if __name__ == '__main__': runner = unittest.TextTestRunner() - result = runner.run(droplet_test_suite()) + result = runner.run(cloudburst_test_suite()) if not result.wasSuccessful(): sys.exit(1) diff --git a/tests/mock/kvs_client.py b/tests/mock/kvs_client.py index 5ac58d8b..d39823ae 100644 --- a/tests/mock/kvs_client.py +++ b/tests/mock/kvs_client.py @@ -14,9 +14,9 @@ from anna.base_client import BaseAnnaClient -from droplet.server.utils import DEFAULT_VC -from droplet.shared.proto.shared_pb2 import KeyVersion -from droplet.shared.proto.droplet_pb2 import NORMAL +from cloudburst.server.utils import DEFAULT_VC +from cloudburst.shared.proto.shared_pb2 import KeyVersion +from cloudburst.shared.proto.cloudburst_pb2 import NORMAL class MockAnnaClient(BaseAnnaClient): diff --git a/tests/server/executor/test_call.py b/tests/server/executor/test_call.py index 1eb09527..a257df44 100644 --- a/tests/server/executor/test_call.py +++ b/tests/server/executor/test_call.py @@ -23,19 +23,19 @@ VectorClock ) -from droplet.server.executor.call import exec_function, exec_dag_function -from droplet.server.executor.user_library import DropletUserLibrary -from droplet.server.utils import DEFAULT_VC -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.server.executor.call import exec_function, exec_dag_function +from 
cloudburst.server.executor.user_library import CloudburstUserLibrary +from cloudburst.server.utils import DEFAULT_VC +from cloudburst.shared.proto.cloudburst_pb2 import ( DagSchedule, DagTrigger, FunctionCall, GenericResponse, - NORMAL, MULTI, # Droplet's supported consistency modes - FUNC_NOT_FOUND, EXECUTION_ERROR # Droplet's error types + NORMAL, MULTI, # Cloudburst's supported consistency modes + FUNC_NOT_FOUND, EXECUTION_ERROR # Cloudburst's error types ) -from droplet.shared.reference import DropletReference -from droplet.shared.serializer import Serializer +from cloudburst.shared.reference import CloudburstReference +from cloudburst.shared.serializer import Serializer from tests.mock import kvs_client, zmq_utils from tests.server.utils import ( create_function, @@ -61,7 +61,7 @@ def setUp(self): self.kvs_client = kvs_client.MockAnnaClient() self.socket = zmq_utils.MockZmqSocket() self.pusher_cache = zmq_utils.MockPusherCache() - self.user_library = DropletUserLibrary(zmq_utils.MockZmqContext(), + self.user_library = CloudburstUserLibrary(zmq_utils.MockZmqContext(), self.pusher_cache, self.ip, 0, self.kvs_client) @@ -214,7 +214,7 @@ def func(_, x): return x * x # Create and serialize the function call. call = self._create_function_call( - fname, [DropletReference(arg_name, True)], NORMAL) + fname, [CloudburstReference(arg_name, True)], NORMAL) self.socket.inbox.append(call.SerializeToString()) # Execute the function call. @@ -251,7 +251,7 @@ def func(_, x): return x * x # Create and serialize the function call. call = self._create_function_call( - fname, [DropletReference(arg_name, True)], MULTI) + fname, [CloudburstReference(arg_name, True)], MULTI) self.socket.inbox.append(call.SerializeToString()) # Execute the function call. @@ -288,7 +288,7 @@ def func(_, x): return sum(x) # Put the function into the KVS and create a function call. create_function(func, self.kvs_client, fname) call = self._create_function_call( - fname, [DropletReference(arg_name, True)], NORMAL) + fname, [CloudburstReference(arg_name, True)], NORMAL) self.socket.inbox.append(call.SerializeToString()) # Execute the function call. @@ -321,7 +321,7 @@ def func(_, x): return len(x) >= 2 and x[0] < x[1] # Put the function into the KVS and create a function call. create_function(func, self.kvs_client, fname) call = self._create_function_call( - fname, [DropletReference(arg_name, True)], NORMAL) + fname, [CloudburstReference(arg_name, True)], NORMAL) self.socket.inbox.append(call.SerializeToString()) # Execute the function call. @@ -347,10 +347,10 @@ def test_exec_class_function(self): ''' # Create the function and put it into the KVS. 
class Test: - def __init__(self, droplet, num): + def __init__(self, cloudburst, num): self.num = num - def run(self, droplet, inp): + def run(self, cloudburst, inp): return inp + self.num fname = 'class' @@ -552,7 +552,7 @@ def square(_, x): return x * x dag = create_linear_dag([incr, square], [iname, sname], self.kvs_client, 'dag', MultiKeyCausalLattice) schedule, triggers = self._create_fn_schedule( - dag, DropletReference(arg_name, True), iname, [iname, sname], + dag, CloudburstReference(arg_name, True), iname, [iname, sname], MULTI) exec_dag_function(self.pusher_cache, self.kvs_client, triggers, incr, diff --git a/tests/server/executor/test_pin.py b/tests/server/executor/test_pin.py index 31cb1030..40e04d7a 100644 --- a/tests/server/executor/test_pin.py +++ b/tests/server/executor/test_pin.py @@ -14,10 +14,10 @@ import unittest -from droplet.server.executor.pin import pin, unpin -from droplet.server.executor.user_library import DropletUserLibrary -from droplet.shared.proto.droplet_pb2 import GenericResponse -from droplet.shared.proto.internal_pb2 import ThreadStatus +from cloudburst.server.executor.pin import pin, unpin +from cloudburst.server.executor.user_library import CloudburstUserLibrary +from cloudburst.shared.proto.cloudburst_pb2 import GenericResponse +from cloudburst.shared.proto.internal_pb2 import ThreadStatus from tests.mock import kvs_client, zmq_utils from tests.server.utils import create_function @@ -45,9 +45,9 @@ def setUp(self): self.runtimes = {} self.exec_counts = {} - self.user_library = DropletUserLibrary(zmq_utils.MockZmqContext(), - self.pusher_cache, self.ip, 0, - self.kvs_client) + self.user_library = CloudburstUserLibrary(zmq_utils.MockZmqContext(), + self.pusher_cache, self.ip, 0, + self.kvs_client) def test_succesful_pin(self): ''' diff --git a/tests/server/executor/test_user_library.py b/tests/server/executor/test_user_library.py index ead92d4f..a0237ba1 100644 --- a/tests/server/executor/test_user_library.py +++ b/tests/server/executor/test_user_library.py @@ -14,8 +14,8 @@ import unittest -from droplet.server.executor.user_library import DropletUserLibrary -from droplet.shared.serializer import Serializer +from cloudburst.server.executor.user_library import CloudburstUserLibrary +from cloudburst.shared.serializer import Serializer from tests.mock.kvs_client import MockAnnaClient from tests.mock.zmq_utils import MockPusherCache, MockZmqContext @@ -36,7 +36,7 @@ def setUp(self): self.ip = '127.0.0.1' self.kvs_client = MockAnnaClient() - self.user_library = DropletUserLibrary(self.context, self.pusher_cache, + self.user_library = CloudburstUserLibrary(self.context, self.pusher_cache, self.ip, 0, self.kvs_client) def test_kvs_io(self): diff --git a/tests/server/scheduler/policy/test_default_policy.py b/tests/server/scheduler/policy/test_default_policy.py index d26cdcc0..7004fc2f 100644 --- a/tests/server/scheduler/policy/test_default_policy.py +++ b/tests/server/scheduler/policy/test_default_policy.py @@ -18,14 +18,14 @@ from anna.lattices import LWWPairLattice -from droplet.server.scheduler.policy.default_policy import ( - DefaultDropletSchedulerPolicy +from cloudburst.server.scheduler.policy.default_policy import ( + DefaultCloudburstSchedulerPolicy ) -from droplet.server.scheduler.utils import get_cache_ip_key -import droplet.server.utils as sutils -from droplet.shared.proto.internal_pb2 import ThreadStatus, SchedulerStatus -from droplet.shared.proto.shared_pb2 import StringSet -from droplet.shared.serializer import Serializer +from 
cloudburst.server.scheduler.utils import get_cache_ip_key +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.internal_pb2 import ThreadStatus, SchedulerStatus +from cloudburst.shared.proto.shared_pb2 import StringSet +from cloudburst.shared.serializer import Serializer from tests.mock import kvs_client, zmq_utils serializer = Serializer() @@ -48,10 +48,10 @@ def setUp(self): self.kvs_client = kvs_client.MockAnnaClient() self.ip = '127.0.0.1' - self.policy = DefaultDropletSchedulerPolicy(self.pin_socket, - self.pusher_cache, - self.kvs_client, self.ip, - random_threshold=0) + self.policy = DefaultCloudburstSchedulerPolicy(self.pin_socket, + self.pusher_cache, + self.kvs_client, self.ip, + random_threshold=0) def tearDown(self): # Clear all policy metadata. diff --git a/tests/server/scheduler/test_call.py b/tests/server/scheduler/test_call.py index 98cfd1af..cf1ab9d0 100644 --- a/tests/server/scheduler/test_call.py +++ b/tests/server/scheduler/test_call.py @@ -14,25 +14,25 @@ import unittest -from droplet.server.scheduler.call import call_function, call_dag -from droplet.server.scheduler.policy.default_policy import ( - DefaultDropletSchedulerPolicy +from cloudburst.server.scheduler.call import call_function, call_dag +from cloudburst.server.scheduler.policy.default_policy import ( + DefaultCloudburstSchedulerPolicy ) -from droplet.server.scheduler import utils -from droplet.server import utils as sutils -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.server.scheduler import utils +from cloudburst.server import utils as sutils +from cloudburst.shared.proto.cloudburst_pb2 import ( Dag, DagCall, DagSchedule, DagTrigger, FunctionCall, GenericResponse, - NO_RESOURCES, # Droplet's error types - NORMAL # Droplet's consistency modes + NO_RESOURCES, # Cloudburst's error types + NORMAL # Cloudburst's consistency modes ) -from droplet.shared.proto.internal_pb2 import ThreadStatus -from droplet.shared.reference import DropletReference -from droplet.shared.serializer import Serializer +from cloudburst.shared.proto.internal_pb2 import ThreadStatus +from cloudburst.shared.reference import CloudburstReference +from cloudburst.shared.serializer import Serializer from tests.mock import kvs_client, zmq_utils serializer = Serializer() @@ -55,7 +55,7 @@ def setUp(self): self.kvs_client = kvs_client.MockAnnaClient() self.ip = '127.0.0.1' - self.policy = DefaultDropletSchedulerPolicy(self.pin_socket, + self.policy = DefaultCloudburstSchedulerPolicy(self.pin_socket, self.pusher_cache, self.kvs_client, self.ip, random_threshold=0) @@ -129,7 +129,7 @@ def test_call_function_with_refs(self): call.name = 'function' call.request_id = 12 val = call.arguments.values.add() - serializer.dump(DropletReference(ref_name, True), val) + serializer.dump(CloudburstReference(ref_name, True), val) self.socket.inbox.append(call.SerializeToString(0)) # Execute the scheduling policy. 
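From the client side, the reference-passing path that `test_call_function_with_refs` exercises above looks roughly like this. It is a sketch only: `put_object` is an assumed helper name for writing a value into Anna, and the addresses presume local mode:

```python3
from cloudburst.client.client import CloudburstConnection
from cloudburst.shared.reference import CloudburstReference

cloudburst = CloudburstConnection('127.0.0.1', '127.0.0.1', local=True)

# Store the argument in the KVS first (put_object is an assumed helper),
# then pass it by reference; the executor resolves the key to its value
# before invoking the function.
cloudburst.put_object('my_arg', 41)
incr = cloudburst.register(lambda _, x: x + 1, 'incr')
print(incr(CloudburstReference('my_arg', True)).get())  # expect: 42
```

The second argument to `CloudburstReference` is the `deserialize` flag from `cloudburst/shared/reference.py` above, which tells the executor whether to deserialize the fetched value before injecting it.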
diff --git a/tests/server/scheduler/test_create.py b/tests/server/scheduler/test_create.py index c866ad39..3f5cc209 100644 --- a/tests/server/scheduler/test_create.py +++ b/tests/server/scheduler/test_create.py @@ -17,24 +17,24 @@ from anna.lattices import LWWPairLattice, SingleKeyCausalLattice -from droplet.server.scheduler.create import ( +from cloudburst.server.scheduler.create import ( create_dag, create_function, delete_dag ) -from droplet.server.scheduler.policy.default_policy import ( - DefaultDropletSchedulerPolicy +from cloudburst.server.scheduler.policy.default_policy import ( + DefaultCloudburstSchedulerPolicy ) -from droplet.server.scheduler.utils import get_pin_address, get_unpin_address -import droplet.server.utils as sutils -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.server.scheduler.utils import get_pin_address, get_unpin_address +import cloudburst.server.utils as sutils +from cloudburst.shared.proto.cloudburst_pb2 import ( Dag, Function, GenericResponse, - NORMAL, MULTI, # Droplet's consistency modes - DAG_ALREADY_EXISTS, NO_RESOURCES, NO_SUCH_DAG # Droplet's error modes + NORMAL, MULTI, # Cloudburst's consistency modes + DAG_ALREADY_EXISTS, NO_RESOURCES, NO_SUCH_DAG # Cloudburst's error modes ) -from droplet.shared.serializer import Serializer +from cloudburst.shared.serializer import Serializer from tests.mock import kvs_client, zmq_utils from tests.server.utils import create_linear_dag @@ -58,7 +58,7 @@ def setUp(self): self.kvs_client = kvs_client.MockAnnaClient() self.ip = '127.0.0.1' - self.policy = DefaultDropletSchedulerPolicy(self.pin_socket, + self.policy = DefaultCloudburstSchedulerPolicy(self.pin_socket, self.pusher_cache, self.kvs_client, self.ip, random_threshold=0) diff --git a/tests/server/utils.py b/tests/server/utils.py index 52575df3..6085fd70 100644 --- a/tests/server/utils.py +++ b/tests/server/utils.py @@ -14,9 +14,9 @@ from anna.lattices import LWWPairLattice -from droplet.server.utils import get_func_kvs_name -from droplet.shared.proto.droplet_pb2 import Dag -from droplet.shared.serializer import Serializer +from cloudburst.server.utils import get_func_kvs_name +from cloudburst.shared.proto.cloudburst_pb2 import Dag +from cloudburst.shared.serializer import Serializer serializer = Serializer() diff --git a/tests/shared/test_serializer.py b/tests/shared/test_serializer.py index 109a0483..1f9ebd84 100644 --- a/tests/shared/test_serializer.py +++ b/tests/shared/test_serializer.py @@ -17,13 +17,13 @@ from anna.lattices import MapLattice import numpy as np -from droplet.shared.proto.droplet_pb2 import ( +from cloudburst.shared.proto.cloudburst_pb2 import ( Value, - DEFAULT, NUMPY # Droplet's serializer types + DEFAULT, NUMPY # Cloudburst's serializer types ) -from droplet.shared.future import DropletFuture -from droplet.shared.reference import DropletReference -from droplet.shared.serializer import Serializer +from cloudburst.shared.future import CloudburstFuture +from cloudburst.shared.reference import CloudburstReference +from cloudburst.shared.serializer import Serializer from tests.mock.kvs_client import MockAnnaClient @@ -81,11 +81,11 @@ def test_serialize_to_bytes(self): def test_serialize_future(self): ''' Tests that the serializer correctly detects and converts a - DropletFuture to a DropletReference. + CloudburstFuture to a CloudburstReference. 
''' kvs_client = MockAnnaClient() - future = DropletFuture('id', kvs_client, self.serializer) + future = CloudburstFuture('id', kvs_client, self.serializer) serialized = self.serializer.dump(future, serialize=False) @@ -93,7 +93,7 @@ def test_serialize_future(self): self.assertEqual(serialized.type, DEFAULT) reference = self.serializer.load(serialized) - self.assertEqual(type(reference), DropletReference) + self.assertEqual(type(reference), CloudburstReference) self.assertEqual(future.obj_id, reference.key) def test_serializer_map_lattice(self):
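The serializer tests above cover futures and numpy arrays but not the new `STRING` fast path this patch introduces; a hypothetical follow-up case in the same style might look like this:

```python3
import unittest

from cloudburst.shared.proto.cloudburst_pb2 import STRING
from cloudburst.shared.serializer import Serializer


class TestStringSerialization(unittest.TestCase):
    '''
    Hypothetical extra test (not part of this patch) for the STRING
    serializer type added in cloudburst/shared/serializer.py.
    '''
    def test_serialize_string(self):
        serializer = Serializer()

        # Strings should be tagged STRING, stored as UTF-8, and round-trip
        # cleanly through load().
        serialized = serializer.dump('hello world', serialize=False)
        self.assertEqual(serialized.type, STRING)
        self.assertEqual(serializer.load(serialized), 'hello world')


if __name__ == '__main__':
    unittest.main()
```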