Skip to content

Commit 7cbadf8

Browse files
committed
move team and group index resolution to Managers; batch team + group fetches; batch all event and prop writes
1 parent 6a44681 commit 7cbadf8

File tree

5 files changed

+191
-101
lines changed

5 files changed

+191
-101
lines changed

plugin-server/src/property-defs/property-defs-consumer.ts

+44-63
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,20 @@ import {
1010
ClickHouseEvent,
1111
EventDefinitionType,
1212
EventPropertyType,
13+
GroupTypeToColumnIndex,
1314
Hub,
1415
PluginServerService,
1516
PropertyDefinitionType,
1617
PropertyDefinitionTypeEnum,
1718
PropertyType,
1819
RawClickHouseEvent,
19-
ResolvedGroups,
20+
TeamId,
2021
} from '../types'
2122
import { parseRawClickHouseEvent } from '../utils/event'
2223
import { status } from '../utils/status'
2324
import { UUIDT } from '../utils/utils'
25+
import { GroupTypeManager } from '../worker/ingestion/group-type-manager'
26+
import { TeamManager } from '../worker/ingestion/team-manager'
2427
import { PropertyDefsDB } from './services/property-defs-db'
2528
import {
2629
getPropertyType,
@@ -32,6 +35,8 @@ import {
3235
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
3336
require('@sentry/tracing')
3437

38+
const BATCH_SIZE = 100
39+
3540
// TODO(eli): wire up LOTS more metrics ASAP!
3641

3742
const propertyDefTypesCounter = new Counter({
@@ -59,8 +64,8 @@ const propDefDroppedCounter = new Counter({
5964
export type CollectedPropertyDefinitions = {
6065
// looked up prior to event/prop extraction
6166
knownTeamIds: Set<number>
62-
// looked up prior to event/prop extraction
63-
resolvedTeamGroups: Record<number, ResolvedGroups>
67+
// looked up prior to event/prop extraction. map of project_id => group_type => group_index
68+
resolvedTeamGroups: Record<number, GroupTypeToColumnIndex>
6469
// known team ID => resolved group_type & group_type_index
6570
eventDefinitionsById: Record<number, Record<string, EventDefinitionType>>
6671
// known team ID => deduped properties
@@ -79,6 +84,8 @@ export class PropertyDefsConsumer {
7984

8085
private batchConsumer?: BatchConsumer
8186
private propertyDefsDB: PropertyDefsDB
87+
private teamManager: TeamManager
88+
private groupTypeManager: GroupTypeManager
8289
private isStopping = false
8390
protected heartbeat = () => {}
8491
protected promises: Set<Promise<any>> = new Set()
@@ -87,6 +94,8 @@ export class PropertyDefsConsumer {
8794
this.groupId = hub.PROPERTY_DEFS_CONSUMER_GROUP_ID
8895
this.topic = hub.PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC
8996
this.propertyDefsDB = new PropertyDefsDB(hub)
97+
this.teamManager = new TeamManager(hub.postgres)
98+
this.groupTypeManager = new GroupTypeManager(hub.postgres, this.teamManager)
9099
}
91100

92101
public get service(): PluginServerService {
@@ -133,7 +142,9 @@ export class PropertyDefsConsumer {
133142
}
134143

135144
public async handleKafkaBatch(messages: Message[]) {
136-
const parsedMessages = await this.runInstrumented('parseKafkaMessages', () => this.parseKafkaBatch(messages))
145+
const parsedMessages: ClickHouseEvent[] = await this.runInstrumented('parseKafkaMessages', () =>
146+
this.parseKafkaBatch(messages)
147+
)
137148

138149
// used to filter and dedup to minimum batch of writable records
139150
const collected: CollectedPropertyDefinitions = {
@@ -144,18 +155,18 @@ export class PropertyDefsConsumer {
144155
eventPropertiesById: {},
145156
}
146157

147-
const teamsInBatch = this.extractTeamIds(parsedMessages)
148-
const groupTeamsInBatch = this.extractGroupTeamIds(parsedMessages, collected.knownTeamIds)
158+
const eventTeamIds = parsedMessages.map((msg) => msg.team_id as TeamId)
159+
const groupTeamIds = parsedMessages.filter((msg) => msg.event == '$groupidentify').map((msg) => msg.team_id)
149160

150-
const [knownTeamIds, resolvedTeamGroups] = await Promise.all([
151-
this.runInstrumented('resolveTeams', () => this.resolveTeams(this.propertyDefsDB, teamsInBatch)),
152-
this.runInstrumented('resolveGroupsForTeams', () =>
153-
this.resolveGroupsForTeams(this.propertyDefsDB, groupTeamsInBatch)
161+
const [knownTeamIds, resolvedProjectGroups] = await Promise.all([
162+
this.runInstrumented('resolveTeams', () => this.teamManager.validateTeamIds(eventTeamIds)),
163+
this.runInstrumented('resolveProjectGroupTypeIndices', () =>
164+
this.groupTypeManager.fetchGroupTypesIndicesForTeams(groupTeamIds)
154165
),
155166
])
156167

157-
collected.knownTeamIds = knownTeamIds
158-
collected.resolvedTeamGroups = resolvedTeamGroups
168+
collected.knownTeamIds = new Set(knownTeamIds)
169+
collected.resolvedTeamGroups = resolvedProjectGroups
159170

160171
console.log('🔁', `Event batch teams and group indices resolved`)
161172

@@ -167,38 +178,47 @@ export class PropertyDefsConsumer {
167178
console.log('🔁', `Property definitions collected`, JSON.stringify(collected, null, 2))
168179

169180
for (const knownTeamId in collected.eventDefinitionsById) {
181+
let buffer: EventDefinitionType[] = []
170182
for (const key in collected.eventDefinitionsById[knownTeamId]) {
171183
const eventDef = collected.eventDefinitionsById[knownTeamId][key]
172-
184+
buffer.push(eventDef)
173185
eventDefTypesCounter.inc()
174-
status.info('🔁', `Writing event definition`, { eventDef })
175186

176-
// TODO: Batch all these DB writes
177-
void this.scheduleWork(this.propertyDefsDB.writeEventDefinition(eventDef))
187+
if (buffer.length === BATCH_SIZE) {
188+
status.info('🔁', `Writing event definition batch of size ${buffer.length}`)
189+
void this.scheduleWork(this.propertyDefsDB.writeEventDefinitionsBatch(buffer))
190+
buffer = []
191+
}
178192
}
179193
}
180194

181195
for (const knownTeamId in collected.propertyDefinitionsById) {
196+
let buffer: PropertyDefinitionType[] = []
182197
for (const key in collected.propertyDefinitionsById[knownTeamId]) {
183198
const propDef: PropertyDefinitionType = collected.propertyDefinitionsById[knownTeamId][key]
184-
199+
buffer.push(propDef)
185200
propertyDefTypesCounter.inc({ type: propDef.property_type?.toString() ?? 'unknown' })
186-
status.info('🔁', `Writing property definition`, { propDef })
187201

188-
// TODO: Batch all these DB writes
189-
void this.scheduleWork(this.propertyDefsDB.writePropertyDefinition(propDef))
202+
if (buffer.length === BATCH_SIZE) {
203+
status.info('🔁', `Writing property definitions batch of size ${buffer.length}`)
204+
void this.scheduleWork(this.propertyDefsDB.writePropertyDefinitionsBatch(buffer))
205+
buffer = []
206+
}
190207
}
191208
}
192209

193210
for (const knownTeamId in collected.eventPropertiesById) {
211+
let buffer: EventPropertyType[] = []
194212
for (const key in collected.eventPropertiesById[knownTeamId]) {
195213
const eventProp = collected.eventPropertiesById[knownTeamId][key]
196-
197214
eventPropTypesCounter.inc()
198-
status.info('🔁', `Writing event property`, { eventProp })
215+
buffer.push(eventProp)
199216

200-
// TODO: Batch all these DB writes
201-
void this.scheduleWork(this.propertyDefsDB.writeEventProperty(eventProp))
217+
if (buffer.length === BATCH_SIZE) {
218+
status.info('🔁', `Writing event properties batch of size ${buffer.length}`)
219+
void this.scheduleWork(this.propertyDefsDB.writeEventPropertiesBatch(buffer))
220+
buffer = []
221+
}
202222
}
203223
}
204224

@@ -207,45 +227,6 @@ export class PropertyDefsConsumer {
207227
status.debug('🔁', `Processed batch`)
208228
}
209229

210-
private async resolveTeams(db: PropertyDefsDB, teamIdsInBatch: Set<number>): Promise<Set<number>> {
211-
const teamsFound = await db.findTeamIds(Array.from(teamIdsInBatch))
212-
return new Set(teamsFound.filter((row) => !teamIdsInBatch.has(row.teamId)).map((row) => row.teamId))
213-
}
214-
215-
private async resolveGroupsForTeams(
216-
db: PropertyDefsDB,
217-
knownTeamIdsWithGroup: Set<number>
218-
): Promise<Record<number, ResolvedGroups>> {
219-
const result = await db.resolveGroupsForTeams(Array.from(knownTeamIdsWithGroup))
220-
221-
const out: Record<number, ResolvedGroups> = {}
222-
result.forEach((row) => {
223-
let resolved: ResolvedGroups
224-
if (out[row.teamId]) {
225-
resolved = out[row.teamId]
226-
} else {
227-
resolved = {}
228-
}
229-
230-
resolved[row.groupName] = row.groupIndex
231-
out[row.teamId] = resolved
232-
})
233-
234-
return out
235-
}
236-
237-
private extractTeamIds(events: ClickHouseEvent[]): Set<number> {
238-
return new Set(events.map((event) => event.team_id))
239-
}
240-
241-
private extractGroupTeamIds(events: ClickHouseEvent[], knownTeamIds: Set<number>): Set<number> {
242-
return new Set(
243-
events
244-
.filter((event) => event.event == '$groupidentify' && knownTeamIds.has(event.team_id))
245-
.map((event) => event.team_id)
246-
)
247-
}
248-
249230
private extractPropertyDefinitions(events: ClickHouseEvent[], collected: CollectedPropertyDefinitions) {
250231
for (const event of events) {
251232
if (!collected.knownTeamIds.has(event.team_id)) {

plugin-server/src/property-defs/services/property-defs-db.ts

+2-34
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,6 @@ import { EventDefinitionType, EventPropertyType, Hub, PropertyDefinitionType } f
44
import { PostgresUse } from '../../utils/db/postgres'
55
import { status } from '../../utils/status'
66

7-
export interface TeamIdRow {
8-
teamId: number
9-
}
10-
11-
export interface TeamGroupRow {
12-
teamId: number
13-
groupName: string
14-
groupIndex: number
15-
}
16-
177
export class PropertyDefsDB {
188
constructor(private hub: Hub) {}
199

@@ -34,7 +24,7 @@ export class PropertyDefsDB {
3424
})
3525
}
3626

37-
async writeEventProperiesBatch(eventProperties: EventPropertyType[]) {
27+
async writeEventPropertiesBatch(eventProperties: EventPropertyType[]) {
3828
const values = Array(eventProperties.length).fill('($1, $2, $3, $4)').join(', ')
3929
const query = `INSERT INTO posthog_eventproperty (event, property, team_id, project_id)
4030
VALUES ${values}
@@ -76,7 +66,7 @@ export class PropertyDefsDB {
7666
)
7767
.catch((e) => {
7868
status.error('🔁', `Error writing property definitions batch`, {
79-
propertyDefinition: propertyDefinition,
69+
propertyDefinition,
8070
error: e.message,
8171
})
8272
throw e
@@ -165,28 +155,6 @@ export class PropertyDefsDB {
165155
})
166156
}
167157

168-
async findTeamIds(teamIds: number[]): Promise<TeamIdRow[]> {
169-
const result = await this.hub.postgres.query<TeamIdRow>(
170-
PostgresUse.COMMON_READ,
171-
`SELECT id AS team_id FROM posthog_team WHERE id = ANY ($1)`,
172-
[teamIds],
173-
'findTeamIds'
174-
)
175-
176-
return result.rows
177-
}
178-
179-
async resolveGroupsForTeams(teamIds: number[]): Promise<TeamGroupRow[]> {
180-
const result = await this.hub.postgres.query<TeamGroupRow>(
181-
PostgresUse.COMMON_READ,
182-
`SELECT team_ids, group_type_index FROM posthog_grouptype WHERE team_ids = ANY ($1)`,
183-
[teamIds],
184-
'findTeamIds'
185-
)
186-
187-
return result.rows
188-
}
189-
190158
async listPropertyDefinitions(teamId: number): Promise<PropertyDefinitionType[]> {
191159
const result = await this.hub.postgres.query<PropertyDefinitionType>(
192160
PostgresUse.COMMON_READ,

plugin-server/src/types.ts

+10
Original file line numberDiff line numberDiff line change
@@ -1186,6 +1186,16 @@ export interface EventPropertyType {
11861186
project_id: number | null
11871187
}
11881188

1189+
export interface TeamIdRow {
1190+
teamId: number
1191+
}
1192+
1193+
export interface TeamGroupRow {
1194+
teamId: number
1195+
groupName: string
1196+
groupIndex: number
1197+
}
1198+
11891199
export type GroupTypeToColumnIndex = Record<string, GroupTypeIndex>
11901200

11911201
export enum PropertyUpdateOperation {

plugin-server/src/worker/ingestion/group-type-manager.ts

+76-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
import { GroupTypeIndex, GroupTypeToColumnIndex, ProjectId, Team, TeamId } from '../../types'
1+
import { GroupTypeIndex, GroupTypeToColumnIndex, ProjectId, Team, TeamGroupRow, TeamId } from '../../types'
22
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
33
import { timeoutGuard } from '../../utils/db/utils'
44
import { captureTeamEvent } from '../../utils/posthog'
5+
import { status } from '../../utils/status'
56
import { getByAge } from '../../utils/utils'
67
import { TeamManager } from './team-manager'
78

89
/** How many unique group types to allow per team */
910
export const MAX_GROUP_TYPES_PER_TEAM = 5
1011

12+
const CHUNK_SIZE = 100
13+
1114
export class GroupTypeManager {
1215
private groupTypesCache: Map<ProjectId, [GroupTypeToColumnIndex, number]>
1316
private instanceSiteUrl: string
@@ -17,6 +20,77 @@ export class GroupTypeManager {
1720
this.instanceSiteUrl = instanceSiteUrl || 'unknown'
1821
}
1922

23+
// BaseEvent doesn't have project_id yet so still using team_id here.
24+
// TODO(eli): see if the underlying Kafka event payload does and we can add it (cc Ben)
25+
public async fetchGroupTypesIndicesForTeams(
26+
groupTeamIds: TeamId[]
27+
): Promise<Record<number, GroupTypeToColumnIndex>> {
28+
const out: Record<number, GroupTypeToColumnIndex> = {}
29+
if (groupTeamIds.length === 0) {
30+
return out
31+
}
32+
33+
const dedupedTeamIds = new Set(groupTeamIds)
34+
35+
// first, capture already cached group types and their indexes
36+
const cachedTeamIds = new Set(
37+
Array.from(dedupedTeamIds).filter((teamId) => {
38+
const gtci = this.groupTypesCache.get(teamId as ProjectId) // HACK ALERT!! no way this is a good idea (cc Ben)
39+
if (gtci) {
40+
gtci.forEach((entry) => {
41+
// if it's a GroupTypeToColumnIndex, not a number, we can use it
42+
if (typeof entry !== 'number') {
43+
if (!out[teamId]) {
44+
out[teamId] = entry
45+
}
46+
}
47+
})
48+
return true
49+
}
50+
return false
51+
})
52+
)
53+
54+
// finally, figure out what we need to fetch from the DB, then do so in batches
55+
const teamIdsToFetch = Array.from(dedupedTeamIds.difference(cachedTeamIds))
56+
57+
const handles: Promise<TeamGroupRow[]>[] = []
58+
for (let i = 0; i < teamIdsToFetch.length; i += CHUNK_SIZE) {
59+
const chunk = teamIdsToFetch.slice(i, i + CHUNK_SIZE)
60+
handles.push(this.fetchGroupTypeIndicesForTeams(chunk))
61+
}
62+
63+
await Promise.all(handles).then((results) => {
64+
results.forEach((foundGTIs) =>
65+
foundGTIs.forEach((teamGroupRow) => {
66+
if (!out[teamGroupRow.teamId]) {
67+
out[teamGroupRow.teamId] = {}
68+
}
69+
out[teamGroupRow.teamId][teamGroupRow.groupName] = teamGroupRow.groupIndex as GroupTypeIndex
70+
})
71+
)
72+
})
73+
74+
return out
75+
}
76+
77+
// TODO(eli): convert to use ProjectId if we can add to ClickHouseEvent
78+
private async fetchGroupTypeIndicesForTeams(teamIds: TeamId[]): Promise<TeamGroupRow[]> {
79+
const result = await this.postgres
80+
.query<TeamGroupRow>(
81+
PostgresUse.COMMON_READ,
82+
`SELECT team_id, group_type, group_type_index FROM posthog_grouptypemapping WHERE team_id = ANY ($1)`,
83+
[teamIds],
84+
'findGroupTypeIndicesForTeams'
85+
)
86+
.catch((e) => {
87+
status.error('🔁', `Error fetching group type mappings`, { error: e.message })
88+
throw e
89+
})
90+
91+
return result.rows
92+
}
93+
2094
public async fetchGroupTypes(projectId: ProjectId): Promise<GroupTypeToColumnIndex> {
2195
const cachedGroupTypes = getByAge(this.groupTypesCache, projectId)
2296
if (cachedGroupTypes) {
@@ -26,7 +100,7 @@ export class GroupTypeManager {
26100
const timeout = timeoutGuard(`Still running "fetchGroupTypes". Timeout warning after 30 sec!`)
27101
try {
28102
const { rows } = await this.postgres.query(
29-
PostgresUse.COMMON_WRITE,
103+
PostgresUse.COMMON_WRITE, // TODO: can we get away with COMMON_READ here? cc Ben
30104
`SELECT * FROM posthog_grouptypemapping WHERE project_id = $1`,
31105
[projectId],
32106
'fetchGroupTypes'

0 commit comments

Comments
 (0)