Skip to content

Commit

Permalink
feat: also publish to Events SNS
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega committed Jul 24, 2024
1 parent 6cc03e2 commit 34ff79a
Show file tree
Hide file tree
Showing 5 changed files with 904 additions and 50 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
"tabWidth": 2
},
"dependencies": {
"@aws-sdk/client-sns": "^3.616.0",
"@dcl/catalyst-storage": "^2.0.3",
"@dcl/schemas": "^6.11.1",
"@dcl/snapshots-fetcher": "^5.0.4",
"@well-known-components/env-config-provider": "^1.1.1",
"@well-known-components/http-server": "^1.1.1",
"@well-known-components/interfaces": "^1.2.0",
"@well-known-components/logger": "^3.1.2",
"@well-known-components/metrics": "^2.0.1",
"aws-sdk": "^2.1503.0"
"@well-known-components/metrics": "^2.0.1"
}
}
90 changes: 45 additions & 45 deletions src/adapters/deployer/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { downloadEntityAndContentFiles } from '@dcl/snapshots-fetcher'
import { IDeployerComponent } from '@dcl/snapshots-fetcher/dist/types'
import { SNS } from 'aws-sdk'
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns'
import { AppComponents } from '../../types'
import { DeploymentToSqs } from '@dcl/schemas/dist/misc/deployments-to-sqs'

Expand All @@ -9,72 +9,72 @@ export function createDeployerComponent(
): IDeployerComponent {
const logger = components.logs.getLogger('downloader')

const sns = new SNS()
const client = new SNSClient({
endpoint: components.sns.optionalSnsEndpoint
})

return {
async deployEntity(entity, servers) {
const markAsDeployed = entity.markAsDeployed ? entity.markAsDeployed : async () => {}
try {
if (entity.entityType === 'scene' || entity.entityType === 'wearable' || entity.entityType === 'emote') {
const exists = await components.storage.exist(entity.entityId)
const exists = await components.storage.exist(entity.entityId)

if (!exists) {
await components.downloadQueue.onSizeLessThan(1000)
if (!exists) {
await components.downloadQueue.onSizeLessThan(1000)

void components.downloadQueue.scheduleJob(async () => {
logger.info('Downloading entity', {
entityId: entity.entityId,
entityType: entity.entityType,
servers: servers.join(',')
})
void components.downloadQueue.scheduleJob(async () => {
logger.info('Downloading entity', {
entityId: entity.entityId,
entityType: entity.entityType,
servers: servers.join(',')
})

await downloadEntityAndContentFiles(
{ ...components, fetcher: components.fetch },
entity.entityId,
servers,
new Map(),
'content',
10,
1000
)
await downloadEntityAndContentFiles(
{ ...components, fetcher: components.fetch },
entity.entityId,
servers,
new Map(),
'content',
10,
1000
)

logger.info('Entity stored', { entityId: entity.entityId, entityType: entity.entityType })
logger.info('Entity stored', { entityId: entity.entityId, entityType: entity.entityType })

const deploymentToSqs: DeploymentToSqs = {
entity,
contentServerUrls: servers
}
// send sns
const deploymentToSqs: DeploymentToSqs = {
entity,
contentServerUrls: servers
}
// send sns
if (entity.entityType === 'scene' || entity.entityType === 'wearable' || entity.entityType === 'emote') {
if (components.sns.arn) {
const receipt = await sns
.publish({
const receipt = await client.send(
new PublishCommand({
TopicArn: components.sns.arn,
Message: JSON.stringify(deploymentToSqs)
})
.promise()
)
logger.info('Notification sent', {
MessageId: receipt.MessageId as any,
SequenceNumber: receipt.SequenceNumber as any
})
}
}

if (components.sns.eventArn) {
const receipt = await sns
.publish({
TopicArn: components.sns.eventArn,
Message: JSON.stringify(deploymentToSqs)
})
.promise()
logger.info('Notification sent to events SNS', {
MessageId: receipt.MessageId as any,
SequenceNumber: receipt.SequenceNumber as any
if (components.sns.eventArn) {
const receipt = await client.send(
new PublishCommand({
TopicArn: components.sns.eventArn,
Message: JSON.stringify(deploymentToSqs)
})
}
await markAsDeployed()
})
} else {
)
logger.info('Notification sent to events SNS', {
MessageId: receipt.MessageId as any,
SequenceNumber: receipt.SequenceNumber as any
})
}
await markAsDeployed()
}
})
} else {
await markAsDeployed()
}
Expand Down
4 changes: 3 additions & 1 deletion src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export async function initComponents(): Promise<AppComponents> {
const bucket = await config.getString('BUCKET')
const snsArn = await config.getString('SNS_ARN')
const eventSnsArn = await config.getString('EVENTS_SNS_ARN')
const optionalSnsEndpoint = await config.getString('SNS_ENDPOINT')

const storage = bucket
? await createAwsS3BasedFileSystemContentStorage({ fs, config }, bucket)
Expand All @@ -48,7 +49,8 @@ export async function initComponents(): Promise<AppComponents> {

const sns: SnsComponent = {
arn: snsArn,
eventArn: eventSnsArn
eventArn: eventSnsArn,
optionalSnsEndpoint
}

const deployer = createDeployerComponent({ storage, downloadQueue, fetch, logs, metrics, sns })
Expand Down
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export type BaseComponents = {
sns: SnsComponent
}

export type SnsComponent = { arn?: string; eventArn?: string }
export type SnsComponent = { arn?: string; eventArn?: string; optionalSnsEndpoint?: string }

// components used in runtime
export type AppComponents = BaseComponents & {
Expand Down
Loading

0 comments on commit 34ff79a

Please sign in to comment.