Skip to content

Commit

Permalink
Merge pull request #11184 from amaltaro/bundle-204patch1
Browse files Browse the repository at this point in the history
Bundle of patches for WMAgent 2.0.4.patch1
  • Loading branch information
amaltaro authored Jun 16, 2022
2 parents ed34623 + 9931e7a commit 4355e8a
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/couchapps/WorkQueue/filters/queueFilter.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
function(doc, req) {
if (doc._deleted){
if (doc._deleted){
return false;
}

Expand All @@ -8,4 +8,4 @@ function(doc, req) {
return (ele['ChildQueueUrl'] === req.query.childUrl && ele['ParentQueueUrl'] === req.query.parentUrl);
}
return false;
}
}
21 changes: 14 additions & 7 deletions src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ def __init__(self, config):
self.workqueueDS = WorkQueueDS(localWQUrl)

def setUpCouchDBReplication(self):
"""
This method will delete the current replication documents and
fresh new ones will be created.
:return: None
"""
# delete old replicator docs before setting up fresh ones
resp = self.localCouchMonitor.deleteReplicatorDocs()
logging.info("Deleted old replication documents and the response was: %s", resp)

self.replicatorDocs = []
# set up common replication code
Expand All @@ -96,14 +104,13 @@ def setUpCouchDBReplication(self):
self.replicatorDocs.append({'source': localQInboxURL, 'target': parentQURL,
'filter': wqfilter, 'query_params': query_params})

# delete old replicator docs before setting up
self.localCouchMonitor.deleteReplicatorDocs()

logging.info("Going to create %d new replication documents", len(self.replicatorDocs))
for rp in self.replicatorDocs:
self.localCouchMonitor.couchServer.replicate(
rp['source'], rp['target'], filter=rp['filter'],
query_params=rp.get('query_params', False),
continuous=True)
resp = self.localCouchMonitor.couchServer.replicate(rp['source'], rp['target'],
continuous=True,
filter=rp['filter'],
query_params=rp.get('query_params', False))
logging.info(".. response for the replication document creation was: %s", resp)

def setup(self, parameters):
"""
Expand Down
24 changes: 17 additions & 7 deletions src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,35 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False):
logging.warning("Block %s already exists. Marking it as uploaded.", name)
logging.debug("Exception: %s", exString)
results.put({'name': name, 'success': "uploaded"})
elif 'Proxy Error' in exString:
# This is probably a successfully insertion that went bad.
# Put it on the check list
msg = "Got a proxy error for block %s." % name
logging.warning(msg)
results.put({'name': name, 'success': "check"})
elif 'Missing data when inserting to dataset_parents' in exString:
msg = "Parent dataset is not inserted yet for block %s." % name
logging.warning(msg)
results.put({'name': name, 'success': "error", 'error': msg})
else:
msg = "Error trying to process block %s through DBS. Error: %s" % (name, exString)
reason = parseDBSException(exString)
msg = "Error trying to process block %s through DBS. Error: %s" % (name, reason)
logging.exception(msg)
logging.debug("block info: %s \n", block)
results.put({'name': name, 'success': "error", 'error': msg})

return


def parseDBSException(exBodyString):
    """
    Parse a DBS Go-based server exception message body.

    :param exBodyString: exception message body string (not an exception object).
        The upstream code extracts the HTTP body from the exception and passes it here.
    :return: the concise parsed exception reason if the body matches the dbs2go
        format, otherwise the original body string unchanged.
    """
    try:
        data = json.loads(exBodyString)
        # dbs2go always returns a list of error records
        return data[0]['error']['reason']
    except (ValueError, KeyError, IndexError, TypeError):
        # not JSON, or not in the expected dbs2go structure: fall back to raw body
        # (narrowed from a bare `except:` so real bugs like NameError still surface)
        return exBodyString


def isPassiveError(exceptionObj):
"""
This function will parse the exception object and report whether
Expand Down
7 changes: 4 additions & 3 deletions src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag):

# DeleteFromSource is not supported for move subscriptions
delete_blocks = None
# if None, force it to 0 as per the table schema
dataLifetime = subscriptionInfo.get('DatasetLifetime', 0) or 0
if custodialFlag:
sites = subscriptionInfo['CustodialSites']
phedex_group = subscriptionInfo['CustodialGroup']
Expand All @@ -59,9 +61,8 @@ def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag):
'move': isMove,
'priority': subscriptionInfo['Priority'],
'phedex_group': phedex_group,
'delete_blocks': delete_blocks}
if subscriptionInfo['DatasetLifetime'] is not None:
bind.update(dict(dataset_lifetime=subscriptionInfo['DatasetLifetime']))
'delete_blocks': delete_blocks,
'dataset_lifetime': dataLifetime}
binds.append(bind)
return binds

Expand Down
86 changes: 61 additions & 25 deletions src/python/WMCore/Database/CMSCouch.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ def makeRequest(self, uri=None, data=None, type='GET', incoming_headers=None,
encode, decode, contentType)
except HTTPException as e:
self.checkForCouchError(getattr(e, "status", None),
getattr(e, "reason", None), data)
getattr(e, "reason", None),
data,
getattr(e, "result", None))

return result

Expand All @@ -157,6 +159,8 @@ def checkForCouchError(self, status, reason, data=None, result=None):
raise CouchNotAcceptableError(reason, data, result)
elif status == 409:
raise CouchConflictError(reason, data, result)
elif status == 410:
raise CouchFeatureGone(reason, data, result)
elif status == 412:
raise CouchPreconditionFailedError(reason, data, result)
elif status == 416:
Expand Down Expand Up @@ -882,7 +886,7 @@ def _expire(self):
now = datetime.now()
then = now - self.timing['expire']

options = {'startkey': 0, 'endkey': time.mktime(then.timetuple())}
options = {'startkey': 0, 'endkey': int(time.mktime(then.timetuple()))}
expired = self._find_dbs_in_state('archived', options)
for db in expired:
try:
Expand All @@ -894,14 +898,25 @@ def _expire(self):
self.seed_db.queueDelete(db_state)
self.seed_db.commit()

def _create_design_doc(self):
    """Install the 'TempDesignDoc' design document providing the 'rotateState' view."""
    rotateMap = "function(doc) {emit(doc.timestamp, doc.rotate_state, doc._id);}"
    designDoc = {'views': {'rotateState': {'map': rotateMap}}}
    self.seed_db.put('/%s/_design/TempDesignDoc' % self.seed_db.name, designDoc)

def _find_dbs_in_state(self, state, options=None):
    """
    Load rows from the 'rotateState' view, creating its backing design
    document on first use.
    NOTE(review): 'state' is not consumed by the view-based lookup here;
    filtering presumably happens via the view emit/options — confirm.
    """
    options = options or {}
    # TODO: couchapp this, how to make sure that the app is deployed?
    if not self.seed_db.documentExists("_design/TempDesignDoc"):
        self._create_design_doc()
    else:
        logging.info("Skipping designDoc creation because it already exists!")

    viewData = self.seed_db.loadView("TempDesignDoc", "rotateState", options=options)
    return viewData['rows']

def inactive_dbs(self):
Expand Down Expand Up @@ -997,9 +1012,13 @@ def createDatabase(self, dbname, size=1000):
return Database(dbname=dbname, url=self.url, size=size, ckey=self.ckey, cert=self.cert)

def deleteDatabase(self, dbname):
    """
    Delete a database from the server.

    Refuses to act against any server URL containing "cmsweb", as a safety
    net against accidentally dropping a production database.

    :param dbname: name of the database to delete
    :return: the server response to the DELETE request
    :raises RuntimeError: if the server URL looks like a production instance
    """
    check_name(dbname)
    dbname = urllib.parse.quote_plus(dbname)
    if "cmsweb" in self.url:
        # fixed typo (PROODUCTION) and dropped the placeholder-free f-prefix
        msg = "You can't be serious that you want to delete a PRODUCTION database!!! "
        msg += f"At url: {self.url}, for database name: {dbname}. Bailing out!"
        raise RuntimeError(msg)
    return self.delete("/%s" % dbname)

def connectDatabase(self, dbname='database', create=True, size=1000):
Expand All @@ -1014,33 +1033,40 @@ def connectDatabase(self, dbname='database', create=True, size=1000):

def replicate(self, source, destination, continuous=False,
create_target=False, cancel=False, doc_ids=False,
filter=False, query_params=False):
"""Trigger replication between source and destination. Options are as
described in http://wiki.apache.org/couchdb/Replication, in summary:
continuous = bool, trigger continuous replication
create_target = bool, implicitly create the target database
cancel = bool, stop continuous replication
doc_ids = list, id's of specific documents you want to replicate
filter = string, name of the filter function you want to apply to
the replication, the function should be defined in a design
document in the source database.
query_params = dictionary of parameters to pass into the filter
function
filter=False, query_params=False, sleepSecs=0):
"""
Trigger replication between source and destination. CouchDB options are
defined in: https://docs.couchdb.org/en/3.1.2/api/server/common.html#replicate
with further details in: https://docs.couchdb.org/en/stable/replication/replicator.html
Source and destination need to be appropriately urlquoted after the port
number. E.g. if you have a database with /'s in the name you need to
convert them into %2F's.
TODO: Improve source/destination handling - can't simply URL quote,
though, would need to decompose the URL and rebuild it.
"""
if source not in self.listDatabases():
:param source: string with the source url to replicate data from
:param destination: string with the destination url to replicate data to
:param continuous: boolean to perform a continuous replication or not
:param create_target: boolean to create the target database, if non-existent
:param cancel: boolean to stop a replication (but we better just delete the doc!)
:param doc_ids: a list of specific doc ids that we would like to replicate
:param filter: string with the name of the filter function to be used. Note that
this filter is expected to have been defined in the design doc.
:param query_params: dictionary of parameters to pass over to the filter function
:param sleepSecs: amount of seconds to sleep after the replication job is created
:return: status of the replication creation
"""
listDbs = self.listDatabases()
if source not in listDbs:
check_server_url(source)
if destination not in self.listDatabases():
if destination not in listDbs:
if create_target and not destination.startswith("http"):
check_name(destination)
else:
check_server_url(destination)

if not destination.startswith("http"):
destination = '%s/%s' % (self.url, destination)
if not source.startswith("http"):
Expand All @@ -1055,7 +1081,10 @@ def replicate(self, source, destination, continuous=False,
data["filter"] = filter
if query_params:
data["query_params"] = query_params
return self.post('/_replicator', data)
resp = self.post('/_replicator', data)
# Sleep required for CouchDB 3.x unit tests
time.sleep(sleepSecs)
return resp

def status(self):
"""
Expand Down Expand Up @@ -1132,6 +1161,13 @@ def __init__(self, reason, data, result):
CouchError.__init__(self, reason, data, result)
self.type = "CouchConflictError"


class CouchFeatureGone(CouchError):
    """CouchError subclass raised for HTTP 410 responses from CouchDB."""

    def __init__(self, reason, data, result):
        super().__init__(reason, data, result)
        self.type = "CouchFeatureGone"


class CouchPreconditionFailedError(CouchError):
def __init__(self, reason, data, result):
CouchError.__init__(self, reason, data, result)
Expand Down
5 changes: 4 additions & 1 deletion src/python/WMCore/Services/RequestDB/RequestDBReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ def _commonInit(self, couchURL, couchapp):
self.dbName = self.couchDB.name
self.couchServer = CouchServer(self.couchURL)
else:
couchURL = sanitizeURL(couchURL)['url']
# NOTE: starting in CouchDB 3.x, we need to provide the couch credentials in
# order to be able to write to the database, thus a RequestDBWriter object
if isinstance(self.__class__, RequestDBReader):
couchURL = sanitizeURL(couchURL)['url']
self.couchURL, self.dbName = splitCouchServiceURL(couchURL)
self.couchServer = CouchServer(self.couchURL)
self.couchDB = self.couchServer.connectDatabase(self.dbName, False)
Expand Down
2 changes: 2 additions & 0 deletions src/python/WMCore/Services/Requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def __init__(self, url='http://localhost', idict=None):
urlComponent = sanitizeURL(url)
if urlComponent['username'] is not None:
self.addBasicAuth(urlComponent['username'], urlComponent['password'])
# CouchDB 3.x requires user/passwd in the source/target of replication docs
# More info in: https://github.com/dmwm/WMCore/pull/11001
url = urlComponent['url'] # remove user, password from url

self.setdefault("host", url)
Expand Down
27 changes: 15 additions & 12 deletions src/python/WMCore/WorkQueue/WorkQueueBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,43 +76,46 @@ def __init__(self, db_url, db_name='workqueue',
self.db = self.server.connectDatabase(db_name, create=False, size=10000)
self.hostWithAuth = db_url
self.inbox = self.server.connectDatabase(inbox_name, create=False, size=10000)
self.queueUrlWithAuth = queueUrl or (db_url + '/' + db_name)
self.queueUrl = sanitizeURL(queueUrl or (db_url + '/' + db_name))['url']
self.eleKey = 'WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement'

def forceQueueSync(self):
    """
    Setup CouchDB replications - used only in tests.
    Triggers continuous replication in both directions (pull from and
    send to the parent queue).
    """
    self.pullFromParent(continuous=True)
    self.sendToParent(continuous=True)

def pullFromParent(self, continuous=True, cancel=False):
    """
    Replicate data from the parent couch into the local inbox database.

    :param continuous: boolean to set up a continuous replication
    :param cancel: boolean to cancel an existing replication
    :return: None; failures are logged as warnings, never raised
    """
    try:
        if self.parentCouchUrlWithAuth and self.queueUrlWithAuth:
            self.logger.info("Forcing pullFromParent from parentCouch: %s to queueUrl %s/%s",
                             self.parentCouchUrlWithAuth, self.queueUrlWithAuth, self.inbox.name)
            # replication source/target carry credentials (CouchDB 3.x requirement),
            # while the filter query params use the sanitized URLs
            self.server.replicate(source=self.parentCouchUrlWithAuth,
                                  destination="%s/%s" % (self.hostWithAuth, self.inbox.name),
                                  filter='WorkQueue/queueFilter',
                                  query_params={'childUrl': self.queueUrl,
                                                'parentUrl': self.parentCouchUrl},
                                  continuous=continuous,
                                  cancel=cancel)
    except Exception as ex:
        self.logger.warning('Replication from %s failed: %s' % (self.parentCouchUrlWithAuth, str(ex)))

def sendToParent(self, continuous=True, cancel=False):
    """
    Replicate data from the local inbox database up to the parent couch.

    :param continuous: boolean to set up a continuous replication
    :param cancel: boolean to cancel an existing replication
    :return: None; failures are logged as warnings, never raised
    """
    try:
        if self.parentCouchUrlWithAuth and self.queueUrlWithAuth:
            self.logger.info("Forcing sendToParent from queueUrl %s/%s to parentCouch: %s",
                             self.queueUrlWithAuth, self.inbox.name, self.parentCouchUrlWithAuth)
            # replication target carries credentials (CouchDB 3.x requirement),
            # while the filter query params use the sanitized URLs
            self.server.replicate(source="%s" % self.inbox.name,
                                  destination=self.parentCouchUrlWithAuth,
                                  filter='WorkQueue/queueFilter',
                                  query_params={'childUrl': self.queueUrl,
                                                'parentUrl': self.parentCouchUrl},
                                  continuous=continuous,
                                  cancel=cancel)
    except Exception as ex:
        self.logger.warning('Replication to %s failed: %s' % (self.parentCouchUrlWithAuth, str(ex)))

def getElementsForSplitting(self):
"""Returns the elements from the inbox that need to be split,
Expand Down

0 comments on commit 4355e8a

Please sign in to comment.