From c1fad606f64cacb14391575f524206a9666580f2 Mon Sep 17 00:00:00 2001 From: Jeremy Kahn Date: Mon, 14 Nov 2022 07:27:41 -0600 Subject: [PATCH] fix: [#67] enable joining peers to receive all video streams --- src/services/PeerRoom/PeerRoom.ts | 32 +++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/services/PeerRoom/PeerRoom.ts b/src/services/PeerRoom/PeerRoom.ts index 7027b8f..7fedf6e 100644 --- a/src/services/PeerRoom/PeerRoom.ts +++ b/src/services/PeerRoom/PeerRoom.ts @@ -1,6 +1,8 @@ import { joinRoom, Room, BaseRoomConfig } from 'trystero' import { TorrentRoomConfig } from 'trystero/torrent' +import { sleep } from 'utils' + export enum PeerHookType { NEW_PEER = 'NEW_PEER', AUDIO = 'AUDIO', @@ -14,6 +16,8 @@ export enum PeerStreamType { SCREEN = 'SCREEN', } +const streamQueueAddDelay = 500 + export class PeerRoom { private room: Room @@ -34,6 +38,10 @@ export class PeerRoom { Parameters[0] > = new Map() + private streamQueue: (() => Promise)[] = [] + + private isProcessingPendingStreams = false + constructor(config: TorrentRoomConfig & BaseRoomConfig, roomId: string) { this.roomConfig = config this.room = joinRoom(this.roomConfig, roomId) @@ -109,8 +117,28 @@ export class PeerRoom { return this.room.makeAction(namespace) } - addStream: Room['addStream'] = (...args) => { - return this.room.addStream(...args) + addStream = (...args: Parameters) => { + // New streams need to be added as a delayed queue to prevent race + // conditions on the receiver's end where streams and their metadata get + // mixed up. + this.streamQueue.push( + () => Promise.all(this.room.addStream(...args)), + () => sleep(streamQueueAddDelay) + ) + + this.processPendingStreams() + } + + private processPendingStreams = async () => { + if (this.isProcessingPendingStreams) return + + this.isProcessingPendingStreams = true + + while (this.streamQueue.length > 0) { + await this.streamQueue.shift()?.() + } + + this.isProcessingPendingStreams = false } removeStream: Room['removeStream'] = (stream, targetPeers) => {