diff --git a/.aegir.js b/.aegir.js index fa6dfdb..764ebb3 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,51 +1,58 @@ -import { spawn, exec } from "child_process"; -import { existsSync } from "fs"; +import { pipe } from 'it-pipe' +import { noise } from '@chainsafe/libp2p-noise' +import { createLibp2p } from 'libp2p' /** @type {import('aegir/types').PartialOptions} */ export default { test: { - async before() { - if (!existsSync("./go-libp2p-webtransport-server/main")) { - await new Promise((resolve, reject) => { - exec('go build -o main main.go', - { cwd: "./go-libp2p-webtransport-server" }, - (error, stdout, stderr) => { - if (error) { - reject(error) - console.error(`exec error: ${error}`); - return; - } - resolve() - }); - }) - } + async before () { + const { generateWebTransportCertificates } = await import('./dist/test/certificate.js') + const { webTransport } = await import('./dist/src/index.js') - const server = spawn('./main', [], { cwd: "./go-libp2p-webtransport-server", killSignal: "SIGINT" }); - server.stderr.on('data', (data) => { - console.log(`stderr: ${data}`, typeof data); + const node = await createLibp2p({ + addresses: { + listen: ['/ip4/0.0.0.0/udp/0/quic/webtransport'] + }, + transports: [webTransport({ + certificates: await generateWebTransportCertificates([ + { shortName: 'C', value: 'DE' }, + { shortName: 'ST', value: 'Berlin' }, + { shortName: 'L', value: 'Berlin' }, + { shortName: 'O', value: '@libp2p/webtransport tests' }, + { shortName: 'CN', value: '127.0.0.1' } + ], [{ + // can be max 14 days according to the spec + days: 13 + }, { + // can be max 14 days according to the spec + days: 13, + // start the second certificate after the first expires + start: new Date(Date.now() + (86400000 * 13)) + }]) + })], + connectionEncryption: [noise()] }) - const serverAddr = await (new Promise((resolve => { - server.stdout.on('data', (data) => { - console.log(`stdout: ${data}`, typeof data); - if (data.includes("addr=")) { - // Parse the addr out - 
resolve((data + "").match(/addr=([^\s]*)/)[1]) - } - }); - }))) + + await node.start() + + await node.handle('echo', ({ stream }) => { + void pipe(stream, stream) + }) + + const multiaddrs = node.getMultiaddrs() return { - server, + node, env: { - serverAddr + serverAddr: multiaddrs[0].toString() } } }, - async after(_, { server }) { - server.kill("SIGINT") + async after (_, before) { + await before.node.stop() } }, build: { - bundlesizeMax: '18kB' + bundlesizeMax: '97kB' } } diff --git a/README.md b/README.md index 59fca5c..639dbf9 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Loading this module through a script tag will make it's exports available as `Li ## Description -`libp2p-webtransport` is the WebTransport transport implementation compatible with libp2p. +`@libp2p/webtransport` is the WebTransport transport implementation compatible with libp2p. ## Usage diff --git a/package.json b/package.json index b4052b7..ea7dff1 100644 --- a/package.json +++ b/package.json @@ -148,29 +148,44 @@ "lint": "aegir lint", "dep-check": "aegir dep-check", "build": "aegir build", - "test": "aegir test -t browser", + "test": "aegir test -t node -t browser", + "test:node": "aegir test -t node --cov", "test:chrome": "aegir test -t browser --cov", + "test:chrome-webworker": "aegir test -t webworker", "release": "aegir release", "docs": "aegir docs" }, "dependencies": { - "@chainsafe/libp2p-noise": "^11.0.0", + "@chainsafe/libp2p-noise": "^11.0.1", + "@fails-components/webtransport": "^0.1.2", "@libp2p/interface-connection": "^3.0.2", "@libp2p/interface-peer-id": "^2.0.0", "@libp2p/interface-stream-muxer": "^3.0.0", "@libp2p/interface-transport": "^2.0.0", + "@libp2p/interfaces": "^3.0.4", "@libp2p/logger": "^2.0.2", - "@libp2p/peer-id": "^2.0.0", + "@libp2p/peer-id": "^2.0.3", + "@libp2p/utils": "^3.0.2", "@multiformats/multiaddr": "^12.1.0", + "browser-readablestream-to-it": "^2.0.0", "it-stream-types": "^1.0.4", - "multiformats": "^11.0.0", + "multiformats": "^11.0.2", 
+ "node-forge": "^1.3.1", + "p-timeout": "^6.0.0", "uint8arraylist": "^2.3.3" }, "devDependencies": { + "@libp2p/interface-transport-compliance-tests": "^3.0.2", + "@libp2p/peer-id-factory": "^2.0.3", "aegir": "^38.1.7", - "libp2p": "^0.43.2" + "iso-random-stream": "^2.0.2", + "it-pipe": "^2.0.4", + "libp2p": "^0.43.2", + "sinon": "^15.0.0" }, "browser": { - "./dist/src/listener.js": "./dist/src/listener.browser.js" + "./dist/src/listener.js": "./dist/src/listener.browser.js", + "./dist/src/webtransport.js": "./dist/src/webtransport.browser.js", + "./dist/test/certificate.js": "./dist/test/certificate.browser.js" } } diff --git a/src/create-server.ts b/src/create-server.ts new file mode 100644 index 0000000..edfd9bd --- /dev/null +++ b/src/create-server.ts @@ -0,0 +1,127 @@ +import { EventEmitter } from 'events' +import { logger } from '@libp2p/logger' +import { Http3Server, WebTransportSession } from '@fails-components/webtransport' +import pTimeout from 'p-timeout' + +const log = logger('libp2p:webtransport:server') + +export interface WebTransportServer extends EventEmitter { + listening: boolean + sessionTimeout: number + + close: (callback?: () => void) => void + listen: () => void + address: () => { port: number, host: string, family: 'IPv4' | 'IPv6' } | null +} + +class DefaultWebTransportServer extends EventEmitter implements WebTransportServer { + private readonly server: Http3Server + public listening: boolean + /** + * How long in ms to wait for an incoming session to be ready + */ + public sessionTimeout: number + + constructor (init: any) { + super() + + this.server = new Http3Server(init) + this.listening = false + + this.sessionTimeout = 1000 + } + + close (callback?: () => void): void { + if (callback != null) { + this.addListener('close', callback) + } + + this.server.stopServer() + this.server.closed + .then(() => { + this.listening = false + this.emit('close') + }) + .catch((err) => { + this.emit('error', err) + }) + } + + listen (): void { + 
this.server.startServer() + this.server.ready + .then(() => { + this.listening = true + this.emit('listening') + + this._processIncomingSessions().catch(err => { + this.emit('error', err) + }) + }) + .catch((err) => { + this.emit('error', err) + }) + } + + address (): { port: number, host: string, family: 'IPv4' | 'IPv6' } | null { + return this.server.address() + } + + async _processIncomingSessions (): Promise { + // FIXME: incompatible webtransport implementations + const paths = [ + // Chrome + '/.well-known/libp2p-webtransport?type=noise', + + // @fails-components/webtransport + '/.well-known/libp2p-webtransport' + ] + + await Promise.all( + paths.map(async path => { + const sessionStream = this.server.sessionStream(path) + const sessionReader = sessionStream.getReader() + + while (true) { + const { done, value: session } = await sessionReader.read() + + if (done) { + log('session reader finished') + break + } + + void Promise.resolve() + .then(async () => { + const timeout = pTimeout(session.ready, { + milliseconds: this.sessionTimeout + }) + + try { + await timeout + + this.emit('session', session) + } catch (err) { + log.error('error waiting for session to become ready', err) + } finally { + timeout.clear() + } + }) + } + }) + ) + } +} + +export interface SessionHandler { + (session: WebTransportSession): void +} + +export function createServer (init: any, sessionHandler?: SessionHandler): WebTransportServer { + const server = new DefaultWebTransportServer(init) + + if (sessionHandler != null) { + server.addListener('session', sessionHandler) + } + + return server +} diff --git a/src/index.ts b/src/index.ts index 60342a5..732e411 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,21 +1,29 @@ import { logger } from '@libp2p/logger' import { noise } from '@chainsafe/libp2p-noise' import { Transport, symbol, CreateListenerOptions, DialOptions, Listener } from '@libp2p/interface-transport' -import type { Connection, Direction, MultiaddrConnection, Stream } from 
'@libp2p/interface-connection' +import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection' import { Multiaddr, protocols } from '@multiformats/multiaddr' import { peerIdFromString } from '@libp2p/peer-id' import { bases, digest } from 'multiformats/basics' import type { MultihashDigest } from 'multiformats/hashes/interface' import type { PeerId } from '@libp2p/interface-peer-id' import type { Duplex, Source } from 'it-stream-types' -import type { StreamMuxerFactory, StreamMuxerInit, StreamMuxer } from '@libp2p/interface-stream-muxer' -import { Uint8ArrayList } from 'uint8arraylist' +import createListener from './listener.js' +import WebTransport from './webtransport.js' +import { inertDuplex, isSubset } from './utils.js' +import { webtransportMuxer } from './muxer.js' +import { AbortError } from '@libp2p/interfaces/errors' const log = logger('libp2p:webtransport') -declare global { - interface Window { - WebTransport: any - } + +/** + * PEM format server certificate and private key + */ +export interface WebTransportCertificate { + privateKey: string + pem: string + hash: MultihashDigest + secret: string } // @ts-expect-error - Not easy to combine these types. @@ -25,161 +33,6 @@ function decodeCerthashStr (s: string): MultihashDigest { return digest.decode(multibaseDecoder.decode(s)) } -// Duplex that does nothing. 
Needed to fulfill the interface -function inertDuplex (): Duplex { - return { - source: { - [Symbol.asyncIterator] () { - return { - async next () { - // This will never resolve - return await new Promise(() => { }) - } - } - } - }, - sink: async (source: Source) => { - // This will never resolve - return await new Promise(() => { }) - } - } -} - -async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void)): Promise { - const writer = bidiStream.writable.getWriter() - const reader = bidiStream.readable.getReader() - await writer.ready - - function cleanupStreamFromActiveStreams (): void { - const index = activeStreams.findIndex(s => s === stream) - if (index !== -1) { - activeStreams.splice(index, 1) - stream.stat.timeline.close = Date.now() - onStreamEnd?.(stream) - } - } - - let writerClosed = false - let readerClosed = false; - (async function () { - const err: Error | undefined = await writer.closed.catch((err: Error) => err) - if (err != null) { - const msg = err.message - if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) { - log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`) - } - } - writerClosed = true - if (writerClosed && readerClosed) { - cleanupStreamFromActiveStreams() - } - })().catch(() => { - log.error('WebTransport failed to cleanup closed stream') - }); - - (async function () { - const err: Error | undefined = await reader.closed.catch((err: Error) => err) - if (err != null) { - log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`) - } - readerClosed = true - if (writerClosed && readerClosed) { - cleanupStreamFromActiveStreams() - } - })().catch(() => { - log.error('WebTransport failed to cleanup closed stream') - }) - - let sinkSunk = false - const stream: Stream = { - id: streamId, - abort (_err: Error) { - if 
(!writerClosed) { - writer.abort() - writerClosed = true - } - stream.closeRead() - readerClosed = true - cleanupStreamFromActiveStreams() - }, - close () { - stream.closeRead() - stream.closeWrite() - cleanupStreamFromActiveStreams() - }, - - closeRead () { - if (!readerClosed) { - reader.cancel().catch((err: any) => { - if (err.toString().includes('RESET_STREAM') === true) { - writerClosed = true - } - }) - readerClosed = true - } - if (writerClosed) { - cleanupStreamFromActiveStreams() - } - }, - closeWrite () { - if (!writerClosed) { - writerClosed = true - writer.close().catch((err: any) => { - if (err.toString().includes('RESET_STREAM') === true) { - readerClosed = true - } - }) - } - if (readerClosed) { - cleanupStreamFromActiveStreams() - } - }, - reset () { - stream.close() - }, - stat: { - direction, - timeline: { open: Date.now() } - }, - metadata: {}, - source: (async function * () { - while (true) { - const val = await reader.read() - if (val.done === true) { - readerClosed = true - if (writerClosed) { - cleanupStreamFromActiveStreams() - } - return - } - - yield new Uint8ArrayList(val.value) - } - })(), - sink: async function (source: Source) { - if (sinkSunk) { - throw new Error('sink already called on stream') - } - sinkSunk = true - try { - for await (const chunks of source) { - if (chunks instanceof Uint8Array) { - await writer.write(chunks) - } else { - for (const buf of chunks) { - await writer.write(buf) - } - } - } - } finally { - stream.closeWrite() - } - } - } - - return stream -} - function parseMultiaddr (ma: Multiaddr): { url: string, certhashes: MultihashDigest[], remotePeer?: PeerId } { const parts = ma.stringTuples() @@ -238,40 +91,28 @@ function parseMultiaddr (ma: Multiaddr): { url: string, certhashes: MultihashDig return { url, certhashes, remotePeer } } -// Determines if `maybeSubset` is a subset of `set`. This means that all byte arrays in `maybeSubset` are present in `set`. 
-export function isSubset (set: Uint8Array[], maybeSubset: Uint8Array[]): boolean { - const intersection = maybeSubset.filter(byteArray => { - return Boolean(set.find((otherByteArray: Uint8Array) => { - if (byteArray.length !== otherByteArray.length) { - return false - } - - for (let index = 0; index < byteArray.length; index++) { - if (otherByteArray[index] !== byteArray[index]) { - return false - } - } - return true - })) - }) - return (intersection.length === maybeSubset.length) -} - export interface WebTransportInit { maxInboundStreams?: number + certificates?: WebTransportCertificate[] } export interface WebTransportComponents { peerId: PeerId } -class WebTransport implements Transport { +export interface WebTransportConfig { + maxInboundStreams: number + certificates?: WebTransportCertificate[] +} + +class WebTransportTransport implements Transport { private readonly components: WebTransportComponents - private readonly config: Required + private readonly config: WebTransportConfig constructor (components: WebTransportComponents, init: WebTransportInit = {}) { this.components = components this.config = { + ...init, maxInboundStreams: init.maxInboundStreams ?? 
1000 } } @@ -299,7 +140,7 @@ class WebTransport implements Transport { throw new Error('Expected multiaddr to contain certhashes') } - const wt = new window.WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { + const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { serverCertificateHashes: certhashes.map(certhash => ({ algorithm: 'sha-256', value: certhash.digest @@ -308,58 +149,75 @@ class WebTransport implements Transport { wt.closed.catch((error: Error) => { log.error('WebTransport transport closed due to:', error) }) + await wt.ready - if (remotePeer == null) { - throw new Error('Need a target peerid') - } + try { + if (!await this.authenticateWebTransport(wt, localPeer, certhashes, remotePeer)) { + throw new Error('Failed to authenticate webtransport') + } - if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) { - throw new Error('Failed to authenticate webtransport') - } + const maConn: MultiaddrConnection = { + close: async (err?: Error) => { + if (err != null) { + log('Closing webtransport with err:', err) + } + wt.close() + }, + remoteAddr: ma, + timeline: { + open: Date.now() + }, + // This connection is never used directly since webtransport supports native streams. + ...inertDuplex() + } - const maConn: MultiaddrConnection = { - close: async (err?: Error) => { - if (err != null) { - log('Closing webtransport with err:', err) - } - wt.close() - }, - remoteAddr: ma, - timeline: { - open: Date.now() - }, - // This connection is never used directly since webtransport supports native streams. - ...inertDuplex() - } + wt.closed.catch((err: Error) => { + log.error('WebTransport connection closed with err:', err) + }) + .finally(() => { + // This is how we specify the connection is closed and shouldn't be used. 
+ maConn.timeline.close = Date.now() + }) - wt.closed.catch((err: Error) => { - log.error('WebTransport connection closed:', err) - // This is how we specify the connection is closed and shouldn't be used. - maConn.timeline.close = Date.now() - }) + if (options.signal?.aborted === true) { + wt.close() + throw new AbortError() + } - try { - options?.signal?.throwIfAborted() - } catch (e) { + return await options.upgrader.upgradeOutbound(maConn, { + skipEncryption: true, + muxerFactory: webtransportMuxer(wt, wt.incomingBidirectionalStreams.getReader(), this.config), + skipProtection: true + }) + } catch (err) { wt.close() - throw e + throw err } - - return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true }) } - async authenticateWebTransport (wt: typeof window.WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>): Promise { + async authenticateWebTransport (wt: WebTransport, localPeer: PeerId, certhashes: Array>, remotePeer?: PeerId): Promise { const stream = await wt.createBidirectionalStream() const writer = stream.writable.getWriter() const reader = stream.readable.getReader() await writer.ready - const duplex = { + const duplex: Duplex = { source: (async function * () { - while (true) { - const val = await reader.read() - yield val.value + try { + while (true) { + const val = await reader.read() + + if (val.done) { + return + } + + if (val.value != null) { + yield val.value + } + } + } finally { + reader.releaseLock() } })(), sink: async function (source: Source) { @@ -369,9 +227,9 @@ class WebTransport implements Transport { } } - const n = noise()() + const encrypter = noise()() - const { remoteExtensions } = await n.secureOutbound(localPeer, duplex, remotePeer) + const { remoteExtensions } = await encrypter.secureOutbound(localPeer, duplex, remotePeer) // We're done with this authentication stream writer.close().catch((err: Error) => { @@ -390,87 +248,16 @@ 
class WebTransport implements Transport { return true } - webtransportMuxer (wt: typeof window.WebTransport): StreamMuxerFactory { - let streamIDCounter = 0 - const config = this.config - return { - protocol: 'webtransport', - createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => { - // !TODO handle abort signal when WebTransport supports this. - - if (typeof init === 'function') { - // The api docs say that init may be a function - init = { onIncomingStream: init } - } - - const activeStreams: Stream[] = []; - - (async function () { - //! TODO unclear how to add backpressure here? - - const reader = wt.incomingBidirectionalStreams.getReader() - while (true) { - const { done, value: wtStream } = await reader.read() - if (done === true) { - break - } - if (activeStreams.length >= config.maxInboundStreams) { - // We've reached our limit, close this stream. - wtStream.writable.close().catch((err: Error) => { - log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - wtStream.readable.cancel().catch((err: Error) => { - log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`) - }) - } else { - const stream = await webtransportBiDiStreamToStream(wtStream, String(streamIDCounter++), 'inbound', activeStreams, init?.onStreamEnd) - activeStreams.push(stream) - init?.onIncomingStream?.(stream) - } - } - })().catch(() => { - log.error('WebTransport failed to receive incoming stream') - }) - - const muxer: StreamMuxer = { - protocol: 'webtransport', - streams: activeStreams, - newStream: async (name?: string): Promise => { - const wtStream = await wt.createBidirectionalStream() - - const stream = await webtransportBiDiStreamToStream(wtStream, String(streamIDCounter++), init?.direction ?? 
'outbound', activeStreams, init?.onStreamEnd) - activeStreams.push(stream) - - return stream - }, - - /** - * Close or abort all tracked streams and stop the muxer - */ - close: (err?: Error) => { - if (err != null) { - log('Closing webtransport muxer with err:', err) - } - wt.close() - }, - // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. - ...inertDuplex() - } - - try { - init?.signal?.throwIfAborted() - } catch (e) { - wt.close() - throw e - } - - return muxer - } + createListener (options: CreateListenerOptions): Listener { + if (this.config.certificates == null || this.config.certificates.length === 0) { + throw new Error('WebTransport certificate is required') } - } - createListener (options: CreateListenerOptions): Listener { - throw new Error('Webtransport servers are not supported in Node or the browser') + return createListener({ + ...options, + certificates: this.config.certificates, + peerId: this.components.peerId + }) } /** @@ -482,5 +269,5 @@ class WebTransport implements Transport { } export function webTransport (init: WebTransportInit = {}): (components: WebTransportComponents) => Transport { - return (components: WebTransportComponents) => new WebTransport(components, init) + return (components: WebTransportComponents) => new WebTransportTransport(components, init) } diff --git a/src/listener.browser.ts b/src/listener.browser.ts new file mode 100644 index 0000000..63cd881 --- /dev/null +++ b/src/listener.browser.ts @@ -0,0 +1,5 @@ +import type { CreateListenerOptions, Listener } from '@libp2p/interface-transport' + +export default function createListener (options: CreateListenerOptions): Listener { + throw new Error('Not implemented') +} diff --git a/src/listener.ts b/src/listener.ts new file mode 100644 index 0000000..6328765 --- /dev/null +++ b/src/listener.ts @@ -0,0 +1,276 @@ +import { logger } from '@libp2p/logger' +import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' +import 
type { WebTransportCertificate } from './index.js' +import { multiaddr } from '@multiformats/multiaddr' +import * as os from 'os' +import { noise } from '@chainsafe/libp2p-noise' +import toIt from 'browser-readablestream-to-it' +import type { MultiaddrConnection, Connection } from '@libp2p/interface-connection' +import type { Upgrader, Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interface-transport' +import type { Multiaddr } from '@multiformats/multiaddr' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { Duplex, Source } from 'it-stream-types' +import type { WebTransportSession } from '@fails-components/webtransport' +import { inertDuplex } from './utils.js' +import { webtransportMuxer } from './muxer.js' +import { createServer, WebTransportServer } from './create-server.js' +import { ipPortToMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' +import { base64url } from 'multiformats/bases/base64' + +const log = logger('libp2p:webtransport:listener') +const CODE_P2P = 421 + +const networks = os.networkInterfaces() + +function isAnyAddr (ip: string): boolean { + return ['0.0.0.0', '::'].includes(ip) +} + +function getNetworkAddrs (family: string): string[] { + const addresses: string[] = [] + + for (const [, netAddrs] of Object.entries(networks)) { + if (netAddrs != null) { + for (const netAddr of netAddrs) { + if (netAddr.family === family) { + addresses.push(netAddr.address) + } + } + } + } + + return addresses +} + +const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' } + +function getMultiaddrs (proto: 'ip4' | 'ip6', ip: string, port: number, certificates: WebTransportCertificate[]): Multiaddr[] { + const certhashes = certificates.map(cert => { + return `/certhash/${base64url.encode(cert.hash.bytes)}` + }).join('') + + const toMa = (ip: string): Multiaddr => multiaddr(`/${proto}/${ip}/udp/${port}/quic/webtransport${certhashes}`) + return (isAnyAddr(ip) ? 
getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa) +} + +/** + * Attempts to close the given maConn. If a failure occurs, it will be logged + */ +async function attemptClose (conn: Connection): Promise { + try { + await conn.close() + } catch (err) { + log.error('an error occurred closing the connection', err) + } +} + +interface WebTransportListenerInit extends CreateListenerOptions { + peerId: PeerId + handler?: (conn: Connection) => void + upgrader: Upgrader + certificates: WebTransportCertificate[] +} + +type Status = { started: false } | { started: true, listeningAddr: Multiaddr, peerId: string | null } + +class WebTransportListener extends EventEmitter implements Listener { + private server?: WebTransportServer + private readonly certificates: WebTransportCertificate[] + private readonly peerId: PeerId + private readonly upgrader: Upgrader + private readonly handler?: (conn: Connection) => void + /** Keep track of open connections to destroy in case of timeout */ + private readonly connections: Connection[] + + private status: Status = { started: false } + + constructor (init: WebTransportListenerInit) { + super() + + this.certificates = init.certificates + this.peerId = init.peerId + this.upgrader = init.upgrader + this.handler = init.handler + this.connections = [] + } + + async onSession (session: WebTransportSession): Promise { + const bidiReader = session.incomingBidirectionalStreams.getReader() + + // read one stream to do authentication + log('read authentication stream') + const bidistr = await bidiReader.read() + + if (bidistr.done) { + return + } + + // ok we got a stream + const bidistream = bidistr.value + const writer = bidistream.writable.getWriter() + + const encrypter = noise({ + extensions: { + webtransportCerthashes: this.certificates.map(cert => cert.hash.bytes) + } + })() + const duplex: Duplex = { + source: toIt(bidistream.readable), + sink: async (source: Source) => { + for await (const buf of source) { + await writer.write(buf) + } + 
} + } + + log('secure inbound stream') + const { remotePeer } = await encrypter.secureInbound(this.peerId, duplex) + + // upgrade it + const maConn: MultiaddrConnection = { + close: async (err?: Error) => { + if (err != null) { + log('Closing webtransport with err:', err) + } + session.close() + }, + // TODO: pull this from webtransport + // remoteAddr: ipPortToMultiaddr(session.remoteAddress, session.remotePort), + remoteAddr: ipPortToMultiaddr('127.0.0.1', 8080).encapsulate(`/p2p/${remotePeer.toString()}`), + timeline: { + open: Date.now() + }, + // This connection is never used directly since webtransport supports native streams. + ...inertDuplex() + } + + session.closed.catch((err: Error) => { + log.error('WebTransport connection closed with error:', err) + }).finally(() => { + // This is how we specify the connection is closed and shouldn't be used. + maConn.timeline.close = Date.now() + }) + + try { + log('upgrade inbound stream') + const connection = await this.upgrader.upgradeInbound(maConn, { + skipEncryption: true, + skipProtection: true, + muxerFactory: webtransportMuxer(session, bidiReader, { + maxInboundStreams: 1000 + }) + }) + + // We're done with this authentication stream + writer.close().catch((err: Error) => { + log.error('Failed to close authentication stream writer', err) + }) + + this.connections.push(connection) + + if (this.handler != null) { + this.handler(connection) + } + + this.dispatchEvent(new CustomEvent('connection', { + detail: connection + })) + } catch (err: any) { + session.close({ + closeCode: 500, + reason: err.message + }) + } + } + + getAddrs (): Multiaddr[] { + if (!this.status.started || this.server == null) { + return [] + } + + let addrs: Multiaddr[] = [] + const address = this.server.address() + + if (address == null) { + return [] + } + + try { + if (address.family === 'IPv4') { + addrs = addrs.concat(getMultiaddrs('ip4', address.host, address.port, this.certificates)) + } else if (address.family === 'IPv6') { + addrs = 
addrs.concat(getMultiaddrs('ip6', address.host, address.port, this.certificates)) + } + } catch (err) { + log.error('could not turn %s:%s into multiaddr', address.host, address.port, err) + } + + return addrs.map(ma => this.peerId != null ? ma.encapsulate(`/p2p/${this.peerId.toString()}`) : ma) + } + + async listen (ma: Multiaddr): Promise { + log('listen on multiaddr %s', ma) + + const peerId = ma.getPeerId() + const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma + + this.status = { started: true, listeningAddr, peerId } + + const options = listeningAddr.toOptions() + + const server = this.server = createServer({ + port: options.port, + host: options.host, + secret: this.certificates[0].secret, + cert: this.certificates[0].pem, + privKey: this.certificates[0].privateKey + }) + + server.on('listening', () => { + log('server listening %s', ma, server.address()) + this.dispatchEvent(new CustomEvent('listening')) + }) + server.on('error', (err) => { + log('server error %s', ma, err) + this.dispatchEvent(new CustomEvent('error', { detail: err })) + }) + server.on('session', (session) => { + log('server new session %s', ma) + this.onSession(session) + .catch(err => { + log.error('error handling new session', err) + session.close() + }) + }) + server.on('close', () => { + log('server close %s', ma) + this.dispatchEvent(new CustomEvent('close')) + }) + + await new Promise((resolve, reject) => { + server.listen() + + server.on('listening', () => { + resolve() + }) + }) + } + + async close (): Promise { + if (this.server == null) { + return + } + + log('closing connections') + await Promise.all( + this.connections.map(async conn => { await attemptClose(conn) }) + ) + + log('stopping server') + this.server.close() + } +} + +export default function createListener (options: WebTransportListenerInit): Listener { + return new WebTransportListener(options) +} diff --git a/src/muxer.ts b/src/muxer.ts new file mode 100644 index 0000000..dbd0ba7 --- /dev/null +++ 
b/src/muxer.ts
@@ -0,0 +1,229 @@
import type { Direction, Stream } from '@libp2p/interface-connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import { logger } from '@libp2p/logger'
import type { Source } from 'it-stream-types'
import { Uint8ArrayList } from 'uint8arraylist'
import { inertDuplex } from './utils.js'
import type WebTransport from './webtransport.js'
import type { WebTransportBidirectionalStream } from '@fails-components/webtransport'
import { AbortError } from '@libp2p/interfaces/errors'

const log = logger('libp2p:webtransport:muxer')

export interface WebTransportMuxerInit {
  // Hard cap on concurrently open inbound streams; extra streams are refused.
  maxInboundStreams: number
}

/**
 * Wraps a native WebTransport session as a libp2p StreamMuxerFactory.
 *
 * WebTransport multiplexes natively over QUIC, so this "muxer" does not frame
 * bytes itself — it only adapts incoming/outgoing bidirectional streams to the
 * libp2p Stream interface.
 *
 * NOTE(review): `reader` is presumably the reader of the session's
 * `incomingBidirectionalStreams` — each `read()` yields a stream with
 * `readable`/`writable` — confirm at the call site.
 */
export function webtransportMuxer (wt: WebTransport, reader: ReadableStreamDefaultReader, config: WebTransportMuxerInit): StreamMuxerFactory {
  // Monotonic id shared by inbound and outbound streams of this session.
  let streamIDCounter = 0

  return {
    protocol: 'webtransport',
    createStreamMuxer: (init?: StreamMuxerInit): StreamMuxer => {
      // !TODO handle abort signal when WebTransport supports this.

      const activeStreams: Stream[] = []

      // Background accept loop: runs for the lifetime of the session,
      // surfacing each inbound bidirectional stream via init.onIncomingStream.
      void Promise.resolve().then(async () => {
        //! TODO unclear how to add backpressure here?
        while (true) {
          const { done, value: wtStream } = await reader.read()

          if (done) {
            log('streams have ended')
            break
          }

          if (wtStream == null) {
            log('new incoming stream was undefined')
            break
          }

          log('new incoming stream')

          if (activeStreams.length >= config.maxInboundStreams) {
            // We've reached our limit, close this stream.
            wtStream.writable.close().catch((err: Error) => {
              log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
            })
            wtStream.readable.cancel().catch((err: Error) => {
              log.error(`Failed to close inbound stream that crossed our maxInboundStream limit: ${err.message}`)
            })
          } else {
            const stream = await webtransportBiDiStreamToStream(wtStream, String(streamIDCounter++), 'inbound', activeStreams, init?.onStreamEnd)
            activeStreams.push(stream)
            init?.onIncomingStream?.(stream)
          }
        }
      })

      const muxer: StreamMuxer = {
        protocol: 'webtransport',
        streams: activeStreams,
        // NOTE(review): return type lost its generic argument in this copy —
        // original is likely Promise<Stream>; restore before compiling.
        newStream: async (name?: string): Promise => {
          log('new outgoing stream')

          const wtStream = await wt.createBidirectionalStream()
          const stream = await webtransportBiDiStreamToStream(wtStream, String(streamIDCounter++), init?.direction ?? 'outbound', activeStreams, init?.onStreamEnd)
          activeStreams.push(stream)

          return stream
        },

        /**
         * Close or abort all tracked streams and stop the muxer
         */
        close: (err?: Error) => {
          if (err != null) {
            log('Closing webtransport muxer with err:', err)
          }
          // Closing the session tears down every stream it carries.
          wt.close()
        },
        // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
        ...inertDuplex()
      }

      if (init?.signal?.aborted === true) {
        wt.close()
        throw new AbortError()
      }

      return muxer
    }
  }
}

/**
 * Adapts a single WebTransport bidirectional stream to a libp2p Stream.
 *
 * Close-state machine: `writerClosed`/`readerClosed` are flipped from five
 * places (the two `closed`-promise watchers below, closeRead/closeWrite/abort
 * and the source generator). The stream is removed from `activeStreams` and
 * `onStreamEnd` fires only once BOTH flags are true.
 *
 * NOTE(review): return type lost its generic argument in this copy — original
 * is likely Promise<Stream>; restore before compiling.
 */
async function webtransportBiDiStreamToStream (bidiStream: any, streamId: string, direction: Direction, activeStreams: Stream[], onStreamEnd: undefined | ((s: Stream) => void)): Promise {
  const writer = bidiStream.writable.getWriter()
  const reader = bidiStream.readable.getReader()
  await writer.ready

  // Idempotent: findIndex returns -1 on the second call, so the timeline
  // close stamp and onStreamEnd fire at most once.
  function cleanupStreamFromActiveStreams (): void {
    const index = activeStreams.findIndex(s => s === stream)
    if (index !== -1) {
      activeStreams.splice(index, 1)
      stream.stat.timeline.close = Date.now()
      onStreamEnd?.(stream)
    }
  }

  let writerClosed = false
  let readerClosed = false;
  // Watch the writer's `closed` promise so remote-initiated closes are
  // observed even when our side never writes again.
  (async function () {
    const err: Error | undefined = await writer.closed.catch((err: Error) => err)
    if (err != null) {
      const msg = err.message
      // Remote STOP_SENDING is a normal half-close, not an error worth logging.
      if (!(msg.includes('aborted by the remote server') || msg.includes('STOP_SENDING'))) {
        log.error(`WebTransport writer closed unexpectedly: streamId=${streamId} err=${err.message}`)
      }
    }
    writerClosed = true
    if (writerClosed && readerClosed) {
      cleanupStreamFromActiveStreams()
    }
  })().catch(() => {
    log.error('WebTransport failed to cleanup closed stream')
  });

  // Same watcher for the reader side.
  (async function () {
    const err: Error | undefined = await reader.closed.catch((err: Error) => err)
    if (err != null) {
      log.error(`WebTransport reader closed unexpectedly: streamId=${streamId} err=${err.message}`)
    }
    readerClosed = true
    if (writerClosed && readerClosed) {
      cleanupStreamFromActiveStreams()
    }
  })().catch(() => {
    log.error('WebTransport failed to cleanup closed stream')
  })

  // Guards against sink() being consumed twice (it is single-shot).
  let sinkSunk = false
  const stream: Stream = {
    id: streamId,
    abort (_err: Error) {
      if (!writerClosed) {
        // NOTE(review): writer.abort() returns a promise that is not awaited
        // or caught here — a rejection would be unhandled; confirm intended.
        writer.abort()
        writerClosed = true
      }
      stream.closeRead()
      readerClosed = true
      cleanupStreamFromActiveStreams()
    },
    close () {
      stream.closeRead()
      stream.closeWrite()
      cleanupStreamFromActiveStreams()
    },

    closeRead () {
      if (!readerClosed) {
        reader.cancel().catch((err: any) => {
          // RESET_STREAM from the peer implies the write side is gone too.
          if (err.toString().includes('RESET_STREAM') === true) {
            writerClosed = true
          }
        })
        readerClosed = true
      }
      if (writerClosed) {
        cleanupStreamFromActiveStreams()
      }
    },
    closeWrite () {
      if (!writerClosed) {
        writerClosed = true
        writer.close().catch((err: any) => {
          if (err.toString().includes('RESET_STREAM') === true) {
            readerClosed = true
          }
        })
      }
      if (readerClosed) {
        cleanupStreamFromActiveStreams()
      }
    },
    reset () {
      stream.close()
    },
    stat: {
      direction,
      timeline: { open: Date.now() }
    },
    metadata: {},
    // Async generator over inbound chunks; wraps each chunk in a
    // Uint8ArrayList as the libp2p Stream interface expects.
    source: (async function * () {
      while (true) {
        const val = await reader.read()
        if (val.done === true) {
          readerClosed = true
          if (writerClosed) {
            cleanupStreamFromActiveStreams()
          }
          return
        }

        yield new Uint8ArrayList(val.value)
      }
    })(),
    // Single-use sink: accepts Uint8Array or Uint8ArrayList chunks and
    // half-closes the write side when the source is exhausted (muxer contract).
    sink: async function (source: Source) {
      if (sinkSunk) {
        throw new Error('sink already called on stream')
      }
      sinkSunk = true
      try {
        for await (const chunks of source) {
          if (chunks instanceof Uint8Array) {
            await writer.write(chunks)
          } else {
            // Uint8ArrayList: write each backing buffer separately.
            for (const buf of chunks) {
              await writer.write(buf)
            }
          }
        }
      } finally {
        stream.closeWrite()
      }
    }
  }

  return stream
}
diff --git a/src/utils.ts b/src/utils.ts
new file mode 100644
index 0000000..45b3402
--- /dev/null
+++ b/src/utils.ts
@@ -0,0 +1,40 @@
import type { Duplex, Source } from 'it-stream-types'

// Determines if `maybeSubset` is a subset of `set`. This means that all byte arrays in `maybeSubset` are present in `set`.
+export function isSubset (set: Uint8Array[], maybeSubset: Uint8Array[]): boolean { + const intersection = maybeSubset.filter(byteArray => { + return Boolean(set.find((otherByteArray: Uint8Array) => { + if (byteArray.length !== otherByteArray.length) { + return false + } + + for (let index = 0; index < byteArray.length; index++) { + if (otherByteArray[index] !== byteArray[index]) { + return false + } + } + return true + })) + }) + return (intersection.length === maybeSubset.length) +} + +// Duplex that does nothing. Needed to fulfill the interface +export function inertDuplex (): Duplex { + return { + source: { + [Symbol.asyncIterator] () { + return { + async next () { + // This will never resolve + return await new Promise(() => { }) + } + } + } + }, + sink: async (source: Source) => { + // This will never resolve + return await new Promise(() => { }) + } + } +} diff --git a/src/webtransport.browser.ts b/src/webtransport.browser.ts new file mode 100644 index 0000000..39b404b --- /dev/null +++ b/src/webtransport.browser.ts @@ -0,0 +1,4 @@ + +import type { WebTransport } from '@fails-components/webtransport' + +export default WebTransport diff --git a/src/webtransport.ts b/src/webtransport.ts new file mode 100644 index 0000000..5d83b49 --- /dev/null +++ b/src/webtransport.ts @@ -0,0 +1,3 @@ +import { WebTransport } from '@fails-components/webtransport' + +export default WebTransport diff --git a/test/certificate.browser.ts b/test/certificate.browser.ts new file mode 100644 index 0000000..3f495c3 --- /dev/null +++ b/test/certificate.browser.ts @@ -0,0 +1,4 @@ + +export async function generateWebTransportCertificates (): Promise { + throw new Error('Not implemented') +} diff --git a/test/certificate.ts b/test/certificate.ts new file mode 100644 index 0000000..2192646 --- /dev/null +++ b/test/certificate.ts @@ -0,0 +1,495 @@ +// Copyright (c) 2022 Marten Richter or other contributers (see commit). All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// large portions taken from selfsigned with the following Copyright and license +/* MIT License +Copyright (c) 2013 José F. Romaniello +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// also large portions taken from the node-forge project licensed under +/* New BSD License (3-clause) +Copyright (c) 2010, Digital Bazaar, Inc. +All rights reserved. +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Digital Bazaar, Inc. 
nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL DIGITAL BAZAAR BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ + +// @ts-expect-error has no types, @types/node-forge is broken +import forge from 'node-forge' +import { webcrypto as crypto, X509Certificate } from 'crypto' +import * as Digest from 'multiformats/hashes/digest' +import { sha256 } from 'multiformats/hashes/sha2' +import type { WebTransportCertificate } from '../src/index.js' + +const { pki, asn1, oids } = forge +// taken from node-forge +/** + * Converts an X.509 subject or issuer to an ASN.1 RDNSequence. + * + * @param obj - the subject or issuer (distinguished name). + * + * @returns the ASN.1 RDNSequence. 
+ */ +function _dnToAsn1 (obj: any): any { + // create an empty RDNSequence + const rval = asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, []) + + // iterate over attributes + let attr, set + const attrs = obj.attributes + for (let i = 0; i < attrs.length; ++i) { + attr = attrs[i] + let value = attr.value + + // reuse tag class for attribute value if available + let valueTagClass = asn1.Type.PRINTABLESTRING + if ('valueTagClass' in attr) { + valueTagClass = attr.valueTagClass + + if (valueTagClass === asn1.Type.UTF8) { + value = forge.util.encodeUtf8(value) + } + // FIXME: handle more encodings + } + + // create a RelativeDistinguishedName set + // each value in the set is an AttributeTypeAndValue first + // containing the type (an OID) and second the value + set = asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SET, true, [ + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + // AttributeType + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.OID, + false, + asn1.oidToDer(attr.type).getBytes() + ), + // AttributeValue + asn1.create(asn1.Class.UNIVERSAL, valueTagClass, false, value) + ]) + ]) + rval.value.push(set) + } + + return rval +} + +// taken from node-forge almost not modified +/** + * Converts a Date object to ASN.1 + * Handles the different format before and after 1st January 2050 + * + * @param date - date object. + * + * @returns the ASN.1 object representing the date. 
+ */ + +const jan11950 = new Date('1950-01-01T00:00:00Z') +const jan12050 = new Date('2050-01-01T00:00:00Z') +function _dateToAsn1 (date: Date): any { + if (date >= jan11950 && date < jan12050) { + return asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.UTCTIME, + false, + asn1.dateToUtcTime(date) + ) + } else { + return asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.GENERALIZEDTIME, + false, + asn1.dateToGeneralizedTime(date) + ) + } +} + +// taken from node-forge almost not modified +/** + * Convert signature parameters object to ASN.1 + * + * @param {string} oid - Signature algorithm OID + * @param params - The signature parametrs object + * @returns ASN.1 object representing signature parameters + */ +function _signatureParametersToAsn1 (oid: string, params: any): any { + const parts = [] + switch (oid) { + case oids['RSASSA-PSS']: + if (params.hash.algorithmOid !== undefined) { + parts.push( + asn1.create(asn1.Class.CONTEXT_SPECIFIC, 0, true, [ + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.OID, + false, + asn1.oidToDer(params.hash.algorithmOid).getBytes() + ), + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.NULL, false, '') + ]) + ]) + ) + } + + if (params.mgf.algorithmOid !== undefined) { + parts.push( + asn1.create(asn1.Class.CONTEXT_SPECIFIC, 1, true, [ + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.OID, + false, + asn1.oidToDer(params.mgf.algorithmOid).getBytes() + ), + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.OID, + false, + asn1.oidToDer(params.mgf.hash.algorithmOid).getBytes() + ), + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.NULL, false, '') + ]) + ]) + ]) + ) + } + + if (params.saltLength !== undefined) { + parts.push( + asn1.create(asn1.Class.CONTEXT_SPECIFIC, 2, true, [ + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.INTEGER, + false, + 
asn1.integerToDer(params.saltLength).getBytes() + ) + ]) + ) + } + + return asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, parts) + default: + return asn1.create(asn1.Class.UNIVERSAL, asn1.Type.NULL, false, '') + } +} + +// taken from node-forge and modified to work with ECDSA +/** + * Gets the ASN.1 TBSCertificate part of an X.509v3 certificate. + * + * @param cert - the certificate. + * + * @returns the asn1 TBSCertificate. + */ +function getTBSCertificate (cert: PKICertificate): any { + // TBSCertificate + const notBefore = _dateToAsn1(cert.validity.notBefore) + const notAfter = _dateToAsn1(cert.validity.notAfter) + + const tbs = asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + // version + asn1.create(asn1.Class.CONTEXT_SPECIFIC, 0, true, [ + // integer + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.INTEGER, + false, + asn1.integerToDer(cert.version).getBytes() + ) + ]), + // serialNumber + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.INTEGER, + false, + forge.util.hexToBytes(cert.serialNumber) + ), + // signature + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + // algorithm + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.OID, + false, + asn1.oidToDer(cert.siginfo.algorithmOid).getBytes() + ), + // parameters + _signatureParametersToAsn1( + cert.siginfo.algorithmOid, + cert.siginfo.parameters + ) + ]), + // issuer + _dnToAsn1(cert.issuer), + // validity + asn1.create(asn1.Class.UNIVERSAL, asn1.Type.SEQUENCE, true, [ + notBefore, + notAfter + ]), + // subject + _dnToAsn1(cert.subject), + // SubjectPublicKeyInfo + // here comes our modification, we are other objects here + asn1.fromDer( + new forge.util.ByteBuffer( + cert.publicKey + ) /* is in already SPKI format but in DER encoding */ + ) + ]) + + if (cert.issuer.uniqueId !== '' && cert.issuer.uniqueId != null) { + // issuerUniqueID (optional) + tbs.value.push( + asn1.create(asn1.Class.CONTEXT_SPECIFIC, 1, true, [ + asn1.create( + asn1.Class.UNIVERSAL, + 
asn1.Type.BITSTRING, + false, + // TODO: support arbitrary bit length ids + String.fromCharCode(0x00) + cert.issuer.uniqueId + ) + ]) + ) + } + if (cert.subject.uniqueId !== '' && cert.subject.uniqueId != null) { + // subjectUniqueID (optional) + tbs.value.push( + asn1.create(asn1.Class.CONTEXT_SPECIFIC, 2, true, [ + asn1.create( + asn1.Class.UNIVERSAL, + asn1.Type.BITSTRING, + false, + // TODO: support arbitrary bit length ids + String.fromCharCode(0x00) + cert.subject.uniqueId + ) + ]) + ) + } + + if (cert.extensions.length > 0) { + // extensions (optional) + tbs.value.push(pki.certificateExtensionsToAsn1(cert.extensions)) + } + + return tbs +} + +// function taken form selfsigned +// a hexString is considered negative if it's most significant bit is 1 +// because serial numbers use ones' complement notation +// this RFC in section 4.1.2.2 requires serial numbers to be positive +// http://www.ietf.org/rfc/rfc5280.txt +function toPositiveHex (hexString: string): string { + let mostSiginficativeHexAsInt = parseInt(hexString[0], 16) + if (mostSiginficativeHexAsInt < 8) { + return hexString + } + + mostSiginficativeHexAsInt -= 8 + return mostSiginficativeHexAsInt.toString() + hexString.substring(1) +} + +export interface GenerateWebTransportCertificateOptions { + days?: number + start?: Date + extensions?: any[] +} + +export interface ForgeWebTransportCertificate { + private: string + public: string + privateRaw: Uint8Array + publicRaw: Uint8Array + cert: string + hash: Uint8Array + fingerprint: string +} + +interface PKICertificate { + serialNumber: string + validity: { + notBefore: Date + notAfter: Date + } + publicKey: ArrayBuffer + siginfo: { + algorithmOid: string + parameters: any + } + signatureOid: string + signature: ArrayBuffer + tbsCertificate: ArrayBuffer + md: ArrayBuffer + version: number + extensions: any[] + subject: { + uniqueId: string + } + issuer: { + uniqueId: string + } + + setSubject: (attrs: any) => void + setIssuer: (attrs: any) => void + 
setExtensions: (attrs: any) => void +} + +// the next is an edit of the selfsigned function reduced to the function necessary for webtransport +async function generateWebTransportCertificate (attrs: Array<{ shortName: string, value: string }>, options: GenerateWebTransportCertificateOptions = {}): Promise { + const keyPair = await crypto.subtle.generateKey( + { + name: 'ECDSA', + namedCurve: 'P-256' + }, + true, + ['sign', 'verify'] + ) + + const cert: PKICertificate = pki.createCertificate() + + cert.serialNumber = toPositiveHex( + forge.util.bytesToHex(forge.random.getBytesSync(9)) + ) // the serial number can be decimal or hex (if preceded by 0x) + cert.validity.notBefore = options.start ?? new Date() + cert.validity.notAfter = new Date() + cert.validity.notAfter.setDate( + cert.validity.notBefore.getDate() + (options.days ?? 14) + ) // per spec only 14 days allowed + + cert.setSubject(attrs) + cert.setIssuer(attrs) + + const privateKey = crypto.subtle.exportKey('pkcs8', keyPair.privateKey) + const publicKey = (cert.publicKey = await crypto.subtle.exportKey( + 'spki', + keyPair.publicKey + )) + + cert.setExtensions( + (options.extensions != null) || [ + { + name: 'basicConstraints', + cA: true + }, + { + name: 'keyUsage', + keyCertSign: true, + digitalSignature: true, + nonRepudiation: true, + keyEncipherment: true, + dataEncipherment: true + }, + { + name: 'subjectAltName', + altNames: [ + { + type: 6, // URI + value: 'http://example.org/webid#me' + } + ] + } + ] + ) + + // to signing + // patch oids object + oids['1.2.840.10045.4.3.2'] = 'ecdsa-with-sha256' + oids['ecdsa-with-sha256'] = '1.2.840.10045.4.3.2' + + cert.siginfo.algorithmOid = cert.signatureOid = '1.2.840.10045.4.3.2' // 'ecdsa-with-sha256' + + cert.tbsCertificate = getTBSCertificate(cert) + const encoded = Buffer.from( + asn1.toDer(cert.tbsCertificate).getBytes(), + 'binary' + ) + cert.md = await crypto.subtle.digest('SHA-256', encoded) + cert.signature = await crypto.subtle.sign( + { + name: 
'ECDSA', + hash: { name: 'SHA-256' } + }, + keyPair.privateKey, + encoded + ) + + const pemcert = pki.certificateToPem(cert) + + const x509cert = new X509Certificate(pemcert) + + const certhash = Buffer.from( + x509cert.fingerprint256.split(':').map((el) => parseInt(el, 16)) + ) + + const privateKeyBuffer = await privateKey + + const pem = { + private: forge.pem.encode({ + type: 'PRIVATE KEY', + body: new forge.util.ByteBuffer(privateKeyBuffer).getBytes() + }), + public: forge.pem.encode({ + type: 'PUBLIC KEY', + body: new forge.util.ByteBuffer(publicKey).getBytes() + }), + privateRaw: new Uint8Array(privateKeyBuffer, 0, privateKeyBuffer.byteLength), + publicRaw: new Uint8Array(publicKey, 0, publicKey.byteLength), + cert: pemcert, + hash: certhash, + fingerprint: x509cert.fingerprint256 + } + + return pem +} + +export async function generateWebTransportCertificates (attrs: Array<{ shortName: string, value: string }>, options: GenerateWebTransportCertificateOptions[] = []): Promise { + return await Promise.all( + options.map(async options => { + const certificate = await generateWebTransportCertificate(attrs, options) + + const digest = Digest.create(sha256.code, certificate.hash) + + return { + privateKey: certificate.private, + pem: certificate.cert, + hash: digest, + secret: 'super-secret-shhhhhh' + } + }) + ) +} diff --git a/test/compliance.node.ts b/test/compliance.node.ts new file mode 100644 index 0000000..4053402 --- /dev/null +++ b/test/compliance.node.ts @@ -0,0 +1,63 @@ +import sinon from 'sinon' +import tests from '@libp2p/interface-transport-compliance-tests' +import { multiaddr } from '@multiformats/multiaddr' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { webTransport, WebTransportComponents } from '../src/index.js' +import { generateWebTransportCertificates } from './certificate.js' +import { base64url } from 'multiformats/bases/base64' + +describe('interface-transport compliance', () => { + tests({ + async setup () { + 
const components: WebTransportComponents = { + peerId: await createEd25519PeerId() + } + + const certificates = await generateWebTransportCertificates([ + { shortName: 'C', value: 'DE' }, + { shortName: 'ST', value: 'Berlin' }, + { shortName: 'L', value: 'Berlin' }, + { shortName: 'O', value: 'webtransport Test Server' }, + { shortName: 'CN', value: '127.0.0.1' } + ], [{ + // can be max 14 days according to the spec + days: 13 + }]) + + const certhash = base64url.encode(certificates[0].hash.bytes) + + const transport = webTransport({ + certificates + })(components) + const addrs = [ + multiaddr(`/ip4/127.0.0.1/udp/9091/quic/webtransport/certhash/${certhash}/p2p/${components.peerId.toString()}`), + multiaddr(`/ip4/127.0.0.1/udp/9092/quic/webtransport/certhash/${certhash}/p2p/${components.peerId.toString()}`), + multiaddr(`/ip4/127.0.0.1/udp/9093/quic/webtransport/certhash/${certhash}/p2p/${components.peerId.toString()}`), + multiaddr(`/ip6/::/udp/9094/quic/webtransport/certhash/${certhash}/p2p/${components.peerId.toString()}`) + ] + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (delayMs: number) { + // @ts-expect-error method is not part of transport interface + const authenticateWebTransport = transport.authenticateWebTransport.bind(transport) + + // @ts-expect-error method is not part of transport interface + sinon.replace(transport, 'authenticateWebTransport', async (wt: WebTransport, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>) => { + await new Promise((resolve) => { + setTimeout(() => { resolve() }, delayMs) + }) + + return authenticateWebTransport(wt, localPeer, remotePeer, certhashes) + }) + }, + restore () { + sinon.restore() + } + } + + return { transport, addrs, connector } + }, + async teardown () {} + }) +}) diff --git a/test/node.ts b/test/node.ts new file mode 100644 index 0000000..18073fb --- /dev/null +++ b/test/node.ts @@ -0,0 +1 @@ +import './compliance.node.js' diff --git a/test/browser.ts 
b/test/webtransport.spec.ts similarity index 86% rename from test/browser.ts rename to test/webtransport.spec.ts index f466c7b..9c17f83 100644 --- a/test/browser.ts +++ b/test/webtransport.spec.ts @@ -4,17 +4,13 @@ import { expect } from 'aegir/chai' import { multiaddr } from '@multiformats/multiaddr' import { noise } from '@chainsafe/libp2p-noise' -import { webTransport, isSubset } from '../src/index' +import { webTransport } from '../src/index.js' import { createLibp2p } from 'libp2p' +import { isSubset } from '../src/utils.js' +import { randomBytes } from 'iso-random-stream' -declare global { - interface Window { - WebTransport: any - } -} - -describe('libp2p-webtransport', () => { - it('webtransport connects to go-libp2p', async () => { +describe('@libp2p/webtransport', () => { + it('webtransport connects', async () => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const maStr: string = process.env.serverAddr! const ma = multiaddr(maStr) @@ -35,8 +31,7 @@ describe('libp2p-webtransport', () => { // we can use the builtin ping system const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0') - const data = new Uint8Array(32) - globalThis.crypto.getRandomValues(data) + const data = randomBytes(32) const pong = new Promise((resolve, reject) => { (async () => { @@ -84,13 +79,14 @@ describe('libp2p-webtransport', () => { await node.start() const err = await expect(node.dial(ma)).to.eventually.be.rejected() - expect(err.toString()).to.contain('Expected multiaddr to contain certhashes') + + expect(err.toString()).to.include('Expected multiaddr to contain certhashes') await node.stop() }) - it('Closes writes of streams after they have sunk a source', async () => { - // This is the behavor of stream muxers: (see mplex, yamux and compliance tests: https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-stream-muxer-compliance-tests/src/close-test.ts) + it('closes writes of streams after they have sunk a source', async () 
=> { + // This is the behaviour of stream muxers: (see mplex, yamux and compliance tests: https://github.com/libp2p/js-libp2p-interfaces/blob/master/packages/interface-stream-muxer-compliance-tests/src/close-test.ts) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const maStr: string = process.env.serverAddr! const ma = multiaddr(maStr)