Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ddtrace/internal/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
import socket
from typing import Any # noqa:F401

from .constants import DEFAULT_TIMEOUT
from .http import HTTPConnectionMixin


# Sentinel that ``UDSHTTPConnection.connect`` compares ``self.timeout`` against to detect
# "no explicit timeout was given". The attribute is underscore-prefixed in the ``socket``
# module, so it is looked up defensively and falls back to ``None`` when it is not exposed.
_GLOBAL_DEFAULT_TIMEOUT = getattr(socket, "_GLOBAL_DEFAULT_TIMEOUT", None)


class UDSHTTPConnection(HTTPConnectionMixin, httplib.HTTPConnection):
"""An HTTP connection established over a Unix Domain Socket."""

Expand All @@ -23,5 +27,11 @@ def __init__(
def connect(self):
    # type: () -> None
    """Connect to the Unix Domain Socket at ``self.path`` with a bounded timeout.

    The timeout applied to the socket is resolved in this order:

    1. ``self.timeout`` when it was set explicitly;
    2. ``socket.getdefaulttimeout()`` when ``self.timeout`` is the socket
       module's "global default" sentinel;
    3. ``DEFAULT_TIMEOUT`` when the result of the above is ``None``, so the
       socket is never left in fully blocking mode.

    On success the connected socket is stored in ``self.sock``.

    :raises OSError: if the socket cannot be connected (including timeouts).
        The partially set-up socket is closed before the error propagates.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        timeout = self.timeout
        if timeout is _GLOBAL_DEFAULT_TIMEOUT:
            timeout = socket.getdefaulttimeout()
        if timeout is None:
            # Never block indefinitely: an unresponsive peer must not be able
            # to hang the calling thread.
            timeout = DEFAULT_TIMEOUT
        sock.settimeout(timeout)
        sock.connect(self.path)
    except Exception:
        # Do not leak the file descriptor when configuring or connecting fails.
        sock.close()
        raise
    self.sock = sock
4 changes: 4 additions & 0 deletions releasenotes/notes/uds-socket-timeout-c1c2055f1b661827.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
Add a timeout to Unix socket connections to prevent thread I/O hangs during pre-fork shutdown.
6 changes: 3 additions & 3 deletions tests/llmobs/test_llmobs_span_agentless_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,6 @@ def test_send_on_exit(run_python_code_in_subprocess):
)
assert status == 0, err
assert out == b""
assert b"got response code 403" in err
# Backend may return "API key is invalid" or "API key is missing"
assert b'"status":"403"' in err and b'"title":"Forbidden"' in err
assert b"403" in err
assert b"Forbidden" in err
assert b"errors" in err
67 changes: 67 additions & 0 deletions tests/tracer/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,73 @@ def test_flush_connection_timeout(endpoint_test_timeout_server, writer_class):
writer.flush_queue(raise_exc=True)


def test_periodic_thread_uds_callback_unblocks_with_timeout():
    """A UDS request to a silent peer must finish instead of hanging its thread.

    A "blackhole" server accepts a single Unix-socket connection and never
    replies. A ``PeriodicThread`` callback then issues an HTTP request over
    ``UDSHTTPConnection`` to that socket. The test passes when the callback
    returns within the wait budget, which can only happen if the client
    gives up on its own (e.g. via a socket timeout).
    """
    import os
    import tempfile

    from ddtrace.internal._threads import PeriodicThread
    from ddtrace.internal.uds import UDSHTTPConnection

    sock_dir = tempfile.mkdtemp(prefix="ddtrace-uds-fork-repro-")
    sock_path = os.path.join(sock_dir, "blackhole.sock")
    server_ready = threading.Event()
    server_stop = threading.Event()
    callback_started = threading.Event()
    callback_done = threading.Event()

    def _blackhole_server():
        srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        conn = None
        try:
            srv.bind(sock_path)
            srv.listen(1)
            server_ready.set()
            conn, _ = srv.accept()
            # Keep the connection open and never send a response so the client
            # can only make progress via its socket timeout.
            while not server_stop.wait(0.05):
                pass
        finally:
            if conn is not None:
                conn.close()
            srv.close()

    def _callback():
        callback_started.set()
        conn = UDSHTTPConnection(sock_path, "localhost", 80)
        try:
            conn.request("GET", "/")
            conn.getresponse().read()
        except Exception:
            # Timeout (or an equivalent network error path) is expected.
            pass
        finally:
            try:
                conn.close()
            except Exception:
                pass
            callback_done.set()

    server_thread = threading.Thread(target=_blackhole_server, daemon=True)
    server_thread.start()

    # Everything after the server thread starts runs under one try/finally so that
    # a failed readiness assertion still stops the threads and removes the
    # temporary socket directory.
    worker = None
    try:
        assert server_ready.wait(timeout=2)

        worker = PeriodicThread(interval=60.0, target=_callback, name="repro:UDSForkTimeout", no_wait_at_start=True)
        worker.start()
        assert callback_started.wait(timeout=2)

        # NOTE(review): assumes the connection's effective timeout is shorter than
        # this 5 s budget -- confirm against DEFAULT_TIMEOUT.
        assert callback_done.wait(timeout=5)
    finally:
        server_stop.set()
        if worker is not None:
            worker.stop()
            worker.join(timeout=1.0)
        server_thread.join(timeout=1.0)
        if os.path.exists(sock_path):
            os.unlink(sock_path)
        os.rmdir(sock_dir)


@pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter, NativeWriter))
def test_flush_connection_reset(endpoint_test_reset_server, writer_class):
with (
Expand Down