diff --git a/integration_tests.py b/integration_tests.py index 9ffc041d2ae..f1c0745b121 100755 --- a/integration_tests.py +++ b/integration_tests.py @@ -350,6 +350,12 @@ def install_server(): # This runs a continuous loop that exports the config in yaml # for the diracx container to use + # It needs to be started and running before the DIRAC server installation + # because after installing the databases, the install server script + # calls dirac-login. + # At this point we need the new CS to have been updated + # already else the token exchange fails. + typer.secho("Starting configuration export loop for diracx", fg=c.GREEN) base_cmd = _build_docker_cmd("server", tty=False, daemon=True, use_root=True) subprocess.run( diff --git a/src/DIRAC/FrameworkSystem/Utilities/diracx.py b/src/DIRAC/FrameworkSystem/Utilities/diracx.py new file mode 100644 index 00000000000..3c37d74402f --- /dev/null +++ b/src/DIRAC/FrameworkSystem/Utilities/diracx.py @@ -0,0 +1,77 @@ +# pylint: disable=import-error + +import requests + +from cachetools import TTLCache, cached +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any +from DIRAC import gConfig +from DIRAC.ConfigurationSystem.Client.Helpers import Registry + + +from diracx.core.preferences import DiracxPreferences + +from diracx.core.utils import write_credentials + +from diracx.core.models import TokenResponse +from diracx.client import DiracClient + +# How long tokens are kept +DEFAULT_TOKEN_CACHE_TTL = 5 * 60 + +# Add a cache not to query the token all the time +_token_cache = TTLCache(maxsize=100, ttl=DEFAULT_TOKEN_CACHE_TTL) + + +@cached(_token_cache, key=lambda x, y: repr(x)) +def _get_token(credDict, diracxUrl, /) -> Path: + """ + Write token to a temporary file and return the path to that file + + """ + + apiKey = gConfig.getValue("/DiracX/LegacyExchangeApiKey") + if not apiKey: + raise ValueError("Missing mandatory /DiracX/LegacyExchangeApiKey configuration") + + vo = Registry.getVOForGroup(credDict["group"]) + dirac_properties = list(set(credDict.get("groupProperties", [])) | set(credDict.get("properties", []))) + group = credDict["group"] + + scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties] + + r = requests.get( + f"{diracxUrl}/auth/legacy-exchange", + params={ + "preferred_username": credDict["username"], + "scope": " ".join(scopes), + }, + headers={"Authorization": f"Bearer {apiKey}"}, + timeout=10, + ) + + r.raise_for_status() + + token_location = Path(NamedTemporaryFile().name) + + write_credentials(TokenResponse(**r.json()), location=token_location) + + return token_location + + +def TheImpersonator(credDict: dict[str, Any]) -> DiracClient: + """ + Client to be used by DIRAC server needing to impersonate + a user for diracx. + It queries a token, places it in a file, and returns the `DiracClient` + class + + Use as a context manager + """ + + diracxUrl = gConfig.getValue("/DiracX/URL") + token_location = _get_token(credDict, diracxUrl) + pref = DiracxPreferences(url=diracxUrl, credentials_path=token_location) + + return DiracClient(diracx_preferences=pref) diff --git a/src/DIRAC/Resources/Storage/StorageBase.py b/src/DIRAC/Resources/Storage/StorageBase.py index 0aef091039b..5b85279bf47 100755 --- a/src/DIRAC/Resources/Storage/StorageBase.py +++ b/src/DIRAC/Resources/Storage/StorageBase.py @@ -336,9 +336,7 @@ def constructURLFromLFN(self, lfn, withWSUrl=False): # 2. VO name must not appear as any subdirectory or file name lfnSplitList = lfn.split("/") voLFN = lfnSplitList[1] - # TODO comparison to Sandbox below is for backward compatibility, should - # be removed in the next release - if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "Sandbox": + if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "S3": return S_ERROR(f"LFN ({lfn}) path must start with VO name ({self.se.vo})") urlDict = dict(self.protocolParameters) diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index b619a2cba9f..131aec67efa 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -142,6 +142,8 @@ Services SandboxPrefix = Sandbox BasePath = /opt/dirac/storage/sandboxes DelayedExternalDeletion = True + # If true, uploads the sandbox via diracx on an S3 storage + UseDiracXBackend = False Authorization { Default = authenticated diff --git a/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py index 8f20b96ad33..048b399c8ed 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py @@ -9,6 +9,7 @@ """ import hashlib import os +import requests import tempfile import threading import time @@ -49,6 +50,7 @@ def initializeHandler(cls, serviceInfoDict): def initializeRequest(self): self.__backend = self.getCSOption("Backend", "local") self.__localSEName = self.getCSOption("LocalSE", "SandboxSE") + self._useDiracXBackend = self.getCSOption("UseDiracXBackend", False) self._maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576 if self.__backend.lower() == "local" or self.__backend == self.__localSEName: self.__useLocalStorage = True @@ -106,6 +108,51 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""): gLogger.info("Upload requested", f"for {aHash} [{extension}]") credDict = self.getRemoteCredentials() + + if self._useDiracXBackend: + from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator + from diracx.client.models import SandboxInfo # pylint: disable=import-error + + gLogger.info("Forwarding to DiracX") + with tempfile.TemporaryFile(mode="w+b") as tar_fh: + result = fileHelper.networkToDataSink(tar_fh, maxFileSize=self._maxUploadBytes) + if not result["OK"]: + return result + tar_fh.seek(0) + + hasher = hashlib.sha256() + while data := tar_fh.read(512 * 1024): + hasher.update(data) + checksum = hasher.hexdigest() + tar_fh.seek(0) + gLogger.debug("Sandbox checksum is", checksum) + + sandbox_info = SandboxInfo( + checksum_algorithm="sha256", + checksum=checksum, + size=os.stat(tar_fh.fileno()).st_size, + format=extension, + ) + + with TheImpersonator(credDict) as client: + res = client.jobs.initiate_sandbox_upload(sandbox_info) + + if res.url: + gLogger.debug("Uploading sandbox for", res.pfn) + files = {"file": ("file", tar_fh)} + + response = requests.post(res.url, data=res.fields, files=files, timeout=300) + + gLogger.debug("Sandbox uploaded", f"for {res.pfn} with status code {response.status_code}") + # TODO: Handle this error better + try: + response.raise_for_status() + except Exception as e: + return S_ERROR("Error uploading sandbox", repr(e)) + else: + gLogger.debug("Sandbox already exists in storage backend", res.pfn) + return S_OK(res.pfn) + sbPath = self.__getSandboxPath(f"{aHash}.{extension}") # Generate the location result = self.__generateLocation(sbPath) @@ -431,6 +478,27 @@ def _sendToClient(self, fileID, token, fileHelper=None, raw=False): credDict = self.getRemoteCredentials() serviceURL = self.serviceInfoDict["URL"] filePath = fileID.replace(serviceURL, "") + + # If the PFN starts with S3, we know it has been uploaded to the + # S3 sandbox store, so download it from there before sending it + if filePath.startswith("/S3"): + from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator + + with TheImpersonator(credDict) as client: + res = client.jobs.get_sandbox_file(pfn=filePath) + r = requests.get(res.url) + r.raise_for_status() + sbData = r.content + if fileHelper: + from io import BytesIO + + result = fileHelper.DataSourceToNetwork(BytesIO(sbData)) + # fileHelper.oFile.close() + return result + if raw: + return sbData + return S_OK(sbData) + result = self.sandboxDB.getSandboxId(self.__localSEName, filePath, credDict["username"], credDict["group"]) if not result["OK"]: return result diff --git a/tests/Jenkins/utilities.sh b/tests/Jenkins/utilities.sh index f3c200bb149..bddbe8a9195 100644 --- a/tests/Jenkins/utilities.sh +++ b/tests/Jenkins/utilities.sh @@ -612,7 +612,7 @@ diracProxies() { # Make sure DiracX is running # And make sure it was synced - if [[ -n $DIRACX_URL ]]; then + if [[ -n $TEST_DIRACX ]]; then echo "Waiting for for DiracX to be available" >&2 for i in {1..100}; do if dirac-login -C "${SERVERINSTALLDIR}/user/client.pem" -K "${SERVERINSTALLDIR}/user/client.key" -T 72 "${DEBUG}"; then