fix: [#67] enable joining peers to receive all video streams

This commit is contained in:
Jeremy Kahn 2022-11-14 07:27:41 -06:00
parent cbe28caeb9
commit c1fad606f6

View File

@ -1,6 +1,8 @@
import { joinRoom, Room, BaseRoomConfig } from 'trystero' import { joinRoom, Room, BaseRoomConfig } from 'trystero'
import { TorrentRoomConfig } from 'trystero/torrent' import { TorrentRoomConfig } from 'trystero/torrent'
import { sleep } from 'utils'
export enum PeerHookType { export enum PeerHookType {
NEW_PEER = 'NEW_PEER', NEW_PEER = 'NEW_PEER',
AUDIO = 'AUDIO', AUDIO = 'AUDIO',
@ -14,6 +16,8 @@ export enum PeerStreamType {
SCREEN = 'SCREEN', SCREEN = 'SCREEN',
} }
const streamQueueAddDelay = 500
export class PeerRoom { export class PeerRoom {
private room: Room private room: Room
@ -34,6 +38,10 @@ export class PeerRoom {
Parameters<Room['onPeerStream']>[0] Parameters<Room['onPeerStream']>[0]
> = new Map() > = new Map()
private streamQueue: (() => Promise<any>)[] = []
private isProcessingPendingStreams = false
constructor(config: TorrentRoomConfig & BaseRoomConfig, roomId: string) { constructor(config: TorrentRoomConfig & BaseRoomConfig, roomId: string) {
this.roomConfig = config this.roomConfig = config
this.room = joinRoom(this.roomConfig, roomId) this.room = joinRoom(this.roomConfig, roomId)
@ -109,8 +117,28 @@ export class PeerRoom {
return this.room.makeAction<T>(namespace) return this.room.makeAction<T>(namespace)
} }
addStream: Room['addStream'] = (...args) => { addStream = (...args: Parameters<Room['addStream']>) => {
return this.room.addStream(...args) // New streams need to be added as a delayed queue to prevent race
// conditions on the receiver's end where streams and their metadata get
// mixed up.
this.streamQueue.push(
() => Promise.all(this.room.addStream(...args)),
() => sleep(streamQueueAddDelay)
)
this.processPendingStreams()
}
private processPendingStreams = async () => {
if (this.isProcessingPendingStreams) return
this.isProcessingPendingStreams = true
while (this.streamQueue.length > 0) {
await this.streamQueue.shift()?.()
}
this.isProcessingPendingStreams = false
} }
removeStream: Room['removeStream'] = (stream, targetPeers) => { removeStream: Room['removeStream'] = (stream, targetPeers) => {