Skip to content

Commit 69ab918

Browse files
authored
Merge pull request #186 from annexwu/feature/Nodejs12.16-COSAnalyzeInventory
添加 Nodejs12.16-COSAnalyzeInventory 函数模板
2 parents 1130f41 + 6041b03 commit 69ab918

File tree

4,605 files changed

+1711955
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

4,605 files changed

+1711955
-0
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"serverless-cloud-function-application": {
3+
"Chinese": {
4+
"name": "COS 清单文件简易分析",
5+
"description": "本函数将源 bucket 中的 COS 清单文件进行简易分析,处理后将结果上传到指定 bucket",
6+
"attention": "该函数模板仅供 COS 控制台使用,请勿自行创建",
7+
"author": {
8+
"name": "腾讯云"
9+
}
10+
},
11+
"English": {
12+
"name": "CosAnalyzeInventory",
13+
"description": "This function will download cos inventory files from source bucket, analyze them, and upload result file to target bucket",
14+
"attention": "This function template is only provided to COS console, please do not use it by yourself",
15+
"author": {
16+
"name": "Tencent Cloud"
17+
}
18+
},
19+
"runtime": "Nodejs12.16",
20+
"readme": "https://github.com/tencentyun/serverless-demo/tree/master/Nodejs12.16-COSAnalyzeInventory",
21+
"version": "1.0.8",
22+
"tags": ["Nodejs12.16", "analyze inventory", "cos analyze inventory"]
23+
}
24+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
component: scf
2+
name: ap-guangzhou_default_CosAnalyzeInventory
3+
inputs:
4+
name: CosAnalyzeInventory
5+
src:
6+
src: ./src
7+
handler: index.main_handler
8+
runtime: Nodejs12.16
9+
namespace: default
10+
region: ap-guangzhou
11+
memorySize: 512
12+
timeout: 86400
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/* eslint-disable no-param-reassign */
2+
const { streamPipelinePromise } = require('./utils');
3+
const { Readable, PassThrough } = require('stream');
4+
const Input = require('./Input/index');
5+
const Filter = require('./Filter/index');
6+
const Output = require('./Output/index');
7+
8+
class CosAnalyzeInventoryProcessTask {
9+
constructor({
10+
cosSdkInstance,
11+
cosUpload,
12+
bucket,
13+
region,
14+
key,
15+
analyzeConfig,
16+
sourceList,
17+
}) {
18+
const analyzeConfigColumns = analyzeConfig.columns.map(item => item.key);
19+
sourceList = sourceList.map((item) => {
20+
const {
21+
type,
22+
fileFormat = 'CSV',
23+
fileSchema,
24+
fileHeaderInfo = 'NONE',
25+
fieldDelimiter,
26+
} = item;
27+
let inputConfig = {};
28+
if (fileFormat === 'CSV') {
29+
let columns = [];
30+
if (fileSchema) {
31+
columns = fileSchema
32+
.split(',')
33+
.map(key => key.trim())
34+
.map(key => (key === 'Key' ? 'EncodedKey' : key));
35+
} else {
36+
columns = analyzeConfigColumns;
37+
}
38+
const unsetKeys = analyzeConfigColumns.filter(key => !columns.includes(key)
39+
&& !['Prefix', 'PrefixDepth', 'Key', 'EncodedKey'].includes(key));
40+
if (unsetKeys.length > 0) {
41+
throw new Error(`${unsetKeys.join(', ')} has not found in file`);
42+
}
43+
if (analyzeConfig.inputExtractor === 'CosSelect') {
44+
inputConfig = {
45+
extractor: 'CosSelect',
46+
params: {
47+
Expression: `Select ${columns
48+
.map((item, index) => `_${index + 1} as ${item === 'Key' ? 'EncodedKey' : item}`)
49+
.join(', ')} from COSObject`,
50+
ExpressionType: 'SQL',
51+
InputSerialization: {
52+
CompressionType: type === 'gzip' ? 'GZIP' : 'NONE',
53+
CSV: {
54+
FileHeaderInfo: fileHeaderInfo,
55+
RecordDelimiter: '\n',
56+
FieldDelimiter:
57+
fieldDelimiter || analyzeConfig.colDelimiter || ',',
58+
QuoteCharacter: '"',
59+
QuoteEscapeCharacter: '"',
60+
AllowQuotedRecordDelimiter: 'TRUE',
61+
},
62+
},
63+
OutputSerialization: {
64+
JSON: {
65+
RecordDelimiter: '\n',
66+
},
67+
},
68+
...(analyzeConfig.cosSelect || {}),
69+
},
70+
};
71+
} else {
72+
inputConfig = {
73+
extractor: 'CosNormalCsv',
74+
params: {
75+
InputSerialization: {
76+
RecordDelimiter: '\n',
77+
FieldDelimiter:
78+
fieldDelimiter || analyzeConfig.colDelimiter || ',',
79+
Columns: analyzeConfig.columns || columns.map(key => ({ key })),
80+
CompressionType: type === 'gzip' ? 'GZIP' : 'NONE',
81+
},
82+
},
83+
};
84+
}
85+
}
86+
return {
87+
...item,
88+
inputConfig,
89+
};
90+
});
91+
Object.assign(this, {
92+
cosSdkInstance,
93+
cosUpload,
94+
bucket,
95+
region,
96+
key,
97+
analyzeConfig,
98+
sourceList,
99+
});
100+
}
101+
async runTask() {
102+
const input = new Input({
103+
cosSdkInstance: this.cosSdkInstance,
104+
sourceList: this.sourceList,
105+
});
106+
const readStream = input.getReadStream();
107+
const filter = new Filter({
108+
filter: 'InventoryAnalyzeTransformStream',
109+
params: {
110+
...this.analyzeConfig,
111+
destroySource: () => readStream.end(),
112+
},
113+
});
114+
const output = new Output({
115+
cosUpload: this.cosUpload,
116+
consumer: 'COS',
117+
params: {
118+
objectConfig: {
119+
Bucket: this.bucket,
120+
Region: this.region,
121+
Key: this.key,
122+
},
123+
extraConfig: {},
124+
},
125+
});
126+
const passThrough = new PassThrough();
127+
await this.targetSetHeader(passThrough);
128+
const { streamList, promiseList } = output.getWriteStreamListAndPromiseList();
129+
const result = await Promise.all([
130+
streamPipelinePromise([
131+
readStream,
132+
filter.getTransformStream(),
133+
passThrough,
134+
]),
135+
streamPipelinePromise([passThrough, ...streamList]),
136+
...promiseList,
137+
]);
138+
return result.pop();
139+
}
140+
async targetSetHeader(passThrough) {
141+
const {
142+
select,
143+
targetRowDelimiter = '\n',
144+
targetColDelimiter = ',',
145+
targetSetHeader = true,
146+
} = this.analyzeConfig;
147+
if (!targetSetHeader) {
148+
return;
149+
}
150+
const headerStr = select.map(({ label, key }) => label || key).join(targetColDelimiter)
151+
+ targetRowDelimiter;
152+
Readable.from([headerStr]).pipe(passThrough, { end: false });
153+
}
154+
}
155+
156+
module.exports = CosAnalyzeInventoryProcessTask;
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/* eslint-disable arrow-body-style */
2+
/* eslint-disable no-unused-vars */
3+
/* eslint-disable no-param-reassign */
4+
const ScfInvokeTask = require('./ScfInvokeTask');
5+
const CosAnalyzeInventoryProcessTask = require('./CosAnalyzeInventoryProcessTask');
6+
const {
7+
validateAnalyzeConfig,
8+
fixAnalyzeConfig,
9+
} = require('./Filter/utils/analyzeUtils');
10+
11+
class CosAnalyzeInventoryTask {
12+
constructor({
13+
cosSdkInstance,
14+
scfSdkInstance,
15+
cosUpload,
16+
parentRequestId,
17+
context,
18+
functionName,
19+
sourceList,
20+
bucket,
21+
region,
22+
key,
23+
prefix,
24+
preAnalyzeConfig,
25+
analyzeConfig,
26+
}) {
27+
Object.assign(this, {
28+
cosSdkInstance,
29+
scfSdkInstance,
30+
cosUpload,
31+
parentRequestId,
32+
context,
33+
functionName,
34+
sourceList,
35+
bucket,
36+
region,
37+
key: key || `${prefix}${functionName}/${context.request_id}/result.csv`,
38+
prefix,
39+
preAnalyzeConfig: preAnalyzeConfig
40+
? fixAnalyzeConfig(preAnalyzeConfig)
41+
: preAnalyzeConfig,
42+
analyzeConfig: fixAnalyzeConfig(analyzeConfig),
43+
});
44+
}
45+
async runTask() {
46+
let result;
47+
let error;
48+
try {
49+
let { sourceList } = this;
50+
if (
51+
!this.parentRequestId
52+
&& this.preAnalyzeConfig
53+
&& this.preAnalyzeConfig.columns
54+
&& this.preAnalyzeConfig.columns.length
55+
) {
56+
const partNumber = 20;
57+
const scfInvokeTask = new ScfInvokeTask({
58+
scfSdkInstance: this.scfSdkInstance,
59+
parallel: partNumber,
60+
paramsList: this.getSplitList({
61+
sourceList,
62+
partNumber,
63+
}).map((items, index) => ({
64+
Region: this.context.tencentcloud_region,
65+
FunctionName: this.context.function_name,
66+
Namespace: this.context.namespace,
67+
InvocationType: 'Event',
68+
Qualifier: '$DEFAULT',
69+
ClientContext: JSON.stringify({
70+
bucket: this.bucket,
71+
region: this.region,
72+
key: `${this.prefix}${this.functionName}/${this.context.request_id}/tmp/${index}`,
73+
analyzeConfig: {
74+
...this.preAnalyzeConfig,
75+
targetSetHeader: false,
76+
},
77+
sourceList: items,
78+
parentRequestId: this.context.request_id,
79+
}),
80+
})),
81+
});
82+
const scfInvokeTaskResults = await scfInvokeTask.runTask();
83+
sourceList = scfInvokeTaskResults.map(({ params }) => {
84+
const { ClientContext } = params;
85+
const { bucket, region, key } = JSON.parse(ClientContext);
86+
return {
87+
bucket,
88+
region,
89+
key,
90+
url: this.cosSdkInstance.getObjectUrl({
91+
Bucket: bucket,
92+
Region: region,
93+
Key: key,
94+
Sign: false,
95+
}),
96+
fileFormat: 'CSV',
97+
fileSchema: this.analyzeConfig.columns
98+
.map(item => item.key)
99+
.join(', '),
100+
};
101+
});
102+
}
103+
const cosAnalyzeInventoryProcessTask = new CosAnalyzeInventoryProcessTask({
104+
cosSdkInstance: this.cosSdkInstance,
105+
cosUpload: this.cosUpload,
106+
bucket: this.bucket,
107+
region: this.region,
108+
key: this.key,
109+
analyzeConfig: this.analyzeConfig,
110+
sourceList,
111+
});
112+
result = await cosAnalyzeInventoryProcessTask.runTask();
113+
} catch (err) {
114+
error = err;
115+
}
116+
return {
117+
params: {
118+
parentRequestId: this.parentRequestId,
119+
context: this.context,
120+
functionName: this.functionName,
121+
sourceListLength: this.sourceList.length,
122+
bucket: this.bucket,
123+
region: this.region,
124+
key: this.key,
125+
prefix: this.prefix,
126+
preAnalyzeConfig: this.preAnalyzeConfig,
127+
analyzeConfig: this.analyzeConfig,
128+
},
129+
result,
130+
error,
131+
};
132+
}
133+
getSplitList({ sourceList, partNumber = 10 }) {
134+
if (sourceList.length <= partNumber) {
135+
return sourceList.map(item => [item]);
136+
}
137+
const list = [...sourceList];
138+
const results = Array(partNumber)
139+
.fill()
140+
.map(() => []);
141+
for (let i = 0, len = list.length; i < len; i++) {
142+
const item = list[i];
143+
results[i % partNumber].push(item);
144+
}
145+
return results;
146+
}
147+
}
148+
149+
module.exports = CosAnalyzeInventoryTask;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
const InventoryAnalyzeTransformStream = require('./types/InventoryAnalyzeTransformStream');
2+
const CustomTransformStream = require('./types/CustomTransformStream');
3+
const { getCamelCaseData } = require('../utils');
4+
5+
const { PassThrough } = require('stream');
6+
7+
class Filter {
8+
constructor({ filter = 'PassThrough', params = {} }) {
9+
Object.assign(this, {
10+
filter,
11+
params,
12+
camelCaseParams: getCamelCaseData(params),
13+
});
14+
}
15+
getTransformStream() {
16+
const { filter } = this;
17+
if (filter === 'PassThrough') {
18+
return new PassThrough({
19+
readableObjectMode: true,
20+
writableObjectMode: true,
21+
});
22+
}
23+
if (filter === 'Custom') {
24+
return new CustomTransformStream(this.params);
25+
}
26+
if (filter === 'InventoryAnalyzeTransformStream') {
27+
return new InventoryAnalyzeTransformStream(this.params);
28+
}
29+
throw new Error(`unknown filter: ${filter}`);
30+
}
31+
}
32+
33+
module.exports = Filter;

0 commit comments

Comments
 (0)