diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..fccbeee --- /dev/null +++ b/.travis.yml @@ -0,0 +1,7 @@ +notifications: + email: false +language: python +python: + - "2.7" +cache: apt +script: make test diff --git a/Dockerfile b/Dockerfile index 93bdc10..ee97c40 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,8 +3,8 @@ # portainer source installed into /opt/portainer. # -FROM ubuntu:12.04 -MAINTAINER Tom Arnfeld +FROM ubuntu:14.04 +MAINTAINER Oli Hall # Install dependencies RUN apt-get update && apt-get install -y build-essential git python-setuptools python-virtualenv diff --git a/README.md b/README.md index c1249bc..33d65dd 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Portainer -Portainer is an [Apache Mesos](https://mesos.apache.org) framework that enables you to build docker images across a cluster of many machines. +Portainer is an [Apache Mesos](https://mesos.apache.org) framework that enables you to build Docker images across a cluster of many machines. ``` .,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,.,. @@ -23,7 +23,7 @@ Portainer is an [Apache Mesos](https://mesos.apache.org) framework that enables ``````` ``` -When building docker images at scale, it can be time consuming and wasteful to manage dedicated infrastructure for building and pushing images. Building large containers with many sources and dependencies is a heavy operation, sometimes requiring many large machines. Deploying this infrastructure can be expensive and often leads to poor utilization. +When building Docker images at scale, it can be time consuming and wasteful to manage dedicated infrastructure for building and pushing images. Building large containers with many sources and dependencies is a heavy operation, sometimes requiring many large machines. Deploying this infrastructure can be expensive and often leads to poor utilization. Given an existing Apache Mesos cluster, Portainer can get to work right away. 
If you're new to Mesos, you can try out the Vagrant box provided, or learn more about the [Apache Mesos Architecture](https://mesos.apache.org/documentation/latest/mesos-architecture/) and [get started](https://mesos.apache.org/gettingstarted/). @@ -41,8 +41,7 @@ See below for more documentation on how to use the Vagrant virtual machine. #### Notes -- Pushing built images to the public docker index is currently not supported -- Support for docker client ~>1.7.0 requires Apache Mesos >=0.23.0 ([MESOS-3279](https://issues.apache.org/jira/browse/MESOS-3279)) +- Pushing built images to the public Docker index is currently not supported. -------------------------------------------------------------------------------- @@ -59,7 +58,7 @@ You'll need to have the following dependencies installed to run the framework. ### Mesos Agent Dependencies -By default, Portainer will try and launch an ephemeral docker daemon (`docker -d`) on the mesos agent machine using [docker in docker](https://github.com/jpetazzo/dind). This requires that you're using a Docker Containerizer on your Mesos agents. If you are not, you'll need to specify the `--docker-host` argument (e.g `--docker-host /var/run/docker.sock`) describing where the docker daemon can be accessed on each agent. +By default, Portainer will try and launch an ephemeral Docker daemon (`docker -d`) on the mesos agent machine using [docker in docker](https://github.com/jpetazzo/dind). This requires that you're using a Docker Containerizer on your Mesos agents. If you are not, you'll need to specify the `--docker-host` argument (e.g `--docker-host /var/run/docker.sock`) describing where the Docker daemon can be accessed on each agent. ## Building an Image @@ -69,13 +68,15 @@ By default, Portainer will try and launch an ephemeral docker daemon (`docker -d $ bin/build-executor ``` +This script uses PEX to package portainer's code into an executable to be run on Mesos. 
+ _Note: If you've got a dirty git tree, you'll need to set the `FORCE=1` environment variable._ The built PEX (python executable) archive will be dumped into `./dist`, and needs to be uploaded somewhere Mesos can reach it (HDFS, S3, FTP, HTTP etc). Check the output from the build-executor command to see the file name, and upload the file. The environment name is tacked on to the archive filename, e.g. `dist/portainer-37cc6d5eb334473fdaa9c7522c4ce585032dca5c.linux-x86_64.tar.gz`. Make sure you build the executor on the same platform as your mesos slaves use. -In future, readily-downloadable prebuild pex files will be available on versioned github releases. +In future, readily-downloadable prebuilt pex files will be available on versioned github releases. #### 2. Grab a `Dockerfile` @@ -120,7 +121,7 @@ The vagrant virtual environment provided will launch a VM will the following com ### 1. Start the VM -To use the Vagrant box, run `vagrant box add debian-73-x64-virtualbox-nocm http://puppet-vagrant-boxes.puppetlabs.com/debian-73-x64-virtualbox-nocm.box` then `vagrant up` to set everything up. +To use the Vagrant box, run `vagrant up` to set everything up. This will download a Debian 8 virtual machine and install all the required processes/dependencies. ### 2. Test Mesos @@ -128,7 +129,7 @@ The VM runs on a static IP `192.168.33.50` so before proceeding it's best to che ### 3. Build the executor -To build the Portainer executor, simply run `bin/build-executor`. +To build the Portainer executor, simply run `bin/build-executor` from the portainer root directory (`/opt/portainer`). 
_Note: If you've got a dirty git tree, you'll need to set the `FORCE=1` environment variable._ diff --git a/Vagrantfile b/Vagrantfile index ca8944c..87a113b 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -10,15 +10,28 @@ sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF DISTRO=$(lsb_release -is | tr '[:upper:]' '[:lower:]') CODENAME=$(lsb_release -cs) -# Add the repository +# Add the Mesos/Docker repositories (Docker needs official GPG key adding first) echo "deb http://repos.mesosphere.io/${DISTRO} ${CODENAME} main" | \ sudo tee /etc/apt/sources.list.d/mesosphere.list sudo apt-get -y update -sudo apt-get -y install mesos +sudo apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl software-properties-common +curl -fsSL https://apt.dockerproject.org/gpg | sudo apt-key add - +echo "deb https://apt.dockerproject.org/repo/ debian-$CODENAME main" | \ + sudo tee /etc/apt/sources.list.d/docker.list + +# Update package list and install Mesos/Docker +sudo apt-get -y update +sudo apt-get -y install mesos=1.1.0-2.0.107.debian81 +sudo apt-get -y install docker-engine=1.10.3-0~jessie + +# Set Mesos IP/containerizer/docker instance sudo bash -c "echo 192.168.33.50 > /etc/mesos-master/ip" sudo bash -c "echo 192.168.33.50 > /etc/mesos-slave/ip" sudo bash -c "echo docker,mesos > /etc/mesos-slave/containerizers" -sudo bash -c "echo /usr/bin/docker-1.7.0 > /etc/mesos-slave/docker" +sudo bash -c "echo /usr/bin/docker > /etc/mesos-slave/docker" + +# add local user as user on VM +adduser $1 # Start a bunch of services sudo service zookeeper restart @@ -26,20 +39,9 @@ sleep 5 (sudo service mesos-master stop || true) (sudo service mesos-slave stop || true) -# Install Docker -sudo bash -c 'echo "deb http://http.debian.net/debian wheezy-backports main" > /etc/apt/sources.list.d/backports.list' -sudo apt-get install -y linux-image-amd64 -curl -sSL https://get.docker.com/ | sh -sudo usermod -a -G docker vagrant - -# Download a specific docker 
binary -# TODO: Skip the above? -sudo bash -c "curl -0 https://get.docker.com/builds/Linux/x86_64/docker-1.7.0 > /usr/bin/docker-1.7.0" -sudo chmod +x /usr/bin/docker-1.7.0 - # Set up the docker registry sudo mkdir -p /registry -sudo docker create -p 5000:5000 -v /registry:/tmp/registry-dev --name=registry registry:0.9.1 +sudo docker create -p 5000:5000 -v /registry:/tmp/registry-dev --name=registry registry:2 (sudo docker start registry || true) # Start mesos @@ -47,25 +49,27 @@ sudo service mesos-master start sudo service mesos-slave start # Install portainer dependencies -sudo apt-get install -y python-setuptools +sudo apt-get install -y python-setuptools python-dev sudo easy_install pip sudo pip install virtualenv SCRIPT Vagrant.configure("2") do |config| - # Use the same base box as vagrant-web - config.vm.box = "debian-73-x64-virtualbox-nocm" + config.vm.box = "puppetlabs/debian-8.2-64-nocm" config.vm.synced_folder "./", "/opt/portainer" config.vm.network :private_network, ip: "192.168.33.50" - # Configure the VM with 1024Mb of RAM and 2 CPUs + # Configure the VM with 2048Mb of RAM and 2 CPUs config.vm.provider :virtualbox do |vb| - vb.customize ["modifyvm", :id, "--memory", "1024"] + vb.customize ["modifyvm", :id, "--memory", "2048"] vb.customize ["modifyvm", :id, "--cpus", "2"] end + # ensure VM clock stays in sync with correct time zone + config.vm.provision :shell, :inline => "sudo rm /etc/localtime && sudo ln -s /usr/share/zoneinfo/Europe/London /etc/localtime", run: "always" + # Install all the things! 
- config.vm.provision "shell", inline: $docker_setup + config.vm.provision "shell", inline: $docker_setup, args: ENV['USER'] end diff --git a/bin/build-executor b/bin/build-executor index 2947aab..c3fa183 100755 --- a/bin/build-executor +++ b/bin/build-executor @@ -42,21 +42,23 @@ pushd $TMP_DIR > /dev/null echo "Fetching dependencies (fresh) to wheelhouse" # Use wheel to download, so that we get git-urls and non-pypi releases, all consistently bundled as wheels for pex - # to then load. Mo need to fetch dependencies, as pex will do it anyway (and do it better it seems) - pip wheel -q --no-deps -r $SOURCE_DIR/requirements.pip + # to then load. No need to fetch dependencies, as pex will do it anyway (and do it better it seems) + mkdir wheelhouse + + pip wheel -q --no-deps -r $SOURCE_DIR/requirements.pip -w wheelhouse # Gather up all requirements as cli "-r" args to pex. Because we have git urls, we have to provide them all # 'inexact' as bare names, as we don't know the versions in advance - reqs=$(ls wheelhouse | sed -n 's/^\([^-]*\)-.*$/\1/p' | grep -v pex | awk '$0="-r "$0') + reqs=$(ls wheelhouse | sed -n 's/^\([^-]*\)-.*$/\1/p' | grep -v pex | awk '$0=" "$0') echo "Building pex" # Then feed them all to pex, using the wheelhouse as the cache dir. Provide a cache TTL to trick pex into using the # .whl files even though the req list entries are inexact - pex --wheel --repo wheelhouse --cache-dir wheelhouse --cache-ttl=9999 $reqs -s $SOURCE_DIR -e portainer.app -o ./bin/portainer + pex $SOURCE_DIR $reqs --wheel --cache-dir=wheelhouse --cache-ttl=9999 -e portainer.app -o ./bin/portainer # if there's anything that looks like a non-pure-python wheel being used, mark the filename as platform-specific # check for wheels with names that aren't "..-py2..", "..-py2.py3.." etc -- ones with c deps for example are named "..-cp27.." 
- if $(ls | grep -qv '^.*-.*-py\d'); then + if $(ls wheelhouse | grep -qv '^.*-.*-py\d'); then PLATFORM=$(python -c 'import pkg_resources; print pkg_resources.get_build_platform()') TAR_NAME="${TAR_NAME}.${PLATFORM}" fi diff --git a/example/tree/a/Dockerfile b/example/tree/a/Dockerfile index f55d978..73ba710 100644 --- a/example/tree/a/Dockerfile +++ b/example/tree/a/Dockerfile @@ -1,6 +1,6 @@ -FROM ubuntu:12.04 -MAINTAINER Tom Arnfeld +FROM ubuntu:14.04 +MAINTAINER Oli Hall REPOSITORY portainer/tree-a diff --git a/example/tree/b/Dockerfile b/example/tree/b/Dockerfile index 49104ee..b888626 100644 --- a/example/tree/b/Dockerfile +++ b/example/tree/b/Dockerfile @@ -1,6 +1,6 @@ FROM portainer/tree-a -MAINTAINER Tom Arnfeld +MAINTAINER Oli Hall REPOSITORY portainer/tree-b diff --git a/portainer/app/__main__.py b/portainer/app/__main__.py index c39938b..217d8a2 100644 --- a/portainer/app/__main__.py +++ b/portainer/app/__main__.py @@ -35,8 +35,7 @@ def main(argv): formatter = logging.Formatter(fmt="%(asctime)s[%(name)s] %(message)s") handler.setFormatter(formatter) - for logger in ("portainer.build", "portainer.scheduler", "portainer.executor", "pesos", - "compactor", "tornado"): + for logger in ("portainer.build", "portainer.scheduler", "portainer.executor", "pesos", "compactor", "tornado"): logger = logging.getLogger(logger) logger.propagate = False logger.addHandler(handler) diff --git a/portainer/app/build.py b/portainer/app/build.py index 54a9b7c..5a5f1a0 100644 --- a/portainer/app/build.py +++ b/portainer/app/build.py @@ -1,16 +1,18 @@ -"""The entrypoint to the portainer app. Spins up the a schedular instance and +"""The entrypoint to the portainer app. 
Spins up a scheduler instance and waits for the result.""" import getpass import logging -import pesos.scheduler import sys import threading import time -from pesos.vendor.mesos import mesos_pb2 from portainer.app import subcommand from portainer.app.scheduler import Scheduler +from portainer.util.parser import parse_dockerfile + +from pymesos import MesosSchedulerDriver + logger = logging.getLogger("portainer.build") @@ -59,16 +61,22 @@ def main(args): logger.error("The --repository argument cannot be used when building multiple images") sys.exit(1) + # TODO eliminate duplication of dockerfile parsing + dockerfiles = [parse_dockerfile(d, registry=args.pull_registry) for d in args.dockerfile] + # Launch the mesos framework - framework = mesos_pb2.FrameworkInfo() - framework.user = getpass.getuser() - framework.name = "portainer" + framework = { + 'user': getpass.getuser(), + 'name': 'Portainer: building %s' % ', '.join([d.get("REPOSITORY").next()[0] for d in dockerfiles]) + } if args.framework_role: - framework.role = args.framework_role + framework['role'] = args.framework_role if args.framework_id: - framework.id.value = args.framework_id + framework['id'] = { + 'value': args.framework_id + } if args.docker_host: args.container_image = None @@ -89,11 +97,13 @@ def main(args): max_retries=args.retries ) - driver = pesos.scheduler.PesosSchedulerDriver( - scheduler, framework, args.mesos_master + driver = MesosSchedulerDriver( + scheduler, + framework, + args.mesos_master ) - # Kick off the pesos scheduler and watch the magic happen + # Kick off the pymesos scheduler and watch the magic happen thread = threading.Thread(target=driver.run) thread.setDaemon(True) thread.start() diff --git a/portainer/app/executor.py b/portainer/app/executor.py index 8eb4fe4..4a30fec 100644 --- a/portainer/app/executor.py +++ b/portainer/app/executor.py @@ -3,14 +3,14 @@ invoking the docker daemon, then running `docker build` for the task's staged context file, and communicating with 
mesos throughout""" +import base64 import docker import functools import io import json import logging -import mesos.interface import os -import pesos.executor +import pymesos import re import signal import subprocess @@ -18,19 +18,15 @@ import time import traceback -from pesos.vendor.mesos import mesos_pb2 - from portainer.app import subcommand from portainer.proto import portainer_pb2 - logger = logging.getLogger("portainer.executor") -@subcommand("build-executor") +@subcommand("run-executor") def main(args): - - driver = pesos.executor.PesosExecutorDriver(Executor()) + driver = pymesos.MesosExecutorDriver(Executor()) thread = threading.Thread(target=driver.run) thread.setDaemon(True) @@ -40,12 +36,7 @@ def main(args): time.sleep(0.5) -class Executor(mesos.interface.Executor): - - TASK_STARTING = mesos_pb2.TASK_STARTING - TASK_RUNNING = mesos_pb2.TASK_RUNNING - TASK_FINISHED = mesos_pb2.TASK_FINISHED - TASK_FAILED = mesos_pb2.TASK_FAILED +class Executor(pymesos.Executor): def __init__(self): self.build_task = None @@ -54,15 +45,14 @@ def __init__(self): self.docker_daemon_up = False def registered(self, driver, executorInfo, frameworkInfo, slaveInfo): - logger.info("Setting up environment for building containers") # Parse the build task object try: build_task = portainer_pb2.BuildTask() - build_task.ParseFromString(executorInfo.data) + build_task.ParseFromString(base64.b64decode(executorInfo['data'])) except Exception: - logger.error("Failed to parse BuildTask in ExecutorInfo.data") + logger.error("Failed to parse BuildTask in ExecutorInfo['data']") raise self.build_task = build_task @@ -82,7 +72,7 @@ def launch_docker_daemon(): # Use the `wrapdocker` script included in our docker image proc = subprocess.Popen(["/usr/local/bin/wrapdocker"], env=env) - self.docker = docker.Client() + self.docker = docker.APIClient() while True: try: self.docker.ping() @@ -100,24 +90,23 @@ def launch_docker_daemon(): daemon_thread.setDaemon(True) daemon_thread.start() else: - 
self.docker = docker.Client(build_task.daemon.docker_host) + self.docker = docker.APIClient(base_url=build_task.daemon.docker_host) self.docker_daemon_up = True - def disconnected(self, driver): - logger.info("Disconnected from master! Ahh!") - def reregistered(self, driver, slaveInfo): - logger.info("Re-registered from the master! Ahh!") + logger.info("Re-registered to master! Ahh!") - def launchTask(self, driver, taskInfo): + def disconnected(self, driver): + logger.info("Disconnected from master! Ahh!") - logger.info("Launched task %s", taskInfo.task_id.value) + def launchTask(self, driver, task): + logger.info("Launched task %s", task['task_id']['value']) # Spawn another thread to run the task, freeing up the executor thread = threading.Thread(target=functools.partial( self._build_image, driver, - taskInfo, + task, self.build_task )) @@ -136,49 +125,14 @@ def shutdown(self, driver): else: logger.warning("Unable to locate docker pidfile") - def _wrap_docker_stream(self, stream): - """Wrapper to parse the different types of messages from the - Docker Remote API and spit them out in a friendly format.""" - - for msg in stream: - logger.info("Received update from docker: %s", msg.rstrip()) - - # Parse the message / handle any errors from docker - try: - update = json.loads(msg.rstrip()) - except Exception, e: - logger.error("Caught exception parsing message %s %r", msg, e) - else: - if "error" in update: - logger.error("Docker error: %s", update["error"]) - yield update["error"], False - raise Exception("Docker encountered an error") - - friendly_message = None - is_stream = False - - if "stream" in update: - is_stream = True - friendly_message = re.sub(r'\033\[[0-9;]*m', '', - update["stream"].rstrip()) - if "status" in update: - friendly_message = update["status"].rstrip() - if "id" in update: - friendly_message = "[%s] %s" % (update["id"], friendly_message) - if "progress" in update: - friendly_message += " (%s)" % update["progress"] - - if friendly_message is 
not None: - yield friendly_message, is_stream - def _build_image(self, driver, taskInfo, buildTask): """Build an image for the given buildTask.""" # Tell mesos that we're starting the task - driver.sendStatusUpdate(mesos_pb2.TaskStatus( - task_id=taskInfo.task_id, - state=self.TASK_STARTING - )) + driver.sendStatusUpdate({ + 'task_id': taskInfo['task_id'], + 'state': 'TASK_STARTING' + }) logger.info("Waiting for docker daemon to be available") @@ -193,10 +147,10 @@ def _build_image(self, driver, taskInfo, buildTask): raise Exception("Timed out waiting for docker daemon") # Now that docker is up, let's go and do stuff - driver.sendStatusUpdate(mesos_pb2.TaskStatus( - task_id=taskInfo.task_id, - state=self.TASK_RUNNING - )) + driver.sendStatusUpdate({ + 'task_id': taskInfo['task_id'], + 'state': 'TASK_RUNNING' + }) if not buildTask: raise Exception("Failed to decode the BuildTask protobuf data") @@ -226,7 +180,7 @@ def _build_image(self, driver, taskInfo, buildTask): for message, is_stream in self._wrap_docker_stream(build_request): if not is_stream or (is_stream and buildTask.stream): driver.sendFrameworkMessage( - ("%s: %s" % (image_name, message)).encode('unicode-escape') + base64.b64encode("%s: %s" % (image_name, message)) ) else: sandbox_dir = os.environ.get("MESOS_SANDBOX", os.environ["MESOS_DIRECTORY"]) @@ -246,7 +200,7 @@ def _build_image(self, driver, taskInfo, buildTask): for message, is_stream in self._wrap_docker_stream(build_request): if not is_stream or (is_stream and buildTask.stream): driver.sendFrameworkMessage( - ("%s: %s" % (image_name, message)).encode('unicode-escape') + base64.b64encode("%s: %s" % (image_name, message)) ) # Extract the newly created image ID @@ -257,7 +211,7 @@ def _build_image(self, driver, taskInfo, buildTask): # Tag the image with all the required tags tags = buildTask.image.tag or ["latest"] - driver.sendFrameworkMessage(str("%s: Tagging image %s" % (image_name, image_id))) + driver.sendFrameworkMessage(base64.b64encode("%s: 
Tagging image %s" % (image_name, image_id))) for tag in tags: try: self.docker.tag( @@ -266,31 +220,66 @@ def _build_image(self, driver, taskInfo, buildTask): tag=tag, force=True ) - driver.sendFrameworkMessage(str("%s: -> %s" % (image_name, tag))) + driver.sendFrameworkMessage(base64.b64encode("%s: -> %s" % (image_name, tag))) except Exception, e: raise e # Push the image to the registry - driver.sendFrameworkMessage(str("%s: Pushing image" % image_name)) + driver.sendFrameworkMessage(base64.b64encode("%s: Pushing image" % image_name)) push_request = self.docker.push(image_name, stream=True) for message, is_stream in self._wrap_docker_stream(push_request): if not is_stream or (is_stream and buildTask.stream): driver.sendFrameworkMessage( - str("%s: %s" % (image_name, message)) + base64.b64encode("%s: %s" % (image_name, message)) ) - driver.sendStatusUpdate(mesos_pb2.TaskStatus( - task_id=taskInfo.task_id, - state=self.TASK_FINISHED - )) + driver.sendStatusUpdate({ + 'task_id': taskInfo['task_id'], + 'state': 'TASK_FINISHED' + }) except Exception, e: logger.error("Caught exception building image: %s", e) - driver.sendStatusUpdate(mesos_pb2.TaskStatus( - task_id=taskInfo.task_id, - state=self.TASK_FAILED, - message=str(e), - data=traceback.format_exc() - )) + driver.sendStatusUpdate({ + 'task_id': taskInfo['task_id'], + 'state': 'TASK_FAILED', + 'message': str(e), + 'data': base64.b64encode(traceback.format_exc()) + }) # Re-raise the exception for logging purposes and to terminate the thread raise + + def _wrap_docker_stream(self, stream): + """Wrapper to parse the different types of messages from the + Docker Remote API and spit them out in a friendly format.""" + + for msg in stream: + logger.info("Received update from docker: %s", msg.rstrip()) + + # Parse the message / handle any errors from docker + try: + update = json.loads(msg.rstrip()) + except Exception, e: + logger.error("Caught exception parsing message %s %r", msg, e) + else: + if "error" in update: + 
logger.error("Docker error: %s", update["error"]) + yield update["error"], False + raise Exception("Docker encountered an error") + + friendly_message = None + is_stream = False + + if "stream" in update: + is_stream = True + friendly_message = re.sub(r'\033\[[0-9;]*m', '', + update["stream"].rstrip()) + if "status" in update: + friendly_message = update["status"].rstrip() + if "id" in update: + friendly_message = "[%s] %s" % (update["id"], friendly_message) + if "progress" in update: + friendly_message += " (%s)" % update["progress"] + + if friendly_message is not None: + yield friendly_message, is_stream diff --git a/portainer/app/scheduler.py b/portainer/app/scheduler.py index 0d0ef2b..bcb0def 100644 --- a/portainer/app/scheduler.py +++ b/portainer/app/scheduler.py @@ -2,25 +2,25 @@ the task definition; pack up the task context; ship it to the staging area; accept the offer and launch the task; and wait for the result""" +import base64 import logging import os -import mesos.interface import progressbar +import pymesos import sys -import StringIO import tarfile import tempfile import threading import traceback import uuid +from Queue import Queue from collections import defaultdict -from fnmatch import fnmatch from functools import partial -from fs.opener import opener -from pesos.vendor.mesos import mesos_pb2 from urlparse import urlparse -from Queue import Queue + +from fs.opener import opener +from fnmatch import fnmatch from portainer.proto import portainer_pb2 from portainer.util.parser import parse_dockerfile, parse_dockerignore @@ -36,8 +36,7 @@ class StagingSystemRequiredException(Exception): pass -class Scheduler(mesos.interface.Scheduler): - """Mesos scheduler that is responsible for launching the builder tasks.""" +class Scheduler(pymesos.Scheduler): def __init__(self, executor_uri, cpu_limit, mem_limit, push_registry, staging_uri, stream=False, verbose=False, repository=None, @@ -95,7 +94,6 @@ def enqueue_build(self, path, tags): """ task_id = 
str(uuid.uuid1()) - logger.info("Queuing build for %s for %s", task_id, path) build_task = portainer_pb2.BuildTask() @@ -185,7 +183,7 @@ def handle_exception(e): if len(registry) > 1: build_task.image.registry.port = int(registry[1]) except ValueError: - raise ValueError("Failed to parse REGISTRY in %s", path) + raise ValueError("Failed to parse REGISTRY in %s" % path) # Add any tags build_task.image.tag.extend(tags) @@ -194,24 +192,155 @@ def handle_exception(e): self.pending += 1 self.queued_tasks.append((dockerfile, build_task)) + def _make_build_context(self, output, context_root, dockerfile): + """Generate and return a compressed tar archive of the build context.""" + + if not self.filesystem: + raise StagingSystemRequiredException("A staging filesystem is required for local sources") + + tar = tarfile.open(mode="w:gz", fileobj=output) + for idx, (cmd, instruction) in enumerate(dockerfile.instructions): + if cmd in ("ADD", "COPY"): + local_path, remote_path = instruction + tar_path = "context/%s" % str(idx) + + # TODO(tarnfeld): This isn't strict enough + if local_path.startswith("http"): + logger.debug("Skipping remote ADD %s", local_path) + continue + + if not local_path.startswith("/"): + local_path = os.path.join(context_root, local_path) + local_path = os.path.abspath(local_path) + + if os.path.isfile(local_path): + # Preserve the file extension + parts = local_path.split(".") + if len(parts) > 1: + tar_path += "." 
+ parts[-1] + logger.debug("Adding path %s to tar in %s", local_path, tar_path) + tar.add(local_path, arcname=tar_path) + else: + ignore = set() + for (dirpath, _, filenames) in os.walk(local_path, followlinks=True): + # Update the set of ignored paths with any new .dockerignore files we see + ignore_path = os.path.join(dirpath, ".dockerignore") + if os.path.exists(ignore_path): + with open(ignore_path, 'r') as f: + for glob in parse_dockerignore(f): + ignore.add(os.path.join(dirpath, glob) + "*") + + for filename in filenames: + path = os.path.join(dirpath, filename) + for expr in ignore: + if fnmatch(path, expr): + logger.debug("Ignoring path %s", path) + break + else: + rel_path = path.replace(local_path, '').lstrip('/') + logger.debug("Adding path %s to tar in %s", rel_path, tar_path) + tar.add(path, arcname=os.path.join(tar_path, rel_path)) + + dockerfile.instructions[idx] = (cmd, (tar_path, remote_path)) + + def statusUpdate(self, driver, status): + """Called when a status update is received from the mesos cluster.""" + + finished = False + failed = False + task_id = status['task_id']['value'] + + if status['task_id']['value'] not in self.task_history: + logger.error("Task update for unknown task! 
%s", task_id) + return + + if status['state'] == 'TASK_STARTING': + logger.info("Task update %s : STARTING", task_id) + if status['state'] == 'TASK_RUNNING': + logger.info("Task update %s : RUNNING", task_id) + if status['state'] == 'TASK_FAILED': + logger.info("Task update %s : FAILED", task_id) + if status['message'] and status.get('data'): + logger.info("Exception caught while building image: \n\n%s", base64.b64decode(status['data'])) + elif status['message']: + logger.info("Failure while building image: \n\n%s", status['message']) + failed = True + elif status['state'] == 'TASK_FINISHED': + logger.info("Task update %s : FINISHED", task_id) + finished = True + elif status['state'] == 'TASK_KILLED': + logger.info("Task update %s : KILLED", task_id) + failed = True + elif status['state'] == 'TASK_LOST': + logger.info("Task update %s : LOST", task_id) + failed = True + + # Update the last known status of the task + last_known_state = self.task_status[task_id] + self.task_status[task_id] = status['state'] + + if finished: + self.cleanup.schedule_cleanup(task_id) + + self.running -= 1 + self.finished += 1 + elif failed: + self.running -= 1 + + # Re-queue the task if it hasn't started RUNNING yet + if last_known_state in {None, 'TASK_STARTING', 'TASK_STAGING'} and \ + self.task_retries[task_id] < self.max_retries: + self._reschedule_task(task_id, blacklist_slave=status['agent_id']['value']) + else: + self.failed += 1 + self.cleanup.schedule_cleanup(task_id) + + # If there are no tasks running, and the queue is empty, we should stop + if self.running == 0 and self.pending == 0: + driver.stop() + + def _reschedule_task(self, task_id, blacklist_slave=None): + if task_id not in self.task_history: + logger.error("Cannot re-schedule unknown task %s", task_id) + return + + with self._processing_queue: + del self.task_status[task_id] + + self.pending += 1 + self.task_retries[task_id] += 1 + + if blacklist_slave: + self.blacklist.add(blacklist_slave) + + 
self.queued_tasks.append(self.task_history[task_id]) + logger.info("Re-scheduling task [%d] %s, excluding slave %s", + self.task_retries[task_id], task_id, blacklist_slave) + def registered(self, driver, frameworkId, masterInfo): - host = masterInfo.hostname or masterInfo.ip - master = "http://%s:%s" % (host, masterInfo.port) - logger.info("Framework %s registered to %s", frameworkId.value, master) + host = masterInfo['hostname'] or masterInfo['ip'] + master = "http://%s:%s" % (host, masterInfo['port']) + logger.info("Framework %s registered to %s", frameworkId['value'], master) def disconnected(self, driver): logger.warning("Framework disconnected from the mesos master") def reregistered(self, driver, masterInfo): - host = masterInfo.hostname or masterInfo.ip - master = "http://%s:%s" % (host, masterInfo.port) + host = masterInfo['hostname'] or masterInfo['ip'] + master = "http://%s:%s" % (host, masterInfo['port']) logger.info("Framework re-registered to %s", master) def error(self, driver, message): logger.error("Framework error: %s", message) - def resource_offers(self, driver, offers): + def frameworkMessage(self, driver, executorId, slaveId, message): + message = base64.b64decode(message) + if "Buffering" in message: # Heh. This'll do for now, eh? 
+ logger.debug("\t%s", message) + else: + logger.info("\t%s", message) + def resourceOffers(self, driver, offers): # Spawn another thread to handle offer processing to free up the driver t = threading.Thread(target=partial( self._handle_offers, @@ -229,8 +358,7 @@ def _handle_offers(self, driver, offers): tasks_to_launch = [] if not self.pending: - for offer in offers: - driver.declineOffer(offer.id) + driver.declineOffer([offer['id'] for offer in offers]) else: for offer in offers: offer_cpu = 0.0 @@ -238,20 +366,20 @@ def _handle_offers(self, driver, offers): offer_role = None # Extract the important resources from the offer - for resource in offer.resources: - offer_role = resource.role - if resource.name == "cpus": - offer_cpu = float(resource.scalar.value) - if resource.name == "mem": - offer_mem = int(resource.scalar.value) + for resource in offer['resources']: + offer_role = resource['role'] + if resource['name'] == "cpus": + offer_cpu = float(resource['scalar']['value']) + if resource['name'] == "mem": + offer_mem = int(resource['scalar']['value']) logger.debug("Received offer for cpus:%f mem:%d role:%s", offer_cpu, offer_mem, offer_role) # Look for a task in the queue that fits the bill with self._processing_queue: - if offer.slave_id.value in self.blacklist: - logger.info("Ignoring offer from blacklisted slave %s", offer.slave_id.value) - driver.declineOffer(offer.id) + if offer['agent_id']['value'] in self.blacklist: + logger.info("Ignoring offer from blacklisted agent %s", offer['agent_id']['value']) + driver.declineOffer(offer['id']) continue for idx, (dockerfile, build_task) in enumerate(self.queued_tasks): @@ -265,11 +393,11 @@ def _handle_offers(self, driver, offers): self.pending -= 1 self.running += 1 tasks_to_launch.append((offer, offer_role, cpu, mem, dockerfile, build_task)) - logger.info("Launching build task %s with offer from %s", build_task.task_id, offer.hostname) + logger.info("Launching build task %s with offer from %s", 
build_task.task_id, offer['hostname']) break # TODO: Don't currently support launching multiple tasks in a single offer else: logger.debug("Ignoring offer %r", offer) - driver.declineOffer(offer.id) + driver.declineOffer(offer['id']) # Remove all of the tasks that are about to be launched self.queued_tasks = filter(None, self.queued_tasks) @@ -295,221 +423,94 @@ def _handle_offers(self, driver, offers): tasks = [] if tasks: - driver.launchTasks([offer.id], tasks) + driver.launchTasks([offer['id']], tasks) else: - driver.declineOffer(offer.id) - - def status_update(self, driver, update): - """Called when a status update is received from the mesos cluster.""" - - finished = False - failed = False - task_id = update.task_id.value - - if update.task_id.value not in self.task_history: - logger.error("Task update for unknown task! %s", task_id) - return - - if update.state == mesos_pb2.TASK_STARTING: - logger.info("Task update %s : STARTING", task_id) - if update.state == mesos_pb2.TASK_RUNNING: - logger.info("Task update %s : RUNNING", task_id) - if update.state == mesos_pb2.TASK_FAILED: - logger.info("Task update %s : FAILED", task_id) - if update.message and update.data: - logger.info("Exception caught while building image: \n\n%s", update.data) - failed = True - elif update.state == mesos_pb2.TASK_FINISHED: - logger.info("Task update %s : FINISHED", task_id) - finished = True - elif update.state == mesos_pb2.TASK_KILLED: - logger.info("Task update %s : KILLED", task_id) - failed = True - elif update.state == mesos_pb2.TASK_LOST: - logger.info("Task update %s : LOST", task_id) - failed = True - - # Update the last known status of the task - last_known_state = self.task_status[task_id] - self.task_status[task_id] = update.state - - if finished: - self.cleanup.schedule_cleanup(task_id) - - self.running -= 1 - self.finished += 1 - elif failed: - self.running -= 1 - - # Re-queue the task if it hasn't started RUNNING yet - if last_known_state in {None, 
mesos_pb2.TASK_STARTING, mesos_pb2.TASK_STAGING} and \ - self.task_retries[task_id] < self.max_retries: - self._reschedule_task(task_id, blacklist_slave=update.slave_id.value) - else: - self.failed += 1 - self.cleanup.schedule_cleanup(task_id) - - # If there are no tasks running, and the queue is empty, we should stop - if self.running == 0 and self.pending == 0: - driver.stop() - - def framework_message(self, driver, executorId, slaveId, message): - message = message.decode('unicode-escape') - if "Buffering" in message: # Heh. This'll do for now, eh? - logger.debug("\t%s", message) - else: - logger.info("\t%s", message) - - def _reschedule_task(self, task_id, blacklist_slave=None): - if task_id not in self.task_history: - logger.error("Cannot re-schedule unknown task %s", task_id) - return - - with self._processing_queue: - del self.task_status[task_id] - - self.pending += 1 - self.task_retries[task_id] += 1 - - if blacklist_slave: - self.blacklist.add(blacklist_slave) - - self.queued_tasks.append(self.task_history[task_id]) - logger.info("Re-scheduling task [%d] %s, excluding slave %s", - self.task_retries[task_id], task_id, blacklist_slave) + driver.declineOffer([offer['id']]) def _prepare_task(self, driver, dockerfile, build_task, offer, cpu, mem, role): # Define the mesos task - task = mesos_pb2.TaskInfo() - task.name = "%s/%s" % (":".join([build_task.image.registry.hostname, str(build_task.image.registry.port)]), build_task.image.repository) - task.task_id.value = build_task.task_id - task.slave_id.value = offer.slave_id.value + task = {} + task['name'] = "%s/%s" % (":".join([build_task.image.registry.hostname, str(build_task.image.registry.port)]), build_task.image.repository) + task['task_id'] = { + 'value': build_task.task_id + } + task['agent_id'] = { + 'value': offer['agent_id']['value'] + } # Create the executor args = [] if self.verbose: args.append("--verbose") - task.executor.executor_id.value = build_task.task_id - task.executor.command.value = 
"${MESOS_SANDBOX:-${MESOS_DIRECTORY}}/%s/bin/portainer %s build-executor" % ( - os.path.basename(self.executor_uri).rstrip(".tar.gz"), " ".join(args) - ) + task['executor'] = { + 'executor_id': { + 'value': build_task.task_id + }, + 'command': { + 'value': "${MESOS_SANDBOX:-${MESOS_DIRECTORY}}/%s/bin/portainer %s run-executor" + % (os.path.basename(self.executor_uri).rstrip(".tar.gz"), " ".join(args)) + } + } if self.container_image: - task.executor.container.type = mesos_pb2.ContainerInfo.DOCKER - task.executor.container.docker.image = self.container_image - task.executor.container.docker.privileged = True + task['executor']['container'] = { + 'type': 'DOCKER', + 'docker': { + 'image': self.container_image, + 'privileged': True + } + } - task.executor.name = "build" - task.executor.source = "build %s" % (task.name) + task['executor']['name'] = "build" + task['executor']['source'] = "build %s" % (task['name']) # Configure the mesos executor with the portainer executor uri - portainer_executor = task.executor.command.uris.add() - portainer_executor.value = self.executor_uri + task['executor']['command']['uris'] = [ + { + 'value': self.executor_uri + } + ] if build_task.context: # Add the docker context - uri = task.executor.command.uris.add() - uri.value = build_task.context_url - uri.extract = False + task['executor']['command']['uris'].append({ + 'value': build_task.context_url, + 'extract': False + }) - task.data = build_task.SerializeToString() - task.executor.data = task.data + task['data'] = base64.b64encode(build_task.SerializeToString()) + task['executor']['data'] = task['data'] # Build up the resources we require - cpu_resource = task.resources.add() - cpu_resource.name = "cpus" - cpu_resource.type = mesos_pb2.Value.SCALAR - cpu_resource.role = role - cpu_resource.scalar.value = cpu - - mem_resource = task.resources.add() - mem_resource.name = "mem" - mem_resource.type = mesos_pb2.Value.SCALAR - mem_resource.role = role - mem_resource.scalar.value = mem + 
cpu_resource = { + 'name': "cpus", + 'type': 'SCALAR', + 'role': role, + 'scalar': { + 'value': cpu + } + } + + mem_resource = { + 'name': "mem", + 'type': 'SCALAR', + 'role': role, + 'scalar': { + 'value': mem + } + } + + task['resources'] = [ + cpu_resource, + mem_resource + ] self.task_history[build_task.task_id] = (dockerfile, build_task) return task - def _make_build_context(self, output, context_root, dockerfile): - """Generate and return a compressed tar archive of the build context.""" - - if not self.filesystem: - raise StagingSystemRequiredException("A staging filesystem is required for local sources") - - tar = tarfile.open(mode="w:gz", fileobj=output) - for idx, (cmd, instruction) in enumerate(dockerfile.instructions): - if cmd in ("ADD", "COPY"): - local_path, remote_path = instruction - tar_path = "context/%s" % str(idx) - - # TODO(tarnfeld): This isn't strict enough - if local_path.startswith("http"): - logger.debug("Skipping remote ADD %s", local_path) - continue - - if not local_path.startswith("/"): - local_path = os.path.join(context_root, local_path) - local_path = os.path.abspath(local_path) - - if os.path.isfile(local_path): - # Preserve the file extension - parts = local_path.split(".") - if len(parts) > 1: - tar_path += "." 
+ parts[-1] - logger.debug("Adding path %s to tar in %s", local_path, tar_path) - tar.add(local_path, arcname=tar_path) - else: - ignore = set() - for (dirpath, _, filenames) in os.walk(local_path, followlinks=True): - # Update the set of ignored paths with any new .dockerignore files we see - ignore_path = os.path.join(dirpath, ".dockerignore") - if os.path.exists(ignore_path): - with open(ignore_path, 'r') as f: - for glob in parse_dockerignore(f): - ignore.add(os.path.join(dirpath, glob) + "*") - - for filename in filenames: - path = os.path.join(dirpath, filename) - for expr in ignore: - if fnmatch(path, expr): - logger.debug("Ignoring path %s", path) - break - else: - rel_path = path.replace(local_path, '').lstrip('/') - logger.debug("Adding path %s to tar in %s", rel_path, tar_path) - tar.add(path, arcname=os.path.join(tar_path, rel_path)) - - dockerfile.instructions[idx] = (cmd, (tar_path, remote_path)) - - # Write the modified dockerfile into the tar also - buildfile = StringIO.StringIO() - buildfile.write("# Generated by portainer\n") - - for cmd, instructions in dockerfile.instructions: - if cmd not in dockerfile.INTERNAL: - line = "%s %s" % (cmd, " ".join(instructions)) - - logger.debug("Adding instruction %r to dockerfile", line) - buildfile.write("%s\n" % line) - - buildfile.seek(0, os.SEEK_END) - info = tarfile.TarInfo("Dockerfile") - info.size = buildfile.tell() - - buildfile.seek(0) - tar.addfile(info, fileobj=buildfile) - - tar.close() - output.seek(0, os.SEEK_END) - tar_size = output.tell() - output.seek(0) - - return tar_size - class TaskCleanupThread(threading.Thread): diff --git a/requirements.pexbuild.txt b/requirements.pexbuild.txt index ea5977e..0930d66 100644 --- a/requirements.pexbuild.txt +++ b/requirements.pexbuild.txt @@ -1,4 +1,4 @@ -pex==0.8.6 -pip==1.5.2 -setuptools==7 +pex==1.1.19 +pip==9.0.1 +setuptools==30.4.0 wheel diff --git a/requirements.pip b/requirements.pip index 50ccb29..dddf00d 100644 --- a/requirements.pip +++ 
b/requirements.pip @@ -1,9 +1,8 @@ -git+git://github.com/tarnfeld/pesos.git@d489a976cd9aaf9012e03888d7bfb44b903977c6#egg=pesos -git+git://github.com/duedil-ltd/pyfilesystem.git@f952ec334e074d56f187dd61a3932d7b884dbdde#egg=fs +git+git://github.com/duedil-ltd/pyfilesystem.git@2b58f1359f50ecaf9ffea1e9ab3a88afb9f84d5e#egg=fs trollius==0.4 protobuf>=2.6.1,<2.7 -docker-py==0.4.0 +docker==2.0.2 boto==2.27.0 progressbar==2.2 pywebhdfs==0.2.4 -mesos.interface==0.21.1 +pymesos==0.2.9