Skip to content

Commit 22a7a46

Browse files
authored
feat: send/receive relations for forwarding OTLP metrics (#131)
1 parent 4b80462 commit 22a7a46

25 files changed

+1144
-123
lines changed

charmcraft.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ provides:
8181
interface: profiling
8282
optional: true
8383
description: Receive profiles from other charms.
84+
receive-otlp:
85+
interface: otlp
86+
optional: true
87+
description: |
88+
Receive OTLP data with an OTLP server by publishing all OTLP endpoints for the supported
89+
protocols and telemetries of this server. OTLP data can contain multiple telemetries, making
90+
this endpoint a modern alternative to having one relation per telemetry-type, e.g.
91+
`receive-loki-logs` and/or `receive-remote-write`.
92+
93+
This endpoint currently supports:
94+
- HTTP for ["metrics"]
8495
8596
requires:
8697
send-profiles:
@@ -152,6 +163,16 @@ requires:
152163
description: |
153164
Ingress integration for the Otelcol server, so that cross-model workloads can send their data
154165
through the ingress. Uses `traefik_route` to open ports on Traefik host for tracing ingesters.
166+
send-otlp:
167+
interface: otlp
168+
optional: true
169+
description: |
170+
Send OTLP data to an OTLP server by aggregating all supported OTLP endpoints offered by an
171+
OTLP server. If multiple endpoints are supported, then the first supported one is chosen.
172+
173+
This endpoint currently supports:
174+
- gRPC for ["logs", "metrics"]
175+
- HTTP for ["logs", "metrics"]
155176
156177
peers:
157178
peers:

src/charm.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636

3737
logger = logging.getLogger(__name__)
3838

39-
def charm_address(container: Container, ingress: integrations.TraefikRouteRequirer) -> integrations.Address:
39+
40+
def charm_address(
41+
container: Container, ingress: integrations.TraefikRouteRequirer
42+
) -> integrations.Address:
4043
"""Return the Address dataclass from charm context.
4144
4245
Args:
@@ -50,18 +53,22 @@ def charm_address(container: Container, ingress: integrations.TraefikRouteRequir
5053
internal_scheme = "https" if tls else "http"
5154
internal_url = f"{internal_scheme}://{socket.getfqdn()}"
5255
external_url = (
53-
f"{ingress.scheme}://{ingress.external_host}" if integrations.ingress_ready(ingress) else None
56+
f"{ingress.scheme}://{ingress.external_host}"
57+
if integrations.ingress_ready(ingress)
58+
else None
5459
)
5560
resolved_url = external_url if external_url else internal_url
5661
resolved_scheme = urlparse(external_url).scheme if external_url else "https" if tls else "http"
5762

5863
return integrations.Address(
59-
internal_scheme,
60-
internal_url,
61-
resolved_scheme,
62-
resolved_url,
64+
ingress=integrations.ingress_ready(ingress),
65+
internal_scheme=internal_scheme,
66+
internal_url=internal_url,
67+
resolved_scheme=resolved_scheme,
68+
resolved_url=resolved_url,
6369
)
6470

71+
6572
def refresh_certs(container: Container):
6673
"""Run `update-ca-certificates` to refresh the trusted system certs."""
6774
container.exec(["update-ca-certificates", "--fresh"]).wait()
@@ -83,6 +90,7 @@ def _get_missing_mandatory_relations(charm: CharmBase) -> Optional[str]:
8390
pairs={
8491
"metrics-endpoint": [ # must be paired with:
8592
{"send-remote-write"}, # or
93+
{"send-otlp"},
8694
{"cloud-config"},
8795
],
8896
"receive-loki-logs": [ # must be paired with:
@@ -97,6 +105,15 @@ def _get_missing_mandatory_relations(charm: CharmBase) -> Optional[str]:
97105
{"grafana-dashboards-provider"}, # or
98106
{"cloud-config"},
99107
],
108+
"receive-otlp": [ # must be paired with:
109+
{"send-otlp"}, # or
110+
# Technically, this would depend on databag contents for enabled pipelines,
111+
# but we keep it simple for now.
112+
{"send-traces"},
113+
{"send-loki-logs"},
114+
{"send-remote-write"},
115+
{"cloud-config"},
116+
],
100117
}
101118
)
102119
active_relations = {name for name, relation in charm.model.relations.items() if relation}
@@ -200,6 +217,11 @@ def _reconcile(self):
200217
# cf: https://github.com/canonical/opentelemetry-collector-k8s-operator/issues/17
201218
feature_gates: Optional[str] = None
202219

220+
# OTLP setup
221+
integrations.receive_otlp(self, otelcol_address.resolved_url)
222+
otlp_endpoints = integrations.send_otlp(self)
223+
config_manager.add_otlp_forwarding(otlp_endpoints)
224+
203225
# Logs setup
204226
integrations.receive_loki_logs(self, otelcol_address)
205227
loki_endpoints = integrations.send_loki_logs(self)
@@ -319,6 +341,21 @@ def _reconcile(self):
319341
if missing_relations:
320342
self.unit.status = BlockedStatus(missing_relations)
321343

344+
# Cyclic OTLP relations
345+
if integrations.cyclic_otlp_relations_exist(self):
346+
self.unit.status = BlockedStatus("cyclic OTLP relations exist")
347+
348+
# Ingress and scaling status
349+
if self.model.unit.is_leader():
350+
if self.app.planned_units() > 1 and not otelcol_address.ingress:
351+
self.unit.status = BlockedStatus(
352+
"Ingress missing - routing only to leader; see debug-log"
353+
)
354+
logger.warning(
355+
"without ingress and planned_units > 1, all data is forwarded to the leader "
356+
"unit, with nothing sent to non-leader units."
357+
)
358+
322359
# Workload version
323360
self.unit.set_workload_version(self._otelcol_version or "")
324361

src/config_builder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def add_default_config(self):
156156
# fighting for port bindings. This is only relevant for the vm charm
157157
self.add_component(
158158
Component.receiver,
159-
"otlp",
159+
f"otlp/{self._unit_name}",
160160
{
161161
"protocols": {
162162
"http": {"endpoint": f"0.0.0.0:{Port.otlp_http.value}"},

src/config_manager.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
import logging
44
from typing import Any, Dict, List, Literal, Optional, Set
5+
from urllib.parse import urlparse
56

67
import yaml
78

89
from config_builder import Component, ConfigBuilder, Port
910
from constants import FILE_STORAGE_DIRECTORY
1011
from integrations import ProfilingEndpoint
12+
from otlp import OtlpEndpoint
1113

1214
logger = logging.getLogger(__name__)
1315

@@ -251,14 +253,14 @@ def add_profile_ingestion(self):
251253
"""Configure ingesting profiles."""
252254
self.config.add_component(
253255
Component.receiver,
254-
"otlp",
256+
f"otlp/{self._unit_name}",
255257
{
256258
"protocols": {
257259
"http": {"endpoint": f"0.0.0.0:{Port.otlp_http.value}"},
258260
"grpc": {"endpoint": f"0.0.0.0:{Port.otlp_grpc.value}"},
259261
},
260262
},
261-
pipelines=["profiles"],
263+
pipelines=[f"profiles/{self._unit_name}"],
262264
)
263265

264266
def add_profile_forwarding(self, endpoints: List[ProfilingEndpoint]):
@@ -282,7 +284,7 @@ def add_profile_forwarding(self, endpoints: List[ProfilingEndpoint]):
282284
"insecure_skip_verify": self._insecure_skip_verify,
283285
},
284286
},
285-
pipelines=["profiles"],
287+
pipelines=[f"profiles/{self._unit_name}"],
286288
)
287289

288290
def add_self_scrape(self, identifier: str, labels: Dict) -> None:
@@ -367,8 +369,38 @@ def add_remote_write(self, endpoints: List[Dict[str, str]]):
367369
pipelines=[f"metrics/{self._unit_name}"],
368370
)
369371

370-
# TODO Receive alert rules via remote write
371-
# https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37277
372+
def add_otlp_forwarding(self, relation_map: Dict[int, OtlpEndpoint]):
373+
"""Configure sending OTLP telemetry to an OTLP endpoint.
374+
375+
There are 2 different OTLP exporters for their respective protocols: gRPC and HTTP. If a
376+
gRPC endpoint is provided, it is preferred over the HTTP equivalent.
377+
378+
Telemetry is sent to all pipelines since OTLP supports all and it is computationally
379+
inexpensive unless a receiver is connected and receiving telemetry.
380+
381+
Args:
382+
relation_map: a mapping of relation ID to the remote's OtlpEndpoint for that relation
383+
"""
384+
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlpexporter
385+
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
386+
387+
if not relation_map:
388+
return
389+
390+
# Exporter config
391+
for rel_id, otlp_endpoint in relation_map.items():
392+
insecure = urlparse(otlp_endpoint.endpoint).scheme == "http"
393+
tls_config: Dict[str, Any] = {
394+
"insecure": insecure,
395+
"insecure_skip_verify": self._insecure_skip_verify,
396+
}
397+
exporter_type = "otlp" if otlp_endpoint.protocol == "grpc" else "otlphttp"
398+
self.config.add_component(
399+
Component.exporter,
400+
f"{exporter_type}/rel-{rel_id}/{self._unit_name}",
401+
{"endpoint": otlp_endpoint.endpoint, "tls": tls_config},
402+
pipelines=[f"{_type}/{self._unit_name}" for _type in otlp_endpoint.telemetries],
403+
)
372404

373405
def add_traces_ingestion(
374406
self,

src/integrations.py

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@
6464
SERVER_CERT_PATH,
6565
SERVER_CERT_PRIVATE_KEY_PATH,
6666
)
67+
from otlp import (
68+
OtlpConsumer,
69+
OtlpEndpoint,
70+
OtlpProvider,
71+
DEFAULT_PROVIDER_RELATION_NAME,
72+
DEFAULT_CONSUMER_RELATION_NAME,
73+
)
6774

6875
logger = logging.getLogger(__name__)
6976

@@ -247,8 +254,6 @@ def send_remote_write(charm: CharmBase) -> List[Dict[str, str]]:
247254
peer_relation_name="peers",
248255
)
249256
charm.__setattr__("remote_write", remote_write)
250-
# TODO: add alerts from remote write
251-
# https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37277
252257
# TODO: Luca: probably don't need this anymore
253258
remote_write.reload_alerts()
254259
return remote_write.endpoints
@@ -460,6 +465,51 @@ def forward_dashboards(charm: CharmBase):
460465
# grafana_dashboards_provider._reinitialize_dashboard_data(inject_dropdowns=False)
461466

462467

468+
def cyclic_otlp_relations_exist(charm: CharmBase) -> bool:
469+
"""Check if any application is related on both send-otlp and receive-otlp.
470+
471+
This function only checks relations for the current charm, i.e. one level deep. If there is
472+
another charm in between these applications that still forms a cycle, it will not be caught.
473+
"""
474+
receive_relations = charm.model.relations.get(DEFAULT_PROVIDER_RELATION_NAME, [])
475+
send_relations = charm.model.relations.get(DEFAULT_CONSUMER_RELATION_NAME, [])
476+
477+
if not receive_relations or not send_relations:
478+
return False
479+
480+
receive_apps = {rel.app.name for rel in receive_relations if rel.app}
481+
send_apps = {rel.app.name for rel in send_relations if rel.app}
482+
483+
return not receive_apps.isdisjoint(send_apps)
484+
485+
486+
def receive_otlp(charm: CharmBase, resolved_url: str) -> None:
487+
"""Instantiate the OtlpProvider.
488+
489+
The gRPC protocol is not supported because Traefik (ingress) does not support it.
490+
"""
491+
otlp_provider = OtlpProvider(charm)
492+
# TODO: We can remove this since the lib doesn't observe events
493+
charm.__setattr__("otlp_provider", otlp_provider)
494+
otlp_provider.add_endpoint("http", f"{resolved_url}:4318", ["metrics"])
495+
otlp_provider.publish()
496+
497+
498+
def send_otlp(charm: CharmBase) -> Dict[int, OtlpEndpoint]:
499+
"""Instantiate the OtlpConsumer.
500+
501+
This provides otelcol with the remote's OTLP endpoint for each relation.
502+
"""
503+
otlp_consumer = OtlpConsumer(
504+
charm,
505+
protocols=["grpc", "http"],
506+
telemetries=["logs", "metrics"],
507+
)
508+
# TODO: We can remove this since the lib doesn't observe events
509+
charm.__setattr__("otlp_consumer", otlp_consumer)
510+
return otlp_consumer.get_remote_otlp_endpoints()
511+
512+
463513
# TODO: Luca: move this into the GrafanaCloudIntegrator library
464514
@dataclass
465515
class CloudIntegratorData:
@@ -630,7 +680,10 @@ def _static_ingress_config() -> dict:
630680

631681

632682
def _build_lb_server_config(scheme: str, port: int) -> List[Dict[str, str]]:
633-
"""Build the server portion of the loadbalancer config of Traefik ingress."""
683+
"""Build the server portion of the loadbalancer config of Traefik ingress.
684+
685+
The leader provides the kubernetes service address to Traefik to serve as ingress.
686+
"""
634687
return [{"url": f"{scheme}://{socket.getfqdn()}:{port}"}]
635688

636689

@@ -641,7 +694,7 @@ def is_tls_ready(container: Container) -> bool:
641694
)
642695

643696

644-
@dataclass
697+
@dataclass(kw_only=True)
645698
class Address:
646699
"""Provide address information for the charm.
647700
@@ -650,6 +703,7 @@ class Address:
650703
- tls events
651704
"""
652705

706+
ingress: bool
653707
internal_scheme: str # Only TLS context
654708
internal_url: str # Only TLS context
655709
resolved_scheme: str # TLS & ingress context

0 commit comments

Comments
 (0)