Skip to content

Commit

Permalink
remove batch size limit and use buffers (#2627)
Browse files Browse the repository at this point in the history
* remove batch size limit and use buffers

* fix test

* add proper error handling on the put call

* add tests for isAWSError

* use APIError

* set default batch size to 100k

* fix generated types
  • Loading branch information
peterdemartini authored Dec 12, 2024
1 parent f455c37 commit 98504f7
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { isAWSError } from '../client'
import { _Error as AWSError } from '@aws-sdk/client-s3'

// Unit tests for the isAWSError type guard exported by ../client.
describe('isAWSError', () => {
  it('should return true for a valid AWS error', () => {
    const error: AWSError = {
      Code: 'AccessDenied',
      Message: 'Access Denied'
    }
    expect(isAWSError(error)).toBe(true)
  })

  // Every value below lacks the Code/Message pair (or is not an object at all),
  // so the guard must reject it.
  const negativeCases: Array<[string, unknown]> = [
    ['a non-AWS error', new Error('Some other error')],
    ['an object without Code and Message properties', { name: 'SomeError', message: 'Some error message' }],
    ['null', null],
    ['undefined', undefined],
    ['a string', 'Some error'],
    ['a number', 123],
    ['an object without Code property', { Message: 'Some error message' }],
    ['an object without Message property', { Code: 'SomeError' }]
  ]

  it.each(negativeCases)('should return false for %s', (_label, value) => {
    expect(isAWSError(value)).toBe(false)
  })
})
38 changes: 34 additions & 4 deletions packages/destination-actions/src/destinations/s3/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Settings } from './generated-types'
import { STSClient, AssumeRoleCommand } from '@aws-sdk/client-sts'
import { S3Client, PutObjectCommandInput, PutObjectCommand } from '@aws-sdk/client-s3'
import { S3Client, PutObjectCommandInput, PutObjectCommand, _Error as AWSError } from '@aws-sdk/client-s3'
import { v4 as uuidv4 } from '@lukeed/uuid'
import * as process from 'process'
import { ErrorCodes, IntegrationError } from '@segment/actions-core'
import { ErrorCodes, IntegrationError, RetryableError, APIError } from '@segment/actions-core'
import { Credentials } from './types'

export class Client {
Expand Down Expand Up @@ -53,7 +53,7 @@ export class Client {

async uploadS3(
settings: Settings,
fileContent: string,
fileContent: string | Buffer,
filename_prefix: string,
s3_aws_folder_name: string,
fileExtension: string
Expand Down Expand Up @@ -100,7 +100,37 @@ export class Client {
await s3Client.send(new PutObjectCommand(uploadParams))
return { statusCode: 200, message: 'Upload successful' }
} catch (err) {
throw new Error(`Non-retryable error: ${err}`)
if (isAWSError(err)) {
// https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/Package/-aws-sdk-client-s3/Interface/_Error/
if (err.Code && accessDeniedCodes.has(err.Code)) {
throw new APIError(err.Message || err.Code, 403)
} else if (err.Code === 'NoSuchBucket') {
throw new APIError(err.Message || err.Code, 404)
} else if (err.Code === 'SlowDown') {
throw new APIError(err.Message || err.Code, 429)
} else {
throw new RetryableError(err.Message || err.Code || 'Unknown AWS Put error: ' + err)
}
} else {
throw new APIError('Unknown error during AWS PUT: ' + err, 500)
}
}
}
}

// S3 error codes that indicate the request was rejected for authorization
// reasons; callers map any of these to an HTTP 403 APIError.
const deniedCodeList = [
  'AccessDenied',
  'AccountProblem',
  'AllAccessDisabled',
  'InvalidAccessKeyId',
  'InvalidSecurity',
  'NotSignedUp',
  'AmbiguousGrantByEmailAddress',
  'AuthorizationHeaderMalformed',
  'RequestExpired'
] as const

const accessDeniedCodes = new Set<string>(deniedCodeList)

// Type guard for the generic AWS SDK error shape (_Error): accepts any
// non-null object that carries both `Code` and `Message` properties.
export function isAWSError(err: unknown): err is AWSError {
  if (typeof err !== 'object' || err === null) {
    return false
  }
  return 'Code' in err && 'Message' in err
}
5 changes: 2 additions & 3 deletions packages/destination-actions/src/destinations/s3/fields.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,10 @@ export const commonFields: ActionDefinition<Settings>['fields'] = {
},
batch_size: {
label: 'Batch Size',
description:
'Maximum number of events to include in each batch. Actual batch sizes may be lower. Max batch size is 10000.',
description: 'Maximum number of events to include in each batch. Actual batch sizes may be lower.',
type: 'number',
required: false,
default: 5000
default: 100_000
},
s3_aws_folder_name: {
label: 'AWS Subfolder Name',
Expand Down
51 changes: 21 additions & 30 deletions packages/destination-actions/src/destinations/s3/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@ import { Payload } from './syncToS3/generated-types'
import { Settings } from './generated-types'
import { Client } from './client'
import { RawMapping, ColumnHeader } from './types'
import { IntegrationError } from '@segment/actions-core'

export async function send(payloads: Payload[], settings: Settings, rawMapping: RawMapping) {
const batchSize = payloads[0] && typeof payloads[0].batch_size === 'number' ? payloads[0].batch_size : 0
const delimiter = payloads[0]?.delimiter
const actionColName = payloads[0]?.audience_action_column_name
const batchColName = payloads[0]?.batch_size_column_name

const maxBatchSize = 100_000
if (batchSize > maxBatchSize) {
throw new IntegrationError(`Batch size cannot exceed ${maxBatchSize}`, 'Invalid Payload', 400)
}

const headers: ColumnHeader[] = Object.entries(rawMapping.columns)
.filter(([_, value]) => value !== '')
.map(([column]) => {
Expand Down Expand Up @@ -49,15 +42,13 @@ export function clean(delimiter: string, str?: string) {
return delimiter === 'tab' ? str : str.replace(delimiter, '')
}

function processField(row: string[], value: unknown | undefined) {
row.push(
encodeString(
value === undefined || value === null
? ''
: typeof value === 'object'
? String(JSON.stringify(value))
: String(value)
)
// Convert one cell value to its encoded string form for the output file:
// null/undefined become an empty cell, objects are JSON-serialized, and
// everything else is coerced with String().
function processField(value: unknown | undefined): string {
  if (value === null || value === undefined) {
    return encodeString('')
  }
  const text = typeof value === 'object' ? String(JSON.stringify(value)) : String(value)
  return encodeString(text)
}

Expand All @@ -67,26 +58,26 @@ export function generateFile(
delimiter: string,
actionColName?: string,
batchColName?: string
): string {
const rows: string[] = []
rows.push(`${headers.map((header) => header.cleanName).join(delimiter === 'tab' ? '\t' : delimiter)}\n`)

payloads.forEach((payload, index) => {
): Buffer {
const rows = payloads.map((payload, index) => {
const isLastRow = index === payloads.length - 1
const row: string[] = []
headers.forEach((header) => {
const row = headers.map((header): string => {
if (header.originalName === actionColName) {
processField(row, getAudienceAction(payload))
} else if (header.originalName === batchColName) {
processField(row, payloads.length)
} else {
processField(row, payload.columns[header.originalName])
return processField(getAudienceAction(payload))
}
if (header.originalName === batchColName) {
return processField(payloads.length)
}
return processField(payload.columns[header.originalName])
})

rows.push(`${row.join(delimiter === 'tab' ? '\t' : delimiter)}${isLastRow ? '' : '\n'}`)
return Buffer.from(`${row.join(delimiter === 'tab' ? '\t' : delimiter)}${isLastRow ? '' : '\n'}`)
})
return rows.join('')

return Buffer.concat([
Buffer.from(`${headers.map((header) => header.cleanName).join(delimiter === 'tab' ? '\t' : delimiter)}\n`),
...rows
])
}

export function encodeString(str: string) {
Expand Down
2 changes: 1 addition & 1 deletion packages/destination-actions/src/destinations/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const destination: DestinationDefinition<Settings> = {
},
extendRequest() {
return {
timeout: Math.max(60_000, DEFAULT_REQUEST_TIMEOUT)
timeout: Math.max(30_000, DEFAULT_REQUEST_TIMEOUT)
}
},
actions: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,6 @@ describe('generateFile', () => {

it('should generate a CSV file with correct content', () => {
const result = generateFile(payloads, headers, ',', 'audience_action', 'batch_size')
expect(result).toEqual(output)
expect(result.toString()).toEqual(output)
})
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 98504f7

Please sign in to comment.