diff --git a/consumer/Dockerfile b/consumer/Dockerfile index d9c75a7..e8025f4 100644 --- a/consumer/Dockerfile +++ b/consumer/Dockerfile @@ -6,6 +6,11 @@ WORKDIR /app # Copy the current directory contents into the container at /app COPY ./requirements.txt /app/requirements.txt +RUN apt-get update && apt-get install -y \ + gcc \ + python3-dev \ + && rm -rf /var/lib/apt/lists/* + # Install any needed packages specified in requirements.txt RUN pip install --no-cache-dir -r requirements.txt diff --git a/consumer/Makefile b/consumer/Makefile index d9602d1..bb7ea84 100644 --- a/consumer/Makefile +++ b/consumer/Makefile @@ -9,3 +9,6 @@ build-linux: run-docker: docker run --name consumer -d consumer + + +.PHONY: run build-local build-linux run-docker \ No newline at end of file diff --git a/consumer/src/api/full.py b/consumer/src/api/full.py index 6e0b85a..014d9b1 100644 --- a/consumer/src/api/full.py +++ b/consumer/src/api/full.py @@ -17,7 +17,11 @@ async def full_cpu(duration: int, background_tasks: BackgroundTasks): background_tasks.add_task(create_full_cpu_process, duration) return JSONResponse( status_code=status.HTTP_200_OK, - content={"full_type": "cpu", "duration": duration, "message": f"full cpu in {duration} seconds"}, + content={ + "full_type": "cpu", + "duration": duration, + "message": f"full cpu in {duration} seconds", + }, ) @@ -26,7 +30,11 @@ async def full_cpu(duration: int, background_tasks: BackgroundTasks): background_tasks.add_task(create_full_rem_process, duration) return JSONResponse( status_code=status.HTTP_200_OK, - content={"full_type": "mem", "duration": duration, "message": f"full memory in {duration} seconds"}, + content={ + "full_type": "mem", + "duration": duration, + "message": f"full memory in {duration} seconds", + }, ) diff --git a/consumer/src/api/health.py b/consumer/src/api/health.py index 4ad16d3..9c415fc 100644 --- a/consumer/src/api/health.py +++ b/consumer/src/api/health.py @@ -1,8 +1,10 @@ -from fastapi import APIRouter, status 
+from fastapi import APIRouter, status, BackgroundTasks +from services.log import LoggingBackgroundClass router = APIRouter(prefix="", tags=["Health"]) @router.get("/", status_code=status.HTTP_200_OK) -async def health(): - return {"hello": "world"} +async def health(background_tasks: BackgroundTasks): + LoggingBackgroundClass() + return {"health": "ok"} diff --git a/consumer/src/api/job.py b/consumer/src/api/job.py index b6d413a..2a72bbd 100644 --- a/consumer/src/api/job.py +++ b/consumer/src/api/job.py @@ -2,7 +2,7 @@ from fastapi.responses import JSONResponse from services.job import MockBehaviorBackgroundClass -from state.instance import BackgroundJobSaver , Sleep +from state.instance import BackgroundJobSaver, Sleep router = APIRouter(prefix="/job", tags=["Job"]) @@ -50,14 +50,11 @@ async def status_normal_behavior(): }, ) + @router.get("/sleep/{seconds}", status_code=status.HTTP_200_OK) async def sleep(seconds: int): - Sleep.set_sleep_time(seconds) return JSONResponse( - status_code=200, - content={ - "message": f"Sleep time set to {seconds} seconds." 
- } - ) \ No newline at end of file + status_code=200, content={"message": f"Sleep time set to {seconds} seconds."} + ) diff --git a/consumer/src/api/status.py b/consumer/src/api/status.py index 39818f0..ae627a3 100644 --- a/consumer/src/api/status.py +++ b/consumer/src/api/status.py @@ -12,11 +12,12 @@ ) # schemas -from schema.statistics import HardwareResponse , StatisticsResponse +from schema.statistics import HardwareResponse, StatisticsResponse router = APIRouter(prefix="/status", tags=["Status"]) + @router.get("", status_code=status.HTTP_200_OK, response_model=StatisticsResponse) async def all_status(): """ diff --git a/consumer/src/app.py b/consumer/src/app.py index d92958d..8a9a47b 100644 --- a/consumer/src/app.py +++ b/consumer/src/app.py @@ -59,4 +59,3 @@ async def add_process_time_header(request: Request, call_next: callable): app.include_router(full_router, prefix="/api") app.include_router(status_router, prefix="/api") app.include_router(state_router, prefix="/api") - diff --git a/consumer/src/core/config.py b/consumer/src/core/config.py index 51ef575..98f050a 100644 --- a/consumer/src/core/config.py +++ b/consumer/src/core/config.py @@ -2,18 +2,23 @@ from functools import lru_cache -class Settings(): - full_cpu_process_count: int = os.getenv("FULL_CPU_PROCESS_COUNT", 4) - full_ram_byte: int = os.getenv("FULL_RAM_BYTE", 1024*1024*1024*10) - job_interval: int = os.getenv("JOB_INTERVAL", 1) - enqueue_upper_bound: int = os.getenv("ENQUEUE_UPPER_BOUND", 100) - enqueue_lower_bound: int = os.getenv("ENQUEUE_LOWER_BOUND", 1) - dequeue_upper_bound: int = os.getenv("DEQUEUE_UPPER_BOUND", 100) - dequeue_lower_bound: int = os.getenv("DEQUEUE_LOWER_BOUND", 1) - execute_upper_bound: int = os.getenv("EXECUTE_UPPER_BOUND", 100) - execute_lower_bound: int = os.getenv("EXECUTE_LOWER_BOUND", 10) +class Settings: + full_cpu_process_count: int = int(os.getenv("FULL_CPU_PROCESS_COUNT", 4)) + full_ram_byte: int = int(os.getenv("FULL_RAM_BYTE", 1024 * 1024 * 1024 * 10)) + 
job_interval: int = int(os.getenv("JOB_INTERVAL", 1)) + enqueue_upper_bound: int = int(os.getenv("ENQUEUE_UPPER_BOUND", 100)) + enqueue_lower_bound: int = int(os.getenv("ENQUEUE_LOWER_BOUND", 1)) + dequeue_upper_bound: int = int(os.getenv("DEQUEUE_UPPER_BOUND", 100)) + dequeue_lower_bound: int = int(os.getenv("DEQUEUE_LOWER_BOUND", 1)) + execute_upper_bound: int = int(os.getenv("EXECUTE_UPPER_BOUND", 100)) + execute_lower_bound: int = int(os.getenv("EXECUTE_LOWER_BOUND", 10)) + + log_interval: int = int(os.getenv("LOG_INTERVAL", 1)) + log_file: str = os.getenv("LOG_FILE", "./log/consumer.log") + log_max_bytes: int = int(os.getenv("LOG_MAX_BYTES", 10 * 1024 * 1024)) # 10MB + log_backup_count: int = int(os.getenv("LOG_BACKUP_COUNT", 10)) # 10 files @lru_cache() def get_settings(): - return Settings() \ No newline at end of file + return Settings() diff --git a/consumer/src/schema/statistics.py b/consumer/src/schema/statistics.py index de22493..699c6f8 100644 --- a/consumer/src/schema/statistics.py +++ b/consumer/src/schema/statistics.py @@ -10,4 +10,4 @@ class StatisticsResponse(BaseModel): cpu: HardwareResponse mem: HardwareResponse remain_count: int - avg_exe_time: float \ No newline at end of file + avg_exe_time: float diff --git a/consumer/src/services/full.py b/consumer/src/services/full.py index c3531bf..b73a155 100644 --- a/consumer/src/services/full.py +++ b/consumer/src/services/full.py @@ -4,32 +4,36 @@ settings = get_settings() + def full_cpu_in_duration(duration: int): now = time.time() print("time", now) print(f"full cpu in {duration} seconds") - i=0 + i = 0 while True: if time.time() > now + duration: break i += 1 print("time", time.time()) + def create_full_cpu_process(duration: int): - ''' + """ create 4 process to full cpu in duration seconds - ''' + """ for _ in range(settings.full_cpu_process_count): p = multiprocessing.Process(target=full_cpu_in_duration, args=(duration,)) p.start() - + """ refence: 
https://stackoverflow.com/questions/6317818/eat-memory-using-python """ + + +def full_ram_in_duration(duration: int): now = time.time() print("time", now) @@ -39,7 +43,7 @@ def full_ram_in_duration(duration: int): a = bytearray(settings.full_ram_byte) while True: if time.time() > now + duration: - break + break try: a = a + bytearray(settings.full_ram_byte) @@ -54,7 +58,8 @@ def full_ram_in_duration(duration: int): print(f"Allocated {cnt} GB in {duration} seconds") print("time", time.time()) + def create_full_rem_process(duration: int): p = multiprocessing.Process(target=full_ram_in_duration, args=(duration,)) p.start() - return p \ No newline at end of file + return p diff --git a/consumer/src/services/job.py b/consumer/src/services/job.py index f46f5c4..f8c2661 100644 --- a/consumer/src/services/job.py +++ b/consumer/src/services/job.py @@ -6,6 +6,7 @@ settings = get_settings() + def check_enque(): return random.randint(0, 1) @@ -25,7 +26,9 @@ def get_deque_num(): def enqueue(num: int): Counter.increase(num) TotalQueryCount.increase(num) - execution_time = random.randint(settings.execute_lower_bound, settings.execute_upper_bound) + execution_time = random.randint( + settings.execute_lower_bound, settings.execute_upper_bound + ) AvgExecutionTime.add_time(execution_time, TotalQueryCount.get_count()) diff --git a/consumer/src/services/log.py b/consumer/src/services/log.py new file mode 100644 index 0000000..c2f5010 --- /dev/null +++ b/consumer/src/services/log.py @@ -0,0 +1,51 @@ +import asyncio +import logging.handlers + +from core.config import get_settings +from services.statistics import ( + get_cpu_percent, + get_mem_usage, + get_remain_count, + get_avg_exe_time, +) + + +async def background_logging_task(): + """ + level,time,cpu,ram,remain_count,avg_exe_time + """ + + settings = get_settings() + + logging.basicConfig() + logger = logging.getLogger("uvicorn.default") + file_handler = logging.handlers.RotatingFileHandler( + filename=settings.log_file, + 
maxBytes=settings.log_max_bytes, + backupCount=settings.log_backup_count, + ) + + formatter = logging.Formatter( + fmt="%(asctime)s,%(message)s", + datefmt="%H:%M:%S", + ) + + file_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + + while True: + """ + level,time,cpu,ram,remain_count,avg_exe_time + """ + logger.info( + msg=f"{get_cpu_percent()},{get_mem_usage()},{get_remain_count()},{get_avg_exe_time()}" + ) + # print("logging...") + + await asyncio.sleep(settings.log_interval) + + +class LoggingBackgroundClass: + def __init__(self, *args, **kwargs) -> None: + self.task = asyncio.create_task(background_logging_task(*args, **kwargs))