Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Prop defs spike #29476

Draft
wants to merge 40 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
56cd79a
Fixes
benjackwhite Mar 4, 2025
67fc980
Fixes
benjackwhite Mar 4, 2025
c2adfbe
Fixes
benjackwhite Mar 4, 2025
4f2bb77
Fix the stuff
benjackwhite Mar 4, 2025
df25445
Fixes
benjackwhite Mar 4, 2025
890ac71
Fix
benjackwhite Mar 4, 2025
45f677b
Fix up
benjackwhite Mar 5, 2025
8683712
Fixes
benjackwhite Mar 4, 2025
e184550
Fixes
benjackwhite Mar 4, 2025
6ee1be6
Fixes
benjackwhite Mar 4, 2025
46a389b
Fix the stuff
benjackwhite Mar 4, 2025
856720a
Fixes
benjackwhite Mar 4, 2025
b031cee
Fix
benjackwhite Mar 4, 2025
bf05580
Fix up
benjackwhite Mar 5, 2025
312e150
WIP: start to flesh out the TS property defs service
eli-r-ph Mar 11, 2025
66cc1f9
Merge branch 'feat/node-prop-defs' of github.com:PostHog/posthog into…
benjackwhite Mar 11, 2025
deaaf8c
Fix
benjackwhite Mar 11, 2025
a802c80
Tidy DB calls
benjackwhite Mar 11, 2025
44a82b9
Add tracking of reason for skipping events
benjackwhite Mar 11, 2025
5e3c185
Refactor
benjackwhite Mar 11, 2025
16eb986
Fixes
benjackwhite Mar 11, 2025
0366019
Fixes
benjackwhite Mar 11, 2025
3c94c9b
Fixes
benjackwhite Mar 11, 2025
3784ae0
Fixes
benjackwhite Mar 11, 2025
5334923
fix
benjackwhite Mar 11, 2025
f93fdd4
Improve snapshotter
benjackwhite Mar 11, 2025
ae6e073
Fix snapshots
benjackwhite Mar 11, 2025
0c8f0b3
Merge branch 'fix/for-snapshot' into feat/node-prop-defs
benjackwhite Mar 11, 2025
da0aa7c
fixes
benjackwhite Mar 11, 2025
734098c
Fixes
benjackwhite Mar 11, 2025
6c1ad2b
Adds test cases and fixes groupidentify not setting it's own properties
benjackwhite Mar 11, 2025
3cc8fa7
Merge branch 'master' into feat/node-prop-defs
benjackwhite Mar 11, 2025
1ae00b9
WIP: pre-collect and filter for known teams and resolved group indices
eli-r-ph Mar 11, 2025
572513b
instrument missing team ID or unresolved group index
eli-r-ph Mar 11, 2025
722336b
refactor team ID and group resolution to happen prior to event prop e…
eli-r-ph Mar 11, 2025
6b0b05e
Fixes
benjackwhite Mar 12, 2025
6a44681
add batch upsert methods to prop defs DB module
eli-r-ph Mar 12, 2025
7cbadf8
move team and group index resolution to Managers; batch team + group …
eli-r-ph Mar 13, 2025
97dcb1d
Fixes
benjackwhite Mar 13, 2025
a7a57dc
Fix up group type manager
benjackwhite Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
case null:
return {
mmdb: true,
propertyDefs: true,
ingestionV2Combined: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
Expand All @@ -32,6 +33,10 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
mmdb: true,
ingestionV2: true,
}
case PluginServerMode.property_defs:
return {
propertyDefs: true,
}
case PluginServerMode.recordings_blob_ingestion:
return {
sessionRecordingBlobIngestion: true,
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ export function getDefaultConfig(): PluginsServerConfig {
INGESTION_CONSUMER_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
INGESTION_CONSUMER_DLQ_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,

// PropertyDefsConsumer config
PROPERTY_DEFS_CONSUMER_GROUP_ID: 'property-defs-consumer',
PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_JSON,

// Session recording V2
SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB
SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`PropertyDefsConsumer property updates should only write the first seen property defs to the DB 1`] = `
[
{
"group_type_index": null,
"id": "<REPLACED-UUID-0>",
"is_numerical": false,
"name": "url",
"project_id": "2",
"property_type": "String",
"property_type_format": null,
"query_usage_30_day": null,
"team_id": 2,
"type": 1,
"volume_30_day": null,
},
]
`;

exports[`PropertyDefsConsumer property updates should write simple property defs to the DB 1`] = `
[
{
"created_at": "2025-01-01T00:00:00.000Z",
"id": "<REPLACED-UUID-0>",
"last_seen_at": "2025-01-01T00:00:00.000Z",
"name": "$pageview",
"project_id": "2",
"query_usage_30_day": null,
"team_id": 2,
"volume_30_day": null,
},
]
`;

exports[`PropertyDefsConsumer property updates should write simple property defs to the DB 2`] = `
[
{
"event": "$pageview",
"id": "<REPLACED_NUMBER>",
"project_id": "2",
"property": "url",
"team_id": 2,
},
]
`;

exports[`PropertyDefsConsumer property updates should write simple property defs to the DB 3`] = `
[
{
"group_type_index": null,
"id": "<REPLACED-UUID-0>",
"is_numerical": false,
"name": "url",
"project_id": "2",
"property_type": "String",
"property_type_format": null,
"query_usage_30_day": null,
"team_id": 2,
"type": 1,
"volume_30_day": null,
},
]
`;
215 changes: 215 additions & 0 deletions plugin-server/src/property-defs/property-defs-consumer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import { DateTime } from 'luxon'
import { Message } from 'node-rdkafka'

import { mockProducer } from '~/tests/helpers/mocks/producer.mock'
import { forSnapshot } from '~/tests/helpers/snapshots'
import { getFirstTeam, resetTestDatabase } from '~/tests/helpers/sql'

import { insertHogFunction as _insertHogFunction } from '../cdp/_tests/fixtures'
import { ClickHouseEvent, Hub, RawClickHouseEvent, Team, TimestampFormat } from '../types'
import { closeHub, createHub } from '../utils/db/hub'
import { castTimestampOrNow } from '../utils/utils'
import { PropertyDefsConsumer } from './property-defs-consumer'
import { PropertyDefsDB } from './services/property-defs-db'

// Timeout passed to jest.setTimeout for the tests in this file (Jest interprets the value as milliseconds).
const DEFAULT_TEST_TIMEOUT = 5000
jest.setTimeout(DEFAULT_TEST_TIMEOUT)

// Stand-in for the Kafka consumer handle returned (as `consumer`) by the mocked `startBatchConsumer` below.
// Every method is a bare jest.fn() stub; only `isConnected` has an implementation, and it always reports `true`.
// NOTE(review): the `mock` name prefix is presumably what allows the hoisted jest.mock factory to reference this
// variable — confirm before renaming.
const mockConsumer = {
on: jest.fn(),
commitSync: jest.fn(),
commit: jest.fn(),
queryWatermarkOffsets: jest.fn(),
committed: jest.fn(),
assignments: jest.fn(),
isConnected: jest.fn(() => true),
getMetadata: jest.fn(),
}

// Replaces the batch-consumer module with a stub whose `startBatchConsumer` resolves to an object exposing
// `join`, `stop` and the shared `mockConsumer` — presumably so the consumer under test never opens a real
// Kafka connection (TODO confirm against PropertyDefsConsumer.start).
jest.mock('../../src/kafka/batch-consumer', () => {
return {
startBatchConsumer: jest.fn(() =>
Promise.resolve({
// `join()` returns an object with a stubbed `finally`, so code that chains `.finally(...)` on it does not throw.
join: () => ({
finally: jest.fn(),
}),
stop: jest.fn(),
consumer: mockConsumer,
})
),
}
})

// Next Kafka offset handed out by createKafkaMessages; reset to 0 in the suite's beforeEach.
let offsetIncrementer = 0

/**
 * Converts a parsed `ClickHouseEvent` back into its raw (string-encoded) form so tests can
 * build events from plain objects and still feed the consumer the serialized shape.
 *
 * - Property bags and `elements_chain` are JSON-stringified.
 * - Every timestamp is passed through `castTimestampOrNow` with the ClickHouse format; a
 *   missing timestamp is forwarded as `null`.
 * - A falsy `groupN_properties` value is emitted as `undefined` rather than stringified.
 *
 * @param event the parsed event to serialize
 * @returns the raw event with all other fields copied over unchanged
 */
const createRawClickHouseEvent = (event: ClickHouseEvent): RawClickHouseEvent => {
    // Normalises "absent" to null before formatting, mirroring how each field is handled below.
    const toClickHouseTimestamp = (value: DateTime | null | undefined) =>
        castTimestampOrNow(value ?? null, TimestampFormat.ClickHouse)

    // Group property bags are optional: only stringify when one is actually present.
    const serializeGroupProperties = (groupProperties: object | null | undefined) =>
        groupProperties ? JSON.stringify(groupProperties) : undefined

    const rawEvent: RawClickHouseEvent = {
        ...event,
        properties: JSON.stringify(event.properties),
        timestamp: toClickHouseTimestamp(event.timestamp),
        created_at: toClickHouseTimestamp(event.created_at),
        elements_chain: JSON.stringify(event.elements_chain),
        person_created_at: toClickHouseTimestamp(event.person_created_at),
        person_properties: JSON.stringify(event.person_properties),
        group0_created_at: toClickHouseTimestamp(event.group0_created_at),
        group0_properties: serializeGroupProperties(event.group0_properties),
        group1_created_at: toClickHouseTimestamp(event.group1_created_at),
        group1_properties: serializeGroupProperties(event.group1_properties),
        group2_created_at: toClickHouseTimestamp(event.group2_created_at),
        group2_properties: serializeGroupProperties(event.group2_properties),
        group3_created_at: toClickHouseTimestamp(event.group3_created_at),
        group3_properties: serializeGroupProperties(event.group3_properties),
        group4_created_at: toClickHouseTimestamp(event.group4_created_at),
        group4_properties: serializeGroupProperties(event.group4_properties),
    }
    return rawEvent
}

/**
 * Builds a complete `ClickHouseEvent` for tests, using the supplied partial as overrides and
 * filling every other field with a default.
 *
 * Defaults: uuid `'123'`, event `'$pageview'`, team 1, distinct id `'distinct_id_1'`, empty
 * property bags, `null` for the person/group creation timestamps, person mode `'full'`, and
 * `DateTime.now()` for `timestamp` / `created_at`.
 *
 * @param event fields to override; any field left out (or nullish) falls back to its default
 * @returns a fully populated event
 */
const createClickHouseEvent = (event: Partial<ClickHouseEvent> = {}): ClickHouseEvent => {
    return {
        uuid: event.uuid ?? '123',
        event: event.event ?? '$pageview',
        team_id: event.team_id ?? 1,
        distinct_id: event.distinct_id ?? 'distinct_id_1',
        /** Person UUID. */
        person_id: event.person_id ?? undefined,

        // Honour caller-supplied timestamps like every other field; previously these two
        // overrides were silently discarded in favour of "now".
        timestamp: event.timestamp ?? DateTime.now(),
        created_at: event.created_at ?? DateTime.now(),
        properties: event.properties ?? {},
        elements_chain: event.elements_chain ?? null,
        person_created_at: event.person_created_at ?? null,
        person_properties: event.person_properties ?? {},
        group0_properties: event.group0_properties ?? {},
        group1_properties: event.group1_properties ?? {},
        group2_properties: event.group2_properties ?? {},
        group3_properties: event.group3_properties ?? {},
        group4_properties: event.group4_properties ?? {},
        group0_created_at: event.group0_created_at ?? null,
        group1_created_at: event.group1_created_at ?? null,
        group2_created_at: event.group2_created_at ?? null,
        group3_created_at: event.group3_created_at ?? null,
        group4_created_at: event.group4_created_at ?? null,
        person_mode: event.person_mode ?? 'full',
    }
}

/**
 * Wraps each event in a Kafka `Message` whose value is the JSON-serialized raw ClickHouse
 * form of the event.
 *
 * Each message takes the next value of the module-level `offsetIncrementer` as its offset,
 * so offsets keep increasing across calls until the counter is reset. Topic, partition and
 * size are fixed placeholder values.
 *
 * @param events parsed events to wrap, in order
 * @returns one message per event, in the same order
 */
const createKafkaMessages = (events: ClickHouseEvent[]): Message[] => {
    const messages: Message[] = []
    for (const event of events) {
        const payload = JSON.stringify(createRawClickHouseEvent(event))
        messages.push({
            value: Buffer.from(payload),
            size: 1,
            topic: 'test',
            offset: offsetIncrementer++,
            timestamp: DateTime.now().toMillis(),
            partition: 1,
        })
    }
    return messages
}

/**
* TEST CASES TO COVER:
* - $groupidentify
* - Should create group properties from the $group_set property
* - Should create property definitions from its own event properties
* - Should limit to the max number of groups using the group types
* - batching
* - Should only write once per unique constraint (team_id, event, property etc)
*/

describe('PropertyDefsConsumer', () => {
    // Per-test fixtures; all of them are rebuilt in `beforeEach`.
    let ingester: PropertyDefsConsumer
    let hub: Hub
    let fixedTime: DateTime
    let team: Team
    let propertyDefsDB: PropertyDefsDB

    /** Builds a default event for the current test team carrying a single `url` property. */
    const eventWithUrl = (url: string | number): ClickHouseEvent =>
        createClickHouseEvent({
            team_id: team.id,
            properties: {
                url,
            },
        })

    /** Asserts that each spied DB write method was called exactly once. */
    const expectEachWriterCalledOnce = (): void => {
        expect(propertyDefsDB.writeEventDefinition).toHaveBeenCalledTimes(1)
        expect(propertyDefsDB.writePropertyDefinition).toHaveBeenCalledTimes(1)
        expect(propertyDefsDB.writeEventProperty).toHaveBeenCalledTimes(1)
    }

    beforeEach(async () => {
        // Pin Date.now() to 2025-01-01T00:00:00Z so anything derived from it is stable.
        fixedTime = DateTime.fromObject({ year: 2025, month: 1, day: 1 }, { zone: 'UTC' })
        jest.spyOn(Date, 'now').mockReturnValue(fixedTime.toMillis())

        offsetIncrementer = 0
        await resetTestDatabase()
        hub = await createHub()
        team = await getFirstTeam(hub)
        ingester = new PropertyDefsConsumer(hub)

        hub.kafkaProducer = mockProducer
        // Bracket access reaches the consumer's DB service so its writes can be observed.
        propertyDefsDB = ingester['propertyDefsDB']

        // Spy without replacing the implementations: the real writes still run.
        for (const writer of ['writeEventDefinition', 'writePropertyDefinition', 'writeEventProperty'] as const) {
            jest.spyOn(propertyDefsDB, writer)
        }
    })

    afterEach(async () => {
        jest.restoreAllMocks()
        await closeHub(hub)
    })

    afterAll(() => {
        jest.useRealTimers()
    })

    describe('property updates', () => {
        it('should write simple property defs to the DB', async () => {
            const messages = createKafkaMessages([eventWithUrl('http://example.com')])
            await ingester.handleKafkaBatch(messages)

            expectEachWriterCalledOnce()

            // Snapshot order matters: it determines the numeric suffix of each stored snapshot.
            const eventDefinitions = await propertyDefsDB.listEventDefinitions(team.id)
            expect(forSnapshot(eventDefinitions)).toMatchSnapshot()

            const eventProperties = await propertyDefsDB.listEventProperties(team.id)
            expect(
                forSnapshot(eventProperties, {
                    overrides: { id: '<REPLACED_NUMBER>' },
                })
            ).toMatchSnapshot()

            const propertyDefinitions = await propertyDefsDB.listPropertyDefinitions(team.id)
            expect(forSnapshot(propertyDefinitions)).toMatchSnapshot()
        })

        it('should only write the first seen property defs to the DB', async () => {
            // Same property name three times in one batch, first as a string and then as numbers.
            const urlValues: Array<string | number> = ['http://example.com', 2, 5]
            const messages = createKafkaMessages(urlValues.map((url) => eventWithUrl(url)))
            await ingester.handleKafkaBatch(messages)

            expectEachWriterCalledOnce()

            // Snapshot shows a String type as it was the first seen value
            const propertyDefinitions = await propertyDefsDB.listPropertyDefinitions(team.id)
            expect(forSnapshot(propertyDefinitions)).toMatchSnapshot()
        })
    })
})
Loading
Loading