Implement logic to remake input data placement upon site list changes #12155
@amaltaro, this is the initial logic based on the provided requirements. I would appreciate it if you would review it and let me know whether it behaves as expected. In particular, I need a decision about persistent storage and about the acknowledgement responses returned to the upstream caller. Once we settle on these, the rest is only the implementation of the site/rule updates.
test this please |
Valentin, although I have not covered 100% of your changes, I left some comments along the code.
In addition, regarding the information persisted in the filesystem: if we decide to keep writing one file per workflow, we then need to implement:
- deleting that file once data replacement has been successful
- listing all files pending data replacement
In my opinion, the filesystem should provide only the name of the workflow that needs replacement. We then fetch the workflow from ReqMgr2 (similar to what is done by getRequestRecords()) and let it go through the service.
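To make the one-file-per-workflow idea concrete, here is a minimal sketch of the three operations mentioned above (record, list pending, delete on success). The function names `save_pending`, `list_pending` and `remove_pending`, the `.json` suffix and the directory layout are assumptions made for illustration only; they are not the names used in this PR.

```python
import json
import os


def save_pending(area, workflow):
    """Record that `workflow` needs data replacement (one file per workflow)."""
    os.makedirs(area, exist_ok=True)
    fname = os.path.join(area, workflow + ".json")
    with open(fname, "w", encoding="utf-8") as ostream:
        # only the workflow name is stored; the rest is fetched upstream later
        json.dump({"workflow": workflow}, ostream)
    return fname


def list_pending(area):
    """Return the sorted workflow names still waiting for data replacement."""
    if not os.path.isdir(area):
        return []
    suffix = ".json"
    return sorted(name[:-len(suffix)] for name in os.listdir(area)
                  if name.endswith(suffix))


def remove_pending(area, workflow):
    """Delete the marker file once data replacement has succeeded."""
    fname = os.path.join(area, workflow + ".json")
    if os.path.exists(fname):
        os.remove(fname)
```

With this layout the service cycle would call `list_pending`, fetch each workflow from ReqMgr2, and call `remove_pending` only after the replacement succeeded, so a crash in between leaves the file in place for the next cycle.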
@@ -195,6 +202,13 @@ def execute(self, reqStatus):
        self.logger.info("%d requests information completely processed.", len(reqResults))

        for wflow in reqResults:
            # perform site list updates
I think this code has to be placed outside of this for loop (L197). Otherwise it will only get executed when there are other workflows in the queue for data placement (workflows sitting in assigned).
@@ -195,6 +202,13 @@ def execute(self, reqStatus):
        self.logger.info("%d requests information completely processed.", len(reqResults))

        for wflow in reqResults:
            # perform site list updates
            errors = self._updateSites(wflow)
            if len(errors) == 0:
In practice, you are overwriting this metric with the outcome of the very last workflow.
Instead, the way it has been used so far is to provide a summary of the microservice execution cycle.
That said, my suggestion would be to define it as an integer counting how many workflows have been re-placed.
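The difference between "last outcome" and "cycle summary" can be sketched as follows. This is only an illustration: `replace_all`, `replace_one` and the summary keys are hypothetical names, and `replace_one` stands in for whatever per-workflow call returns a list of errors.

```python
def replace_all(workflows, replace_one):
    """
    Run `replace_one` for every workflow and return a per-cycle summary.

    `replace_one` is a callable returning a (possibly empty) list of errors.
    The counters are incremented per workflow instead of being overwritten,
    so the summary describes the whole execution cycle.
    """
    summary = {"workflows_replaced": 0, "workflows_failed": 0}
    for wflow in workflows:
        errors = replace_one(wflow)
        if errors:
            summary["workflows_failed"] += 1
        else:
            summary["workflows_replaced"] += 1
    return summary
```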
        """
        Update sites API provides asynchronous update of Site info.

        :param doc: JSON payload with the following data structures:
From the source code, it looks like we only save the workflow name. I think that is correct, but we then need to update this docstring.
We receive the record {'workflow': <wflow name>, 'SiteWhiteList': ['T1', ...], 'SiteBlackList': ['T2', ...]} from upstream, and this is what is saved into a file whose name is the workflow name. This lets us keep the site lists for when we need to run the business logic and avoids extra calls to the upstream service.
Based on the latest discussions, you probably want to receive only the workflow name as payload in the REST API.
@amaltaro I'm not sure I understand what you are saying. This API is used by the REST end-point and the upstream client. The client must send the workflow name along with the new site lists. Otherwise, the process will never know which sites should be changed (either added to or removed from the existing workflow). Please elaborate on your comment here.
        :return: acknowledge dict to upstream caller (ReqMgr2)
        """
        # preserve provided payload to local file system
        errors = []
If this API is supposed to receive a single workflow per HTTP call (and I would say this is what we should implement), then we should convert errors from list to string type.
OK, modified.
        # send acknowledged message back to upstream caller
        resp = {'status': 'ok'}
        if len(errors) != 0:
            resp = {'status': 'fail', 'errors': errors}
I would suggest using the same string as we use in CouchDB, just so we keep error strings as consistent as possible. Please check out the CMSCouch.py module, which I believe sets the non-ok answer.
This API is used by the HTTP end-point to return to the upstream caller. Please clearly define how the HTTP end-point should behave in both success and failure modes. In other words, if this API succeeds, what should it return: a code, nothing? And if it fails, what should it return to the upstream code: a string? How can an error be identified in the upstream code from the return value of this API?
And, on a separate note, why should MSTransferor, or for that matter any MS service, be compliant with how CMSCouch returns errors? I'm not criticizing but rather trying to understand. Bottom line, I'm asking how any MS service should report success and failure. Is it standardized across all MS services?
I was probably mixing things up and ended up thinking that this data structure was written to CouchDB, hence reporting any potential errors from the backend database back to the user.
Seeing that I was wrong, I would suggest you look into MSPileup (or perhaps pick a different MS service) to see how the server responds to the client, and which data format and content is returned, just so we keep the services as consistent as possible.
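For reference, the acknowledgement discussed in this thread could look like the sketch below once `errors` is a single string (one workflow per HTTP call). The response format was still under discussion at this point, so the `status`/`errors` keys simply mirror the snippet above; the helper name `make_ack` is hypothetical and this is not the MSPileup format.

```python
def make_ack(error=""):
    """
    Build the acknowledgement returned to the upstream caller.

    `error` is a single string because the API is assumed to handle
    one workflow per HTTP call; an empty string means success.
    """
    if error:
        return {"status": "fail", "errors": error}
    return {"status": "ok"}
```

The upstream caller can then branch on `resp["status"]` and log `resp["errors"]` verbatim when the status is not `ok`.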
        data = json.load(istream.read())
        return data

    def _updateSites(self, wflow):
I would remove all this code and rely on what has already been implemented in MSTransferor; hence, just let the workflow go through the standard algorithm.
When removing this module, please do not squash commits though, just in case I am missing any detail that would make that not possible.
I'm not sure why I need to remove it, since it is the business logic of the requested feature. How will the standard algorithm execute logic which is not there? So far, the default algorithm does not deal with sites in the white/black lists. I don't understand what you require me to do here. Please elaborate.
That is why I am suggesting to have only a list of workflows that need dedicated data placement (instead of having the site lists as well).
You will, of course, have to modify the standard algorithm such that it also considers a list of workflow(s) retrieved from somewhere else. Other than that, the rest of the logic is already implemented and there is no need for all this code duplication.
@amaltaro, I still don't understand what you are asking for. According to the original issue, we need to implement the following logic (which is what the _updateSites function is doing):
- if a site was added to the SiteBlacklist, that site needs to be removed from the rule ids RSE expression
- if a site was added to the SiteWhitelist, that site needs to be added to the rule ids RSE expression
- the new rule ids need to be persisted in the transfer document, replacing those rules that are no longer valid.
The standard algorithm (which is called in this module as def execute) does not do that; it makes a transfer request. If we do not agree on how this should be done, please let me know your availability for a Zoom call so that we can chat about it.
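The first two rules listed above can be sketched as a pure function on the RSE expression. This is a simplified illustration, not the `_updateSites` code from the PR: it assumes the expression is a plain `|`-joined union of names, it ignores any mapping between site names and RSE names, and it lets the blacklist win when a site appears in both lists. The name `update_rse_expression` is hypothetical.

```python
def update_rse_expression(rse_expr, white_added=(), black_added=()):
    """
    Return a new RSE expression after site list changes.

    :param rse_expr: current expression, assumed to be 'A|B|C' (simple union)
    :param white_added: sites newly added to the SiteWhitelist
    :param black_added: sites newly added to the SiteBlacklist
    """
    sites = [site for site in rse_expr.split("|") if site]
    # whitelisted sites are appended, preserving order and avoiding duplicates
    for site in white_added:
        if site not in sites:
            sites.append(site)
    # blacklisted sites are dropped last, so the blacklist takes precedence
    banned = set(black_added)
    return "|".join(site for site in sites if site not in banned)
```

The third rule (persisting the new rule ids in the transfer document) is a storage concern and is deliberately left out of this function.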
As discussed yesterday, this whole method is probably no longer relevant.
@amaltaro this method is still required as it performs the business logic, i.e. it reads the new workflow JSON from persistent storage, compares its site lists with those of the existing workflow, and updates the workflow accordingly.
@amaltaro, I asked a few questions about your comments and I'm not sure you saw them, but in order to proceed with this PR I need your response. Please have a look at the PR threads where I posted my questions.
@amaltaro, this is a kind ping: in order to move forward, I'm awaiting a response to my questions in this PR.
@vkuznet you need to refresh the review request through the "Reviewers" option on the top right side, otherwise I cannot see it in the GitHub filters. In addition, please update the title of this PR and, if needed, amend the commit message as well.
Alan, this is not a review request since I didn't make any changes; rather, it is a request to answer my questions so that I can proceed. Since I didn't update any code, I thought I should not request a review. Please see my questions in the open threads and reply to each of them directly within the thread.
@amaltaro, as I'm switching my time to Cornell tasks, I made all the necessary changes in this PR to accommodate the newly discussed logic. I would appreciate it if you could make a partial review of my changes before Monday so that I get a clear idea of whether the changes correspond to the suggested logic. In particular, I made the following changes:
I may have missed something, but I think the core functionality is in place. Please have a look at the logic and I'll work on polishing the code later.
@vkuznet please see some comments along the code.
I think we are in the right direction here. I also wanted to say that when creating (updating?) the transfer document in CouchDB, we ideally could remove the previous rule IDs that have been overwritten in favor of the new ones. Code is around this line:
if isinstance(transferId, (set, list)): |
        # make decision about current workflow, if it is new request we'll create
        # new replication rule, otherwise we'll move replication rule
        if wflow.updatedWorkflow:
            rules = self.rucio.listDataRules(wflow.getName())
This line is in practice asking Rucio for the rules of a given workflow, which won't work.
Instead, I suggest you send a GET request to retrieve the transfer document (created and stored by WM) from MSTransferor. An example API call and data structure is:
https://cmsweb.cern.ch/ms-transferor/data/info?request=cmsunified_HARVEST-Run2022E_ZeroBias_30May2023v2_240123_140316_7935
Hence, you need to consume transferDoc -> transfers -> transferIDs, keeping in mind that there could be more than one element under the transfers field (even though in practice I think it will be a single one).
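A small sketch of walking `transferDoc` -> `transfers` -> `transferIDs` is shown below. Only that nesting is taken from the comment above; the HTTP envelope around the document is not shown in this thread, so the function takes the `transferDoc` dictionary directly, and the helper name `get_transfer_ids` as well as the sample values are made up for illustration.

```python
def get_transfer_ids(transfer_doc):
    """
    Collect rule ids from a transfer document.

    `transfer_doc` is assumed to look like
    {"transfers": [{"transferIDs": ["id1", ...], ...}, ...]}.
    All elements under "transfers" are visited, since there may be
    more than one; duplicates are dropped while preserving order.
    """
    rule_ids = []
    for record in transfer_doc.get("transfers", []):
        for rid in record.get("transferIDs", []):
            if rid not in rule_ids:
                rule_ids.append(rid)
    return rule_ids
```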
OK, added a new method to fetch transfer IDs.
        for rid in rdoc['ruleIds']:
            # moveReplicationRule will raise different exceptions
            # if the move operation does not succeed
            self.rucio.moveReplicationRule(rid, rseExpr, **ruleAttrs)
It looks like you should pass only the rule id and RSE expression to this call.
OK
@@ -454,6 +454,27 @@ def closeBlockContainer(self, name, scope='cms'):
            self.logger.error("Exception closing container/block: %s. Error: %s", name, str(ex))
        return response

    def moveReplicationRule(self, ruleId, rseExpression, scope='cms', **kwargs):
I understand you only wanted a logic-related review, but FYI, this method needs some corrections.
@@ -33,6 +35,14 @@
from WMCore.Services.Rucio.RucioUtils import GROUPING_ALL


def errorString(reason, data, result, status=None)):
I don't see it used and still don't have the context, so just leaving this message here.
OK, removed.
test this please |
Unit tests are now fixed.
@amaltaro, @mapellidario please proceed with testing; the unit tests related to this PR have been run.
@vkuznet please find some feedback along the code.
        """
        ruleIds = []
        try:
            rid = self.cli.move_replication_rule(ruleId, rseExpression, issuer)
Does this API accept moves in bulk (many rule ids to be moved in a single HTTP call)? If so, I would suggest allowing ruleId to be a list (so, rename it to ruleIds, rulesList, or similar).
You might want to add a pointer to the Rucio API documentation in the last line of the docstring as well.
The Rucio move_replication_rule API does not deal with bulk operations.
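Since the move is done one rule at a time, a caller with several rule ids has to loop. The sketch below shows one way to do that while collecting per-rule failures; it does not call Rucio itself. `move_rules` is a hypothetical helper, and `move_one` is an assumed callable taking `(rule_id, rse_expression)` and returning the new rule id, for example a thin wrapper around the wrapper method in this PR.

```python
def move_rules(move_one, rule_ids, rse_expression):
    """
    Move several replication rules, one call per rule.

    :param move_one: callable(rule_id, rse_expression) -> new rule id
    :param rule_ids: iterable of rule ids to move
    :param rse_expression: target RSE expression
    :return: (list of new rule ids, dict of rule id -> error message)
    """
    new_ids = []
    failed = {}
    for rid in rule_ids:
        try:
            new_ids.append(move_one(rid, rse_expression))
        except Exception as ex:  # keep going, report every failure at the end
            failed[rid] = str(ex)
    return new_ids, failed
```

Returning the failures instead of raising on the first one lets the caller decide whether a partially moved workflow should stay in the pending list for the next cycle.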
@amaltaro, I made all the necessary adjustments to this PR and it is ready for final review.
Without a broader knowledge of the WMCore system, I can only be slightly better than pylint. I added a couple of comments along the code, but those are not blockers. I approve the PR.
@vkuznet almost there, just a few minor but interesting comments to be resolved. Please find them along the code.
@vkuznet these changes are looking good to me, thanks.
Can you please squash these commits accordingly? Thanks
9ac6f10 to 34dff4c
34dff4c to f8c5ed9
@amaltaro, I squashed the commits and re-ran the Jenkins tests. This PR is ready for merging.
Awesome, thanks Valentin!
Fixes #12040
Status
In development
Description
Introduce new logic to update sites and associated rules:
- /ms-transferor/data/transferor
- updateSites API to handle POST requests and return a status to the upstream caller (ReqMgr2) {"workflow": <name>}
- updateStorage, cleanupStorage and checkDataReplacement to perform IO operations for the provided JSON payload and handle its persistent storage. So far these APIs rely on the local file system, where the JSON is stored as a file whose name is the workflow name. If we decide to use other storage, e.g. a database, only these two APIs will need a change to perform IO operations
Here is the full logic of the data flow
- updateSites API if necessary
- updateSites API performs the following logic:
- execute algorithm to read workflows from persistent storage (local file system) and set the dataReplacement workflow flag. If the flag is set, we perform the move replication rules operation; otherwise we follow the standard create replication rule logic.
Is it backward compatible (if not, which system it affects?)
YES
Related PRs
We introduced a new configuration parameter, persistentArea, and therefore the service configuration must be updated with it. The appropriate service_configs PR can be found here: https://gitlab.cern.ch/cmsweb-k8s/services_config/-/merge_requests/333
We also updated the wmcore-docs documentation with the appropriate changes about the data replacement logic; please see https://gitlab.cern.ch/dmwm/wmcore-docs/-/merge_requests/68
External dependencies / deployment changes
<Does it require deployment changes? Does it rely on third-party libraries?>