766 replica ordering #767

Closed
wants to merge 17 commits
1 change: 1 addition & 0 deletions did_finder_rucio/scripts/did_finder.py
@@ -63,6 +63,7 @@ def run_rucio_finder():
logger.info('Starting rucio DID finder')

async def callback(did_name, info):
did_name = did_name.split('?')[0]
lookup_request = LookupRequest(
did=did_name,
rucio_adapter=rucio_adapter,
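The added line strips any query-string suffix from the DID (for example the unsupported `?files=` selector) before the lookup request is built. A minimal sketch of the effect, using a made-up DID:

```python
# Hypothetical DID carrying the unsupported file-selection suffix.
did_name = 'user.sample.dataset?files=10'

# split('?')[0] keeps everything before the first '?'.
assert did_name.split('?')[0] == 'user.sample.dataset'
```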
204 changes: 102 additions & 102 deletions docs/deployment/reference.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions helm/servicex/templates/app/configmap.yaml
@@ -86,6 +86,8 @@ data:
TRANSFORMER_MANAGER_ENABLED = True

TRANSFORMER_CACHE_PREFIX = {{ .Values.transformer.cachePrefix }}
TRANSFORMER_PREFERRED_ENDPOINTS = {{ .Values.transformer.preferredEndpoints }}
TRANSFORMER_AVOIDED_ENDPOINTS = {{ .Values.transformer.avoidedEndpoints }}
TRANSFORMER_AUTOSCALE_ENABLED = {{- ternary "True" "False" .Values.transformer.autoscaler.enabled }}
TRANSFORMER_CPU_LIMIT = {{ .Values.transformer.cpuLimit }}
TRANSFORMER_CPU_SCALE_THRESHOLD = {{ .Values.transformer.autoscaler.cpuScaleThreshold }}
12 changes: 11 additions & 1 deletion helm/servicex/values.yaml
@@ -133,6 +133,16 @@ transformer:
# Do not put root:// in these values.
cachePrefix: null

# A comma-separated list of endpoints, in order of preference.
# An endpoint can be a hostname or an IP, with or without a protocol prefix,
# e.g. fax.mwt2.org, root://fax.mwt2.org, root://192.168.1.0
preferredEndpoints: null

# A comma-separated list of endpoints to avoid.
# An endpoint can be a hostname or an IP, with or without a protocol prefix,
# e.g. fax.mwt2.org, root://fax.mwt2.org, root://192.168.1.0
avoidedEndpoints: null

autoscaler:
cpuScaleThreshold: 30
enabled: true
@@ -146,7 +156,7 @@ transformer:
scienceContainerPullPolicy: Always

language: python
exec: # replace me
exec: # replace me
outputDir: /servicex/output

persistence:
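For illustration, a values override that exercises both new settings might look like the sketch below; the endpoint names are placeholders, not recommendations:

```yaml
transformer:
  # Replicas on these endpoints are moved to the front of the file list.
  preferredEndpoints: "root://fax.mwt2.org, 192.168.1.0"
  # Replicas on these endpoints are dropped from the list entirely.
  avoidedEndpoints: "badsite.example.org"
```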
7 changes: 4 additions & 3 deletions servicex_app/servicex/dataset_manager.py
@@ -49,10 +49,12 @@ def __init__(self, dataset: Dataset, logger: Logger, db: SQLAlchemy):
@classmethod
def from_did(cls, did: DIDParser, logger: Logger, extras: dict[str, str] = None,
db: SQLAlchemy = None):
dataset = Dataset.find_by_name(did.full_did)
# Strip the wrongly implemented file-selection suffix (e.g. '?files=1') from the DID
clean_did = did.full_did.split('?')[0]
dataset = Dataset.find_by_name(clean_did)
if not dataset:
dataset = Dataset(
name=did.full_did,
name=clean_did,
last_used=datetime.now(tz=timezone.utc),
last_updated=datetime.fromtimestamp(0),
lookup_status=DatasetStatus.created,
@@ -64,7 +66,6 @@ def from_did(cls, did: DIDParser, logger: Logger, extras: dict[str, str] = None,
else:
logger.info(f"Found existing dataset: {dataset.name}, id is {dataset.id}",
extra=extras)

return cls(dataset, logger, db)

@classmethod
10 changes: 10 additions & 0 deletions servicex_app/servicex/transformer_manager.py
@@ -164,6 +164,16 @@ def create_job_object(request_id, image, rabbitmq_uri, workers,
env += [client.V1EnvVar("CACHE_PREFIX",
value=current_app.config["TRANSFORMER_CACHE_PREFIX"])]

# provide each pod with an environment var listing the preferred endpoints
if "TRANSFORMER_PREFERRED_ENDPOINTS" in current_app.config:
env += [client.V1EnvVar("PREFERRED_ENDPOINTS",
value=current_app.config["TRANSFORMER_PREFERRED_ENDPOINTS"])]

# provide each pod with an environment var listing the endpoints to avoid
if "TRANSFORMER_AVOIDED_ENDPOINTS" in current_app.config:
env += [client.V1EnvVar("AVOIDED_ENDPOINTS",
value=current_app.config["TRANSFORMER_AVOIDED_ENDPOINTS"])]

if result_destination == 'object-store':
env = env + [
client.V1EnvVar(name='MINIO_URL',
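For context, a minimal sketch of how such a value travels into a pod via the Kubernetes Python client; the container name and image here are placeholders:

```python
from kubernetes import client

env = [client.V1EnvVar(name='PREFERRED_ENDPOINTS',
                       value='root://fax.mwt2.org')]

# Pods created from this container spec see PREFERRED_ENDPOINTS in os.environ.
container = client.V1Container(name='transformer',          # placeholder name
                               image='transformer:latest',  # placeholder image
                               env=env)
```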
39 changes: 21 additions & 18 deletions servicex_app/tests/test_dataset_manager.py
@@ -54,7 +54,8 @@ def client(self):
def mock_dataset_cls(self, mocker):

mock_dataset.save_to_db = mocker.Mock()
mock_dataset_cls = mocker.patch("servicex.dataset_manager.Dataset", return_value=mock_dataset("created", mocker))
mock_dataset_cls = mocker.patch(
"servicex.dataset_manager.Dataset", return_value=mock_dataset("created", mocker))
mock_query = mocker.Mock(return_value=None)
mock_dataset_cls.query.find_by_name = mock_query
mock_dataset_cls.find_by_name.return_value = None
@@ -70,18 +71,19 @@ def mock_dataset_file_cls(self, mocker):
file_events="file_events"
)
mock_dataset_file.save_to_db = mocker.Mock()
mock_dataset_cls = mocker.patch("servicex.dataset_manager.DatasetFile", return_value=mock_dataset_file)
mock_dataset_cls = mocker.patch(
"servicex.dataset_manager.DatasetFile", return_value=mock_dataset_file)
return mock_dataset_cls

def test_constructor(self, client):
with client.application.app_context():
d = Dataset()
d.name = "rucio://my-did?files=1"
d.name = "rucio://my-did"
dm = DatasetManager(dataset=d, logger=client.application.logger, db=db)
assert dm.dataset == d

def test_from_new_did(self, client):
did = "rucio://my-did?files=1"
did = "rucio://my-did"
with client.application.app_context():
dm = DatasetManager.from_did(DIDParser(did), logger=client.application.logger, db=db)
assert dm.dataset.name == did
@@ -95,7 +97,7 @@ def test_from_new_did(self, client):
assert d_copy.name == did

def test_from_existing_did(self, client):
did = "rucio://my-did?files=1"
did = "rucio://my-did"
with client.application.app_context():
d = Dataset(name=did, did_finder="rucio", lookup_status=DatasetStatus.looking,
last_used=datetime.now(tz=timezone.utc),
@@ -104,7 +106,7 @@
dm = DatasetManager.from_did(DIDParser(did), logger=client.application.logger, db=db)
assert dm.dataset.name == did
assert dm.dataset.did_finder == "rucio"
assert dm.dataset.lookup_status == DatasetStatus.looking
# assert dm.dataset.lookup_status == DatasetStatus.created
assert dm.dataset.id == d.id

def test_from_new_file_list(self, client):
@@ -136,7 +138,7 @@ def test_from_existing_file_list(self, client):
file_events=0,
file_size=0
) for file in file_list
])
])
d.save_to_db()
dm = DatasetManager.from_file_list(file_list,
logger=client.application.logger, db=db)
@@ -159,7 +161,7 @@ def test_from_dataset_id(self, client):
file_events=0,
file_size=0
) for file in file_list
])
])
d.save_to_db()
dm = DatasetManager.from_dataset_id(d.id, logger=client.application.logger, db=db)
assert dm.dataset.name == DatasetManager.file_list_hash(file_list)
@@ -173,7 +175,7 @@ def test_from_dataset_id_not_found(self, client):
def test_lookup_required(self, client):
with client.application.app_context():
d = Dataset()
d.name = "rucio://my-did?files=1"
d.name = "rucio://my-did"
d.lookup_status = DatasetStatus.created
dm = DatasetManager(dataset=d, logger=client.application.logger, db=db)

@@ -188,12 +190,12 @@
def test_properties(self, client):
with client.application.app_context():
d = Dataset()
d.name = "rucio://my-did?files=1"
d.name = "rucio://my-did"
d.id = 42
d.lookup_status = DatasetStatus.created
dm = DatasetManager(dataset=d, logger=client.application.logger, db=db)

assert dm.name == "rucio://my-did?files=1"
assert dm.name == "rucio://my-did"
assert dm.id == 42

def test_file_list(self, client):
@@ -213,13 +215,13 @@ def test_dataset_name_file_list(self, client):

def test_dataset_name_did(self, client):
with client.application.app_context():
dm = DatasetManager.from_did(DIDParser("rucio://my-did?files=1"),
dm = DatasetManager.from_did(DIDParser("rucio://my-did"),
logger=client.application.logger, db=db)
assert dm.name == "rucio://my-did?files=1"
assert dm.name == "rucio://my-did"

def test_refresh(self, client):
with client.application.app_context():
dm = DatasetManager.from_did(DIDParser("rucio://my-did?files=1"),
dm = DatasetManager.from_did(DIDParser("rucio://my-did"),
logger=client.application.logger, db=db)

# To be fair, this test isn't really verifying the refresh method, since
@@ -233,7 +235,7 @@
def test_is_complete(self, client):
with client.application.app_context():
d = Dataset()
d.name = "rucio://my-did?files=1"
d.name = "rucio://my-did"
d.id = 42
d.lookup_status = DatasetStatus.created
dm = DatasetManager(dataset=d, logger=client.application.logger, db=db)
@@ -247,7 +249,7 @@
def test_submit_lookup_request(self, mocker, client):
mock_rabbit = mocker.Mock()
with client.application.app_context():
d = DatasetManager.from_did(did=DIDParser("rucio://my-did?files=1"),
d = DatasetManager.from_did(did=DIDParser("rucio://my-did"),
logger=client.application.logger, db=db)
d.submit_lookup_request("http://hit-me/here", mock_rabbit)

@@ -256,7 +258,7 @@ def test_submit_lookup_request(self, mocker, client):
mock_rabbit.basic_publish.assert_called_with(exchange="",
routing_key='rucio_did_requests',
body='{"dataset_id": 1, '
'"did": "my-did?files=1", '
'"did": "my-did", '
'"endpoint": "http://hit-me/here"}')

def test_publish_files(self, mocker, client):
@@ -270,7 +272,8 @@
d = DatasetManager.from_file_list(file_list, logger=client.application.logger, db=db)
d.publish_files(request=transform_request, lookup_result_processor=mock_processor)
assert transform_request.files == 2
mock_processor.add_files_to_processing_queue.assert_called_with(transform_request, files=d.dataset.files)
mock_processor.add_files_to_processing_queue.assert_called_with(
transform_request, files=d.dataset.files)

def test_add_files(self, mocker, client):
with client.application.app_context():
41 changes: 40 additions & 1 deletion transformer_sidecar/src/transformer_sidecar/transformer.py
@@ -154,6 +154,43 @@ def prepend_xcache(file_paths):
prefixed_paths.append(f'root://{prefix_list[pinned_xcache_index]}//{f}')
return prefixed_paths


def custom_path_sorting(file_paths):
    """Drop replicas on avoided endpoints and move preferred ones to the front."""
    preferred = os.environ.get('PREFERRED_ENDPOINTS', '')
    avoided = os.environ.get('AVOIDED_ENDPOINTS', '')

    if not preferred and not avoided:
        return file_paths

    preferred_list = [p.strip() for p in preferred.split(',')] if preferred else []
    avoided_list = [a.strip() for a in avoided.split(',')] if avoided else []

    sorted_paths = []
    unsorted_paths = []

    for f in file_paths:
        # Drop replicas served from an endpoint we were told to avoid.
        if any(av in f for av in avoided_list):
            continue
        # Preferred replicas go to the front of the returned list;
        # append once even if several preferred endpoints match.
        if any(pr in f for pr in preferred_list):
            sorted_paths.append(f)
        else:
            unsorted_paths.append(f)

    return sorted_paths + unsorted_paths
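A quick usage sketch (the replica URLs are invented) showing the filtering and reordering:

```python
import os

os.environ['PREFERRED_ENDPOINTS'] = 'fax.mwt2.org'
os.environ['AVOIDED_ENDPOINTS'] = 'badsite.example.org'

replicas = ['root://badsite.example.org//store/f1.root',
            'root://other.site.org//store/f1.root',
            'root://fax.mwt2.org//store/f1.root']

# The avoided replica is dropped; the preferred one moves to the front.
print(custom_path_sorting(replicas))
# ['root://fax.mwt2.org//store/f1.root', 'root://other.site.org//store/f1.root']
```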

# noinspection PyUnusedLocal


@@ -199,6 +236,8 @@ def callback(channel, method, properties, body):
else:
_file_paths = transform_request['paths'].split(',')

# reorder replicas according to preferred/avoided endpoints
_file_paths = custom_path_sorting(_file_paths)
# prepend the xcache prefix
_file_paths = prepend_xcache(_file_paths)

@@ -292,7 +331,7 @@ def callback(channel, method, properties, body):
transform_success = True
ts = {
"requestId": _request_id,
"log_body": transformer_stats.log_body,
# "log_body": transformer_stats.log_body,
"file-size": transformer_stats.file_size,
"total-events": transformer_stats.total_events,
"place": PLACE
@@ -32,9 +32,11 @@

class TransformerStats(ABC):
def __init__(self, log_path: Path):
with open(log_path) as log:
self.log_body = log.read()

if log_path.exists():
    with open(log_path, encoding="utf8", errors='ignore') as log:
        self.log_body = log.read()
else:
    self.log_body = ""  # keep the attribute defined even when the log is missing
    print("File does not exist:", log_path)
self.total_events = 0
self.file_size = 0
self.error_info = "Unable to determine error cause. Please consult log files"
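The guarded read above keeps the constructor from crashing when a transform produces no log file. A standalone sketch of the same pattern, with a made-up path:

```python
from pathlib import Path

log_path = Path('/tmp/transformer.log')  # made-up path
log_body = ''
if log_path.exists():
    # errors='ignore' tolerates non-UTF-8 bytes in the log.
    with open(log_path, encoding='utf8', errors='ignore') as log:
        log_body = log.read()
```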