feat(log-collector): add per-pod K8s events collector #3037
ygrishajev wants to merge 1 commit into main
Conversation
📝 Walkthrough

The changes extend the log collector to simultaneously collect Kubernetes events alongside container logs.
Sequence Diagram(s)

sequenceDiagram
participant PC as Pod Discovery
participant KCS as K8sCollectorService
participant PECF as PodEventsCollectorFactory
participant KECS as K8sEventsCollectorService
participant PLCF as PodLogsCollectorFactory
participant PLCS as PodLogsCollectorService
participant Watch as Kubernetes Watch API
participant FD as FileDestinationService
PC->>KCS: Pod detected
KCS->>PECF: create(podInfo, fileDestination, signal)
PECF->>KECS: instantiate with DI deps
KCS->>PLCF: create(podInfo, fileDestination, signal)
PLCF->>PLCS: instantiate with DI deps
par Event Collection
KECS->>Watch: watch(events, fieldSelector=podName)
Watch-->>KECS: event stream
KECS->>KECS: format JSON line (timestamp, reason, phase)
KECS->>FD: write(jsonEvent)
and Log Collection
PLCS->>FD: stream logs from pod
FD-->>FD: rotate & write logs
end
PC->>KCS: Pod deleted
KCS->>KECS: abort signal triggered
KCS->>PLCS: abort signal triggered
KECS-->>FD: close stream
PLCS-->>FD: close stream
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
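The per-pod flow in the diagram can be sketched as a minimal TypeScript wiring: both collectors run concurrently under `Promise.all` and share one `AbortSignal` triggered on pod deletion. The stub collector below is a hypothetical stand-in for the real DI-constructed services, not the PR's actual API:

```typescript
// Sketch of the per-pod wiring: event and log collection run in parallel
// and are both stopped by a single shared AbortSignal on pod deletion.
interface PodInfo {
  name: string;
  namespace: string;
}

class StubCollector {
  public closed = false;

  constructor(private signal: AbortSignal) {}

  collect(): Promise<void> {
    return new Promise((resolve) => {
      // A real collector would stream logs or events here until aborted.
      this.signal.addEventListener("abort", () => {
        this.closed = true; // close the destination stream
        resolve();
      });
    });
  }
}

function startCollectionForPod(_pod: PodInfo) {
  const controller = new AbortController();
  const events = new StubCollector(controller.signal);
  const logs = new StubCollector(controller.signal);
  const done = Promise.all([events.collect(), logs.collect()]);
  return {
    done,
    stop: () => controller.abort(), // invoked when the pod is deleted
    collectors: { events, logs },
  };
}
```

A caller would `stop()` the returned handle when pod discovery reports the deletion, then await `done` to know both streams are closed.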
🚥 Pre-merge checks: ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
describe("Logs and Events Collector E2E", () => {
  beforeAll(() => {
    execSync(`kubectl apply -f ${K8S_DIR}`, { stdio: "ignore" });
⚠️ Check warning (Code scanning / CodeQL): Shell command built from environment values (Medium, test)
      { timeout: 15_000, interval: 2000 }
    );
  } finally {
    execSync(`kubectl apply -f ${K8S_DIR}/role.yaml`, { stdio: "ignore" });
⚠️ Check warning (Code scanning / CodeQL): Shell command built from environment values (Medium, test)
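Both CodeQL warnings flag shell interpolation of `K8S_DIR` in the `execSync` template strings. One common remedy (a sketch, not part of this PR) is `execFileSync` with an argument array, which bypasses the shell entirely; in the e2e test this would be `execFileSync("kubectl", ["apply", "-f", K8S_DIR])`, while the snippet below substitutes Node itself for `kubectl` so it runs without a cluster:

```typescript
import { execFileSync } from "node:child_process";

// execFileSync takes the binary and an argument array, so a value like
// K8S_DIR reaches the child process as a single argv entry and is never
// parsed by a shell, even if it contains metacharacters.
const K8S_DIR = `/tmp/k8s; echo "would-be injection"`; // hostile-looking value

// Node prints back the argument it received; the string arrives intact
// as one argument and the embedded `echo` is never executed.
const out = execFileSync(
  process.execPath,
  ["-e", "console.log(process.argv[1])", K8S_DIR],
  { encoding: "utf8" }
);

console.log(out.trim());
```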
Codecov Report

❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3037 +/- ##
==========================================
- Coverage 59.61% 59.40% -0.22%
==========================================
Files 1034 1010 -24
Lines 24242 23900 -342
Branches 6012 5969 -43
==========================================
- Hits 14453 14198 -255
+ Misses 8539 8456 -83
+ Partials 1250 1246 -4
*This pull request uses carry forward flags.
Actionable comments posted: 1
⚠️ Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/log-collector/src/services/k8s-collector/k8s-collector.service.ts (1)
40-56: ⚠️ Potential issue | 🟠 Major

Concurrent writes to shared stream will cause interleaved JSON output.

Both `collectPodLogs()` and `collectPodEvents()` call `createWriteStream()` on the same `fileDestination` instance, which returns the same memoized `PassThrough` stream. Both collectors then write to it concurrently via `Promise.all`. Node.js `WritableStream.write()` is not atomic: concurrent writes can interleave, corrupting the JSON lines in the output file. Consider one of these approaches:

- Use separate file destinations per collector (e.g., `_logs.log` and `_events.log`)
- Serialize writes through a mutex or queue
- Buffer complete lines before writing, ensuring atomic operations
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/log-collector/src/services/k8s-collector/k8s-collector.service.ts` around lines 40 - 56, startCollectionForPod currently creates a single fileDestination and passes it to both collectPodLogs() and collectPodEvents(), causing both to call createWriteStream() on the same memoized PassThrough and produce interleaved JSON; fix by giving each collector its own independent destination (e.g., create a separate fileDestination/logs and fileDestination/events via this.fileDestinationFactory.create or extend the factory to produce distinct child destinations) and pass those to podLogsCollector.collectPodLogs() and podEventsCollector.collectPodEvents() instead of sharing one, or alternatively implement a serialization layer (mutex/queue) inside the fileDestination.createWriteStream implementation so writes from collectPodLogs and collectPodEvents are serialized and cannot interleave.
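The mutex/queue option suggested above can be sketched as a small serializer that chains writes on a promise, so each JSON line lands whole. This is a generic illustration; `SerializedWriter` is a hypothetical name, not the PR's `FileDestinationService` API:

```typescript
import { Writable } from "node:stream";

// Serializes writes to a shared stream: each write waits for the previous
// one to complete, so lines from concurrent producers cannot interleave.
class SerializedWriter {
  private tail: Promise<void> = Promise.resolve();

  constructor(private stream: Writable) {}

  write(line: string): Promise<void> {
    const next = this.tail.then(
      () =>
        new Promise<void>((resolve, reject) => {
          this.stream.write(line + "\n", (err) =>
            err ? reject(err) : resolve()
          );
        })
    );
    this.tail = next.catch(() => {}); // keep the chain alive after an error
    return next;
  }
}
```

Under this approach both collectors would share one `SerializedWriter` rather than each calling `createWriteStream()` on the memoized destination.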
🧹 Nitpick comments (1)
apps/log-collector/test/e2e/events-collector.e2e.ts (1)
189-205: Consider adding error handling for Datadog API responses.

The `queryDatadog` function assumes a successful JSON response structure. A non-2xx response or malformed JSON could cause confusing test failures.

🔧 Optional: Add response status check

  const response = await fetch(`https://api.${DD_SITE}/api/v2/logs/events/search`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "DD-API-KEY": apiKey,
      "DD-APPLICATION-KEY": appKey,
    },
    body: JSON.stringify({
      filter: { query, from: "now-5m", to: "now" },
      page: { limit: 10 },
      sort: "-timestamp",
    }),
  });
+ if (!response.ok) {
+   throw new Error(`Datadog API error: ${response.status} ${response.statusText}`);
+ }
+
  return (await response.json()) as { data: Array<{ attributes: Record<string, unknown> }> };
Verify each finding against the current code and only fix it if needed. In `@apps/log-collector/test/e2e/events-collector.e2e.ts` around lines 189 - 205, The queryDatadog function assumes a successful JSON response; add robust response/error handling in queryDatadog: after the fetch, check response.ok and if false read response.text() and throw a descriptive Error including response.status, response.statusText and the response body; wrap the JSON parse in try/catch and on parse failure throw an Error including the raw response text and the original parse error; keep the returned shape the same when successful. This change should be applied inside the queryDatadog function to ensure non-2xx responses and malformed JSON produce clear, actionable errors.
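The handling the prompt describes can be factored into a small helper. This is a sketch: `parseJsonResponse` and `ResponseLike` are hypothetical names, and the stub below stands in for a real `fetch` result:

```typescript
// Hypothetical helper implementing the review suggestion: fail loudly on
// non-2xx status and on malformed JSON, including the raw body in the error.
interface ResponseLike {
  ok: boolean;
  status: number;
  statusText: string;
  text(): Promise<string>;
}

async function parseJsonResponse<T>(response: ResponseLike): Promise<T> {
  const body = await response.text();
  if (!response.ok) {
    throw new Error(`API error: ${response.status} ${response.statusText}: ${body}`);
  }
  try {
    return JSON.parse(body) as T;
  } catch (err) {
    throw new Error(`Malformed JSON response: ${body} (${err})`);
  }
}
```

Inside `queryDatadog`, one would call `parseJsonResponse(await fetch(...))` instead of `response.json()` directly, keeping the returned shape the same on success.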
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/log-collector/src/services/k8s-events-collector/k8s-events-collector.service.ts`:
- Around line 46-87: The AsyncChannel's internal queue is unbounded and can grow
if Kubernetes emits events faster than watchEvents consumes them; update the
implementation to provide backpressure (e.g., add a maxSize parameter to
AsyncChannel and either block producers when full or apply an overflow policy
like dropping oldest/newest) and use that in watchEvents (the channel created in
watchEvents) so the watcher callback either awaits when the channel is full or
drops events according to the chosen policy; ensure the watcher callback (passed
into this.watch.watch) handles the async backpressure (or checks channel.push
result) and that the for-await consumer in watchEvents continues to drain the
bounded channel to avoid unbounded memory growth.
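The bounded-channel idea from the prompt above can be sketched as follows. This is a generic illustration: the PR's actual `AsyncChannel` API may differ, and `maxSize` with a drop-oldest overflow policy is one of the policies the prompt suggests, not the chosen one:

```typescript
// Bounded async channel with a drop-oldest overflow policy: push() never
// blocks the watcher callback, and buffered memory stays capped at maxSize.
class BoundedChannel<T> {
  private queue: T[] = [];
  private waiters: Array<(value: T) => void> = [];
  public dropped = 0;

  constructor(private maxSize: number) {}

  push(item: T): void {
    const waiter = this.waiters.shift();
    if (waiter) {
      waiter(item); // a consumer is already waiting: hand off directly
      return;
    }
    if (this.queue.length >= this.maxSize) {
      this.queue.shift(); // overflow: drop the oldest buffered event
      this.dropped++;
    }
    this.queue.push(item);
  }

  async next(): Promise<T> {
    const item = this.queue.shift();
    if (item !== undefined) return item;
    return new Promise<T>((resolve) => this.waiters.push(resolve));
  }
}
```

The watch callback would call `push()` for each event while the `for await` consumer drains via `next()`; the `dropped` counter makes the overflow observable for logging or metrics.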
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 654cfa46-860f-40c8-916b-92c303105310
⛔ Files ignored due to path filters (1)
`apps/log-collector/docs/architecture.png` is excluded by `!**/*.png`
📒 Files selected for processing (15)
- apps/log-collector/README.md
- apps/log-collector/docs/ARCHITECTURE.md
- apps/log-collector/k8s/role.yaml
- apps/log-collector/package.json
- apps/log-collector/src/bootstrap/bootstrap.ts
- apps/log-collector/src/factories/pod-events-collector/pod-events-collector.factory.ts
- apps/log-collector/src/index.ts
- apps/log-collector/src/services/file-destination/file-destination.service.spec.ts
- apps/log-collector/src/services/k8s-collector/k8s-collector.service.spec.ts
- apps/log-collector/src/services/k8s-collector/k8s-collector.service.ts
- apps/log-collector/src/services/k8s-events-collector/k8s-events-collector.service.spec.ts
- apps/log-collector/src/services/k8s-events-collector/k8s-events-collector.service.ts
- apps/log-collector/test/e2e/events-collector.e2e.ts
- apps/log-collector/test/seeders/kubernetes-event.seeder.ts
- apps/log-collector/vitest.e2e.config.ts
⚠️ Caution: Review failed. An error occurred during the review process. Please try again later.
Why
Closes CON-1
The log-collector captures pod logs but not Kubernetes events. Events like scheduling failures, image pull errors, OOMKills, and back-off restarts are critical observability signals that are currently lost.
What
- `K8sEventsCollectorService` that watches K8s events per pod via the Watch API
- `PodEventsCollectorFactory` for per-pod event collector instances
- `watch` verb added to the events RBAC role
- E2E tests exercised via `make dev` and a local K8s cluster
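The per-pod event watch described above hinges on two pure pieces: a field selector that scopes the Kubernetes events stream to one pod, and a compact JSON line per event (timestamp, reason, phase). A sketch of both follows; the function names are illustrative, not the PR's actual API, and the real service wires the selector into the Kubernetes Watch API:

```typescript
// Illustrative helpers for a per-pod event watch.
interface K8sEvent {
  lastTimestamp?: string;
  reason?: string;
  type?: string; // "Normal" | "Warning"
  message?: string;
}

// Core/v1 Events reference their subject via involvedObject, so a pod's
// events can be selected server-side with a fieldSelector on its name.
function eventsFieldSelector(podName: string): string {
  return `involvedObject.name=${podName},involvedObject.kind=Pod`;
}

// Format one event as a single JSON line suitable for the file destination.
function formatEventLine(podName: string, event: K8sEvent): string {
  return JSON.stringify({
    timestamp: event.lastTimestamp ?? new Date().toISOString(),
    pod: podName,
    reason: event.reason,
    phase: event.type,
    message: event.message,
  });
}
```

Doing the filtering server-side via `fieldSelector` keeps each per-pod watcher from receiving (and discarding) every event in the namespace.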