Skip to content

Commit

Permalink
Enable opentelemetry support in the ETOS API
Browse files Browse the repository at this point in the history
  • Loading branch information
t-persson committed Nov 30, 2023
1 parent 4a36df8 commit 0ac1ce4
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 150 deletions.
4 changes: 1 addition & 3 deletions python/docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import inspect
import shutil

__location__ = os.path.join(
os.getcwd(), os.path.dirname(inspect.getfile(inspect.currentframe()))
)
__location__ = os.path.join(os.getcwd(), os.path.dirname(inspect.getfile(inspect.currentframe())))

# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
Expand Down
4 changes: 4 additions & 0 deletions python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ gql[requests]~=3.4
httpx~=0.24
kubernetes~=26.1
sse-starlette~=1.6
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-instrumentation-fastapi==0.42b0
opentelemetry-sdk~=1.21
4 changes: 4 additions & 0 deletions python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ install_requires =
httpx~=0.24
kubernetes~=26.1
sse-starlette~=1.6
opentelemetry-api~=1.21
opentelemetry-exporter-otlp~=1.21
opentelemetry-instrumentation-fastapi==0.42b0
opentelemetry-sdk~=1.21

# Require a specific Python version, e.g. Python 2.7 or >= 3.4
python_requires = >=3.4
Expand Down
29 changes: 27 additions & 2 deletions python/src/etos_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,44 @@
# limitations under the License.
"""ETOS API module."""
import os
from importlib.metadata import version, PackageNotFoundError
from importlib.metadata import PackageNotFoundError, version

from etos_lib.logging.logger import setup_logging
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from etos_api.library.context_logging import ContextLogging

from .main import APP

# The API shall not send logs to RabbitMQ as it
# is too early in the ETOS test run.
os.environ["ETOS_ENABLE_SENDING_LOGS"] = "false"

try:
VERSION = version("environment_provider")
VERSION = version("etos_api")
except PackageNotFoundError:
VERSION = "Unknown"

DEV = os.getenv("DEV", "false").lower() == "true"
ENVIRONMENT = "development" if DEV else "production"
setup_logging("ETOS API", VERSION, ENVIRONMENT)

if os.getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"):
PROVIDER = TracerProvider(
resource=Resource.create(
{SERVICE_NAME: "etos-api", SERVICE_VERSION: VERSION, SERVICE_NAMESPACE: ENVIRONMENT}
)
)
EXPORTER = OTLPSpanExporter()
PROCESSOR = BatchSpanProcessor(EXPORTER)
PROVIDER.add_span_processor(PROCESSOR)
trace.set_tracer_provider(PROVIDER)

FastAPIInstrumentor().instrument_app(
APP, tracer_provider=PROVIDER, excluded_urls="selftest/.*,logs/.*"
)
8 changes: 2 additions & 6 deletions python/src/etos_api/library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,14 @@ async def digest(self, name: str) -> str:
response = await self.head(session, manifest_url, self.token(manifest_url))
try:
if response.status == 401 and "www-authenticate" in response.headers:
self.logger.info(
"Generate a new authorization token for %r", manifest_url
)
self.logger.info("Generate a new authorization token for %r", manifest_url)
response_json = await self.authorize(session, response)
with self.lock:
self.tokens[manifest_url] = {
"token": response_json.get("token"),
"expire": time.time() + response_json.get("expires_in"),
}
response = await self.head(
session, manifest_url, self.token(manifest_url)
)
response = await self.head(session, manifest_url, self.token(manifest_url))
digest = response.headers.get("Docker-Content-Digest")
except aiohttp.ClientResponseError as exception:
self.logger.error("Error getting container image %r", exception)
Expand Down
4 changes: 1 addition & 3 deletions python/src/etos_api/library/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def thread_exception_handler(exception_info, function, *args, **kwargs):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(
None, # Default executor.
functools.partial(
thread_exception_handler, sys.exc_info(), function, *args, **kwargs
),
functools.partial(thread_exception_handler, sys.exc_info(), function, *args, **kwargs),
)
return await asyncio.wait_for(future, timeout=None)

Expand Down
16 changes: 4 additions & 12 deletions python/src/etos_api/library/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ class Recipe(BaseModel):
}

@validator("constraints")
def validate_constraints(
cls, value
): # Pydantic requires cls. pylint:disable=no-self-argument
def validate_constraints(cls, value): # Pydantic requires cls. pylint:disable=no-self-argument
"""Validate the constraints fields for each recipe.
Validation is done manually because error messages from pydantic
Expand Down Expand Up @@ -133,14 +131,10 @@ def validate_constraints(
count[constraint.key] += 1
more_than_one = [key for key, number in count.items() if number > 1]
if more_than_one:
raise ValueError(
f"Too many instances of keys {more_than_one}. Only 1 allowed."
)
raise ValueError(f"Too many instances of keys {more_than_one}. Only 1 allowed.")
missing = [key for key, number in count.items() if number == 0]
if missing:
raise ValueError(
f"Too few instances of keys {missing}. At least 1 required."
)
raise ValueError(f"Too few instances of keys {missing}. At least 1 required.")
return value


Expand Down Expand Up @@ -169,9 +163,7 @@ async def _download_suite(self, test_suite_url):
suite = requests.get(test_suite_url, timeout=60)
suite.raise_for_status()
except Exception as exception: # pylint:disable=broad-except
raise AssertionError(
f"Unable to download suite from {test_suite_url}"
) from exception
raise AssertionError(f"Unable to download suite from {test_suite_url}") from exception
return suite.json()

async def validate(self, test_suite_url):
Expand Down
9 changes: 5 additions & 4 deletions python/src/etos_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
# limitations under the License.
"""ETOS API."""
import logging

from fastapi import FastAPI

# from opentelemetry.sdk.trace import TracerProvider
from starlette.responses import RedirectResponse
from etos_api import routers

from etos_api import routers

APP = FastAPI()
LOGGER = logging.getLogger(__name__)
Expand All @@ -45,9 +48,7 @@ async def redirect_head_to_root():
"""
LOGGER.debug("Redirecting head requests to the root endpoint to '/sefltest/ping'")
# DEPRECATED. Exists only for backwards compatibility.
return RedirectResponse(
url="/selftest/ping", status_code=308
) # 308 = Permanent Redirect
return RedirectResponse(url="/selftest/ping", status_code=308) # 308 = Permanent Redirect


APP.include_router(routers.etos.ROUTER)
Expand Down
154 changes: 82 additions & 72 deletions python/src/etos_api/routers/environment_provider/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Environment provider proxy API."""
import logging
import asyncio
import logging
import os
import time

import aiohttp
from fastapi import APIRouter, HTTPException
from etos_lib import ETOS
from fastapi import APIRouter, HTTPException
from opentelemetry import trace

from .schemas import ConfigureEnvironmentProviderRequest

ROUTER = APIRouter()
TRACER = trace.get_tracer("etos_api.routers.environment_provider.router")
LOGGER = logging.getLogger(__name__)


Expand All @@ -36,48 +39,48 @@ async def _wait_for_configuration(etos_library, environment):
:param environment: Environment that has been configured.
:type environment: :obj:`etos_api.routers.etos.schemas.ConfigureEnvironmentProviderRequest`
"""
LOGGER.info("Waiting for configuration to be applied in the environment provider.")
end_time = time.time() + etos_library.debug.default_http_timeout
LOGGER.debug("Timeout: %r", etos_library.debug.default_http_timeout)
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
try:
async with session.get(
f"{etos_library.debug.environment_provider}/configure",
params={"suite_id": environment.suite_id},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
) as response:
assert 200 <= response.status < 400
response_json = await response.json()
LOGGER.info("Configuration: %r", response_json)
assert response_json.get("dataset") is not None
assert response_json.get("iut_provider") is not None
assert response_json.get("log_area_provider") is not None
assert response_json.get("execution_space_provider") is not None
break
except AssertionError:
if response.status < 400:
LOGGER.warning("Configuration not ready yet.")
else:
LOGGER.warning(
"Configuration verification request failed: %r, %r",
response.status,
response.reason,
)
await asyncio.sleep(2)
else:
raise HTTPException(
status_code=400,
detail="Environment provider configuration did not apply properly",
)
with TRACER.start_as_current_span("wait-for-configuration") as span:
LOGGER.info("Waiting for configuration to be applied in the environment provider.")
end_time = time.time() + etos_library.debug.default_http_timeout
LOGGER.debug("Timeout: %r", etos_library.debug.default_http_timeout)
span.set_attribute("timeout", etos_library.debug.default_http_timeout)
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
try:
async with session.get(
f"{etos_library.debug.environment_provider}/configure",
params={"suite_id": environment.suite_id},
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
) as response:
assert 200 <= response.status < 400
response_json = await response.json()
LOGGER.info("Configuration: %r", response_json)
assert response_json.get("dataset") is not None
assert response_json.get("iut_provider") is not None
assert response_json.get("log_area_provider") is not None
assert response_json.get("execution_space_provider") is not None
break
except AssertionError:
if response.status < 400:
LOGGER.warning("Configuration not ready yet.")
else:
LOGGER.warning(
"Configuration verification request failed: %r, %r",
response.status,
response.reason,
)
await asyncio.sleep(2)
else:
raise HTTPException(
status_code=400,
detail="Environment provider configuration did not apply properly",
)


@ROUTER.post(
"/environment_provider/configure", tags=["environment_provider"], status_code=204
)
@ROUTER.post("/environment_provider/configure", tags=["environment_provider"], status_code=204)
async def configure_environment_provider(
environment: ConfigureEnvironmentProviderRequest,
):
Expand All @@ -86,35 +89,42 @@ async def configure_environment_provider(
:param environment: Environment to configure.
:type environment: :obj:`etos_api.routers.etos.schemas.ConfigureEnvironmentProviderRequest`
"""
LOGGER.identifier.set(environment.suite_id)
LOGGER.info("Configuring environment provider using %r", environment)
etos_library = ETOS("ETOS API", os.getenv("HOSTNAME"), "ETOS API")
with TRACER.start_as_current_span("configure-environment-provider") as span:
LOGGER.identifier.set(environment.suite_id)
span.set_attribute("etos.id", environment.suite_id)
span.set_attribute("etos.iut_provider", environment.iut_provider)
span.set_attribute("etos.execution_space_provider", environment.execution_space_provider)
span.set_attribute("etos.log_area_provider", environment.log_area_provider)
span.set_attribute("etos.dataset", str(environment.dataset))

end_time = time.time() + etos_library.debug.default_http_timeout
LOGGER.debug("HTTP Timeout: %r", etos_library.debug.default_http_timeout)
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
try:
async with session.post(
f"{etos_library.debug.environment_provider}/configure",
json=environment.dict(),
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
) as response:
assert 200 <= response.status < 400
break
except AssertionError:
LOGGER.warning(
"Configuration request failed: %r, %r",
response.status,
response.reason,
LOGGER.info("Configuring environment provider using %r", environment)
etos_library = ETOS("ETOS API", os.getenv("HOSTNAME"), "ETOS API")

end_time = time.time() + etos_library.debug.default_http_timeout
LOGGER.debug("HTTP Timeout: %r", etos_library.debug.default_http_timeout)
async with aiohttp.ClientSession() as session:
while time.time() < end_time:
try:
async with session.post(
f"{etos_library.debug.environment_provider}/configure",
json=environment.dict(),
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
) as response:
assert 200 <= response.status < 400
break
except AssertionError:
LOGGER.warning(
"Configuration request failed: %r, %r",
response.status,
response.reason,
)
await asyncio.sleep(2)
else:
raise HTTPException(
status_code=400,
detail=f"Unable to configure environment provider with '{environment.json()}'",
)
await asyncio.sleep(2)
else:
raise HTTPException(
status_code=400,
detail=f"Unable to configure environment provider with '{environment.json()}'",
)
await _wait_for_configuration(etos_library, environment)
await _wait_for_configuration(etos_library, environment)
Loading

0 comments on commit 0ac1ce4

Please sign in to comment.