
Commit a17c22f

refactor: per-register cache to fix stale data on writes (#8)
* refactor: switch cache to per-register storage

  Cache keys now address individual registers/coils instead of request ranges. This fixes stale data when writes hit registers that are part of a larger cached read range.

  Key changes:
  - RegKey(slaveID, fc, addr) for per-register storage
  - RangeKey(slaveID, fc, addr, qty) for request coalescing
  - GetRange/SetRange/DeleteRange for batch operations
  - Rename GetOrFetch to Coalesce (no longer interacts with cache)
  - Add keepStale flag to prevent cleanup from removing entries needed for stale-serve fallback

* refactor: update proxy for per-register cache

  Decompose upstream responses into per-register cache entries and reassemble from cache on hits. Write invalidation now correctly removes individual registers in the written range, fixing stale data when writes overlap with larger cached read ranges.

  New helpers:
  - decomposeResponse: extract per-register values from Modbus PDU
  - assembleResponse: reconstruct Modbus PDU from cached values
  - Roundtrip tests for all function codes (registers + coils)
  - Tests verifying write invalidation of overlapping reads

* fix: log cache miss for all callers, not just the fetcher

  Move the cache miss log before Coalesce so coalesced waiters also get a log entry. The upstream client already logs request completion with duration, so fetches vs coalesced waits are distinguishable.

* fix: address copilot review comments

  - Guard GetRange/GetRangeStale against quantity=0 (false cache hit)
  - Use shared coilOn/coilOff slices in decomposeResponse to reduce per-coil allocations
  - Extract cleanupOnce so tests exercise the real keepStale guard instead of manually simulating cleanup

* test: cover proxy cache miss and stale fallback

  Update README and SPEC for the per-register cache design, then add proxy-level tests for miss -> fetch -> store -> hit and stale fallback on upstream errors. Introduce a small upstream client interface so tests can use a mock upstream without a real Modbus connection.

* fix: include health check in proxy test client interface

  Main added Proxy.Healthy, which delegates to the upstream client. The local mockable interface needs to include Healthy so PR merge builds compile against the current base branch.
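The commit message names a decomposeResponse helper that splits a Modbus read response into per-register values, and a later review fix that reuses shared coilOn/coilOff slices to avoid per-coil allocations. A minimal sketch of what such a helper could look like; the signature and error messages here are assumptions, not the commit's actual code:

```go
package main

import "fmt"

// Modbus read function codes (per the Modbus application protocol).
const (
	fcReadCoils            = 0x01
	fcReadDiscreteInputs   = 0x02
	fcReadHoldingRegisters = 0x03
	fcReadInputRegisters   = 0x04
)

// Shared slices so coil decomposition does not allocate per coil,
// mirroring the optimization described in the review-fix commit.
var (
	coilOn  = []byte{1}
	coilOff = []byte{0}
)

// decomposeResponse splits the data portion of a Modbus read response into
// one value per register (2 bytes) or per coil (1 byte: 0 or 1).
// Hypothetical sketch; the real helper in this commit may differ.
func decomposeResponse(functionCode byte, quantity uint16, data []byte) ([][]byte, error) {
	switch functionCode {
	case fcReadHoldingRegisters, fcReadInputRegisters:
		if len(data) < int(quantity)*2 {
			return nil, fmt.Errorf("short register payload: %d bytes for %d registers", len(data), quantity)
		}
		values := make([][]byte, quantity)
		for i := uint16(0); i < quantity; i++ {
			values[i] = data[2*i : 2*i+2] // big-endian register value
		}
		return values, nil
	case fcReadCoils, fcReadDiscreteInputs:
		if len(data)*8 < int(quantity) {
			return nil, fmt.Errorf("short coil payload: %d bytes for %d coils", len(data), quantity)
		}
		values := make([][]byte, quantity)
		for i := uint16(0); i < quantity; i++ {
			// Coils are packed LSB-first within each byte.
			if (data[i/8]>>(i%8))&1 == 1 {
				values[i] = coilOn
			} else {
				values[i] = coilOff
			}
		}
		return values, nil
	}
	return nil, fmt.Errorf("unsupported function code %d", functionCode)
}

func main() {
	regs, _ := decomposeResponse(fcReadHoldingRegisters, 2, []byte{0x00, 0x2A, 0x01, 0x00})
	fmt.Println(regs) // two 2-byte register values
	coils, _ := decomposeResponse(fcReadCoils, 3, []byte{0b00000101})
	fmt.Println(coils) // one 1-byte value per coil
}
```

Each per-register slice can then be stored under its own RegKey, which is what makes partial invalidation on writes possible.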
1 parent 290c895 commit a17c22f

6 files changed

Lines changed: 858 additions & 132 deletions


README.md

Lines changed: 6 additions & 4 deletions
```diff
@@ -134,10 +134,12 @@ docker run --rm -v $(pwd):/app -w /app golang:1.24 go test ./...
 
 ## Cache Behavior
 
-- **Key format**: `{slave_id}:{function_code}:{start_address}:{quantity}`
-- **Read requests**: Served from cache if available and not expired
-- **Write requests**: Forwarded to upstream (if allowed), exact matching cache entries invalidated
-- **Request coalescing**: Multiple identical requests during a cache miss share a single upstream fetch
+- **Key format**: values are cached per register/coil as `{slave_id}:{function_code}:{address}`
+- **Read requests**: Served from cache only if every register/coil in the requested range is present and not expired
+- **Cache misses**: If any value in the requested range is missing or expired, the full range is fetched from upstream and decomposed into per-register/coil cache entries
+- **Write requests**: Forwarded to upstream (if allowed), then invalidate the written address range so overlapping cached reads cannot return stale values
+- **Request coalescing**: Multiple identical range requests during a cache miss share a single upstream fetch using `{slave_id}:{function_code}:{start_address}:{quantity}` as the coalescing key
+- **Stale fallback**: If enabled, expired entries are retained and can be served when upstream requests fail
 
 ## License
 
```
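On a full cache hit, the proxy has to rebuild the response payload from the per-register/coil values (the assembleResponse helper named in the commit message). A hedged sketch of the inverse of decomposition; the name and signature are assumptions, not the commit's actual code:

```go
package main

import "fmt"

// assembleResponse rebuilds the data portion of a Modbus read response from
// cached per-register/coil values. Hypothetical counterpart to the
// assembleResponse helper named in the commit; the real code may differ.
func assembleResponse(functionCode byte, values [][]byte) []byte {
	switch functionCode {
	case 0x03, 0x04: // holding/input registers: concatenate 2-byte values
		data := make([]byte, 0, len(values)*2)
		for _, v := range values {
			data = append(data, v...)
		}
		return data
	case 0x01, 0x02: // coils/discrete inputs: pack bits LSB-first per byte
		data := make([]byte, (len(values)+7)/8)
		for i, v := range values {
			if v[0] != 0 {
				data[i/8] |= 1 << (i % 8)
			}
		}
		return data
	}
	return nil
}

func main() {
	// Two cached registers reassembled into a 4-byte payload.
	fmt.Printf("% X\n", assembleResponse(0x03, [][]byte{{0x00, 0x2A}, {0x01, 0x00}}))
	// Three cached coils packed into one byte.
	fmt.Printf("%08b\n", assembleResponse(0x01, [][]byte{{1}, {0}, {1}}))
}
```

Decomposing on store and reassembling on read is what lets a cached 10-register read keep serving the 9 untouched registers after a single-register write invalidates only one key.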
SPEC.md

Lines changed: 69 additions & 26 deletions
````diff
@@ -49,28 +49,36 @@ Many Modbus devices (inverters, meters, battery systems) have limited polling ca
 ### 3. In-Memory Cache
 
 #### Cache Key Structure
+
+Values are cached per register/coil:
+
+```
+{slave_id}:{function_code}:{address}
+```
+
+Request coalescing still uses the requested range as its key:
 ```
 {slave_id}:{function_code}:{start_address}:{quantity}
 ```
 
 #### Cache Entry
 ```go
 type CacheEntry struct {
-    Data      []byte
+    Data      []byte // one register (2 bytes) or one coil/input bit (1 byte: 0 or 1)
     Timestamp time.Time
     TTL       time.Duration
 }
 ```
 
 #### Cache Behavior
-- **Read Operations**: Check cache first, return if valid (not expired)
-- **Write Operations**: Always forward to device, invalidate exact matching cache entries (same slave_id, function_code, start_address, quantity)
+- **Read Operations**: Check the per-register/coil cache first. Return from cache only if every value in the requested range is present and not expired.
+- **Cache Misses**: If any value in the requested range is missing or expired, fetch the full requested range from upstream, then decompose the response into per-register/coil cache entries.
+- **Write Operations**: Always forward to the device when writes are allowed, then invalidate each cached register/coil in the written address range. This prevents overlapping cached read ranges from serving stale values after frequent writes.
 - **TTL**: Configurable (default: 10 seconds)
-- **Cleanup**: Time-based expiration (entries removed when TTL expires)
-- **Staleness**: Option to serve stale data on upstream failure (default: off)
+- **Cleanup**: Time-based expiration. Expired entries are removed during cleanup unless stale serving is enabled.
+- **Staleness**: Option to serve stale data on upstream failure (default: off). When enabled, expired entries are retained so they remain available for fallback.
 
 ### Request Coalescing
-- Identical in-flight requests are coalesced (same slave_id, function, address, quantity)
+- Identical in-flight range requests are coalesced (same slave_id, function, address, quantity)
 - Second request arriving while first is pending will wait for and share the first's response
 - Prevents thundering herd on cache miss
 
````

````diff
@@ -144,47 +152,82 @@ type CachingHandler struct {
 
 ```go
 type Cache struct {
-    mu      sync.RWMutex
-    entries map[string]*CacheEntry
-    ttl     time.Duration // default: 10 * time.Second
+    mu         sync.RWMutex
+    entries    map[string]*CacheEntry
+    defaultTTL time.Duration
+    keepStale  bool
+
+    // Request coalescing for identical range requests.
+    inflight   map[string]*inflightRequest
+    inflightMu sync.Mutex
+}
+
+func RegKey(slaveID, functionCode byte, address uint16) string {
+    return fmt.Sprintf("%d:%d:%d", slaveID, functionCode, address)
 }
 
-func (c *Cache) Get(key string) ([]byte, bool) {
+func RangeKey(slaveID, functionCode byte, address, quantity uint16) string {
+    return fmt.Sprintf("%d:%d:%d:%d", slaveID, functionCode, address, quantity)
+}
+
+func (c *Cache) GetRange(slaveID, functionCode byte, address, quantity uint16) ([][]byte, bool) {
+    if quantity == 0 {
+        return nil, false
+    }
+
     c.mu.RLock()
     defer c.mu.RUnlock()
-
-    entry, ok := c.entries[key]
-    if !ok || time.Since(entry.Timestamp) > entry.TTL {
-        return nil, false
+
+    values := make([][]byte, quantity)
+    for i := uint16(0); i < quantity; i++ {
+        entry, ok := c.entries[RegKey(slaveID, functionCode, address+i)]
+        if !ok || entry.IsExpired() {
+            return nil, false
+        }
+        values[i] = append([]byte(nil), entry.Data...)
+    }
+    return values, true
+}
+
+func (c *Cache) SetRange(slaveID, functionCode byte, address uint16, values [][]byte) {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+
+    now := time.Now()
+    for i, value := range values {
+        c.entries[RegKey(slaveID, functionCode, address+uint16(i))] = &CacheEntry{
+            Data:      append([]byte(nil), value...),
+            Timestamp: now,
+            TTL:       c.defaultTTL,
+        }
     }
-    return entry.Data, true
 }
 
-func (c *Cache) Set(key string, data []byte, ttl time.Duration) {
+func (c *Cache) DeleteRange(slaveID, functionCode byte, address, quantity uint16) {
     c.mu.Lock()
     defer c.mu.Unlock()
-
-    c.entries[key] = &CacheEntry{
-        Data:      data,
-        Timestamp: time.Now(),
-        TTL:       ttl,
+
+    for i := uint16(0); i < quantity; i++ {
+        delete(c.entries, RegKey(slaveID, functionCode, address+i))
     }
 }
 ```
 
+The cache also exposes `Coalesce(ctx, rangeKey, fetch)` for request coalescing. It does not read or write cache entries directly; the proxy performs cache lookups and stores decomposed responses.
+
 ### Request Flow
 
 1. Client sends Modbus TCP request
 2. Parse request: extract slave ID, function code, address, quantity
 3. **For reads**:
-   - Build cache key
-   - Check cache → if hit & valid, return cached data
-   - On miss: forward to upstream device
-   - Store response in cache
+   - Check every per-register/coil cache key in the requested range
+   - If all values are present and valid, reassemble and return the Modbus response
+   - On any miss or expired value: coalesce identical in-flight range requests, then forward to upstream device
+   - Decompose successful upstream responses into per-register/coil cache entries
    - Return response to client
 4. **For writes**:
    - Check readonly mode
-   - If allowed: forward to upstream, optionally invalidate cache
+   - If allowed: forward to upstream, then invalidate every cached register/coil in the written address range
   - Return response
 
 ## Logging
````