Skip to content

Commit

Permalink
updates from @Gozala's review
Browse files Browse the repository at this point in the history
rework the indexes of the subscription and consumer tables per his guidance in storacha/w3up#746 (comment)
  • Loading branch information
travis committed Jun 2, 2023
1 parent 3fc807d commit 8663aa2
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 44 deletions.
40 changes: 31 additions & 9 deletions upload-api/stores/provisions.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
import { ConflictError as ConsumerConflictError } from '../tables/consumer.js'
import { ConflictError as SubscriptionConflictError } from '../tables/subscription.js'
import { CBOR, Failure } from '@ucanto/server'

/**
* Create a subscription ID for a given provision. Currently
* uses the CID of `customer` which ensures each customer
* will get at most one subscription. This can be relaxed (ie,
* by deriving subscription ID from customer AND consumer) in the future
* or by other providers for flexibility.
*
* @param {import('@web3-storage/upload-api').Provision} item
*/
export const createProvisionSubscriptionId = async ({ customer }) =>
(await CBOR.write(customer)).cid.toString()

/**
* @param {import('../types').SubscriptionTable} subscriptionTable
Expand All @@ -18,10 +31,11 @@ export function useProvisionStore (subscriptionTable, consumerTable, services) {
const { cause, consumer, customer, provider } = item
// by setting subscription to customer we make it so each customer can have at most one subscription
// TODO is this what we want?
const subscription = customer
const { cid } = await CBOR.write({ customer })
const subscription = cid.toString()

try {
await subscriptionTable.insert({
await subscriptionTable.add({
cause: cause.cid,
provider,
customer,
Expand All @@ -31,26 +45,34 @@ export function useProvisionStore (subscriptionTable, consumerTable, services) {
// if we got a conflict error, ignore - it means the subscription already exists and
// can be used to create a consumer/provider relationship below
if (!(error instanceof SubscriptionConflictError)) {
throw error
return {
error: new Failure('Unknown error adding subscription', {
cause: error
})
}
}
}

try {
await consumerTable.insert({
await consumerTable.add({
cause: cause.cid,
provider,
consumer,
subscription
})
return { ok: {} }
} catch (error) {
if (error instanceof ConsumerConflictError) {
return {
return (error instanceof ConsumerConflictError) ? (
{
error
}
} else {
throw error
}
) : (
{
error: new Failure('Unknown error adding consumer', {
cause: error
})
}
)
}
},

Expand Down
11 changes: 6 additions & 5 deletions upload-api/tables/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { marshall } from '@aws-sdk/util-dynamodb'
* @typedef {import('../types').ConsumerTable} ConsumerTable
* @typedef {import('../types').UnstableConsumerTable} UnstableConsumerTable
* @typedef {import('../types').ConsumerInput} ConsumerInput
* @typedef {import('../types').Consumer} Consumer
*/

/**
Expand Down Expand Up @@ -54,9 +53,9 @@ export function useConsumerTable (dynamoDb, tableName) {
* Record the fact that a consumer is consuming a provider via a subscription
*
* @param {ConsumerInput} item
* @returns {Promise<Consumer>}
* @returns {Promise<{}>}
*/
insert: async ({ consumer, provider, subscription, cause }) => {
add: async ({ consumer, provider, subscription, cause }) => {
const insertedAt = new Date().toISOString()

const row = {
Expand All @@ -71,7 +70,7 @@ export function useConsumerTable (dynamoDb, tableName) {
await dynamoDb.send(new PutItemCommand({
TableName: tableName,
Item: marshall(row),
ConditionExpression: `attribute_not_exists(consumer) OR ((cause = :cause) AND (consumer = :consumer) AND (provider = :provider) AND (subscription = :subscription))`,
ConditionExpression: `(attribute_not_exists(subscription) AND attribute_not_exists(provider)) OR ((cause = :cause) AND (consumer = :consumer) AND (provider = :provider) AND (subscription = :subscription))`,
ExpressionAttributeValues: {
':cause': { 'S': row.cause },
':consumer': { 'S': row.consumer },
Expand All @@ -82,7 +81,7 @@ export function useConsumerTable (dynamoDb, tableName) {
return {}
} catch (error) {
const error_ = error instanceof Error && error.message === 'The conditional request failed' ? new ConflictError({
message: `Space ${row.consumer} cannot be provisioned with ${row.provider}: it already has a provider`
message: `Space ${row.consumer} cannot be provisioned with ${row.subscription} and ${row.provider}: that subscription is already in use`
}) : error;
throw error_;
}
Expand All @@ -91,6 +90,7 @@ export function useConsumerTable (dynamoDb, tableName) {
hasStorageProvider: async (consumer) => {
const cmd = new QueryCommand({
TableName: tableName,
IndexName: 'consumer',
KeyConditions: {
consumer: {
ComparisonOperator: 'EQ',
Expand Down Expand Up @@ -141,6 +141,7 @@ export function useConsumerTable (dynamoDb, tableName) {
findSubscriptionsForConsumer: async (consumer) => {
const cmd = new QueryCommand({
TableName: tableName,
IndexName: 'consumer',
KeyConditions: {
consumer: {
ComparisonOperator: 'EQ',
Expand Down
28 changes: 18 additions & 10 deletions upload-api/tables/delegations.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '@web3-storage/access/encoding'
// eslint-disable-next-line no-unused-vars
import * as Ucanto from '@ucanto/interface'
import { Failure } from '@ucanto/server'
import { CAR, Delegation, parseLink } from '@ucanto/core'
import {
NoInvocationFoundForGivenCidError,
Expand Down Expand Up @@ -53,7 +54,7 @@ export function createDelegationsTable (region, tableName, { bucket, invocationB
* @param {import('../types').WorkflowBucket} deps.workflowBucket
* @returns {import('@web3-storage/upload-api').DelegationsStorage}
*/
export function useDelegationsTable (dynamoDb, tableName, { bucket, invocationBucket, workflowBucket} ) {
export function useDelegationsTable (dynamoDb, tableName, { bucket, invocationBucket, workflowBucket }) {
return {
putMany: async (cause, delegations) => {
if (delegations.length === 0) {
Expand Down Expand Up @@ -118,12 +119,18 @@ export function useDelegationsTable (dynamoDb, tableName, { bucket, invocationBu
if (result.ok) {
delegations.push(result.ok)
} else {
console.warn(`could not find delegation ${delegationCid} from invocation ${invocationCid}`)
// TODO: should we do anything else here?
return { error: new Failure(`could not find delegation ${delegationCid} from invocation ${invocationCid}`) }
}
} else {
// otherwise, we'll try to find the delegation in the R2 bucket we used to stash them in
delegations.push(await cidToDelegation(bucket, delegationCid))
const result = await cidToDelegation(bucket, delegationCid)
if (result.ok){
delegations.push(result.ok)
} else {
return {error: new Failure(`failed to get delegation ${delegationCid} from legacy delegations bucket`, {
cause: result.error
})}
}
}
}
return {
Expand All @@ -136,33 +143,34 @@ export function useDelegationsTable (dynamoDb, tableName, { bucket, invocationBu
/**
* @param {Ucanto.Link} cause
* @param {Delegation} d
* @returns {{cause: string, link: string, audience: string, issuer: string}}}
* @returns {{cause: string, link: string, audience: string, issuer: string, expiration: number}}}
*/
function createDelegationItem (cause, d) {
return {
cause: cause.toString(),
link: d.cid.toString(),
audience: d.audience.did(),
issuer: d.issuer.did(),
expiration: d.expiration
}
}

/**
* @param {import('../types').DelegationsBucket} bucket
* @param {Ucanto.Link} cid
* @returns {Promise<Ucanto.Delegation>}
* @returns {Promise<Ucanto.Result<Ucanto.Delegation, Ucanto.Failure>>}
*/
async function cidToDelegation (bucket, cid) {
const delegationCarBytes = await bucket.get(/** @type {import('multiformats/cid').CID} */ (cid))
const delegationCarBytes = await bucket.get(/** @type {import('multiformats/cid').CID} */(cid))
if (!delegationCarBytes) {
throw new Error(`failed to read car bytes for cid ${cid.toString(base32)}`)
return { error: new Failure(`failed to read car bytes for cid ${cid.toString(base32)}`) }
}
const delegations = bytesToDelegations(delegationCarBytes)
const delegation = delegations.find((d) => d.cid.equals(cid))
if (!delegation) {
throw new Error(`failed to parse delegation with expected cid ${cid.toString(base32)}`)
return { error: new Failure(`failed to parse delegation with expected cid ${cid.toString(base32)}`) }
}
return delegation
return { ok: delegation }
}

/**
Expand Down
10 changes: 5 additions & 5 deletions upload-api/tables/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const delegationTableProps = {
link: 'string', // `baf...x` (CID of the delegation)
audience: 'string', // `did:web:service`
issuer: 'string', // `did:key:agent`
expiration: 'string', // `9256939505` (unix timestamp)
expiration: 'number', // `9256939505` (unix timestamp in seconds)
insertedAt: 'string', // `2022-12-24T...`
updatedAt: 'string', // `2022-12-24T...`
},
Expand All @@ -53,10 +53,10 @@ export const subscriptionTableProps = {
insertedAt: 'string', // `2022-12-24T...`
updatedAt: 'string', // `2022-12-24T...`
},
// TODO does this index setup seem right?
primaryIndex: { partitionKey: 'customer', sortKey: 'subscription' },
primaryIndex: { partitionKey: 'subscription', sortKey: 'provider' },
globalIndexes: {
customerProvider: { partitionKey: 'customer', sortKey: 'provider', projection: ['cause', 'subscription'] },
customer: { partitionKey: 'customer', projection: ['subscription']},
// TODO: I don't think we should keep this index - partitioning by provider is basically useless and won't be much faster than a table scan
provider: { partitionKey: 'provider', projection: ['customer'] },
subscription: { partitionKey: 'subscription', projection: ['customer'] }
Expand All @@ -73,9 +73,9 @@ export const consumerTableProps = {
insertedAt: 'string', // `2022-12-24T...`
updatedAt: 'string', // `2022-12-24T...`
},
// TODO does this index setup seem right?
primaryIndex: { partitionKey: 'consumer', sortKey: 'subscription' },
primaryIndex: { partitionKey: 'subscription', sortKey: 'provider' },
globalIndexes: {
consumer: { partitionKey: 'consumer', projection: ['provider', 'subscription']},
// TODO: I don't think we should keep this index - partitioning by provider is basically useless and won't be much faster than a table scan
provider: { partitionKey: 'provider', projection: ['consumer'] }
}
Expand Down
6 changes: 3 additions & 3 deletions upload-api/tables/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
* @typedef {import('../types').SubscriptionTable} SubscriptionTable
* @typedef {import('../types').UnstableSubscriptionTable} UnstableSubscriptionTable
* @typedef {import('../types').SubscriptionInput} SubscriptionInput
* @typedef {import('../types').Subscription} Subscription
*/

export class ConflictError extends Failure {
Expand Down Expand Up @@ -53,9 +52,9 @@ export function useSubscriptionTable (dynamoDb, tableName) {
* Record the fact that a subscription is consuming a provider via a subscription
*
* @param {SubscriptionInput} item
* @returns {Promise<Subscription>}
* @returns {Promise<{}>}
*/
insert: async ({ customer, provider, subscription, cause }) => {
add: async ({ customer, provider, subscription, cause }) => {
const insertedAt = new Date().toISOString()

const item = {
Expand Down Expand Up @@ -152,6 +151,7 @@ export function useSubscriptionTable (dynamoDb, tableName) {
findSubscriptionsForCustomer: async (customer) => {
const cmd = new QueryCommand({
TableName: tableName,
IndexName: 'customer',
KeyConditions: {
customer: {
ComparisonOperator: 'EQ',
Expand Down
4 changes: 2 additions & 2 deletions upload-api/test/queries.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async function subscriptionsTestFixture (context) {
const { dynamo } = context
const subscriptions = await subscriptionsTable(dynamo)
const { customer, provider, subscription, cause } = await loadStaticFixtures()
await subscriptions.insert({ customer, provider, subscription, cause })
await subscriptions.add({ customer, provider, subscription, cause })
return ({
subscriptions,
customer,
Expand All @@ -91,7 +91,7 @@ async function consumersTestFixture (context) {
const { dynamo } = context
const consumers = await consumersTable(dynamo)
const { consumer, provider, subscription, cause } = await loadStaticFixtures()
await consumers.insert({ consumer, provider, subscription, cause })
await consumers.add({ consumer, provider, subscription, cause })
return ({
consumers,
consumer,
Expand Down
37 changes: 27 additions & 10 deletions upload-api/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,68 @@ export interface WorkflowBucket {
}

export interface DelegationsBucket {
/** put a delegation into the delegations bucket */
put: (cid: CID, bytes: ByteView<Delegation>) => Promise<void>
/** get a delegation from the delegations bucket */
get: (cid: CID) => Promise<ByteView<Delegation>|undefined>
}

/**
*
*/
export interface SubscriptionInput {
/** DID of the customer who maintains this subscription */
customer: DID,
/** DID of the provider who services this subscription */
provider: DID,
/** ID of this subscription - should be unique per-provider */
subscription: string,
/** CID of the invocation that created this subscription */
cause: UCANLink
}

export interface Subscription {

}

export interface SubscriptionTable {
insert: (consumer: SubscriptionInput) => Promise<Subscription>
/** add a subscription - a relationship between a customer and a provider that will allow for provisioning of consumers */
add: (consumer: SubscriptionInput) => Promise<{}>
/** return the count of subscriptions in the system */
count: () => Promise<bigint>
/** return a list of the subscriptions a customer has with a provider */
findProviderSubscriptionsForCustomer: (customer: DID, provider: DID) =>
Promise<{ subscription: string }[]>
}

/**
* functions that are useful for testing and prototyping - should only be used in tests for now
*/
export interface UnstableSubscriptionTable extends SubscriptionTable {
findCustomersByProvider: (provider: DID) => Promise<DID[]>
findCustomerForSubscription: (subscription: string) => Promise<DID | undefined>
findSubscriptionsForCustomer: (customer: string) => Promise<DID[] | undefined>
}

export interface ConsumerInput {
/** the DID of the consumer (eg, a space) for whom services are being provisioned */
consumer: DID,
/** the DID of the provider who will provide services for the consumer */
provider: DID,
/** the ID of the subscription representing the relationship between the consumer and provider */
subscription: string,
/** the CID of the UCAN invocation that created this record */
cause: UCANLink
}

export interface Consumer {

}

export interface ConsumerTable {
insert: (consumer: ConsumerInput) => Promise<Consumer>
/** add a consumer - a relationship between a provider, subscription and consumer */
add: (consumer: ConsumerInput) => Promise<{}>
/** return the number of consumers */
count: () => Promise<bigint>
/** return a boolean indicating whether the given consumer has a storage provider */
hasStorageProvider: (consumer: DID) => Promise<boolean>
}

/**
* functions that are useful for testing and prototyping - should only be used in tests for now
*/
export interface UnstableConsumerTable extends ConsumerTable {
findConsumersByProvider: (provider: DID) => Promise<DID[]>
findSubscriptionsForConsumer: (consumer: DID) => Promise<DID[]>
Expand Down

0 comments on commit 8663aa2

Please sign in to comment.