Skip to content

Commit 372253f

Browse files
authored
feat: sync up-to-date health check information with AppProxy (#5230)
1 parent faa8d8c commit 372253f

File tree

4 files changed

+44
-9
lines changed

4 files changed

+44
-9
lines changed

changes/5230.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Sync model service's health information real-time with AppProxy

src/ai/backend/common/clients/valkey_client/valkey_live/client.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
from collections.abc import Mapping
34
from typing import (
@@ -7,6 +8,7 @@
78
Self,
89
cast,
910
)
11+
from uuid import UUID
1012

1113
from glide import (
1214
Batch,
@@ -22,6 +24,7 @@
2224
create_layer_aware_valkey_decorator,
2325
create_valkey_client,
2426
)
27+
from ai.backend.common.data.config.types import HealthCheckConfig
2528
from ai.backend.common.metrics.metric import LayerType
2629
from ai.backend.common.types import RedisTarget
2730
from ai.backend.logging.utils import BraceStyleAdapter
@@ -473,6 +476,32 @@ async def hgetall_str(self, key: str) -> dict[str, str]:
473476

474477
return str_result
475478

479+
@valkey_decorator()
480+
async def update_appproxy_redis_info(
481+
self,
482+
endpoint_id: UUID,
483+
connection_info: dict[str, Any],
484+
health_check_config: HealthCheckConfig | None,
485+
) -> None:
486+
pipe = self._create_batch()
487+
pipe.set(
488+
f"endpoint.{endpoint_id}.route_connection_info",
489+
json.dumps(connection_info),
490+
expiry=ExpirySet(ExpiryType.SEC, 3600),
491+
)
492+
pipe.set(
493+
f"endpoint.{endpoint_id}.health_check_enabled",
494+
"true" if health_check_config is not None else "false",
495+
expiry=ExpirySet(ExpiryType.SEC, 3600),
496+
)
497+
if health_check_config:
498+
pipe.set(
499+
f"endpoint.{endpoint_id}.health_check_config",
500+
health_check_config.model_dump_json(),
501+
expiry=ExpirySet(ExpiryType.SEC, 3600),
502+
)
503+
await self._client.client.exec(pipe, True)
504+
476505
@valkey_decorator()
477506
async def delete_key(self, key: str) -> int:
478507
"""

src/ai/backend/manager/models/endpoint.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
from .image import ImageRow
7171
from .routing import RouteStatus
7272
from .scaling_group import scaling_groups
73+
from .session import SessionStatus
7374
from .user import UserRow
7475
from .vfolder import prepare_vfolder_mounts
7576

@@ -543,7 +544,9 @@ async def generate_redis_route_info(
543544
[
544545
r.session
545546
for r in active_routes
546-
if r.status in RouteStatus.active_route_statuses() and r.session
547+
if r.status in RouteStatus.active_route_statuses()
548+
and r.session
549+
and r.session_row.status == SessionStatus.RUNNING
547550
],
548551
)
549552
session_id_to_route_map = {r.session: r for r in active_routes}

src/ai/backend/manager/registry.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3649,7 +3649,8 @@ async def create_appproxy_endpoint(
36493649

36503650
model = await VFolderRow.get(db_sess, endpoint.model)
36513651

3652-
health_check_information = await self.get_health_check_info(endpoint, model)
3652+
health_check_config = await self.get_health_check_info(endpoint, model)
3653+
36533654
request_body = {
36543655
"version": "v2",
36553656
"service_name": endpoint.name,
@@ -3666,8 +3667,8 @@ async def create_appproxy_endpoint(
36663667
},
36673668
},
36683669
"open_to_public": endpoint.open_to_public,
3669-
"health_check": health_check_information.model_dump(mode="json")
3670-
if health_check_information
3670+
"health_check": health_check_config.model_dump(mode="json")
3671+
if health_check_config
36713672
else None,
36723673
}
36733674
endpoint_json = await wsproxy_client.create_endpoint(endpoint.id, request_body)
@@ -3692,12 +3693,13 @@ async def notify_endpoint_route_update_to_appproxy(self, endpoint_id: uuid.UUID)
36923693
async with self.db.begin_readonly_session() as db_sess:
36933694
endpoint = await EndpointRow.get(db_sess, endpoint_id)
36943695
connection_info = await endpoint.generate_redis_route_info(db_sess)
3696+
model = await VFolderRow.get(db_sess, endpoint.model)
36953697

3696-
await self.valkey_live.delete_key(f"endpoint.{endpoint.id}.route_connection_info")
3697-
await self.valkey_live.store_live_data(
3698-
f"endpoint.{endpoint.id}.route_connection_info",
3699-
json.dumps(connection_info),
3700-
ex=3600,
3698+
health_check_config = await self.get_health_check_info(endpoint, model)
3699+
await self.valkey_live.update_appproxy_redis_info(
3700+
endpoint.id,
3701+
connection_info,
3702+
health_check_config,
37013703
)
37023704

37033705
await self.event_producer.anycast_event(EndpointRouteListUpdatedEvent(endpoint.id))

0 commit comments

Comments
 (0)