Skip to content

Commit 312e150

Browse files
committed
WIP: start to flesh out the TS property defs service
1 parent bf05580 commit 312e150

File tree

4 files changed

+188
-24
lines changed

4 files changed

+188
-24
lines changed

plugin-server/src/property-defs/property-defs-consumer.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { DateTime } from 'luxon'
22
import { Message } from 'node-rdkafka'
33

4-
import { insertHogFunction as _insertHogFunction } from '~/tests/cdp/fixtures'
4+
import { insertHogFunction as _insertHogFunction } from '../cdp/_tests/fixtures'
55
import { mockProducer } from '~/tests/helpers/mocks/producer.mock'
66
import { resetTestDatabase } from '~/tests/helpers/sql'
77

plugin-server/src/property-defs/property-defs-consumer.ts

+176-22
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer'
66
import { createRdConnectionConfigFromEnvVars } from '../kafka/config'
77
import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics'
88
import { runInstrumentedFunction } from '../main/utils'
9+
import { PostgresRouter } from '../utils/db/postgres'
910
import {
1011
ClickHouseEvent,
12+
EventDefinitionType,
1113
EventPropertyType,
1214
Hub,
15+
PluginsServerConfig,
1316
PluginServerService,
1417
PropertyDefinitionType,
1518
PropertyDefinitionTypeEnum,
@@ -18,21 +21,41 @@ import {
1821
} from '../types'
1922
import { parseRawClickHouseEvent } from '../utils/event'
2023
import { status } from '../utils/status'
24+
import { castTimestampToClickhouseFormat } from '../utils/utils'
2125

2226
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
2327
require('@sentry/tracing')
2428

29+
// TODO(eli): wire up LOTS more metrics ASAP!
30+
2531
export const propertyDefTypesCounter = new Counter({
2632
name: 'property_defs_types_total',
2733
help: 'Count of derived property types.',
2834
labelNames: ['type'],
2935
})
3036

37+
export const eventDefTypesCounter = new Counter({
38+
name: 'event_defs_types_total',
39+
help: 'Count of new event definitions.',
40+
})
41+
42+
export const eventPropTypesCounter = new Counter({
43+
name: 'event_props_types_total',
44+
help: 'Count of derived event properties.',
45+
})
46+
3147
export type CollectedPropertyDefinitions = {
48+
teamIdsInBatch: Set<number>
49+
teamIdsWithGroupUpdatesInBatch: Set<number>
50+
eventDefinitionsById: Record<string, EventDefinitionType>
3251
propertyDefinitionsById: Record<string, PropertyDefinitionType>
33-
eventPropertiesByEventById: Record<string, EventPropertyType>
52+
eventPropertiesById: Record<string, EventPropertyType>
3453
}
3554

55+
// lifted from here:
56+
// https://github.com/PostHog/posthog/blob/021aaab04b4acd96cf8121c033ac3b0042492598/rust/property-defs-rs/src/types.rs#L457-L461
57+
const DJANGO_MAX_CHARFIELD_LENGTH = 200
58+
3659
// These properties have special meaning, and are ignored
3760
const SKIP_PROPERTIES: string[] = [
3861
'$set',
@@ -46,7 +69,56 @@ const SKIP_PROPERTIES: string[] = [
4669
'$groups',
4770
]
4871

49-
export const getPropertyType = (key: string, value: any): PropertyType | null => {
72+
const DATE_PROP_KEYWORDS: string[] = [
73+
"time",
74+
"timestamp",
75+
"date",
76+
"_at",
77+
"-at",
78+
"createdat",
79+
"updatedat",
80+
]
81+
82+
//
83+
// SQL queries
84+
//
85+
86+
const WRITE_EVENT_PROPERTY = `
87+
INSERT INTO posthog_eventproperty (event, property, team_id, project_id)
88+
VALUES ($1, $2, $3, $4)
89+
ON CONFLICT DO NOTHING
90+
`
91+
92+
const WRITE_PROPERTY_DEFINITION = `
93+
INSERT INTO posthog_propertydefinition (id, name, type, group_type_index, is_numerical, volume_30_day, query_usage_30_day, team_id, project_id, property_type)
94+
VALUES ($1, $2, $3, $4, $5, NULL, NULL, $6, $7, $8)
95+
ON CONFLICT (coalesce(project_id, team_id::bigint), name, type, coalesce(group_type_index, -1))
96+
DO UPDATE SET property_type=EXCLUDED.property_type WHERE posthog_propertydefinition.property_type IS NULL
97+
`
98+
99+
const WRITE_EVENT_DEFINITION = `
100+
INSERT INTO posthog_eventdefinition (id, name, volume_30_day, query_usage_30_day, team_id, project_id, last_seen_at, created_at)
101+
VALUES ($1, $2, NULL, NULL, $3, $4, $5, NOW())
102+
ON CONFLICT (coalesce(project_id, team_id::bigint), name)
103+
DO UPDATE SET last_seen_at = $5
104+
`
105+
106+
// TODO(eli): TBD - replace VALUES with array of integer team IDs, maybe something like this?
107+
// https://github.com/PostHog/posthog/blob/master/plugin-server/src/utils/db/postgres.ts#L90-L110
108+
const FETCH_TEAM_IDS = `
109+
SELECT id AS team_id FROM posthog_team WHERE id = ANY (ARRAY[{VALUES}])
110+
`
111+
112+
// TODO(eli): same here...
113+
const FETCH_GROUP_TYPES_BY_TEAM_IDS = `
114+
SELECT pt.id AS team_id, pgtm.group_type, pgtm.group_type_index FROM posthog_team AS pt
115+
JOIN posthog_grouptypemapping AS pgtm ON pt.id = pgtm.team_id
116+
WHERE pt.id = ANY (ARRAY[{VALUES}])
117+
`
118+
119+
export const getPropertyType = (rawKey: string, value: any): PropertyType | null => {
120+
const key = rawKey.trim().toLowerCase()
121+
50122
// Special cases for certain property prefixes
51123
if (key.startsWith('utm_')) {
52124
// utm_ prefixed properties should always be detected as strings.
@@ -80,36 +152,54 @@ export const getPropertyType = (key: string, value: any): PropertyType | null =>
80152

81153
if (typeof value === 'string') {
82154
const s = value.trim()
83-
if (s === 'true' || s === 'false' || s === 'TRUE' || s === 'FALSE') {
155+
if (s === 'true' || s === 'false') {
84156
return PropertyType.Boolean
85157
}
86158
// Try to parse this as an ISO 8601 date
87159
try {
160+
if (DATE_PROP_KEYWORDS.some(kw => key.includes(kw))) {
161+
return PropertyType.DateTime
162+
}
88163
const date = DateTime.fromISO(s)
89164
if (date.isValid) {
90165
return PropertyType.DateTime
91166
}
167+
// TODO(eli): add speculative date string matching?
168+
92169
} catch {
93170
// Not a valid date, continue to string type
94171
}
95172
return PropertyType.String
96173
}
97174

175+
if (typeof value === 'boolean') {
176+
return PropertyType.Boolean
177+
}
178+
98179
if (typeof value === 'number') {
99-
// Check if the key contains timestamp-related keywords
100-
if (key.includes('timestamp') || key.includes('TIMESTAMP') || key.includes('time') || key.includes('TIME')) {
180+
if (value >= sixMonthsAgoUnixSeconds()) {
101181
return PropertyType.DateTime
102182
}
103183
return PropertyType.Numeric
104184
}
105185

106-
if (typeof value === 'boolean') {
107-
return PropertyType.Boolean
108-
}
109-
110186
return null
111187
}
112188

189+
/**
 * Reports whether a name is short enough to be stored.
 *
 * The comparison is strict: only strings whose `length` is below
 * DJANGO_MAX_CHARFIELD_LENGTH pass, so a string of exactly that length is rejected.
 */
function willFitInPostgres(s: string): boolean {
    const fits = DJANGO_MAX_CHARFIELD_LENGTH > s.length
    return fits
}
192+
193+
/**
 * Replaces every NUL character (U+0000) in an event name with the Unicode
 * replacement character (U+FFFD).
 *
 * @param eventName raw event name as received
 * @returns the event name with all NUL characters substituted; other characters are untouched
 */
function sanitizeEventName(eventName: string): string {
    // `String.prototype.replace` with a string pattern only swaps the FIRST match,
    // which would leave any later NULs in place. split/join substitutes every one.
    return eventName.split('\u0000').join('\uFFFD')
}
196+
197+
/**
 * Returns the Unix timestamp (whole seconds) of the moment six calendar months
 * before `now`, computed in local time.
 *
 * @param now reference instant; defaults to the current time. It is not mutated.
 * @returns seconds since the Unix epoch, rounded down
 */
function sixMonthsAgoUnixSeconds(now: Date = new Date()): number {
    const target = new Date(now.getTime())
    const dayOfMonth = target.getDate()

    // Move to the 1st before shifting the month: calling setMonth() while the day is
    // 29-31 overflows into the following month when the target month is shorter
    // (e.g. Aug 31 minus six months would become "Feb 31", i.e. early March).
    target.setDate(1)
    target.setMonth(target.getMonth() - 6)

    // Day 0 of the next month is the last day of the target month.
    const lastDayOfTargetMonth = new Date(target.getFullYear(), target.getMonth() + 1, 0).getDate()
    target.setDate(Math.min(dayOfMonth, lastDayOfTargetMonth))

    return Math.floor(target.getTime() / 1000)
}
202+
113203
/**
114204
* NOTE: This is currently experimental and only used to do some testing on performance and comparisons.
115205
*/
@@ -119,14 +209,18 @@ export class PropertyDefsConsumer {
119209
protected topic: string
120210

121211
batchConsumer?: BatchConsumer
212+
db: PostgresRouter
213+
config: PluginsServerConfig
122214
isStopping = false
123215
protected heartbeat = () => {}
124216
protected promises: Set<Promise<any>> = new Set()
125217

126-
constructor(private hub: Hub) {
218+
/**
 * @param hub supplies the consumer group id, the topic to consume, and (when present) the Postgres router
 * @param config plugin server config; used to build a PostgresRouter when `hub` does not provide one
 */
constructor(private hub: Hub, config: PluginsServerConfig) {
    // The group and topic are configurable allowing for multiple ingestion consumers to be run in parallel
    this.groupId = hub.PROPERTY_DEFS_CONSUMER_GROUP_ID
    this.topic = hub.PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC
    // Two separate statements: the original ended the first assignment with a comma,
    // fusing both into a single comma-operator expression.
    this.config = config
    // Reuse the hub's router when there is one; otherwise create a new one from config.
    this.db = hub?.postgres ?? new PostgresRouter(this.config)
}
131225

132226
public get service(): PluginServerService {
@@ -175,34 +269,89 @@ export class PropertyDefsConsumer {
175269
public async handleKafkaBatch(messages: Message[]) {
176270
const parsedMessages = await this.runInstrumented('parseKafkaMessages', () => this.parseKafkaBatch(messages))
177271
const collected = await this.runInstrumented('derivePropDefs', () =>
178-
Promise.resolve(this.derivePropDefs(parsedMessages))
272+
Promise.resolve(this.extractPropertyDefinitions(parsedMessages))
179273
)
180274

275+
for (const eventDef of Object.values(collected.eventDefinitionsById)) {
276+
eventDefTypesCounter.inc()
277+
console.log(eventDef) // TODO(eli): temp: make linter happy
278+
// TODO(eli): write it!
279+
}
280+
181281
for (const propDef of Object.values(collected.propertyDefinitionsById)) {
182282
propertyDefTypesCounter.inc({ type: propDef.property_type ?? 'null' })
283+
// TODO(eli): write it!
183284
}
184285

185-
// TODO: Get all the related property defs from the DB and compare what we would have written for all those that don't exist
186-
// TODO: Write prop defs to DB
286+
for (const eventProp of Object.values(collected.eventPropertiesById)) {
287+
eventPropTypesCounter.inc()
288+
console.log(eventProp) // TODO(eli): temp: make linter happy
289+
// TODO(eli): write it!
290+
}
187291

188292
status.debug('🔁', `Waiting for promises`, { promises: this.promises.size })
189293
await this.runInstrumented('awaitScheduledWork', () => Promise.all(this.promises))
190294
status.debug('🔁', `Processed batch`)
191295
}
192296

193-
private derivePropDefs(events: ClickHouseEvent[]): CollectedPropertyDefinitions {
297+
private extractPropertyDefinitions(events: ClickHouseEvent[]): CollectedPropertyDefinitions {
194298
const collected: CollectedPropertyDefinitions = {
299+
// TODO(eli): look these up in batches as pre-write step
300+
teamIdsInBatch: new Set<number>,
301+
// TODO(eli): look these up in batches to resolve group types as pre-write step
302+
teamIdsWithGroupUpdatesInBatch: new Set<number>,
303+
// deduped from batch, written to posthog_eventdefinition
304+
eventDefinitionsById: {},
305+
// deduped from batch, written to posthog_propertydefinition
195306
propertyDefinitionsById: {},
196-
eventPropertiesByEventById: {},
307+
// deduped from batch, written to posthog_eventproperty
308+
eventPropertiesById: {},
197309
}
198310

199311
for (const event of events) {
312+
// these will be looked up later to trim write batches if team doesn't exist
313+
if (!collected.teamIdsInBatch.has(event.team_id)) {
314+
collected.teamIdsInBatch.add(event.team_id);
315+
}
316+
317+
event.event = sanitizeEventName(event.event)
318+
319+
if (!willFitInPostgres(event.event)) {
320+
continue
321+
}
322+
323+
const eventDefIdKey: string = `${event.team_id}:${event.event}`
324+
325+
if (!collected.eventDefinitionsById[eventDefIdKey]) {
326+
collected.eventDefinitionsById[eventDefIdKey] = {
327+
id: eventDefIdKey,
328+
name: event.event,
329+
team_id: event.team_id,
330+
project_id: event.team_id, // TODO: add project_id
331+
created_at: event.created_at.toISO() || DateTime.now().toString(),
332+
volume_30_day: 0, // deprecated
333+
query_usage_30_day: 0, // deprecated
334+
}
335+
}
336+
200337
// Detect group identify events
201338
if (event.event === '$groupidentify') {
339+
if (!collected.teamIdsWithGroupUpdatesInBatch.has(event.team_id)) {
340+
collected.teamIdsWithGroupUpdatesInBatch.add(event.team_id);
341+
}
342+
343+
// bail on this event if there's no group type assigned
202344
const groupType: string | undefined = event.properties['$group_type'] // e.g. "organization"
203-
const groupProperties: Record<string, any> | undefined = event.properties['$group_set'] // { name: 'value', id: 'id', foo: "bar" }
345+
if (typeof groupType === 'undefined') {
346+
continue
347+
}
204348

349+
const groupProperties: Record<string, any> | undefined = event.properties['$group_set'] // { name: 'value', id: 'id', foo: "bar" }
205350
for (const [property, value] of Object.entries(groupProperties ?? {})) {
351+
if (!willFitInPostgres(property)) {
352+
continue
353+
}
354+
206355
const propDefId = `${event.team_id}:${groupType}:${property}`
207356

208357
if (collected.propertyDefinitionsById[propDefId]) {
@@ -219,7 +368,8 @@ export class PropertyDefsConsumer {
219368
project_id: event.team_id, // TODO: Add project_id
220369
property_type: propType,
221370
type: PropertyDefinitionTypeEnum.Event,
222-
group_type_index: 0, // TODO: This!
371+
group_type_name: groupType,
372+
group_type_index: 0, // TODO(eli): resolve these w/DB query on team_id using "groupType"
223373
}
224374
}
225375
}
@@ -229,6 +379,10 @@ export class PropertyDefsConsumer {
229379

230380
// Detect person properties
231381
for (const [property, value] of Object.entries(event.person_properties ?? {})) {
382+
if (!willFitInPostgres(property)) {
383+
continue
384+
}
385+
232386
const propDefPersonId = `${event.team_id}:person:${property}`
233387

234388
if (!collected.propertyDefinitionsById[propDefPersonId]) {
@@ -249,7 +403,7 @@ export class PropertyDefsConsumer {
249403

250404
// Detect event properties
251405
for (const [property, value] of Object.entries(event.properties)) {
252-
if (SKIP_PROPERTIES.includes(property)) {
406+
if (!willFitInPostgres(property) || SKIP_PROPERTIES.includes(property)) {
253407
continue
254408
}
255409

@@ -270,11 +424,11 @@ export class PropertyDefsConsumer {
270424
}
271425
}
272426

273-
const eventDefId = `${event.team_id}:${event.event}:${property}`
427+
const eventPropId = `${event.team_id}:${event.event}:${property}`
274428

275-
if (!collected.eventPropertiesByEventById[eventDefId]) {
276-
collected.eventPropertiesByEventById[eventDefId] = {
277-
id: eventDefId,
429+
if (!collected.eventPropertiesById[eventPropId]) {
430+
collected.eventPropertiesById[eventPropId] = {
431+
id: eventPropId,
278432
event: event.event,
279433
property,
280434
team_id: event.team_id,

plugin-server/src/server.ts

+10
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import { delay } from './utils/utils'
4242
import { teardownPlugins } from './worker/plugins/teardown'
4343
import { initPlugins as _initPlugins, reloadPlugins } from './worker/tasks'
4444
import { populatePluginCapabilities } from './worker/vm/lazy'
45+
import { PropertyDefsConsumer } from './property-defs/property-defs-consumer'
4546

4647
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
4748
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec
@@ -235,6 +236,15 @@ export class PluginServer {
235236
})
236237
}
237238

239+
// TODO(eli): come back to this...
240+
if (capabilities.propertyDefs) {
241+
serviceLoaders.push(async () => {
242+
const consumer = new PropertyDefsConsumer(hub, this.config)
243+
await consumer.start()
244+
return consumer.service
245+
})
246+
}
247+
238248
if (capabilities.cdpInternalEvents) {
239249
serviceLoaders.push(async () => {
240250
const consumer = new CdpInternalEventsConsumer(hub)

plugin-server/src/types.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1130,7 +1130,6 @@ export interface EventDefinitionType {
11301130
query_usage_30_day: number | null
11311131
team_id: number
11321132
project_id: number | null
1133-
last_seen_at: string // DateTime
11341133
created_at: string // DateTime
11351134
}
11361135

@@ -1171,6 +1170,7 @@ export interface PropertyDefinitionType {
11711170
project_id: number | null
11721171
property_type: PropertyType | null
11731172
type: PropertyDefinitionTypeEnum
1173+
group_type_name?: string,
11741174
group_type_index?: number | null
11751175
volume_30_day?: number | null
11761176
query_usage_30_day?: number | null

0 commit comments

Comments
 (0)