diff --git a/BackbeatClient/backbeat-2017-07-01.api.json b/BackbeatClient/backbeat-2017-07-01.api.json index d658a2a9..2d97f085 100644 --- a/BackbeatClient/backbeat-2017-07-01.api.json +++ b/BackbeatClient/backbeat-2017-07-01.api.json @@ -16,7 +16,7 @@ "PutData": { "http": { "method": "PUT", - "requestUri": "/_/backbeat/data/{Bucket}/{Key+}" + "requestUri": "/_/backbeat/data/{Bucket}/{Key+}?v2" }, "input": { "type": "structure", @@ -66,9 +66,27 @@ }, "dataStoreName": { "type": "string" + }, + "cryptoScheme": { + "type": "long" + }, + "cipheredDataKey": { + "type": "string" } } } + }, + "ServerSideEncryption": { + "location": "header", + "locationName": "x-amz-server-side-encryption" + }, + "SSECustomerAlgorithm": { + "location": "header", + "locationName": "x-amz-server-side-encryption-customer-algorithm" + }, + "SSEKMSKeyId": { + "location": "header", + "locationName": "x-amz-server-side-encryption-aws-kms-key-id" } }, "payload": "Location" @@ -140,6 +158,10 @@ "location": "header", "locationName": "X-Scal-Version-Id" }, + "Tags": { + "location": "header", + "locationName": "X-Scal-Tags" + }, "Body": { "streaming": true, "type": "blob" @@ -152,6 +174,12 @@ "members": { "versionId": { "type": "string" + }, + "location": { + "type": "list", + "member": { + "shape": "LocationMDObj" + } } } } @@ -200,6 +228,53 @@ } } }, + "MultipleBackendHeadObject": { + "http": { + "method": "GET", + "requestUri": "/_/backbeat/multiplebackendmetadata/{Bucket}/{Key+}" + }, + "input": { + "type": "structure", + "required": [ + "Bucket", + "Key", + "Locations" + ], + "members": { + "Bucket": { + "location": "uri", + "locationName": "Bucket" + }, + "Key": { + "location": "uri", + "locationName": "Key" + }, + "Locations": { + "location": "header", + "locationName": "X-Scal-Locations", + "type": "string", + "member": { + "type": "structure", + "required": [ + "key", + "dataStoreName" + ], + "member": { + "shape": "LocationMDObj" + } + } + } + } + }, + "output": { + "type": "structure", + "members": { + "lastModified": { + "type": "string" + } + } + } + }, "MultipleBackendPutMPUPart": { "http": { "method": "PUT", @@ -318,6 +393,10 @@ "location": "header", "locationName": "X-Scal-Content-Encoding" }, + "Tags": { + "location": "header", + "locationName": "X-Scal-Tags" + }, "Body": { "type": "blob" } @@ -333,6 +412,46 @@ } } }, + "MultipleBackendAbortMPU": { + "http": { + "method": "DELETE", + "requestUri": "/_/backbeat/multiplebackenddata/{Bucket}/{Key+}?operation=abortmpu" + }, + "input": { + "type": "structure", + "required": [ + "Bucket", + "Key", + "StorageClass" + ], + "members": { + "Bucket": { + "location": "uri", + "locationName": "Bucket" + }, + "Key": { + "location": "uri", + "locationName": "Key" + }, + "StorageType": { + "location": "header", + "locationName": "X-Scal-Storage-Type" + }, + "StorageClass": { + "location": "header", + "locationName": "X-Scal-Storage-Class" + }, + "UploadId": { + "location": "header", + "locationName": "X-Scal-Upload-Id" + } + } + }, + "output": { + "type": "structure", + "members": {} + } + }, "MultipleBackendCompleteMPU": { "http": { "method": "POST", @@ -390,6 +509,10 @@ "location": "header", "locationName": "X-Scal-Upload-Id" }, + "Tags": { + "location": "header", + "locationName": "X-Scal-Tags" + }, "Body": { "type": "blob" } @@ -401,6 +524,12 @@ "members": { "versionId": { "type": "string" + }, + "location": { + "type": "list", + "member": { + "shape": "LocationMDObj" + } } } } @@ -549,6 +678,12 @@ "location": "uri", "locationName": "Key" }, + "VersionId": { + "type": "string", + "documentation": "VersionId used to reference a specific version of the object.", + "location": "querystring", + "locationName": "versionId" + }, "ContentLength": { "location": "header", "locationName": "Content-Length", @@ -845,13 +980,34 @@ "BatchDelete": { "http": { "method": "POST", - "requestUri": "/_/backbeat/batchdelete" + "requestUri": "/_/backbeat/batchdelete/{Bucket}/{Key+}" }, "input": { "type": "structure", "required": [ ], "members": { + "Bucket": { + "location": "uri", + "locationName": "Bucket" + }, + "Key": { + "location": "uri", + "locationName": "Key" + }, + "IfUnmodifiedSince": { + "location": "header", + "locationName": "If-Unmodified-Since", + "type": "string" + }, + "StorageClass": { + "location": "header", + "locationName": "X-Scal-Storage-Class" + }, + "Tags": { + "location": "header", + "locationName": "X-Scal-Tags" + }, "ContentType": { "location": "header", "locationName": "X-Scal-Content-Type" @@ -873,6 +1029,9 @@ }, "size": { "type": "integer" + }, + "dataStoreVersionId": { + "type": "string" } } } @@ -888,7 +1047,7 @@ "GetRaftBuckets": { "http": { "method": "GET", - "requestUri": "/_/metadata/listbuckets/{LogId}" + "requestUri": "/_/metadata/admin/raft_sessions/{LogId}/bucket" }, "input": { "type": "structure", @@ -909,10 +1068,67 @@ } } }, + "GetRaftId": { + "http": { + "method": "GET", + "requestUri": "/_/metadata/admin/buckets/{Bucket}/id" + }, + "input": { + "type": "structure", + "required": [ + "Bucket" + ], + "members": { + "Bucket": { + "location": "uri", + "locationName": "Bucket" + } + } + }, + "output": { + "type": "string" + } + }, + "GetRaftLog": { + "http": { + "method": "GET", + "requestUri": "/_/metadata/admin/raft_sessions/{LogId}/log" + }, + "input": { + "type": "structure", + "required": [ + "LogId" + ], + "members": { + "LogId": { + "location": "uri", + "locationName": "LogId" + }, + "Begin": { + "type": "integer", + "location": "querystring", + "locationName": "begin" + }, + "Limit": { + "type": "integer", + "location": "querystring", + "locationName": "limit" + }, + "TargetLeader": { + "type": "boolean", + "location": "querystring", + "locationName": "targetLeader" + } + } + }, + "output": { + "shape": "RaftLogOutput" + } + }, "GetBucketMetadata": { "http": { "method": "GET", - "requestUri": "/_/metadata/getbucket/{Bucket}" + "requestUri": "/_/metadata/default/attributes/{Bucket}" }, "input": { "type": "structure", @@ -933,7 +1149,7 @@ "GetObjectList": { "http": { "method": "GET", - "requestUri": "/_/metadata/listobjects/{Bucket}" + "requestUri": "/_/metadata/default/bucket/{Bucket}" }, "input": { "type": "structure", @@ -951,32 +1167,33 @@ "shape": "ObjectMDListResponse" } }, - "GetObjectMetadata": { + "GetBucketCseq": { "http": { "method": "GET", - "requestUri": "/_/metadata/getobject/{Bucket}/{Key+}" + "requestUri": "/_/metadata/default/informations/{Bucket}" }, "input": { "type": "structure", "required": [ - "Bucket", - "Key" + "Bucket" ], "members": { "Bucket": { "location": "uri", "locationName": "Bucket" - }, - "Key": { - "location": "uri", - "locationName": "Key" } } }, "output": { - "type": "map", - "key": {}, - "value": {} + "type": "list", + "member": { + "type": "structure", + "members": { + "cseq": { + "type": "integer" + } + } + } } } }, @@ -999,6 +1216,18 @@ "members": { "Contents": { "shape": "ObjectMDList" + }, + "CommonPrefixes": { + "type": "list", + "members": { + "type": "string" + } + }, + "IsTruncated": { + "type": "boolean" + }, + "Delimiter": { + "type": "string" } } }, @@ -1198,6 +1427,79 @@ } } } + }, + "LocationMDObj": { + "type": "structure", + "members": { + "key": { + "type": "string" + }, + "size": { + "type": "integer" + }, + "start": { + "type": "integer" + }, + "dataStoreName": { + "type": "string" + }, + "dataStoreType": { + "type": "string" + }, + "dataStoreETag": { + "type": "string" + }, + "dataStoreVersionId": { + "type": "string" + } + } + }, + "RaftLogOutput": { + "type": "structure", + "members": { + "info": { + "type": "structure", + "members": { + "start": { + "type": "integer" + }, + "cseq": { + "type": "integer" + }, + "prune": { + "type": "integer" + } + } + }, + "log": { + "type": "list", + "member": { + "type": "structure", + "members": { + "db": { + "type": "string" + }, + "entries": { + "type": "list", + "member": { + "type": "structure", + "members": { + "key": { + "type": "string" + }, + "value": { + "type": "string" + } + } + } + }, + "method": { + "type": "integer" + } + } + } + } + } } } } diff --git a/CrrExistingObjects/listingParser.js b/CrrExistingObjects/listingParser.js deleted file mode 100644 index 5d47b9bd..00000000 --- a/CrrExistingObjects/listingParser.js +++ /dev/null @@ -1,24 +0,0 @@ -function listingParser(entries) { - if (!entries) { - return entries; - } - return entries.map(entry => { - const tmp = JSON.parse(entry.value); - return { - Key: entry.key, - Size: tmp['content-length'], - ETag: tmp['content-md5'], - VersionId: tmp.versionId, - IsNull: tmp.isNull, - IsDeleteMarker: tmp.isDeleteMarker, - LastModified: tmp['last-modified'], - Owner: { - DisplayName: tmp['owner-display-name'], - ID: tmp['owner-id'], - }, - StorageClass: tmp['x-amz-storage-class'], - }; - }); -} - -module.exports = listingParser; diff --git a/CrrExistingObjects/metadataClient.js b/CrrExistingObjects/metadataClient.js deleted file mode 100644 index 58d20c86..00000000 --- a/CrrExistingObjects/metadataClient.js +++ /dev/null @@ -1,20 +0,0 @@ -const { MetadataWrapper } = require('arsenal').storage.metadata; -const werelogs = require('werelogs'); -const createMongoParams = require('../utils/createMongoParams'); -const listingParser = require('./listingParser'); - -const loggerConfig = { - level: 'info', - dump: 'error', -}; -werelogs.configure(loggerConfig); - -const log = new werelogs.Logger('s3utils::crrExistingObjects'); -const implName = 'mongodb'; -const params = { - customListingParser: listingParser, - mongodb: createMongoParams(log, { readPreference: 'primary' }), -}; -const metadata = new MetadataWrapper(implName, params, null, log); - -module.exports = metadata; diff --git a/CrrExistingObjects/metadataUtils.js b/CrrExistingObjects/metadataUtils.js deleted file mode 100644 index 8c197baa..00000000 --- a/CrrExistingObjects/metadataUtils.js +++ /dev/null @@ -1,211 +0,0 @@ -const { errors, versioning } = require('arsenal'); -const metadataClient = require('./metadataClient'); - -const versionIdUtils = versioning.VersionID; - -const { GENERATE_INTERNAL_VERSION_ID } = process.env; -const REPLICATION_GROUP_ID = process.env.REPLICATION_GROUP_ID || 'RG001'; -// Use Arsenal function to generate a version ID used internally by metadata -// for null versions that are created before bucket versioning is configured -const nonVersionedObjId = versionIdUtils.getInfVid(REPLICATION_GROUP_ID); - -function _processVersions(list) { - /* eslint-disable no-param-reassign */ - list.NextVersionIdMarker = list.NextVersionIdMarker - ? versionIdUtils.encode(list.NextVersionIdMarker) - : list.NextVersionIdMarker; - - list.Versions.forEach(v => { - v.VersionId = v.VersionId - ? versionIdUtils.encode(v.VersionId) : v.VersionId; - }); - /* eslint-enable no-param-reassign */ - return list; -} - -function listObjectVersions(params, log, cb) { - const bucketName = params.Bucket; - const listingParams = { - listingType: 'DelimiterVersions', - maxKeys: params.MaxKeys, - prefix: params.Prefix, - keyMarker: params.KeyMarker, - versionIdMarker: params.VersionIdMarker, - }; - log.debug('listing object versions', { - method: 'metadataUtils.listObjectVersions', - listingParams, - }); - return metadataClient.listObject( - bucketName, - listingParams, - log, - (err, list) => { - if (err) { - return cb(err); - } - return cb(null, _processVersions(list)); - }, - ); -} - -function _formatConfig(config) { - const { role, destination, rules } = config; - const Rules = rules.map(rule => { - const { - prefix, enabled, storageClass, id, - } = rule; - return { - ID: id, - Prefix: prefix, - Status: enabled ? 'Enabled' : 'Disabled', - Destination: { - Bucket: destination, - StorageClass: (storageClass || ''), - }, - }; - }); - return { - ReplicationConfiguration: { - Role: role, - Rules, - }, - }; -} - -function getBucketReplication(options, log, cb) { - const bucketName = options.Bucket; - log.debug('getting bucket replication', { - method: 'metadataUtils.getBucketReplication', - bucket: bucketName, - }); - return metadataClient.getBucket(bucketName, log, (err, data) => { - if (err) { - return cb(err); - } - const replConf = _formatConfig(data._replicationConfiguration); - return cb(null, replConf); - }); -} - -function _getNullVersion(objMD, bucketName, objectKey, log, cb) { - const options = {}; - if (objMD.isNull || !objMD.versionId) { - log.debug('found null version'); - return process.nextTick(() => cb(null, objMD)); - } - if (objMD.nullVersionId) { - log.debug('null version exists, get the null version'); - options.versionId = objMD.nullVersionId; - return metadataClient.getObjectMD( - bucketName, - objectKey, - options, - log, - cb, - ); - } - return process.nextTick(() => cb()); -} - -function getMetadata(params, log, cb) { - const { Bucket, Key } = params; - let versionId = params.VersionId; - log.debug('getting object metadata', { - method: 'metadataUtils.getMetadata', - bucket: Bucket, - objectKey: Key, - versionId, - }); - if (versionId && versionId !== 'null') { - versionId = versionIdUtils.decode(versionId); - } - if (versionId instanceof Error) { - const errMsg = 'Invalid version id specified'; - return cb(errors.InvalidArgument.customizeDescription(errMsg)); - } - const mdParams = { - versionId, - }; - return metadataClient.getObjectMD( - Bucket, - Key, - mdParams, - log, - (err, data) => { - if (err) { - return cb(err); - } - if (data && versionId === 'null') { - return _getNullVersion( - data, - Bucket, - Key, - log, - (err, nullVer) => { - if (err) { - return cb(err); - } - return cb(null, nullVer); - }, - ); - } - return cb(null, data); - }, - ); -} - -function getOptions(objMD) { - const options = {}; - - if (objMD.versionId === undefined) { - if (!GENERATE_INTERNAL_VERSION_ID) { - return options; - } - - objMD.setIsNull(true); - objMD.setVersionId(nonVersionedObjId); - - options.nullVersionId = objMD.versionId; - // non-versioned (non-null) MPU objects don't have a - // replay ID, so don't reference their uploadId - if (objMD.uploadId) { - options.nullUploadId = objMD.uploadId; - } - } - - // specify both 'versioning' and 'versionId' to create a "new" - // version (updating master as well) but with specified versionId - options.versioning = true; - options.versionId = objMD.versionId; - return options; -} - -function putMetadata(params, log, cb) { - const { Bucket, Key, Body: objMD } = params; - const options = getOptions(objMD); - - log.debug('updating object metadata', { - method: 'metadataUtils.putMetadata', - bucket: Bucket, - objectKey: Key, - versionId: objMD.versionId, - }); - // If the object is from a source bucket without versioning (i.e. NFS), - // then we want to create a version for the replica object even though - // none was provided in the object metadata value. - if (objMD.replicationInfo.isNFS) { - const isReplica = objMD.replicationInfo.status === 'REPLICA'; - options.versioning = isReplica; - objMD.replicationInfo.isNFS = !isReplica; - } - return metadataClient.putObjectMD(Bucket, Key, objMD, options, log, cb); -} - -module.exports = { - metadataClient, - listObjectVersions, - getBucketReplication, - getMetadata, - putMetadata, -}; diff --git a/crrExistingObjects.js b/crrExistingObjects.js index 06312574..2c10941d 100644 --- a/crrExistingObjects.js +++ b/crrExistingObjects.js @@ -1,9 +1,14 @@ +const http = require('http'); + +const AWS = require('aws-sdk'); const { - doWhilst, eachSeries, eachLimit, waterfall, series, + doWhilst, eachSeries, eachLimit, waterfall, } = require('async'); -const werelogs = require('werelogs'); + const { ObjectMD } = require('arsenal').models; -const metadataUtil = require('./CrrExistingObjects/metadataUtils'); +const werelogs = require('werelogs'); + +const BackbeatClient = require('./BackbeatClient'); const logLevel = Number.parseInt(process.env.DEBUG, 10) === 1 ? 'debug' : 'info'; @@ -15,9 +20,12 @@ werelogs.configure(loggerConfig); const log = new werelogs.Logger('s3utils::crrExistingObjects'); const BUCKETS = process.argv[2] ? process.argv[2].split(',') : null; -const { SITE_NAME } = process.env; -let { STORAGE_TYPE } = process.env; -let { TARGET_REPLICATION_STATUS } = process.env; +const { + ACCESS_KEY, SECRET_KEY, ENDPOINT, SITE_NAME, +} = process.env; +let { + STORAGE_TYPE, TARGET_REPLICATION_STATUS, +} = process.env; const { TARGET_PREFIX } = process.env; const WORKERS = (process.env.WORKERS && Number.parseInt(process.env.WORKERS, 10)) || 10; @@ -27,18 +35,31 @@ const MAX_SCANNED = (process.env.MAX_SCANNED && Number.parseInt(process.env.MAX_SCANNED, 10)); let { KEY_MARKER } = process.env; let { VERSION_ID_MARKER } = process.env; -const { GENERATE_INTERNAL_VERSION_ID } = process.env; const LISTING_LIMIT = (process.env.LISTING_LIMIT && Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000; const LOG_PROGRESS_INTERVAL_MS = 10000; +const AWS_SDK_REQUEST_RETRIES = 100; +const AWS_SDK_REQUEST_DELAY_MS = 30; if (!BUCKETS || BUCKETS.length === 0) { log.fatal('No buckets given as input! Please provide ' + 'a comma-separated list of buckets'); process.exit(1); } +if (!ENDPOINT) { + log.fatal('ENDPOINT not defined!'); + process.exit(1); +} +if (!ACCESS_KEY) { + log.fatal('ACCESS_KEY not defined'); + process.exit(1); +} +if (!SECRET_KEY) { + log.fatal('SECRET_KEY not defined'); + process.exit(1); +} if (!STORAGE_TYPE) { STORAGE_TYPE = ''; } @@ -59,6 +80,38 @@ log.info('Objects with replication status ' + `${replicationStatusToProcess.join(' or ')} ` + 'will be reset to PENDING to trigger CRR'); +const options = { + accessKeyId: ACCESS_KEY, + secretAccessKey: SECRET_KEY, + endpoint: ENDPOINT, + region: 'us-east-1', + sslEnabled: false, + s3ForcePathStyle: true, + apiVersions: { s3: '2006-03-01' }, + signatureVersion: 'v4', + signatureCache: false, + httpOptions: { + timeout: 0, + agent: new http.Agent({ keepAlive: true }), + }, +}; +/** + * Options specific to s3 requests + * `maxRetries` & `customBackoff` are set only to s3 requests + * default aws sdk retry count is 3 with an exponential delay of 2^n * 30 ms + */ +const s3Options = { + maxRetries: AWS_SDK_REQUEST_RETRIES, + customBackoff: (retryCount, error) => { + log.error('aws sdk request error', { error, retryCount }); + // computed delay is not truly exponential, it is reset to minimum after + // every 10 calls, with max delay of 15 seconds! + return AWS_SDK_REQUEST_DELAY_MS * 2 ** (retryCount % 10); + }, +}; +const s3 = new AWS.S3(Object.assign(options, s3Options)); +const bb = new BackbeatClient(options); + let nProcessed = 0; let nSkipped = 0; let nUpdated = 0; @@ -103,14 +156,14 @@ function _markObjectPending( let skip = false; return waterfall([ // get object blob - next => metadataUtil.getMetadata({ + next => bb.getMetadata({ Bucket: bucket, Key: key, VersionId: versionId, - }, log, next), + }, next), (mdRes, next) => { - objMD = new ObjectMD(mdRes); - const md = objMD.getValue(); + objMD = new ObjectMD(JSON.parse(mdRes.Body)); + const mdBlob = objMD.getSerialized(); if (!_objectShouldBeUpdated(objMD)) { skip = true; return next(); @@ -124,13 +177,6 @@ function _markObjectPending( // was versioning-suspended when the object was put. return next(); } - if (!GENERATE_INTERNAL_VERSION_ID) { - // When the GENERATE_INTERNAL_VERSION_ID env variable is set, - // matching objects with no *internal* versionId will get - // "updated" to get an internal versionId. The external versionId - // will still be "null". - return next(); - } // The object does not have an *internal* versionId, as it // was put on a nonversioned bucket: do a first metadata // update to generate one, just passing on the existing metadata @@ -138,11 +184,13 @@ function _markObjectPending( // but the following update will be able to create a versioned key // for this object, so that replication can happen. The externally // visible version will stay "null". - return metadataUtil.putMetadata({ + return bb.putMetadata({ Bucket: bucket, Key: key, - Body: md, - }, log, (err, putRes) => { + VersionId: versionId, + ContentLength: Buffer.byteLength(mdBlob), + Body: mdBlob, + }, (err, putRes) => { if (err) { return next(err); } @@ -186,12 +234,14 @@ function _markObjectPending( objMD.setReplicationSiteStatus(storageClass, 'PENDING'); objMD.setReplicationStatus('PENDING'); objMD.updateMicroVersionId(); - const md = objMD.getValue(); - return metadataUtil.putMetadata({ + const mdBlob = objMD.getSerialized(); + return bb.putMetadata({ Bucket: bucket, Key: key, - Body: md, - }, log, next); + VersionId: versionId, + ContentLength: Buffer.byteLength(mdBlob), + Body: mdBlob, + }, next); }, ], err => { ++nProcessed; @@ -213,19 +263,19 @@ function _markObjectPending( // list object versions function _listObjectVersions(bucket, VersionIdMarker, KeyMarker, cb) { - return metadataUtil.listObjectVersions({ + return s3.listObjectVersions({ Bucket: bucket, MaxKeys: LISTING_LIMIT, Prefix: TARGET_PREFIX, VersionIdMarker, KeyMarker, - }, log, cb); + }, cb); } function _markPending(bucket, versions, cb) { const options = { Bucket: bucket }; waterfall([ - next => metadataUtil.getBucketReplication(options, log, (err, res) => { + next => s3.getBucketReplication(options, (err, res) => { if (err) { log.error('error getting bucket replication', { error: err }); return next(err); @@ -272,9 +322,7 @@ function triggerCRROnBucket(bucketName, cb) { log.error('error listing object versions', { error: err }); return done(err); } - const versions = data.DeleteMarkers - ? data.Versions.concat(data.DeleteMarkers) : data.Versions; - return _markPending(bucket, versions, err => { + return _markPending(bucket, data.Versions.concat(data.DeleteMarkers), err => { if (err) { return done(err); } @@ -331,11 +379,7 @@ function triggerCRROnBucket(bucketName, cb) { } // trigger the calls to list objects and mark them for crr -series([ - next => metadataUtil.metadataClient.setup(next), - next => eachSeries(BUCKETS, triggerCRROnBucket, next), - next => metadataUtil.metadataClient.close(next), -], err => { +eachSeries(BUCKETS, triggerCRROnBucket, err => { clearInterval(logProgressInterval); if (err) { return log.error('error during task execution', { error: err });