@@ -969,6 +969,73 @@ def test_flush_connection_timeout(endpoint_test_timeout_server, writer_class):
969969 writer .flush_queue (raise_exc = True )
970970
971971
972+ def test_periodic_thread_uds_callback_unblocks_with_timeout ():
973+ import os
974+ import tempfile
975+
976+ from ddtrace .internal ._threads import PeriodicThread
977+ from ddtrace .internal .uds import UDSHTTPConnection
978+
979+ sock_dir = tempfile .mkdtemp (prefix = "ddtrace-uds-fork-repro-" )
980+ sock_path = os .path .join (sock_dir , "blackhole.sock" )
981+ server_ready = threading .Event ()
982+ server_stop = threading .Event ()
983+ callback_started = threading .Event ()
984+ callback_done = threading .Event ()
985+
986+ def _blackhole_server ():
987+ srv = socket .socket (socket .AF_UNIX , socket .SOCK_STREAM )
988+ conn = None
989+ try :
990+ srv .bind (sock_path )
991+ srv .listen (1 )
992+ server_ready .set ()
993+ conn , _ = srv .accept ()
994+ # Keep the connection open and never send a response so the client
995+ # can only make progress via its socket timeout.
996+ while not server_stop .wait (0.05 ):
997+ pass
998+ finally :
999+ if conn is not None :
1000+ conn .close ()
1001+ srv .close ()
1002+
1003+ def _callback ():
1004+ callback_started .set ()
1005+ conn = UDSHTTPConnection (sock_path , "localhost" , 80 )
1006+ try :
1007+ conn .request ("GET" , "/" )
1008+ conn .getresponse ().read ()
1009+ except Exception :
1010+ # Timeout (or an equivalent network error path) is expected.
1011+ pass
1012+ finally :
1013+ try :
1014+ conn .close ()
1015+ except Exception :
1016+ pass
1017+ callback_done .set ()
1018+
1019+ server_thread = threading .Thread (target = _blackhole_server , daemon = True )
1020+ server_thread .start ()
1021+ assert server_ready .wait (timeout = 2 )
1022+
1023+ worker = PeriodicThread (interval = 60.0 , target = _callback , name = "repro:UDSForkTimeout" , no_wait_at_start = True )
1024+ worker .start ()
1025+ assert callback_started .wait (timeout = 2 )
1026+
1027+ try :
1028+ assert callback_done .wait (timeout = 5 )
1029+ finally :
1030+ server_stop .set ()
1031+ worker .stop ()
1032+ worker .join (timeout = 1.0 )
1033+ server_thread .join (timeout = 1.0 )
1034+ if os .path .exists (sock_path ):
1035+ os .unlink (sock_path )
1036+ os .rmdir (sock_dir )
1037+
1038+
9721039@pytest .mark .parametrize ("writer_class" , (AgentWriter , CIVisibilityWriter , NativeWriter ))
9731040def test_flush_connection_reset (endpoint_test_reset_server , writer_class ):
9741041 with (
0 commit comments