Skip to content

Commit f5d11df

Browse files
committed
API: Review DataChannel
Resolves #427
1 parent bfba811 commit f5d11df

File tree

10 files changed

+96
-215
lines changed

10 files changed

+96
-215
lines changed

datachannel.go

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"sync"
77

88
"github.com/pions/datachannel"
9-
sugar "github.com/pions/webrtc/pkg/datachannel"
109
"github.com/pions/webrtc/pkg/rtcerr"
1110
"github.com/pkg/errors"
1211
)
@@ -86,12 +85,10 @@ type DataChannel struct {
8685
// "blob". This attribute controls how binary data is exposed to scripts.
8786
// binaryType string
8887

89-
// OnOpen func()
9088
// OnBufferedAmountLow func()
9189
// OnError func()
92-
// OnClose func()
9390

94-
onMessageHandler func(sugar.Payload)
91+
onMessageHandler func(DataChannelMessage)
9592
onOpenHandler func()
9693
onCloseHandler func()
9794

@@ -265,35 +262,36 @@ func (d *DataChannel) onClose() (done chan struct{}) {
265262
return
266263
}
267264

268-
// OnMessage sets an event handler which is invoked on a message
269-
// arrival over the sctp transport from a remote peer.
265+
// DataChannelMessage represents a message received from the
266+
// data channel. IsString will be set to true if the incoming
267+
// message is of the string type. Otherwise the message is of
268+
// a binary type.
269+
type DataChannelMessage struct {
270+
IsString bool
271+
Data []byte
272+
}
273+
274+
// OnMessage sets an event handler which is invoked on a binary
275+
// message arrival over the sctp transport from a remote peer.
270276
// OnMessage can currently receive messages up to 16384 bytes
271277
// in size. Check out the detach API if you want to use larger
272278
// message sizes. Note that browser support for larger messages
273279
// is also limited.
274-
func (d *DataChannel) OnMessage(f func(p sugar.Payload)) {
280+
func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
275281
d.mu.Lock()
276282
defer d.mu.Unlock()
277283
d.onMessageHandler = f
278284
}
279285

280-
func (d *DataChannel) onMessage(p sugar.Payload) {
286+
func (d *DataChannel) onMessage(msg DataChannelMessage) {
281287
d.mu.RLock()
282288
hdlr := d.onMessageHandler
283289
d.mu.RUnlock()
284290

285-
if hdlr == nil || p == nil {
291+
if hdlr == nil {
286292
return
287293
}
288-
hdlr(p)
289-
}
290-
291-
// Onmessage sets an event handler which is invoked on a message
292-
// arrival over the sctp transport from a remote peer.
293-
//
294-
// Deprecated: use OnMessage instead.
295-
func (d *DataChannel) Onmessage(f func(p sugar.Payload)) {
296-
d.OnMessage(f)
294+
hdlr(msg)
297295
}
298296

299297
func (d *DataChannel) handleOpen(dc *datachannel.DataChannel) {
@@ -331,39 +329,38 @@ func (d *DataChannel) readLoop() {
331329
return
332330
}
333331

334-
if isString {
335-
d.onMessage(&sugar.PayloadString{Data: buffer[:n]})
336-
continue
337-
}
338-
d.onMessage(&sugar.PayloadBinary{Data: buffer[:n]})
332+
d.onMessage(DataChannelMessage{Data: buffer[:n], IsString: isString})
339333
}
340334
}
341335

342-
// Send sends the passed message to the DataChannel peer
343-
func (d *DataChannel) Send(payload sugar.Payload) error {
336+
// Send sends the binary message to the DataChannel peer
337+
func (d *DataChannel) Send(data []byte) error {
344338
err := d.ensureOpen()
345339
if err != nil {
346340
return err
347341
}
348342

349-
var data []byte
350-
isString := false
343+
if len(data) == 0 {
344+
data = []byte{0}
345+
}
351346

352-
switch p := payload.(type) {
353-
case sugar.PayloadString:
354-
data = p.Data
355-
isString = true
356-
case sugar.PayloadBinary:
357-
data = p.Data
358-
default:
359-
return errors.Errorf("unknown DataChannel Payload (%s)", payload.PayloadType())
347+
_, err = d.dataChannel.WriteDataChannel(data, false)
348+
return err
349+
}
350+
351+
// SendText sends the text message to the DataChannel peer
352+
func (d *DataChannel) SendText(s string) error {
353+
err := d.ensureOpen()
354+
if err != nil {
355+
return err
360356
}
361357

358+
data := []byte(s)
362359
if len(data) == 0 {
363360
data = []byte{0}
364361
}
365362

366-
_, err = d.dataChannel.WriteDataChannel(data, isString)
363+
_, err = d.dataChannel.WriteDataChannel(data, true)
367364
return err
368365
}
369366

datachannel_ortc_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"time"
66

77
"github.com/pions/transport/test"
8-
"github.com/pions/webrtc/pkg/datachannel"
98
)
109

1110
func TestDataChannel_ORTCE2E(t *testing.T) {
@@ -26,11 +25,11 @@ func TestDataChannel_ORTCE2E(t *testing.T) {
2625
awaitBinary := make(chan struct{})
2726
stackB.sctp.OnDataChannel(func(d *DataChannel) {
2827
close(awaitSetup)
29-
d.OnMessage(func(payload datachannel.Payload) {
30-
switch payload.(type) {
31-
case *datachannel.PayloadString:
28+
29+
d.OnMessage(func(msg DataChannelMessage) {
30+
if msg.IsString {
3231
close(awaitString)
33-
case *datachannel.PayloadBinary:
32+
} else {
3433
close(awaitBinary)
3534
}
3635
})
@@ -52,11 +51,11 @@ func TestDataChannel_ORTCE2E(t *testing.T) {
5251

5352
<-awaitSetup
5453

55-
err = channelA.Send(datachannel.PayloadString{Data: []byte("ABC")})
54+
err = channelA.SendText("ABC")
5655
if err != nil {
5756
t.Fatal(err)
5857
}
59-
err = channelA.Send(datachannel.PayloadBinary{Data: []byte("ABC")})
58+
err = channelA.Send([]byte("ABC"))
6059
if err != nil {
6160
t.Fatal(err)
6261
}

datachannel_test.go

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88
"testing"
99
"time"
1010

11-
sugar "github.com/pions/webrtc/pkg/datachannel"
12-
1311
"github.com/pions/transport/test"
1412
"github.com/stretchr/testify/assert"
1513
)
@@ -85,20 +83,20 @@ func TestDataChannel_Send(t *testing.T) {
8583
assert.True(t, dc.Ordered, "Ordered should be set to true")
8684

8785
dc.OnOpen(func() {
88-
e := dc.Send(sugar.PayloadString{Data: []byte("Ping")})
86+
e := dc.SendText("Ping")
8987
if e != nil {
9088
t.Fatalf("Failed to send string on data channel")
9189
}
9290
})
93-
dc.OnMessage(func(payload sugar.Payload) {
91+
dc.OnMessage(func(msg DataChannelMessage) {
9492
done <- true
9593
})
9694

9795
answerPC.OnDataChannel(func(d *DataChannel) {
9896
assert.True(t, d.Ordered, "Ordered should be set to true")
9997

100-
d.OnMessage(func(payload sugar.Payload) {
101-
e := d.Send(sugar.PayloadBinary{Data: []byte("Pong")})
98+
d.OnMessage(func(msg DataChannelMessage) {
99+
e := d.Send([]byte("Pong"))
102100
if e != nil {
103101
t.Fatalf("Failed to send string on data channel")
104102
}
@@ -124,43 +122,27 @@ func TestDataChannel_EventHandlers(t *testing.T) {
124122
api := NewAPI()
125123
dc := &DataChannel{api: api}
126124

127-
onOpenCalled := make(chan bool)
128-
onMessageCalled := make(chan bool)
125+
onOpenCalled := make(chan struct{})
126+
onMessageCalled := make(chan struct{})
129127

130128
// Verify that the noop case works
131129
assert.NotPanics(t, func() { dc.onOpen() })
132-
assert.NotPanics(t, func() { dc.onMessage(nil) })
133130

134131
dc.OnOpen(func() {
135-
onOpenCalled <- true
132+
close(onOpenCalled)
136133
})
137134

138-
dc.OnMessage(func(p sugar.Payload) {
139-
go func() {
140-
onMessageCalled <- true
141-
}()
135+
dc.OnMessage(func(p DataChannelMessage) {
136+
close(onMessageCalled)
142137
})
143138

144-
// Verify that the handlers deal with nil inputs
145-
assert.NotPanics(t, func() { dc.onMessage(nil) })
146-
147139
// Verify that the set handlers are called
148140
assert.NotPanics(t, func() { dc.onOpen() })
149-
assert.NotPanics(t, func() { dc.onMessage(&sugar.PayloadString{Data: []byte("o hai")}) })
141+
assert.NotPanics(t, func() { dc.onMessage(DataChannelMessage{Data: []byte("o hai")}) })
150142

151-
allTrue := func(vals []bool) bool {
152-
for _, val := range vals {
153-
if !val {
154-
return false
155-
}
156-
}
157-
return true
158-
}
159-
160-
assert.True(t, allTrue([]bool{
161-
<-onOpenCalled,
162-
<-onMessageCalled,
163-
}))
143+
// Wait for all handlers to be called
144+
<-onOpenCalled
145+
<-onMessageCalled
164146
}
165147

166148
func TestDataChannel_MessagesAreOrdered(t *testing.T) {
@@ -172,34 +154,31 @@ func TestDataChannel_MessagesAreOrdered(t *testing.T) {
172154

173155
max := 512
174156
out := make(chan int)
175-
inner := func(payload sugar.Payload) {
157+
inner := func(msg DataChannelMessage) {
176158
// randomly sleep
177159
// NB: The big.Int/crypto.Rand is overkill but makes the linter happy
178160
randInt, err := rand.Int(rand.Reader, big.NewInt(int64(max)))
179161
if err != nil {
180162
t.Fatalf("Failed to get random sleep duration: %s", err)
181163
}
182164
time.Sleep(time.Duration(randInt.Int64()) * time.Microsecond)
183-
p, ok := payload.(*sugar.PayloadBinary)
184-
if ok {
185-
s, _ := binary.Varint(p.Data)
186-
out <- int(s)
187-
}
165+
s, _ := binary.Varint(msg.Data)
166+
out <- int(s)
188167
}
189-
dc.OnMessage(func(p sugar.Payload) {
168+
dc.OnMessage(func(p DataChannelMessage) {
190169
inner(p)
191170
})
192171

193172
go func() {
194173
for i := 1; i <= max; i++ {
195174
buf := make([]byte, 8)
196175
binary.PutVarint(buf, int64(i))
197-
dc.onMessage(&sugar.PayloadBinary{Data: buf})
176+
dc.onMessage(DataChannelMessage{Data: buf})
198177
// Change the registered handler a couple of times to make sure
199178
// that everything continues to work, we don't lose messages, etc.
200179
if i%2 == 0 {
201-
hdlr := func(p sugar.Payload) {
202-
inner(p)
180+
hdlr := func(msg DataChannelMessage) {
181+
inner(msg)
203182
}
204183
dc.OnMessage(hdlr)
205184
}

examples/data-channels-close/main.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"time"
77

88
"github.com/pions/webrtc"
9-
"github.com/pions/webrtc/pkg/datachannel"
109

1110
"github.com/pions/webrtc/examples/internal/signal"
1211
)
@@ -56,9 +55,10 @@ func main() {
5655
cnt := *closeAfter
5756
for range ticker.C {
5857
message := signal.RandSeq(15)
59-
fmt.Printf("Sending %s \n", message)
58+
fmt.Printf("Sending '%s'\n", message)
6059

61-
err := d.Send(datachannel.PayloadString{Data: []byte(message)})
60+
// Send the message as text
61+
err := d.SendText(message)
6262
if err != nil {
6363
panic(err)
6464
}
@@ -76,15 +76,8 @@ func main() {
7676
})
7777

7878
// Register message handling
79-
d.OnMessage(func(payload datachannel.Payload) {
80-
switch p := payload.(type) {
81-
case *datachannel.PayloadString:
82-
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), d.Label, string(p.Data))
83-
case *datachannel.PayloadBinary:
84-
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), d.Label, p.Data)
85-
default:
86-
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), d.Label)
87-
}
79+
d.OnMessage(func(msg webrtc.DataChannelMessage) {
80+
fmt.Printf("Message from DataChannel '%s': '%s'\n", d.Label, string(msg.Data))
8881
})
8982
})
9083

examples/data-channels-create/main.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"time"
66

77
"github.com/pions/webrtc"
8-
sugar "github.com/pions/webrtc/pkg/datachannel"
98

109
"github.com/pions/webrtc/examples/internal/signal"
1110
)
@@ -46,25 +45,19 @@ func main() {
4645

4746
for range time.NewTicker(5 * time.Second).C {
4847
message := signal.RandSeq(15)
49-
fmt.Printf("Sending %s \n", message)
48+
fmt.Printf("Sending '%s'\n", message)
5049

51-
err := dataChannel.Send(sugar.PayloadString{Data: []byte(message)})
50+
// Send the message as text
51+
err := dataChannel.SendText(message)
5252
if err != nil {
5353
panic(err)
5454
}
5555
}
5656
})
5757

58-
// Register the OnMessage to handle incoming messages
59-
dataChannel.OnMessage(func(payload sugar.Payload) {
60-
switch p := payload.(type) {
61-
case *sugar.PayloadString:
62-
fmt.Printf("Message '%s' from DataChannel '%s' payload '%s'\n", p.PayloadType().String(), dataChannel.Label, string(p.Data))
63-
case *sugar.PayloadBinary:
64-
fmt.Printf("Message '%s' from DataChannel '%s' payload '% 02x'\n", p.PayloadType().String(), dataChannel.Label, p.Data)
65-
default:
66-
fmt.Printf("Message '%s' from DataChannel '%s' no payload \n", p.PayloadType().String(), dataChannel.Label)
67-
}
58+
// Register text message handling
59+
dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
60+
fmt.Printf("Message from DataChannel '%s': '%s'\n", dataChannel.Label, string(msg.Data))
6861
})
6962

7063
// Create an offer to send to the browser

0 commit comments

Comments
 (0)