Merge pull request #363 from metrico/bun_support
3.0: Bun support
akvlad authored Oct 27, 2023
2 parents addc20d + 56c1d70 commit 79c374c
Showing 40 changed files with 799 additions and 172 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/node-clickhouse.js.yml
@@ -23,7 +23,7 @@ jobs:

strategy:
matrix:
node-version: [18, 16.x]
node-version: [18, 16.x, 20]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/

steps:
@@ -43,4 +43,4 @@ jobs:
CLICKHOUSE_TSDB: loki
INTEGRATION_E2E: 1
CLOKI_EXT_URL: 127.0.0.1:3100
run: node qryn.js >/dev/stdout & npm run test --forceExit
run: node qryn.mjs >/dev/stdout & npm run test --forceExit
15 changes: 15 additions & 0 deletions Dockerfile_bun
@@ -0,0 +1,15 @@
# Qryn
FROM oven/bun:1-alpine

# BUILD FORCE
ENV BUILD 703030
ENV PORT 3100

COPY . /app
WORKDIR /app
RUN bun install

# Expose Ports
EXPOSE 3100

CMD [ "bun", "qryn.mjs" ]
10 changes: 10 additions & 0 deletions common.js
@@ -125,3 +125,13 @@ module.exports.isCustomSamplesOrderingRule = () => {
module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*'

module.exports.clusterName = process.env.CLUSTER_NAME

module.exports.readonly = process.env.READONLY || false

module.exports.bun = () => {
try {
return Bun
} catch (err) {
return false
}
}
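
The new bun() helper detects the runtime by touching the global Bun object, which only exists under Bun; under Node.js the reference throws, is caught, and the helper returns false. A minimal usage sketch (the consuming snippet and log lines are illustrative, not part of this commit):

const { bun } = require('./common') // relative path is illustrative

if (bun()) {
  // the global Bun object only exists under the Bun runtime; bun() returns it
  console.log('running under Bun ' + Bun.version)
} else {
  // under Node.js the Bun reference throws, so bun() returns false
  console.log('running under Node.js')
}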
2 changes: 1 addition & 1 deletion docker/docker-compose-centos.yml
@@ -39,4 +39,4 @@ services:
container_name: centos
volumes:
- ../:/opt/qryn
entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.js'
entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.mjs'
168 changes: 168 additions & 0 deletions lib/bun_wrapper.js
@@ -0,0 +1,168 @@
const { Transform } = require('stream')
const log = require('./logger')
const { EventEmitter } = require('events')

class BodyStream extends Transform {
_transform (chunk, encoding, callback) {
callback(null, chunk)
}

  once (event, listener) {
    const self = this
    const _listener = (e) => {
      listener(e)
      self.removeListener(event, _listener)
    }
    this.on(event, _listener)
  }
}

const wrapper = (handler, parsers) => {
/**
* @param ctx {Request}
*/
const res = async (ctx, server) => {
let response = ''
let status = 200
let reqBody = ''
let headers = {}
log.info(`${ctx.url}`)

const stream = new BodyStream()
setTimeout(async () => {
if (!ctx.body) {
stream.end()
return
}
for await (const chunk of ctx.body) {
stream.write(chunk)
}
stream.end()
})
const req = {
headers: Object.fromEntries(ctx.headers.entries()),
raw: stream,
log: log,
params: ctx.params || {},
query: {}
}
for (const [key, value] of (new URL(ctx.url)).searchParams) {
if (!(key in req.query)) {
req.query[key] = value
continue
}
req.query[key] = Array.isArray(req.query[key])
? [...req.query[key], value]
: [req.query[key], value]
}
const res = {
send: (msg) => {
response = msg
},
code: (code) => {
status = code
return res
},
header: (key, value) => {
headers[key] = value
return res
},
headers: (hdrs) => {
headers = { ...headers, ...hdrs }
return res
}
}

if (parsers) {
const contentType = (ctx.headers.get('Content-Type') || '')
let ok = false
for (const [type, parser] of Object.entries(parsers)) {
if (type !== '*' && contentType.indexOf(type) > -1) {
log.debug(`parsing ${type}`)
reqBody = await parser(req, stream)
ok = true
log.debug(`parsing ${type} ok`)
}
}
if (!ok && parsers['*']) {
log.debug('parsing *')
reqBody = await parsers['*'](req, stream)
ok = true
log.debug('parsing * ok')
}
if (!ok) {
throw new Error('undefined content type ' + contentType)
}
}

req.body = reqBody || stream

let result = handler(req, res)
if (result && result.then) {
result = await result
}
if (result && result.on) {
response = ''
result.on('data', (d) => {
response += d
})
await new Promise((resolve, reject) => {
result.on('end', resolve)
result.on('error', reject)
result.on('close', resolve)
})
result = null
}
if (result) {
response = result
}
if (response instanceof Object && typeof response !== 'string' && !Buffer.isBuffer(response)) {
response = JSON.stringify(response)
}
return new Response(response, { status: status, headers: headers })
}
return res
}

const wsWrapper = (handler) => {
/**
* @param ctx {Request}
*/
const res = {
open: async (ctx, server) => {
const req = {
headers: Object.fromEntries(ctx.data.ctx.headers.entries()),
log: log,
query: {}
}
for (const [key, value] of (new URL(ctx.data.ctx.url)).searchParams) {
if (!(key in req.query)) {
req.query[key] = value
continue
}
req.query[key] = Array.isArray(req.query[key])
? [...req.query[key], value]
: [req.query[key], value]
}

ctx.closeEmitter = new EventEmitter()
ctx.closeEmitter.send = ctx.send.bind(ctx)

const ws = {
socket: ctx.closeEmitter
}

const result = handler(ws, { query: req.query })
if (result && result.then) {
await result
}
},
close: (ctx) => { ctx.closeEmitter.emit('close') }
}
return res
}

module.exports = {
wrapper,
wsWrapper
}
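
lib/bun_wrapper.js adapts Bun's fetch-style (Request) => Response handlers to the Fastify-like (req, res) signature the existing route handlers expect: it streams the request body, rebuilds query parameters, and collects send/code/header calls into a single Response. A rough wiring sketch under Bun.serve, assuming a standalone entry point with one route for brevity (the handler, port, and require path below are illustrative, not part of this commit):

const { wrapper } = require('./lib/bun_wrapper')

// an existing Fastify-style handler (illustrative)
const ready = (req, res) => res.code(200).send('ready')

// no parsers passed, so req.body falls back to the raw body stream
const handleReady = wrapper(ready)

Bun.serve({
  port: 3100,
  fetch: (request, server) => handleReady(request, server)
})

wsWrapper follows the same idea for websocket routes, handing the handler an EventEmitter-backed socket whose close event is emitted from Bun's websocket close callback.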
45 changes: 28 additions & 17 deletions lib/db/clickhouse.js
@@ -17,14 +17,6 @@ const dist = clusterName ? '_dist' : ''

/* DB Helper */
const ClickHouse = require('@apla/clickhouse')
const clickhouseOptions = {
host: process.env.CLICKHOUSE_SERVER || 'localhost',
port: process.env.CLICKHOUSE_PORT || 8123,
auth: process.env.CLICKHOUSE_AUTH || 'default:',
protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:',
readonly: !!process.env.READONLY,
queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
}

const transpiler = require('../../parser/transpiler')
const rotationLabels = process.env.LABELS_DAYS || 7
@@ -33,9 +25,9 @@ const axios = require('axios')
const { samplesTableName, samplesReadTableName } = UTILS
const path = require('path')
const { Transform } = require('stream')
const { CORS } = require('../../common')

const protocol = process.env.CLICKHOUSE_PROTO || 'http'
const { CORS, bun } = require('../../common')
const clickhouseOptions = require('./clickhouse_options').databaseOptions
const { getClickhouseUrl } = require('./clickhouse_options')

// External Storage Policy for Tables (S3, MINIO)
const storagePolicy = process.env.STORAGE_POLICY || false
@@ -76,7 +68,8 @@ const conveyor = {
let throttler = null
const resolvers = {}
const rejectors = {}
if (isMainThread) {
let first = false
if (isMainThread && !bun()) {
throttler = new Worker(path.join(__dirname, 'throttler.js'))
throttler.on('message', (msg) => {
switch (msg.status) {
@@ -90,8 +83,29 @@ if (isMainThread) {
delete resolvers[msg.id]
delete rejectors[msg.id]
})
} else if (isMainThread && !first) {
first = true
const _throttler = require('./throttler')
throttler = {
on: _throttler.on,
postMessage: _throttler.postMessage,
removeAllListeners: _throttler.removeAllListeners,
terminate: _throttler.terminate
}
_throttler.init()
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
resolvers[msg.id]()
break
case 'err':
rejectors[msg.id](new Error('Database push error'))
break
}
delete resolvers[msg.id]
delete rejectors[msg.id]
})
}

// timeSeriesv2Throttler.start();

/* Cache Helper */
@@ -348,10 +362,6 @@ function pushOTLP (traces) {
})
}

function getClickhouseUrl () {
return `${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}`
}

/**
* @param query {{
* query: string,
@@ -455,6 +465,7 @@ const queryTempoScanV2 = async function (query) {
}
const limit = query.limit ? `LIMIT ${parseInt(query.limit)}` : ''
const sql = `${select} ${from} WHERE ${where.join(' AND ')} ORDER BY timestamp_ns DESC ${limit} FORMAT JSON`
console.log(sql)
const resp = await rawRequest(sql, null, process.env.CLICKHOUSE_DB || 'cloki')
return resp.data.data ? resp.data.data : JSON.parse(resp.data).data
}
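The throttler change above keeps one calling convention across runtimes: under Node.js the throttler still lives in a worker_threads Worker, while under Bun it is required in-process and wrapped in an object exposing the same on/postMessage/removeAllListeners/terminate surface. A condensed restatement of that branch as a helper (makeThrottler is a hypothetical name; the real code inlines both branches on the main thread as shown above):

const path = require('path')
const { Worker } = require('worker_threads')
const { bun } = require('../../common')

const makeThrottler = () => {
  if (!bun()) {
    // Node.js: keep the throttler in a worker thread, as before
    return new Worker(path.join(__dirname, 'throttler.js'))
  }
  // Bun: run the throttler in-process behind a Worker-like facade
  const _throttler = require('./throttler')
  _throttler.init()
  return {
    on: _throttler.on,
    postMessage: _throttler.postMessage,
    removeAllListeners: _throttler.removeAllListeners,
    terminate: _throttler.terminate
  }
}

Either way, the message listener that resolves or rejects the pending push promises stays identical.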
22 changes: 22 additions & 0 deletions lib/db/clickhouse_options.js
@@ -0,0 +1,22 @@
const UTILS = require('../utils')
const { samplesTableName, samplesReadTableName } = UTILS

const clickhouseOptions = {
host: process.env.CLICKHOUSE_SERVER || 'localhost',
port: process.env.CLICKHOUSE_PORT || 8123,
auth: process.env.CLICKHOUSE_AUTH || 'default:',
protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:',
readonly: !!process.env.READONLY,
queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
}

function getClickhouseUrl () {
return `${clickhouseOptions.protocol}//${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}`
}

module.exports = {
samplesTableName,
samplesReadTableName,
getClickhouseUrl,
databaseOptions: clickhouseOptions
}
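
getClickhouseUrl() simply concatenates the env-derived options above. A worked example of the resulting URL (the env values are illustrative):

const { getClickhouseUrl } = require('./lib/db/clickhouse_options') // path relative to the repo root

// with no CLICKHOUSE_* variables set, the defaults above yield:
//   'http://default:@localhost:8123'
// with CLICKHOUSE_PROTO=https CLICKHOUSE_SERVER=ch.example.com CLICKHOUSE_PORT=8443 CLICKHOUSE_AUTH=qryn:secret:
//   'https://qryn:secret@ch.example.com:8443'
console.log(getClickhouseUrl())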