-
Notifications
You must be signed in to change notification settings - Fork 1
/
mod.ts
117 lines (100 loc) · 3.02 KB
/
mod.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
* A Kafka client for Deno built using WebAssembly.
*
* ### Example reader
*
* ```typescript
* const kafkaGoSaur = new KafkaGoSaur();
* const reader = await kafkaGoSaur.createReader({
* brokers: ["localhost:9092"],
* topic: "test-0",
* });
*
* const readMsg = await reader.readMessage();
* ```
*
* ### Example writer
*
* ```typescript
* const kafkaGoSaur = new KafkaGoSaur();
* const writer = await kafkaGoSaur.createWriter({
* broker: "localhost:9092",
* topic: "test-0",
* });
*
* const enc = new TextEncoder();
* const msgs = [{ value: enc.encode("value") }];
*
* await writer.writeMessages(msgs);
* ```
*
* @module
*/
// @deno-types="./global.d.ts"
import "./lib/wasm_exec.js";
import { deadline, DeadlineError, delay } from "./deps.ts";
import { dial as dialNode } from "./net/node-connection.ts";
import { dial as dialDeno } from "./net/deno-connection.ts";
import { KafkaDialer, KafkaDialerConfig } from "./dialer.ts";
import { KafkaReader, KafkaReaderConfig } from "./reader.ts";
import { KafkaWriter, KafkaWriterConfig } from "./writer.ts";
const runGoWasm = async (wasmFilePath: string): Promise<void> => {
const go = new global.Go();
const wasmBytes = await Deno.readFile(wasmFilePath);
const instiatedSource = await WebAssembly.instantiate(
wasmBytes,
go.importObject,
);
return go.run(instiatedSource.instance);
};
const untilGloballyDefined = (
key: string,
): Promise<unknown> => {
const backoffMs = 30;
const maxDelayMs = 1000;
const globalGoInteropKey = "kafkagosaur.go";
const loop = async (): Promise<unknown> => {
const interopObject =
(globalThis as Record<string, unknown>)[globalGoInteropKey];
if (interopObject !== undefined) {
return Promise.resolve((interopObject as Record<string, unknown>)[key]);
} else {
await delay(backoffMs);
return loop();
}
};
return deadline(loop(), maxDelayMs).catch((e) => {
if (e instanceof DeadlineError) {
Promise.reject(`Global key ${key} undefined`);
} else Promise.reject(e);
});
};
export const setOnGlobal = (value: unknown) => {
const globalDenoInteropKey = "kafkagosaur.deno";
(globalThis as Record<string, unknown>)[globalDenoInteropKey] = value;
};
class KafkaGoSaur {
constructor() {
setOnGlobal({ dialNode, dialDeno });
runGoWasm("./bin/kafkagosaur.wasm");
}
async createDialer(config: KafkaDialerConfig): Promise<KafkaDialer> {
const create = await untilGloballyDefined(
"createDialer",
) as (config: KafkaDialerConfig) => KafkaDialer;
return create(config);
}
async createReader(config: KafkaReaderConfig): Promise<KafkaReader> {
const create = await untilGloballyDefined(
"createReader",
) as (config: KafkaReaderConfig) => KafkaReader;
return create(config);
}
async createWriter(config: KafkaWriterConfig): Promise<KafkaWriter> {
const create = await untilGloballyDefined(
"createWriter",
) as (config: KafkaWriterConfig) => KafkaWriter;
return create(config);
}
}
export default KafkaGoSaur;