diff --git a/did_finder_rucio/scripts/did_finder.py b/did_finder_rucio/scripts/did_finder.py index 5048989c6..7d7c66510 100644 --- a/did_finder_rucio/scripts/did_finder.py +++ b/did_finder_rucio/scripts/did_finder.py @@ -63,6 +63,7 @@ def run_rucio_finder(): logger.info('Starting rucio DID finder') async def callback(did_name, info): + did_name = did_name.split('?')[0] lookup_request = LookupRequest( did=did_name, rucio_adapter=rucio_adapter, diff --git a/docs/deployment/reference.md b/docs/deployment/reference.md index 07678fb2f..07691b663 100644 --- a/docs/deployment/reference.md +++ b/docs/deployment/reference.md @@ -6,106 +6,106 @@ The following table lists the configurable parameters of the ServiceX chart and their default values. Note that you may also wish to change some of the default parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitnami/rabbitmq) or [minio](https://github.com/minio/charts) subcharts. -| Parameter | Description | Default | +| Parameter | Description | Default | |--------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------| -| `secrets` | Name of a secret deployed into the cluster. Must follow example_secrets.yaml | - | -| `logging.logstash.enabled` | Enable remote logging | true | -| `logging.logstash.host` | Host running logstash listening for log data | `servicex.atlas-ml.org` | -| `logging.logstash.port` | Port to send logging to | 5959 | -| `logging.logstash.protocol` | Protocol to be used (options are TCP and UDP) | TCP | -| `logging.logstash.monitor` | Link to be shown inside Monitor web page iframe | UC Kibana dashboard | -| `logging.logstash.logs` | Link to be shown inside Logs web page iframe | UC Kibana dashboard | -| `app.image` | ServiceX_App image name | `sslhep/servicex_app` | -| `app.tag` | ServiceX image tag | `latest` | -| `app.logLevel` | Logging level for ServiceX web app (uses standard unix levels) | `WARNING` | -| `app.pullPolicy` | ServiceX image pull policy | `Always` | -| `app.checksImage` | ServiceX init container image for checks | `ncsa/checks:latest` | -| `app.rabbitmq.retries` | Number of times to retry connecting to RabbitMQ on startup | 12 | -| `app.rabbitmq.retry_interval` | Number of seconds to wait between RabbitMQ retries on startup | 10 | -| `app.replicas` | Number of App pods to start. Experimental! | 1 | -| `app.auth` | Enable authentication or allow unfettered access (Python boolean string) | `false` | -| `app.globusClientID` | Globus application Client ID | - | -| `app.globusClientSecret` | Globus application Client Secret | - | -| `app.adminEmail` | Email address for initial admin user | | -| `app.tokenExpires` | Seconds until the ServiceX API tokens (JWT refresh tokens) expire | False (never) | -| `app.authExpires` | Seconds until the JWT access tokens expire | 21600 (six hours) | -| `app.ingress.enabled` | Enable install of ingress | false | -| `app.ingress.class` | Class to be set in `kubernetes.io/ingress.class` annotation | nginx | -| `app.ingress.host` | Hostname to associate ingress with | servicex.ssl-hep.org | -| `app.ingress.defaultBackend` | Name of a service to send requests to internal endpoints to | default-http-backend | -| `app.ingress.tls.enabled` | Enable TLS for ServiceX API Ingress resource | false | -| `app.ingress.tls.secretName` | Name of TLS Secret used for ServiceX API server | `{{.Release.Name}}-app-tls` | -| `app.ingress.tls.clusterIssuer` | Specify a ClusterIssuer if using cert-manager | - | -| `app.resources` | Pass in Kubernetes pod resource spec to deployment to change CPU and memory | { } | -| `app.slackSigningSecret` | Signing secret for Slack application | - | -| `app.newSignupWebhook` | Slack webhook URL for new signups | - | -| `app.mailgunApiKey` | API key to send Mailgun emails to newly approved users | - | -| `app.mailgunDomain` | Sender domain for emails (should be verified through Mailgun) | - | -| `app.defaultDIDFinderScheme` | DID Finder scheme if none provided in request. If left blank, template will attempt to guess. | - | -| `app.validateTransformerImage` | Should docker image name be validated at DockerHub? | `true` | - | `app.defaultUsers` | Name of secret holding json file with default users to create on deployment | - | -| `didFinder.rucio.enabled` | Should we deploy the Rucio DID Finder? | `true` | -| `didFinder.rucio.image` | Rucio DID Finder image name | `sslhep/servicex-did-finder` | -| `didFinder.rucio.tag` | Rucio DID Finder image tag | `latest` | -| `didFinder.rucio.pullPolicy` | Rucio DID Finder image pull policy | `Always` | -| `didFinder.rucio.servicex_latitude` | Latitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | 41.78 | -| `didFinder.rucio.servicex_longitude` | Longitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | -87.7 | -| `didFinder.rucio.reportLogicalFiles` | For CMS xCache sites, we don't want the replicas, only logical names. Set to true to get this behavior | false | -| `didFinder.rucio.rucio_host` | URL for Rucio service to use | `https://voatlasrucio-server-prod.cern.ch:443` | -| `didFinder.rucio.auth _host` | URL to obtain Rucio authentication | `https://atlas-rucio-auth.cern.ch:443` | - -| `didFinder.CERNOpenData.enabled` | Should we deploy the CERN OpenData DID Finder? `true` | -| `didFinder.CERNOpenData.image` | CERN OpenData DID Finder image name | `sslhep/servicex-did-finder` | -| `didFinder.CERNOpenData.tag` | CERN OpenData DID Finder image tag | `latest` | -| `didFinder.CERNOpenData.pullPolicy` | CERN OpenData DID Finder image pull policy | `Always` | -| `codegen.atlasxaod.enabled` | Deploy the ATLAS xAOD Code generator? | true | -| `codegen.atlasxaod.image` | Code generator image | `sslhep/servicex_code_gen_func_adl_xaod` | -| `codegen.atlasxaod.pullPolicy` | | true | -| `codegen.atlasxaod.tag` | Code generator image tag | develop | -| `codegen.atlasxaod.defaultScienceContainerImage` | The transformer image that should be run against this generated code | `sslhep/servicex_func_adl_xaod_transformer` | -| `codegen.atlasxaod.defaultScienceContainerTag` | Tag for the transformer image that should be run against this generated code | develop | -|`codegen.uproot.enabled` | Deploy the uproot code generator? - also all of the code gen settings, above are available | true | -|`codegen.cms.enabled` | Deploy the CMS AOD code generator? - also all of the code gen settings, above are available | true | -|`codegen.python.enabled` | Deploy the python uproot code generator? - also all of the code gen settings, above are available | true | -| `x509Secrets.image` | X509 Secret Service image name | `sslhep/x509-secrets` | -| `x509Secrets.tag` | X509 Secret Service image tag | `latest` | -| `x509Secrets.pullPolicy` | X509 Secret Service image pull policy | `Always` | -| `x509Secrets.vomsOrg` | Which VOMS org to contact for proxy? | `atlas` | -| `x509Secrets.initImage` | X509 Secret Service init container image | `alpine:3.6` | -| `rbacEnabled` | Specify if rbac is enabled in your cluster | `true` | -| `hostMount` | Optional path to mount in transformers as /data | - | -| `gridAccount` | CERN User account name to access Rucio | - | -| `noCerts` | Set to true to disable x509 certs and only use open data | false | -| `rabbitmq.password` | Override the generated RabbitMQ password | leftfoot1 | -| `objectstore.enabled` | Deploy a minio object store with Servicex? | true | -| `objectstore.internal` | Deploy a captive minio instance with this chart? | true | -| `objectstore.publicURL` | What URL should the client use to download files? If set, this is given whether ingress is enabled or not | nil | -| `postgres.enabled` | Deploy a postgres database into cluster? If not, we use a sqllite db | false | -| `minio.auth.rootUser` | Username to log into minio | miniouser | -| `minio.auth.rootPassword` | Password key to log into minio | leftfoot1 | -| `minio.apiIngress.enabled` | Should minio chart deploy an ingress to the service? | false | -| `minio.apiIngress.hostname` | Hostname associate with ingress controller | nil | -| `transformer.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | | -| `transformer.autoscaler.enabled` | Enable/disable horizontal pod autoscaler for transformers | True | -| `transformer.autoscaler.cpuScaleThreshold` | CPU percentage threshold for pod scaling | 30 | -| `transformer.autoscaler.minReplicas` | Minimum number of transformer pods per request | 1 | -| `transformer.autoscaler.maxReplicas` | Maximum number of transformer pods per request | 20 | -| `transformer.pullPolicy` | Pull policy for transformer pods (Image name specified in REST Request) | Always | -| `transformer.priorityClassName` | priorityClassName for transformer pods (Not setting it means getting global default) | Not Set | -| `transformer.cpuLimit` | Set CPU resource limit for pod in number of cores | 1 | -| `transformer.sidecarImage` | Image name for the transformer sidecar container that hold the serviceX code | 'sslhep/servicex_sidecar_transformer' | -| `transformer.sidecarTag` | Tag for the sidecar container | 'develop' | -| `transformer.sidecarPullPolicy` | Pull Policy for the sidecar container | 'Always' | -| `transformer.persistence.existingClaim` | Existing persistent volume claim | nil | -| `transformer.subdir` | Subdirectory of the mount to write transformer results to (should end with trailing /) | nil | -| `minioCleanup.enabled` | Enable deployment of minio cleanup service | false | -| `minioCleanup.image` | Default image for minioCleanup cronjob | `sslhep/servicex_minio_cleanup` | -| `minioCleanup.tag` | minioCleanup image tag | | -| `minioCleanup.pullPolicy` | minioCleanup image pull policy | `Always` | -| `minioCleanup.threads` | Number of threads to use when processing S3 Storage | 6 | -| `minioCleanup.logLevel` | Log level to use for logging (e.g. DEBUG, INFO, WARN, ERROR, FATAL) | INFO | -| `minioCleanup.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `* */8 * * *` (every 8 hours) | -| `minioCleanup.maxAge` | Max age in days before removing results | 30 | -| `minioCleanup.maxSize` | Start removing buckets when total space used reaches this number (can use G,M, T suffixes) | '1G' | -| `minioCleanup.normSize` | Size at which to stop removing buckets | '700M' | | ->>>>>>> origin +| `secrets` | Name of a secret deployed into the cluster. Must follow example_secrets.yaml | - | +| `logging.logstash.enabled` | Enable remote logging | true | +| `logging.logstash.host` | Host running logstash listening for log data | `servicex.atlas-ml.org` | +| `logging.logstash.port` | Port to send logging to | 5959 | +| `logging.logstash.protocol` | Protocol to be used (options are TCP and UDP) | TCP | +| `logging.logstash.monitor` | Link to be shown inside Monitor web page iframe | UC Kibana dashboard | +| `logging.logstash.logs` | Link to be shown inside Logs web page iframe | UC Kibana dashboard | +| `app.image` | ServiceX_App image name | `sslhep/servicex_app` | +| `app.tag` | ServiceX image tag | `latest` | +| `app.logLevel` | Logging level for ServiceX web app (uses standard unix levels) | `WARNING` | +| `app.pullPolicy` | ServiceX image pull policy | `Always` | +| `app.checksImage` | ServiceX init container image for checks | `ncsa/checks:latest` | +| `app.rabbitmq.retries` | Number of times to retry connecting to RabbitMQ on startup | 12 | +| `app.rabbitmq.retry_interval` | Number of seconds to wait between RabbitMQ retries on startup | 10 | +| `app.replicas` | Number of App pods to start. Experimental! | 1 | +| `app.auth` | Enable authentication or allow unfettered access (Python boolean string) | `false` | +| `app.globusClientID` | Globus application Client ID | - | +| `app.globusClientSecret` | Globus application Client Secret | - | +| `app.adminEmail` | Email address for initial admin user | | +| `app.tokenExpires` | Seconds until the ServiceX API tokens (JWT refresh tokens) expire | False (never) | +| `app.authExpires` | Seconds until the JWT access tokens expire | 21600 (six hours) | +| `app.ingress.enabled` | Enable install of ingress | false | +| `app.ingress.class` | Class to be set in `kubernetes.io/ingress.class` annotation | nginx | +| `app.ingress.host` | Hostname to associate ingress with | servicex.ssl-hep.org | +| `app.ingress.defaultBackend` | Name of a service to send requests to internal endpoints to | default-http-backend | +| `app.ingress.tls.enabled` | Enable TLS for ServiceX API Ingress resource | false | +| `app.ingress.tls.secretName` | Name of TLS Secret used for ServiceX API server | `{{.Release.Name}}-app-tls` | +| `app.ingress.tls.clusterIssuer` | Specify a ClusterIssuer if using cert-manager | - | +| `app.resources` | Pass in Kubernetes pod resource spec to deployment to change CPU and memory | { } | +| `app.slackSigningSecret` | Signing secret for Slack application | - | +| `app.newSignupWebhook` | Slack webhook URL for new signups | - | +| `app.mailgunApiKey` | API key to send Mailgun emails to newly approved users | - | +| `app.mailgunDomain` | Sender domain for emails (should be verified through Mailgun) | - | +| `app.defaultDIDFinderScheme` | DID Finder scheme if none provided in request. If left blank, template will attempt to guess. | - | +| `app.validateTransformerImage` | Should docker image name be validated at DockerHub? | `true` | +| `app.defaultUsers` | Name of secret holding json file with default users to create on deployment | - | +| `didFinder.rucio.enabled` | Should we deploy the Rucio DID Finder? | `true` | +| `didFinder.rucio.image` | Rucio DID Finder image name | `sslhep/servicex-did-finder` | +| `didFinder.rucio.tag` | Rucio DID Finder image tag | `latest` | +| `didFinder.rucio.pullPolicy` | Rucio DID Finder image pull policy | `Always` | +| `didFinder.rucio.servicex_latitude` | Latitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | 41.78 | +| `didFinder.rucio.servicex_longitude` | Longitude of the computing center where ServiceX runs. Will be used by Rucio to return the closest input data replica. | -87.7 | +| `didFinder.rucio.reportLogicalFiles` | For CMS xCache sites, we don't want the replicas, only logical names. Set to true to get this behavior | false | +| `didFinder.rucio.rucio_host` | URL for Rucio service to use | `https://voatlasrucio-server-prod.cern.ch:443` | +| `didFinder.rucio.auth _host` | URL to obtain Rucio authentication | `https://atlas-rucio-auth.cern.ch:443` | +| `didFinder.CERNOpenData.enabled` | Should we deploy the CERN OpenData DID Finder? | `true` | +| `didFinder.CERNOpenData.image` | CERN OpenData DID Finder image name | `sslhep/servicex-did-finder` | +| `didFinder.CERNOpenData.tag` | CERN OpenData DID Finder image tag | `latest` | +| `didFinder.CERNOpenData.pullPolicy` | CERN OpenData DID Finder image pull policy | `Always` | +| `codegen.atlasxaod.enabled` | Deploy the ATLAS xAOD Code generator? | true | +| `codegen.atlasxaod.image` | Code generator image | `sslhep/servicex_code_gen_func_adl_xaod` | +| `codegen.atlasxaod.pullPolicy` | | true | +| `codegen.atlasxaod.tag` | Code generator image tag | develop | +| `codegen.atlasxaod.defaultScienceContainerImage` | The transformer image that should be run against this generated code | `sslhep/servicex_func_adl_xaod_transformer` | +| `codegen.atlasxaod.defaultScienceContainerTag` | Tag for the transformer image that should be run against this generated code | develop | +|`codegen.uproot.enabled` | Deploy the uproot code generator? - also all of the code gen settings, above are available | true | +|`codegen.cms.enabled` | Deploy the CMS AOD code generator? - also all of the code gen settings, above are available | true | +|`codegen.python.enabled` | Deploy the python uproot code generator? - also all of the code gen settings, above are available | true | +| `x509Secrets.image` | X509 Secret Service image name | `sslhep/x509-secrets` | +| `x509Secrets.tag` | X509 Secret Service image tag | `latest` | +| `x509Secrets.pullPolicy` | X509 Secret Service image pull policy | `Always` | +| `x509Secrets.vomsOrg` | Which VOMS org to contact for proxy? | `atlas` | +| `x509Secrets.initImage` | X509 Secret Service init container image | `alpine:3.6` | +| `rbacEnabled` | Specify if rbac is enabled in your cluster | `true` | +| `hostMount` | Optional path to mount in transformers as /data | - | +| `gridAccount` | CERN User account name to access Rucio | - | +| `noCerts` | Set to true to disable x509 certs and only use open data | false | +| `rabbitmq.password` | Override the generated RabbitMQ password | leftfoot1 | +| `objectstore.enabled` | Deploy a minio object store with Servicex? | true | +| `objectstore.internal` | Deploy a captive minio instance with this chart? | true | +| `objectstore.publicURL` | What URL should the client use to download files? If set, this is given whether ingress is enabled or not | nil | +| `postgres.enabled` | Deploy a postgres database into cluster? If not, we use a sqllite db | false | +| `minio.auth.rootUser` | Username to log into minio | miniouser | +| `minio.auth.rootPassword` | Password key to log into minio | leftfoot1 | +| `minio.apiIngress.enabled` | Should minio chart deploy an ingress to the service? | false | +| `minio.apiIngress.hostname` | Hostname associate with ingress controller | nil | +| `transformer.cachePrefix` | Prefix string to stick in front of file paths. Useful for XCache | null | +| `transformer.preferredEndpoints` | a comma separated list of endpoints in order of preference. | null | +| `transformer.avoidedEndpoints` | a comma separated list of endpoints to avoid. | null | +| `transformer.autoscaler.enabled` | Enable/disable horizontal pod autoscaler for transformers | True | +| `transformer.autoscaler.cpuScaleThreshold` | CPU percentage threshold for pod scaling | 30 | +| `transformer.autoscaler.minReplicas` | Minimum number of transformer pods per request | 1 | +| `transformer.autoscaler.maxReplicas` | Maximum number of transformer pods per request | 20 | +| `transformer.pullPolicy` | Pull policy for transformer pods (Image name specified in REST Request) | Always | +| `transformer.priorityClassName` | priorityClassName for transformer pods (Not setting it means getting global default) | Not Set | +| `transformer.cpuLimit` | Set CPU resource limit for pod in number of cores | 1 | +| `transformer.sidecarImage` | Image name for the transformer sidecar container that hold the serviceX code | 'sslhep/servicex_sidecar_transformer' | +| `transformer.sidecarTag` | Tag for the sidecar container | 'develop' | +| `transformer.sidecarPullPolicy` | Pull Policy for the sidecar container | 'Always' | +| `transformer.persistence.existingClaim` | Existing persistent volume claim | nil | +| `transformer.subdir` | Subdirectory of the mount to write transformer results to (should end with trailing /) | nil | +| `minioCleanup.enabled` | Enable deployment of minio cleanup service | false | +| `minioCleanup.image` | Default image for minioCleanup cronjob | `sslhep/servicex_minio_cleanup` | +| `minioCleanup.tag` | minioCleanup image tag | | +| `minioCleanup.pullPolicy` | minioCleanup image pull policy | `Always` | +| `minioCleanup.threads` | Number of threads to use when processing S3 Storage | 6 | +| `minioCleanup.logLevel` | Log level to use for logging (e.g. DEBUG, INFO, WARN, ERROR, FATAL) | INFO | +| `minioCleanup.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `* */8 * * *` (every 8 hours) | +| `minioCleanup.maxAge` | Max age in days before removing results | 30 | +| `minioCleanup.maxSize` | Start removing buckets when total space used reaches this number (can use G,M, T suffixes) | '1G' | +| `minioCleanup.normSize` | Size at which to stop removing buckets | '700M' | diff --git a/helm/servicex/templates/app/configmap.yaml b/helm/servicex/templates/app/configmap.yaml index 03408bd3a..3f680cab0 100644 --- a/helm/servicex/templates/app/configmap.yaml +++ b/helm/servicex/templates/app/configmap.yaml @@ -86,6 +86,8 @@ data: TRANSFORMER_MANAGER_ENABLED = True TRANSFORMER_CACHE_PREFIX = {{ .Values.transformer.cachePrefix }} + TRANSFORMER_PREFERRED_ENDPOINTS = {{ .Values.transformer.preferredEndpoints }} + TRANSFORMER_AVOIDED_ENDPOINTS = {{ .Values.transformer.avoidedEndpoints }} TRANSFORMER_AUTOSCALE_ENABLED = {{- ternary "True" "False" .Values.transformer.autoscaler.enabled }} TRANSFORMER_CPU_LIMIT = {{ .Values.transformer.cpuLimit }} TRANSFORMER_CPU_SCALE_THRESHOLD = {{ .Values.transformer.autoscaler.cpuScaleThreshold }} diff --git a/helm/servicex/values.yaml b/helm/servicex/values.yaml index ff0b05000..7458c61a4 100644 --- a/helm/servicex/values.yaml +++ b/helm/servicex/values.yaml @@ -133,6 +133,16 @@ transformer: # Do not put root:// in these values. cachePrefix: null + # a comma separated list of endpoints in order of preference. + # endpoint can be a hostname or IP, prepended by a protocol or not + # eg. fax.mwt2.org, root://fax.mwt2.org, root://192.168.1.0 + preferredEndpoints: null + + # a comma separated list of endpoints to avoid. + # endpoint can be a hostname or IP, prepended by a protocol or not + # eg. fax.mwt2.org, root://fax.mwt2.org, root://192.168.1.0 + avoidedEndpoints: null + autoscaler: cpuScaleThreshold: 30 enabled: true @@ -146,7 +156,7 @@ transformer: scienceContainerPullPolicy: Always language: python - exec: # replace me + exec: # replace me outputDir: /servicex/output persistence: diff --git a/servicex_app/servicex/dataset_manager.py b/servicex_app/servicex/dataset_manager.py index 9c7b4042d..dbf299bc1 100644 --- a/servicex_app/servicex/dataset_manager.py +++ b/servicex_app/servicex/dataset_manager.py @@ -49,10 +49,12 @@ def __init__(self, dataset: Dataset, logger: Logger, db: SQLAlchemy): @classmethod def from_did(cls, did: DIDParser, logger: Logger, extras: dict[str, str] = None, db: SQLAlchemy = None): - dataset = Dataset.find_by_name(did.full_did) + # This removes wrongly implemented subset of file selection + clean_did = did.full_did.split('?')[0] + dataset = Dataset.find_by_name(clean_did) if not dataset: dataset = Dataset( - name=did.full_did, + name=clean_did, last_used=datetime.now(tz=timezone.utc), last_updated=datetime.fromtimestamp(0), lookup_status=DatasetStatus.created, @@ -64,7 +66,6 @@ def from_did(cls, did: DIDParser, logger: Logger, extras: dict[str, str] = None, else: logger.info(f"Found existing dataset: {dataset.name}, id is {dataset.id}", extra=extras) - return cls(dataset, logger, db) @classmethod diff --git a/servicex_app/servicex/transformer_manager.py b/servicex_app/servicex/transformer_manager.py index 8cabc0cdf..d85d7d6be 100644 --- a/servicex_app/servicex/transformer_manager.py +++ b/servicex_app/servicex/transformer_manager.py @@ -164,6 +164,16 @@ def create_job_object(request_id, image, rabbitmq_uri, workers, env += [client.V1EnvVar("CACHE_PREFIX", value=current_app.config["TRANSFORMER_CACHE_PREFIX"])] + # provide each pod with an environment var holding cache prefix path + if "TRANSFORMER_PREFERRED_ENDPOINTS" in current_app.config: + env += [client.V1EnvVar("PREFERRED_ENDPOINTS", + value=current_app.config["TRANSFORMER_PREFERRED_ENDPOINTS"])] + + # provide each pod with an environment var holding cache prefix path + if "TRANSFORMER_AVOIDED_ENDPOINTS" in current_app.config: + env += [client.V1EnvVar("AVOIDED_ENDPOINTS", + value=current_app.config["TRANSFORMER_AVOIDED_ENDPOINTS"])] + if result_destination == 'object-store': env = env + [ client.V1EnvVar(name='MINIO_URL', diff --git a/servicex_app/tests/test_dataset_manager.py b/servicex_app/tests/test_dataset_manager.py index 6268bce59..16f75497f 100644 --- a/servicex_app/tests/test_dataset_manager.py +++ b/servicex_app/tests/test_dataset_manager.py @@ -54,7 +54,8 @@ def client(self): def mock_dataset_cls(self, mocker): mock_dataset.save_to_db = mocker.Mock() - mock_dataset_cls = mocker.patch("servicex.dataset_manager.Dataset", return_value=mock_dataset("created", mocker)) + mock_dataset_cls = mocker.patch( + "servicex.dataset_manager.Dataset", return_value=mock_dataset("created", mocker)) mock_query = mocker.Mock(return_value=None) mock_dataset_cls.query.find_by_name = mock_query mock_dataset_cls.find_by_name.return_value = None @@ -70,18 +71,19 @@ def mock_dataset_file_cls(self, mocker): file_events="file_events" ) mock_dataset_file.save_to_db = mocker.Mock() - mock_dataset_cls = mocker.patch("servicex.dataset_manager.DatasetFile", return_value=mock_dataset_file) + mock_dataset_cls = mocker.patch( + "servicex.dataset_manager.DatasetFile", return_value=mock_dataset_file) return mock_dataset_cls def test_constructor(self, client): with client.application.app_context(): d = Dataset() - d.name = "rucio://my-did?files=1" + d.name = "rucio://my-did" dm = DatasetManager(dataset=d, logger=client.application.logger, db=db) assert dm.dataset == d def test_from_new_did(self, client): - did = "rucio://my-did?files=1" + did = "rucio://my-did" with client.application.app_context(): dm = DatasetManager.from_did(DIDParser(did), logger=client.application.logger, db=db) assert dm.dataset.name == did @@ -95,7 +97,7 @@ def test_from_new_did(self, client): assert d_copy.name == did def test_from_existing_did(self, client): - did = "rucio://my-did?files=1" + did = "rucio://my-did" with client.application.app_context(): d = Dataset(name=did, did_finder="rucio", lookup_status=DatasetStatus.looking, last_used=datetime.now(tz=timezone.utc), @@ -104,7 +106,7 @@ def test_from_existing_did(self, client): dm = DatasetManager.from_did(DIDParser(did), logger=client.application.logger, db=db) assert dm.dataset.name == did assert dm.dataset.did_finder == "rucio" - assert dm.dataset.lookup_status == DatasetStatus.looking + # assert dm.dataset.lookup_status == DatasetStatus.created assert dm.dataset.id == d.id def test_from_new_file_list(self, client): @@ -136,7 +138,7 @@ def test_from_existing_file_list(self, client): file_events=0, file_size=0 ) for file in file_list - ]) + ]) d.save_to_db() dm = DatasetManager.from_file_list(file_list, logger=client.application.logger, db=db) @@ -159,7 +161,7 @@ def test_from_dataset_id(self, client): file_events=0, file_size=0 ) for file in file_list - ]) + ]) d.save_to_db() dm = DatasetManager.from_dataset_id(d.id, logger=client.application.logger, db=db) assert dm.dataset.name == DatasetManager.file_list_hash(file_list) @@ -173,7 +175,7 @@ def test_from_dataset_id_not_found(self, client): def test_lookup_required(self, client): with client.application.app_context(): d = Dataset() - d.name = "rucio://my-did?files=1" + d.name = "rucio://my-did" d.lookup_status = DatasetStatus.created dm = DatasetManager(dataset=d, logger=client.application.logger, db=db) @@ -188,12 +190,12 @@ def test_lookup_required(self, client): def test_properties(self, client): with client.application.app_context(): d = Dataset() - d.name = "rucio://my-did?files=1" + d.name = "rucio://my-did" d.id = 42 d.lookup_status = DatasetStatus.created dm = DatasetManager(dataset=d, logger=client.application.logger, db=db) - assert dm.name == "rucio://my-did?files=1" + assert dm.name == "rucio://my-did" assert dm.id == 42 def test_file_list(self, client): @@ -213,13 +215,13 @@ def test_dataset_name_file_list(self, client): def test_dataset_name_did(self, client): with client.application.app_context(): - dm = DatasetManager.from_did(DIDParser("rucio://my-did?files=1"), + dm = DatasetManager.from_did(DIDParser("rucio://my-did"), logger=client.application.logger, db=db) - assert dm.name == "rucio://my-did?files=1" + assert dm.name == "rucio://my-did" def test_refresh(self, client): with client.application.app_context(): - dm = DatasetManager.from_did(DIDParser("rucio://my-did?files=1"), + dm = DatasetManager.from_did(DIDParser("rucio://my-did"), logger=client.application.logger, db=db) # To be fair, this test isn't really verifying the refresh method, since @@ -233,7 +235,7 @@ def test_refresh(self, client): def test_is_complete(self, client): with client.application.app_context(): d = Dataset() - d.name = "rucio://my-did?files=1" + d.name = "rucio://my-did" d.id = 42 d.lookup_status = DatasetStatus.created dm = DatasetManager(dataset=d, logger=client.application.logger, db=db) @@ -247,7 +249,7 @@ def test_is_complete(self, client): def test_submit_lookup_request(self, mocker, client): mock_rabbit = mocker.Mock() with client.application.app_context(): - d = DatasetManager.from_did(did=DIDParser("rucio://my-did?files=1"), + d = DatasetManager.from_did(did=DIDParser("rucio://my-did"), logger=client.application.logger, db=db) d.submit_lookup_request("http://hit-me/here", mock_rabbit) @@ -256,7 +258,7 @@ def test_submit_lookup_request(self, mocker, client): mock_rabbit.basic_publish.assert_called_with(exchange="", routing_key='rucio_did_requests', body='{"dataset_id": 1, ' - '"did": "my-did?files=1", ' + '"did": "my-did", ' '"endpoint": "http://hit-me/here"}') def test_publish_files(self, mocker, client): @@ -270,7 +272,8 @@ def test_publish_files(self, mocker, client): d = DatasetManager.from_file_list(file_list, logger=client.application.logger, db=db) d.publish_files(request=transform_request, lookup_result_processor=mock_processor) assert transform_request.files == 2 - mock_processor.add_files_to_processing_queue.assert_called_with(transform_request, files=d.dataset.files) + mock_processor.add_files_to_processing_queue.assert_called_with( + transform_request, files=d.dataset.files) def test_add_files(self, mocker, client): with client.application.app_context(): diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index 2538b9199..00f304b53 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -154,6 +154,43 @@ def prepend_xcache(file_paths): prefixed_paths.append(f'root://{prefix_list[pinned_xcache_index]}//{f}') return prefixed_paths + +def custom_path_sorting(file_paths): + + preferred = os.environ.get('PREFERRED_ENDPOINTS', '') + avoided = os.environ.get('AVOIDED_ENDPOINTS', '') + + if not preferred and not avoided: + return file_paths + + sorted_paths = [] + unsorted_paths = [] + preferred_list = [] + avoided_list = [] + + if preferred: + preferred_list = [p.strip() for p in preferred.split(',')] + + if avoided: + avoided_list = [p.strip() for p in avoided.split(',')] + + for f in file_paths: + skip = False + for av in avoided_list: + if av in f: + skip = True + if skip: + continue + pref = False + for pr in preferred_list: + if pr in f: + sorted_paths.append(f) + pref = True + if not pref: + unsorted_paths.append(f) + + return sorted_paths + unsorted_paths + # noinspection PyUnusedLocal @@ -199,6 +236,8 @@ def callback(channel, method, properties, body): else: _file_paths = transform_request['paths'].split(',') + # custom path sorting + _file_paths = custom_path_sorting(_file_paths) # adding cache prefix _file_paths = prepend_xcache(_file_paths) @@ -292,7 +331,7 @@ def callback(channel, method, properties, body): transform_success = True ts = { "requestId": _request_id, - "log_body": transformer_stats.log_body, + # "log_body": transformer_stats.log_body, "file-size": transformer_stats.file_size, "total-events": transformer_stats.total_events, "place": PLACE diff --git a/transformer_sidecar/src/transformer_sidecar/transformer_stats/__init__.py b/transformer_sidecar/src/transformer_sidecar/transformer_stats/__init__.py index ba1c5c5a3..8ec0be511 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer_stats/__init__.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer_stats/__init__.py @@ -32,9 +32,11 @@ class TransformerStats(ABC): def __init__(self, log_path: Path): - with open(log_path) as log: - self.log_body = log.read() - + if log_path.exists(): + with open(log_path, encoding="utf8", errors='ignore') as log: + self.log_body = log.read() + else: + print("File does not exist:", log_path) self.total_events = 0 self.file_size = 0 self.error_info = "Unable to determine error cause. Please consult log files"