Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type cap struct {
// much simpler.
type Client struct {
*Conn
rwc io.ReadWriteCloser
config ClientConfig

// Internal state
Expand All @@ -63,9 +64,10 @@ type Client struct {
}

// NewClient creates a client given an io stream and a client config.
func NewClient(rw io.ReadWriter, config ClientConfig) *Client {
func NewClient(rwc io.ReadWriteCloser, config ClientConfig) *Client {
c := &Client{
Conn: NewConn(rw),
Conn: NewConn(rwc),
rwc: rwc,
config: config,
errChan: make(chan error, 1),
caps: make(map[string]cap),
Expand Down Expand Up @@ -238,25 +240,30 @@ func (c *Client) sendError(err error) {
}
}

func (c *Client) startReadLoop(wg *sync.WaitGroup) {
func (c *Client) startReadLoop(wg *sync.WaitGroup, exiting chan struct{}) {
wg.Add(1)

go func() {
defer wg.Done()

for {
m, err := c.ReadMessage()
if err != nil {
c.sendError(err)
break
}
select {
case <-exiting:
return
default:
m, err := c.ReadMessage()
if err != nil {
c.sendError(err)
break
}

if f, ok := clientFilters[m.Command]; ok {
f(c, m)
}
if f, ok := clientFilters[m.Command]; ok {
f(c, m)
}

if c.config.Handler != nil {
c.config.Handler.Handle(c, m)
if c.config.Handler != nil {
c.config.Handler.Handle(c, m)
}
}
}

Expand Down Expand Up @@ -296,7 +303,7 @@ func (c *Client) RunContext(ctx context.Context) error {

// Now that the handshake is pretty much done, we can start listening for
// messages.
c.startReadLoop(&wg)
c.startReadLoop(&wg, exiting)

// Wait for an error from any goroutine or for the context to time out, then
// signal we're exiting and wait for the goroutines to exit.
Expand All @@ -307,6 +314,7 @@ func (c *Client) RunContext(ctx context.Context) error {
}

close(exiting)
c.rwc.Close()
wg.Wait()

return err
Expand Down
12 changes: 12 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,4 +408,16 @@ func TestPingLoop(t *testing.T) {
SendLine("001 :hello_world\r\n"),
Delay(25 * time.Millisecond),
})

// See if we can get the client to hang
runClientTest(t, config, errors.New("test error"), nil, []TestAction{
ExpectLine("PASS :test_pass\r\n"),
ExpectLine("NICK :test_nick\r\n"),
ExpectLine("USER test_user 0 * :test_name\r\n"),
// We queue this up a line early because the next write will happen after the delay.
QueueWriteError(errors.New("test error")),
SendLine("001 :hello_world\r\n"),
Delay(2 * time.Second),
AssertClosed(),
})
}
25 changes: 24 additions & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func SendLine(output string) TestAction {
return SendLineWithTimeout(output, 1*time.Second)
}

func AssertClosed() TestAction {
return func(t *testing.T, rw *testReadWriter) {
if !rw.closed {
assert.Fail(t, "Expected conn to be closed")
}
}
}

func SendLineWithTimeout(output string, timeout time.Duration) TestAction {
return func(t *testing.T, rw *testReadWriter) {
waitChan := time.After(timeout)
Expand Down Expand Up @@ -116,6 +124,7 @@ type testReadWriter struct {
readEmptyChan chan struct{}
exiting chan struct{}
clientDone chan struct{}
closed bool
serverBuffer bytes.Buffer
}

Expand Down Expand Up @@ -182,6 +191,20 @@ func (rw *testReadWriter) Write(buf []byte) (int, error) {
}
}

func (rw *testReadWriter) Close() error {
select {
case <-rw.exiting:
return errors.New("Connection closed")
default:
// Ensure no double close
if !rw.closed {
rw.closed = true
close(rw.exiting)
}
return nil
}
}

func newTestReadWriter(actions []TestAction) *testReadWriter {
return &testReadWriter{
actions: actions,
Expand Down Expand Up @@ -223,7 +246,7 @@ func runTest(t *testing.T, rw *testReadWriter, actions []TestAction) {
// TODO: Make sure there are no more incoming messages

// Ask everything to shut down
close(rw.exiting)
rw.Close()

// Wait for the client to stop
select {
Expand Down