-
Notifications
You must be signed in to change notification settings - Fork 88
[Bugifx] Shubhra/ Add buffer to FFI events #508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
🦋 Changeset detectedLatest commit: 772570f The changes in this PR will be included in the next version bump. This PR includes changesets to release 6 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
tsconfig.json
Outdated
@@ -1,6 +1,6 @@ | |||
{ | |||
"compilerOptions": { | |||
"lib": ["es2015"], | |||
"lib": ["es2015", "ES2021.WeakRef"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this so that linter doesn't throw an error when using FinalizationRegistry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lukasIO is there a reason this is es2015
can we just update it to something later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we don't need FinalizationRegistry
. But still would be good to upgrade this?
tsconfig.json
Outdated
@@ -1,6 +1,6 @@ | |||
{ | |||
"compilerOptions": { | |||
"lib": ["es2015"], | |||
"lib": ["es2015", "ES2021.WeakRef"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lukasIO is there a reason this is es2015
can we just update it to something later.
} | ||
} | ||
|
||
subscribe(): ReadableStream<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose this to be the equivalent of the asyncio's Queue
we use in python. The main feature we care about is asynchronous reads and waiting for events while the stream is empty.
packages/livekit-rtc/src/room.ts
Outdated
@@ -88,8 +90,19 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks> | |||
remoteParticipants: Map<string, RemoteParticipant> = new Map(); | |||
localParticipant?: LocalParticipant; | |||
|
|||
private static cleanupRegistry = new FinalizationRegistry((cleanup: () => void) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use FinalizationRegistry (see docs) to act as an equivalent of python's __del__
method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's also Symbol.dispose as defined in a ECMA proposal https://github.com/tc39/proposal-explicit-resource-management but I wasn't sure if this was merged in yet.
packages/livekit-rtc/src/room.ts
Outdated
} catch (error) { | ||
log.debug(error, 'Listen task ended'); | ||
} finally { | ||
this.listenTaskPromise = undefined; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it better to type it as | null
?
@davidzhao this should fix the issues folks were having with inbound sip calls in agents js |
} catch (error: unknown) { | ||
log.error(error, 'Error enqueuing item to stream'); | ||
toRemove.add(controller); | ||
} | ||
} | ||
|
||
for (const controller of toRemove) { | ||
this.subscribers.delete(controller); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this overkill? Just want to make sure there are no memory leaks.
/** | ||
* @throws "TypeError: Invalid state: ReadableStream is locked" | ||
* if the stream is locked, make sure the stream is not being read from | ||
* before calling this method. | ||
*/ | ||
unsubscribe(stream: ReadableStream<T>): void { | ||
stream.cancel(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love this, but it works for now.
packages/livekit-rtc/src/room.ts
Outdated
constructor() { | ||
super(); | ||
// Register a finalizer to disconnect the room when it's garbage collected | ||
Room.cleanupRegistry.register(this, () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this. If the FfiHandle is dropped, the Rust side will already close the room
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know. I saw in the python side so that's why I implemented it - but had a felling we didn't need this.
packages/livekit-rtc/src/room.ts
Outdated
@@ -180,6 +193,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks> | |||
options, | |||
}); | |||
|
|||
// subscribe before connecting so we don't miss any events | |||
this.ffiQueue = FfiClient.instance.queue.subscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On JS it may be simpler, just hooking into the FfiEvent using on
. We could just append to a list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's fine, but IIRC that was the only difference. JS using events, but python using queues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On JS it may be simpler, just hooking into the FfiEvent using on that's exactly what this is doing.
We grab the events and append them to a queue. The only reason it's a stream is so reads are async and we don't block the main event loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean get rid of FfiClient.instance.queue
all together. Replace this with
connect(){
// wire up the event listener first
FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent)
// send the connection request
const res = FfiClient.instance.request<ConnectResponse>({ ...... })
}
//
onEvent(event): {
this.buffer.push(event)
// only start processing the events once we've connected.
if (connected) {
this.proccessEvent(event)
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand it the order of operations are.
- Create a buffer for all the events coming in
- Connect to the room
- Immediately populate the buffer so we don't drop events
- Start processing events
Right now step 1 is done with this.ffiQueue = FfiClient.instance.queue.subscribe();
if we replace it with FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent)
we still need to buffer the events to process until we receive the ConnectResponse
?
Discussed offline with @theomonnom - we're just going to buffer the events that come in before the connect in |
This ports over a fix that was done in the python side, but never implemented in the node sdk (PR).
User reported that the
Room
would throw an error if there was already a participant inside and they tried to connect. This is because we were processing theparticipant_connected
event before thetrackSubscribed
event.Before fix:
room.connect
trackSubscribed
← Tries to find participant that was never created!After fix:
room.connect
participants_updated
participant_connected
← Creates the participanttrack_published
track_subscribed
← Works because participant exists