diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index d340865827a3d..b81fe667c82e8 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -9,6 +9,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin case null: return { mmdb: true, + propertyDefs: true, ingestionV2Combined: true, processAsyncOnEventHandlers: true, processAsyncWebhooksHandlers: true, @@ -32,6 +33,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin mmdb: true, ingestionV2: true, } + case PluginServerMode.property_defs: + return { + propertyDefs: true, + } case PluginServerMode.recordings_blob_ingestion: return { sessionRecordingBlobIngestion: true, diff --git a/plugin-server/src/cdp/_tests/fixtures.ts b/plugin-server/src/cdp/_tests/fixtures.ts index a295dd0282dd1..44d6d98dda2e2 100644 --- a/plugin-server/src/cdp/_tests/fixtures.ts +++ b/plugin-server/src/cdp/_tests/fixtures.ts @@ -3,7 +3,7 @@ import { Message } from 'node-rdkafka' import { insertRow } from '~/tests/helpers/sql' -import { ClickHouseTimestamp, RawClickHouseEvent, Team } from '../../types' +import { ClickHouseTimestamp, ProjectId, RawClickHouseEvent, Team } from '../../types' import { PostgresRouter } from '../../utils/db/postgres' import { UUIDT } from '../../utils/utils' import { CdpInternalEvent } from '../schema' @@ -49,6 +49,7 @@ export const createIntegration = (integration: Partial) => { export const createIncomingEvent = (teamId: number, data: Partial): RawClickHouseEvent => { return { team_id: teamId, + project_id: teamId as ProjectId, created_at: new Date().toISOString() as ClickHouseTimestamp, elements_chain: '[]', person_created_at: new Date().toISOString() as ClickHouseTimestamp, diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 5a44e1f9a6969..780f879b5cca7 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -1,5 +1,5 @@ import { LogLevel, PluginLogLevel, PluginsServerConfig, stringToPluginServerMode, ValueMatcher } from '../types' -import { isDevEnv, isTestEnv, stringToBoolean } from '../utils/env-utils' +import { isDevEnv, isProdEnv, isTestEnv, stringToBoolean } from '../utils/env-utils' import { KAFKAJS_LOG_LEVEL_MAPPING } from './constants' import { KAFKA_CLICKHOUSE_HEATMAP_EVENTS, @@ -219,6 +219,12 @@ export function getDefaultConfig(): PluginsServerConfig { INGESTION_CONSUMER_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, INGESTION_CONSUMER_DLQ_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ, + // PropertyDefsConsumer config + PROPERTY_DEFS_CONSUMER_GROUP_ID: 'property-defs-consumer', + PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_JSON, + PROPERTY_DEFS_CONSUMER_ENABLED_TEAMS: isDevEnv() ? '*' : '', + PROPERTY_DEFS_WRITE_DISABLED: isProdEnv() ? 
true : false, // For now we don't want to do writes on prod - only count them + // Session recording V2 SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts index 315b78747ceea..cb4c86d83ea1e 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts @@ -157,7 +157,7 @@ async function addGroupPropertiesToPostIngestionEvent( organizationManager: OrganizationManager, postgres: PostgresRouter ): Promise { - let groupTypes: GroupTypeToColumnIndex | undefined = undefined + let groupTypes: GroupTypeToColumnIndex | null = null if (await organizationManager.hasAvailableFeature(event.teamId, 'group_analytics')) { // If the organization has group analytics enabled then we enrich the event with group data groupTypes = await groupTypeManager.fetchGroupTypes(event.projectId) diff --git a/plugin-server/src/property-defs/__snapshots__/property-defs-consumer.test.ts.snap b/plugin-server/src/property-defs/__snapshots__/property-defs-consumer.test.ts.snap new file mode 100644 index 0000000000000..8d248be5c2b24 --- /dev/null +++ b/plugin-server/src/property-defs/__snapshots__/property-defs-consumer.test.ts.snap @@ -0,0 +1,229 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`PropertyDefsConsumer group updates should create group properties from the $group_set property 1`] = ` +[ + { + "created_at": "2025-01-01T01:00:00.000Z", + "id": "", + "last_seen_at": "2025-01-01T01:00:00.000Z", + "name": "$groupidentify", + "project_id": "2", + "query_usage_30_day": null, + "team_id": 2, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer group updates should create group properties from the $group_set property 2`] = ` +[ + { + "group_type_index": 0, + "id": "", + "is_numerical": false, + "name": "prop1", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 3, + "volume_30_day": null, + }, + { + "group_type_index": 0, + "id": "", + "is_numerical": true, + "name": "prop2", + "project_id": "2", + "property_type": "Numeric", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 3, + "volume_30_day": null, + }, + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "$group_type", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "$group_key", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should batch multipleΒ writes 1`] = ` +[ + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "url", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "foo", + "project_id": "2", + "property_type": "String", + 
"property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, + { + "group_type_index": null, + "id": "", + "is_numerical": true, + "name": "test", + "project_id": "2", + "property_type": "Numeric", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should handle existing property defs 1`] = ` +[ + { + "group_type_index": null, + "id": "", + "is_numerical": true, + "name": "foo", + "project_id": "2", + "property_type": "Numeric", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should handle existing property defs 2`] = ` +[ + { + "group_type_index": null, + "id": "", + "is_numerical": true, + "name": "foo", + "project_id": "2", + "property_type": "Numeric", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "other", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should only write the first seen property defs to the DB 1`] = ` +[ + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "url", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should write simple property defs to the DB 1`] = ` +[ + { + "created_at": "2025-01-01T01:00:00.000Z", + "id": "", + "last_seen_at": "2025-01-01T01:00:00.000Z", + "name": "$pageview", + "project_id": "2", + "query_usage_30_day": null, + "team_id": 2, + "volume_30_day": null, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should write simple property defs to the DB 2`] = ` +[ + { + "event": "$pageview", + "id": "", + "project_id": "2", + "property": "url", + "team_id": 2, + }, +] +`; + +exports[`PropertyDefsConsumer property updates should write simple property defs to the DB 3`] = ` +[ + { + "group_type_index": null, + "id": "", + "is_numerical": false, + "name": "url", + "project_id": "2", + "property_type": "String", + "property_type_format": null, + "query_usage_30_day": null, + "team_id": 2, + "type": 1, + "volume_30_day": null, + }, +] +`; diff --git a/plugin-server/src/property-defs/property-defs-consumer.test.ts b/plugin-server/src/property-defs/property-defs-consumer.test.ts new file mode 100644 index 0000000000000..d077f343bea75 --- /dev/null +++ b/plugin-server/src/property-defs/property-defs-consumer.test.ts @@ -0,0 +1,344 @@ +import { DateTime } from 'luxon' +import { Message } from 'node-rdkafka' + +import { mockProducer } from '~/tests/helpers/mocks/producer.mock' +import { forSnapshot } from '~/tests/helpers/snapshots' +import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql' + +import { insertHogFunction as _insertHogFunction } from '../cdp/_tests/fixtures' +import { ClickHouseEvent, Hub, ProjectId, RawClickHouseEvent, Team, TimestampFormat } from '../types' +import { closeHub, createHub } from '../utils/db/hub' +import { PostgresUse } from '../utils/db/postgres' +import { 
castTimestampOrNow } from '../utils/utils' +import { PropertyDefsConsumer } from './property-defs-consumer' +import { PropertyDefsDB } from './services/property-defs-db' + +const DEFAULT_TEST_TIMEOUT = 5000 +jest.setTimeout(DEFAULT_TEST_TIMEOUT) + +const mockConsumer = { + on: jest.fn(), + commitSync: jest.fn(), + commit: jest.fn(), + queryWatermarkOffsets: jest.fn(), + committed: jest.fn(), + assignments: jest.fn(), + isConnected: jest.fn(() => true), + getMetadata: jest.fn(), +} + +jest.mock('../../src/kafka/batch-consumer', () => { + return { + startBatchConsumer: jest.fn(() => + Promise.resolve({ + join: () => ({ + finally: jest.fn(), + }), + stop: jest.fn(), + consumer: mockConsumer, + }) + ), + } +}) + +let offsetIncrementer = 0 + +const createRawClickHouseEvent = (event: ClickHouseEvent): RawClickHouseEvent => { + // Inverts the parsing for simpler tests + return { + ...event, + properties: JSON.stringify(event.properties), + timestamp: castTimestampOrNow(event.timestamp ?? null, TimestampFormat.ClickHouse), + created_at: castTimestampOrNow(event.created_at ?? null, TimestampFormat.ClickHouse), + elements_chain: JSON.stringify(event.elements_chain), + person_created_at: castTimestampOrNow(event.person_created_at ?? null, TimestampFormat.ClickHouse), + person_properties: JSON.stringify(event.person_properties), + group0_created_at: castTimestampOrNow(event.group0_created_at ?? null, TimestampFormat.ClickHouse), + group0_properties: event.group0_properties ? JSON.stringify(event.group0_properties) : undefined, + group1_created_at: castTimestampOrNow(event.group1_created_at ?? null, TimestampFormat.ClickHouse), + group1_properties: event.group1_properties ? JSON.stringify(event.group1_properties) : undefined, + group2_created_at: castTimestampOrNow(event.group2_created_at ?? null, TimestampFormat.ClickHouse), + group2_properties: event.group2_properties ? JSON.stringify(event.group2_properties) : undefined, + group3_created_at: castTimestampOrNow(event.group3_created_at ?? null, TimestampFormat.ClickHouse), + group3_properties: event.group3_properties ? JSON.stringify(event.group3_properties) : undefined, + group4_created_at: castTimestampOrNow(event.group4_created_at ?? null, TimestampFormat.ClickHouse), + group4_properties: event.group4_properties ? JSON.stringify(event.group4_properties) : undefined, + } +} + +const createClickHouseEvent = (event: Partial = {}): ClickHouseEvent => { + return { + uuid: event.uuid ?? '123', + event: event.event ?? '$pageview', + team_id: event.team_id ?? 1, + project_id: event.project_id ?? (1 as ProjectId), + distinct_id: event.distinct_id ?? 'distinct_id_1', + /** Person UUID. */ + person_id: event.person_id ?? undefined, + + timestamp: DateTime.now(), + created_at: DateTime.now(), + properties: event.properties ?? {}, + elements_chain: event.elements_chain ?? null, + person_created_at: event.person_created_at ?? null, + person_properties: event.person_properties ?? {}, + group0_properties: event.group0_properties ?? {}, + group1_properties: event.group1_properties ?? {}, + group2_properties: event.group2_properties ?? {}, + group3_properties: event.group3_properties ?? {}, + group4_properties: event.group4_properties ?? {}, + group0_created_at: event.group0_created_at ?? null, + group1_created_at: event.group1_created_at ?? null, + group2_created_at: event.group2_created_at ?? null, + group3_created_at: event.group3_created_at ?? null, + group4_created_at: event.group4_created_at ?? null, + person_mode: event.person_mode ?? 
'full', + } +} + +const createKafkaMessages: (events: ClickHouseEvent[]) => Message[] = (events) => { + return events.map((event) => { + // TRICKY: This is the slightly different format that capture sends + return { + value: Buffer.from(JSON.stringify(createRawClickHouseEvent(event))), + size: 1, + topic: 'test', + offset: offsetIncrementer++, + timestamp: DateTime.now().toMillis(), + partition: 1, + } + }) +} + +/** + * TEST CASES TO COVER: + * - $groupidentify + * - Should create group properties from the $group_set property + * - Should create property properties from its own event properties + * - Should limit to the max number of groups using the group types + * - batching + * - Should only write once per unique constraint (team_id, event, property etc) + */ + +describe('PropertyDefsConsumer', () => { + let ingester: PropertyDefsConsumer + let hub: Hub + let fixedTime: DateTime + let team: Team + let propertyDefsDB: PropertyDefsDB + + const advanceTime = (amount: number) => { + fixedTime = fixedTime.plus({ seconds: amount }) + jest.spyOn(Date, 'now').mockReturnValue(fixedTime.toMillis()) + } + + beforeEach(async () => { + fixedTime = DateTime.fromObject({ year: 2025, month: 1, day: 1 }, { zone: 'UTC' }) + jest.spyOn(Date, 'now').mockReturnValue(fixedTime.toMillis()) + + offsetIncrementer = 0 + await resetTestDatabase() + hub = await createHub() + team = await getFirstTeam(hub) + + // Create some default group type mappings + const groupTypeMappings = [ + { group_type: 'group-a', group_type_index: 0 }, + { group_type: 'group-b', group_type_index: 1 }, + ] + for (const mapping of groupTypeMappings) { + await hub.postgres.query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_grouptypemapping (team_id, project_id, group_type, group_type_index) VALUES ($1, $2, $3, $4)`, + [team.id, team.project_id, mapping.group_type, mapping.group_type_index], + 'createGroupTypeMappings' + ) + } + + hub.PROPERTY_DEFS_CONSUMER_ENABLED_TEAMS = '*' + ingester = new PropertyDefsConsumer(hub) + + hub.kafkaProducer = mockProducer + propertyDefsDB = ingester['propertyDefsDB'] + + jest.spyOn(propertyDefsDB, 'writeEventDefinitions') + jest.spyOn(propertyDefsDB, 'writePropertyDefinitions') + jest.spyOn(propertyDefsDB, 'writeEventProperties') + }) + + afterEach(async () => { + jest.restoreAllMocks() + await closeHub(hub) + }) + + afterAll(() => { + jest.useRealTimers() + }) + + describe('property updates', () => { + it('should write simple property defs to the DB', async () => { + await ingester.handleKafkaBatch( + createKafkaMessages([ + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + properties: { + url: 'http://example.com', + }, + }), + ]) + ) + + expect(propertyDefsDB.writeEventDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writePropertyDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writeEventProperties).toHaveBeenCalledTimes(1) + + expect(forSnapshot(await propertyDefsDB.listEventDefinitions(team.id))).toMatchSnapshot() + + expect( + forSnapshot(await propertyDefsDB.listEventProperties(team.id), { + overrides: { id: '' }, + }) + ).toMatchSnapshot() + + expect(forSnapshot(await propertyDefsDB.listPropertyDefinitions(team.id))).toMatchSnapshot() + }) + + it('should only write the first seen property defs to the DB', async () => { + await ingester.handleKafkaBatch( + createKafkaMessages([ + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + properties: { + url: 'http://example.com', + }, + }), + createClickHouseEvent({ + 
team_id: team.id, + project_id: team.project_id, + properties: { + url: 2, + }, + }), + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + properties: { + url: 5, + }, + }), + ]) + ) + + expect(propertyDefsDB.writeEventDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writePropertyDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writeEventProperties).toHaveBeenCalledTimes(1) + + // Snapshot shows a String type as it was the first seen value + expect(forSnapshot(await propertyDefsDB.listPropertyDefinitions(team.id))).toMatchSnapshot() + }) + + it('should batch multipleΒ writes', async () => { + await ingester.handleKafkaBatch( + createKafkaMessages([ + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + event: 'event1', + properties: { + url: 'http://example.com', + }, + }), + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + event: 'event2', + properties: { + foo: 'bar', + test: 4, + }, + }), + ]) + ) + + expect(propertyDefsDB.writeEventDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writePropertyDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writeEventProperties).toHaveBeenCalledTimes(1) + + // Snapshot shows a String type as it was the first seen value + expect(forSnapshot(await propertyDefsDB.listPropertyDefinitions(team.id))).toMatchSnapshot() + }) + + it('should handle existing property defs', async () => { + await ingester.handleKafkaBatch( + createKafkaMessages([ + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + event: 'event1', + properties: { + foo: 1, + }, + }), + ]) + ) + expect(forSnapshot(await propertyDefsDB.listPropertyDefinitions(team.id))).toMatchSnapshot() + + advanceTime(1000) + + await ingester.handleKafkaBatch( + createKafkaMessages([ + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + event: 'event1', + properties: { + other: 'property', + }, + }), + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + event: 'event1', + properties: { + foo: 'changed!!', + }, + }), + ]) + ) + + // Contains both properties but only one updated + expect(forSnapshot(await propertyDefsDB.listPropertyDefinitions(team.id))).toMatchSnapshot() + }) + }) + + describe('group updates', () => { + it('should create group properties from the $group_set property', async () => { + await ingester.handleKafkaBatch( + createKafkaMessages([ + createClickHouseEvent({ + team_id: team.id, + project_id: team.project_id, + event: '$groupidentify', + properties: { + $group_type: 'group-a', + $group_key: 'key-1', + $group_set: { + prop1: 'foo', + prop2: 2, + }, + }, + }), + ]) + ) + + expect(propertyDefsDB.writeEventDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writePropertyDefinitions).toHaveBeenCalledTimes(1) + expect(propertyDefsDB.writeEventProperties).toHaveBeenCalledTimes(1) + + expect(forSnapshot(await propertyDefsDB.listEventDefinitions(team.id))).toMatchSnapshot() + expect(forSnapshot(await propertyDefsDB.listPropertyDefinitions(team.id))).toMatchSnapshot() + }) + }) +}) diff --git a/plugin-server/src/property-defs/property-defs-consumer.ts b/plugin-server/src/property-defs/property-defs-consumer.ts new file mode 100644 index 0000000000000..a038896a1838b --- /dev/null +++ b/plugin-server/src/property-defs/property-defs-consumer.ts @@ -0,0 +1,455 @@ +import { DateTime } from 'luxon' +import { Message } from 'node-rdkafka' +import { Counter } from 'prom-client' + +import { 
buildIntegerMatcher } from '../config/config' +import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer' +import { createRdConnectionConfigFromEnvVars } from '../kafka/config' +import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' +import { runInstrumentedFunction } from '../main/utils' +import { + ClickHouseEvent, + EventDefinitionType, + EventPropertyType, + Hub, + PluginServerService, + ProjectId, + PropertyDefinitionType, + PropertyDefinitionTypeEnum, + PropertyType, + RawClickHouseEvent, + ValueMatcher, +} from '../types' +import { parseRawClickHouseEvent } from '../utils/event' +import { status } from '../utils/status' +import { UUIDT } from '../utils/utils' +import { GroupTypeManager, GroupTypesByProjectId } from '../worker/ingestion/group-type-manager' +import { TeamManager } from '../worker/ingestion/team-manager' +import { PropertyDefsDB } from './services/property-defs-db' +import { + getPropertyType, + PROPERTY_DEFS_PROPERTIES_TO_SKIP, + sanitizeEventName, + willFitInPostgres, +} from './services/property-defs-utils' + +// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals +require('@sentry/tracing') + +const propertyDefTypesCounter = new Counter({ + name: 'property_defs_types_total', + help: 'Count of derived property types.', + labelNames: ['type'], +}) + +const eventDefTypesCounter = new Counter({ + name: 'event_defs_types_total', + help: 'Count of new event definitions.', +}) + +const eventPropTypesCounter = new Counter({ + name: 'event_props_types_total', + help: 'Count of derived event properties.', +}) + +const propDefDroppedCounter = new Counter({ + name: 'prop_defs_dropped_total', + help: 'Count of property definitions dropped.', + labelNames: ['type', 'reason'], +}) + +const propDefsPostgresWritesCounter = new Counter({ + name: 'prop_defs_postgres_writes_total', + help: 'Count of property definitions written to Postgres.', + labelNames: ['type'], +}) + +export type CollectedPropertyDefinitions = { + // known project ID => resolved group_type & group_type_index + eventDefinitionsById: Record> + // known project ID => deduped properties + propertyDefinitionsById: Record> + // known project ID => deduped event_properties + eventPropertiesById: Record> +} + +/** + * NOTE: This is currently experimental and only used to do some testing on performance and comparisons. 
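+ * + * Processing flow (see handleKafkaBatch below): parse the incoming batch of ClickHouse events, drop events from projects not matched by PROPERTY_DEFS_CONSUMER_ENABLED_TEAMS, resolve group type mappings for any $groupidentify events, dedupe event/property/event-property definitions per project, and batch-write the results to Postgres via PropertyDefsDB. When PROPERTY_DEFS_WRITE_DISABLED is set, batches are only counted via the counters above and no writes are issued.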
+ */ +export class PropertyDefsConsumer { + protected groupId: string + protected topic: string + protected name = 'property-defs-consumer' + + private batchConsumer?: BatchConsumer + private propertyDefsDB: PropertyDefsDB + private teamManager: TeamManager + private groupTypeManager: GroupTypeManager + private isStopping = false + protected heartbeat = () => {} + protected promises: Set> = new Set() + private propDefsEnabledProjects: ValueMatcher + private writeDisabled: boolean + + constructor(private hub: Hub) { + this.groupId = hub.PROPERTY_DEFS_CONSUMER_GROUP_ID + this.topic = hub.PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC + this.propertyDefsDB = new PropertyDefsDB(hub) + this.teamManager = new TeamManager(hub.postgres) + this.groupTypeManager = new GroupTypeManager(hub.postgres, this.teamManager) + this.propDefsEnabledProjects = buildIntegerMatcher(hub.PROPERTY_DEFS_CONSUMER_ENABLED_TEAMS, true) + this.writeDisabled = hub.PROPERTY_DEFS_WRITE_DISABLED + } + + public get service(): PluginServerService { + return { + id: this.name, + onShutdown: async () => await this.stop(), + healthcheck: () => this.isHealthy() ?? false, + batchConsumer: this.batchConsumer, + } + } + + public async start(): Promise { + await Promise.all([ + this.startKafkaConsumer({ + topic: this.topic, + groupId: this.groupId, + handleBatch: async (messages) => this.handleKafkaBatch(messages), + }), + ]) + } + + public async stop(): Promise { + status.info('πŸ”', `${this.name} - stopping`) + this.isStopping = true + + // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive + status.info('πŸ”', `${this.name} - stopping batch consumer`) + await this.batchConsumer?.stop() + status.info('πŸ‘', `${this.name} - stopped!`) + } + + public isHealthy() { + return this.batchConsumer?.isHealthy() + } + + private scheduleWork(promise: Promise): Promise { + this.promises.add(promise) + void promise.finally(() => this.promises.delete(promise)) + return promise + } + + private runInstrumented(name: string, func: () => Promise): Promise { + return runInstrumentedFunction({ statsKey: `propertyDefsConsumer.${name}`, func }) + } + + public async handleKafkaBatch(messages: Message[]) { + let parsedMessages: ClickHouseEvent[] = await this.runInstrumented('parseKafkaMessages', () => + this.parseKafkaBatch(messages) + ) + + parsedMessages = parsedMessages.filter((msg) => this.propDefsEnabledProjects(msg.project_id)) + + if (parsedMessages.length === 0) { + status.debug('πŸ”', `No messages to process`) + return + } + + const projectsToLoadGroupsFor = new Set() + + parsedMessages.forEach((msg) => { + if (msg.event === '$groupidentify') { + projectsToLoadGroupsFor.add(msg.project_id) + } + }) + + const groupTypesByProjectId = await this.runInstrumented('fetchGroupTypesForProjects', () => + this.groupTypeManager.fetchGroupTypesForProjects(projectsToLoadGroupsFor) + ) + + // extract and dedup event and property definitions + const collected = await this.runInstrumented('derivePropDefs', () => + Promise.resolve(this.extractPropertyDefinitions(parsedMessages, groupTypesByProjectId)) + ) + + const eventDefinitions = Object.values(collected.eventDefinitionsById).flatMap((eventDefinitions) => + Object.values(eventDefinitions) + ) + + if (eventDefinitions.length > 0) { + eventDefTypesCounter.inc(eventDefinitions.length) + status.info('πŸ”', `Writing event definitions batch of size ${eventDefinitions.length}`) + propDefsPostgresWritesCounter.inc({ type: 'event_definitions' }) + if 
(!this.writeDisabled) { + void this.scheduleWork(this.propertyDefsDB.writeEventDefinitions(eventDefinitions)) + } + } + + const propertyDefinitions = Object.values(collected.propertyDefinitionsById).flatMap((propertyDefinitions) => + Object.values(propertyDefinitions) + ) + + if (propertyDefinitions.length > 0) { + for (const propDef of propertyDefinitions) { + propertyDefTypesCounter.inc({ type: propDef.type }) + } + status.info('πŸ”', `Writing property definitions batch of size ${propertyDefinitions.length}`) + propDefsPostgresWritesCounter.inc({ type: 'property_definitions' }) + if (!this.writeDisabled) { + void this.scheduleWork(this.propertyDefsDB.writePropertyDefinitions(propertyDefinitions)) + } + } + + const eventProperties = Object.values(collected.eventPropertiesById).flatMap((eventProperties) => + Object.values(eventProperties) + ) + + if (eventProperties.length > 0) { + eventPropTypesCounter.inc(eventProperties.length) + status.info('πŸ”', `Writing event properties batch of size ${eventProperties.length}`) + propDefsPostgresWritesCounter.inc({ type: 'event_properties' }) + if (!this.writeDisabled) { + void this.scheduleWork(this.propertyDefsDB.writeEventProperties(eventProperties)) + } + } + + status.debug('πŸ”', `Waiting for promises`, { promises: this.promises.size }) + await this.runInstrumented('awaitScheduledWork', () => Promise.all(this.promises)) + status.debug('πŸ”', `Processed batch`) + } + + private extractPropertyDefinitions( + events: ClickHouseEvent[], + groupTypesByProjectId: GroupTypesByProjectId + ): CollectedPropertyDefinitions { + const collected: CollectedPropertyDefinitions = { + eventDefinitionsById: {}, + propertyDefinitionsById: {}, + eventPropertiesById: {}, + } + + for (const event of events) { + event.event = sanitizeEventName(event.event) + + if (!willFitInPostgres(event.event)) { + propDefDroppedCounter.inc({ type: 'event', reason: 'key_too_long' }) + continue + } + + // Setup all the objects for this event's project ID + const eventDefinitions = (collected.eventDefinitionsById[event.project_id] = + collected.eventDefinitionsById[event.project_id] ?? {}) + + const propertyDefinitions = (collected.propertyDefinitionsById[event.project_id] = + collected.propertyDefinitionsById[event.project_id] ?? {}) + + const eventProperties = (collected.eventPropertiesById[event.project_id] = + collected.eventPropertiesById[event.project_id] ?? {}) + + // Capture event definition + if (!eventDefinitions[event.event]) { + eventDefinitions[event.event] = { + id: new UUIDT().toString(), + name: event.event, + team_id: event.team_id, + project_id: event.team_id, // TODO: add project_id + created_at: event.created_at.toISO() || DateTime.now().toString(), + volume_30_day: 0, // deprecated + query_usage_30_day: 0, // deprecated + } + } + + // Decision: are there group properties eligible for capture in this event? 
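+ // Only $groupidentify events qualify, and only when the project's group type mappings were found and the event's $group_type maps to a known group_type_index; otherwise the group properties are counted as dropped rather than captured.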
+ let shouldCaptureGroupProps: boolean = event.event === '$groupidentify' + const groupTypesForProject = groupTypesByProjectId[event.project_id] + + if (shouldCaptureGroupProps) { + shouldCaptureGroupProps = true + + if (!groupTypesForProject) { + propDefDroppedCounter.inc({ type: 'group', reason: 'team_groups_not_found' }) + shouldCaptureGroupProps = false + } else if (!event.properties['$group_type']) { + propDefDroppedCounter.inc({ type: 'group', reason: 'undefined_group' }) + shouldCaptureGroupProps = false + } else if (typeof groupTypesForProject[event.properties['$group_type']] !== 'number') { + propDefDroppedCounter.inc({ type: 'group', reason: 'group_index_not_found' }) + shouldCaptureGroupProps = false + } + } + + // Capture group properties + if (shouldCaptureGroupProps && groupTypesForProject) { + const groupType: string = event.properties['$group_type'] // e.g. "organization" + const groupTypeIndex = groupTypesForProject[groupType] + const groupProperties: Record | undefined = event.properties['$group_set'] // { name: 'value', id: 'id', foo: "bar" } + + for (const [property, value] of Object.entries(groupProperties ?? {})) { + if (!willFitInPostgres(property)) { + propDefDroppedCounter.inc({ type: 'group', reason: 'key_too_long' }) + continue + } + + const propType = getPropertyType(property, value) + if (!propType) { + propDefDroppedCounter.inc({ type: 'group', reason: 'missing_prop_type' }) + continue + } + + const propDefKey = `${groupType}:${property}` + if (!propertyDefinitions[propDefKey]) { + propertyDefinitions[propDefKey] = { + id: new UUIDT().toString(), + name: property, + is_numerical: propType === PropertyType.Numeric, + team_id: event.team_id, + project_id: event.team_id, // TODO: Add project_id + property_type: propType, + type: PropertyDefinitionTypeEnum.Group, + group_type_name: groupType, + group_type_index: groupTypeIndex, + } + } + } + } + + // Capture person properties + for (const [property, value] of Object.entries(event.person_properties ?? 
{})) { + if (!willFitInPostgres(property)) { + propDefDroppedCounter.inc({ type: 'person', reason: 'key_too_long' }) + continue + } + + const propDefKey = `person:${property}` + if (!propertyDefinitions[propDefKey]) { + const propType = getPropertyType(property, value) + if (propType) { + propertyDefinitions[propDefKey] = { + id: new UUIDT().toString(), + name: property, + is_numerical: propType === PropertyType.Numeric, + team_id: event.team_id, + project_id: event.team_id, // TODO: Add project_id + property_type: propType, + type: PropertyDefinitionTypeEnum.Person, + } + } + } + } + + // Capture event properties + for (const [property, value] of Object.entries(event.properties)) { + if (!willFitInPostgres(property)) { + propDefDroppedCounter.inc({ type: 'event', reason: 'key_too_long' }) + continue + } + + if (PROPERTY_DEFS_PROPERTIES_TO_SKIP.includes(property)) { + // We don't need to count these as it is expected that they will be dropped + continue + } + + const propDefKey = `event:${property}` + if (!propertyDefinitions[propDefKey]) { + const propType = getPropertyType(property, value) + if (propType) { + propertyDefinitions[propDefKey] = { + id: new UUIDT().toString(), + name: property, + is_numerical: propType === PropertyType.Numeric, + team_id: event.team_id, + project_id: event.team_id, // TODO: Add project_id + property_type: propType, + type: PropertyDefinitionTypeEnum.Event, + } + } + } + + const eventPropKey = `${event.event}:${property}` + if (!eventProperties[eventPropKey]) { + eventProperties[eventPropKey] = { + id: new UUIDT().toString(), + event: event.event, + property, + team_id: event.team_id, + project_id: event.team_id, // TODO: Add project_id + } + } + } + } + + return collected + } + + private parseKafkaBatch(messages: Message[]): Promise { + const events: ClickHouseEvent[] = [] + + messages.forEach((message) => { + try { + const clickHouseEvent = parseRawClickHouseEvent( + JSON.parse(message.value!.toString()) as RawClickHouseEvent + ) + + events.push(clickHouseEvent) + } catch (e) { + status.error('Error parsing message', e) + } + }) + + return Promise.resolve(events) + } + + private async startKafkaConsumer(options: { + topic: string + groupId: string + handleBatch: (messages: Message[]) => Promise + }): Promise { + this.batchConsumer = await startBatchConsumer({ + ...options, + connectionConfig: createRdConnectionConfigFromEnvVars(this.hub, 'consumer'), + autoCommit: true, + sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS, + maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, + consumerMaxBytes: this.hub.KAFKA_CONSUMPTION_MAX_BYTES, + consumerMaxBytesPerPartition: this.hub.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, + consumerMaxWaitMs: this.hub.KAFKA_CONSUMPTION_MAX_WAIT_MS, + consumerErrorBackoffMs: this.hub.KAFKA_CONSUMPTION_ERROR_BACKOFF_MS, + fetchBatchSize: this.hub.INGESTION_BATCH_SIZE, + batchingTimeoutMs: this.hub.KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS, + topicCreationTimeoutMs: this.hub.KAFKA_TOPIC_CREATION_TIMEOUT_MS, + topicMetadataRefreshInterval: this.hub.KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS, + eachBatch: async (messages, { heartbeat }) => { + status.info('πŸ”', `${this.name} - handling batch`, { + size: messages.length, + }) + + this.heartbeat = heartbeat + + // histogramKafkaBatchSize.observe(messages.length) + // histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 
0) + acc, 0) / 1024) + + return await runInstrumentedFunction({ + statsKey: `propertyDefsConsumer.handleEachBatch`, + sendTimeoutGuardToSentry: false, + func: async () => { + await options.handleBatch(messages) + }, + }) + }, + callEachBatchWhenEmpty: false, + }) + + addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) + + this.batchConsumer.consumer.on('disconnected', async (err) => { + if (this.isStopping) { + return + } + // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect + // we need to listen to disconnect and make sure we're stopped + status.info('πŸ”', `${this.name} batch consumer disconnected, cleaning up`, { err }) + await this.stop() + }) + } +} diff --git a/plugin-server/src/property-defs/services/property-defs-db.ts b/plugin-server/src/property-defs/services/property-defs-db.ts new file mode 100644 index 0000000000000..68e017a43b989 --- /dev/null +++ b/plugin-server/src/property-defs/services/property-defs-db.ts @@ -0,0 +1,193 @@ +import { DateTime } from 'luxon' + +import { EventDefinitionType, EventPropertyType, Hub, PropertyDefinitionType } from '../../types' +import { PostgresUse } from '../../utils/db/postgres' +import { status } from '../../utils/status' + +export class PropertyDefsDB { + constructor(private hub: Hub) {} + + async writeEventProperty(eventProperty: EventPropertyType) { + await this.hub.postgres + .query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_eventproperty (event, property, team_id, project_id) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + `, + [eventProperty.event, eventProperty.property, eventProperty.team_id, eventProperty.project_id], + 'upsertEventProperty' + ) + .catch((e) => { + status.error('πŸ”', `Error writing event property`, { eventProperty, error: e.message }) + throw e + }) + } + + async writeEventProperties(eventProperties: EventPropertyType[]) { + await this.hub.postgres + .query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_eventproperty (event, property, team_id, project_id) + VALUES (UNNEST($1::text[]), UNNEST($2::text[]), UNNEST($3::int[]), UNNEST($4::int[])) + ON CONFLICT DO NOTHING + `, + [ + eventProperties.map((ep) => ep.event), + eventProperties.map((ep) => ep.property), + eventProperties.map((ep) => ep.team_id), + eventProperties.map((ep) => ep.project_id), + ], + 'upsertEventPropertiesBatch' + ) + .catch((e) => { + status.error('πŸ”', `Error writing event properties batch`, { eventProperties, error: e.message }) + throw e + }) + } + + async writePropertyDefinition(propertyDefinition: PropertyDefinitionType) { + await this.hub.postgres + .query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, team_id, project_id, property_type, volume_30_day, query_usage_30_day) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NULL, NULL) + ON CONFLICT (coalesce(project_id, team_id::bigint), name, type, coalesce(group_type_index, -1)) + DO UPDATE SET property_type=EXCLUDED.property_type + WHERE posthog_propertydefinition.property_type IS NULL`, + [ + propertyDefinition.id, + propertyDefinition.name, + propertyDefinition.type, + propertyDefinition.group_type_index, + propertyDefinition.is_numerical, + propertyDefinition.team_id, + propertyDefinition.project_id, + propertyDefinition.property_type, + ], + 'upsertPropertyDefinition' + ) + .catch((e) => { + status.error('πŸ”', `Error writing property definitions batch`, { + propertyDefinition, + error: e.message, + }) + throw e + }) + } + + async 
writePropertyDefinitions(propertyDefinitions: PropertyDefinitionType[]) { + await this.hub.postgres + .query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, team_id, project_id, property_type, volume_30_day, query_usage_30_day) + VALUES (UNNEST($1::uuid[]), UNNEST($2::text[]), UNNEST($3::smallint[]), UNNEST($4::int[]), UNNEST($5::boolean[]), UNNEST($6::int[]), UNNEST($7::int[]), UNNEST($8::text[]), NULL, NULL) + ON CONFLICT (coalesce(project_id, team_id::bigint), name, type, coalesce(group_type_index, -1)) + DO UPDATE SET property_type=EXCLUDED.property_type + WHERE posthog_propertydefinition.property_type IS NULL + `, + [ + propertyDefinitions.map((pd) => pd.id), + propertyDefinitions.map((pd) => pd.name), + propertyDefinitions.map((pd) => pd.type), + propertyDefinitions.map((pd) => pd.group_type_index), + propertyDefinitions.map((pd) => pd.is_numerical), + propertyDefinitions.map((pd) => pd.team_id), + propertyDefinitions.map((pd) => pd.project_id), + propertyDefinitions.map((pd) => pd.property_type), + ], + 'upsertPropertyDefinitionsBatch' + ) + .catch((e) => { + status.error('πŸ”', `Error writing property definitions batch`, { + propertyDefinitions, + error: e.message, + }) + throw e + }) + } + + async writeEventDefinition(eventDefinition: EventDefinitionType) { + await this.hub.postgres + .query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_eventdefinition (id, name, team_id, project_id, last_seen_at, created_at, volume_30_day, query_usage_30_day) + VALUES ($1, $2, $3, $4, $5, $6, NULL, NULL) + ON CONFLICT (coalesce(project_id, team_id::bigint), name) + DO UPDATE SET last_seen_at=EXCLUDED.last_seen_at WHERE posthog_eventdefinition.last_seen_at < EXCLUDED.last_seen_at + `, + [ + eventDefinition.id, + eventDefinition.name, + eventDefinition.team_id, + eventDefinition.project_id, + DateTime.now().toISO(), // TODO: Should this be the event timestamp? 
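+ // created_at (below) is only used on the first insert; the ON CONFLICT clause above only updates last_seen_at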
+ DateTime.now().toISO(), + ], + 'upsertEventDefinition' + ) + .catch((e) => { + status.error('πŸ”', `Error writing event definition`, { eventDefinition, error: e.message }) + throw e + }) + } + + async writeEventDefinitions(eventDefinitions: EventDefinitionType[]) { + const now = DateTime.now().toISO() + await this.hub.postgres + .query( + PostgresUse.COMMON_WRITE, + `INSERT INTO posthog_eventdefinition (id, name, team_id, project_id, last_seen_at, created_at, volume_30_day, query_usage_30_day) + VALUES (UNNEST($1::uuid[]), UNNEST($2::text[]), UNNEST($3::int[]), UNNEST($4::int[]), UNNEST($5::timestamp[]), UNNEST($6::timestamp[]), NULL, NULL) + ON CONFLICT (coalesce(project_id, team_id::bigint), name) + DO UPDATE SET last_seen_at=EXCLUDED.last_seen_at WHERE posthog_eventdefinition.last_seen_at < EXCLUDED.last_seen_at + `, + [ + eventDefinitions.map((ed) => ed.id), + eventDefinitions.map((ed) => ed.name), + eventDefinitions.map((ed) => ed.team_id), + eventDefinitions.map((ed) => ed.project_id), + eventDefinitions.map(() => now), + eventDefinitions.map(() => now), + ], + 'upsertEventDefinitionsBatch' + ) + .catch((e) => { + status.error('πŸ”', `Error writing event definitions batch`, { eventDefinitions, error: e.message }) + throw e + }) + } + + async listPropertyDefinitions(teamId: number): Promise { + const result = await this.hub.postgres.query( + PostgresUse.COMMON_READ, + `SELECT * FROM posthog_propertydefinition WHERE team_id = $1`, + [teamId], + 'listPropertyDefinitions' + ) + + return result.rows + } + + async listEventDefinitions(teamId: number): Promise { + const result = await this.hub.postgres.query( + PostgresUse.COMMON_READ, + `SELECT * FROM posthog_eventdefinition WHERE team_id = $1`, + [teamId], + 'listEventDefinitions' + ) + + return result.rows + } + + async listEventProperties(teamId: number): Promise { + const result = await this.hub.postgres.query( + PostgresUse.COMMON_READ, + `SELECT * FROM posthog_eventproperty WHERE team_id = $1`, + [teamId], + 'listEventProperties' + ) + + return result.rows + } +} diff --git a/plugin-server/src/property-defs/services/property-defs-utils.test.ts b/plugin-server/src/property-defs/services/property-defs-utils.test.ts new file mode 100644 index 0000000000000..b671f5bc4d615 --- /dev/null +++ b/plugin-server/src/property-defs/services/property-defs-utils.test.ts @@ -0,0 +1,48 @@ +import { PropertyType } from '~/src/types' + +import { getPropertyType } from './property-defs-utils' + +describe('PropertyDefsUtils', () => { + describe('getPropertyType', () => { + const testCases: [string, any, PropertyType | null][] = [ + // Special key prefixes + ['utm_source', 'google', PropertyType.String], + ['utm_medium', 123, PropertyType.String], + ['$feature/my_flag', true, PropertyType.String], + ['$feature_flag_response', false, PropertyType.String], + ['$survey_response', 'yes', PropertyType.String], + ['$survey_response_2', 123, PropertyType.String], + + // String values + ['key', 'hello', PropertyType.String], + ['key', 'true', PropertyType.Boolean], + ['key', 'false', PropertyType.Boolean], + ['key', 'TRUE', PropertyType.Boolean], + ['key', 'FALSE', PropertyType.Boolean], + ['key', '2024-01-01T00:00:00Z', PropertyType.DateTime], + ['key', '2024-01-01T00:00:00+00:00', PropertyType.DateTime], + ['key', 'invalid-date', PropertyType.String], + + // Number values + ['key', 123, PropertyType.Numeric], + ['timestamp', 1234567890, PropertyType.Numeric], + ['TIME', 1234567890, PropertyType.Numeric], + ['key', 123.45, PropertyType.Numeric], + ['key', 
-123, PropertyType.Numeric], + ['key', 0, PropertyType.Numeric], + + // Boolean values + ['key', true, PropertyType.Boolean], + ['key', false, PropertyType.Boolean], + + // Edge cases + ['key', null, null], + ['key', undefined, null], + ] + it.each(testCases)('should derive the correct property type for %s: %s', (key, value, expected) => { + const result = getPropertyType(key, value) + + expect(result).toEqual(expected) + }) + }) +}) diff --git a/plugin-server/src/property-defs/services/property-defs-utils.ts b/plugin-server/src/property-defs/services/property-defs-utils.ts new file mode 100644 index 0000000000000..9224882449c6f --- /dev/null +++ b/plugin-server/src/property-defs/services/property-defs-utils.ts @@ -0,0 +1,115 @@ +import { DateTime } from 'luxon' + +import { PropertyType } from '../../types' + +// lifted from here: +// https://github.com/PostHog/posthog/blob/021aaab04b4acd96cf8121c033ac3b0042492598/rust/property-defs-rs/src/types.rs#L457-L461 +const DJANGO_MAX_CHARFIELD_LENGTH = 200 + +// These properties have special meaning, and are ignored +export const PROPERTY_DEFS_PROPERTIES_TO_SKIP: string[] = [ + '$set', + '$set_once', + '$unset', + '$group_0', + '$group_1', + '$group_2', + '$group_3', + '$group_4', + '$groups', +] + +export const PROPERTY_DEFS_DATE_PROP_KEYWORDS: string[] = [ + 'time', + 'timestamp', + 'date', + '_at', + '-at', + 'createdat', + 'updatedat', +] + +export function willFitInPostgres(s: string) { + return s.length < DJANGO_MAX_CHARFIELD_LENGTH +} + +export function sanitizeEventName(eventName: string) { + return eventName.replace('\u0000', '\uFFFD') +} + +export function sixMonthsAgoUnixSeconds() { + const now = new Date() + now.setMonth(now.getMonth() - 6) + return Math.floor(now.getTime() / 1000) +} + +const BOOLEAN_VALUES = ['true', 'false', 'TRUE', 'FALSE'] + +export const getPropertyType = (rawKey: string, value: any): PropertyType | null => { + const key = rawKey.trim().toLowerCase() + + // Special cases for certain property prefixes + if (key.startsWith('utm_')) { + // utm_ prefixed properties should always be detected as strings. + // Sometimes the first value sent looks like a number, even though + // subsequent values are not. + return PropertyType.String + } + if (key.startsWith('$feature/')) { + // $feature/ prefixed properties should always be detected as strings. + // These are feature flag values, and can be boolean or string. + // Sometimes the first value sent is boolean (because flag isn't enabled) while + // subsequent values are not. We don't want this to be misunderstood as a boolean. + return PropertyType.String + } + + if (key === '$feature_flag_response') { + // $feature_flag_response properties should always be detected as strings. + // These are feature flag values, and can be boolean or string. + // Sometimes the first value sent is boolean (because flag isn't enabled) while + // subsequent values are not. We don't want this to be misunderstood as a boolean. + return PropertyType.String + } + + if (key.startsWith('$survey_response')) { + // NB: $survey_responses are collected in an interesting way, where the first + // response is called `$survey_response` and subsequent responses are called + // `$survey_response_2`, `$survey_response_3`, etc. So, this check should auto-cast + // all survey responses to strings. 
+ return PropertyType.String + } + + if (typeof value === 'string') { + const s = value.trim() + if (BOOLEAN_VALUES.includes(s)) { + return PropertyType.Boolean + } + // Try to parse this as an ISO 8601 date + try { + if (PROPERTY_DEFS_DATE_PROP_KEYWORDS.some((kw) => key.includes(kw))) { + return PropertyType.DateTime + } + const date = DateTime.fromISO(s) + if (date.isValid) { + return PropertyType.DateTime + } + // TODO(eli): add speculative date string matching? + } catch { + // Not a valid date, continue to string type + } + return PropertyType.String + } + + if (typeof value === 'boolean') { + return PropertyType.Boolean + } + + if (typeof value === 'number') { + if (value >= sixMonthsAgoUnixSeconds()) { + return PropertyType.DateTime + } + return PropertyType.Numeric + } + + return null +} diff --git a/plugin-server/src/server.ts b/plugin-server/src/server.ts index ea444e9a2a477..5cf345beacaec 100644 --- a/plugin-server/src/server.ts +++ b/plugin-server/src/server.ts @@ -28,6 +28,7 @@ import { import { SessionRecordingIngester } from './main/ingestion-queues/session-recording/session-recordings-consumer' import { DefaultBatchConsumerFactory } from './main/ingestion-queues/session-recording-v2/batch-consumer-factory' import { SessionRecordingIngester as SessionRecordingIngesterV2 } from './main/ingestion-queues/session-recording-v2/consumer' +import { PropertyDefsConsumer } from './property-defs/property-defs-consumer' import { setupCommonRoutes } from './router' import { Hub, PluginServerService, PluginsServerConfig } from './types' import { closeHub, createHub } from './utils/db/hub' @@ -235,6 +236,14 @@ export class PluginServer { }) } + if (capabilities.propertyDefs) { + serviceLoaders.push(async () => { + const consumer = new PropertyDefsConsumer(hub) + await consumer.start() + return consumer.service + }) + } + if (capabilities.cdpInternalEvents) { serviceLoaders.push(async () => { const consumer = new CdpInternalEventsConsumer(hub) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 0dc3ac4cb80f8..be06018137492 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -76,6 +76,7 @@ export enum KafkaSaslMechanism { export enum PluginServerMode { ingestion_v2 = 'ingestion-v2', + property_defs = 'property-defs', async_onevent = 'async-onevent', async_webhooks = 'async-webhooks', recordings_blob_ingestion = 'recordings-blob-ingestion', @@ -339,6 +340,11 @@ export interface PluginsServerConfig extends CdpConfig, IngestionConsumerConfig // Destination Migration Diffing DESTINATION_MIGRATION_DIFFING_ENABLED: boolean + + PROPERTY_DEFS_CONSUMER_GROUP_ID: string + PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC: string + PROPERTY_DEFS_CONSUMER_ENABLED_TEAMS: string + PROPERTY_DEFS_WRITE_DISABLED: boolean } export interface Hub extends PluginsServerConfig { @@ -391,6 +397,7 @@ export interface PluginServerCapabilities { // and the shouldSetupPluginInServer() test accordingly. ingestionV2Combined?: boolean ingestionV2?: boolean + propertyDefs?: boolean processAsyncOnEventHandlers?: boolean processAsyncWebhooksHandlers?: boolean sessionRecordingBlobIngestion?: boolean @@ -710,6 +717,7 @@ export type PersonMode = 'full' | 'propertyless' | 'force_upgrade' /** Raw event row from ClickHouse. 
*/ export interface RawClickHouseEvent extends BaseEvent { + project_id: ProjectId timestamp: ClickHouseTimestamp created_at: ClickHouseTimestamp properties?: string @@ -739,6 +747,7 @@ export interface RawKafkaEvent extends RawClickHouseEvent { /** Parsed event row from ClickHouse. */ export interface ClickHouseEvent extends BaseEvent { + project_id: ProjectId timestamp: DateTime created_at: DateTime properties: Record @@ -1125,7 +1134,6 @@ export interface EventDefinitionType { query_usage_30_day: number | null team_id: number project_id: number | null - last_seen_at: string // DateTime created_at: string // DateTime } @@ -1155,19 +1163,23 @@ export enum PropertyDefinitionTypeEnum { Event = 1, Person = 2, Group = 3, + Session = 4, } +export type ResolvedGroups = Record + export interface PropertyDefinitionType { id: string name: string is_numerical: boolean - volume_30_day: number | null - query_usage_30_day: number | null team_id: number project_id: number | null - property_type?: PropertyType + property_type: PropertyType | null type: PropertyDefinitionTypeEnum - group_type_index: number | null + group_type_name?: string + group_type_index?: number | null + volume_30_day?: number | null + query_usage_30_day?: number | null } export interface EventPropertyType { diff --git a/plugin-server/src/utils/db/postgres.ts b/plugin-server/src/utils/db/postgres.ts index b61440508e8b5..996f4aab998f5 100644 --- a/plugin-server/src/utils/db/postgres.ts +++ b/plugin-server/src/utils/db/postgres.ts @@ -87,28 +87,6 @@ export class PostgresRouter { } } - public async bulkInsert>( - usage: PostgresUse | TransactionClient, - // Should have {VALUES} as a placeholder - queryWithPlaceholder: string, - values: Array, - tag: string - ): Promise { - if (values.length === 0) { - return - } - - const valuesWithPlaceholders = values - .map((array, index) => { - const len = array.length - const valuesWithIndexes = array.map((_, subIndex) => `$${index * len + subIndex + 1}`) - return `(${valuesWithIndexes.join(', ')})` - }) - .join(', ') - - await this.query(usage, queryWithPlaceholder.replace('{VALUES}', valuesWithPlaceholders), values.flat(), tag) - } - public async transaction( usage: PostgresUse, tag: string, diff --git a/plugin-server/src/worker/ingestion/group-type-manager.ts b/plugin-server/src/worker/ingestion/group-type-manager.ts index f330be444e33f..6f871a3b45f80 100644 --- a/plugin-server/src/worker/ingestion/group-type-manager.ts +++ b/plugin-server/src/worker/ingestion/group-type-manager.ts @@ -8,42 +8,62 @@ import { TeamManager } from './team-manager' /** How many unique group types to allow per team */ export const MAX_GROUP_TYPES_PER_TEAM = 5 +export type GroupTypesByProjectId = Record + export class GroupTypeManager { private groupTypesCache: Map - private instanceSiteUrl: string constructor(private postgres: PostgresRouter, private teamManager: TeamManager, instanceSiteUrl?: string | null) { this.groupTypesCache = new Map() - this.instanceSiteUrl = instanceSiteUrl || 'unknown' } - public async fetchGroupTypes(projectId: ProjectId): Promise { - const cachedGroupTypes = getByAge(this.groupTypesCache, projectId) - if (cachedGroupTypes) { - return cachedGroupTypes + public async fetchGroupTypes(projectId: ProjectId): Promise { + const response = await this.fetchGroupTypesForProjects([projectId]) + return response[projectId] + } + + public async fetchGroupTypesForProjects(projectIds: ProjectId[] | Set): Promise { + const projectIdsSet = new Set(projectIds) + const projectIdsToLoad = new Set() + const 
response: GroupTypesByProjectId = {} + + for (const projectId of projectIdsSet) { + const cachedGroupTypes = getByAge(this.groupTypesCache, projectId) + + response[projectId] = cachedGroupTypes ?? {} + + if (!cachedGroupTypes) { + projectIdsToLoad.add(projectId) + } + } + + if (projectIdsToLoad.size === 0) { + return response } const timeout = timeoutGuard(`Still running "fetchGroupTypes". Timeout warning after 30 sec!`) try { const { rows } = await this.postgres.query( - PostgresUse.COMMON_WRITE, - `SELECT * FROM posthog_grouptypemapping WHERE project_id = $1`, - [projectId], + PostgresUse.COMMON_READ, + `SELECT * FROM posthog_grouptypemapping WHERE project_id = ANY($1)`, + [Array.from(projectIdsToLoad)], 'fetchGroupTypes' ) - const teamGroupTypes: GroupTypeToColumnIndex = {} - for (const row of rows) { - teamGroupTypes[row.group_type] = row.group_type_index + const groupTypes = (response[row.project_id] = response[row.project_id] ?? {}) + groupTypes[row.group_type] = row.group_type_index } - this.groupTypesCache.set(projectId, [teamGroupTypes, Date.now()]) - - return teamGroupTypes + for (const projectId of projectIdsToLoad) { + response[projectId] = response[projectId] ?? {} + this.groupTypesCache.set(projectId, [response[projectId], Date.now()]) + } } finally { clearTimeout(timeout) } + + return response } public async fetchGroupTypeIndex( @@ -51,28 +71,28 @@ export class GroupTypeManager { projectId: ProjectId, groupType: string ): Promise { - const groupTypes = await this.fetchGroupTypes(projectId) + const groupTypes = (await this.fetchGroupTypes(projectId)) ?? {} if (groupType in groupTypes) { return groupTypes[groupType] - } else { - const [groupTypeIndex, isInsert] = await this.insertGroupType( - teamId, - projectId, - groupType, - Object.keys(groupTypes).length - ) - if (groupTypeIndex !== null) { - this.groupTypesCache.delete(projectId) - } + } - if (isInsert && groupTypeIndex !== null) { - // TODO: Is the `group type ingested` event being valuable? If not, we can remove - // `captureGroupTypeInsert()`. If yes, we should move this capture to use the project instead of team - await this.captureGroupTypeInsert(teamId, groupType, groupTypeIndex) - } - return groupTypeIndex + const [groupTypeIndex, isInsert] = await this.insertGroupType( + teamId, + projectId, + groupType, + Object.keys(groupTypes).length + ) + if (groupTypeIndex !== null) { + this.groupTypesCache.delete(projectId) + } + + if (isInsert && groupTypeIndex !== null) { + // TODO: Is the `group type ingested` event being valuable? If not, we can remove + // `captureGroupTypeInsert()`. 
If yes, we should move this capture to use the project instead of team + await this.captureGroupTypeInsert(teamId, groupType, groupTypeIndex) } + return groupTypeIndex } public async insertGroupType( diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index ae145ec72095e..3aa6a932a11c0 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -59,7 +59,7 @@ export class EventsProcessor { this.clickhouse = pluginsServer.clickhouse this.kafkaProducer = pluginsServer.kafkaProducer this.teamManager = pluginsServer.teamManager - this.groupTypeManager = new GroupTypeManager(pluginsServer.postgres, this.teamManager, pluginsServer.SITE_URL) + this.groupTypeManager = new GroupTypeManager(pluginsServer.postgres, this.teamManager) this.groupAndFirstEventManager = new GroupAndFirstEventManager( this.teamManager, this.groupTypeManager, diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts index b51e380dd506d..d8ec216a91819 100644 --- a/plugin-server/src/worker/ingestion/team-manager.ts +++ b/plugin-server/src/worker/ingestion/team-manager.ts @@ -64,7 +64,7 @@ export class TeamManager { * * Caching is added to reduce the load on Postgres, not to be resilient * to failures. If PG is unavailable and the cache expired, this function - * will trow and the lookup must be retried later. + * will throw and the lookup must be retried later. * * Returns null if the token is invalid. */ diff --git a/plugin-server/tests/helpers/snapshots.ts b/plugin-server/tests/helpers/snapshots.ts index 8ce3f2b9b9244..5d06ea09f3d8e 100644 --- a/plugin-server/tests/helpers/snapshots.ts +++ b/plugin-server/tests/helpers/snapshots.ts @@ -1,7 +1,7 @@ const UUID_REGEX = /[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/gi /** - * Helper method that takes an object and replaces all UUIDs or given keyswith placeholders + * Helper method that takes an object and replaces all UUIDs or given keys with placeholders */ export const forSnapshot = ( obj: any,