diff --git a/README.md b/README.md
index eb20088..0d37cc3 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,83 @@
-# CloudPITS
\ No newline at end of file
+# CloudPITS / Instance Selection
+
+This is the initial idea behind CloudPITS, which involves selecting a group of instances whose cost vs. performance
+ratio lets a given SPITS program execute for less than a user-defined budget.
+Since this is a prototype it still has many limitations and is not ready for general usage. However, the general
+idea behind the algorithm is here.
+
+ #### Disclaimer
+
+ This is still a proof of concept. The SPITS program used for validation computes the Zero Offset Non-hyperbolic
+ Common Reflection Surface (DOI:10.1111/j.1365-2478.2012.01055.x) parameters, therefore some parameters are
+ currently tailored for that specific software. This is not yet generic for any SPITS program!
+
+ #### How it works
+
+ The instance selection algorithm works with a stored database containing performance measurements from previous
+ executions. By using information from previous executions of the same experiment it is possible to infer how the
+ current experiment will perform, which allows us to create a good initial instance pool. Additionally, by
+ periodically verifying how the instances are performing and how expensive they are at a given moment, it is
+ possible to select the types that offer the best performance for the money.
+
+ The three main files are the following (with their input/output described in their own headers):
+
+    launch/create_ondemand_jm.py
+    main/simulation.py
+    main/to_execute.py
+
+ The first one (launch/create_ondemand_jm.py) is used to launch an On-Demand instance that works as
+ the master instance (Job Manager and instance selector). It is therefore necessary that, somewhere in the
+ user_data variable, the user insert the code that calls the "to_execute.py" script, so that this instance
+ selection algorithm starts together with the Job Manager process.
+
+ The second one simulates an execution, given the information previously stored in the database and the price log
+ stored in a file (main/log_prices.csv). This allows the user to simulate how much would be spent for each input
+ price (see the example invocation below).
+
+ The last one is the actual Python script to be executed: it starts a new pool of instances and
+ then checks from time to time for instances that are performing below the desired cost vs. performance threshold,
+ replacing badly performing types with better ones.
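+
+ For example, the simulator can be launched with key=value arguments. The argument names below are the ones parsed
+ by simulation.py; the values themselves are illustrative only (target_tasks may be omitted, in which case the task
+ count is read from the database):
+
+    python3 main/simulation.py interval=1 time_skip=15 failure_create=0.05 failure_exec=0.01 \
+        budget=100 data_hash=<dataset md5sum> nodes=10 target_tasks=50000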
+
+ #### Database
+
+ The algorithm extracts information from a database as configured in the "rds_config.py" file. The database should be
+ called "experimentos" and have the tables displayed in the picture below. There is a SQL script to generate it
+ in the folder "database/".
+ ![](database/experimentos_db.png)
+
+ #### Performance measurement report
+
+ The worker instances must report their performance to the Job Manager via CloudWatch. To do so, the user needs to
+ create three new metrics with the following characteristics:
+
+    Namespace='Performance',
+    MetricName='perf_sec',
+    Dimensions=[{'Name': 'Instance Id', 'Value': instance_id},
+                {'Name': 'Type', 'Value': instance_type}]
+
+    Namespace='Performance',
+    MetricName='perf_sec_stdev',
+    Dimensions=[{'Name': 'Instance Id', 'Value': instance_id},
+                {'Name': 'Type', 'Value': instance_type}]
+
+    Namespace='Performance',
+    MetricName='tasks_completed',
+    Dimensions=[{'Name': 'Instance Id', 'Value': instance_id},
+                {'Name': 'Type', 'Value': instance_type}]
+
+ These reports also go to the aforementioned database so that future executions can use them. They are also
+ mandatory so that the instance selection Python script can select the initial pool of instances for the SPITS
+ program being optimized.
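+
+ As a reference, the snippet below is a minimal sketch of how a worker could publish one of these metrics with
+ boto3 (the helper name report_performance and the hard-coded region are illustrative, not part of this
+ repository):
+
+    import boto3
+
+    def report_performance(instance_id, instance_type, perf_sec):
+        # Publish one sample of the 'perf_sec' custom metric for this worker
+        cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
+        cloudwatch.put_metric_data(
+            Namespace='Performance',
+            MetricData=[{
+                'MetricName': 'perf_sec',
+                'Dimensions': [{'Name': 'Instance Id', 'Value': instance_id},
+                               {'Name': 'Type', 'Value': instance_type}],
+                'Value': perf_sec
+            }])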
+
+ #### Dependencies
+
+ The program has a few dependencies. They can be installed using the Python pip command:
+
+    python3 -m pip install --user -r requirements.txt
+
+ Most of the required packages already come installed with standard Python distributions; however, two of them are
+ not included, namely:
+
+    boto3: To access the AWS instances
+    pymysql: To enable Python to access a MySQL database and perform queries
+
+ Further information about each file can be found in its own documentation.
diff --git a/database/experimentos_db.png b/database/experimentos_db.png
new file mode 100644
index 0000000..8d5f524
Binary files /dev/null and b/database/experimentos_db.png differ
diff --git a/database/experimentos_db.sql b/database/experimentos_db.sql
new file mode 100644
index 0000000..6f1af4b
--- /dev/null
+++ b/database/experimentos_db.sql
@@ -0,0 +1,147 @@
+-- The MIT License (MIT)
+--
+-- Copyright (c) 2019 Nicholas Torres Okita
+--
+-- Permission is hereby granted, free of charge, to any person obtaining a copy
+-- of this software and associated documentation files (the "Software"), to
+-- deal in the Software without restriction, including without limitation the
+-- rights to use, copy, modify, merge, publish, distribute, sublicense,
+-- and/or sell copies of the Software, and to permit persons to whom the
+-- Software is furnished to do so, subject to the following conditions:
+--
+-- The above copyright notice and this permission notice shall be included in
+-- all copies or substantial portions of the Software.
+--
+-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+-- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+-- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+-- IN THE SOFTWARE.
+--
+-- MySQL script to create the database for performance reports
+
+SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0;
+SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0;
+SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION';
+
+-- -----------------------------------------------------
+-- Schema experimentos
+-- -----------------------------------------------------
+
+-- -----------------------------------------------------
+-- Schema experimentos
+-- -----------------------------------------------------
+CREATE SCHEMA IF NOT EXISTS `experimentos` DEFAULT CHARACTER SET utf8 ;
+USE `experimentos` ;
+
+-- -----------------------------------------------------
+-- Table `experimentos`.`data`
+-- -----------------------------------------------------
+CREATE TABLE IF NOT EXISTS `experimentos`.`data` (
+  `iddata` INT(11) NOT NULL AUTO_INCREMENT,
+  `name` VARCHAR(45) NULL DEFAULT NULL,
+  `hash` VARCHAR(45) NULL DEFAULT NULL,
+  PRIMARY KEY (`iddata`))
+ENGINE = InnoDB
+AUTO_INCREMENT = 13
+DEFAULT CHARACTER SET = utf8;
+
+
+-- -----------------------------------------------------
+-- Table `experimentos`.`instance`
+-- -----------------------------------------------------
+CREATE TABLE IF NOT EXISTS `experimentos`.`instance` (
+  `name` VARCHAR(45) NOT NULL,
+  PRIMARY KEY (`name`))
+ENGINE = InnoDB
+DEFAULT CHARACTER SET = utf8;
+
+
+-- -----------------------------------------------------
+-- Table `experimentos`.`parameters`
+-- -----------------------------------------------------
+CREATE TABLE IF NOT EXISTS `experimentos`.`parameters` (
+  `idparameters` INT(11) NOT NULL AUTO_INCREMENT,
+  `aph` FLOAT NULL DEFAULT NULL,
+  `apm` FLOAT NULL DEFAULT NULL,
+  `window` DOUBLE NULL DEFAULT NULL,
+  `np` FLOAT NULL DEFAULT NULL,
+  `gens` FLOAT NULL DEFAULT NULL,
+  PRIMARY KEY (`idparameters`))
+ENGINE = InnoDB
+AUTO_INCREMENT = 49
+DEFAULT CHARACTER SET = utf8;
+
+
+-- -----------------------------------------------------
+-- Table `experimentos`.`experiment`
+-- -----------------------------------------------------
+CREATE TABLE IF NOT EXISTS `experimentos`.`experiment` (
+  `idperformance` INT(11) NOT NULL AUTO_INCREMENT,
+  `data_iddata` INT(11) NOT NULL,
+  `parameters_idparameters` INT(11) NOT NULL,
+  `instance_name` VARCHAR(45) NOT NULL,
+  PRIMARY KEY (`idperformance`),
+  INDEX `fk_performance_data1_idx` (`data_iddata` ASC) VISIBLE,
+  INDEX `fk_performance_parameters1_idx` (`parameters_idparameters` ASC) VISIBLE,
+  INDEX `fk_performance_instance1_idx` (`instance_name` ASC) VISIBLE,
+  CONSTRAINT `fk_performance_data1`
+    FOREIGN KEY (`data_iddata`)
+    REFERENCES `experimentos`.`data` (`iddata`),
+  CONSTRAINT `fk_performance_instance1`
+    FOREIGN KEY (`instance_name`)
+    REFERENCES `experimentos`.`instance` (`name`),
+  CONSTRAINT `fk_performance_parameters1`
+    FOREIGN KEY (`parameters_idparameters`)
+    REFERENCES `experimentos`.`parameters` (`idparameters`))
+ENGINE = InnoDB
+AUTO_INCREMENT = 5731
+DEFAULT CHARACTER SET = utf8;
+
+
+-- -----------------------------------------------------
+-- Table `experimentos`.`interpols`
+-- -----------------------------------------------------
+CREATE TABLE IF NOT EXISTS `experimentos`.`interpols` (
+  `idinterpols` INT(11) NOT NULL AUTO_INCREMENT,
+  `parameters_idparameters` INT(11) NOT NULL,
+  `data_iddata` INT(11) NOT NULL,
+  `interpols` FLOAT NULL DEFAULT NULL,
+  `datetime` DATETIME NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY 
(`idinterpols`), + INDEX `fk_interpols_parameters1_idx` (`parameters_idparameters` ASC) VISIBLE, + INDEX `fk_interpols_data1_idx` (`data_iddata` ASC) VISIBLE, + CONSTRAINT `fk_interpols_data1` + FOREIGN KEY (`data_iddata`) + REFERENCES `experimentos`.`data` (`iddata`), + CONSTRAINT `fk_interpols_parameters1` + FOREIGN KEY (`parameters_idparameters`) + REFERENCES `experimentos`.`parameters` (`idparameters`)) +ENGINE = InnoDB +AUTO_INCREMENT = 71 +DEFAULT CHARACTER SET = utf8; + + +-- ----------------------------------------------------- +-- Table `experimentos`.`interpsec` +-- ----------------------------------------------------- +CREATE TABLE IF NOT EXISTS `experimentos`.`interpsec` ( + `idinterpsec` INT(11) NOT NULL AUTO_INCREMENT, + `interpsec` FLOAT NULL DEFAULT NULL, + `experiment_idperformance` INT(11) NOT NULL, + `creationDate` DATETIME NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`idinterpsec`), + INDEX `fk_interpsec_experiment1_idx` (`experiment_idperformance` ASC) VISIBLE, + CONSTRAINT `fk_interpsec_experiment1` + FOREIGN KEY (`experiment_idperformance`) + REFERENCES `experimentos`.`experiment` (`idperformance`)) +ENGINE = InnoDB +AUTO_INCREMENT = 45971 +DEFAULT CHARACTER SET = utf8; + + +SET SQL_MODE=@OLD_SQL_MODE; +SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS; +SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS; diff --git a/launch/create_ondemand_jm.py b/launch/create_ondemand_jm.py new file mode 100755 index 0000000..9a0011a --- /dev/null +++ b/launch/create_ondemand_jm.py @@ -0,0 +1,126 @@ +#!/usr/bin/python + +# The MIT License (MIT) +# +# Copyright (c) 2018-2019 Nicholas Torres Okita +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
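+
+# Usage sketch (the instance type argument below is illustrative):
+#
+#     python create_ondemand_jm.py c5.4xlarge
+#
+# Note that ImageId, SecurityGroupIds, SubnetId and KeyName must be filled in
+# inside main() below before this script can launch anything.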
+ +# This function creates a Job Manager instance + +import boto3 +import logging +import sys +from datetime import datetime + +# function getLogger +# +# \param name: Name of the logger +# \return Logger object +# +# This function simply creates a new logger object to output information +def getLogger(name): + now = datetime.now() + #Logging configuration + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + #Log formatter + formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s") + #Log File handler + handler = logging.FileHandler("create_spot.log") + handler.setLevel(logging.INFO) + handler.setFormatter(formatter) + logger.addHandler(handler) + #Screen handler + screenHandler = logging.StreamHandler(stream=sys.stdout) + screenHandler.setLevel(logging.INFO) + screenHandler.setFormatter(formatter) + logger.addHandler(screenHandler) + return logger + +# function main +# \param (command line input) instance type: name of instance type to be created +# +# This function is the one responsible for creating the instance. The user needs +# to complete information regarding the user_data (script to be run when the +# instance starts), and other parameters for the instance creation, such as +# ImageId and SecurityGroupId (the others are document through out the code). +def main(): + # Variable containing the user script to initialize and execute the program + user_data = """#!/bin/bash""" + + if len(sys.argv) <= 1: + logger.error('Please insert the instance type') + return + + logger.info('Starting the instance deployment from the template "'+sys.argv[1]+'"') + instance_type_in = sys.argv[1] + + ec2 = boto3.resource('ec2',region_name='us-east-1') + instances = ec2.create_instances( + InstanceType=instance_type_in, + ImageId='', # Image AMI id + InstanceInitiatedShutdownBehavior='terminate', + SecurityGroupIds=[''], # Security group ID to create instances + SubnetId='', # Subnet Id + UserData=user_data, + MaxCount=1, + MinCount=1, + KeyName='', # Instance key name + Monitoring={ + 'Enabled': True + }, + BlockDeviceMappings=[ # This is the root disk, can be + # reconfigured to what is more + # convenient + { + 'DeviceName': '/dev/sda1', + 'VirtualName': 'eth0', + 'Ebs': { + 'DeleteOnTermination': True, + 'VolumeSize': 20, + 'VolumeType': 'io1', + 'Iops': 1000 + }, + 'NoDevice':'' + }, + ] + ) + + # Get the instance id from the created instance + instance_id = instances[0].id + logger.info('Instance deployed! 
Instance id: '+instance_id) + + # Set tags, user can edit to add more tags or rename them + result = ec2.create_tags(Resources=[instance_id], + Tags=[ + { + 'Key': 'Type', + 'Value': 'jobmanager-ondemand' + }, + { + 'Key': 'Name', + 'Value': 'JobManager' + } + ] + ) + +if __name__ == '__main__': + logger = getLogger(__name__) + main() diff --git a/main/instance_operations.py b/main/instance_operations.py new file mode 100644 index 0000000..4dcefe5 --- /dev/null +++ b/main/instance_operations.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python + +# The MIT License (MIT) +# +# Copyright (c) 2018-2019 Nicholas Torres Okita +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +# This file contains functions that realize operations in the AWS instances + +import logging +import sys +import time +import boto3 +import base64 +from datetime import datetime + +# function getLogger +# +# \param name: Name of the logger +# \return Logger object +# +# This function simply creates a new logger object to output information +def getLogger(name): + now = datetime.now() + # Logging configuration + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + # Log formatter + formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s") + # Log File handler + handler = logging.FileHandler("jm_handler.log") + handler.setLevel(logging.INFO) + handler.setFormatter(formatter) + logger.addHandler(handler) + # Screen handler + screenHandler = logging.StreamHandler(stream=sys.stdout) + screenHandler.setLevel(logging.INFO) + screenHandler.setFormatter(formatter) + logger.addHandler(screenHandler) + return logger + +class instance_operations: + + # function __init__ + # \param logger : logger to output information + # Initialize the class ec2 client and resource + def __init__(self, logger): + self.logger = logger + self.ec2 = boto3.client('ec2', region_name='us-east-1') + self.ec2res = boto3.resource('ec2', region_name='us-east-1') + + # function get_current_spot_price_allaz + # \param instance_type : string containing the instance type + # Gets current spot price for an input instance type in all availability zones + def get_current_spot_price_allaz(self, instance_type): + dict = {} + history = self.ec2.describe_spot_price_history( + StartTime=datetime.now().isoformat(), + EndTime=datetime.now().isoformat(), + ProductDescriptions=['Linux/UNIX'], + InstanceTypes=[instance_type]) + + for h in history['SpotPriceHistory']: + dict[h['AvailabilityZone']] = float(h['SpotPrice']) + + return dict + + # 
function get_current_spot_price_allaz + # \param instance_type : string containing the instance type + # \param az : availability zone + # Gets current spot price for an input instance type in an input availability zones + def get_current_spot_price(self, instance_type, az): + history = self.ec2.describe_spot_price_history( + StartTime=datetime.now().isoformat(), + EndTime=datetime.now().isoformat(), + ProductDescriptions=['Linux/UNIX'], + InstanceTypes=[instance_type], + AvailabilityZone=az) + + price = -1 + if (len(history['SpotPriceHistory']) > 0): + price = float(history['SpotPriceHistory'][0]['SpotPrice']) + + return price + + # function terminateInstance + # \param instanceid: string containing the instance id to be terminated + # Terminates instance described by instanceid + def terminateInstance(self, instanceid): + inst = self.ec2res.Instance(instanceid) + inst.terminate() + + # function createSpotInstance + # \param instance_type_in: string containing the instance type + # \param az : availability zone + # \param price : instance price + # Creates an Spot instance of type instance_type_in, in availability zone az + # and priced at max price. + def createSpotInstance(self, instance_type_in, az, price): + # To be completed by user defined subnets + subnets = { + 'us-east-1a': '', + 'us-east-1b': '', + 'us-east-1c': '', + 'us-east-1d': '', + 'us-east-1e': '', + 'us-east-1f': '' + } + + bestZone = ['', sys.float_info.max] + + zones = self.ec2.describe_availability_zones() + zoneNames = [] + for zone in zones['AvailabilityZones']: + if zone['State'] == 'available': + zoneNames.append(zone['ZoneName']) + + PRICE = str(price * 1.2) + INSTANCE = instance_type_in + + bestZone[0] = az + bestZone[1] = price + now_str = "[" + datetime.now().isoformat() + "] " + self.logger.info(INSTANCE + " " + bestZone[0] + " " + PRICE) + + # Variable containing the user script to initialize and execute the program + user_data = """#!/bin/bash + """ + + instance_type = INSTANCE + response = self.ec2.request_spot_instances( + SpotPrice=PRICE, + InstanceCount=1, + LaunchSpecification={ + 'InstanceType': instance_type, + 'ImageId': '', # Image AMI id + 'SecurityGroupIds': [''], # Security group ID to create instances + 'SubnetId': subnets[bestZone[0]], + 'UserData': (base64.b64encode(user_data.encode())).decode(), + 'KeyName': '', # Instance key name + 'Monitoring': { + 'Enabled': True + }, + 'Placement': { + 'AvailabilityZone': bestZone[0] + }, + 'BlockDeviceMappings': [ # This is the root disk, can be + # reconfigured to what is more + # convenient + { + 'DeviceName': '/dev/sda1', + 'VirtualName': 'eth0', + 'Ebs': { + 'DeleteOnTermination': True, + 'VolumeSize': 20, + 'VolumeType': 'io1', + 'Iops': 1000 + }, + 'NoDevice': '' + }, + ] + } + ) + + spot_request_id = response['SpotInstanceRequests'][0]['SpotInstanceRequestId'] + time.sleep(30) + response = self.ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[spot_request_id]) + cur_spot = response['SpotInstanceRequests'][0] + if ('InstanceId' in cur_spot): + now_str = "[" + datetime.now().isoformat() + "] " + self.logger.info("SPOTID " + cur_spot['InstanceId']) + result = self.ec2.create_tags(Resources=[cur_spot['InstanceId']], + Tags=[ # Instance tags (default as Type and Name) + { + 'Key': 'Type', + 'Value': 'worker-spot' + }, + { + 'Key': 'Name', + 'Value': 'Auto-Generated Spot Worker ' + instance_type_in + } + ] + ) + return cur_spot['InstanceId'] + else: + now_str = "[" + datetime.now().isoformat() + "] " + self.logger.error('Spot Request for ' 
+ instance_type_in + ' failed in zone ' + bestZone[0]) + self.ec2.cancel_spot_instance_requests(SpotInstanceRequestIds=[spot_request_id]) + return '' + + # function createSpotInstanceThreads + # \param instance_type_in: string containing the instance type + # \param az : availability zone + # \param price : instance price + # \param valid_count : Number of times instance must perform over budget + # \param ret_dict : Return the object created + # Create a Spot instance using threads (call createSpotInstance function) + def createSpotInstanceThreads(self, instance_type_in, az, price, valid_count, ret_dict): + ret = self.createSpotInstance(instance_type_in, az, price) + if ret != '': + ec2_instance = self.ec2res.Instance(ret) + init_time = datetime.strptime(ec2_instance.launch_time.strftime("%Y-%m-%dT%H:%M:%S"), + "%Y-%m-%dT%H:%M:%S") + + instance_dict = {"instance_id": ret, + "instance_type": instance_type_in, + "instance_az": az, + "price": price, + "performance_negative": -1, + "init_time": init_time, + "cur_time": init_time, + "valid": valid_count, + "prev_valid": valid_count} + + ret_dict[ret] = instance_dict + else: + self.logger.error("FAILED TO CREATE INSTANCE OF TYPE " + instance_type_in) + + # function replaceInstance + # \param list : list of instances + # \param oldTuple : instance to be replaced + # Replaces an instance with one with better cost per performance ratio + def replaceInstance(self, list, oldTuple): + counter = 0 + # (instance_id, instance_type, az, price, costperinterp, costperinterp_stdev) + for inst in list: + self.logger.debug("Counter for " + str(oldTuple[1]) + ": " + str(counter)) + self.logger.debug("Trying instance: " + str(inst[1])) + counter = counter + 1 + if ((oldTuple[7] < inst[6]) and (oldTuple[1] != inst[1])): + if (self.createSpotInstance(inst[1], inst[2], inst[3]) != ''): + self.terminateInstance(oldTuple[0]) + now_str = "[" + datetime.now().isoformat() + "] " + self.logger.info('Replacing ' + oldTuple[1] + ' in zone ' + oldTuple[2] + ' with best case performance as ' + str(oldTuple[7]) + ' with ' + inst[1] + ' in zone ' + inst[2] + ' with worst case performance as ' + str(inst[6])) + return + else: + break + now_str = "[" + datetime.now().isoformat() + "] " + self.logger.info(oldTuple[1] + ' in zone ' + oldTuple[2] + ' with best case performance as ' + str(oldTuple[7]) + ' could not be replaced') + + # function newInstance + # \param list : list sorted in decrescent cost/performance order (best first) + # Creates a new instance with the best cost/performance in list + def newInstance(self, list): + counter = 0 + # (instance_id, instance_type, az, price, costperinterp, costperinterp_stdev) + for inst in list: + if (self.createSpotInstance(inst[1], inst[2], inst[3]) != ''): + self.logger.info('Started new instance of type ' + inst[1]) + return + + self.logger.info('Couldn\'t start new instance') + + # function get_instance_reservations + # \return list of instance reservations + # Get instances ids that have tag:Type worker-spot and is + # turned on and running + def get_instance_reservations(self): + filters = [] + f1 = {} + f1['Name'] = 'tag:Type' + f1['Values'] = ['worker-spot'] + filters.append(f1) + f2 = {} + f2['Name'] = 'instance-state-name' + f2['Values'] = ['running'] + filters.append(f2) + + instance_reservations = self.ec2.describe_instances(Filters=filters) + return instance_reservations + + # function get_jobmanager_reservations + # \return list of instance reservations + # Get instances ids that have tag:type jobmanager-ondemand and is + 
# turned on and running
+    def get_jobmanager_reservations(self):
+        filters = []
+        f1 = {}
+        f1['Name'] = 'tag:Type'
+        f1['Values'] = ['jobmanager-ondemand']
+        filters.append(f1)
+        f2 = {}
+        f2['Name'] = 'instance-state-name'
+        f2['Values'] = ['running']
+        filters.append(f2)
+
+        instance_reservations = self.ec2.describe_instances(Filters=filters)
+        return instance_reservations
+
+    # function get_jobmanager_init_time
+    # \return time object with the job manager initialization time
+    # Gets the time that the job manager started running
+    def get_jobmanager_init_time(self):
+        jm_res = self.get_jobmanager_reservations()
+        instance = jm_res['Reservations'][0]
+        instance_id = instance['Instances'][0]['InstanceId']
+
+        ec2_instance = self.ec2res.Instance(instance_id)
+        init_time = datetime.strptime(ec2_instance.launch_time.strftime("%Y-%m-%dT%H:%M:%S"), "%Y-%m-%dT%H:%M:%S")
+
+        return init_time
diff --git a/main/log_prices.csv b/main/log_prices.csv
new file mode 100644
index 0000000..1a1250e
--- /dev/null
+++ b/main/log_prices.csv
@@ -0,0 +1,6 @@
+# This is a CSV file that should contain prices for the desired instances (tab-separated) in the following format:
+# timestamp region az instance_type os price
+#
+# This way it will be possible for the simulator to work
+# Also remove this header after populating the log
+timestamp region az instance_type os price
\ No newline at end of file
diff --git a/main/pseudo_instance_operations.py b/main/pseudo_instance_operations.py
new file mode 100644
index 0000000..c114451
--- /dev/null
+++ b/main/pseudo_instance_operations.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+
+# The MIT License (MIT)
+#
+# Copyright (c) 2019 Nicholas Torres Okita
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+# This file contains operations similar to those in instance_operations; however,
+# they are fake ones, so there is no actual instance creation or termination
+
+from datetime import datetime
+from datetime import timedelta
+import csv
+
+class pseudo_instance_operations:
+
+    # function __init__
+    # Initialize the class object by opening the input file which stores the
+    # price at each (simulated) time, and set an initial time for the
+    # simulation program to run from (in this example, February 15th, 2019).
+ def __init__(self): + self.input_f = open("log_prices.csv", "r") + self.rd = [] + csv_dr = csv.DictReader(self.input_f, delimiter="\t") + for row in csv_dr: + self.rd.append(row) + + self.time_now = datetime.strptime("2019-02-15T00:00:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ") + + # function get_current_spot_price_allaz + # \param instance_type: string with the name of the instance type + # This function gets the current spot price (from a file) for an instance + # type for all availability zones. + def get_current_spot_price_allaz(self, instance_type): + valid_time = datetime.strptime("2019-01-01T00:00:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ") + dict = {} + for row in self.rd: + if (row['instance_type'] == instance_type): + time_stamp = datetime.strptime(row['timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ") + if (time_stamp <= self.time_now and time_stamp >= valid_time): + valid_time = time_stamp + dict[row['az']] = float(row['price']) + + return dict + + # function get_current_spot_price + # \param instance_type: string with the name of the instance type + # \param az: string containing the availability zone + # This function gets the current spot price (from a file) for an instance + # type in an availability zone az + def get_current_spot_price(self, instance_type, az): + valid_time = datetime.strptime("2019-01-01T00:00:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ") + price = -1 + for row in self.rd: + if (row['instance_type'] == instance_type and row['az'] == az): + time_stamp = datetime.strptime(row['timestamp'], "%Y-%m-%dT%H:%M:%S.%fZ") + if (time_stamp <= self.time_now and time_stamp >= valid_time): + valid_time = time_stamp + price = float(row['price']) + + return price + + # function add_minutes + # \param increase_time: time to be increased in minutes + # Increment the current timer in increase_time minutes + def add_minutes(self, increase_time): + self.time_now = self.time_now + timedelta(minutes=increase_time) \ No newline at end of file diff --git a/main/rds_config.py b/main/rds_config.py new file mode 100644 index 0000000..34ab1ed --- /dev/null +++ b/main/rds_config.py @@ -0,0 +1,32 @@ +# The MIT License (MIT) +# +# Copyright (c) 2019 Nicholas Torres Okita +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
+
+# Config file containing credentials for the RDS MySQL instance
+# To be completed by the user
+# db_host: Database server address
+# db_username: Username to access the database
+# db_password: Password to access the database
+# db_name: Name of the database to be accessed
+db_host = ""
+db_username = ""
+db_password = ""
+db_name = ""
\ No newline at end of file
diff --git a/main/rds_operations.py b/main/rds_operations.py
new file mode 100755
index 0000000..c486aa5
--- /dev/null
+++ b/main/rds_operations.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python
+
+# The MIT License (MIT)
+#
+# Copyright (c) 2019 Nicholas Torres Okita
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+# This file contains the RDS operations to access and get information from a Relational Database Server (RDS).
+# At this moment it requires that the database follows the "experimentos" schema
+# created by database/experimentos_db.sql (see the README).
+
+import pymysql
+import rds_config
+import sys
+
+# function rds_connect
+#
+# \return conn connection object
+#
+# This function uses the configuration stored in the rds_config.py file to
+# create a connection to a database. The connection object is then returned
+# to the function caller. If there was an error while trying to connect to the
+# database, the program will call sys.exit() and close.
+def rds_connect():
+    # Remote access to MySQL database in RDS
+    rds_host = rds_config.db_host
+    name = rds_config.db_username
+    password = rds_config.db_password
+    db_name = rds_config.db_name
+
+    # Try connection
+    try:
+        conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
+    except:
+        print("ERROR: Unexpected error: Could not connect to MySQL instance.")
+        sys.exit()
+
+    return conn
+
+# function get_idparameters
+# \param conn: Database connection object
+# \param aph: Aperture in half offsets
+# \param apm: Aperture in midpoints
+# \param window: Window of time samples
+# \param np: Number of population members
+# \param gens: Number of generations
+#
+# \return idparameters: key to idparameters in the database
+#
+# This function will access the database through the conn object and then search
+# the parameters table for an entry matching the inputs. These parameters are
+# mainly used by seismic programs, such as the SPITS program used for this prototype.
+#
+# If the function cannot find a parameters entry matching the input parameters,
+# it creates a new one. At last, it returns the stored idparameters key.
+def get_idparameters(conn, aph, apm, window, np, gens):
+    idparameters = -1
+    with conn.cursor() as cur:
+        query = "select * from experimentos.parameters where `aph`={} and `apm`={} and `window` like {} and `np`={} and `gens`={};"
+        cur.execute(query.format(aph, apm, window, np, gens))
+        results = cur.fetchall()
+        if len(results) == 0:
+            query = "insert into experimentos.parameters(`aph`,`apm`,`window`,`np`,`gens`) values({},{},{},{},{});"
+            cur.execute(query.format(aph, apm, window, np, gens))
+            cur.execute("select LAST_INSERT_ID();")
+            results = cur.fetchall()
+            idparameters = results[0][0]
+        else:
+            idparameters = results[0][0]
+        conn.commit()
+    conn.commit()
+
+    return idparameters
+
+# function get_iddata
+# \param conn: Database connection object
+# \param data_hash: md5sum of the data set being used
+#
+# \return iddata
+#
+# This function will return the iddata from the table that stores the data set,
+# returning -1 if the entry was not found.
+def get_iddata(conn, data_hash):
+    iddata = -1
+    with conn.cursor() as cur:
+        query = "select * from experimentos.data where hash='{}'"
+        cur.execute(query.format(data_hash))
+        results = cur.fetchall()
+        if len(results) == 0:
+            print("Data wasn't found")
+        else:
+            iddata = results[0][0]
+        conn.commit()
+    conn.commit()
+
+    return iddata
+
+# function get_idexperiment
+# \param conn: Database connection object
+# \param iddata: id for the data set being used
+# \param idparameters: id for the set of parameters being used
+# \param instance_type: string containing the instance type
+#
+# \return idexperiment
+#
+# Using the previously obtained iddata and idparameters, plus the instance_type,
+# this function will return an experiment entry (which contains the experiment
+# information for a given instance with a set of parameters). If the entry does
+# not exist, a new one is created. Then the entry id is returned.
+def get_idexperiment(conn, iddata, idparameters, instance_type):
+    idexperiment = -1
+    with conn.cursor() as cur:
+        query = "select * from experimentos.experiment where data_iddata={} and parameters_idparameters={} and instance_name='{}'"
+        cur.execute(query.format(iddata, idparameters, instance_type))
+        results = cur.fetchall()
+        if len(results) == 0:
+            query = "insert into experimentos.experiment(`data_iddata`,`parameters_idparameters`,`instance_name`) values({},{},'{}');"
+            print(query.format(iddata, idparameters, instance_type))
+            cur.execute(query.format(iddata, idparameters, instance_type))
+            cur.execute("select LAST_INSERT_ID();")
+            results = cur.fetchall()
+            idexperiment = results[0][0]
+        else:
+            idexperiment = results[0][0]
+        conn.commit()
+    conn.commit()
+
+    return idexperiment
+
+# function insert_interpsec
+# \param conn: Database connection object
+# \param idexperiment: id for the experiment
+# \param interpsec: performance measurement
+#
+# This function accesses the database using the connection object conn, then
+# it inserts a new performance entry for a given experiment.
+def insert_interpsec(conn, idexperiment, interpsec):
+    with conn.cursor() as cur:
+        query = "insert into experimentos.interpsec(`interpsec`,`experiment_idperformance`) values({},{});"
+        cur.execute(query.format(interpsec, idexperiment))
+    conn.commit()
+
+# function insert_interpols
+# \param conn: Database connection object
+# \param idparameters: id for the parameters
+# \param iddata: id for the data set
+# \param interpols: number of tasks completed
+#
+# This function accesses the database using the connection object conn, then
+# it inserts the total amount of tasks completed for the pair of parameters
+# and data set.
+def insert_interpols(conn, idparameters, iddata, interpols):
+    with conn.cursor() as cur:
+        query = "insert into experimentos.interpols(`parameters_idparameters`,`data_iddata`, `interpols`) values({},{},{});"
+        cur.execute(query.format(idparameters, iddata, interpols))
+    conn.commit()
+
+# function get_interpols
+# \param conn: Database connection object
+# \param idparameters: id for the parameters
+# \param iddata: id for the data set
+#
+# \return Number of tasks to be completed for a pair of data set and parameters
+#
+# This function simply accesses the database via the conn object and gets the
+# number of tasks for the pair iddata and idparameters.
+def get_interpols(conn, idparameters, iddata):
+    with conn.cursor() as cur:
+        query = "select avg(interpols) from experimentos.interpols where `parameters_idparameters`={} and `data_iddata`={};"
+        cur.execute(query.format(idparameters, iddata))
+        results = cur.fetchall()
+        conn.commit()
+    conn.commit()
+
+    return results[0][0]
+
+# function get_interpsec_allinstances
+# \param conn: Database connection object
+# \param iddata: id for the data set
+# \param idparameters: id for the parameters
+#
+# \return Performance measurements for all instances
+#
+# This function accesses the database via the conn object and runs a query to
+# return the performance information (stored as interpsec) for all instance
+# types that executed the data with a set of parameters.
+def get_interpsec_allinstances(conn, iddata, idparameters):
+    with conn.cursor() as cur:
+        query = """select instance_name, avg(interpsec) as interpsec, stddev(interpsec) as stddev, min(interpsec) as min_interpsec,
+        max(interpsec) as max_interpsec, T.name, T.hash
+        from experimentos.interpsec inner join experimentos.experiment on experiment_idperformance=idperformance
+        inner join experimentos.data T on data_iddata=T.iddata
+        where iddata={} and parameters_idparameters={}
+        group by T.name, instance_name
+        order by T.name, interpsec;
+        """
+
+        cur.execute(query.format(iddata, idparameters))
+        results = cur.fetchall()
+        conn.commit()
+    conn.commit()
+
+    return results
+
+# function get_interpsec
+# \param conn: Database connection object
+# \param iddata: id for the data set
+# \param idparameters: id for the parameters
+# \param instance_type: string containing an instance type
+#
+# \return Performance measurements for an instance
+#
+# This function accesses the database via the conn object and runs a query to
+# return the performance information (stored as interpsec) for an input instance
+# type that executed the data with a set of parameters.
+def get_interpsec(conn, iddata, idparameters, instance_type):
+    with conn.cursor() as cur:
+        query = """select instance_name, avg(interpsec) as interpsec, stddev(interpsec) as stddev, min(interpsec) as min_interpsec,
+        max(interpsec) as max_interpsec, T.name, T.hash
+        from experimentos.interpsec inner join experimentos.experiment on experiment_idperformance=idperformance
+        inner join experimentos.data T on data_iddata=T.iddata
+        where iddata={} and parameters_idparameters={} and instance_name="{}"
+        group by T.name, instance_name
+        order by T.name, interpsec;
+        """
+
+        cur.execute(query.format(iddata, idparameters, instance_type))
+        results = cur.fetchall()
+        conn.commit()
+    conn.commit()
+
+    return results
\ No newline at end of file
diff --git a/main/save_interpols.py b/main/save_interpols.py
new file mode 100644
index 0000000..de91fc2
--- /dev/null
+++ b/main/save_interpols.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+
+# The MIT License (MIT)
+#
+# Copyright (c) 2019 Nicholas Torres Okita
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+# This script stores the number of tasks completed (interpols) for an experiment in a Relational Database Server (RDS) database.
+# To store the information, the experiment configuration should be passed as input on the command line:
+# aph = argv[1]
+# apm = argv[2]
+# window = argv[3]
+# np = argv[4]
+# gens = argv[5]
+# data_hash = argv[6]
+# tasks = argv[7]
+
+import logging
+import sys
+import rds_operations
+
+# function main
+# \param (command line argument) aph: aperture in half offsets
+# \param (command line argument) apm: aperture in midpoints
+# \param (command line argument) window: time window
+# \param (command line argument) np: Number of members in population
+# \param (command line argument) gens: Number of generations
+# \param (command line argument) data_hash: Data md5sum
+# \param (command line argument) tasks: Number of tasks completed
+#
+# This function accesses the MySQL database using rds_operations and adds
+# the number of tasks completed related to an experiment.
+#
+# This is an example code and can be modified to better suit the user needs.
+# However, keep in mind that other parts of the code will need to be modified
+# as well, namely rds_operations.py, to_execute.py and simulation.py.
+def main(): + # Remote access to MySQL database in RDS + conn = rds_operations.rds_connect() + + # Get parameters from command line + aph = float(sys.argv[1]) + apm = float(sys.argv[2]) + window = float(sys.argv[3]) + np = int(sys.argv[4]) + gens = int(sys.argv[5]) + + # Get parameters id + idparameters = rds_operations.get_idparameters(conn, aph, apm, window, np, gens) + + # Get data id from hash + data_hash = sys.argv[6] + iddata = rds_operations.get_iddata(conn, data_hash) + + # Get number of tasks from command line + interpols = float(sys.argv[7]) + + # Insert tasks to the database + rds_operations.insert_interpols(conn, idparameters, iddata, interpols) + + +if __name__ == "__main__": + logger = logging.getLogger() + main() diff --git a/main/save_performance.py b/main/save_performance.py new file mode 100644 index 0000000..0d55eef --- /dev/null +++ b/main/save_performance.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python + +# The MIT License (MIT) +# +# Copyright (c) 2019 Nicholas Torres Okita +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +# This file script stores performance information to a Relational Database Server (RDS) database. +# To store the information the experiment configuration should be passed as input by the command line: +# aph = argv[1] +# apm = argv[2] +# window = argv[3] +# np = argv[4] +# gens = argv[5] +# data_hash = argv[6] +# performance = argv[7] + +import logging +import sys +import requests +import rds_operations + +# function getInstanceType +# \return String containing instance type +# This function simply requests an AWS local URL to get its instance type +def getInstanceType(): + response = requests.get('http://169.254.169.254/latest/meta-data/instance-type') + output = response.text + return str(output) + +# function main +# \param (command line argument) aph: aperture in half offsets +# \param (command line argument) apm: aperture in midpoints +# \param (command line argument) window: time window +# \param (command line argument) np: Number of members in population +# \param (command line argument) gens: Number of generations +# \param (command line argument) data_hash: Data md5sum +# \param (command line argument) performance: Performance measurement +# +# This function accesses the MySQL database using rds_operations and adds a +# performance measurement relating to an experiment. If the performance +# measurement is zero, then it will simply ignore the value. +# +# This is an example code and can be modified to better suit the user needs. 
+# However, keep in mind that others parts of the code will need to be modified +# as well, namely rds_operations.py, to_execute.py and simulation.py. +def main(): + # Remote access to MySQL database in RDS + conn = rds_operations.rds_connect() + + # Get parameters from input + aph = float(sys.argv[1]) + apm = float(sys.argv[2]) + window = float(sys.argv[3]) + np = int(sys.argv[4]) + gens = int(sys.argv[5]) + + # Get parameters id + idparameters = rds_operations.get_idparameters(conn, aph, apm, window, np, gens) + + # Get data id from hash + data_hash = sys.argv[6] + iddata = rds_operations.get_iddata(conn, data_hash) + + # Gets current instance type and performance + instance_type = getInstanceType() + interpsec = float(sys.argv[7]) + + # Gets experiment id (with data, parameters and instance type) + idexperiment = rds_operations.get_idexperiment(conn, iddata, idparameters, instance_type) + + # Insert performance measurement to the database + # Does not insert if it was zero + if (interpsec != 0): + rds_operations.insert_interpsec(conn, idexperiment, interpsec) + + +if __name__ == "__main__": + logger = logging.getLogger() + main() diff --git a/main/simulation.py b/main/simulation.py new file mode 100755 index 0000000..d3055d8 --- /dev/null +++ b/main/simulation.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python + +# The MIT License (MIT) +# +# Copyright (c) 2018-2019 Nicholas Torres Okita +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
+ +# This module simulates the execution of a SPITS program given Spot instance prices stored in a file + +import logging +import sys +import time +from datetime import datetime +import operator +from instance_operations import instance_operations +from pseudo_instance_operations import pseudo_instance_operations +import rds_operations +import random + +# function getLogger +# +# \param name: Name of the logger +# \return Logger object +# +# This function simply creates a new logger object to output information +def getLogger(name): + now = datetime.now() + #Logging configuration + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + #Log formatter + formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s") + #Log File handler + string_name = "simulation{}.log" + handler = logging.FileHandler(string_name.format(now.strftime("%Y%m%d%H%M%S"))) + handler.setLevel(logging.DEBUG) + handler.setFormatter(formatter) + logger.addHandler(handler) + #Screen handler + screenHandler = logging.StreamHandler(stream=sys.stdout) + screenHandler.setLevel(logging.DEBUG) + screenHandler.setFormatter(formatter) + logger.addHandler(screenHandler) + return logger + +# function get_from_input +# +# \param _string: String to be searched from input +# \param input_dict: Input dictionary of pairs key=value +# \return Value stored for _string or -1 if it does not exist +def get_from_input(_string, input_dict): + if _string in input_dict: + return input_dict[_string] + else: + return -1 + +# function pareto +# \param target_nodes: number of instances that will be used +# \param data_hash: data to be executed hash (needs to be stored in the database) +# \param idparameters: database id with the parameters used in experiment +# +# This function will print the estimated Pareto given the stored performance and +# current price for each instance. Note that the Job Manager instance selected +# is the c5.4xlarge and all disks are 20GB 1000IOPS, therefore the prices are +# defined as such. +# +# Furthermore, we consider a performance penalty of 0.1% for each new instance +# added. 
+def pareto(target_nodes, data_hash, idparameters): + all_zones = ['us-east-1a', 'us-east-1b', 'us-east-1c', 'us-east-1d', 'us-east-1e', 'us-east-1f'] + + conn = rds_operations.rds_connect() + target_tasks = rds_operations.get_interpols(conn, idparameters, rds_operations.get_iddata(conn, data_hash)) + iddata = rds_operations.get_iddata(conn, data_hash) + + all_performance = rds_operations.get_interpsec_allinstances(conn, iddata, idparameters) + fake_ops = pseudo_instance_operations() + ops = instance_operations(logger) + + jm_perf = 0 + for result in all_performance: + if result[0] == "c5.4xlarge": + jm_perf = float(result[1]) + + logger.info("INSTANCE_TYPE\tTIME_TO_RUN(seconds)\tCOST(dollars)") + for result in all_performance: + instance_type = result[0] + interpsec = result[1] + stddev_interpsec = result[2] + + # prices = instance_operations.get_current_spot_price_allaz(ec2, instance_type) + + best_price = 10000 + best_az = "" + + for az in all_zones: + price = ops.get_current_spot_price(instance_type, az) + + if (price < best_price and price > 0): + best_price = price + 0.09375 + best_az = az + + costperinterp = float(interpsec / (best_price / 3600)) + string = "{}\t{}\t{}" + time_to_run = target_tasks / (jm_perf + interpsec * target_nodes * (1-((target_nodes-1)/1000.0))) + price_to_pay = time_to_run * target_nodes * best_price / 3600 + time_to_run * 0.68 / 3600 + logger.info(string.format(instance_type,time_to_run,price_to_pay)) + +# function main +# +# \param (command line input) interval : time to wait for the next iteration in seconds +# \param (command line input) time_skip : time to skip in simulation in minutes +# \param (command line input) failure_create : failure rate (percentage) when creating an instance +# \param (command line input) failure_execute : failure rate (percentage) while instance is running +# \param (command line input) budget : budget in dollars to complete the execution +# \param (command line input) data_hash : dataset hash stored in the database +# \param (command line input) nodes : number of instances running +# \param (command line input) target_tasks : number of tasks to be executed (if empty, gets from database from +# data_hash) +# (id parameters is set as default to 41, can be changed in code) +# +# Before the iterations loop, this function sets the input parameters and +# initializes the database connection, getting the id number for the dataset +# and how many tasks need to be completed. +# +# The most important part is after the initialization, in which the program +# computes how much was spent (considering a Job Manager of type c5.4xlarge +# and a 20GB 1000IOPS disk) and how many tasks were completed. With those +# values, it verifies if any instance is constantly going over budget, +# killing the ones that are and replacing them with instances that are not. +# +# If the experiment cannot continue due to budget constraints, then the budget +# is increased by 10%. +# +# The differences between the simulation and the real module (to_execute.py) is +# that the simulation considers that there is an user defined chance of an +# instance failing to be created (defined by failure_create), to simulate an +# instance unavailability from the Spot and an instance failing during execution +# (defined by failure_exec), to simulate an instance being terminated by the +# provider. Furthermore, it allows the user to define the prices via a file +# (log_prices.py) so that they can see how the algorithm behaves given price +# changes. 
At last, to allow the user to simulate the execution without waiting +# for the actual execution time, it is possible to set a time_skip, so that the +# module will simulate how much of the task was completed (with a random +# oscillation of 10% and performance penalty of 0.1% with an increased number of +# instances) and how much was spent. +# +# This loop ends after the main SPITS program finishes the execution (or the +# number of tasks completed is greater or equal to the one stored in +# the database). +def main(): + input_dict = {} + for cur in sys.argv: + if '=' in cur: + key, val = cur.split('=') + input_dict.update({key: val}) + + interval = float(get_from_input("interval", input_dict)) + time_skip = float(get_from_input("time_skip", input_dict)) + failure_create = float(get_from_input("failure_create", input_dict)) + failure_exec = float(get_from_input("failure_exec", input_dict)) + + list_running = [] + budget = float(get_from_input("budget", input_dict)) + data_hash = get_from_input("data_hash", input_dict) + target_nodes = int(get_from_input("nodes", input_dict)) + + all_zones = ['us-east-1a', 'us-east-1b', 'us-east-1c', 'us-east-1d', 'us-east-1e', 'us-east-1f'] + + idparameters = 41 + + conn = rds_operations.rds_connect() + target_tasks = float(get_from_input("target_tasks", input_dict)) + if (target_tasks == -1): + target_tasks = rds_operations.get_interpols(conn, idparameters, rds_operations.get_iddata(conn, data_hash)) + iddata = rds_operations.get_iddata(conn, data_hash) + + spent_sofar = 0 + wk_spent_sofar = 0 + jm_spent_sofar = 0 + tasks_sofar = 0 + + in_budget = budget + _in_budget = in_budget + in_tasks = target_tasks + + fake_ops = pseudo_instance_operations() + ops = instance_operations(logger) + + pareto(target_nodes, data_hash, idparameters) + jm_cost = 0.68 + jm_interpsec = rds_operations.get_interpsec(conn, iddata, idparameters, "c5.4xlarge") + + tasks_str = 'TASKS PROCESSED SO FAR = {}/{}' + spent_str = 'HOW MONEY WAS SPENT (TOTAL = JM + WK) = {} = {} + {}' + money_str = 'MONEY SPENT SO FAR = {}/{} (user requested = {})' + instances_str = 'NUMBER OF INSTANCES RUNNING = {}/{}' + simulated_time = 'SIMULATED TIMENOW = {}' + + log_to_csv = 'CSV\t{}\t{}' + + time_spent = 0 + + while target_tasks > 0: + negative_bias = (len(list_running)-1)/1000 + for inst in list_running: + price = ops.get_current_spot_price(inst[1], inst[2]) + 0.09375 + wk_spent_sofar += price * time_skip / 60 + + coin_toss = random.random() + if (coin_toss < failure_exec): + logger.debug('REMOVING INST ' + inst[1] + ' FOR RANDOM FAILURE (' + str(coin_toss) + '/' + str( + failure_exec) + ')') + list_running.remove(inst) + else: + tasks_sofar += inst[8] * random.uniform(0.9 - negative_bias, 1.1 - negative_bias) * time_skip * 60 + + tasks_sofar += random.uniform(jm_interpsec[1] - jm_interpsec[2], jm_interpsec[1] + jm_interpsec[2]) * time_skip * 60 + jm_spent_sofar += 0.68 * time_skip / 60 + + spent_sofar = wk_spent_sofar + jm_spent_sofar + + target_tasks = in_tasks - tasks_sofar + budget = in_budget - spent_sofar + + logger.info(simulated_time.format(fake_ops.time_now)) + logger.info(tasks_str.format(tasks_sofar, in_tasks)) + logger.info(spent_str.format(spent_sofar, jm_spent_sofar, wk_spent_sofar)) + logger.info(money_str.format(spent_sofar, in_budget, _in_budget)) + logger.info(instances_str.format(len(list_running), target_nodes)) + logger.info(log_to_csv.format(time_spent/60,spent_sofar)) + + logger.debug("INSTANCES RUNNING") + for inst in list_running: + logger.debug("(" + inst[1] + "," + 
+        if (target_tasks <= 0):
+            break
+
+        target_ratio = target_tasks / budget
+
+        old_list = list_running
+        list_running = []
+
+        for inst in old_list:
+            price = ops.get_current_spot_price(inst[1], inst[2]) + 0.09375
+            if (inst[8] / (price / 3600) >= target_ratio):
+                list_running.append(inst)
+            else:
+                logger.debug('REMOVING INST ' + inst[1])
+
+        if (len(list_running) < target_nodes):
+            candidates = []
+
+            while len(candidates) == 0:
+                budget = in_budget - spent_sofar
+                target_ratio = target_tasks / budget
+                all_performance = rds_operations.get_interpsec_allinstances(conn, iddata, idparameters)
+                for result in all_performance:
+                    instance_type = result[0]
+                    interpsec = result[1]
+                    stddev_interpsec = result[2]
+
+                    for az in all_zones:
+                        price = ops.get_current_spot_price(instance_type, az) + 0.09375
+                        costperinterp = float(interpsec / (price / 3600))
+                        costperinterp_stdev = float(stddev_interpsec / (price / 3600))
+                        costperinterp_positive = costperinterp + costperinterp_stdev
+                        costperinterp_negative = costperinterp - costperinterp_stdev
+                        candidate = ("inactive", instance_type, az, price, costperinterp, costperinterp_stdev,
+                                     costperinterp_negative, costperinterp_positive, interpsec)
+
+                        if costperinterp_negative > target_ratio:
+                            if (instance_type != "p3.2xlarge"):
+                                candidates.append(candidate)
+
+                candidates.sort(key=operator.itemgetter(8), reverse=True)
+                logger.debug(candidates)
+
+                if (len(candidates) == 0):
+                    logger.error("IMPOSSIBLE TO RUN EXPERIMENT WITH THIS CONFIGURATION")
+                    in_budget += in_budget / 10
+                    logger.error("INCREASING BUDGET BY 10% (TO " + str(in_budget) + " USD)")
+
+            k = 0
+            while len(list_running) < target_nodes:
+                # At least one creation attempt per round, otherwise
+                # target_nodes < 5 would make this loop spin forever.
+                for i in range(max(1, int(target_nodes / 5))):
+                    if (random.random() > failure_create):
+                        list_running.append(candidates[k])
+                    else:
+                        logger.debug("FAILED TO CREATE INSTANCE OF TYPE " + candidates[k][1])
+                    k = (k + 1) % len(candidates)
+
+                if len(list_running) >= target_nodes:
+                    break
+                k = (k + 1) % len(candidates)
+
+        time.sleep(interval)
+        fake_ops.add_minutes(time_skip)
+        time_spent += time_skip
+
+if __name__ == "__main__":
+    logger = getLogger(__name__)
+    main()
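+
+# Example invocation (the argument values below are illustrative only;
+# target_tasks=-1 makes the script read the task count from the database
+# for the given data_hash):
+#
+#   python3 simulation.py interval=1 time_skip=30 failure_create=0.05 \
+#       failure_exec=0.01 budget=50 data_hash=<hash> nodes=10 target_tasks=-1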
diff --git a/main/to_execute.py b/main/to_execute.py
new file mode 100755
index 0000000..6bfee3c
--- /dev/null
+++ b/main/to_execute.py
@@ -0,0 +1,404 @@
+#!/usr/bin/env python
+
+# The MIT License (MIT)
+#
+# Copyright (c) 2018-2019 Nicholas Torres Okita
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to
+# deal in the Software without restriction, including without limitation the
+# rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+
+# The main() in this file is meant to be executed on the Job Manager to select
+# the best instances for a given input budget. If the user provides a budget
+# that is too low (for example, 5 dollars), the script will try to increase
+# the budget constraint to something feasible.
+
+import logging
+import sys
+import time
+import boto3
+from datetime import datetime
+from datetime import timedelta
+import operator
+import threading
+from instance_operations import instance_operations
+import rds_operations
+
+# function getLogger
+#
+# \param name: Name of the logger
+# \return Logger object
+#
+# This function simply creates a new logger object that writes both to a log
+# file and to stdout.
+def getLogger(name):
+    # Logging configuration
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG)
+    # Log formatter
+    formatter = logging.Formatter("[%(asctime)s] %(levelname)-8s %(message)s")
+    # Log file handler
+    handler = logging.FileHandler("jm_handler.log")
+    handler.setLevel(logging.DEBUG)
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    # Screen handler
+    screenHandler = logging.StreamHandler(stream=sys.stdout)
+    screenHandler.setLevel(logging.DEBUG)
+    screenHandler.setFormatter(formatter)
+    logger.addHandler(screenHandler)
+    return logger
+
+# function get_from_input
+#
+# \param _string: String to be searched for in the input
+# \param input_dict: Input dictionary of key=value pairs
+# \return Value stored for _string, or -1 if it does not exist
+def get_from_input(_string, input_dict):
+    if _string in input_dict:
+        return input_dict[_string]
+    else:
+        return -1
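+
+# For reference, a minimal sketch of how a worker could publish the metrics
+# that main() polls via get_metric_statistics below. The metric names and
+# dimensions match the read side; the region, the reporting cadence and the
+# variable names here are assumptions:
+#
+#   cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
+#   cloudwatch.put_metric_data(
+#       Namespace='Performance',
+#       MetricData=[{'MetricName': 'perf_sec',
+#                    'Dimensions': [{'Name': 'Instance Id', 'Value': instance_id},
+#                                   {'Name': 'Type', 'Value': instance_type}],
+#                    'Value': interpolations_per_second}])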
+
+# function pareto
+# \param target_nodes: number of instances that will be used
+# \param data_hash: hash of the data to be processed (needs to be stored in the database)
+# \param idparameters: database id of the parameters used in the experiment
+#
+# This function prints the estimated Pareto front given the stored performance
+# and the current price of each instance type. Note that the Job Manager
+# instance is assumed to be a c5.4xlarge and all disks 20GB 1000IOPS, so the
+# prices are defined accordingly.
+#
+# Furthermore, we consider a performance penalty of 0.1% for each new instance
+# added.
+def pareto(target_nodes, data_hash, idparameters):
+    all_zones = ['us-east-1a', 'us-east-1b', 'us-east-1c', 'us-east-1d', 'us-east-1e', 'us-east-1f']
+
+    conn = rds_operations.rds_connect()
+    target_tasks = rds_operations.get_interpols(conn, idparameters, rds_operations.get_iddata(conn, data_hash))
+    iddata = rds_operations.get_iddata(conn, data_hash)
+
+    all_performance = rds_operations.get_interpsec_allinstances(conn, iddata, idparameters)
+    ops = instance_operations(logger)
+
+    jm_perf = 0
+    for result in all_performance:
+        if result[0] == "c5.4xlarge":
+            jm_perf = float(result[1])
+
+    logger.info("INSTANCE_TYPE\tTIME_TO_RUN(seconds)\tCOST(dollars)")
+    for result in all_performance:
+        instance_type = result[0]
+        interpsec = result[1]
+
+        best_price = 10000
+        best_az = ""
+
+        for az in all_zones:
+            price = ops.get_current_spot_price(instance_type, az) + 0.09375
+
+            if (price < best_price and price > 0.09375):
+                best_price = price
+                best_az = az
+
+        string = "{}\t{}\t{}"
+        time_to_run = target_tasks / (jm_perf + interpsec * target_nodes * (1 - ((target_nodes - 1) / 1000.0)))
+        price_to_pay = time_to_run * target_nodes * best_price / 3600 + time_to_run * 0.68 / 3600
+        logger.info(string.format(instance_type, time_to_run, price_to_pay))
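+
+# A quick worked example of the keep-or-replace rule used in main() (the
+# numbers are hypothetical): with 1800000 tasks left and 10 USD of budget
+# left, target_ratio = 1800000 / 10 = 180000 tasks per dollar. A worker doing
+# 60 tasks/s at 1.00 USD/h (disk included) delivers 60 / (1.00 / 3600) =
+# 216000 tasks per dollar and is kept, while one doing 40 tasks/s at the same
+# price delivers 144000 and is marked for replacement.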
+
+# function main
+#
+# \param (command line input) interval : time to wait for the next iteration in minutes
+# \param (command line input) budget : budget in dollars to complete the execution
+# \param (command line input) data_hash : dataset hash stored in the database
+# \param (command line input) nodes : maximum number of instances running
+# \param (command line input) valid_count : number of strikes before an underperforming instance is replaced
+#                                           (defaults to 1)
+# (idparameters defaults to 41 and can be changed in the code)
+#
+# Before the iterations loop, this function sets the input parameters and
+# initializes the database connection, getting the dataset id and how many
+# tasks need to be completed.
+#
+# The most important part comes after the initialization, in which the program
+# computes how much was spent (considering a Job Manager of type c5.4xlarge
+# and a 20GB 1000IOPS disk) and how many tasks were completed. With those
+# values, it verifies whether any instance is consistently going over budget,
+# killing the ones that are and replacing them with instances that are not.
+#
+# If the experiment cannot continue due to budget constraints, the budget
+# is increased by 10%.
+#
+# This loop ends after the main SPITS program finishes the execution (or the
+# number of tasks completed is greater than or equal to the one stored in
+# the database).
+def main():
+    input_dict = {}
+    for cur in sys.argv:
+        if '=' in cur:
+            key, val = cur.split('=')
+            input_dict.update({key: val})
+
+    interval = float(get_from_input("interval", input_dict))
+
+    budget = float(get_from_input("budget", input_dict))
+    data_hash = get_from_input("data_hash", input_dict)
+    target_nodes = int(get_from_input("nodes", input_dict))
+
+    valid_count = int(get_from_input("valid_count", input_dict))
+    if (valid_count == -1):
+        valid_count = 1
+
+    idparameters = 41
+
+    conn = rds_operations.rds_connect()
+    target_tasks = rds_operations.get_interpols(conn, idparameters, rds_operations.get_iddata(conn, data_hash))
+    iddata = rds_operations.get_iddata(conn, data_hash)
+
+    spent_sofar = 0
+    tasks_sofar = 0
+
+    in_budget = budget
+    _in_budget = in_budget
+    in_tasks = target_tasks
+
+    pareto(target_nodes, data_hash, idparameters)
+
+    tasks_str = 'TASKS PROCESSED SO FAR = {}/{}'
+    money_str = 'MONEY SPENT SO FAR = {}/{} (user requested = {})'
+    money_left_str = 'MONEY LEFT TO SPEND = {} (user requested = {}) | TARGET RATIO = {}'
+    instances_str = 'NUMBER OF INSTANCES RUNNING = {}/{}'
+    simulated_time = 'TIMENOW = {}'
+
+    ops = instance_operations(logger)
+    time_start = ops.get_jobmanager_init_time()
+
+    cloudwatch = boto3.client('cloudwatch')
+
+    log_to_csv = 'CSV\t{}\t{}'
+
+    time_spent = 0
+
+    run_dict = {}
+    wk_spent_sofar = 0
+    jm_spent_sofar = 0
+    ec2_res = boto3.resource('ec2', region_name='us-east-1')
+    #time.sleep(180)
+    while target_tasks > 0:
+        # Update the cost spent so far (Job Manager on-demand price plus disk)
+        jm_diff = (datetime.utcnow() - time_start)
+        jm_spent_sofar = jm_diff.total_seconds() * (0.68 + 0.09375) / 3600
+
+        for key, inst in run_dict.items():
+            diff = datetime.utcnow() - inst["cur_time"]
+            inst["cur_time"] = datetime.utcnow()
+            wk_spent_sofar += float(inst["price"]) * diff.total_seconds() / 3600
+            inst["prev_valid"] = inst["valid"]
+            inst["valid"] = 0
+
+        # Verify the running instances
+        instance_reservations = ops.get_instance_reservations()
+        for instance in instance_reservations['Reservations']:
+            instance_id = instance['Instances'][0]['InstanceId']
+            instance_type = instance['Instances'][0]['InstanceType']
+            instance_az = instance['Instances'][0]['Placement']['AvailabilityZone']
+
+            ec2_instance = ec2_res.Instance(instance_id)
+            # launch_time is timezone-aware; drop the tzinfo so it can be
+            # compared against the datetime.utcnow() values used elsewhere.
+            init_time = ec2_instance.launch_time.replace(tzinfo=None)
+
+            result = cloudwatch.get_metric_statistics(Namespace='Performance',
+                                                      MetricName='perf_sec',
+                                                      StartTime=(datetime.today() - timedelta(minutes=2*interval)),
+                                                      Dimensions=[{'Name': 'Instance Id', 'Value': instance_id},
+                                                                  {'Name': 'Type', 'Value': instance_type}],
+                                                      EndTime=datetime.today(),
+                                                      Period=60,
+                                                      Statistics=['Average'])
+
+            tasks_completed = cloudwatch.get_metric_statistics(Namespace='Performance',
+                                                               MetricName='tasks_completed',
+                                                               StartTime=(datetime.today() - timedelta(minutes=2*interval)),
+                                                               Dimensions=[{'Name': 'Instance Id', 'Value': instance_id},
+                                                                           {'Name': 'Type', 'Value': instance_type}],
+                                                               EndTime=datetime.today(),
+                                                               Period=60,
+                                                               Statistics=['Maximum'])
+
+            result_stdev = cloudwatch.get_metric_statistics(Namespace='Performance',
+                                                            MetricName='perf_sec_stdev',
+                                                            StartTime=(datetime.today() - timedelta(minutes=2*interval)),
+                                                            Dimensions=[{'Name': 'Instance Id', 'Value': instance_id},
+                                                                        {'Name': 'Type', 'Value': instance_type}],
+                                                            EndTime=datetime.today(),
+                                                            Period=60,
+                                                            Statistics=['Average'])
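+
+            # tasks_completed is reported as a running maximum, so the largest
+            # datapoint seen in the window is the global progress counter.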
+            if tasks_completed['Datapoints']:
+                if (tasks_completed['Datapoints'][0]['Maximum']) > tasks_sofar:
+                    tasks_sofar = tasks_completed['Datapoints'][0]['Maximum']
+
+            if instance_id not in run_dict:
+                price = ops.get_current_spot_price(instance_type, instance_az) + 0.09375
+                if result['Datapoints'] and result_stdev['Datapoints']:
+                    performance_negative = float(result['Datapoints'][0]['Average']) - float(
+                        result_stdev['Datapoints'][0]['Average'])
+                else:
+                    # No metrics reported yet: flag the performance as unknown.
+                    performance_negative = -1
+
+                instance_dict = {"instance_id": instance_id,
+                                 "instance_type": instance_type,
+                                 "instance_az": instance_az,
+                                 "price": price,
+                                 "performance_negative": performance_negative,
+                                 "init_time": init_time,
+                                 "cur_time": init_time,
+                                 "valid": valid_count,
+                                 "prev_valid": valid_count}
+
+                run_dict[instance_id] = instance_dict
+            else:
+                instance_dict = run_dict[instance_id]
+                instance_dict["valid"] = instance_dict["prev_valid"]
+
+                if result['Datapoints'] and result_stdev['Datapoints']:
+                    instance_dict["price"] = ops.get_current_spot_price(instance_type, instance_az) + 0.09375
+                    instance_dict["performance_negative"] = float(result['Datapoints'][0]['Average']) - float(result_stdev['Datapoints'][0]['Average'])
+
+        spent_sofar = wk_spent_sofar + jm_spent_sofar
+
+        target_tasks = in_tasks - tasks_sofar
+        budget = in_budget - spent_sofar
+
+        target_ratio = target_tasks / budget
+
+        logger.info(simulated_time.format(datetime.now()))
+        logger.info(tasks_str.format(tasks_sofar, in_tasks))
+        logger.info(money_str.format(spent_sofar, in_budget, _in_budget))
+        logger.info(money_left_str.format(budget, _in_budget, target_ratio))
+        logger.info(instances_str.format(len(run_dict.keys()), target_nodes))
+        logger.info(log_to_csv.format(((datetime.utcnow() - time_start).total_seconds()), spent_sofar))
+
+        logger.debug("INSTANCES RUNNING")
+        for key, inst in run_dict.items():
+            logger.debug(inst)
+
+        if (target_tasks <= 0):
+            break
+
+        for key, inst in run_dict.items():
+            if (inst["performance_negative"] == -1 or inst["performance_negative"] / (inst["price"] / 3600) < target_ratio):
+                inst["valid"] -= 1
+                logger.debug("DECREASING COUNTER FOR INST " + inst["instance_id"] + " (" + inst["instance_type"] + "), NOW: " + str(inst["valid"]))
+            else:
+                inst["valid"] = min(inst["valid"] + 1, valid_count)
+
+            if (inst["valid"] <= 0):
+                logger.debug('REMOVING INST ' + inst["instance_id"])
+                ops.terminateInstance(inst["instance_id"])
+
+        for key in list(run_dict.keys()):
+            if run_dict[key]["valid"] <= 0:
+                del run_dict[key]
+
+        if (len(run_dict) < target_nodes):
+            candidates = []
+
+            while len(candidates) == 0:
+                budget = in_budget - spent_sofar
+                target_ratio = target_tasks / budget
+                all_performance = rds_operations.get_interpsec_allinstances(conn, iddata, idparameters)
+
+                for result in all_performance:
+                    instance_type = result[0]
+                    interpsec = result[1]
+                    stddev_interpsec = result[2]
+
+                    prices = ops.get_current_spot_price_allaz(instance_type)
+
+                    for az in prices:
+                        price = prices[az] + 0.09375
+                        costperinterp = float(interpsec / (price / 3600))
+                        costperinterp_stdev = float(stddev_interpsec / (price / 3600))
+                        costperinterp_negative = costperinterp - costperinterp_stdev
+
+                        # Note: for not-yet-launched candidates this stores the
+                        # mean throughput from the database, not mean - stddev.
+                        instance_dict = {"instance_id": "inactive",
+                                         "instance_type": instance_type,
+                                         "instance_az": az,
+                                         "price": price,
+                                         "performance_negative": interpsec,
+                                         }
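+
+                        # Keep only types whose pessimistic (mean minus stddev)
+                        # tasks-per-dollar estimate still beats the target_ratio.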
+                        if costperinterp_negative > target_ratio:
+                            temp = [inst for inst in run_dict.values() if inst['instance_type'] == instance_type and inst['instance_az'] == az]
+                            if (len(temp) == 0):
+                                candidates.append(instance_dict)
+                            else:
+                                for inst in temp:
+                                    if (inst not in candidates):
+                                        candidates.append(inst)
+
+                candidates.sort(key=operator.itemgetter('performance_negative'), reverse=True)
+
+                if (len(candidates) == 0):
+                    logger.error("IMPOSSIBLE TO RUN EXPERIMENT WITH THIS CONFIGURATION")
+                    in_budget += in_budget / 10
+                    logger.error("INCREASING BUDGET BY 10% (TO " + str(in_budget) + " USD)")
+
+            logger.debug("CANDIDATES")
+            for inst in candidates:
+                logger.debug(inst)
+
+            k = 0
+            counter = 0
+            while len(run_dict.keys()) < target_nodes and counter < 5:
+                threads = []
+                # At least one creation thread per round, otherwise
+                # target_nodes < 5 would never launch anything here.
+                num_threads = min(target_nodes - len(run_dict.keys()), max(1, int(target_nodes / 5)))
+                # Replace instances
+                for i in range(0, num_threads):
+                    thr = threading.Thread(target=ops.createSpotInstanceThreads, args=(candidates[k]['instance_type'], candidates[k]['instance_az'], candidates[k]['price'], valid_count, run_dict))
+                    thr.start()
+                    threads.append(thr)
+
+                for thr in threads:
+                    thr.join()
+
+                k = (k + 1) % len(candidates)
+                if k == 0:
+                    counter += 1
+
+        time.sleep(interval*60)
+
+if __name__ == "__main__":
+    logger = getLogger(__name__)
+    main()
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..4bd5050
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+boto3
+pymysql
\ No newline at end of file