Skip to content

Commit 899136b

Browse files
committed
Merge branch 'upstream/subscription-of-meters'
2 parents 4b93213 + f9eb37a commit 899136b

File tree

11 files changed

+790
-81
lines changed

11 files changed

+790
-81
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ yarn-error.log
1515
!.yarn/plugins
1616
!.yarn/releases
1717
!.yarn/sdks
18-
!.yarn/versions
18+
!.yarn/versions

README.md

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ It has been tested with _Lawo Ruby_, _Lawo R3lay_, and _Lawo MxGUI_.
77
The current version is very losely based on the original library and Mr Gilles Dufour's rewrites. It is however rewritten almost completely from scratch and bears little to no resemblance to earlier libraries.
88

99
### Repository-specific Info for Developers
10-
* [Developer Info](DEVELOPER.md)
11-
* [Contribution Guidelines](CONTRIBUTING.md)
12-
13-
### General Sofie System Info
14-
* [Documentation](https://nrkno.github.io/sofie-core/)
15-
* [Releases](https://nrkno.github.io/sofie-core/releases)
1610

11+
- [Developer Info](DEVELOPER.md)
12+
- [Contribution Guidelines](CONTRIBUTING.md)
1713

14+
### General Sofie System Info
1815

16+
- [Documentation](https://nrkno.github.io/sofie-core/)
17+
- [Releases](https://nrkno.github.io/sofie-core/releases)
1918

2019
---
2120

@@ -26,24 +25,36 @@ The current version is very losely based on the original library and Mr Gilles D
2625
Get Full tree:
2726

2827
```javascript
29-
const { EmberClient } = require('emberplus-connection');
30-
const client = new EmberClient("10.9.8.7", 9000);
31-
client.on("error", e => {
32-
console.log(e);
33-
});
28+
const { EmberClient, StreamManager } = require('emberplus-connection')
29+
const client = new EmberClient('10.9.8.7', 9000)
30+
client.on('error', (e) => {
31+
console.log(e)
32+
})
3433
await client.connect()
34+
35+
// If you want to listen to stream updates - you can do it like this:
36+
client.on('streamUpdate', (internalNodePath, value) => {
37+
console.log('Stream Update:', {
38+
path: internalNodePath,
39+
value: value,
40+
})
41+
// You can get the internal node path, the internal path can be different from the path you requested,
42+
// depending on wheter you request a numbered node or via the description
43+
// the client has a client.getInternalNodePath(node) that you can request and use as reference when subsribing to a node
44+
})
45+
3546
// Get Root info
3647
const req = await client.getDirectory(client.tree)
3748
await req.response
3849
// Get a Specific Node
39-
const node = await client.getElementByPath("0.0.2")
40-
console.log(node);
50+
const node = await client.getElementByPath('0.0.2')
51+
console.log(node)
4152
// Get a node by its path identifiers
42-
const node2 = await client.getElementByPath("path.to.node"))
43-
console.log(node2);
53+
const node2 = await client.getElementByPath('path.to.node')
54+
console.log(node2)
4455
// Get a node by its path descriptions
45-
const node3 = await client.getElementByPath("descr1.descr2.descr3"))
46-
console.log(node3);
56+
const node3 = await client.getElementByPath('descr1.descr2.descr3')
57+
console.log(node3)
4758
// Expand entire tree under node 0
4859
await client.expand(client.tree)
4960
console.log(client.tree)
@@ -70,6 +81,10 @@ client
7081
})
7182
.then(() => client.getElementByPath('0.2'))
7283
.then(async (node) => {
84+
// You can get the internal node path, the internal path can be different from the requested,
85+
// depending on wheter you request a numbered node or via the description
86+
console.log('This is the internal node path :', client.getInternalNodePath(node))
87+
7388
// For non-streams a getDirectory will automatically subscribe for update
7489
return (
7590
await client.getDirectory(node, (update) => {
@@ -84,14 +99,22 @@ client
8499
console.log(update)
85100
})
86101
)
102+
client.on('streamUpdate', (internalNodePath, value) => {
103+
console.log('Stream Update:', {
104+
path: internalNodePath,
105+
value: value,
106+
})
107+
})
87108
```
88109

89110
### Setting New Value
90111

91112
```javascript
92113
client = new EmberClient(LOCALHOST, PORT)
93114
await client.connect()
94-
await (await client.getDirectory()).response
115+
await (
116+
await client.getDirectory()
117+
).response
95118
const req = await client.setValue(await client.getElementByPath('0.0.1'), 'gdnet')
96119
await req.response
97120
console.log('result', req.response)
@@ -107,7 +130,9 @@ const { EmberClient, EmberLib } = require('node-emberplus')
107130

108131
const client = new EmberClient(HOST, PORT)
109132
await client.connect()
110-
await (await client.getDirectory()).response
133+
await (
134+
await client.getDirectory()
135+
).response
111136
const fn = await client.getElementByPath('path.to.function')
112137
const req = await client.invoke(fn, 1, 2, 3)
113138
console.log('result', await req.response)
@@ -126,7 +151,7 @@ const {
126151
ParameterAccess,
127152
MatrixImpl,
128153
MatrixType,
129-
MatrixAddressingMode
154+
MatrixAddressingMode,
130155
} = require('emberplus-connection')
131156

132157
const s = new EmberServer(9000) // start server on port 9000
@@ -187,14 +212,14 @@ const tree = {
187212
undefined,
188213
ParameterAccess.ReadWrite
189214
)
190-
)
215+
),
191216
}),
192217

193218
2: new NumberedTreeNodeImpl(2, new EmberNodeImpl('Functions', undefined, undefined, true), {
194219
1: new NumberedTreeNodeImpl(
195220
1,
196221
new EmberFunctionImpl(undefined, undefined) //, [{ type: ParameterType.Boolean, name: 'Test' }])
197-
)
222+
),
198223
}),
199224

200225
3: new NumberedTreeNodeImpl(3, new EmberNodeImpl('Matrices', undefined, undefined, true), {
@@ -211,13 +236,14 @@ const tree = {
211236
5,
212237
5
213238
)
214-
)
215-
})
216-
})
239+
),
240+
}),
241+
}),
217242
}
218243

219244
s.init(tree) // initiate the provider with the tree
220245
```
246+
221247
---
222248

223249
_The NRK logo is a registered trademark of Norsk rikskringkasting AS. The license does not grant any right to use, in any way, any trademarks, service marks or logos of Norsk rikskringkasting AS._
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Setting the environment variable DEBUG=emberplus-connection:*
2+
// will show debug information from the emberplus-connection module
3+
4+
process.env.DEBUG = 'emberplus-connection:*'
5+
// Note: it's also possible to only log parts of the module by using a subset of the debug name,
6+
// 'emberplus-connection:S101Client' // for the S101Client class
7+
// 'emberplus-connection:S101Codec' // for the S101Codec class
8+
// 'emberplus-connection:StreamManager' // for the StreamManager class
9+
10+
const { EmberClient } = require('../dist/index')
11+
12+
//-------------------------------------------------------------------------
13+
// Client
14+
// log output from lawo_mc2_fader_metering_mock.js
15+
// ------------------------------------------------------------------------
16+
17+
const client = new EmberClient('192.168.1.67', 9000)
18+
let node1InternalNodePath = ''
19+
20+
client.on('disconnected', () => {
21+
console.error('Client Lost Ember connection')
22+
client.tree = []
23+
})
24+
25+
// Handle successful connection
26+
client.on('connected', () => {
27+
console.log('Client Found Ember connection')
28+
client.tree = []
29+
30+
client
31+
.getDirectory(client.tree)
32+
.then((req) => {
33+
console.log(' Req:', req)
34+
return req.response
35+
})
36+
.then(() => {
37+
console.log(' Getting node...')
38+
39+
const path_1 = 'Channels.Inputs._1.Metering.Main Level'
40+
return client.getElementByPath(path_1)
41+
})
42+
.then((node1) => {
43+
if (!node1) {
44+
throw new Error(' Could not find node 1')
45+
}
46+
console.log('Found node:', node1)
47+
node1InternalNodePath = client.getInternalNodePath(node1)
48+
49+
// Subscribe to changes
50+
client.subscribe(node1, (node1) => {
51+
const value = node1.contents
52+
console.log('Node 1 subscription :', value)
53+
})
54+
})
55+
.catch((error) => {
56+
console.error(' Error:', error)
57+
})
58+
})
59+
client.on('streamUpdate', (internalNodePath, value) => {
60+
if (internalNodePath !== node1InternalNodePath) {
61+
return
62+
}
63+
console.log('Stream Update:', {
64+
path: internalNodePath,
65+
value: value,
66+
})
67+
})
68+
69+
console.log('-----------------------------------------------------------------------------')
70+
console.log('log output from mc2_fader_metering_example.js')
71+
console.log('Connecting to Client...')
72+
client.connect().catch((error) => {
73+
console.error('Client 2 Error when connecting:', error)
74+
})

src/Ember/Client/StreamManager.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
import { EventEmitter } from 'eventemitter3'
2+
import { Parameter, ParameterType } from '../../model/Parameter'
3+
import { EmberValue } from '../../types'
4+
import { Collection } from '../../types/types'
5+
import { StreamEntry } from '../../model'
6+
7+
import Debug from 'debug'
8+
9+
const debug = Debug('emberplus-connection:StreamManager')
10+
11+
export type StreamManagerEvents = {
12+
streamUpdate: [path: string, value: EmberValue]
13+
}
14+
15+
interface StreamInfo {
16+
parameter: Parameter
17+
path: string
18+
streamIdentifier: number
19+
offset: number
20+
}
21+
22+
export class StreamManager extends EventEmitter<StreamManagerEvents> {
23+
/** Maps path -> StreamInfo */
24+
private registeredStreams: Map<string, StreamInfo> = new Map()
25+
26+
/** Maps streamIdentifier -> Set<path> */
27+
private streamsByIdentifier: Map<number, Set<string>> = new Map() // Lookup by identifier for O(1) access
28+
29+
constructor() {
30+
super()
31+
}
32+
33+
public registerParameter(parameter: Parameter, path: string): void {
34+
if (!parameter.streamIdentifier) {
35+
debug('Warning: Attempted to register parameter without streamIdentifier')
36+
return
37+
}
38+
// Check if already registered
39+
if (this.registeredStreams.has(path)) {
40+
debug('Stream already registered:', {
41+
path,
42+
identifier: parameter.streamIdentifier,
43+
})
44+
return
45+
}
46+
47+
const streamInfo: StreamInfo = {
48+
parameter,
49+
path,
50+
streamIdentifier: parameter.streamIdentifier,
51+
offset: parameter.streamDescriptor?.offset || 0,
52+
}
53+
54+
// Store both mappings
55+
this.registeredStreams.set(path, streamInfo)
56+
57+
// Add to identifier lookup
58+
if (!this.streamsByIdentifier.has(parameter.streamIdentifier)) {
59+
this.streamsByIdentifier.set(parameter.streamIdentifier, new Set())
60+
debug('Registered new stream identifier and adding set:', parameter.streamIdentifier)
61+
}
62+
this.streamsByIdentifier.get(parameter.streamIdentifier)?.add(path)
63+
64+
debug('Registered new stream:', {
65+
path,
66+
identifier: parameter.streamIdentifier,
67+
totalRegistered: this.registeredStreams.size,
68+
})
69+
}
70+
71+
public unregisterParameter(path: string): void {
72+
const streamInfo = this.registeredStreams.get(path)
73+
if (streamInfo?.streamIdentifier) {
74+
// Clean up both maps
75+
this.registeredStreams.delete(path)
76+
const paths = this.streamsByIdentifier.get(streamInfo.streamIdentifier)
77+
if (paths) {
78+
paths.delete(path)
79+
if (paths.size === 0) {
80+
this.streamsByIdentifier.delete(streamInfo.streamIdentifier)
81+
}
82+
}
83+
84+
debug('Unregistered stream:', {
85+
path: path,
86+
identifier: streamInfo.parameter.identifier,
87+
})
88+
}
89+
}
90+
91+
public getStreamInfoByPath(path: string): StreamInfo | undefined {
92+
return this.registeredStreams.get(path)
93+
}
94+
95+
public hasStream(identifier: string): boolean {
96+
return this.registeredStreams.has(identifier)
97+
}
98+
99+
public updateStreamValues(streamEntries: Collection<StreamEntry>): void {
100+
Object.values<StreamEntry>(streamEntries).forEach((streamEntry) => {
101+
// O(1) lookup by identifier
102+
const paths = this.streamsByIdentifier.get(streamEntry.identifier)
103+
104+
if (!paths) {
105+
debug('Received update for unregistered stream:', streamEntry.identifier)
106+
return
107+
}
108+
109+
// Process each matching stream
110+
paths.forEach((path) => {
111+
const streamInfo = this.registeredStreams.get(path)
112+
if (!streamInfo || !streamEntry.value) return
113+
114+
if (streamEntry.value.type === ParameterType.Integer) {
115+
this.updateStreamValue(path, streamEntry.value.value)
116+
} else if (streamEntry.value.type === ParameterType.Octets && Buffer.isBuffer(streamEntry.value.value)) {
117+
const buffer = streamEntry.value.value
118+
if (buffer.length >= streamInfo.offset + 4) {
119+
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.length)
120+
const decodedValue = view.getFloat32(streamInfo.offset, true)
121+
this.updateStreamValue(path, decodedValue)
122+
}
123+
} // Note: we've never seen any other type of stream value, so not implementing for now.
124+
})
125+
})
126+
}
127+
128+
private updateStreamValue(path: string, value: EmberValue): void {
129+
if (path) {
130+
const streamInfo = this.registeredStreams.get(path)
131+
if (streamInfo) {
132+
streamInfo.parameter.value = value
133+
this.emit('streamUpdate', path, value)
134+
}
135+
}
136+
}
137+
138+
public getAllRegisteredPaths(): string[] {
139+
return Array.from(this.registeredStreams.keys())
140+
}
141+
142+
// Debug helper
143+
public printStreamState(): void {
144+
debug('\nCurrent Stream State:')
145+
debug('Registered Streams:')
146+
this.registeredStreams.forEach((info, path) => {
147+
debug(` Path: ${path}`)
148+
debug(` Identifier: ${info.parameter.identifier}`)
149+
debug(` StreamId: ${info.parameter.streamIdentifier}`)
150+
debug(` Current Value: ${info.parameter.value}`)
151+
})
152+
}
153+
}

0 commit comments

Comments
 (0)