Skip to content

Commit 1fafcda

Browse files
authored
Overhaul Proxy events (#20)
* overhaul proxy events * comments
1 parent 7006dd1 commit 1fafcda

File tree

12 files changed

+239
-98
lines changed

12 files changed

+239
-98
lines changed

mpd-web-proxy/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ WORKDIR /app
77
ARG VERSION='development'
88

99
COPY go.mod go.sum ./
10+
RUN go mod download
1011
COPY *.go ./
1112
COPY gpio/ ./gpio/
1213
COPY art/ ./art/

mpd-web-proxy/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import "os"
44

55
var (
66
MpdAuthority string
7-
PinFile string
8-
BindAddr string
9-
MusicDir string
7+
PinFile string
8+
BindAddr string
9+
MusicDir string
1010
)
1111

1212
func init() {

mpd-web-proxy/events.go

Lines changed: 107 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"bufio"
55
"context"
6+
"errors"
67
"fmt"
78
"io"
89
"log/slog"
@@ -16,80 +17,132 @@ type EventType int
1617
const (
1718
EventTypeUnset EventType = iota
1819
EventTypePing
20+
EventTypeServer
1921
EventTypeMPD
2022
)
2123

22-
// the MPDIdler continually calls the `idle` MPD
23-
// command and sends idle events on the returned
24-
// string channel.
25-
func startMPDIdler(mpd net.Conn) chan string {
26-
eventC := make(chan string)
27-
go func() {
28-
slog.Debug("start MPD Idler")
29-
defer slog.Debug("end MPD Idler")
30-
defer close(eventC)
31-
sc := bufio.NewScanner(mpd)
32-
// MPD Header
33-
sc.Scan()
24+
// An Event contains information about something
25+
// that happened on the proxy that may be of interest
26+
// to the frontend.
27+
type Event struct {
28+
Type EventType
29+
Payload string
30+
}
31+
32+
// SSEPayload formats the given Event
33+
// as a Server-Sent Event payload
34+
func (ev Event) SSEPayload() string {
35+
var sb strings.Builder
36+
var typ string
37+
switch ev.Type {
38+
case EventTypePing:
39+
typ = "ping"
40+
case EventTypeMPD:
41+
typ = "mpd"
42+
case EventTypeServer:
43+
typ = "server"
44+
default:
45+
typ = ""
46+
}
47+
if typ != "" {
48+
fmt.Fprintf(&sb, "event: %s\n", typ)
49+
}
50+
fmt.Fprintf(&sb, "data: %s\n", ev.Payload)
51+
return sb.String() + "\n"
52+
}
3453

54+
// The MPPIdler opens a connection to the MPD
55+
// server and receives real-time updates using the
56+
// `idle` command. All updates returned to the idler
57+
// are published to the given topic as an `mpd:{type}`
58+
// event.
59+
//
60+
// If the "idle" loop fails due to a dropped connection
61+
// or otherwise, the idler logs a "server:mpd-connection-lost"
62+
// event and tries to re-create the connection. After re-
63+
// connecting, the idler sends a `server:mpd-connected`
64+
// event and continues idling.
65+
func MPDIdler(tpc *Topic[Event]) {
66+
oneRound := func() error {
67+
slog.Info("start idler")
68+
defer slog.Info("stop idler")
69+
mpd, err := net.Dial("tcp", MpdAuthority)
70+
if err != nil {
71+
return err
72+
}
73+
defer mpd.Close()
74+
tpc.Publish(Event{
75+
Type: EventTypeServer,
76+
Payload: "mpd-connected",
77+
})
78+
return mpdIdle(mpd, tpc)
79+
}
80+
for {
81+
err := oneRound()
82+
tpc.Publish(Event{
83+
Type: EventTypeServer,
84+
Payload: "mpd-connection-lost",
85+
})
86+
slog.Error("idler exited", "error", err)
87+
time.Sleep(2 * time.Second)
88+
}
89+
}
90+
91+
// the MPDIdler continually calls the `idle` MPD
92+
// command and sends idle events on the returned
93+
// string channel.
94+
func mpdIdle(mpd net.Conn, tpc *Topic[Event]) error {
95+
rd := bufio.NewReader(mpd)
96+
_, err := rd.ReadString('\n')
97+
if err != nil {
98+
return err
99+
}
100+
for {
101+
_, err := io.WriteString(mpd, "idle\n")
102+
if err != nil {
103+
return err
104+
}
35105
for {
36-
io.WriteString(mpd, "idle\n")
37-
for sc.Scan() {
38-
line := sc.Text()
39-
if line == "OK" {
40-
break
41-
}
42-
parts := strings.SplitN(line, ":", 2)
43-
if len(parts) != 2 {
44-
slog.Error(fmt.Sprintf("unexpected string: %s\n", line))
45-
return
46-
}
47-
ev := strings.TrimSpace(parts[1])
48-
slog.Debug(fmt.Sprint("sending event:", ev))
49-
eventC <- ev
106+
line, err := rd.ReadString('\n')
107+
if err != nil {
108+
return err
50109
}
51-
if err := sc.Err(); err != nil {
52-
if _, ok := err.(*net.OpError); !ok {
53-
slog.Error(fmt.Sprintf("unexpected error: %T %s\n", err, err))
54-
}
55-
return
110+
line = strings.TrimSpace(line)
111+
if line == "OK" {
112+
break
113+
}
114+
parts := strings.SplitN(line, ":", 2)
115+
if len(parts) != 2 {
116+
return errors.New("MPD returned strange response: " + line)
56117
}
118+
ev := strings.TrimSpace(parts[1])
119+
tpc.Publish(Event{
120+
Type: EventTypeMPD,
121+
Payload: ev,
122+
})
57123
}
58-
}()
59-
return eventC
60-
}
61-
62-
// An Event is any asynchronous event
63-
// that should be sent to a listening client.
64-
type Event struct {
65-
Type EventType
66-
Data string
124+
}
67125
}
68126

69-
// getEvents starts a goroutine which emits events
70-
// to send to the client. Events include:
71-
// - any string sent on the provided string channel is sent as an MPDEvent
72-
// - the emitter produces a "ping" event that clients can use as a heartbeat.
73-
// The emitter exits and closes the returned channel when the given context
74-
// is cancelled.
75-
func getEvents(ts chan string, ctx context.Context) chan Event {
127+
// getEvents wraps the given Topic[Event] with a ticker
128+
// that sends a `ping` event every 5 seconds.
129+
func getEvents(tpc *Topic[Event], ctx context.Context) chan Event {
76130
ret := make(chan Event)
77-
ticker := time.NewTicker(5 * time.Second)
78131
go func() {
79132
defer close(ret)
133+
ts := tpc.Subscribe()
134+
defer tpc.Unsubscribe(ts)
135+
ticker := time.NewTicker(5 * time.Second)
80136
defer ticker.Stop()
81137
ret <- Event{Type: EventTypePing}
82138
for {
83139
select {
140+
case <-ctx.Done():
141+
return
84142
case <-ticker.C:
85143
ret <- Event{Type: EventTypePing}
86144
case t := <-ts:
87-
if t == "" {
88-
return
89-
}
90-
ret <- Event{Type: EventTypeMPD, Data: t}
91-
case <-ctx.Done():
92-
return
145+
ret <- t
93146
}
94147
}
95148
}()

mpd-web-proxy/gpio/arm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:build arm
1+
//go:build arm || arm64
22
package gpio
33

44
import (

mpd-web-proxy/logging.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import (
1010
// the status code returned from an HTTP handler.
1111
//
1212
// XXX - this is a barebones handler that does not implement
13-
// flushing, hijacking or any of the other http interfaces.
13+
//
14+
// flushing, hijacking or any of the other http interfaces.
1415
type StatusCaptureRW struct {
1516
http.ResponseWriter
1617
status int
@@ -30,17 +31,16 @@ func loggingMiddleware(next http.Handler) http.Handler {
3031
sc := NewStatusCaptureRW(rw)
3132
next.ServeHTTP(sc, req)
3233

33-
line := fmt.Sprintf("%s %s %d", req.RemoteAddr, req.URL.String(), sc.status)
34+
line := fmt.Sprintf("%s %s %d", req.RemoteAddr, req.URL.String(), sc.status)
3435

3536
switch sc.status / 100 {
36-
case 3, 4:
37-
slog.Warn(line)
38-
case 5:
39-
slog.Error(line)
40-
default:
41-
slog.Info(line)
37+
case 3, 4:
38+
slog.Warn(line)
39+
case 5:
40+
slog.Error(line)
41+
default:
42+
slog.Info(line)
4243
}
4344
}
4445
return http.HandlerFunc(f)
4546
}
46-

mpd-web-proxy/main.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ const (
2626
)
2727

2828
type Server struct {
29-
Pins []gpio.Pin
30-
PinState *gpio.PinState
29+
Pins []gpio.Pin
30+
PinState *gpio.PinState
31+
MPDStateTopic *Topic[Event]
3132
}
3233

3334
func (s *Server) WriteResponse(wr io.Writer) error {
@@ -102,7 +103,7 @@ func WriteBadRequest(rw http.ResponseWriter, msg string) {
102103
//
103104
// Additionally, the endpoint will deliver "ping" events on
104105
// a set interval as a sort of heartbeat.
105-
func MpdEvents(rw http.ResponseWriter, req *http.Request) {
106+
func (s *Server) MpdEvents(rw http.ResponseWriter, req *http.Request) {
106107
slog.Info(fmt.Sprintf("%s %s", req.RemoteAddr, req.URL))
107108
defer slog.Info(fmt.Sprintf("%s %s CLIENT EXIT", req.RemoteAddr, req.URL))
108109

@@ -118,18 +119,8 @@ func MpdEvents(rw http.ResponseWriter, req *http.Request) {
118119
io.Copy(io.Discard, req.Body)
119120
}()
120121

121-
mpd := Must(net.Dial("tcp", MpdAuthority))
122-
defer mpd.Close()
123-
events := startMPDIdler(mpd)
124-
for ev := range getEvents(events, req.Context()) {
125-
var eventPayload string
126-
switch ev.Type {
127-
case EventTypePing:
128-
eventPayload = "event: ping\ndata: hello\n\n"
129-
case EventTypeMPD:
130-
slog.Debug(fmt.Sprintf("MPD event: %s\n", ev.Data))
131-
eventPayload = fmt.Sprintf("data: %s\n\n", ev.Data)
132-
}
122+
for ev := range getEvents(s.MPDStateTopic, req.Context()) {
123+
eventPayload := ev.SSEPayload()
133124
_, err := io.WriteString(rw, eventPayload)
134125
if err != nil {
135126
slog.Error(
@@ -222,15 +213,17 @@ func AlbumArt(rw http.ResponseWriter, req *http.Request) {
222213
func httpServer(s *Server) {
223214
var nonEventMux http.ServeMux
224215
nonEventMux.HandleFunc("/go/version", func(w http.ResponseWriter, r *http.Request) {
225-
json.NewEncoder(w).Encode(struct{Version string `json:"version"`}{version.Version})
216+
json.NewEncoder(w).Encode(struct {
217+
Version string `json:"version"`
218+
}{version.Version})
226219
})
227220
nonEventMux.HandleFunc("/go/cmd", MpdCommand)
228221
nonEventMux.HandleFunc("/go/mpd/version", MpdVersion)
229222
nonEventMux.HandleFunc("/go/channels", s.Channels)
230223
nonEventMux.HandleFunc("/go/art/{albumartist}/{album}", AlbumArt)
231224

232225
http.Handle("/go/", loggingMiddleware(&nonEventMux))
233-
http.HandleFunc("/go/events", MpdEvents)
226+
http.HandleFunc("/go/events", s.MpdEvents)
234227

235228
http.ListenAndServe(BindAddr, nil)
236229
}
@@ -251,5 +244,7 @@ func main() {
251244
var s Server
252245
s.PinState = ps
253246
s.Pins = pins
247+
s.MPDStateTopic = NewTopic[Event]()
248+
go MPDIdler(s.MPDStateTopic)
254249
httpServer(&s)
255250
}

mpd-web-proxy/topic.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package main
2+
3+
import (
4+
"log/slog"
5+
"sync"
6+
)
7+
8+
type Topic[T any] struct {
9+
mu *sync.Mutex
10+
subscribers map[chan<- T]struct{}
11+
}
12+
13+
func NewTopic[T any]() *Topic[T] {
14+
return &Topic[T]{
15+
mu: new(sync.Mutex),
16+
subscribers: make(map[chan<- T]struct{}),
17+
}
18+
}
19+
20+
func (t *Topic[T]) Subscribe() chan T {
21+
t.mu.Lock()
22+
defer t.mu.Unlock()
23+
myChan := make(chan T)
24+
t.subscribers[myChan] = struct{}{}
25+
slog.Debug("client subscribed", "chan", myChan)
26+
return myChan
27+
}
28+
29+
func (t *Topic[T]) Unsubscribe(myChan chan<- T) {
30+
t.mu.Lock()
31+
defer t.mu.Unlock()
32+
slog.Debug("client unsubscribed", "chan", myChan)
33+
delete(t.subscribers, myChan)
34+
close(myChan)
35+
}
36+
37+
func (t *Topic[T]) Publish(ev T) {
38+
t.mu.Lock()
39+
defer t.mu.Unlock()
40+
slog.Debug("publish event", "event", ev, "to", t.subscribers)
41+
for c := range t.subscribers {
42+
c <- ev
43+
}
44+
}

web/src/App/App.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import FunPage from "../Fun";
2121
import NavButton from "../NavButton";
2222
import VolDown from "../icons/vol_down.svg";
2323
import VolUp from "../icons/vol_up.svg";
24-
import { ConnectionContext } from "./Context";
24+
import { ConnectionContext, NotConnectedMsg } from "./Context";
2525
import { QueueContext } from "../Queue/Context";
2626
import { PlaybackContext } from "../PlaybackControls/Context";
2727

@@ -63,9 +63,9 @@ export default function App() {
6363
<Link_ to="/web/stats">Settings {dbUpdating ? " (U)" : ""}</Link_>
6464
<div
6565
className={styles.noConnection}
66-
style={{ display: connected ? "none" : "block" }}
66+
style={{ display: connected === 0 ? "none" : "block" }}
6767
>
68-
Not Connected
68+
{NotConnectedMsg(connected)}
6969
</div>
7070
</div>
7171
</div>

0 commit comments

Comments
 (0)