Skip to content

Commit

Permalink
Basic kafka admin stuff
Browse files Browse the repository at this point in the history
New routes and full form, successful creation of ACLs

Log sync, rough draft of sync functionality, group check

Minor changes so tests will work across browser in a single run

better indicator of the site being ready

Better getter for env variables

getting topics by groups

Trying a reduction of the set

Adds more info, adds a filter, clean up the page, consolidate side nav UI
  • Loading branch information
dakota002 committed Aug 7, 2024
1 parent 0392d5e commit 5681ba0
Show file tree
Hide file tree
Showing 11 changed files with 705 additions and 8 deletions.
15 changes: 15 additions & 0 deletions app.arc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ legacy_users
email *String
PointInTimeRecovery true

kafka_acls
topicName *String
cognitoGroup **String
PointInTimeRecovery true

kafka_acl_log
partitionKey *Number
syncedOn **Number
PointInTimeRecovery ture

@tables-indexes
email_notification_subscription
topic *String
Expand Down Expand Up @@ -143,6 +153,11 @@ synonyms
synonymId *String
name synonymsByUuid

kafka_acls
cognitoGroup *String
permissionType **String
name aclsByGroup

@aws
runtime nodejs20.x
region us-east-1
Expand Down
14 changes: 14 additions & 0 deletions app/components/NoticeTypeCheckboxes/NoticeTypeCheckboxes.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ interface NoticeTypeCheckboxProps {
defaultSelected?: string[]
selectedFormat?: NoticeFormat
validationFunction?: (arg: any) => void
filterSet?: string[]
}

export function NoticeTypeCheckboxes({
defaultSelected,
selectedFormat = 'text',
validationFunction,
filterSet,
}: NoticeTypeCheckboxProps) {
const [userSelected, setUserSelected] = useState(new Set<string>())
const [selectedCounter, setSelectedCounter] = useState(0)
Expand Down Expand Up @@ -257,6 +259,18 @@ export function NoticeTypeCheckboxes({
}
}

const filteredJsonNoticeTypes = {
...Object.keys(JsonNoticeTypes)
.filter((key) =>
JsonNoticeTypes[key].every((topic) => filterSet?.includes(topic))
)
.map((key) => {
return { key: JsonNoticeTypes[key] }
}),
}

// There must be a better way, but i will worry about that at another time
console.log(filteredJsonNoticeTypes) // { '0': { key: [ 'gcn.circulars' ] } }, not quite right yet
return (
<>
<NestedCheckboxes
Expand Down
281 changes: 278 additions & 3 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
import { tables } from '@architect/functions'
import { paginateScan } from '@aws-sdk/lib-dynamodb'
import type { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'
import { Kafka } from 'gcn-kafka'
import type { AclEntry } from 'kafkajs'
import {
AclOperationTypes,
AclPermissionTypes,
AclResourceTypes,
ResourcePatternTypes,
} from 'kafkajs'
import memoizee from 'memoizee'

import { domain, getEnvOrDie } from './env.server'
import { domain, getEnvOrDieInProduction } from './env.server'
import type { User } from '~/routes/_auth/user.server'

const client_id = getEnvOrDie('KAFKA_CLIENT_ID')
const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET')
const client_id = getEnvOrDieInProduction('KAFKA_CLIENT_ID') ?? ''
const client_secret = getEnvOrDieInProduction('KAFKA_CLIENT_SECRET')
const kafka = new Kafka({
client_id,
client_secret,
Expand Down Expand Up @@ -68,3 +79,267 @@ if (process.env.ARC_SANDBOX) {
await producer.send({ topic, messages: [{ value }] })
}
}

export type KafkaACL = {
topicName: string
permissionType: PermissionType
cognitoGroup: string
prefixed: boolean
}

export type PermissionType = 'producer' | 'consumer'

export const adminGroup = 'gcn.nasa.gov/gcn-admin'

const consumerOperations = [AclOperationTypes.READ, AclOperationTypes.DESCRIBE]
const producerOperations = [
AclOperationTypes.CREATE,
AclOperationTypes.WRITE,
AclOperationTypes.DESCRIBE,
]

const admin_client_id = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_ID') ?? ''
const admin_client_secret = getEnvOrDieInProduction('KAFKA_ADMIN_CLIENT_SECRET')
const adminKafka = new Kafka({
client_id: admin_client_id,
client_secret: admin_client_secret,
domain,
})

function validateUser(user: User) {
if (!user.groups.includes(adminGroup))
throw new Response(null, { status: 403 })
}

export async function createKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
// Save to db
const db = await tables()
await db.kafka_acls.put(acl)

// Add to Kafka
const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createTopics({
topics: [
{
topic: acl.topicName,
},
],
})
const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)
await adminClient.createAcls({ acl: acls })
await adminClient.disconnect()
}

export async function getKafkaACLByTopicName(user: User, topicName: string) {
validateUser(user)
const db = await tables()
return (await db.kafka_acls.get({ topicName })) as KafkaACL
}

export async function getKafkaACLsFromDynamoDB(user: User, filter?: string) {
validateUser(user)
const db = await tables()
const client = db._doc as unknown as DynamoDBDocument
const TableName = db.name('kafka_acls')
const pages = paginateScan(
{ client },
{
TableName,
FilterExpression: filter
? 'contains(topicName, :filter) OR contains(cognitoGroup, :filter)'
: undefined,
ExpressionAttributeValues: filter
? {
':filter': filter,
}
: undefined,
}
)

const acls: KafkaACL[] = []
for await (const page of pages) {
const newACL = page.Items as KafkaACL[]
if (newACL) acls.push(...newACL)
}
return acls
}

export async function getKafkaTopicsForUser(user: User) {
validateUser(user)
const userGroups = user.groups.filter((x) =>
x.startsWith('gcn.nasa.gov/kafka-')
)
const db = await tables()
const items = (
await Promise.all([
...userGroups.map((cognitoGroup) =>
db.kafka_acls.query({
IndexName: 'aclsByGroup',
KeyConditionExpression:
'cognitoGroup = :group AND permissionType = :permission',
ProjectionExpression: 'topicName',
ExpressionAttributeValues: {
':group': cognitoGroup,
':permission': 'consumer',
},
})
),
])
)
.filter((x) => x.Count && x.Count > 0)
.flatMap((x) => x.Items)
.map((x) => x.topicName)

return items
}
export async function getAclsFromBrokers() {
const adminClient = adminKafka.admin()
await adminClient.connect()
const acls = await adminClient.describeAcls({
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.ANY,
})
await adminClient.disconnect()
const results: KafkaACL[] = []
for (const item of acls.resources) {
const topicName = item.resourceName
const prefixed = item.resourcePatternType === ResourcePatternTypes.PREFIXED
const producerRules = producerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const producerGroup =
producerRules &&
[
...new Set(
item.acls
.filter((acl) => producerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
const consumerRules = consumerOperations.every((op) =>
item.acls.map((x) => x.operation).includes(op)
)
const consumerGroup =
consumerRules &&
[
...new Set(
item.acls
.filter((acl) => consumerOperations.includes(acl.operation))
.map((x) => x.principal)
),
][0]?.replace('User:', '')
if (producerRules && producerGroup)
results.push({
topicName,
permissionType: 'producer',
cognitoGroup: producerGroup,
prefixed,
})
if (consumerRules && consumerGroup)
results.push({
topicName,
permissionType: 'consumer',
cognitoGroup: consumerGroup,
prefixed,
})
}
return results
}

export async function deleteKafkaACL(user: User, acl: KafkaACL) {
validateUser(user)
const db = await tables()
await db.kafka_acls.delete({
topicName: acl.topicName,
cognitoGroup: acl.cognitoGroup,
})

const acls =
acl.permissionType == 'producer'
? createProducerAcls(acl)
: createConsumerAcls(acl)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.deleteAcls({ filters: acls })
await adminClient.disconnect()
}

function createProducerAcls(acl: KafkaACL): AclEntry[] {
// Create, Write, and Describe operations
return mapAclAndOperations(acl, producerOperations)
}

function createConsumerAcls(acl: KafkaACL): AclEntry[] {
// Read and Describe operations
return mapAclAndOperations(acl, consumerOperations)
}

function mapAclAndOperations(acl: KafkaACL, operations: AclOperationTypes[]) {
return operations.map((operation) => {
return {
resourceType: AclResourceTypes.TOPIC,
resourceName: acl.topicName,
resourcePatternType: acl.prefixed
? ResourcePatternTypes.PREFIXED
: ResourcePatternTypes.LITERAL,
principal: `User:${acl.cognitoGroup}`,
host: '*',
operation,
permissionType: AclPermissionTypes.ALLOW,
}
})
}

export async function updateBrokersFromDb(user: User) {
const dbDefinedAcls = await getKafkaACLsFromDynamoDB(user)
const mappedAcls = dbDefinedAcls.flatMap((x) =>
x.permissionType === 'producer'
? createProducerAcls(x)
: createConsumerAcls(x)
)

const adminClient = adminKafka.admin()
await adminClient.connect()
await adminClient.createAcls({ acl: mappedAcls })
await adminClient.disconnect()
}

export async function updateDbFromBrokers(user: User) {
const kafkaDefinedAcls = await getAclsFromBrokers()
const db = await tables()
await Promise.all([
...kafkaDefinedAcls.map((acl) => db.kafka_acls.put(acl)),
db.kafka_acl_log.put({
partitionKey: 1,
syncedOn: Date.now(),
syncedBy: user.email,
}),
])
}

type KafkaAclSyncLog = {
partitionKey: number
syncedOn: number
syncedBy: string
}

export async function getLastSyncDate(): Promise<KafkaAclSyncLog> {
const db = await tables()
return (
await db.kafka_acl_log.query({
KeyConditionExpression: 'partitionKey = :1',
ExpressionAttributeValues: { ':1': 1 },
ScanIndexForward: false,
Limit: 1,
})
).Items.pop() as KafkaAclSyncLog
}
8 changes: 8 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { useSpinDelay } from 'spin-delay'
import invariant from 'tiny-invariant'

import { features, getEnvOrDieInProduction, origin } from './lib/env.server'
import { adminGroup } from './lib/kafka.server'
import { DevBanner } from './root/DevBanner'
import { Footer } from './root/Footer'
import NewsBanner from './root/NewsBanner'
Expand Down Expand Up @@ -119,6 +120,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
const recaptchaSiteKey = getEnvOrDieInProduction('RECAPTCHA_SITE_KEY')
const userIsMod = user?.groups.includes(moderatorGroup)
const userIsVerifiedSubmitter = user?.groups.includes(submitterGroup)
const userIsAdmin = user?.groups.includes(adminGroup)

return {
origin,
Expand All @@ -129,6 +131,7 @@ export async function loader({ request }: LoaderFunctionArgs) {
idp,
userIsMod,
userIsVerifiedSubmitter,
userIsAdmin,
}
}

Expand Down Expand Up @@ -168,6 +171,11 @@ export function useSubmitterStatus() {
return userIsVerifiedSubmitter
}

export function useAdminStatus() {
const { userIsAdmin } = useLoaderDataRoot()
return userIsAdmin
}

export function useRecaptchaSiteKey() {
const { recaptchaSiteKey } = useLoaderDataRoot()
return recaptchaSiteKey
Expand Down
Loading

0 comments on commit 5681ba0

Please sign in to comment.