From 1a40126767ea209cd1fa38811348ebde72cd0099 Mon Sep 17 00:00:00 2001 From: carsonxu <459452372@qq.com> Date: Fri, 30 Oct 2020 12:55:06 +0800 Subject: [PATCH] =?UTF-8?q?getObject=E3=80=81selectObjectContent=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- demo/demo.js | 145 +++++++++++++++++++++++++++++++++++++++++----- demo/util.js | 2 +- package.json | 2 +- sdk/advance.js | 1 + sdk/base.js | 154 ++++++++++++++++++++++++++++++++++++++++++++----- sdk/util.js | 28 ++++++++- 7 files changed, 301 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index d27d013..2a59a5c 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ npm i cos-nodejs-sdk-v5 --save var COS = require('cos-nodejs-sdk-v5'); // 创建实例 var cos = new COS({ - SecretId: 'AKIDxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', + SecretId: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', SecretKey: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', }); // 分片上传 diff --git a/demo/demo.js b/demo/demo.js index b39b6ac..cb4cd8c 100644 --- a/demo/demo.js +++ b/demo/demo.js @@ -723,17 +723,22 @@ function putObjectCopy() { function getObject() { var filepath1 = path.resolve(__dirname, '1mb.out1.zip'); - var filepath2 = path.resolve(__dirname, '123/1mb.out2.zip'); - // cos.getObject({ - // Bucket: config.Bucket, // Bucket 格式:test-1250000000 - // Region: config.Region, - // Key: '1mb.zip', - // onProgress: function (progressData) { - // console.log(JSON.stringify(progressData)); - // } - // }, function (err, data) { - // fs.writeFileSync(filepath1, data.Body); - // }); + var filepath2 = path.resolve(__dirname, '1mb.out2.zip'); + var filepath3 = path.resolve(__dirname, '1mb.out3.zip'); + + // file1 获取对象字节到内存变量 + cos.getObject({ + Bucket: config.Bucket, // Bucket 格式:test-1250000000 + Region: config.Region, + Key: '1mb.zip', + onProgress: function (progressData) { + console.log(JSON.stringify(progressData)); + } + }, function (err, data) { + fs.writeFileSync(filepath1, data.Body); + }); + + // file2 获取对象到本地文件 cos.getObject({ Bucket: config.Bucket, // Bucket 格式:test-1250000000 Region: config.Region, @@ -745,6 +750,19 @@ function getObject() { }, function (err, data) { console.log(err || data); }); + + // file3 pipe 格式获取对象到本地文件 + var stream = cos.getObjectStream({ + Bucket: config.Bucket, // Bucket 格式:test-1250000000 + Region: config.Region, + Key: '1mb.zip', + onProgress: function (progressData) { + console.log(JSON.stringify(progressData)); + } + }, function (err, data) { + console.log(err || data); + }); + stream.pipe(fs.createWriteStream(filepath3)) } function headObject() { @@ -845,6 +863,92 @@ function restoreObject() { }); } +var selectCsvOpt = { + Bucket: config.Bucket, // Bucket 格式:test-1250000000 + Region: config.Region, + Key: '1.csv', + SelectType: 2, + SelectRequest: { + // Expression: "select * from cosobject s limit 100", + Expression: "Select * from COSObject", + ExpressionType: "SQL", + InputSerialization: { + CSV: { + FileHeaderInfo: "IGNORE", + RecordDelimiter: "\\n", + FieldDelimiter: ",", + QuoteCharacter: "\"", + QuoteEscapeCharacter: "\"", + Comments: "#", + AllowQuotedRecordDelimiter: "FALSE" + } + }, + OutputSerialization: { + CSV: { + QuoteFields: "ASNEEDED", + RecordDelimiter: "\\n", + FieldDelimiter: ",", + QuoteCharacter: "\"", + QuoteEscapeCharacter: "\"" + } + }, + RequestProgress: { + Enabled: "FALSE" + } + }, +}; + +var selectJsonOpt = { + Bucket: config.Bucket, // Bucket 格式:test-1250000000 + Region: config.Region, + Key: '1.json', + SelectType: 2, + SelectRequest: { + Expression: "Select * from COSObject", + ExpressionType: "SQL", + InputSerialization: { + JSON: { + Type: "DOCUMENT", + }, + }, + OutputSerialization: { + JSON: { + RecordDelimiter: "\n" + }, + }, + RequestProgress: { + Enabled: "FALSE" + } + }, +}; + +function selectObjectContentStream() { + // 查询 JSON + var selectStream = cos.selectObjectContentStream({ + ...selectJsonOpt, + // DataType: 'raw', + }, function (err, data) { + console.log(err || data); + }); + var outFile = './result.txt'; + selectStream.pipe(fs.createWriteStream(outFile)); + selectStream.on('end', () => console.log(fs.readFileSync(outFile).toString())) +} + +selectObjectContent() +function selectObjectContent() { + // // 如果返回结果很大,可以用 selectObjectContentStream 处理 + // // 查询 CSV + // cos.selectObjectContent(selectCsvOpt, function (err, data) { + // console.log(err || data); + // }); + + // 查询 JSON + cos.selectObjectContent(selectJsonOpt, function (err, data) { + console.log(err || data); + }); +} + function abortUploadTask() { cos.abortUploadTask({ Bucket: config.Bucket, /* 必须 */ // Bucket 格式:test-1250000000 @@ -1113,9 +1217,9 @@ function deleteFolder() { // getBucketLifecycle(); // putBucketLifecycle(); // deleteBucketLifecycle(); +// putBucketVersioning(); // getBucketVersioning(); // listObjectVersions(); -// putBucketVersioning(); // getBucketReplication(); // putBucketReplication(); // deleteBucketReplication(); @@ -1124,9 +1228,19 @@ function deleteFolder() { // deleteBucketWebsite(); // putBucketReferer(); // getBucketReferer(); +// putBucketDomain() +// getBucketDomain() +// deleteBucketDomain() +// putBucketLogging() +// getBucketLogging() +// deleteBucketLogging() +// putBucketInventory() +// getBucketInventory() +// deleteBucketInventory() +// listBucketInventory() // deleteBucket(); -// putObject(); // putObjectCopy(); +// getObjectStream(); // getObject(); // headObject(); // putObjectAcl(); @@ -1135,11 +1249,14 @@ function deleteFolder() { // deleteMultipleObject(); // restoreObject(); // abortUploadTask(); +// selectObjectContentStream(); +// selectObjectContent(); // sliceUploadFile(); +// uploadFiles(); // cancelTask(); // pauseTask(); // restartTask(); -// uploadFiles(); +// putObject(); // sliceCopyFile(); // uploadFolder(); // listFolder(); diff --git a/demo/util.js b/demo/util.js index 7424b9e..9d464d0 100644 --- a/demo/util.js +++ b/demo/util.js @@ -1,7 +1,7 @@ var os = require('os'); var fs = require('fs'); -var platform = os.platform(); +var platform = os.platform(); var createFile = function (filepath, size, callback) { var cb = function (err) { callback && callback(); diff --git a/package.json b/package.json index b62e095..727e907 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "cos-nodejs-sdk-v5", - "version": "2.8.2", + "version": "2.8.3", "description": "cos nodejs sdk v5", "main": "index.js", "scripts": { diff --git a/sdk/advance.js b/sdk/advance.js index 3508277..16c0f52 100644 --- a/sdk/advance.js +++ b/sdk/advance.js @@ -277,6 +277,7 @@ function getUploadIdAndPartList(params, callback) { Region: Region, Key: Key, Headers: util.clone(params.Headers), + Query: util.clone(params.Query), StorageClass: StorageClass, }, params); self.multipartInit(_params, function (err, data) { diff --git a/sdk/base.js b/sdk/base.js index d7a7022..3b282a2 100644 --- a/sdk/base.js +++ b/sdk/base.js @@ -1,6 +1,7 @@ var pkg = require('../package.json'); var REQUEST = require('request'); var mime = require('mime-types'); +var Stream = require('stream'); var util = require('./util'); var fs = require('fs'); @@ -1818,7 +1819,7 @@ function listObjectVersions(params, callback) { * @param {Object} data 为对应的 object 数据,包括 body 和 headers */ function getObject(params, callback) { - var reqParams = {}; + var reqParams = params.Query || {}; reqParams['response-content-type'] = params['ResponseContentType']; reqParams['response-content-language'] = params['ResponseContentLanguage']; @@ -1831,7 +1832,10 @@ function getObject(params, callback) { var self = this; var outputStream = params.Output; - if (outputStream && typeof outputStream === 'string') { + if (params.ReturnStream) { + outputStream = new Stream.PassThrough(); + BodyType = 'stream'; + } else if (outputStream && typeof outputStream === 'string') { outputStream = fs.createWriteStream(outputStream); BodyType = 'stream'; } else if (outputStream && typeof outputStream.pipe === 'function') { @@ -1899,10 +1903,9 @@ function getObject(params, callback) { if (err) { var statusCode = err.statusCode; if (params.Headers['If-Modified-Since'] && statusCode && statusCode === 304) { - return callback(null, { - NotModified: true - }); + return callback(null, {NotModified: true}); } + if (outputStream) outputStream.emit('error', err); return callback(err); } var result = {}; @@ -1922,7 +1925,12 @@ function getObject(params, callback) { }); callback(null, result); }); + if (params.ReturnStream) return outputStream; +} +function getObjectStream(params, callback) { + params.ReturnStream = true; + return getObject.call(this, params, callback); } /** @@ -1977,6 +1985,7 @@ function putObject(params, callback) { Region: params.Region, Key: params.Key, headers: params.Headers, + qs: params.Query, body: params.Body, onProgress: onProgress, }, function (err, data) { @@ -1995,15 +2004,9 @@ function putObject(params, callback) { object: params.Key, }); url = url.substr(url.indexOf('://') + 3); - var result = { - Location: url, - statusCode: data.statusCode, - headers: data.headers, - }; - if (data.headers && data.headers.etag) { - result.ETag = data.headers.etag; - } - return callback(null, result); + data.Location = url; + if (data.headers && data.headers.etag) data.ETag = data.headers.etag; + return callback(null, data); } callback(null, data); }); @@ -2520,6 +2523,117 @@ function deleteObjectTagging(params, callback) { }); } +/** + * 使用 SQL 语句从指定对象(CSV 格式或者 JSON 格式)中检索内容 + * @param {Object} params 参数对象,必须 + * @param {String} params.Bucket Object名称,必须 + * @param {String} params.Region 地域名称,必须 + * @param {Object} params.SelectRequest 地域名称,必须 + * @param {Function} callback 回调函数,必须 + * @return {Object} err 请求失败的错误,如果请求成功,则为空。https://cloud.tencent.com/document/product/436/42998 + * @return {Object} data 返回的数据 + */ +function selectObjectContent(params, callback) { + var SelectType = params['SelectType']; + if (!SelectType) return callback({error: 'missing param SelectType'}); + + var SelectRequest = params['SelectRequest'] || {}; + var xml = util.json2xml({SelectRequest: SelectRequest}); + + var headers = params.Headers; + headers['Content-Type'] = 'application/xml'; + headers['Content-MD5'] = util.binaryBase64(util.md5(xml)); + + var outputStream; + var selectResult = {}; + var SelectStream = require('./select-stream'); + if (params.ReturnStream && params.DataType === 'raw') { // 流 && raw 直接原样数据吐回 + outputStream = new Stream.PassThrough(); + } else { // 包含 params.ReturnStream || !params.ReturnStream + outputStream = new SelectStream(); + outputStream.on('message:progress', function (progress) { + if (typeof params.onProgress === 'function') params.onProgress(progress); + }); + outputStream.on('message:stats', function (stats) { + selectResult.stats = stats; + }); + outputStream.on('message:error', function (error) { + selectResult.error = error; + }); + } + submitRequest.call(this, { + Interface: 'selectObjectContent', + Action: 'name/cos:GetObject', + method: 'POST', + Bucket: params.Bucket, + Region: params.Region, + Key: params.Key, + headers: params.Headers, + action: 'select', + qs: { + 'select-type': params['SelectType'], + }, + VersionId: params.VersionId, + body: xml, + rawBody: true, + outputStream: outputStream, + }, function (err, data) { + if (err && err.statusCode === 204) { + return callback(null, {statusCode: err.statusCode}); + } else if (err) { + if (outputStream) outputStream.emit('error', err); + return callback(err); + } else if (selectResult.error) { + return callback(util.extend(selectResult.error, { + statusCode: data.statusCode, + headers: data.headers, + })); + } + var result = { + statusCode: data.statusCode, + headers: data.headers, + }; + // 只要流里有解析出 stats,就返回 Stats + if (selectResult.stats) result.Stats = selectResult.stats; + // 只要有 records,就返回 PayLoad + if (selectResult.records) result.PayLoad = Buffer.concat(selectResult.records); + callback(null, result); + }); + if (!params.ReturnStream && params.DataType !== 'raw') { + selectResult.records = []; + outputStream.pipe(new Stream.Writable({ + write: function (chunk, encoding, callback) { + selectResult.records.push(chunk); + callback(); + }, + writev: function (chunks, encoding, callback) { + chunks.forEach(function (item) { + selectResult.records.push(chunks); + }); + callback(); + }, + })); + outputStream.pipe(outputStream); + } + if (params.ReturnStream) return outputStream; +} + +/** + * 使用 SQL 语句从指定对象(CSV 格式或者 JSON 格式)中检索内容 + * @param {Object} params 参数对象,必须 + * @param {String} params.Bucket Object名称,必须 + * @param {String} params.Region 地域名称,必须 + * @param {Object} params.SelectRequest 地域名称,必须 + * @param {Function} callback 回调函数,必须 + * @return {Object} err 请求失败的错误,如果请求成功,则为空。https://cloud.tencent.com/document/product/436/42998 + * @return {Object} data 返回的数据 + * @return {Object} Stream 返回值 + */ +function selectObjectContentStream(params, callback) { + params.ReturnStream = true; + return selectObjectContent.call(this, params, callback); +} + // 分块上传 @@ -2560,6 +2674,7 @@ function multipartInit(params, callback) { Key: params.Key, action: 'uploads', headers: params.Headers, + qs: params.Query, }, function (err, data) { if (err) { return callback(err); @@ -3297,7 +3412,10 @@ function submitRequest(params, callback) { } params.AuthData = AuthData; _submitRequest.call(self, params, function (err, data) { - if (err && tryTimes < 2 && (oldClockOffset !== self.options.SystemClockOffset || allowRetry.call(self, err))) { + if (err && + !params.outputStream && + tryTimes < 2 && + (oldClockOffset !== self.options.SystemClockOffset || allowRetry.call(self, err))) { if (params.headers) { delete params.headers.Authorization; delete params.headers['token']; @@ -3407,6 +3525,9 @@ function _submitRequest(params, callback) { } } + // 特殊处理内容到写入流的情况,等待流 finish 后才 callback + if (params.outputStream) callback = util.callbackAfterStreamFinish(params.outputStream, callback); + self.emit('before-send', opt); var sender = REQUEST(opt); var retResponse; @@ -3641,6 +3762,7 @@ var API_MAP = { // Object 相关方法 getObject: getObject, + getObjectStream: getObjectStream, headObject: headObject, listObjectVersions: listObjectVersions, putObject: putObject, @@ -3654,6 +3776,8 @@ var API_MAP = { putObjectTagging: putObjectTagging, getObjectTagging: getObjectTagging, deleteObjectTagging: deleteObjectTagging, + selectObjectContent: selectObjectContent, + selectObjectContentStream: selectObjectContentStream, // 分块上传相关方法 uploadPartCopy: uploadPartCopy, diff --git a/sdk/util.js b/sdk/util.js index 5c34baa..7162c28 100644 --- a/sdk/util.js +++ b/sdk/util.js @@ -445,7 +445,8 @@ var apiWrapper = function (apiName, apiFn) { }; var errMsg = checkParams(); - var isSync = apiName === 'getAuth' || apiName === 'getV4Auth' || apiName === 'getObjectUrl'; + var isSync = apiName === 'getAuth' || apiName === 'getV4Auth' || apiName === 'getObjectUrl' + || apiName.indexOf('Stream') > -1; var Promise = global.Promise; if (!isSync && Promise && !callback) { return new Promise(function (resolve, reject) { @@ -562,6 +563,30 @@ var getSkewTime = function (offset) { return Date.now() + (offset || 0); }; +// 重写 callback,等待流结束后才 callback +var callbackAfterStreamFinish = function (stream, callback) { + if (!stream) return callback; + var err, data, count = 2; + var cb = function (e, d) { + // 如果有数据,且没有错误,清理 设置错误 + if (d && !data || e || err) { + data = d; + } + if (e && !err) { + err = e; + data = null; + } + --count === 0 && callback(err, data); + }; + stream.on('error', function (err) { + cb(err); + }); + stream.on('finish', function () { + cb(); + }); + return cb; +}; + var util = { noop: noop, formatParams: formatParams, @@ -587,6 +612,7 @@ var util = { throttleOnProgress: throttleOnProgress, getFileSize: getFileSize, getSkewTime: getSkewTime, + callbackAfterStreamFinish: callbackAfterStreamFinish, getAuth: getAuth, getV4Auth: getV4Auth, isBrowser: false,