From 8648925fdc7d54a9e20dfdaf601631a8c3089033 Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Thu, 17 Feb 2022 18:46:14 -0500 Subject: [PATCH 1/3] Fix delayed replication in CouchDB 3.x Use RequestDBWriter with couch credentials Fix replication to use credentials on both source/target New Couch status 410; replace temp view by permanent one Fix RotatingDatabase class in CMSCouch - temp view Keep sanitizing url in Requests class only add replication sleep time for unittest replications do not allow cmsweb couchdb databases to be deleted log replication doc deletion and creation in AgentStatusPoller Create parameter for seconds to sleep after replication is created fix docstring --- .../WorkQueue/filters/queueFilter.js | 4 +- .../AgentStatusWatcher/AgentStatusPoller.py | 21 +++-- src/python/WMCore/Database/CMSCouch.py | 86 +++++++++++++------ .../Services/RequestDB/RequestDBReader.py | 5 +- src/python/WMCore/Services/Requests.py | 2 + .../WMCore/WorkQueue/WorkQueueBackend.py | 27 +++--- 6 files changed, 98 insertions(+), 47 deletions(-) diff --git a/src/couchapps/WorkQueue/filters/queueFilter.js b/src/couchapps/WorkQueue/filters/queueFilter.js index f30c8b6fa5..b595606885 100644 --- a/src/couchapps/WorkQueue/filters/queueFilter.js +++ b/src/couchapps/WorkQueue/filters/queueFilter.js @@ -1,5 +1,5 @@ function(doc, req) { - if (doc._deleted){ + if (doc._deleted){ return false; } @@ -8,4 +8,4 @@ function(doc, req) { return (ele['ChildQueueUrl'] === req.query.childUrl && ele['ParentQueueUrl'] === req.query.parentUrl); } return false; -} \ No newline at end of file +} diff --git a/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py b/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py index 9812af39b0..70d4d3a1a7 100644 --- a/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py +++ b/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py @@ -72,6 +72,14 @@ def __init__(self, config): self.workqueueDS = WorkQueueDS(localWQUrl) def setUpCouchDBReplication(self): + """ + This method will delete the current replication documents and + fresh new ones will be created. + :return: None + """ + # delete old replicator docs before setting up fresh ones + resp = self.localCouchMonitor.deleteReplicatorDocs() + logging.info("Deleted old replication documents and the response was: %s", resp) self.replicatorDocs = [] # set up common replication code @@ -96,14 +104,13 @@ def setUpCouchDBReplication(self): self.replicatorDocs.append({'source': localQInboxURL, 'target': parentQURL, 'filter': wqfilter, 'query_params': query_params}) - # delete old replicator docs before setting up - self.localCouchMonitor.deleteReplicatorDocs() - + logging.info("Going to create %d new replication documents", len(self.replicatorDocs)) for rp in self.replicatorDocs: - self.localCouchMonitor.couchServer.replicate( - rp['source'], rp['target'], filter=rp['filter'], - query_params=rp.get('query_params', False), - continuous=True) + resp = self.localCouchMonitor.couchServer.replicate(rp['source'], rp['target'], + continuous=True, + filter=rp['filter'], + query_params=rp.get('query_params', False)) + logging.info(".. response for the replication document creation was: %s", resp) def setup(self, parameters): """ diff --git a/src/python/WMCore/Database/CMSCouch.py b/src/python/WMCore/Database/CMSCouch.py index 8f52153958..99730b1d15 100644 --- a/src/python/WMCore/Database/CMSCouch.py +++ b/src/python/WMCore/Database/CMSCouch.py @@ -133,7 +133,9 @@ def makeRequest(self, uri=None, data=None, type='GET', incoming_headers=None, encode, decode, contentType) except HTTPException as e: self.checkForCouchError(getattr(e, "status", None), - getattr(e, "reason", None), data) + getattr(e, "reason", None), + data, + getattr(e, "result", None)) return result @@ -157,6 +159,8 @@ def checkForCouchError(self, status, reason, data=None, result=None): raise CouchNotAcceptableError(reason, data, result) elif status == 409: raise CouchConflictError(reason, data, result) + elif status == 410: + raise CouchFeatureGone(reason, data, result) elif status == 412: raise CouchPreconditionFailedError(reason, data, result) elif status == 416: @@ -882,7 +886,7 @@ def _expire(self): now = datetime.now() then = now - self.timing['expire'] - options = {'startkey': 0, 'endkey': time.mktime(then.timetuple())} + options = {'startkey': 0, 'endkey': int(time.mktime(then.timetuple()))} expired = self._find_dbs_in_state('archived', options) for db in expired: try: @@ -894,14 +898,25 @@ def _expire(self): self.seed_db.queueDelete(db_state) self.seed_db.commit() + def _create_design_doc(self): + """Create a design doc with a view for the rotate state""" + tempDesignDoc = {'views': { + 'rotateState': { + 'map': "function(doc) {emit(doc.timestamp, doc.rotate_state, doc._id);}" + }, + } + } + self.seed_db.put('/%s/_design/TempDesignDoc' % self.seed_db.name, tempDesignDoc) + def _find_dbs_in_state(self, state, options=None): + """Creates a design document with a single (temporary) view in it""" options = options or {} - # TODO: couchapp this, how to make sure that the app is deployed? - find = {'map': "function(doc) {if(doc.rotate_state == '%s') {emit(doc.timestamp, doc._id);}}" % state} - uri = '/%s/_temp_view' % self.seed_db.name - if options: - uri += '?%s' % urllib.parse.urlencode(options) - data = self.seed_db.post(uri, find) + if self.seed_db.documentExists("_design/TempDesignDoc"): + logging.info("Skipping designDoc creation because it already exists!") + else: + self._create_design_doc() + + data = self.seed_db.loadView("TempDesignDoc", "rotateState", options=options) return data['rows'] def inactive_dbs(self): @@ -997,9 +1012,13 @@ def createDatabase(self, dbname, size=1000): return Database(dbname=dbname, url=self.url, size=size, ckey=self.ckey, cert=self.cert) def deleteDatabase(self, dbname): - "Delete a database from the server" + """Delete a database from the server""" check_name(dbname) dbname = urllib.parse.quote_plus(dbname) + if "cmsweb" in self.url: + msg = f"You can't be serious that you want to delete a PROODUCTION database!!! " + msg += f"At url: {self.url}, for database name: {dbname}. Bailing out!" + raise RuntimeError(msg) return self.delete("/%s" % dbname) def connectDatabase(self, dbname='database', create=True, size=1000): @@ -1014,18 +1033,11 @@ def connectDatabase(self, dbname='database', create=True, size=1000): def replicate(self, source, destination, continuous=False, create_target=False, cancel=False, doc_ids=False, - filter=False, query_params=False): - """Trigger replication between source and destination. Options are as - described in http://wiki.apache.org/couchdb/Replication, in summary: - continuous = bool, trigger continuous replication - create_target = bool, implicitly create the target database - cancel = bool, stop continuous replication - doc_ids = list, id's of specific documents you want to replicate - filter = string, name of the filter function you want to apply to - the replication, the function should be defined in a design - document in the source database. - query_params = dictionary of parameters to pass into the filter - function + filter=False, query_params=False, sleepSecs=0): + """ + Trigger replication between source and destination. CouchDB options are + defined in: https://docs.couchdb.org/en/3.1.2/api/server/common.html#replicate + with further details in: https://docs.couchdb.org/en/stable/replication/replicator.html Source and destination need to be appropriately urlquoted after the port number. E.g. if you have a database with /'s in the name you need to @@ -1033,14 +1045,28 @@ def replicate(self, source, destination, continuous=False, TODO: Improve source/destination handling - can't simply URL quote, though, would need to decompose the URL and rebuild it. - """ - if source not in self.listDatabases(): + + :param source: string with the source url to replicate data from + :param destination: string with the destination url to replicate data to + :param continuous: boolean to perform a continuous replication or not + :param create_target: boolean to create the target database, if non-existent + :param cancel: boolean to stop a replication (but we better just delete the doc!) + :param doc_ids: a list of specific doc ids that we would like to replicate + :param filter: string with the name of the filter function to be used. Note that + this filter is expected to have been defined in the design doc. + :param query_params: dictionary of parameters to pass over to the filter function + :param sleepSecs: amount of seconds to sleep after the replication job is created + :return: status of the replication creation + """ + listDbs = self.listDatabases() + if source not in listDbs: check_server_url(source) - if destination not in self.listDatabases(): + if destination not in listDbs: if create_target and not destination.startswith("http"): check_name(destination) else: check_server_url(destination) + if not destination.startswith("http"): destination = '%s/%s' % (self.url, destination) if not source.startswith("http"): @@ -1055,7 +1081,10 @@ def replicate(self, source, destination, continuous=False, data["filter"] = filter if query_params: data["query_params"] = query_params - return self.post('/_replicator', data) + resp = self.post('/_replicator', data) + # Sleep required for CouchDB 3.x unit tests + time.sleep(sleepSecs) + return resp def status(self): """ @@ -1132,6 +1161,13 @@ def __init__(self, reason, data, result): CouchError.__init__(self, reason, data, result) self.type = "CouchConflictError" + +class CouchFeatureGone(CouchError): + def __init__(self, reason, data, result): + CouchError.__init__(self, reason, data, result) + self.type = "CouchFeatureGone" + + class CouchPreconditionFailedError(CouchError): def __init__(self, reason, data, result): CouchError.__init__(self, reason, data, result) diff --git a/src/python/WMCore/Services/RequestDB/RequestDBReader.py b/src/python/WMCore/Services/RequestDB/RequestDBReader.py index c8cdef0326..5eb62374b6 100644 --- a/src/python/WMCore/Services/RequestDB/RequestDBReader.py +++ b/src/python/WMCore/Services/RequestDB/RequestDBReader.py @@ -21,7 +21,10 @@ def _commonInit(self, couchURL, couchapp): self.dbName = self.couchDB.name self.couchServer = CouchServer(self.couchURL) else: - couchURL = sanitizeURL(couchURL)['url'] + # NOTE: starting in CouchDB 3.x, we need to provide the couch credentials in + # order to be able to write to the database, thus a RequestDBWriter object + if isinstance(self.__class__, RequestDBReader): + couchURL = sanitizeURL(couchURL)['url'] self.couchURL, self.dbName = splitCouchServiceURL(couchURL) self.couchServer = CouchServer(self.couchURL) self.couchDB = self.couchServer.connectDatabase(self.dbName, False) diff --git a/src/python/WMCore/Services/Requests.py b/src/python/WMCore/Services/Requests.py index 89d582756e..0529193054 100644 --- a/src/python/WMCore/Services/Requests.py +++ b/src/python/WMCore/Services/Requests.py @@ -100,6 +100,8 @@ def __init__(self, url='http://localhost', idict=None): urlComponent = sanitizeURL(url) if urlComponent['username'] is not None: self.addBasicAuth(urlComponent['username'], urlComponent['password']) + # CouchDB 3.x requires user/passwd in the source/target of replication docs + # More info in: https://github.com/dmwm/WMCore/pull/11001 url = urlComponent['url'] # remove user, password from url self.setdefault("host", url) diff --git a/src/python/WMCore/WorkQueue/WorkQueueBackend.py b/src/python/WMCore/WorkQueue/WorkQueueBackend.py index 37efd51db7..74dc4c94df 100644 --- a/src/python/WMCore/WorkQueue/WorkQueueBackend.py +++ b/src/python/WMCore/WorkQueue/WorkQueueBackend.py @@ -76,43 +76,46 @@ def __init__(self, db_url, db_name='workqueue', self.db = self.server.connectDatabase(db_name, create=False, size=10000) self.hostWithAuth = db_url self.inbox = self.server.connectDatabase(inbox_name, create=False, size=10000) + self.queueUrlWithAuth = queueUrl or (db_url + '/' + db_name) self.queueUrl = sanitizeURL(queueUrl or (db_url + '/' + db_name))['url'] self.eleKey = 'WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement' def forceQueueSync(self): - """Force a blocking replication - used only in tests""" - self.pullFromParent(continuous=False) - self.sendToParent(continuous=False) + """Setup CouchDB replications - used only in tests""" + self.pullFromParent(continuous=True) + self.sendToParent(continuous=True) def pullFromParent(self, continuous=True, cancel=False): """Replicate from parent couch - blocking: used only int test""" try: - if self.parentCouchUrl and self.queueUrl: + if self.parentCouchUrlWithAuth and self.queueUrlWithAuth: self.logger.info("Forcing pullFromParent from parentCouch: %s to queueUrl %s/%s", - self.parentCouchUrl, self.queueUrl, self.inbox.name) - self.server.replicate(source=self.parentCouchUrl, + self.parentCouchUrlWithAuth, self.queueUrlWithAuth, self.inbox.name) + self.server.replicate(source=self.parentCouchUrlWithAuth, destination="%s/%s" % (self.hostWithAuth, self.inbox.name), filter='WorkQueue/queueFilter', - query_params={'childUrl': self.queueUrl, 'parentUrl': self.parentCouchUrl}, + query_params={'childUrl': self.queueUrl, + 'parentUrl': self.parentCouchUrl}, continuous=continuous, cancel=cancel) except Exception as ex: - self.logger.warning('Replication from %s failed: %s' % (self.parentCouchUrl, str(ex))) + self.logger.warning('Replication from %s failed: %s' % (self.parentCouchUrlWithAuth, str(ex))) def sendToParent(self, continuous=True, cancel=False): """Replicate to parent couch - blocking: used only int test""" try: - if self.parentCouchUrl and self.queueUrl: + if self.parentCouchUrlWithAuth and self.queueUrlWithAuth: self.logger.info("Forcing sendToParent from queueUrl %s/%s to parentCouch: %s", - self.queueUrl, self.inbox.name, self.parentCouchUrl) + self.queueUrlWithAuth, self.inbox.name, self.parentCouchUrlWithAuth) self.server.replicate(source="%s" % self.inbox.name, destination=self.parentCouchUrlWithAuth, filter='WorkQueue/queueFilter', - query_params={'childUrl': self.queueUrl, 'parentUrl': self.parentCouchUrl}, + query_params={'childUrl': self.queueUrl, + 'parentUrl': self.parentCouchUrl}, continuous=continuous, cancel=cancel) except Exception as ex: - self.logger.warning('Replication to %s failed: %s' % (self.parentCouchUrl, str(ex))) + self.logger.warning('Replication to %s failed: %s' % (self.parentCouchUrlWithAuth, str(ex))) def getElementsForSplitting(self): """Returns the elements from the inbox that need to be split, From 0773b0d232292991018fae520556b1d4a90efbed Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Fri, 10 Jun 2022 09:08:55 -0400 Subject: [PATCH 2/3] Ensure dataset_lifetime bind is defined for DBS3Buffer/NewSubscription get dict key safely --- src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py b/src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py index 1529e1aa27..6a8212d395 100644 --- a/src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py +++ b/src/python/WMComponent/DBS3Buffer/MySQL/NewSubscription.py @@ -39,6 +39,8 @@ def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag): # DeleteFromSource is not supported for move subscriptions delete_blocks = None + # if None, force it to 0 as per the table schema + dataLifetime = subscriptionInfo.get('DatasetLifetime', 0) or 0 if custodialFlag: sites = subscriptionInfo['CustodialSites'] phedex_group = subscriptionInfo['CustodialGroup'] @@ -59,9 +61,8 @@ def _createPhEDExSubBinds(self, datasetID, subscriptionInfo, custodialFlag): 'move': isMove, 'priority': subscriptionInfo['Priority'], 'phedex_group': phedex_group, - 'delete_blocks': delete_blocks} - if subscriptionInfo['DatasetLifetime'] is not None: - bind.update(dict(dataset_lifetime=subscriptionInfo['DatasetLifetime'])) + 'delete_blocks': delete_blocks, + 'dataset_lifetime': dataLifetime} binds.append(bind) return binds From 9931e7ad6918820f34c14f2b9efe84a0e18842c3 Mon Sep 17 00:00:00 2001 From: Alan Malta Rodrigues Date: Wed, 15 Jun 2022 23:40:27 -0400 Subject: [PATCH 3/3] Better error handling for DBS3Upload block injection --- .../WMComponent/DBS3Buffer/DBSUploadPoller.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py b/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py index 70947729f4..b0aeaf996c 100644 --- a/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py +++ b/src/python/WMComponent/DBS3Buffer/DBSUploadPoller.py @@ -99,18 +99,13 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False): logging.warning("Block %s already exists. Marking it as uploaded.", name) logging.debug("Exception: %s", exString) results.put({'name': name, 'success': "uploaded"}) - elif 'Proxy Error' in exString: - # This is probably a successfully insertion that went bad. - # Put it on the check list - msg = "Got a proxy error for block %s." % name - logging.warning(msg) - results.put({'name': name, 'success': "check"}) elif 'Missing data when inserting to dataset_parents' in exString: msg = "Parent dataset is not inserted yet for block %s." % name logging.warning(msg) results.put({'name': name, 'success': "error", 'error': msg}) else: - msg = "Error trying to process block %s through DBS. Error: %s" % (name, exString) + reason = parseDBSException(exString) + msg = "Error trying to process block %s through DBS. Error: %s" % (name, reason) logging.exception(msg) logging.debug("block info: %s \n", block) results.put({'name': name, 'success': "error", 'error': msg}) @@ -118,6 +113,21 @@ def uploadWorker(workInput, results, dbsUrl, gzipEncoding=False): return +def parseDBSException(exBodyString): + """ + parse DBS Go-based server exception + :param exBodyString: exception message body string (not exception). + The upstream code extract HTTP body from exception object and pass it here. + :return: either (parsed) concise exception message or original body string + """ + try: + data = json.loads(exBodyString) + # dbs2go always return a list + return data[0]['error']['reason'] + except: + return exBodyString + + def isPassiveError(exceptionObj): """ This function will parse the exception object and report whether