config.ts
import { LogLevel, PluginLogLevel, PluginsServerConfig, stringToPluginServerMode, ValueMatcher } from '../types'
import { isDevEnv, isTestEnv, stringToBoolean } from '../utils/env-utils'
import { KAFKAJS_LOG_LEVEL_MAPPING } from './constants'
import {
KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS,
} from './kafka-topics'
export const DEFAULT_HTTP_SERVER_PORT = 6738
export const defaultConfig = overrideWithEnv(getDefaultConfig())
export function getDefaultConfig(): PluginsServerConfig {
return {
DATABASE_URL: isTestEnv()
? 'postgres://posthog:posthog@localhost:5432/test_posthog'
: isDevEnv()
? 'postgres://posthog:posthog@localhost:5432/posthog'
: '',
DATABASE_READONLY_URL: '',
PLUGIN_STORAGE_DATABASE_URL: '',
POSTGRES_CONNECTION_POOL_SIZE: 10,
POSTHOG_DB_NAME: null,
POSTHOG_DB_USER: 'postgres',
POSTHOG_DB_PASSWORD: '',
POSTHOG_POSTGRES_HOST: 'localhost',
POSTHOG_POSTGRES_PORT: 5432,
CLICKHOUSE_HOST: 'localhost',
CLICKHOUSE_OFFLINE_CLUSTER_HOST: null,
CLICKHOUSE_DATABASE: isTestEnv() ? 'posthog_test' : 'default',
CLICKHOUSE_USER: 'default',
CLICKHOUSE_PASSWORD: null,
CLICKHOUSE_CA: null,
CLICKHOUSE_SECURE: false,
EVENT_OVERFLOW_BUCKET_CAPACITY: 1000,
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: 1.0,
SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP: false,
KAFKA_HOSTS: 'kafka:9092', // KEEP IN SYNC WITH posthog/settings/data_stores.py
KAFKA_PRODUCER_HOSTS: undefined,
KAFKA_CLIENT_CERT_B64: undefined,
KAFKA_CLIENT_CERT_KEY_B64: undefined,
KAFKA_TRUSTED_CERT_B64: undefined,
KAFKA_SECURITY_PROTOCOL: undefined,
KAFKA_PRODUCER_SECURITY_PROTOCOL: undefined,
KAFKA_SASL_MECHANISM: undefined,
KAFKA_SASL_USER: undefined,
KAFKA_SASL_PASSWORD: undefined,
KAFKA_CLIENT_ID: undefined,
KAFKA_CLIENT_RACK: undefined,
KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size
KAFKA_CONSUMPTION_MAX_WAIT_MS: 50, // Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
KAFKA_CONSUMPTION_ERROR_BACKOFF_MS: 100, // Backoff after a partition read fails (possibly because it is empty).
KAFKA_CONSUMPTION_BATCHING_TIMEOUT_MS: 500, // Timeout on reads from the prefetch buffer before running consumer loops
KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_CONSUMPTION_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
KAFKA_CONSUMPTION_REBALANCE_TIMEOUT_MS: null,
KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS: 30_000,
KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS: 300_000,
KAFKA_TOPIC_CREATION_TIMEOUT_MS: isDevEnv() ? 30_000 : 5_000, // rdkafka default is 5s; increased in dev to tolerate slow local Kafka
KAFKA_TOPIC_METADATA_REFRESH_INTERVAL_MS: undefined,
KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500,
APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: isTestEnv() ? 5 : 1000,
REDIS_URL: 'redis://127.0.0.1',
INGESTION_REDIS_HOST: '',
INGESTION_REDIS_PORT: 6379,
POSTHOG_REDIS_PASSWORD: '',
POSTHOG_REDIS_HOST: '',
POSTHOG_REDIS_PORT: 6379,
BASE_DIR: '..',
PLUGINS_RELOAD_PUBSUB_CHANNEL: 'reload-plugins',
TASK_TIMEOUT: 30,
TASKS_PER_WORKER: 10,
INGESTION_CONCURRENCY: 10,
INGESTION_BATCH_SIZE: 500,
INGESTION_OVERFLOW_ENABLED: false,
INGESTION_OVERFLOW_PRESERVE_PARTITION_LOCALITY: false,
PLUGINS_DEFAULT_LOG_LEVEL: isTestEnv() ? PluginLogLevel.Full : PluginLogLevel.Log,
LOG_LEVEL: isTestEnv() ? LogLevel.Warn : LogLevel.Info,
SENTRY_DSN: null,
SENTRY_PLUGIN_SERVER_TRACING_SAMPLE_RATE: 0,
SENTRY_PLUGIN_SERVER_PROFILING_SAMPLE_RATE: 0,
HTTP_SERVER_PORT: DEFAULT_HTTP_SERVER_PORT,
SCHEDULE_LOCK_TTL: 60,
REDIS_POOL_MIN_SIZE: 1,
REDIS_POOL_MAX_SIZE: 3,
DISABLE_MMDB: isTestEnv(),
MMDB_FILE_LOCATION:
isDevEnv() || isTestEnv() ? '../share/GeoLite2-City.mmdb' : '/s3/ingestion-assets/mmdb/GeoLite2-City.mmdb',
DISTINCT_ID_LRU_SIZE: 10000,
EVENT_PROPERTY_LRU_SIZE: 10000,
JOB_QUEUES: 'graphile',
JOB_QUEUE_GRAPHILE_URL: '',
JOB_QUEUE_GRAPHILE_SCHEMA: 'graphile_worker',
JOB_QUEUE_GRAPHILE_PREPARED_STATEMENTS: false,
JOB_QUEUE_GRAPHILE_CONCURRENCY: 1,
JOB_QUEUE_S3_AWS_ACCESS_KEY: '',
JOB_QUEUE_S3_AWS_SECRET_ACCESS_KEY: '',
JOB_QUEUE_S3_AWS_REGION: 'us-west-1',
JOB_QUEUE_S3_BUCKET_NAME: '',
JOB_QUEUE_S3_PREFIX: '',
CRASH_IF_NO_PERSISTENT_JOB_QUEUE: false,
HEALTHCHECK_MAX_STALE_SECONDS: 2 * 60 * 60, // 2 hours
SITE_URL: null,
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1,
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
CLICKHOUSE_HEATMAPS_KAFKA_TOPIC: KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
EXCEPTIONS_SYMBOLIFICATION_KAFKA_TOPIC: KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS,
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
KAFKA_HEALTHCHECK_SECONDS: 20,
OBJECT_STORAGE_ENABLED: true,
OBJECT_STORAGE_ENDPOINT: 'http://localhost:19000',
OBJECT_STORAGE_REGION: 'us-east-1',
OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user',
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password',
OBJECT_STORAGE_BUCKET: 'posthog',
PLUGIN_SERVER_MODE: null,
PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: null,
PLUGIN_LOAD_SEQUENTIALLY: false,
KAFKAJS_LOG_LEVEL: 'WARN',
MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
CLOUD_DEPLOYMENT: null,
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
SKIP_PERSONS_PROCESSING_BY_TOKEN_DISTINCT_ID: '',
PIPELINE_STEP_STALLED_LOG_TIMEOUT: 30,
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,
RUSTY_HOOK_FOR_TEAMS: '',
RUSTY_HOOK_ROLLOUT_PERCENTAGE: 0,
RUSTY_HOOK_URL: '',
HOG_HOOK_URL: '',
CAPTURE_CONFIG_REDIS_HOST: null,
// posthog
POSTHOG_API_KEY: '',
POSTHOG_HOST_URL: 'http://localhost:8010',
STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
STARTUP_PROFILE_HEAP: false,
STARTUP_PROFILE_HEAP_INTERVAL: 512 * 1024, // default v8 value
STARTUP_PROFILE_HEAP_DEPTH: 16, // default v8 value
SESSION_RECORDING_KAFKA_HOSTS: undefined,
SESSION_RECORDING_KAFKA_SECURITY_PROTOCOL: undefined,
SESSION_RECORDING_KAFKA_BATCH_SIZE: 500,
SESSION_RECORDING_KAFKA_QUEUE_SIZE: 1500,
// if not set we'll use the plugin server default value
SESSION_RECORDING_KAFKA_QUEUE_SIZE_KB: undefined,
SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions',
// NOTE: 10 minutes
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10,
SESSION_RECORDING_BUFFER_AGE_JITTER: 0.3,
SESSION_RECORDING_BUFFER_AGE_IN_MEMORY_MULTIPLIER: 1.2,
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: 1024 * 50, // 50MB
SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings',
SESSION_RECORDING_REDIS_PREFIX: '@posthog/replay/',
SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: false,
SESSION_RECORDING_PARALLEL_CONSUMPTION: false,
POSTHOG_SESSION_RECORDING_REDIS_HOST: undefined,
POSTHOG_SESSION_RECORDING_REDIS_PORT: undefined,
SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED: true,
SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED: true,
SESSION_RECORDING_DEBUG_PARTITION: '',
SESSION_RECORDING_KAFKA_DEBUG: undefined,
SESSION_RECORDING_MAX_PARALLEL_FLUSHES: 10,
SESSION_RECORDING_OVERFLOW_ENABLED: false,
SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE: 5_000_000, // 5MB/second uncompressed, sustained
SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY: 200_000_000, // 200MB burst
SESSION_RECORDING_OVERFLOW_MIN_PER_BATCH: 1_000_000, // All sessions consume at least 1MB/batch, to penalise poor batching
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: 0, // 0 disables stats collection
SESSION_RECORDING_KAFKA_FETCH_MIN_BYTES: 1_048_576, // 1MB
ENCRYPTION_SALT_KEYS: isDevEnv() || isTestEnv() ? '00beef0000beef0000beef0000beef00' : '',
// CDP
CDP_WATCHER_COST_ERROR: 100,
CDP_WATCHER_COST_TIMING: 20,
CDP_WATCHER_COST_TIMING_LOWER_MS: 100,
CDP_WATCHER_COST_TIMING_UPPER_MS: 5000,
CDP_WATCHER_THRESHOLD_DEGRADED: 0.8,
CDP_WATCHER_BUCKET_SIZE: 10000,
CDP_WATCHER_DISABLED_TEMPORARY_TTL: 60 * 10, // 10 minutes
CDP_WATCHER_TTL: 60 * 60 * 24, // Deliberately long; it only needs to ensure the key is eventually deleted
CDP_WATCHER_REFILL_RATE: 10,
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
CDP_HOG_FILTERS_TELEMETRY_TEAMS: '',
CDP_REDIS_PASSWORD: '',
CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true,
CDP_REDIS_HOST: '',
CDP_REDIS_PORT: 6479,
CDP_CYCLOTRON_BATCH_DELAY_MS: 50,
CDP_CYCLOTRON_BATCH_SIZE: 300,
CDP_GOOGLE_ADWORDS_DEVELOPER_TOKEN: '',
// Destination Migration Diffing
DESTINATION_MIGRATION_DIFFING_ENABLED: false,
// Cyclotron
CYCLOTRON_DATABASE_URL: isTestEnv()
? 'postgres://posthog:posthog@localhost:5432/test_cyclotron'
: isDevEnv()
? 'postgres://posthog:posthog@localhost:5432/cyclotron'
: '',
CYCLOTRON_SHARD_DEPTH_LIMIT: 1000000,
// New IngestionConsumer config
INGESTION_CONSUMER_GROUP_ID: 'events-ingestion-consumer',
INGESTION_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
INGESTION_CONSUMER_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
INGESTION_CONSUMER_DLQ_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
// PropertyDefsConsumer config
PROPERTY_DEFS_CONSUMER_GROUP_ID: 'property-defs-consumer',
PROPERTY_DEFS_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_JSON,
// Session recording V2
SESSION_RECORDING_MAX_BATCH_SIZE_KB: 100 * 1024, // 100MB
SESSION_RECORDING_MAX_BATCH_AGE_MS: 10 * 1000, // 10 seconds
SESSION_RECORDING_V2_S3_BUCKET: 'posthog',
SESSION_RECORDING_V2_S3_PREFIX: 'session_recording_batches',
SESSION_RECORDING_V2_S3_ENDPOINT: 'http://localhost:19000',
SESSION_RECORDING_V2_S3_REGION: 'us-east-1',
SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user',
SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password',
SESSION_RECORDING_V2_S3_TIMEOUT_MS: 30000,
// Cookieless
COOKIELESS_FORCE_STATELESS_MODE: false,
COOKIELESS_DISABLED: false,
COOKIELESS_DELETE_EXPIRED_LOCAL_SALTS_INTERVAL_MS: 60 * 60 * 1000, // 1 hour
COOKIELESS_SESSION_TTL_SECONDS: 60 * 60 * 24, // 24 hours
COOKIELESS_SALT_TTL_SECONDS: 60 * 60 * 24, // 24 hours
COOKIELESS_SESSION_INACTIVITY_MS: 30 * 60 * 1000, // 30 minutes
COOKIELESS_IDENTIFIES_TTL_SECONDS:
(24 + // max supported ingestion lag (hours)
12 + // max negative timezone offset in the world (hours)
14 + // max positive timezone offset in the world (hours)
24) * // amount of time a salt is valid in one timezone (hours)
60 *
60, // hours -> seconds
}
}
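// Usage sketch (illustrative only, not executed): getDefaultConfig() returns the raw
// defaults, while the exported `defaultConfig` above has already been passed through
// overrideWithEnv(). A hypothetical consumer would read the resolved values:
//
//   import { defaultConfig } from './config'
//   console.info(`plugin server will listen on port ${defaultConfig.HTTP_SERVER_PORT}`)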
export function overrideWithEnv(
config: PluginsServerConfig,
env: Record<string, string | undefined> = process.env
): PluginsServerConfig {
const defaultConfig = getDefaultConfig() as any // cast to any so the typechecker allows indexed access via defaultConfig[key]
const tmpConfig: any = { ...config }
for (const key of Object.keys(config)) {
if (typeof env[key] !== 'undefined') {
if (key === 'PLUGIN_SERVER_MODE') {
const mode = env[key]
if (mode == null || mode in stringToPluginServerMode) {
tmpConfig[key] = env[key]
} else {
throw Error(`Invalid PLUGIN_SERVER_MODE ${env[key]}`)
}
} else if (typeof defaultConfig[key] === 'number') {
// Parse as a float when the value contains a decimal point, otherwise as an integer.
tmpConfig[key] = env[key]!.includes('.') ? parseFloat(env[key]!) : parseInt(env[key]!, 10)
} else if (typeof defaultConfig[key] === 'boolean') {
tmpConfig[key] = stringToBoolean(env[key])
} else {
tmpConfig[key] = env[key]
}
}
}
const newConfig: PluginsServerConfig = { ...tmpConfig }
if (!newConfig.DATABASE_URL && !newConfig.POSTHOG_DB_NAME) {
throw Error(
'You must specify either DATABASE_URL or the database options POSTHOG_DB_NAME, POSTHOG_DB_USER, POSTHOG_DB_PASSWORD, POSTHOG_POSTGRES_HOST, POSTHOG_POSTGRES_PORT!'
)
}
if (!newConfig.DATABASE_URL) {
const encodedUser = encodeURIComponent(newConfig.POSTHOG_DB_USER)
const encodedPassword = encodeURIComponent(newConfig.POSTHOG_DB_PASSWORD)
newConfig.DATABASE_URL = `postgres://${encodedUser}:${encodedPassword}@${newConfig.POSTHOG_POSTGRES_HOST}:${newConfig.POSTHOG_POSTGRES_PORT}/${newConfig.POSTHOG_DB_NAME}`
}
if (!newConfig.JOB_QUEUE_GRAPHILE_URL) {
newConfig.JOB_QUEUE_GRAPHILE_URL = newConfig.DATABASE_URL
}
if (!Object.keys(KAFKAJS_LOG_LEVEL_MAPPING).includes(newConfig.KAFKAJS_LOG_LEVEL)) {
throw Error(
`Invalid KAFKAJS_LOG_LEVEL ${newConfig.KAFKAJS_LOG_LEVEL}. Valid: ${Object.keys(
KAFKAJS_LOG_LEVEL_MAPPING
).join(', ')}`
)
}
return newConfig
}
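// Override sketch (illustrative only, not executed): values from the environment are
// coerced based on the type of the corresponding default. With hypothetical env values:
//
//   const config = overrideWithEnv(getDefaultConfig(), {
//       POSTGRES_CONNECTION_POOL_SIZE: '20', // number default -> parsed to 20
//       CLICKHOUSE_SECURE: 'true', // boolean default -> stringToBoolean -> true
//       KAFKA_HOSTS: 'broker-1:9092,broker-2:9092', // string default -> kept as-is
//   })
//
// If DATABASE_URL is unset but POSTHOG_DB_NAME is provided, the URL is assembled from
// the POSTHOG_DB_* / POSTHOG_POSTGRES_* parts, with credentials URL-encoded.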
export function buildIntegerMatcher(config: string | undefined, allowStar: boolean): ValueMatcher<number> {
// Builds a ValueMatcher on a comma-separated list of values.
// Optionally, supports a '*' value to match everything
if (!config || config.trim().length === 0) {
return () => false
} else if (allowStar && config === '*') {
return () => true
} else {
const values = new Set(
config
.split(',')
.map((n) => parseInt(n, 10))
.filter((num) => !isNaN(num))
)
return (v: number) => {
return values.has(v)
}
}
}
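// Usage sketch (illustrative only, not executed): buildIntegerMatcher turns a
// comma-separated allowlist (e.g. a list of team IDs) into a predicate:
//
//   const teamMatcher = buildIntegerMatcher('2,35,104', false)
//   teamMatcher(35) // => true
//   teamMatcher(7) // => false
//   buildIntegerMatcher('*', true)(7) // => true: '*' matches everything when allowStar is set
//   buildIntegerMatcher('', true)(7) // => false: an empty config matches nothing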
export function buildStringMatcher(config: string | undefined, allowStar: boolean): ValueMatcher<string> {
// Builds a ValueMatcher on a comma-separated list of values.
// Optionally, supports a '*' value to match everything
if (!config || config.trim().length === 0) {
return () => false
} else if (allowStar && config === '*') {
return () => true
} else {
const values = new Set(config.split(','))
return (v: string) => {
return values.has(v)
}
}
}
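// Usage sketch (illustrative only, not executed): buildStringMatcher works the same
// way for string values, e.g. token lists such as DROP_EVENTS_BY_TOKEN (assumed usage):
//
//   const tokenMatcher = buildStringMatcher('phc_abc,phc_def', true)
//   tokenMatcher('phc_abc') // => true
//   tokenMatcher('phc_xyz') // => false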