Skip to content

Commit 500a3d7

Browse files
zzstoatzzclaude
andauthored
feat: convert telemetry service to docket perpetual function (#19754)
Co-authored-by: Claude <[email protected]>
1 parent 57d6b87 commit 500a3d7

File tree

6 files changed

+132
-146
lines changed

6 files changed

+132
-146
lines changed

src/prefect/server/services/base.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@ def _known_service_modules() -> list[ModuleType]:
4040
from prefect.server.services import (
4141
scheduler,
4242
task_run_recorder,
43-
telemetry,
4443
)
4544

4645
return [
4746
# Orchestration services
4847
scheduler,
4948
task_run_recorder,
50-
telemetry,
5149
# Events services
5250
event_logger,
5351
event_persister,
Lines changed: 86 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1,143 +1,114 @@
11
"""
2-
The Telemetry service.
2+
The Telemetry service. Sends anonymous data to Prefect to help us improve.
33
"""
44

5-
import asyncio
5+
import logging
66
import os
77
import platform
8-
from typing import Any, Optional
8+
from datetime import timedelta
99
from uuid import uuid4
1010

1111
import httpx
12+
from docket import Perpetual
1213

1314
import prefect
14-
from prefect.server.database import PrefectDBInterface
15-
from prefect.server.database.dependencies import db_injector
15+
from prefect.logging import get_logger
16+
from prefect.server.database import PrefectDBInterface, provide_database_interface
1617
from prefect.server.models import configuration
1718
from prefect.server.schemas.core import Configuration
18-
from prefect.server.services.base import (
19-
LoopService,
20-
RunInEphemeralServers,
21-
RunInWebservers,
22-
)
19+
from prefect.server.services.perpetual_services import perpetual_service
2320
from prefect.settings import PREFECT_DEBUG_MODE
2421
from prefect.settings.context import get_current_settings
25-
from prefect.settings.models.server.services import ServicesBaseSetting
2622
from prefect.types._datetime import now
2723

24+
logger: logging.Logger = get_logger(__name__)
25+
2826

29-
class Telemetry(RunInEphemeralServers, RunInWebservers, LoopService):
27+
async def _fetch_or_set_telemetry_session(
28+
db: PrefectDBInterface,
29+
) -> tuple[str, str]:
3030
"""
31-
Sends anonymous data to Prefect to help us improve
31+
Fetch or create a telemetry session in the configuration table.
3232
33-
It can be toggled off with the PREFECT_SERVER_ANALYTICS_ENABLED setting.
33+
Returns:
34+
tuple of (session_start_timestamp, session_id)
3435
"""
36+
async with db.session_context(begin_transaction=True) as session:
37+
telemetry_session = await configuration.read_configuration(
38+
session, "TELEMETRY_SESSION"
39+
)
3540

36-
loop_seconds: float = 600
41+
if telemetry_session is None:
42+
logger.debug("No telemetry session found, setting")
43+
session_id = str(uuid4())
44+
session_start_timestamp = now("UTC").isoformat()
45+
46+
telemetry_session = Configuration(
47+
key="TELEMETRY_SESSION",
48+
value={
49+
"session_id": session_id,
50+
"session_start_timestamp": session_start_timestamp,
51+
},
52+
)
3753

38-
@classmethod
39-
def service_settings(cls) -> ServicesBaseSetting:
40-
raise NotImplementedError("Telemetry service does not have settings")
54+
await configuration.write_configuration(session, telemetry_session)
55+
else:
56+
logger.debug("Session information retrieved from database")
57+
session_id = telemetry_session.value["session_id"]
58+
session_start_timestamp = telemetry_session.value["session_start_timestamp"]
4159

42-
@classmethod
43-
def environment_variable_name(cls) -> str:
44-
return "PREFECT_SERVER_ANALYTICS_ENABLED"
60+
logger.debug(f"Telemetry Session: {session_id}, {session_start_timestamp}")
61+
return (session_start_timestamp, session_id)
4562

46-
@classmethod
47-
def enabled(cls) -> bool:
48-
return get_current_settings().server.analytics_enabled
4963

50-
def __init__(self, loop_seconds: Optional[int] = None, **kwargs: Any):
51-
super().__init__(loop_seconds=loop_seconds, **kwargs)
52-
self.telemetry_environment: str = os.environ.get(
53-
"PREFECT_API_TELEMETRY_ENVIRONMENT", "production"
54-
)
64+
@perpetual_service(
65+
enabled_getter=lambda: get_current_settings().server.analytics_enabled,
66+
run_in_ephemeral=True,
67+
run_in_webserver=True,
68+
)
69+
async def send_telemetry_heartbeat(
70+
perpetual: Perpetual = Perpetual(automatic=True, every=timedelta(seconds=600)),
71+
) -> None:
72+
"""
73+
Sends anonymous telemetry data to Prefect to help us improve.
5574
56-
@db_injector
57-
async def _fetch_or_set_telemetry_session(self, db: PrefectDBInterface):
58-
"""
59-
This method looks for a telemetry session in the configuration table. If there
60-
isn't one, it sets one. It then sets `self.session_id` and
61-
`self.session_start_timestamp`.
62-
63-
Telemetry sessions last until the database is reset.
64-
"""
65-
async with db.session_context(begin_transaction=True) as session:
66-
telemetry_session = await configuration.read_configuration(
67-
session, "TELEMETRY_SESSION"
75+
It can be toggled off with the PREFECT_SERVER_ANALYTICS_ENABLED setting.
76+
"""
77+
from prefect.client.constants import SERVER_API_VERSION
78+
79+
db = provide_database_interface()
80+
session_start_timestamp, session_id = await _fetch_or_set_telemetry_session(db=db)
81+
telemetry_environment = os.environ.get(
82+
"PREFECT_API_TELEMETRY_ENVIRONMENT", "production"
83+
)
84+
85+
heartbeat = {
86+
"source": "prefect_server",
87+
"type": "heartbeat",
88+
"payload": {
89+
"platform": platform.system(),
90+
"architecture": platform.machine(),
91+
"python_version": platform.python_version(),
92+
"python_implementation": platform.python_implementation(),
93+
"environment": telemetry_environment,
94+
"ephemeral_server": bool(os.getenv("PREFECT__SERVER_EPHEMERAL", False)),
95+
"api_version": SERVER_API_VERSION,
96+
"prefect_version": prefect.__version__,
97+
"session_id": session_id,
98+
"session_start_timestamp": session_start_timestamp,
99+
},
100+
}
101+
102+
try:
103+
async with httpx.AsyncClient() as client:
104+
result = await client.post(
105+
"https://sens-o-matic.prefect.io/",
106+
json=heartbeat,
107+
headers={"x-prefect-event": "prefect_server"},
68108
)
69-
70-
if telemetry_session is None:
71-
self.logger.debug("No telemetry session found, setting")
72-
session_id = str(uuid4())
73-
session_start_timestamp = now("UTC").isoformat()
74-
75-
telemetry_session = Configuration(
76-
key="TELEMETRY_SESSION",
77-
value={
78-
"session_id": session_id,
79-
"session_start_timestamp": session_start_timestamp,
80-
},
81-
)
82-
83-
await configuration.write_configuration(session, telemetry_session)
84-
85-
self.session_id = session_id
86-
self.session_start_timestamp = session_start_timestamp
87-
else:
88-
self.logger.debug("Session information retrieved from database")
89-
self.session_id: str = telemetry_session.value["session_id"]
90-
self.session_start_timestamp: str = telemetry_session.value[
91-
"session_start_timestamp"
92-
]
93-
self.logger.debug(
94-
f"Telemetry Session: {self.session_id}, {self.session_start_timestamp}"
109+
result.raise_for_status()
110+
except Exception as exc:
111+
logger.error(
112+
f"Failed to send telemetry: {exc}",
113+
exc_info=PREFECT_DEBUG_MODE.value(),
95114
)
96-
return (self.session_start_timestamp, self.session_id)
97-
98-
async def run_once(self) -> None:
99-
"""
100-
Sends a heartbeat to the sens-o-matic
101-
"""
102-
from prefect.client.constants import SERVER_API_VERSION
103-
104-
if not hasattr(self, "session_id"):
105-
await self._fetch_or_set_telemetry_session()
106-
107-
heartbeat = {
108-
"source": "prefect_server",
109-
"type": "heartbeat",
110-
"payload": {
111-
"platform": platform.system(),
112-
"architecture": platform.machine(),
113-
"python_version": platform.python_version(),
114-
"python_implementation": platform.python_implementation(),
115-
"environment": self.telemetry_environment,
116-
"ephemeral_server": bool(os.getenv("PREFECT__SERVER_EPHEMERAL", False)),
117-
"api_version": SERVER_API_VERSION,
118-
"prefect_version": prefect.__version__,
119-
"session_id": self.session_id,
120-
"session_start_timestamp": self.session_start_timestamp,
121-
},
122-
}
123-
124-
try:
125-
async with httpx.AsyncClient() as client:
126-
result = await client.post(
127-
"https://sens-o-matic.prefect.io/",
128-
json=heartbeat,
129-
headers={"x-prefect-event": "prefect_server"},
130-
)
131-
result.raise_for_status()
132-
except Exception as exc:
133-
self.logger.error(
134-
f"Failed to send telemetry: {exc}\nShutting down telemetry service...",
135-
# The traceback is only needed if doing deeper debugging, otherwise
136-
# this looks like an impactful server error
137-
exc_info=PREFECT_DEBUG_MODE.value(),
138-
)
139-
await self.stop(block=False)
140-
141-
142-
if __name__ == "__main__":
143-
asyncio.run(Telemetry(handle_signals=True).start())

tests/cli/test_server_services.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,9 @@ def test_list_services(self):
126126
"Available Services",
127127
"Scheduler",
128128
"PREFECT_SERVER_SERVICES_SCHEDULER_ENABLED",
129-
"Telemetry",
130-
"PREFECT_SERVER_ANALYTICS_ENABLED",
129+
"TaskRunRecorder",
130+
# May be truncated in table display
131+
"PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER",
131132
],
132133
expected_code=0,
133134
)

tests/server/services/test_perpetual_services.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,23 @@ def test_foreman_service_registered():
3838
assert "monitor_worker_health" in service_names
3939

4040

41+
def test_telemetry_service_registered():
42+
"""Test that telemetry perpetual service is registered."""
43+
service_names = [config.function.__name__ for config in _PERPETUAL_SERVICES]
44+
assert "send_telemetry_heartbeat" in service_names
45+
46+
47+
def test_telemetry_runs_in_all_modes():
48+
"""Test that telemetry is configured to run in ephemeral and webserver modes."""
49+
config = next(
50+
c
51+
for c in _PERPETUAL_SERVICES
52+
if c.function.__name__ == "send_telemetry_heartbeat"
53+
)
54+
assert config.run_in_ephemeral is True
55+
assert config.run_in_webserver is True
56+
57+
4158
def test_get_perpetual_services_returns_all_in_default_mode():
4259
"""Test that get_perpetual_services returns all services in default mode."""
4360
services = get_perpetual_services(ephemeral=False, webserver_only=False)

tests/server/services/test_service_subsets.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,12 @@
77
from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service
88
from prefect.server.services.scheduler import RecentDeploymentsScheduler, Scheduler
99
from prefect.server.services.task_run_recorder import TaskRunRecorder
10-
from prefect.server.services.telemetry import Telemetry
1110

1211

1312
def test_the_all_service_subset():
1413
"""The following services should be enabled on background servers or full-featured
1514
API servers"""
1615
assert set(Service.all_services()) == {
17-
Telemetry,
1816
# Orchestration services
1917
RecentDeploymentsScheduler,
2018
Scheduler,
@@ -34,7 +32,6 @@ def test_the_all_service_subset():
3432
def test_run_in_ephemeral_servers():
3533
"""The following services should be enabled on ephemeral servers"""
3634
assert set(RunInEphemeralServers.all_services()) == {
37-
Telemetry,
3835
# Orchestration services
3936
TaskRunRecorder,
4037
# Events services
@@ -52,7 +49,6 @@ def test_run_in_ephemeral_servers():
5249
def test_run_in_webservers():
5350
"""The following services should be enabled on webservers"""
5451
assert set(RunInWebservers.all_services()) == {
55-
Telemetry,
5652
# Events services
5753
Distributor,
5854
# Logs services

tests/server/services/test_telemetry.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
from httpx import Response
77

88
import prefect
9-
from prefect.server.services.telemetry import Telemetry
9+
from prefect.server.services.telemetry import (
10+
_fetch_or_set_telemetry_session,
11+
send_telemetry_heartbeat,
12+
)
1013

1114

1215
@pytest.fixture
@@ -32,8 +35,7 @@ def error_sens_o_matic_mock():
3235
async def test_sens_o_matic_called_correctly(sens_o_matic_mock):
3336
from prefect.client.constants import SERVER_API_VERSION
3437

35-
telemetry = Telemetry()
36-
await telemetry.start(loops=1)
38+
await send_telemetry_heartbeat()
3739

3840
assert sens_o_matic_mock.called
3941
assert sens_o_matic_mock.call_count == 1
@@ -54,34 +56,35 @@ async def test_sens_o_matic_called_correctly(sens_o_matic_mock):
5456
assert payload["api_version"] == SERVER_API_VERSION
5557
assert payload["prefect_version"] == prefect.__version__
5658

57-
assert payload["session_id"] == telemetry.session_id
58-
assert payload["session_start_timestamp"] == telemetry.session_start_timestamp
59+
# Session info should be present
60+
assert payload["session_id"]
61+
assert payload["session_start_timestamp"]
5962

6063

61-
async def test_sets_and_fetches_session_information(sens_o_matic_mock):
62-
telemetry = Telemetry()
63-
await telemetry.start(loops=1)
64+
async def test_sets_and_fetches_session_information():
65+
from prefect.server.database import provide_database_interface
6466

65-
# set it on the first call
66-
sid = telemetry.session_id
67-
sts = telemetry.session_start_timestamp
68-
assert sid
69-
assert sts
67+
db = provide_database_interface()
7068

71-
# retrieve from the db if process restarted
72-
telemetry_2 = Telemetry()
73-
await telemetry_2.start(loops=1)
74-
assert telemetry_2.session_id == sid
75-
assert telemetry_2.session_start_timestamp == sts
69+
# Get session info from first call
70+
session_start_timestamp_1, session_id_1 = await _fetch_or_set_telemetry_session(
71+
db=db
72+
)
73+
assert session_id_1
74+
assert session_start_timestamp_1
7675

76+
# Retrieve from the db if process restarted (same session info)
77+
session_start_timestamp_2, session_id_2 = await _fetch_or_set_telemetry_session(
78+
db=db
79+
)
80+
assert session_id_2 == session_id_1
81+
assert session_start_timestamp_2 == session_start_timestamp_1
7782

78-
async def test_errors_shutdown_service(error_sens_o_matic_mock, caplog):
79-
# When telemetry encounters an error on any loop the service is stopped
80-
telemetry = Telemetry()
8183

82-
await telemetry.start(loops=5)
84+
async def test_errors_do_not_crash_service(error_sens_o_matic_mock, caplog):
85+
# When telemetry encounters an error, it logs but doesn't crash
86+
await send_telemetry_heartbeat()
8387

84-
# The service should only be hit once
8588
assert error_sens_o_matic_mock.called
8689
assert error_sens_o_matic_mock.call_count == 1
8790

0 commit comments

Comments
 (0)