diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index 0fee32e5..9e5fdf41 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [18, 16.x] + node-version: [18, 16.x, 20] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: @@ -43,4 +43,4 @@ jobs: CLICKHOUSE_TSDB: loki INTEGRATION_E2E: 1 CLOKI_EXT_URL: 127.0.0.1:3100 - run: node qryn.js >/dev/stdout & npm run test --forceExit + run: node qryn.mjs >/dev/stdout & npm run test --forceExit diff --git a/Dockerfile_bun b/Dockerfile_bun new file mode 100644 index 00000000..2bd49ab1 --- /dev/null +++ b/Dockerfile_bun @@ -0,0 +1,15 @@ +# Qryn +FROM oven/bun:1-alpine + +# BUILD FORCE +ENV BUILD 703030 +ENV PORT 3100 + +COPY . /app +WORKDIR /app +RUN bun install + +# Expose Ports +EXPOSE 3100 + +CMD [ "bun", "qryn.mjs" ] diff --git a/common.js b/common.js index acf354a2..5936707d 100644 --- a/common.js +++ b/common.js @@ -125,3 +125,13 @@ module.exports.isCustomSamplesOrderingRule = () => { module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*' module.exports.clusterName = process.env.CLUSTER_NAME + +module.exports.readonly = process.env.READONLY || false + +module.exports.bun = () => { + try { + return Bun + } catch (err) { + return false + } +} diff --git a/docker/docker-compose-centos.yml b/docker/docker-compose-centos.yml index 7a142343..9beb607f 100644 --- a/docker/docker-compose-centos.yml +++ b/docker/docker-compose-centos.yml @@ -39,4 +39,4 @@ services: container_name: centos volumes: - ../:/opt/qryn - entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.js' + entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.mjs' diff --git a/lib/bun_wrapper.js b/lib/bun_wrapper.js new file mode 100644 index 00000000..6fe38a2c --- /dev/null +++ b/lib/bun_wrapper.js @@ -0,0 +1,168 @@ +const { Transform } = require('stream') +const log = require('./logger') +const { EventEmitter } = require('events') + +class BodyStream extends Transform { + _transform (chunk, encoding, callback) { + callback(null, chunk) + } + + once (event, listerer) { + const self = this + const _listener = (e) => { + listerer(e) + self.removeListener(event, _listener) + } + this.on(event, _listener) + } +} + +const wrapper = (handler, parsers) => { + /** + * @param ctx {Request} + */ + const res = async (ctx, server) => { + let response = '' + let status = 200 + let reqBody = '' + let headers = {} + log.info(`${ctx.url}`) + + const stream = new BodyStream() + setTimeout(async () => { + if (!ctx.body) { + stream.end() + return + } + for await (const chunk of ctx.body) { + stream.write(chunk) + } + stream.end() + }) + const req = { + headers: Object.fromEntries(ctx.headers.entries()), + raw: stream, + log: log, + params: ctx.params || {}, + query: {} + } + for (const [key, value] of (new URL(ctx.url)).searchParams) { + if (!(key in req.query)) { + req.query[key] = value + continue + } + req.query[key] = Array.isArray(req.query[key]) + ? [...req.query[key], value] + : [req.query[key], value] + } + const res = { + send: (msg) => { + response = msg + }, + code: (code) => { + status = code + return res + }, + header: (key, value) => { + headers[key] = value + return res + }, + headers: (hdrs) => { + headers = { ...headers, ...hdrs } + return res + } + } + + if (parsers) { + const contentType = (ctx.headers.get('Content-Type') || '') + let ok = false + for (const [type, parser] of Object.entries(parsers)) { + if (type !== '*' && contentType.indexOf(type) > -1) { + log.debug(`parsing ${type}`) + reqBody = await parser(req, stream) + ok = true + log.debug(`parsing ${type} ok`) + } + } + if (!ok && parsers['*']) { + log.debug('parsing *') + reqBody = await parsers['*'](req, stream) + ok = true + log.debug('parsing * ok') + } + if (!ok) { + throw new Error('undefined content type ' + contentType) + } + } + + req.body = reqBody || stream + + let result = handler(req, res) + if (result && result.then) { + result = await result + } + if (result && result.on) { + response = '' + result.on('data', (d) => { + response += d + }) + await new Promise((resolve, reject) => { + result.on('end', resolve) + result.on('error', reject) + result.on('close', resolve) + }) + result = null + } + if (result) { + response = result + } + if (response instanceof Object && typeof response !== 'string' && !Buffer.isBuffer(response)) { + response = JSON.stringify(response) + } + return new Response(response, { status: status, headers: headers }) + } + return res +} + +const wsWrapper = (handler) => { + /** + * @param ctx {Request} + */ + const res = { + open: async (ctx, server) => { + const req = { + headers: Object.fromEntries(ctx.data.ctx.headers.entries()), + log: log, + query: {} + } + for (const [key, value] of (new URL(ctx.data.ctx.url)).searchParams) { + if (!(key in req.query)) { + req.query[key] = value + continue + } + req.query[key] = Array.isArray(req.query[key]) + ? [...req.query[key], value] + : [req.query[key], value] + } + + ctx.closeEmitter = new EventEmitter() + ctx.closeEmitter.send = ctx.send.bind(ctx) + + const ws = { + socket: ctx.closeEmitter + } + + const result = handler(ws, { query: req.query }) + if (result && result.then) { + await result + } + }, + close: (ctx) => { ctx.closeEmitter.emit('close') } + } + return res +} + +module.exports = { + wrapper, + wsWrapper +} diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index 6e7ab8bf..5c6fd056 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -17,14 +17,6 @@ const dist = clusterName ? '_dist' : '' /* DB Helper */ const ClickHouse = require('@apla/clickhouse') -const clickhouseOptions = { - host: process.env.CLICKHOUSE_SERVER || 'localhost', - port: process.env.CLICKHOUSE_PORT || 8123, - auth: process.env.CLICKHOUSE_AUTH || 'default:', - protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:', - readonly: !!process.env.READONLY, - queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' } -} const transpiler = require('../../parser/transpiler') const rotationLabels = process.env.LABELS_DAYS || 7 @@ -33,9 +25,9 @@ const axios = require('axios') const { samplesTableName, samplesReadTableName } = UTILS const path = require('path') const { Transform } = require('stream') -const { CORS } = require('../../common') - -const protocol = process.env.CLICKHOUSE_PROTO || 'http' +const { CORS, bun } = require('../../common') +const clickhouseOptions = require('./clickhouse_options').databaseOptions +const { getClickhouseUrl } = require('./clickhouse_options') // External Storage Policy for Tables (S3, MINIO) const storagePolicy = process.env.STORAGE_POLICY || false @@ -76,7 +68,8 @@ const conveyor = { let throttler = null const resolvers = {} const rejectors = {} -if (isMainThread) { +let first = false +if (isMainThread && !bun()) { throttler = new Worker(path.join(__dirname, 'throttler.js')) throttler.on('message', (msg) => { switch (msg.status) { @@ -90,8 +83,29 @@ if (isMainThread) { delete resolvers[msg.id] delete rejectors[msg.id] }) +} else if (isMainThread && !first) { + first = true + const _throttler = require('./throttler') + throttler = { + on: _throttler.on, + postMessage: _throttler.postMessage, + removeAllListeners: _throttler.removeAllListeners, + terminate: _throttler.terminate + } + _throttler.init() + throttler.on('message', (msg) => { + switch (msg.status) { + case 'ok': + resolvers[msg.id]() + break + case 'err': + rejectors[msg.id](new Error('Database push error')) + break + } + delete resolvers[msg.id] + delete rejectors[msg.id] + }) } - // timeSeriesv2Throttler.start(); /* Cache Helper */ @@ -348,10 +362,6 @@ function pushOTLP (traces) { }) } -function getClickhouseUrl () { - return `${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}` -} - /** * @param query {{ * query: string, @@ -455,6 +465,7 @@ const queryTempoScanV2 = async function (query) { } const limit = query.limit ? `LIMIT ${parseInt(query.limit)}` : '' const sql = `${select} ${from} WHERE ${where.join(' AND ')} ORDER BY timestamp_ns DESC ${limit} FORMAT JSON` + console.log(sql) const resp = await rawRequest(sql, null, process.env.CLICKHOUSE_DB || 'cloki') return resp.data.data ? resp.data.data : JSON.parse(resp.data).data } diff --git a/lib/db/clickhouse_options.js b/lib/db/clickhouse_options.js new file mode 100644 index 00000000..2db4510b --- /dev/null +++ b/lib/db/clickhouse_options.js @@ -0,0 +1,22 @@ +const UTILS = require('../utils') +const { samplesTableName, samplesReadTableName } = UTILS + +const clickhouseOptions = { + host: process.env.CLICKHOUSE_SERVER || 'localhost', + port: process.env.CLICKHOUSE_PORT || 8123, + auth: process.env.CLICKHOUSE_AUTH || 'default:', + protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:', + readonly: !!process.env.READONLY, + queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' } +} + +function getClickhouseUrl () { + return `${clickhouseOptions.protocol}//${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}` +} + +module.exports = { + samplesTableName, + samplesReadTableName, + getClickhouseUrl, + databaseOptions: clickhouseOptions +} diff --git a/lib/db/throttler.js b/lib/db/throttler.js index 21a3250c..8d84495e 100644 --- a/lib/db/throttler.js +++ b/lib/db/throttler.js @@ -1,10 +1,11 @@ const { isMainThread, parentPort } = require('worker_threads') const axios = require('axios') -const { getClickhouseUrl, samplesTableName } = require('./clickhouse') -const clickhouseOptions = require('./clickhouse').databaseOptions +const { getClickhouseUrl, samplesTableName } = require('./clickhouse_options') +const clickhouseOptions = require('./clickhouse_options').databaseOptions const logger = require('../logger') const clusterName = require('../../common').clusterName const dist = clusterName ? '_dist' : '' +const { EventEmitter } = require('events') const axiosError = async (err) => { try { @@ -71,14 +72,45 @@ const tracesThottler = new TimeoutThrottler( (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags) FORMAT JSONEachRow`) -if (isMainThread) { - module.exports = { - samplesThrottler, - timeSeriesThrottler, - TimeoutThrottler +const emitter = new EventEmitter() +let on = true +const postMessage = message => { + const genericRequest = (throttler) => { + throttler.queue.push(message.data) + throttler.resolvers.push(() => { + if (isMainThread) { + emitter.emit('message', { status: 'ok', id: message.id }) + return + } + parentPort.postMessage({ status: 'ok', id: message.id }) + }) + throttler.rejects.push(() => { + if (isMainThread) { + emitter.emit('message', { status: 'err', id: message.id }) + return + } + parentPort.postMessage({ status: 'err', id: message.id }) + }) } -} else { - let on = true + switch (message.type) { + case 'end': + on = false + if (!isMainThread) { + parentPort.removeAllListeners('message') + } + break + case 'values': + genericRequest(samplesThrottler) + break + case 'labels': + genericRequest(timeSeriesThrottler) + break + case 'traces': + genericRequest(tracesThottler) + } +} + +const init = () => { setTimeout(async () => { // eslint-disable-next-line no-unmodified-loop-condition while (on) { @@ -96,29 +128,25 @@ if (isMainThread) { } } }, 0) - parentPort.on('message', message => { - const genericRequest = (throttler) => { - throttler.queue.push(message.data) - throttler.resolvers.push(() => { - parentPort.postMessage({ status: 'ok', id: message.id }) - }) - throttler.rejects.push(() => { - parentPort.postMessage({ status: 'err', id: message.id }) - }) - } - switch (message.type) { - case 'end': - on = false - parentPort.removeAllListeners('message') - break - case 'values': - genericRequest(samplesThrottler) - break - case 'labels': - genericRequest(timeSeriesThrottler) - break - case 'traces': - genericRequest(tracesThottler) +} + +if (isMainThread) { + module.exports = { + samplesThrottler, + timeSeriesThrottler, + tracesThottler, + TimeoutThrottler, + postMessage, + on: emitter.on.bind(emitter), + removeAllListeners: emitter.removeAllListeners.bind(emitter), + init, + terminate: () => { + postMessage({ type: 'end' }) } + } +} else { + init() + parentPort.on('message', message => { + postMessage(message) }) } diff --git a/lib/db/zipkin.js b/lib/db/zipkin.js index 2837c727..3920dcfc 100644 --- a/lib/db/zipkin.js +++ b/lib/db/zipkin.js @@ -26,7 +26,13 @@ module.exports = class { * @returns {string} */ toJson () { - return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val) + const res = { + ...this, + timestamp_ns: this.timestamp_ns.toString(), + duration_ns: this.duration_ns.toString() + } + return JSON.stringify(res) + //return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val) } /** diff --git a/lib/handlers/404.js b/lib/handlers/404.js index bf1cb337..daba3363 100644 --- a/lib/handlers/404.js +++ b/lib/handlers/404.js @@ -1,6 +1,6 @@ function handler (req, res) { req.log.debug('unsupported', req.url) - return res.send('404 Not Supported') + return res.code(404).send('404 Not Supported') } module.exports = handler diff --git a/lib/handlers/datadog_log_push.js b/lib/handlers/datadog_log_push.js index cbb89883..a1cd8677 100644 --- a/lib/handlers/datadog_log_push.js +++ b/lib/handlers/datadog_log_push.js @@ -18,6 +18,11 @@ */ const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') + const tagsToObject = (data, delimiter = ',') => Object.fromEntries(data.split(',').map(v => { const fields = v.split(':') @@ -25,13 +30,12 @@ const tagsToObject = (data, delimiter = ',') => })) async function handler (req, res) { - const self = this req.log.debug('Datadog Log Index Request') if (!req.body) { req.log.error('No Request Body or Target!') return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }') } - if (this.readonly) { + if (readonly) { req.log.error('Readonly! No push support.') return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } @@ -69,9 +73,9 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, @@ -79,8 +83,8 @@ async function handler (req, res) { ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { req.log.error({ err }, 'failed ingesting datadog log') @@ -94,7 +98,7 @@ async function handler (req, res) { stream.message ] req.log.debug({ finger, values }, 'store') - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } await Promise.all(promises) diff --git a/lib/handlers/datadog_series_push.js b/lib/handlers/datadog_series_push.js index f7f92420..58cf1863 100644 --- a/lib/handlers/datadog_series_push.js +++ b/lib/handlers/datadog_series_push.js @@ -25,16 +25,19 @@ */ const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this req.log.debug('Datadog Series Index Request') if (!req.body) { req.log.error('No Request Body!') res.code(500).send() return } - if (this.readonly) { + if (readonly) { req.log.error('Readonly! No push support.') res.code(500).send() return @@ -63,18 +66,18 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) - self.labels.add(finger.toString(), stream.labels) + finger = fingerPrint(strJson) + labels.add(finger.toString(), stream.labels) // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, JSONLabels.__name__ || 'undefined' ]])) for (const key in JSONLabels) { - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { req.log.error({ err }) @@ -97,7 +100,7 @@ async function handler (req, res) { entry.value, JSONLabels.__name__ || 'undefined' ] - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } }) diff --git a/lib/handlers/elastic_bulk.js b/lib/handlers/elastic_bulk.js index afa3a418..f7668539 100644 --- a/lib/handlers/elastic_bulk.js +++ b/lib/handlers/elastic_bulk.js @@ -8,15 +8,18 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this req.log.debug('ELASTIC Bulk Request') if (!req.body) { asyncLogError('No Request Body or Target!' + req.body, req.log) return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }') } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } @@ -38,6 +41,9 @@ async function handler (req, res) { const promises = [] if (streams) { streams.forEach(function (stream) { + if (!stream) { + return + } try { stream = JSON.parse(stream) } catch (err) { asyncLogError(err, req.log); return }; @@ -67,10 +73,10 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, @@ -78,8 +84,8 @@ async function handler (req, res) { ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -93,7 +99,7 @@ async function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) // Reset State, Expect Command lastTags = false diff --git a/lib/handlers/elastic_index.js b/lib/handlers/elastic_index.js index 19528092..ee314c45 100644 --- a/lib/handlers/elastic_index.js +++ b/lib/handlers/elastic_index.js @@ -11,15 +11,19 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') + async function handler (req, res) { - const self = this req.log.debug('ELASTIC Index Request') if (!req.body || !req.params.target) { asyncLogError('No Request Body or Target!', req.log) return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }') } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } @@ -57,9 +61,9 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, @@ -67,8 +71,8 @@ async function handler (req, res) { ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -87,7 +91,7 @@ async function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } await Promise.all(promises) diff --git a/lib/handlers/influx_write.js b/lib/handlers/influx_write.js index 5563f8a9..42a93103 100644 --- a/lib/handlers/influx_write.js +++ b/lib/handlers/influx_write.js @@ -39,14 +39,17 @@ const stringify = require('../utils').stringify const influxParser = require('../influx') const { asyncLogError, errors } = require('../../common') +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this if (!req.body && !req.body.metrics) { asyncLogError('No Request Body!', req.log) return } - if (self.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send('') } @@ -75,10 +78,10 @@ async function handler (req, res) { } // Calculate Fingerprint const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort())) - finger = self.fingerPrint(strLabels) - self.labels.add(finger.toString(), stream.labels) + finger = fingerPrint(strLabels) + labels.add(finger.toString(), stream.labels) // Store Fingerprint - self.bulk_labels.add([[ + bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strLabels, @@ -86,8 +89,8 @@ async function handler (req, res) { ]]) for (const key in JSONLabels) { // req.log.debug({ key, data: JSONLabels[key] }, 'Storing label'); - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -111,7 +114,7 @@ async function handler (req, res) { value || 0, key || '' ] - self.bulk.add([values]) + bulk.add([values]) } /* logs or syslog */ } else if (stream.measurement === 'syslog' || JSONFields.message) { @@ -123,7 +126,7 @@ async function handler (req, res) { null, JSONFields.message ] - self.bulk.add([values]) + bulk.add([values]) } }) } diff --git a/lib/handlers/newrelic_log_push.js b/lib/handlers/newrelic_log_push.js index c4b6fb6a..dda46c96 100644 --- a/lib/handlers/newrelic_log_push.js +++ b/lib/handlers/newrelic_log_push.js @@ -31,15 +31,18 @@ const { QrynBadRequest } = require('./errors') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this req.log.debug('NewRelic Log Index Request') if (!req.body) { req.log.error('No Request Body') throw new QrynBadRequest('No request body') } - if (this.readonly) { + if (readonly) { req.log.error('Readonly! No push support.') throw new QrynBadRequest('Read only mode') } @@ -77,12 +80,12 @@ async function handler (req, res) { // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) // Store Fingerprint for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } const dates = {} @@ -99,11 +102,11 @@ async function handler (req, res) { null, log.message ] - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } for (const d of Object.keys(dates)) { - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ d, finger, strJson, diff --git a/lib/handlers/otlp_push.js b/lib/handlers/otlp_push.js index 1c93d30d..73a62d1e 100644 --- a/lib/handlers/otlp_push.js +++ b/lib/handlers/otlp_push.js @@ -17,16 +17,9 @@ }] */ -const { Transform } = require('stream') const { asyncLogError } = require('../../common') - -function handleOne (req, streams, promises) { - const self = this - streams.on('data', function (stream) { - stream = stream.value - promises.push(self.pushZipkin([stream])) - }) -} +const { pushOTLP } = require('../db/clickhouse') +const { readonly } = require('../../common') async function handler (req, res) { req.log.debug('POST /tempo/api/push') @@ -34,7 +27,7 @@ async function handler (req, res) { asyncLogError('No Request Body!', req.log) return res.code(500).send() } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send() } @@ -53,7 +46,7 @@ async function handler (req, res) { spans.push.apply(spans, scope.spans) } } - await this.pushOTLP(spans) + await pushOTLP(spans) return res.code(200).send('OK') } diff --git a/lib/handlers/prom_push.js b/lib/handlers/prom_push.js index 9fcf36ae..249317e9 100644 --- a/lib/handlers/prom_push.js +++ b/lib/handlers/prom_push.js @@ -13,6 +13,10 @@ */ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { const self = this @@ -21,7 +25,7 @@ async function handler (req, res) { asyncLogError('No Request Body!', req.log) return res.code(500).send() } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send() } @@ -41,9 +45,9 @@ async function handler (req, res) { }, {}) // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) req.log.debug({ labels: stream.labels, finger }, 'LABELS FINGERPRINT') - self.labels.add(finger.toString(), stream.labels) + labels.add(finger.toString(), stream.labels) const dates = {} if (stream.samples) { @@ -67,20 +71,20 @@ async function handler (req, res) { dates[ new Date(parseInt((ts / BigInt('1000000')).toString())).toISOString().split('T')[0] ] = 1 - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } for (const d of Object.keys(dates)) { // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ d, finger, strJson, JSONLabels.__name__ || 'undefined' ]])) for (const key in JSONLabels) { - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } } catch (err) { diff --git a/lib/handlers/prom_query.js b/lib/handlers/prom_query.js index 91a1ec70..62938963 100644 --- a/lib/handlers/prom_query.js +++ b/lib/handlers/prom_query.js @@ -2,6 +2,7 @@ const { p2l } = require('@qxip/promql2logql'); const { asyncLogError, CORS } = require('../../common') +const { instantQueryScan } = require('../db/clickhouse') const empty = '{"status" : "success", "data" : {"resultType" : "scalar", "result" : []}}'; // to be removed const test = () => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, "2"]}}`; // to be removed const exec = (val) => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, val]}}`; // to be removed @@ -40,7 +41,7 @@ async function handler (req, res) { /* scan fingerprints */ /* TODO: handle time tag + direction + limit to govern the query output */ try { - const response = await this.instantQueryScan( + const response = await instantQueryScan( req.query ) res.code(200) diff --git a/lib/handlers/prom_query_range.js b/lib/handlers/prom_query_range.js index 74cd3460..987563ac 100644 --- a/lib/handlers/prom_query_range.js +++ b/lib/handlers/prom_query_range.js @@ -11,6 +11,7 @@ const { p2l } = require('@qxip/promql2logql') const { asyncLogError, CORS } = require('../../common') +const { scanFingerprints } = require('../db/clickhouse') async function handler (req, res) { req.log.debug('GET /api/v1/query_range') @@ -35,7 +36,7 @@ async function handler (req, res) { // Convert PromQL to LogQL and execute try { req.query.query = p2l(req.query.query) - const response = await this.scanFingerprints( + const response = await scanFingerprints( { ...req.query, start: parseInt(req.query.start) * 1e9, diff --git a/lib/handlers/push.js b/lib/handlers/push.js index 8380ba60..daef55c7 100644 --- a/lib/handlers/push.js +++ b/lib/handlers/push.js @@ -21,8 +21,13 @@ const FilterBase = require('stream-json/filters/FilterBase') const StreamValues = require('stream-json/streamers/StreamValues') const logger = require('../logger') const UTILS = require('../utils') +const DATABASE = require('../db/clickhouse') const { asyncLogError } = require('../../common') const stringify = UTILS.stringify +const fingerPrint = UTILS.fingerPrint +const { bulk_labels, bulk, labels } = DATABASE.cache +const toJson = UTILS.toJSON +const { readonly } = require('../../common') function processStream (stream, labels, bulkLabels, bulk, toJSON, fingerPrint) { let finger = null @@ -97,14 +102,13 @@ function processStream (stream, labels, bulkLabels, bulk, toJSON, fingerPrint) { } async function handler (req, res) { - const self = this req.log.debug('POST /loki/api/v1/push') if (!req.body) { - await processRawPush(req, self.labels, self.bulk_labels, self.bulk, - self.toJSON, self.fingerPrint) + await processRawPush(req, DATABASE.cache.labels, bulk_labels, bulk, + toJSON, fingerPrint) return res.code(200).send() } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send() } @@ -124,8 +128,9 @@ async function handler (req, res) { const promises = [] if (streams) { streams.forEach(function (stream) { - promises.push(processStream(stream, self.labels, self.bulk_labels, self.bulk, - self.toJSON, self.fingerPrint)) + promises.push(processStream(stream, + DATABASE.cache.labels, DATABASE.cache.bulk_labels, DATABASE.cache.bulk, + UTILS.toJSON, fingerPrint)) }) await Promise.all(promises) } diff --git a/lib/handlers/query.js b/lib/handlers/query.js index df100fd1..aeb422d3 100644 --- a/lib/handlers/query.js +++ b/lib/handlers/query.js @@ -1,5 +1,6 @@ // Query Handler const { asyncLogError, CORS } = require('../../common') +const { instantQueryScan } = require('../db/clickhouse') async function handler (req, res) { req.log.debug('GET /loki/api/v1/query') @@ -12,7 +13,7 @@ async function handler (req, res) { /* scan fingerprints */ /* TODO: handle time tag + direction + limit to govern the query output */ try { - const response = await this.instantQueryScan(req.query) + const response = await instantQueryScan(req.query) res.code(200) res.headers({ 'Content-Type': 'application/json', diff --git a/lib/handlers/query_range.js b/lib/handlers/query_range.js index 66524a74..6939ff87 100644 --- a/lib/handlers/query_range.js +++ b/lib/handlers/query_range.js @@ -12,6 +12,7 @@ const { parseCliQL } = require('../cliql') const { checkCustomPlugins } = require('./common') const { asyncLogError, CORS } = require('../../common') +const { scanFingerprints, scanClickhouse } = require('../db/clickhouse') async function handler (req, res) { req.log.debug('GET /loki/api/v1/query_range') @@ -27,7 +28,7 @@ async function handler (req, res) { } const cliqlParams = parseCliQL(req.query.query) if (cliqlParams) { - this.scanClickhouse(cliqlParams, res, params) + scanClickhouse(cliqlParams, res, params) return } const pluginOut = await checkCustomPlugins(req.query) @@ -37,7 +38,7 @@ async function handler (req, res) { } req.query.optimizations = true try { - const response = await this.scanFingerprints(req.query) + const response = await scanFingerprints(req.query) res.code(200) res.headers({ 'Content-Type': 'application/json', diff --git a/lib/handlers/series.js b/lib/handlers/series.js index 3c856515..8f0bc62f 100644 --- a/lib/handlers/series.js +++ b/lib/handlers/series.js @@ -13,6 +13,7 @@ async function handler (req, res) { if (!match.length) { match = getArray(req.query['match[]']) } + console.log(match) if (!match.length) { throw new Error('Match param is required') } diff --git a/lib/handlers/tags.js b/lib/handlers/tags.js index d1aad9d9..43976fdb 100644 --- a/lib/handlers/tags.js +++ b/lib/handlers/tags.js @@ -11,9 +11,12 @@ } */ +const DATABASE = require('../db/clickhouse') +const { labels } = DATABASE.cache + function handler (req, res) { req.log.debug('GET /api/search/tags') - const allLabels = this.labels.get('_LABELS_') + const allLabels = labels.get('_LABELS_') const resp = { tagNames: allLabels } return res.send(resp) }; diff --git a/lib/handlers/tags_values.js b/lib/handlers/tags_values.js index ccc40ee5..72a186b1 100644 --- a/lib/handlers/tags_values.js +++ b/lib/handlers/tags_values.js @@ -11,15 +11,18 @@ } */ +const DATABASE = require('../db/clickhouse') +const { labels } = DATABASE.cache + function handler (req, res) { req.log.debug(`GET /api/search/tag/${req.params.name}/values`) if (req.params.name.includes('.')) { var tag = req.params.name.split('.').reduce((a, b) => a + b.charAt(0).toUpperCase() + b.slice(1)); - const allValues = this.labels.get(tag) + const allValues = labels.get(tag) const resp = { tagValues: allValues } return res.send(resp) } else { - const allValues = this.labels.get(req.params.name) + const allValues = labels.get(req.params.name) const resp = { tagValues: allValues } return res.send(resp) } diff --git a/lib/handlers/telegraf.js b/lib/handlers/telegraf.js index b5fe64ec..a21e1b20 100644 --- a/lib/handlers/telegraf.js +++ b/lib/handlers/telegraf.js @@ -12,13 +12,17 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') function handler (req, res) { if (!req.body && !req.body.metrics) { asyncLogError('No Request Body!', req.log) return } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.send(500) } @@ -35,11 +39,11 @@ function handler (req, res) { JSONLabels.metric = stream.name // Calculate Fingerprint const strLabels = stringify(JSONLabels) - finger = this.fingerPrint(strLabels) + finger = fingerPrint(strLabels) req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') - this.labels.add(finger.toString(), stream.labels) + labels.add(finger.toString(), stream.labels) // Store Fingerprint - this.bulk_labels.add(finger.toString(), [ + bulk_labels.add(finger.toString(), [ new Date().toISOString().split('T')[0], finger, strLabels, @@ -47,8 +51,8 @@ function handler (req, res) { ]) for (const key in JSONLabels) { // req.log.debug({ key, data: JSONLabels[key] }, 'Storing label'); - this.labels.add('_LABELS_', key) - this.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -70,7 +74,7 @@ function handler (req, res) { stream.fields[entry] || 0, stream.fields[entry].toString() || '' ] - this.bulk.add(values) + bulk.add(values) }) } }) diff --git a/lib/handlers/tempo_push.js b/lib/handlers/tempo_push.js index e3662e3a..091be460 100644 --- a/lib/handlers/tempo_push.js +++ b/lib/handlers/tempo_push.js @@ -19,12 +19,14 @@ const { Transform } = require('stream') const { asyncLogError } = require('../../common') +const { readonly } = require('../../common') +const { pushZipkin } = require('../db/clickhouse') function handleOne (req, streams, promises) { const self = this streams.on('data', function (stream) { stream = stream.value - promises.push(self.pushZipkin([stream])) + promises.push(pushZipkin([stream])) }) } @@ -34,7 +36,7 @@ async function handler (req, res) { asyncLogError('No Request Body!', req.log) return res.code(500).send() } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send() } diff --git a/lib/handlers/tempo_search.js b/lib/handlers/tempo_search.js index 2a52ace1..cf62c985 100644 --- a/lib/handlers/tempo_search.js +++ b/lib/handlers/tempo_search.js @@ -16,6 +16,7 @@ const logfmt = require('logfmt') const common = require('../../common') const { asyncLogError, CORS } = require('../../common') +const { scanTempo } = require('../db/clickhouse') async function handler (req, res) { req.log.debug('GET /api/search') @@ -39,7 +40,7 @@ async function handler (req, res) { req.log.debug(`Search Tempo ${req.query.query}, ${req.query.start}, ${req.query.end}`) try { - let resp = await this.scanTempo( + let resp = await scanTempo( req.query ) resp = [...resp.v2, ...resp.v1] diff --git a/lib/handlers/tempo_tags.js b/lib/handlers/tempo_tags.js index f6873f98..fe579eef 100644 --- a/lib/handlers/tempo_tags.js +++ b/lib/handlers/tempo_tags.js @@ -1,8 +1,8 @@ const { asyncLogError } = require('../../common') - +const { queryTempoTags } = require('../db/clickhouse') async function handler (req, res) { try { - const resp = await this.queryTempoTags() + const resp = await queryTempoTags() return res.send(resp.map(e => e.key)) } catch (e) { asyncLogError(e, req.log) diff --git a/lib/handlers/tempo_traces.js b/lib/handlers/tempo_traces.js index 79b8d050..3e832495 100644 --- a/lib/handlers/tempo_traces.js +++ b/lib/handlers/tempo_traces.js @@ -16,6 +16,7 @@ const TraceDataType = protoBuff.loadSync(__dirname + '/../opentelemetry/proto/tr const { stringify } = require('logfmt') const { flatOTLPAttrs, OTLPgetServiceNames } = require('../utils') const { asyncLogError } = require('../../common') +const { tempoQueryScan } = require('../db/clickhouse') function pad (pad, str, padLeft) { if (typeof str === 'undefined') { @@ -48,20 +49,18 @@ async function handler (req, res) { } /* transpile trace params to logql selector */ - if (req.query.tags) { + /*if (req.query.tags) { req.query.query = `{${req.query.tags}}` if (req.params.traceId) req.query.query += ` |~ "${req.params.traceId}"` } else if (this.tempo_tagtrace) { req.query.query = `{traceId="${req.params.traceId}"}` } else { req.query.query = `{type="tempo"} |~ "${req.params.traceId}"` - } - - req.log.debug('Scan Tempo', req.query, req.params.traceId) + }*/ /* TODO: handle time tag + direction + limit to govern the query output */ try { - const resp = await this.tempoQueryScan( + const resp = await tempoQueryScan( req.query, res, req.params.traceId ) /* Basic Structure for traces/v1 Protobuf encoder */ diff --git a/lib/handlers/tempo_values.js b/lib/handlers/tempo_values.js index a5528cfa..e84eff0d 100644 --- a/lib/handlers/tempo_values.js +++ b/lib/handlers/tempo_values.js @@ -11,6 +11,7 @@ } */ const { asyncLogError } = require('../../common') +const { queryTempoValues } = require('../db/clickhouse') async function handler (req, res) { req.log.debug(`GET /api/search/tag/${req.params.name}/values`) @@ -18,7 +19,7 @@ async function handler (req, res) { return res.send({ tagValues: [] }) } try { - const vals = (await this.queryTempoValues(req.params.name)).map(e => e.val) + const vals = (await queryTempoValues(req.params.name)).map(e => e.val) return res.send({ tagValues: vals }) } catch (e) { asyncLogError(e, req.log) diff --git a/package-lock.json b/package-lock.json index 68960f27..38bbf90b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,6 +20,7 @@ "@qxip/influx-line-protocol-parser": "^0.2.1", "@qxip/plugnplay": "^3.3.1", "@qxip/promql2logql": "^1.0.30", + "@stricjs/router": "^4.2.9", "axios": "^0.21.4", "bnf": "^1.0.1", "csv-writer": "^1.6.0", @@ -54,8 +55,8 @@ "yaml": "^1.10.2" }, "bin": { - "cloki": "qryn.js", - "qryn": "qryn.js" + "cloki": "qryn.mjs", + "qryn": "qryn.mjs" }, "devDependencies": { "@elastic/elasticsearch": "^8.5.0", @@ -2521,6 +2522,11 @@ "@sinonjs/commons": "^1.7.0" } }, + "node_modules/@stricjs/router": { + "version": "4.2.9", + "resolved": "https://registry.npmjs.org/@stricjs/router/-/router-4.2.9.tgz", + "integrity": "sha512-YHrfcQIOR+zfQxRSFKZe7Alhx3xzVHBkBfiqBBKjafVuf+2//1y2Xrc3rNxf4F4bPm7J35ZmRZkqRoi15/QQLQ==" + }, "node_modules/@tootallnate/once": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@tootallnate/once/-/once-1.1.2.tgz", @@ -12992,9 +12998,9 @@ } }, "node_modules/typescript": { - "version": "4.8.4", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.8.4.tgz", - "integrity": "sha512-QCh+85mCy+h0IGff8r5XWzOVSbBO+KfeYrMQh7NJ58QujwcE22u+NUSmUxqF+un70P9GXKxa2HCNiTTMJknyjQ==", + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.2.2.tgz", + "integrity": "sha512-mI4WrpHsbCIcwT9cF4FZvr80QUeKvsUsUvKDoR+X/7XHQH98xYD8YHZg7ANtz2GtZt/CBq2QJ0thkGJMHfqc1w==", "dev": true, "peer": true, "bin": { @@ -13002,7 +13008,7 @@ "tsserver": "bin/tsserver" }, "engines": { - "node": ">=4.2.0" + "node": ">=14.17" } }, "node_modules/uglify-js": { @@ -15406,6 +15412,11 @@ "@sinonjs/commons": "^1.7.0" } }, + "@stricjs/router": { + "version": "4.2.9", + "resolved": "https://registry.npmjs.org/@stricjs/router/-/router-4.2.9.tgz", + "integrity": "sha512-YHrfcQIOR+zfQxRSFKZe7Alhx3xzVHBkBfiqBBKjafVuf+2//1y2Xrc3rNxf4F4bPm7J35ZmRZkqRoi15/QQLQ==" + }, "@tootallnate/once": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@tootallnate/once/-/once-1.1.2.tgz", @@ -23453,9 +23464,9 @@ } }, "typescript": { - "version": "4.8.4", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.8.4.tgz", - "integrity": "sha512-QCh+85mCy+h0IGff8r5XWzOVSbBO+KfeYrMQh7NJ58QujwcE22u+NUSmUxqF+un70P9GXKxa2HCNiTTMJknyjQ==", + "version": "5.2.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.2.2.tgz", + "integrity": "sha512-mI4WrpHsbCIcwT9cF4FZvr80QUeKvsUsUvKDoR+X/7XHQH98xYD8YHZg7ANtz2GtZt/CBq2QJ0thkGJMHfqc1w==", "dev": true, "peer": true }, diff --git a/package.json b/package.json index 36c565f3..41d29fb9 100644 --- a/package.json +++ b/package.json @@ -2,20 +2,20 @@ "name": "qryn", "version": "2.4.4", "description": "Polyglot Observability Stack with ClickHouse storage", - "main": "qryn.js", + "main": "qryn.mjs", "bin": { - "cloki": "./qryn.js", - "qryn": "./qryn.js" + "cloki": "./qryn.mjs", + "qryn": "./qryn.mjs" }, "scripts": { "test": "jest", - "start": "node qryn.js", - "pretty": "node qryn.js | pino-pretty", + "start": "node qryn.mjs", + "pretty": "node qryn.mjs | pino-pretty", "postinstall": "patch-package", "install-view": "mkdir -p view && curl -L https://github.com/metrico/cloki-view/releases/latest/download/dist.zip | busybox unzip - -d ./view", "lint": "npx eslint --fix *.js lib parser plugins test", "bun-install": "bun install --platform node", - "bun-start": "bun run qryn.js" + "bun-start": "bun run qryn.mjs" }, "standard": { "env": [ @@ -74,7 +74,8 @@ "yaml": "^1.10.2", "@qxip/promql2logql": "^1.0.30", "node-gzip": "^1.1.2", - "csv-writer": "^1.6.0" + "csv-writer": "^1.6.0", + "@stricjs/router": "^4.2.9" }, "devDependencies": { "casual": "^1.6.2", diff --git a/parsers.js b/parsers.js index 0bfdb057..e994dff3 100644 --- a/parsers.js +++ b/parsers.js @@ -33,11 +33,12 @@ const wwwFormParser = async (req, payload) => { const lokiPushJSONParser = async (req, payload) => { try { const length = getContentLength(req, 1e9) - if (length > 5e6) { + if (length > 5 * 1024 * 1024) { return } await shaper.register(length) - return JSON.parse(await getContentBody(req, payload)) + const body = await getContentBody(req, payload) + return JSON.parse(body) } catch (err) { err.statusCode = 400 throw err @@ -331,9 +332,9 @@ async function getContentBody (req, payload) { if (req._rawBody) { return req._rawBody } - let body = '' + const body = [] payload.on('data', data => { - body += data.toString() + body.push(data)// += data.toString() }) if (payload.isPaused && payload.isPaused()) { payload.resume() @@ -342,8 +343,8 @@ async function getContentBody (req, payload) { payload.on('end', resolve) payload.on('close', resolve) }) - req._rawBody = body - return body + req._rawBody = Buffer.concat(body).toString() + return Buffer.concat(body).toString() } module.exports = { diff --git a/pm2.ecosystem.js b/pm2.ecosystem.js index c623ecaa..135bf447 100644 --- a/pm2.ecosystem.js +++ b/pm2.ecosystem.js @@ -1,7 +1,7 @@ module.exports = { apps: [{ name: 'qryn', - script: './qryn.js', + script: './qryn.mjs', env: { CLICKHOUSE_SERVER: 'localhost', CLICKHOUSE_PORT: 8123, diff --git a/qryn.mjs b/qryn.mjs new file mode 100644 index 00000000..a4a0e693 --- /dev/null +++ b/qryn.mjs @@ -0,0 +1,9 @@ +import {init} from './qryn_node_wrapper.js' +import {bun} from './common.js' +import bunInit from './qryn_bun.mjs' + +if (bun()) { + bunInit() +} else { + init() +} diff --git a/qryn_bun.mjs b/qryn_bun.mjs new file mode 100644 index 00000000..9f5b1332 --- /dev/null +++ b/qryn_bun.mjs @@ -0,0 +1,288 @@ +import { Router } from '@stricjs/router'; +import { wrapper, wsWrapper } from './lib/bun_wrapper.js'; + +import { + combinedParser, + jsonParser, + lokiPushJSONParser, + lokiPushProtoParser, otlpPushProtoParser, prometheusPushProtoParser, + rawStringParser, + tempoPushNDJSONParser, + tempoPushParser, wwwFormParser, yamlParser +} from './parsers.js' +import handlerPush from './lib/handlers/push.js' +import handle404 from './lib/handlers/404.js' +import handlerHello from './lib/handlers/ready.js' +import handlerElasticPush from './lib/handlers/elastic_index.js' +import handlerElasticBulk from './lib/handlers/elastic_bulk.js' +import handlerTempoPush from './lib/handlers/tempo_push.js' +import handlerTempoTraces from './lib/handlers/tempo_traces.js' +import handlerTempoLabel from './lib/handlers/tempo_tags.js' +import handlerTempoLabelValues from './lib/handlers/tempo_values.js' +import handlerTempoSearch from './lib/handlers/tempo_search.js' +import handlerTempoEcho from './lib/handlers/echo.js' +import handlerTelegraf from './lib/handlers/telegraf.js' +import handlerDatadogLogPush from './lib/handlers/datadog_log_push.js' +import handlerDatadogSeriesPush from './lib/handlers/datadog_series_push.js' +import handlerQueryRange from './lib/handlers/query_range.js' +import handlerQuery from './lib/handlers/query.js' +import handlerLabel from './lib/handlers/label.js' +import handlerLabelValues from './lib/handlers/label_values.js' +import handlerSeries from './lib/handlers/series.js' +import handlerPromSeries from './lib/handlers/prom_series.js' +import promWriteHandler from './lib/handlers/prom_push.js' +import handlerPromQueryRange from './lib/handlers/prom_query_range.js' +import handlerPromQuery from './lib/handlers/prom_query.js' +import handlerPromLabel from './lib/handlers/promlabel.js' +import handlerPromLabelValues from './lib/handlers/promlabel_values.js' +import handlerPromDefault from './lib/handlers/prom_default.js' +import handlerNewrelicLogPush from './lib/handlers/newrelic_log_push.js' +import handlerInfluxWrite from './lib/handlers/influx_write.js' +import handlerInfluxHealth from './lib/handlers/influx_health.js' +import handlerOTLPPush from './lib/handlers/otlp_push.js' +import handlerGetRules from './lib/handlers/alerts/get_rules.js' +import handlerGetGroup from './lib/handlers/alerts/get_group.js' +import handlerPostGroup from './lib/handlers/alerts/post_group.js' +import handlerDelGroup from './lib/handlers/alerts/del_group.js' +import handlerDelNS from './lib/handlers/alerts/del_ns.js' +import handlerPromGetRules from './lib/handlers/alerts/prom_get_rules.js' +import handlerTail from './lib/handlers/tail.js' + +import { readonly } from './common.js' +import DATABASE, { init } from './lib/db/clickhouse.js' +import { startAlerting } from './lib/db/alerting/index.js' + +export default async() => { + if (!readonly) { + await init(process.env.CLICKHOUSE_DB || 'cloki') + await startAlerting() + } + await DATABASE.checkDB() + + const app = new Router() + + app.get('/hello', wrapper(handlerHello)) + .get('/ready', wrapper(handlerHello)) + .post('/loki/api/v1/push', wrapper(handlerPush, { + 'application/json': lokiPushJSONParser, + 'application/x-protobuf': lokiPushProtoParser, + '*': lokiPushJSONParser + })) + .post('/:target/_doc', wrapper(handlerElasticPush, { + 'application/json': jsonParser, + '*': rawStringParser + })) + .post('/:target/_create/:id', wrapper(handlerElasticPush, { + 'application/json': jsonParser, + '*': rawStringParser + })) + .put('/:target/_doc/:id', wrapper(handlerElasticPush, { + 'application/json': jsonParser, + '*': rawStringParser + })) + .put('/:target/_create/:id', wrapper(handlerElasticPush, { + 'application/json': jsonParser, + '*': rawStringParser + })) + .post('/_bulk', wrapper(handlerElasticBulk, { + 'application/json': jsonParser, + '*': rawStringParser + })) + .post('/:target/_bulk', wrapper(handlerElasticBulk, { + 'application/json': jsonParser, + '*': rawStringParser + })) + .post('/tempo/api/push', wrapper(handlerTempoPush, { + 'application/json': tempoPushParser, + 'application/x-ndjson': tempoPushNDJSONParser, + '*': tempoPushParser + })) + .post('/tempo/spans', wrapper(handlerTempoPush, { + 'application/json': tempoPushParser, + 'application/x-ndjson': tempoPushNDJSONParser, + '*': tempoPushParser + })) + .post('/api/v2/spans', wrapper(handlerTempoPush, { + 'application/json': tempoPushParser, + 'application/x-ndjson': tempoPushNDJSONParser, + '*': tempoPushParser + })) + .get('/api/traces/:traceId', wrapper(handlerTempoTraces)) + .get('/api/traces/:traceId/:json', wrapper(handlerTempoTraces)) + .get('/tempo/api/traces/:traceId', wrapper(handlerTempoTraces)) + .get('/tempo/api/traces/:traceId/:json', wrapper(handlerTempoTraces)) + .get('/api/echo', wrapper(handlerTempoEcho)) + .get('/tempo/api/echo', wrapper(handlerTempoEcho)) + .ws('/loki/api/v1/tail', wsWrapper(handlerTail)) + .get('/config', () => new Response('not supported')) + .get('/metrics', () => new Response('not supported')) + .get('/influx/api/v2/write/health', () => new Response('ok')) + + + const fastify = { + get: (path, hndl, parsers) => { + app.get(path, wrapper(hndl, parsers)) + }, + post: (path, hndl, parsers) => { + app.post(path, wrapper(hndl, parsers)) + }, + put: (path, hndl, parsers) => { + app.put(path, wrapper(hndl, parsers)) + }, + delete: (path, hndl, parsers) => { + app.delete(path, wrapper(hndl, parsers)) + } + } + + fastify.get('/api/search/tags', handlerTempoLabel) + fastify.get('/tempo/api/search/tags', handlerTempoLabel) + + /* Tempo Tag Value Handler */ + fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues) + fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues) + + /* Tempo Traces Query Handler */ + fastify.get('/api/search', handlerTempoSearch) + fastify.get('/tempo/api/search', handlerTempoSearch) + + /* Tempo Echo Handler */ + fastify.get('/api/echo', handlerTempoEcho) + fastify.get('/tempo/api/echo', handlerTempoEcho) + + /* Telegraf HTTP Bulk handler */ + fastify.post('/telegraf', handlerTelegraf, { + '*': jsonParser + }) + + /* Datadog Log Push Handler */ + fastify.post('/api/v2/logs', handlerDatadogLogPush, { + 'application/json': jsonParser, + '*': rawStringParser + }) + + /* Datadog Series Push Handler */ + + fastify.post('/api/v2/series', handlerDatadogSeriesPush, { + 'application/json': jsonParser, + '*': rawStringParser + }) + + /* Query Handler */ + + fastify.get('/loki/api/v1/query_range', handlerQueryRange) + + /* Label Handlers */ + /* Label Value Handler via query (test) */ + + fastify.get('/loki/api/v1/query', handlerQuery) + + /* Label Handlers */ + fastify.get('/loki/api/v1/label', handlerLabel) + fastify.get('/loki/api/v1/labels', handlerLabel) + + /* Label Value Handler */ + + fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues) + + /* Series Handler - experimental support for both Loki and Prometheus */ + + fastify.get('/loki/api/v1/series', handlerSeries) + + fastify.get('/api/v1/series', handlerPromSeries) + fastify.post('/api/v1/series', handlerPromSeries, { + 'application/x-www-form-urlencoded': wwwFormParser + }) + + /* ALERT MANAGER Handlers */ + fastify.get('/api/prom/rules', handlerGetRules) + fastify.get('/api/prom/rules/:ns/:group', handlerGetGroup) + fastify.post('/api/prom/rules/:ns', handlerPostGroup, { + '*': yamlParser + }) + fastify.delete('/api/prom/rules/:ns/:group', handlerDelGroup) + fastify.delete('/api/prom/rules/:ns', handlerDelNS) + fastify.get('/prometheus/api/v1/rules', handlerPromGetRules) + + /* PROMETHEUS REMOTE WRITE Handlers */ + + fastify.post('/api/v1/prom/remote/write', promWriteHandler, { + 'application/x-protobuf': prometheusPushProtoParser, + 'application/json': jsonParser, + '*': combinedParser(prometheusPushProtoParser, jsonParser) + }) + fastify.post('/api/prom/remote/write', promWriteHandler, { + 'application/x-protobuf': prometheusPushProtoParser, + 'application/json': jsonParser, + '*': combinedParser(prometheusPushProtoParser, jsonParser) + }) + fastify.post('/prom/remote/write', promWriteHandler, { + 'application/x-protobuf': prometheusPushProtoParser, + 'application/json': jsonParser, + '*': combinedParser(prometheusPushProtoParser, jsonParser) + }) + + /* PROMQETHEUS API EMULATION */ + + fastify.post('/api/v1/query_range', handlerPromQueryRange, { + 'application/x-www-form-urlencoded': wwwFormParser + }) + fastify.get('/api/v1/query_range', handlerPromQueryRange) + + fastify.post('/api/v1/query', handlerPromQuery, { + 'application/x-www-form-urlencoded': wwwFormParser + }) + fastify.get('/api/v1/query', handlerPromQuery) + fastify.get('/api/v1/labels', handlerPromLabel) // piggyback on qryn labels + fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values + fastify.post('/api/v1/labels', handlerPromLabel, { + '*': rawStringParser + }) // piggyback on qryn labels + fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, { + '*': rawStringParser + }) // piggyback on qryn values + + fastify.get('/api/v1/metadata', handlerPromDefault.misc) // default handler TBD + fastify.get('/api/v1/rules', handlerPromDefault.rules) // default handler TBD + fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc) // default handler TBD + fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc, { + 'application/x-www-form-urlencoded': wwwFormParser + }) // default handler TBD + fastify.get('/api/v1/format_query', handlerPromDefault.misc) // default handler TBD + fastify.post('/api/v1/format_query', handlerPromDefault.misc, { + 'application/x-www-form-urlencoded': wwwFormParser + }) // default handler TBD + fastify.get('/api/v1/status/buildinfo', handlerPromDefault.buildinfo) // default handler TBD + + /* NewRelic Log Handler */ + + fastify.post('/log/v1', handlerNewrelicLogPush, { + 'text/plain': jsonParser, + '*': jsonParser + }) + + /* INFLUX WRITE Handlers */ + + fastify.post('/write', handlerInfluxWrite, { + '*': rawStringParser + }) + fastify.post('/influx/api/v2/write', handlerInfluxWrite, { + '*': rawStringParser + }) + /* INFLUX HEALTH Handlers */ + + fastify.get('/health', handlerInfluxHealth) + fastify.get('/influx/health', handlerInfluxHealth) + + + fastify.post('/v1/traces', handlerOTLPPush, { + '*': otlpPushProtoParser + }) + + + + + app.use(404, wrapper(handle404)) + app.port = process.env.PORT || 3100 + app.hostname = process.env.HOST || '0.0.0.0' + app.listen() +} diff --git a/qryn.js b/qryn_node.js similarity index 98% rename from qryn.js rename to qryn_node.js index f82f6e83..183348ce 100755 --- a/qryn.js +++ b/qryn_node.js @@ -118,7 +118,11 @@ let fastify = require('fastify')({ if (process.env.FASTIFY_METRICS) { const metricsPlugin = require('fastify-metrics') fastify.register(metricsPlugin, { endpoint: '/metrics' }) + } else { + fastify.get('/metrics', () => 'not supported') } + fastify.get('/config', () => 'not supported') + fastify.get('/influx/api/v2/write/health', () => 'ok') /* CORS Helper */ const CORS = process.env.CORS_ALLOW_ORIGIN || '*' fastify.register(require('@fastify/cors'), { diff --git a/qryn_node_wrapper.js b/qryn_node_wrapper.js new file mode 100644 index 00000000..f7fe3b15 --- /dev/null +++ b/qryn_node_wrapper.js @@ -0,0 +1,10 @@ +module.exports.init = () => { + require('./qryn_node') +} +module.exports.bun = () => { + try { + return Bun + } catch (e) { + return false + } +}