Skip to content

Commit

Permalink
feat: first pass horribly broken 2-way p2p chat.
Browse files Browse the repository at this point in the history
  • Loading branch information
zicklag committed Dec 30, 2024
1 parent c531e2b commit 45d1d45
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 51 deletions.
6 changes: 3 additions & 3 deletions src/client/initMatrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import { cryptoCallbacks } from './state/secretStorageKeys';

global.Olm = Olm;

if (import.meta.env.PROD) {
logger.disableAll();
}
// if (import.meta.env.PROD) {
logger.disableAll()
// }

type Session = {
baseUrl: string;
Expand Down
1 change: 0 additions & 1 deletion src/matrix-shim/data.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-disable no-restricted-syntax */
/* eslint-disable max-classes-per-file */
/* eslint-disable @typescript-eslint/no-explicit-any */
import _ from 'lodash';
import { IRoomEvent, IStateEvent } from 'matrix-js-sdk';

type Message = {
Expand Down
104 changes: 101 additions & 3 deletions src/matrix-shim/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class MatrixShim {

agent?: Agent;

connectionManager: PeerjsConnectionManager = new PeerjsConnectionManager();
connectionManager: PeerjsConnectionManager;

webrtcPeerConns: { [did: string]: DataConnection } = {};

Expand Down Expand Up @@ -241,10 +241,40 @@ export class MatrixShim {
this.kvdb = kvdb;
this.keypair = keypair;

this.connectionManager.openHandlers.push(() => {
this.setPeerIdRecordInPds();
// Whenever a peer is opened, set that PeerId as our current peer ID in our AtProto PDS.
this.connectionManager = new PeerjsConnectionManager({
peerOpenHandlers: [
() => {
this.setPeerIdRecordInPds();
this.connectToPeers();
},
],
peerConnectHandlers: [
(transport) => {
(async () => {
for await (const message of transport) {
for (const roomId of this.data.roomIds()) {
const room = this.data.rooms[roomId];
if (room.direct) {
if (this.oauthSession) {
this.data.roomSendMessage(
roomId,
room.members[0].id,
crypto.randomUUID(),
new TextDecoder().decode(message)
);
this.changes.notify();
}
}
}
}
})();
},
],
});

this.connectToPeers();

const router = AutoRouter();

router.get('/_matrix/custom/authchecktest', () => {
Expand Down Expand Up @@ -478,6 +508,7 @@ export class MatrixShim {
data.is_direct
);
this.changes.notify();
this.connectToPeers();
return { room_id: roomId };
});

Expand All @@ -500,6 +531,11 @@ export class MatrixShim {
params.txid,
content.body || '[unknown body]'
);

for (const transport of this.connectionManager.transports) {
transport.send(new TextEncoder().encode(content.body));
}

this.changes.notify();

return {
Expand Down Expand Up @@ -583,6 +619,68 @@ export class MatrixShim {
this.agent = new Agent(this.oauthSession);
this.userHandle = (await resolveDid(this.oauthSession.did)) || this.oauthSession.did;
this.kvdb.set('did', this.oauthSession.did);
this.connectToPeers();
}

async getPeerIdForDid(did: string): Promise<string | undefined> {
if (this.agent && this.oauthSession) {
const record = await this.agent.com.atproto.repo.getRecord(
{
collection: 'peer.pigeon.muni.town',
repo: did,
rkey: 'self',
},
{ headers: { 'atproto-proxy': `${did}#atproto_pds` } }
);

return (record.data.value as any)?.id;
}

return undefined;
}

/** Connect to all peers */
async connectToPeers() {
console.info('Connecting to peers');
const membersList: Set<string> = new Set();
for (const roomId of this.data.roomIds()) {
const room = this.data.rooms[roomId];
membersList.add(room.owner.id);
room.members.forEach((x) => membersList.add(x.id));
}
if (this.oauthSession) membersList.delete(this.oauthSession.did);

console.log('Peer dids:', membersList);

const ids = await Promise.all(
[...membersList.values()].map(async (x) => [x, await this.getPeerIdForDid(x)])
);
console.log('Peer ids:', ids);

for (const [did, peerId] of ids) {
if (peerId) {
console.info('Connecting to peer', peerId);
const transport = await this.connectionManager.connect(peerId);
(async () => {
for await (const message of transport) {
for (const roomId of this.data.roomIds()) {
const room = this.data.rooms[roomId];
if (room.direct && room.members.some((x) => x.id === did)) {
if (this.oauthSession) {
this.data.roomSendMessage(
roomId,
did!,
crypto.randomUUID(),
new TextDecoder().decode(message)
);
this.changes.notify();
}
}
}
}
})();
}
}
}

/**
Expand Down
83 changes: 58 additions & 25 deletions src/matrix-shim/peerjsFrontend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,27 @@ export class PeerjsFrontendManager {
constructor() {
this.peer = new Peer();
this.peer.on('open', (id) => {
console.trace('Peer opened:', id);
console.info('Peer opened:', id);
const message: PeerjsFrontendMessage = { type: 'peerOpened', peerId: id };
this.sender.postMessage(message);
});

this.peer.on('disconnected', () => {
console.error('Peer disconnected.');
});
this.peer.on('error', (error) => {
console.error('Peer error', error);
});

// When we receive a connection from outside
this.peer.on('connection', (conn) => {
this.addConnectionDataCloseHandlers(conn);

console.info('Incomming peer connection', conn);

// Add the connection to the list
this.connections[conn.connectionId] = conn;

// And send a connected event to the service worker
const m: PeerjsFrontendMessage = {
type: 'incomingConnected',
Expand All @@ -51,6 +63,7 @@ export class PeerjsFrontendManager {

// When the peer closes
this.peer.on('close', () => {
console.info('Peer closed');
// Tell the service worker
const m: PeerjsFrontendMessage = {
type: 'peerClosed',
Expand All @@ -60,7 +73,7 @@ export class PeerjsFrontendManager {
});

// When we receive a message from our service worker
this.sender.addEventListener('message', (event) => {
this.receiver.addEventListener('message', (event) => {
const message: PeerjsBackendMessage = event.data;

// If we should connect to another peer
Expand All @@ -71,10 +84,15 @@ export class PeerjsFrontendManager {
// good.

// Create the connection
console.info('Connecting to peer:', message.remotePeerId);
const conn = this.peer.connect(message.remotePeerId, { reliable: true });
console.info('Conencted to peer:', conn.peer, conn);

this.connections[conn.connectionId] = conn;

// When the connection opens
conn.on('open', () => {
console.info('Connection opened', conn.connectionId);
// Tell the service worker the connection has opened.
const m: PeerjsFrontendMessage = {
type: 'connOpened',
Expand All @@ -85,34 +103,15 @@ export class PeerjsFrontendManager {
this.sender.postMessage(m);
});

// When the connection has data
conn.on('data', (data) => {
// Tell the service worker the connection has opened.
const m: PeerjsFrontendMessage = {
type: 'connData',
peerId: this.peer.id,
connectionId: conn.connectionId,
data,
};
this.sender.postMessage(m);
});

// When the connection closees
conn.on('close', () => {
// Tell the service worker the connection has opened.
const m: PeerjsFrontendMessage = {
type: 'connClosed',
peerId: this.peer.id,
connectionId: conn.connectionId,
};
this.sender.postMessage(m);
});
this.addConnectionDataCloseHandlers(conn);

// If the service worker wants us to send data
} else if (message.type === 'sendData' && message.peerId === this.peer.id) {
} else if (message.type === 'sendData') {
console.log('wants to send', message);
// Get the connection and send it
const conn = this.connections[message.connectionId];
if (conn) {
console.info('Sending data to ', conn.peer, message.data);
conn.send(message.data);
}
} else if (message.type === 'getPeerId') {
Expand All @@ -125,4 +124,38 @@ export class PeerjsFrontendManager {
}
});
}

addConnectionDataCloseHandlers(conn: DataConnection) {
conn.on('error', (error) => {
console.error('Peer connection error', error);
});
conn.on('iceStateChanged', (state) => {
console.log('Connection state change', state);
});

// When the connection has data
conn.on('data', (data) => {
console.info('Connection data', conn.connectionId, data);
// Tell the service worker the connection has opened.
const m: PeerjsFrontendMessage = {
type: 'connData',
peerId: this.peer.id,
connectionId: conn.connectionId,
data,
};
this.sender.postMessage(m);
});

// When the connection closees
conn.on('close', () => {
console.info('Connection closed', conn.connectionId);
// Tell the service worker the connection has opened.
const m: PeerjsFrontendMessage = {
type: 'connClosed',
peerId: this.peer.id,
connectionId: conn.connectionId,
};
this.sender.postMessage(m);
});
}
}
Loading

0 comments on commit 45d1d45

Please sign in to comment.