feat: add rendezvous server for p2p prover via websocket (#34)

This commit is contained in:
tsukino
2024-11-06 01:44:09 -05:00
committed by GitHub
parent abf3720f5f
commit cd4b22a6f0
3 changed files with 1414 additions and 224 deletions

1475
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -32,6 +32,7 @@
"@pinata/sdk": "^2.1.0",
"@types/node-forge": "^1.3.11",
"@web-std/file": "^3.0.3",
"async-mutex": "^0.5.0",
"buffer": "^6.0.3",
"charwise": "^3.0.1",
"classnames": "^2.3.2",
@@ -44,7 +45,9 @@
"isomorphic-fetch": "^3.0.0",
"multiformats": "^13.1.0",
"node-forge": "^1.3.1",
"node-html-to-image": "^4.0.0",
"node-loader": "^2.0.0",
"qs": "^6.12.1",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-redux": "^8.1.2",
@@ -56,7 +59,8 @@
"stream": "^0.0.2",
"tailwindcss": "^3.3.3",
"tlsn-js": "0.1.0-alpha.7.1",
"tlsn-js-v5": "npm:tlsn-js@0.1.0-alpha.5.4"
"tlsn-js-v5": "npm:tlsn-js@0.1.0-alpha.5.4",
"ws": "^8.16.0"
},
"devDependencies": {
"@babel/core": "^7.20.12",

View File

@@ -15,9 +15,18 @@ import { verify } from '../rs/verifier/index.node';
import { verify as verifyV7 } from '../rs/0.1.0-alpha.7/index.node';
import { Attestation } from '../web/utils/types/types';
import { convertNotaryWsToHttp } from '../web/utils';
import { IncomingMessage } from 'node:http';
import { createServer } from 'http';
import { WebSocketServer, type RawData, type WebSocket } from 'ws';
import crypto from 'crypto';
import qs from 'qs';
import { Mutex } from 'async-mutex';
const mutex = new Mutex();
const app = express();
const port = 3000;
const port = process.env.PORT || 3000;
const server = createServer(app);
const wss = new WebSocketServer({ server });
app.use((req, res, next) => {
res.setHeader('Access-Control-Allow-Origin', '*');
@@ -221,13 +230,157 @@ app.get('*', (req, res) => {
`);
});
app.listen(port, () => {
server.listen(port, () => {
console.log(`explorer server listening on port ${port}`);
});
const clients: Map<string, WebSocket> = new Map<string, WebSocket>();
const pairs: Map<string, string> = new Map<string, string>();
wss.on('connection', (client: WebSocket, request: IncomingMessage) => {
// you have a new client
console.log('New Connection');
// add this client to the clients array
const query = qs.parse((request.url || '').replace(/\/\?/g, ''));
const clientId = (query?.clientId as string) || crypto.randomUUID();
clients.set(clientId, client);
console.log(`New Connection - ${clientId}`);
if (!clientId.includes(':proof')) {
client.send(
bufferify({
method: 'client_connect',
params: { clientId },
}),
);
}
// set up client event listeners:
client.on('message', onClientMessage);
client.on('close', endClient);
async function endClient() {
clients.delete(clientId);
if (!clientId.includes(':proof')) {
const pair = pairs.get(clientId);
if (pair) {
pairs.delete(pair);
pairs.delete(clientId);
await send(
pair,
bufferify({
method: 'pair_disconnect',
params: { pairId: clientId },
}),
);
}
}
console.log(`Connection closed - ${clientId}`);
}
async function onClientMessage(rawData: RawData) {
try {
const msg = safeParseJSON(rawData.toString());
if (!msg) {
const [cid] = clientId.split(':');
const pairedClientId = pairs.get(cid);
await send(pairedClientId + ':proof', rawData);
return;
}
const { to } = msg.params;
switch (msg.method) {
case 'pair_request':
case 'pair_request_sent':
case 'pair_request_cancel':
case 'pair_request_cancelled':
case 'pair_request_reject':
case 'pair_request_rejected':
case 'pair_request_accept':
case 'request_proof':
case 'request_proof_by_hash':
case 'request_proof_by_hash_failed':
case 'proof_request_received':
case 'proof_request_accept':
case 'verifier_started':
case 'prover_setup':
case 'prover_started':
case 'proof_request_start':
case 'proof_request_cancelled':
case 'proof_request_rejected':
case 'proof_request_cancel':
case 'proof_request_reject':
case 'proof_request_end':
await send(to, rawData);
break;
case 'pair_request_success': {
if (await send(to, rawData)) {
pairs.set(to, clientId);
pairs.set(clientId, to);
}
break;
}
default:
console.log('unknown msg', msg);
break;
}
} catch (e) {
console.error(e);
}
}
// This function broadcasts messages to all webSocket clients
function broadcast(data: string) {
clients.forEach((c) => c.send(data));
}
async function send(clientId: string, data: RawData) {
return mutex.runExclusive(async () => {
const res = await new Promise((resolve) => {
const target = clients.get(clientId);
if (!target) {
client.send(
bufferify({
error: {
message: `client "${clientId}" does not exist`,
},
}),
(err) => {
resolve(false);
},
);
} else {
target.send(data, (err) => {
resolve(!err);
});
}
});
return res;
});
}
});
function bufferify(data: any) {
return Buffer.from(JSON.stringify(data));
}
async function fetchPublicKeyFromNotary(notaryUrl: string) {
const res = await fetch(notaryUrl + '/info');
const json: any = await res.json();
if (!json.publicKey) throw new Error('invalid response');
return json.publicKey;
}
function safeParseJSON(data: string) {
try {
return JSON.parse(data);
} catch (e) {
return null;
}
}