-
Notifications
You must be signed in to change notification settings - Fork 8
/
index.ts
183 lines (150 loc) · 5.3 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Per-connection metadata that Deno.serve hands to the request handler as its
// second argument; the exchange handlers read `remoteAddr` from it.
type ConnInfo = Deno.ServeHandlerInfo
/**
 * JSON payload a client submits to the `/v2/exchange` endpoint.
 *
 * Only `priAddr` and `chanName` are consumed by the server itself; every other
 * field is relayed to the peer.
 */
interface ClientInfo {
  /** Address the client reports for itself; relayed to the peer next to the server-observed one. */
  priAddr: string;
  /** Name of the channel on which the exchange takes place. */
  chanName: string;
  /** Relayed to the peer unchanged. */
  strategy?: string[];
  /** Relayed to the peer as `peerNPlan`; the v2 handler substitutes 1 when it is absent. */
  nPlan?: number;
  /** Relayed to the peer unchanged. */
  tsAddr?: string;
  /** Relayed to the peer unchanged. */
  tsCap?: number;
}
/** The two addresses known for one party of an exchange. */
interface AddrPair {
  /** Address as observed by this server (the connection's remote address). */
  pubAddr: string;
  /** Address the client reported for itself. */
  priAddr: string;
}
/**
 * JSON message describing one party of a `/v2/exchange`; it is what the other
 * party receives as the result of the exchange.
 */
interface ReplyInfo {
  /** Addresses of the described party (the v2 handler always sends exactly one pair). */
  peerAddrs: AddrPair[];
  /** Copied from the described party's request. */
  strategy?: string[];
  /** The described party's `nPlan` (1 when it sent none). */
  peerNPlan?: number;
  /** Copied from the described party's request. */
  tsAddr?: string;
  /** Copied from the described party's request. */
  tsCap?: number;
}
/**
 * Handles `/v2/exchange`.
 *
 * The request body carries one length-prefixed packet holding a JSON-encoded
 * `ClientInfo`. The server turns it into a `ReplyInfo` (adding the address it
 * observed for the connection), swaps it with the peer that uses the same
 * `chanName`, and answers with the peer's `ReplyInfo` as a length-prefixed
 * packet.
 *
 * @param req      incoming request; must be a POST with content type `application/octet-stream`
 * @param connInfo connection metadata; its remote address becomes `pubAddr`
 * @returns 405 / 415 for a wrong method / content type, 400 for a missing or
 *          malformed payload, an empty 200 when the client went away before a
 *          peer showed up, otherwise 200 with the peer's packet
 */
async function handleExchangeV2(req: Request, connInfo: ConnInfo): Promise<Response> {
  if (req.method !== "POST")
    return new Response("Invalid method", { status: 405 })
  if (req.headers.get("Content-Type") !== "application/octet-stream")
    return new Response("Invalid content type", { status: 415 })
  // A POST without a body has `body === null`; answer 400 instead of crashing.
  if (req.body === null)
    return new Response("Missing body", { status: 400 })

  const pubAddr = joinHostPort(connInfo.remoteAddr as Deno.NetAddr)
  const conn = req.body.getReader({ mode: "byob" })

  let info: ClientInfo
  try {
    const parsed: unknown = JSON.parse(
      new TextDecoder().decode(await receivePacket(conn))
    )
    // The payload is untrusted: make sure the fields used below are present.
    const fields = parsed as Partial<Record<keyof ClientInfo, unknown>> | null
    if (
      typeof fields !== "object" || fields === null ||
      typeof fields.priAddr !== "string" || typeof fields.chanName !== "string"
    )
      return new Response("Invalid payload", { status: 400 })
    info = parsed as ClientInfo
  } catch {
    // unreadable packet or invalid JSON
    return new Response("Malformed packet", { status: 400 })
  }

  const { priAddr, chanName, nPlan = 1, ...otherInfo } = info
  const reply: ReplyInfo = {
    // Relayed fields go first so that a client-supplied `peerAddrs` or
    // `peerNPlan` key cannot replace the values determined by the server.
    ...otherInfo,
    peerAddrs: [{ pubAddr, priAddr }],
    peerNPlan: nPlan,
  }
  const x0 = JSON.stringify(reply)
  const x1 = await exchange(chanName, x0, conn)
  if (x1 === "") // exchange was cancelled
    return new Response("")
  return new Response(marshallPacket(new TextEncoder().encode(x1)))
}
/**
 * Handles the `/exchange` (v1) endpoint.
 *
 * Wire format: (priAddr0|chanName) -> pubAddr1|priAddr1
 * The client sends one length-prefixed packet `priAddr|chanName`; the server
 * prepends the address it observed for the connection and swaps
 * `pubAddr|priAddr` with the peer that uses the same `chanName`. The peer's
 * string is returned as a length-prefixed packet.
 *
 * @param req      incoming request; must be a POST with content type `application/octet-stream`
 * @param connInfo connection metadata; its remote address becomes `pubAddr`
 * @returns 405 / 415 for a wrong method / content type, 400 for a missing or
 *          malformed payload, an empty 200 when the client went away before a
 *          peer showed up, otherwise 200 with the peer's packet
 */
async function handleExchangeV1(req: Request, connInfo: ConnInfo): Promise<Response> {
  if (req.method !== "POST")
    return new Response("Invalid method", { status: 405 })
  if (req.headers.get("Content-Type") !== "application/octet-stream")
    return new Response("Invalid content type", { status: 415 })
  // A POST without a body has `body === null`; answer 400 instead of crashing.
  if (req.body === null)
    return new Response("Missing body", { status: 400 })

  const pubAddr = joinHostPort(connInfo.remoteAddr as Deno.NetAddr)
  const conn = req.body.getReader({ mode: "byob" })

  let fields: string[]
  try {
    fields = new TextDecoder().decode(await receivePacket(conn)).split('|')
  } catch {
    // unreadable packet
    return new Response("Malformed packet", { status: 400 })
  }
  const [priAddr, chanName] = fields
  // Without a '|' separator there is no channel name to exchange on.
  if (chanName === undefined)
    return new Response("Invalid payload", { status: 400 })

  const x0 = `${pubAddr}|${priAddr}`
  const x1 = await exchange(chanName, x0, conn)
  if (x1 === "") // exchange was cancelled
    return new Response("")
  return new Response(marshallPacket(new TextEncoder().encode(x1)))
}
// Resolver callback used to hand a string to a pending promise.
type ResolveStr = (x: string) => void
// Parties waiting in this process for a peer, keyed by channel name. `xa` is the
// waiting party's own message; calling `xb_resolve` hands it the peer's message.
const inbox = new Map<string, {xa: string, xb_resolve: ResolveStr}>()
/**
 * Swaps `x0` with the message of another party that uses the same channel name.
 *
 * If a party is already waiting in `inbox`, the swap completes immediately.
 * Otherwise this call waits for whichever happens first: a peer arriving in
 * this process, a peer answering over a BroadcastChannel, or the client's
 * connection going away.
 *
 * @param name channel name both parties agreed on
 * @param x0   this party's message, handed to the peer
 * @param conn reader on the client's request body, used to notice an early close
 * @returns the peer's message, or `""` if the exchange was cancelled
 */
async function exchange(name: string, x0: string, conn: ReadableStreamBYOBReader): Promise<string> {
  const waiting = inbox.get(name)
  if (waiting !== undefined) { // the other party has set up an in-memory exchange
    waiting.xb_resolve(x0)
    inbox.delete(name)
    return waiting.xa
  }

  const cancelled = (): string[] => ["", "cancel"]
  const [x1, source] = await Promise.any([
    // attempt to set up an in-memory exchange
    new Promise((resolve: ResolveStr) => {
      inbox.set(name, { xa: x0, xb_resolve: resolve })
    }).then((x) => [x, "in-memory"]),
    // attempt to do cross-regional exchange
    exchangeViaBroadcastChannel(name, x0).then((x) => [x, "broadcast"]),
    // if client closes early; a failing read means the connection is gone too.
    // Promise.any ignores rejections, so without the rejection handler a
    // connection error would leave this party waiting in `inbox` indefinitely.
    clientClosed(conn).then(cancelled, cancelled),
  ])

  if (source !== "in-memory") { // cancel in-memory exchange
    inbox.delete(name)
  }
  // NOTE(review): the channel stays open and registered when the broadcast path
  // wins; presumably so its final postMessage is not cut short — confirm.
  if (source !== "broadcast") {
    channels.get(name)?.close()
    channels.delete(name)
  }
  return x1
}
// BroadcastChannels opened by exchangeViaBroadcastChannel, keyed by channel
// name, so that `exchange` can close them.
const channels = new Map<string, BroadcastChannel>()
/**
 * Swaps `x0` with a peer over a BroadcastChannel named `name`.
 *
 * The message is posted twice: once right away and once after the peer's
 * message has arrived, so the peer receives it whether it started listening
 * before or after this party. The channel is registered in `channels` and is
 * not closed here.
 *
 * @param name name of the BroadcastChannel to open
 * @param x0   this party's message
 * @returns the data of the first message received on the channel
 */
async function exchangeViaBroadcastChannel(name: string, x0: string): Promise<string> {
  const bc = new BroadcastChannel(name)
  channels.set(name, bc)

  // if the other party has already subscribed
  bc.postMessage(x0)

  const firstMessage = new Promise<string>((deliver) => {
    bc.onmessage = (ev: MessageEvent) => {
      deliver(ev.data)
    }
  })
  const peerMsg = await firstMessage

  // if the other party subscribes after the first post
  bc.postMessage(x0)
  return peerMsg
}
/**
 * Resolves as soon as a one-byte read on `conn` completes. `exchange` treats
 * that as the client having closed early; the result of the read is discarded.
 * Rejects if the read itself fails.
 */
async function clientClosed(conn: ReadableStreamBYOBReader): Promise<void> {
  const probe = new Uint8Array(1)
  await conn.read(probe)
}
/**
 * Formats a network address as `host:port`. A hostname containing `:` (an
 * IPv6 literal) is wrapped in square brackets.
 */
function joinHostPort(addr: Deno.NetAddr): string {
  const { hostname, port } = addr
  const host = hostname.includes(":") ? `[${hostname}]` : hostname
  return `${host}:${port}`
}
/**
 * Reads one length-prefixed packet from `conn`.
 *
 * A packet is a 2-byte big-endian length followed by that many payload bytes.
 *
 * @param conn BYOB reader positioned at the start of a packet
 * @returns the payload bytes (without the length header)
 * @throws Deno.errors.InvalidData   if the declared length is 0 or above 1000
 * @throws Deno.errors.UnexpectedEof if the stream ends before the packet is complete
 */
async function receivePacket(conn: ReadableStreamBYOBReader): Promise<Uint8Array> {
  const header = await readExactly(conn, 2)
  const lenCap = 1e3
  const plen = (header[0] << 8) | header[1] // uint16, BE
  if (plen === 0 || plen > lenCap) {
    console.error(`received suspicious packet header declearing len=${plen}`)
    throw new Deno.errors.InvalidData
  }
  return await readExactly(conn, plen)
}

// Reads exactly `n` bytes from `conn`. A single read() may deliver fewer bytes
// than requested, so keep reading until the buffer is full or the stream ends.
async function readExactly(conn: ReadableStreamBYOBReader, n: number): Promise<Uint8Array> {
  const out = new Uint8Array(n)
  let filled = 0
  while (filled < n) {
    // A fresh scratch view per read: the view handed to read() must not be reused.
    const { value, done } = await conn.read(new Uint8Array(n - filled))
    if (value !== undefined) {
      out.set(value, filled)
      filled += value.byteLength
    }
    // stop at end of stream, and never spin on a read that made no progress
    if (done || value === undefined || value.byteLength === 0)
      break
  }
  if (filled < n)
    throw new Deno.errors.UnexpectedEof(`stream ended after ${filled} of ${n} bytes`)
  return out
}
/**
 * Frames `data` as a packet: a 2-byte big-endian length header followed by the
 * payload bytes.
 */
function marshallPacket(data: Uint8Array): Uint8Array {
  const size = data.length
  const packet = new Uint8Array(size + 2)
  packet[0] = (size >> 8) & 0xff // high byte of the uint16 length
  packet[1] = size & 0xff // low byte
  packet.set(data, 2)
  return packet
}
/**
 * Top-level request router.
 *
 * - `/get` answers with a small shell script; the request's query string is
 *   forwarded to the download URL inside that script.
 * - `/v2/exchange` and `/exchange` are delegated to the respective handlers.
 * - anything else yields 404.
 */
async function handler(req: Request, connInfo: ConnInfo): Promise<Response> {
  const url = new URL(req.url)
  switch (url.pathname) {
    case "/get": {
      // The query string ends up inside a double-quoted shell string. Percent-
      // encode the characters sh still interprets there so that a crafted query
      // cannot trigger command substitution. Ordinary queries are unchanged.
      const search = url.search.replace(
        /["$`\\]/g,
        (c) => `%${c.charCodeAt(0).toString(16).toUpperCase()}`,
      )
      return new Response(
        `
curl -fsSL "https://bina.egoist.dev/contextualist/acp${search}" | sh
if [ $# -eq 0 ]; then acp --setup; else acp "$@"; fi
`,
        { headers: { "Content-Type": "text/plain; charset=utf-8" } }
      )
    }
    case "/v2/exchange":
      return await handleExchangeV2(req, connInfo)
    case "/exchange":
      return await handleExchangeV1(req, connInfo)
    default:
      return new Response("Not found", { status: 404 })
  }
}
// Start the HTTP server with `handler` answering every request. No options are
// passed, so Deno.serve's defaults apply.
Deno.serve(handler)