Skip to content

Commit 4be2201

Browse files
feat: Event queue early stop in buffer overflow (#118)
- drop events early (optimization) - rethrow initialization error (fix oflline) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Optimizes event batching and improves error handling. > > - In `EventQueue`, introduce `SendResult` and change batch `send([payloads])` to early-stop on total buffer overflow and skip further items for exporters that hit per-exporter limits; `send(_:)` now returns a result and triggers overflow notifications before stopping. > - In `SessionReplayExporter`, rethrow initialization errors after logging so callers can handle failures. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit f8ae091. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 8a03120 commit 4be2201

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

Sources/LaunchDarklyObservability/Transport/EventQueue.swift

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ public actor EventQueue: EventQueuing {
5151
let cost: Int
5252
}
5353

54+
private enum SendResult {
55+
case success
56+
case overflowedPartial(ObjectIdentifier)
57+
case overflowedTotal
58+
}
59+
5460
private var storage = [ObjectIdentifier: [EventQueueItem]]()
5561
private var lastEventTime: TimeInterval = 0
5662
private let limitSize: Int
@@ -73,29 +79,44 @@ public actor EventQueue: EventQueuing {
7379
}
7480

7581
public func send(_ payloads: [EventQueueItemPayload]) async {
76-
payloads.forEach {
77-
send(EventQueueItem(payload: $0))
82+
var overflowedExporters: Set<ObjectIdentifier> = []
83+
for payload in payloads {
84+
let item = EventQueueItem(payload: payload)
85+
guard overflowedExporters.contains(item.exporterTypeId) == false else {
86+
continue
87+
}
88+
89+
let result = send(item)
90+
switch result {
91+
case .success:
92+
continue
93+
case .overflowedPartial(let exporterId):
94+
overflowedExporters.insert(exporterId)
95+
case .overflowedTotal:
96+
return
97+
}
7898
}
7999
}
80100

81-
func send(_ item: EventQueueItem) {
101+
private func send(_ item: EventQueueItem) -> SendResult {
82102
guard currentSize == 0 || currentSize + item.cost <= limitSize else {
83103
var exporterState = currentSizes[item.exporterTypeId, default: EventExporterState()]
84104
notifyOverflowIfNeeded(typeId: item.exporterTypeId, exporterState)
85-
return
105+
return .overflowedTotal
86106
}
87107

88108
var exporterState = currentSizes[item.exporterTypeId, default: EventExporterState()]
89109
guard exporterState.size + item.cost <= exporterLimitSize else {
90110
notifyOverflowIfNeeded(typeId: item.exporterTypeId, exporterState)
91-
return
111+
return .overflowedPartial(item.exporterTypeId)
92112
}
93113

94114
storage[item.exporterTypeId, default: []].append(item)
95115
lastEventTime = item.timestamp
96116
currentSize += item.cost
97117
exporterState.size += item.cost
98118
currentSizes[item.exporterTypeId] = exporterState
119+
return .success
99120
}
100121

101122
private func notify(typeId: ObjectIdentifier, _ status: EventStatus) {

Sources/LaunchDarklySessionReplay/Exporter/SessionReplayExporter.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ actor SessionReplayExporter: EventExporting {
7777
} catch {
7878
initializedSession = nil
7979
os_log("%{public}@", log: log, type: .error, "Failed to initialize Session Replay:\n\(error)")
80+
throw error
8081
}
8182
}
8283

0 commit comments

Comments
 (0)