Add async app-server to submit/monitor jobs #5
Consider using a docker-compose setup along these lines to run a redis server and the arq worker(s):

```yaml
# This is inspired by https://github.com/jaredvasquez/rq-docker
# - there is no explicit LICENSE on that project
# See this SO question about sharing AWS credentials with a docker container:
# https://stackoverflow.com/questions/36354423/which-is-the-best-way-to-pass-aws-credentials-to-docker-container
version: '3.7'

services:

  rq-server:
    image: redis:alpine
    ports:
      - 6379:6379

  # # rq-dashboard does not work with arq
  # # https://github.com/samuelcolvin/arq/issues/219
  # rq-dashboard:
  #   image: aio-aws-arq
  #   depends_on:
  #     - "rq-server"
  #   command: rq-dashboard -H rq-server
  #   ports:
  #     - 9181:9181

  arq-worker:
    image: aio-aws-arq
    depends_on:
      - "rq-server"
    command: arq aio_aws.aio_aws_arq.WorkerSettings
    environment:
      - REDIS_DSN=redis://rq-server:6379
      - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
      - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
    # deploy:
    #   replicas: 3
```
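Assuming the `aio-aws-arq` image is already built locally, `docker-compose up` should bring up the redis server and a single worker, and `docker-compose up --scale arq-worker=3` should scale out the workers (the commented-out `deploy: replicas` setting is roughly the swarm-mode equivalent).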
The arq worker module referenced by the compose `command` (`aio_aws.aio_aws_arq`) could be something like:

```python
import asyncio
import json
import logging
import os
from collections import deque
from random import random
from typing import Dict
from typing import List

from aio_aws.aio_aws_config import AioAWSConfig
from aio_aws.aio_aws_config import response_success
from aio_aws.aio_aws_lambda import AWSLambdaFunction
from arq.jobs import JobStatus

LOGGER = logging.getLogger(__name__)

try:
    from arq import create_pool
    from arq.connections import RedisSettings

    LOGGER.debug("Optional arq module is available")
except ModuleNotFoundError as err:
    LOGGER.error("Optional arq package is not available")
    raise err

# See https://arq-docs.helpmanual.io/
# See queue names in docker-arq/docker-compose.yml
QUEUE_NAME = "aio-aws"


def get_redis_settings() -> RedisSettings:
    redis_dsn = os.getenv("REDIS_DSN")
    LOGGER.info("redis_dsn: %s", redis_dsn)
    if redis_dsn:
        return RedisSettings.from_dsn(redis_dsn)
    return RedisSettings()


async def startup(ctx):
    aws_region = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
    aio_config = AioAWSConfig(
        aws_region=aws_region,
        max_pool_connections=80,  # a large connection pool
        min_jitter=0.2,
        max_jitter=0.8,
        retries=1,
    )
    ctx["aio_config"] = aio_config
    ctx["lambda_func_name"] = "aioAwsQuery"


async def shutdown(ctx):
    # await ctx['session'].close()
    return


async def aio_aws_query(ctx: Dict) -> List[str]:
    lambda_func_name = ctx["lambda_func_name"]
    # create a Lambda event payload
    event = {
        "process": "aio_aws_lambda",
        "query": "s3://bucket/query_key",
    }
    LOGGER.info("Submit query: %s", event)
    payload = json.dumps(event).encode()
    func = AWSLambdaFunction(name=lambda_func_name, payload=payload)
    await func.invoke()
    reports = []
    if response_success(func.response):
        # the lambda response is a list
        reports.extend(func.content)
    return reports


async def arq_task(tasks: List[str], wait: bool = False) -> List[str]:
    """
    :param tasks: arq task (function) names to enqueue
    :param wait: whether to wait for the queued jobs to complete
    :return: if waiting, the gathered results
    """
    redis = await create_pool(get_redis_settings())
    jobs = deque()
    for task in tasks:
        # enqueue on the same queue the worker consumes
        job = await redis.enqueue_job(task, _queue_name=QUEUE_NAME)
        if job:
            jobs.append(job)
    if not jobs:
        LOGGER.warning("Failed to enqueue any new jobs")
    reports = []
    if not wait:
        LOGGER.info("Not waiting for queued jobs to complete")
        return reports
    while jobs:
        # poll the jobs until they complete, with a random sleep between polls
        job = jobs.popleft()
        # arq job statuses: deferred, queued, in_progress, complete, not_found
        status = await job.status()
        LOGGER.info("Job %s: %s", job.job_id, status)
        if status in [JobStatus.deferred, JobStatus.queued, JobStatus.in_progress]:
            jobs.append(job)
            # use a non-blocking sleep so the event loop is not blocked
            await asyncio.sleep(5 + random())
        elif status == JobStatus.not_found:
            LOGGER.warning("Discarding job not found %s", job.job_id)
        elif status == JobStatus.complete:
            result = await job.result(timeout=5)
            LOGGER.info("Report %s", result)
            reports.extend(result)
        else:
            LOGGER.warning("Unknown job status %s", status)
    return reports


# WorkerSettings defines the settings to use when creating the worker;
# it's used by the arq cli, see the options in the docs at
# https://arq-docs.helpmanual.io/#arq.worker.Worker
class WorkerSettings:
    functions = [aio_aws_query]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = get_redis_settings()
    queue_name = QUEUE_NAME
```
Consider the pattern used by https://github.com/AdRoll/batchiepatchie to run a backend server for submitting and monitoring jobs. The current CLI scripting patterns tie up the user's session, with no way to continue interacting while jobs run; a rough sketch of such a server follows.
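For example, an aiohttp app could expose endpoints to submit a job and poll its status against the same arq queue. This is purely a sketch of the idea, not an existing API in this repo; the `/jobs` routes, the port, and the `aio-aws` queue name are assumptions carried over from the comment above.

```python
# a rough sketch of an async app-server, in the spirit of batchiepatchie;
# aiohttp and these /jobs routes are assumptions, not an existing API here
import os

from aiohttp import web
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

QUEUE_NAME = "aio-aws"  # must match the worker's queue_name


async def submit_job(request: web.Request) -> web.Response:
    # enqueue an arq task by name, e.g. POST {"task": "aio_aws_query"}
    body = await request.json()
    job = await request.app["arq_pool"].enqueue_job(
        body["task"], _queue_name=QUEUE_NAME
    )
    return web.json_response({"job_id": job.job_id}, status=202)


async def job_status(request: web.Request) -> web.Response:
    # report job status without blocking the caller
    job = Job(
        request.match_info["job_id"],
        request.app["arq_pool"],
        _queue_name=QUEUE_NAME,
    )
    status = await job.status()
    return web.json_response({"job_id": job.job_id, "status": status.value})


async def init_app() -> web.Application:
    app = web.Application()
    app["arq_pool"] = await create_pool(
        RedisSettings.from_dsn(os.getenv("REDIS_DSN", "redis://localhost:6379"))
    )
    app.add_routes(
        [
            web.post("/jobs", submit_job),
            web.get("/jobs/{job_id}", job_status),
        ]
    )
    return app


if __name__ == "__main__":
    web.run_app(init_app(), port=8080)
```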
See also