diff --git a/ddtrace/internal/uds.py b/ddtrace/internal/uds.py index 733bf27b197..715bad025cf 100644 --- a/ddtrace/internal/uds.py +++ b/ddtrace/internal/uds.py @@ -2,9 +2,13 @@ import socket from typing import Any # noqa:F401 +from .constants import DEFAULT_TIMEOUT from .http import HTTPConnectionMixin +_GLOBAL_DEFAULT_TIMEOUT = getattr(socket, "_GLOBAL_DEFAULT_TIMEOUT", None) + + class UDSHTTPConnection(HTTPConnectionMixin, httplib.HTTPConnection): """An HTTP connection established over a Unix Domain Socket.""" @@ -23,5 +27,11 @@ def __init__( def connect(self): # type: () -> None sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + timeout = self.timeout + if timeout is _GLOBAL_DEFAULT_TIMEOUT: + timeout = socket.getdefaulttimeout() + if timeout is None: + timeout = DEFAULT_TIMEOUT + sock.settimeout(timeout) sock.connect(self.path) self.sock = sock diff --git a/releasenotes/notes/uds-socket-timeout-c1c2055f1b661827.yaml b/releasenotes/notes/uds-socket-timeout-c1c2055f1b661827.yaml new file mode 100644 index 00000000000..05546394e5d --- /dev/null +++ b/releasenotes/notes/uds-socket-timeout-c1c2055f1b661827.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + Add a timeout to Unix socket connections to prevent thread I/O hangs during pre-fork shutdown. diff --git a/tests/tracer/test_writer.py b/tests/tracer/test_writer.py index c0a4fa00774..5c46efc046e 100644 --- a/tests/tracer/test_writer.py +++ b/tests/tracer/test_writer.py @@ -969,6 +969,73 @@ def test_flush_connection_timeout(endpoint_test_timeout_server, writer_class): writer.flush_queue(raise_exc=True) +def test_periodic_thread_uds_callback_unblocks_with_timeout(): + import os + import tempfile + + from ddtrace.internal._threads import PeriodicThread + from ddtrace.internal.uds import UDSHTTPConnection + + sock_dir = tempfile.mkdtemp(prefix="ddtrace-uds-fork-repro-") + sock_path = os.path.join(sock_dir, "blackhole.sock") + server_ready = threading.Event() + server_stop = threading.Event() + callback_started = threading.Event() + callback_done = threading.Event() + + def _blackhole_server(): + srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + conn = None + try: + srv.bind(sock_path) + srv.listen(1) + server_ready.set() + conn, _ = srv.accept() + # Keep the connection open and never send a response so the client + # can only make progress via its socket timeout. + while not server_stop.wait(0.05): + pass + finally: + if conn is not None: + conn.close() + srv.close() + + def _callback(): + callback_started.set() + conn = UDSHTTPConnection(sock_path, "localhost", 80) + try: + conn.request("GET", "/") + conn.getresponse().read() + except Exception: + # Timeout (or an equivalent network error path) is expected. + pass + finally: + try: + conn.close() + except Exception: + pass + callback_done.set() + + server_thread = threading.Thread(target=_blackhole_server, daemon=True) + server_thread.start() + assert server_ready.wait(timeout=2) + + worker = PeriodicThread(interval=60.0, target=_callback, name="repro:UDSForkTimeout", no_wait_at_start=True) + worker.start() + assert callback_started.wait(timeout=2) + + try: + assert callback_done.wait(timeout=5) + finally: + server_stop.set() + worker.stop() + worker.join(timeout=1.0) + server_thread.join(timeout=1.0) + if os.path.exists(sock_path): + os.unlink(sock_path) + os.rmdir(sock_dir) + + @pytest.mark.parametrize("writer_class", (AgentWriter, CIVisibilityWriter, NativeWriter)) def test_flush_connection_reset(endpoint_test_reset_server, writer_class): with (