From 301346ab4d1eee108529d3a7fac933c910d116a9 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Mon, 12 May 2025 19:53:59 +0000 Subject: [PATCH 01/10] Update `BatchSpanProcessor` to use new `BatchProcessor` class --- CHANGELOG.md | 2 +- .../sdk/trace/export/__init__.py | 244 +----------- .../shared_internal/test_batch_processor.py | 41 +- .../tests/trace/export/test_export.py | 370 ++---------------- 4 files changed, 82 insertions(+), 575 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85ab5a84852..d247aadff22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased -- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more +- Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)). diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 9e60d6cff9b..c02dc8a227e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -13,16 +13,11 @@ # limitations under the License. from __future__ import annotations -import collections import logging -import os import sys -import threading import typing -import weakref from enum import Enum from os import environ, linesep -from time import time_ns from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -31,6 +26,7 @@ detach, set_value, ) +from opentelemetry.sdk._shared_internal import BatchProcessor from opentelemetry.sdk.environment_variables import ( OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, @@ -38,7 +34,6 @@ OTEL_BSP_SCHEDULE_DELAY, ) from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor -from opentelemetry.util._once import Once _DEFAULT_SCHEDULE_DELAY_MILLIS = 5000 _DEFAULT_MAX_EXPORT_BATCH_SIZE = 512 @@ -125,19 +120,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: return True -class _FlushRequest: - """Represents a request for the BatchSpanProcessor to flush spans.""" - - __slots__ = ["event", "num_spans"] - - def __init__(self): - self.event = threading.Event() - self.num_spans = 0 - - -_BSP_RESET_ONCE = Once() - - class BatchSpanProcessor(SpanProcessor): """Batch span processor implementation. @@ -151,6 +133,8 @@ class BatchSpanProcessor(SpanProcessor): - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` - :envvar:`OTEL_BSP_EXPORT_TIMEOUT` + + All the logic for emitting spans, shutting down etc. resides in the BatchProcessor class. """ def __init__( @@ -174,6 +158,7 @@ def __init__( BatchSpanProcessor._default_max_export_batch_size() ) + # Not used. No way currently to pass timeout to export. if export_timeout_millis is None: export_timeout_millis = ( BatchSpanProcessor._default_export_timeout_millis() @@ -183,27 +168,14 @@ def __init__( max_queue_size, schedule_delay_millis, max_export_batch_size ) - self.span_exporter = span_exporter - self.queue = collections.deque([], max_queue_size) # type: typing.Deque[Span] - self.worker_thread = threading.Thread( - name="OtelBatchSpanProcessor", target=self.worker, daemon=True + self._batch_processor = BatchProcessor( + span_exporter, + schedule_delay_millis, + max_export_batch_size, + export_timeout_millis, + max_queue_size, + "Span", ) - self.condition = threading.Condition(threading.Lock()) - self._flush_request = None # type: typing.Optional[_FlushRequest] - self.schedule_delay_millis = schedule_delay_millis - self.max_export_batch_size = max_export_batch_size - self.max_queue_size = max_queue_size - self.export_timeout_millis = export_timeout_millis - self.done = False - # flag that indicates that spans are being dropped - self._spans_dropped = False - # precallocated list to send spans to exporter - self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] - self.worker_thread.start() - if hasattr(os, "register_at_fork"): - weak_reinit = weakref.WeakMethod(self._at_fork_reinit) - os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda - self._pid = os.getpid() def on_start( self, span: Span, parent_context: Context | None = None @@ -211,199 +183,15 @@ def on_start( pass def on_end(self, span: ReadableSpan) -> None: - if self.done: - logger.warning("Already shutdown, dropping span.") - return if not span.context.trace_flags.sampled: return - if self._pid != os.getpid(): - _BSP_RESET_ONCE.do_once(self._at_fork_reinit) - - if len(self.queue) == self.max_queue_size: - if not self._spans_dropped: - logger.warning("Queue is full, likely spans will be dropped.") - self._spans_dropped = True - - self.queue.appendleft(span) - - if len(self.queue) >= self.max_export_batch_size: - with self.condition: - self.condition.notify() - - def _at_fork_reinit(self): - self.condition = threading.Condition(threading.Lock()) - self.queue.clear() - - # worker_thread is local to a process, only the thread that issued fork continues - # to exist. A new worker thread must be started in child process. - self.worker_thread = threading.Thread( - name="OtelBatchSpanProcessor", target=self.worker, daemon=True - ) - self.worker_thread.start() - self._pid = os.getpid() - - def worker(self): - timeout = self.schedule_delay_millis / 1e3 - flush_request = None # type: typing.Optional[_FlushRequest] - while not self.done: - with self.condition: - if self.done: - # done flag may have changed, avoid waiting - break - flush_request = self._get_and_unset_flush_request() - if ( - len(self.queue) < self.max_export_batch_size - and flush_request is None - ): - self.condition.wait(timeout) - flush_request = self._get_and_unset_flush_request() - if not self.queue: - # spurious notification, let's wait again, reset timeout - timeout = self.schedule_delay_millis / 1e3 - self._notify_flush_request_finished(flush_request) - flush_request = None - continue - if self.done: - # missing spans will be sent when calling flush - break - - # subtract the duration of this export call to the next timeout - start = time_ns() - self._export(flush_request) - end = time_ns() - duration = (end - start) / 1e9 - timeout = self.schedule_delay_millis / 1e3 - duration - - self._notify_flush_request_finished(flush_request) - flush_request = None - - # there might have been a new flush request while export was running - # and before the done flag switched to true - with self.condition: - shutdown_flush_request = self._get_and_unset_flush_request() - - # be sure that all spans are sent - self._drain_queue() - self._notify_flush_request_finished(flush_request) - self._notify_flush_request_finished(shutdown_flush_request) - - def _get_and_unset_flush_request( - self, - ) -> typing.Optional[_FlushRequest]: - """Returns the current flush request and makes it invisible to the - worker thread for subsequent calls. - """ - flush_request = self._flush_request - self._flush_request = None - if flush_request is not None: - flush_request.num_spans = len(self.queue) - return flush_request - - @staticmethod - def _notify_flush_request_finished( - flush_request: typing.Optional[_FlushRequest], - ): - """Notifies the flush initiator(s) waiting on the given request/event - that the flush operation was finished. - """ - if flush_request is not None: - flush_request.event.set() - - def _get_or_create_flush_request(self) -> _FlushRequest: - """Either returns the current active flush event or creates a new one. + self._batch_processor.emit(span) - The flush event will be visible and read by the worker thread before an - export operation starts. Callers of a flush operation may wait on the - returned event to be notified when the flush/export operation was - finished. + def shutdown(self): + return self._batch_processor.shutdown() - This method is not thread-safe, i.e. callers need to take care about - synchronization/locking. - """ - if self._flush_request is None: - self._flush_request = _FlushRequest() - return self._flush_request - - def _export(self, flush_request: typing.Optional[_FlushRequest]): - """Exports spans considering the given flush_request. - - In case of a given flush_requests spans are exported in batches until - the number of exported spans reached or exceeded the number of spans in - the flush request. - In no flush_request was given at most max_export_batch_size spans are - exported. - """ - if not flush_request: - self._export_batch() - return - - num_spans = flush_request.num_spans - while self.queue: - num_exported = self._export_batch() - num_spans -= num_exported - - if num_spans <= 0: - break - - def _export_batch(self) -> int: - """Exports at most max_export_batch_size spans and returns the number of - exported spans. - """ - idx = 0 - # currently only a single thread acts as consumer, so queue.pop() will - # not raise an exception - while idx < self.max_export_batch_size and self.queue: - self.spans_list[idx] = self.queue.pop() - idx += 1 - token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) - try: - # Ignore type b/c the Optional[None]+slicing is too "clever" - # for mypy - self.span_exporter.export(self.spans_list[:idx]) # type: ignore - except Exception: # pylint: disable=broad-exception-caught - logger.exception("Exception while exporting Span batch.") - detach(token) - - # clean up list - for index in range(idx): - self.spans_list[index] = None - return idx - - def _drain_queue(self): - """Export all elements until queue is empty. - - Can only be called from the worker thread context because it invokes - `export` that is not thread safe. - """ - while self.queue: - self._export_batch() - - def force_flush(self, timeout_millis: int | None = None) -> bool: - if timeout_millis is None: - timeout_millis = self.export_timeout_millis - - if self.done: - logger.warning("Already shutdown, ignoring call to force_flush().") - return True - - with self.condition: - flush_request = self._get_or_create_flush_request() - # signal the worker thread to flush and wait for it to finish - self.condition.notify_all() - - # wait for token to be processed - ret = flush_request.event.wait(timeout_millis / 1e3) - if not ret: - logger.warning("Timeout was exceeded in force_flush().") - return ret - - def shutdown(self) -> None: - # signal the worker thread to finish and then wait for it - self.done = True - with self.condition: - self.condition.notify_all() - self.worker_thread.join() - self.span_exporter.shutdown() + def force_flush(self, timeout_millis: typing.Optional[int] = None): + return self._batch_processor.force_flush(timeout_millis) @staticmethod def _default_max_queue_size(): diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index a48964b30b5..7d4e35b4eff 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -34,6 +34,8 @@ from opentelemetry.sdk._logs.export import ( BatchLogRecordProcessor, ) +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.util.instrumentation import InstrumentationScope EMPTY_LOG = LogData( @@ -41,11 +43,18 @@ instrumentation_scope=InstrumentationScope("example", "example"), ) +BASIC_SPAN = ReadableSpan( + "MySpan", + instrumentation_scope=InstrumentationScope("example", "example"), +) + +multiprocessing.set_start_method("fork") -# BatchLogRecodpRocessor initializes / uses BatchProcessor. + +# BatchLogRecodProcessor/BatchSpanProcessor initialize and use BatchProcessor. @pytest.mark.parametrize( "batch_processor_class,telemetry", - [(BatchLogRecordProcessor, EMPTY_LOG)], + [(BatchLogRecordProcessor, EMPTY_LOG), (BatchSpanProcessor, BASIC_SPAN)], ) class TestBatchProcessor: # pylint: disable=no-self-use @@ -54,7 +63,7 @@ def test_telemetry_exported_once_batch_size_reached( ): exporter = Mock() batch_processor = batch_processor_class( - exporter=exporter, + exporter, max_queue_size=15, max_export_batch_size=15, # Will not reach this during the test, this sleep should be interrupted when batch size is reached. @@ -63,7 +72,7 @@ def test_telemetry_exported_once_batch_size_reached( ) before_export = time.time_ns() for _ in range(15): - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) # Wait a bit for the worker thread to wake up and call export. time.sleep(0.1) exporter.export.assert_called_once() @@ -77,13 +86,13 @@ def test_telemetry_exported_once_schedule_delay_reached( ): exporter = Mock() batch_processor = batch_processor_class( - exporter=exporter, + exporter, max_queue_size=15, max_export_batch_size=15, schedule_delay_millis=100, export_timeout_millis=500, ) - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) time.sleep(0.2) exporter.export.assert_called_once_with([telemetry]) @@ -92,7 +101,7 @@ def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown( ): exporter = Mock() batch_processor = batch_processor_class( - exporter=exporter, + exporter, # Neither of these thresholds should be hit before test ends. max_queue_size=15, max_export_batch_size=15, @@ -100,13 +109,13 @@ def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown( export_timeout_millis=500, ) # This log should be flushed because it was written before shutdown. - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) batch_processor.shutdown() exporter.export.assert_called_once_with([telemetry]) assert batch_processor._batch_processor._shutdown is True # This should not be flushed. - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) assert len(caplog.records) == 1 assert "Shutdown called, ignoring" in caplog.text exporter.export.assert_called_once() @@ -117,7 +126,7 @@ def test_force_flush_flushes_telemetry( ): exporter = Mock() batch_processor = batch_processor_class( - exporter=exporter, + exporter, # Neither of these thresholds should be hit before test ends. max_queue_size=15, max_export_batch_size=15, @@ -125,7 +134,7 @@ def test_force_flush_flushes_telemetry( export_timeout_millis=500, ) for _ in range(10): - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) batch_processor.force_flush() exporter.export.assert_called_once_with([telemetry for _ in range(10)]) @@ -136,7 +145,7 @@ def test_force_flush_flushes_telemetry( def test_with_multiple_threads(self, batch_processor_class, telemetry): exporter = Mock() batch_processor = batch_processor_class( - exporter=exporter, + exporter, max_queue_size=3000, max_export_batch_size=1000, schedule_delay_millis=30000, @@ -145,7 +154,7 @@ def test_with_multiple_threads(self, batch_processor_class, telemetry): def bulk_emit_and_flush(num_emit): for _ in range(num_emit): - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) batch_processor.force_flush() with ThreadPoolExecutor(max_workers=69) as executor: @@ -175,13 +184,11 @@ def test_batch_telemetry_record_processor_fork( # _at_fork_reinit should be called in the child process, to # clear the logs/spans in the child process. for _ in range(9): - batch_processor.emit(telemetry) - - multiprocessing.set_start_method("fork") + batch_processor._batch_processor.emit(telemetry) def child(conn): for _ in range(100): - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) batch_processor.force_flush() # Expect force flush to export 10 batches of max export batch size (10) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index a6d9c36875b..61a4f1c330a 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -12,20 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gc -import multiprocessing + import os import threading import time import unittest -import weakref -from concurrent.futures import ThreadPoolExecutor -from logging import WARNING -from platform import python_implementation, system from unittest import mock -from pytest import mark - from opentelemetry import trace as trace_api from opentelemetry.context import Context from opentelemetry.sdk import trace @@ -35,13 +28,10 @@ OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_SCHEDULE_DELAY, ) -from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import export from opentelemetry.sdk.trace.export import logger -from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( - InMemorySpanExporter, -) -from opentelemetry.test.concurrency_test import ConcurrencyTestBase + +# pylint: disable=protected-access class MySpanExporter(export.SpanExporter): @@ -154,23 +144,7 @@ def test_simple_span_processor_not_sampled(self): self.assertListEqual([], spans_names_list) -def _create_start_and_end_span(name, span_processor, resource): - span = trace._Span( - name, - trace_api.SpanContext( - 0xDEADBEEF, - 0xDEADBEEF, - is_remote=False, - trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED), - ), - span_processor=span_processor, - resource=resource, - ) - span.start() - span.end() - - -class TestBatchSpanProcessor(ConcurrencyTestBase): +class TestBatchSpanProcessor(unittest.TestCase): @mock.patch.dict( "os.environ", { @@ -185,20 +159,36 @@ def test_args_env_var(self): MySpanExporter(destination=[]) ) - self.assertEqual(batch_span_processor.max_queue_size, 10) - self.assertEqual(batch_span_processor.schedule_delay_millis, 2) - self.assertEqual(batch_span_processor.max_export_batch_size, 3) - self.assertEqual(batch_span_processor.export_timeout_millis, 4) + self.assertEqual( + batch_span_processor._batch_processor._max_queue_size, 10 + ) + self.assertEqual( + batch_span_processor._batch_processor._schedule_delay_millis, 2 + ) + self.assertEqual( + batch_span_processor._batch_processor._max_export_batch_size, 3 + ) + self.assertEqual( + batch_span_processor._batch_processor._export_timeout_millis, 4 + ) def test_args_env_var_defaults(self): batch_span_processor = export.BatchSpanProcessor( MySpanExporter(destination=[]) ) - self.assertEqual(batch_span_processor.max_queue_size, 2048) - self.assertEqual(batch_span_processor.schedule_delay_millis, 5000) - self.assertEqual(batch_span_processor.max_export_batch_size, 512) - self.assertEqual(batch_span_processor.export_timeout_millis, 30000) + self.assertEqual( + batch_span_processor._batch_processor._max_queue_size, 2048 + ) + self.assertEqual( + batch_span_processor._batch_processor._schedule_delay_millis, 5000 + ) + self.assertEqual( + batch_span_processor._batch_processor._max_export_batch_size, 512 + ) + self.assertEqual( + batch_span_processor._batch_processor._export_timeout_millis, 30000 + ) @mock.patch.dict( "os.environ", @@ -216,10 +206,18 @@ def test_args_env_var_value_error(self): ) logger.disabled = False - self.assertEqual(batch_span_processor.max_queue_size, 2048) - self.assertEqual(batch_span_processor.schedule_delay_millis, 5000) - self.assertEqual(batch_span_processor.max_export_batch_size, 512) - self.assertEqual(batch_span_processor.export_timeout_millis, 30000) + self.assertEqual( + batch_span_processor._batch_processor._max_queue_size, 2048 + ) + self.assertEqual( + batch_span_processor._batch_processor._schedule_delay_millis, 5000 + ) + self.assertEqual( + batch_span_processor._batch_processor._max_export_batch_size, 512 + ) + self.assertEqual( + batch_span_processor._batch_processor._export_timeout_millis, 30000 + ) def test_on_start_accepts_parent_context(self): # pylint: disable=no-self-use @@ -238,149 +236,6 @@ def test_on_start_accepts_parent_context(self): span, parent_context=context ) - def test_shutdown(self): - spans_names_list = [] - - my_exporter = MySpanExporter(destination=spans_names_list) - span_processor = export.BatchSpanProcessor(my_exporter) - - span_names = ["xxx", "bar", "foo"] - - resource = Resource.create({}) - for name in span_names: - _create_start_and_end_span(name, span_processor, resource) - - span_processor.shutdown() - self.assertTrue(my_exporter.is_shutdown) - - # check that spans are exported without an explicitly call to - # force_flush() - self.assertListEqual(span_names, spans_names_list) - - def test_flush(self): - spans_names_list = [] - - my_exporter = MySpanExporter(destination=spans_names_list) - span_processor = export.BatchSpanProcessor(my_exporter) - - span_names0 = ["xxx", "bar", "foo"] - span_names1 = ["yyy", "baz", "fox"] - - resource = Resource.create({}) - for name in span_names0: - _create_start_and_end_span(name, span_processor, resource) - - self.assertTrue(span_processor.force_flush()) - self.assertListEqual(span_names0, spans_names_list) - - # create some more spans to check that span processor still works - for name in span_names1: - _create_start_and_end_span(name, span_processor, resource) - - self.assertTrue(span_processor.force_flush()) - self.assertListEqual(span_names0 + span_names1, spans_names_list) - - span_processor.shutdown() - - def test_flush_empty(self): - spans_names_list = [] - - my_exporter = MySpanExporter(destination=spans_names_list) - span_processor = export.BatchSpanProcessor(my_exporter) - - self.assertTrue(span_processor.force_flush()) - - def test_flush_from_multiple_threads(self): - num_threads = 50 - num_spans = 10 - - span_list = [] - - my_exporter = MySpanExporter(destination=span_list) - span_processor = export.BatchSpanProcessor( - my_exporter, max_queue_size=512, max_export_batch_size=128 - ) - - resource = Resource.create({}) - - def create_spans_and_flush(tno: int): - for span_idx in range(num_spans): - _create_start_and_end_span( - f"Span {tno}-{span_idx}", span_processor, resource - ) - self.assertTrue(span_processor.force_flush()) - - with ThreadPoolExecutor(max_workers=num_threads) as executor: - future_list = [] - for thread_no in range(num_threads): - future = executor.submit(create_spans_and_flush, thread_no) - future_list.append(future) - - executor.shutdown() - - self.assertEqual(num_threads * num_spans, len(span_list)) - - def test_flush_timeout(self): - spans_names_list = [] - - my_exporter = MySpanExporter( - destination=spans_names_list, export_timeout_millis=500 - ) - span_processor = export.BatchSpanProcessor(my_exporter) - - resource = Resource.create({}) - _create_start_and_end_span("foo", span_processor, resource) - - # check that the timeout is not meet - with self.assertLogs(level=WARNING): - self.assertFalse(span_processor.force_flush(100)) - span_processor.shutdown() - - def test_batch_span_processor_lossless(self): - """Test that no spans are lost when sending max_queue_size spans""" - spans_names_list = [] - - my_exporter = MySpanExporter( - destination=spans_names_list, max_export_batch_size=128 - ) - span_processor = export.BatchSpanProcessor( - my_exporter, max_queue_size=512, max_export_batch_size=128 - ) - - resource = Resource.create({}) - for _ in range(512): - _create_start_and_end_span("foo", span_processor, resource) - - time.sleep(1) - self.assertTrue(span_processor.force_flush()) - self.assertEqual(len(spans_names_list), 512) - span_processor.shutdown() - - def test_batch_span_processor_many_spans(self): - """Test that no spans are lost when sending many spans""" - spans_names_list = [] - - my_exporter = MySpanExporter( - destination=spans_names_list, max_export_batch_size=128 - ) - span_processor = export.BatchSpanProcessor( - my_exporter, - max_queue_size=256, - max_export_batch_size=64, - schedule_delay_millis=100, - ) - - resource = Resource.create({}) - for _ in range(4): - for _ in range(256): - _create_start_and_end_span("foo", span_processor, resource) - - time.sleep(0.1) # give some time for the exporter to upload spans - - self.assertTrue(span_processor.force_flush()) - self.assertEqual(len(spans_names_list), 1024) - span_processor.shutdown() - def test_batch_span_processor_not_sampled(self): tracer_provider = trace.TracerProvider( sampler=trace.sampling.ALWAYS_OFF @@ -402,136 +257,10 @@ def test_batch_span_processor_not_sampled(self): pass time.sleep(0.05) # give some time for the exporter to upload spans - self.assertTrue(span_processor.force_flush()) + span_processor.force_flush() self.assertEqual(len(spans_names_list), 0) span_processor.shutdown() - def _check_fork_trace(self, exporter, expected): - time.sleep(0.5) # give some time for the exporter to upload spans - spans = exporter.get_finished_spans() - for span in spans: - self.assertIn(span.name, expected) - - @unittest.skipUnless( - hasattr(os, "fork"), - "needs *nix", - ) - def test_batch_span_processor_fork(self): - # pylint: disable=invalid-name - tracer_provider = trace.TracerProvider() - tracer = tracer_provider.get_tracer(__name__) - - exporter = InMemorySpanExporter() - span_processor = export.BatchSpanProcessor( - exporter, - max_queue_size=256, - max_export_batch_size=64, - schedule_delay_millis=10, - ) - tracer_provider.add_span_processor(span_processor) - with tracer.start_as_current_span("foo"): - pass - time.sleep(0.5) # give some time for the exporter to upload spans - - self.assertTrue(span_processor.force_flush()) - self.assertEqual(len(exporter.get_finished_spans()), 1) - exporter.clear() - - def child(conn): - def _target(): - with tracer.start_as_current_span("span") as s: - s.set_attribute("i", "1") - with tracer.start_as_current_span("temp"): - pass - - self.run_with_many_threads(_target, 100) - - time.sleep(0.5) - - spans = exporter.get_finished_spans() - conn.send(len(spans) == 200) - conn.close() - - parent_conn, child_conn = multiprocessing.Pipe() - p = multiprocessing.Process(target=child, args=(child_conn,)) - p.start() - self.assertTrue(parent_conn.recv()) - p.join() - - span_processor.shutdown() - - @mark.skipif( - python_implementation() == "PyPy" or system() == "Windows", - reason="This test randomly fails with huge delta in Windows or PyPy", - ) - def test_batch_span_processor_scheduled_delay(self): - """Test that spans are exported each schedule_delay_millis""" - spans_names_list = [] - - export_event = threading.Event() - my_exporter = MySpanExporter( - destination=spans_names_list, export_event=export_event - ) - start_time = time.time() - span_processor = export.BatchSpanProcessor( - my_exporter, - schedule_delay_millis=500, - ) - - # create single span - resource = Resource.create({}) - _create_start_and_end_span("foo", span_processor, resource) - - self.assertTrue(export_event.wait(2)) - export_time = time.time() - self.assertEqual(len(spans_names_list), 1) - self.assertAlmostEqual((export_time - start_time) * 1e3, 500, delta=25) - - span_processor.shutdown() - - @mark.skipif( - python_implementation() == "PyPy" and system() == "Windows", - reason="This test randomly fails in Windows with PyPy", - ) - def test_batch_span_processor_reset_timeout(self): - """Test that the scheduled timeout is reset on cycles without spans""" - spans_names_list = [] - - export_event = threading.Event() - my_exporter = MySpanExporter( - destination=spans_names_list, - export_event=export_event, - export_timeout_millis=50, - ) - - span_processor = export.BatchSpanProcessor( - my_exporter, - schedule_delay_millis=50, - ) - - with mock.patch.object(span_processor.condition, "wait") as mock_wait: - resource = Resource.create({}) - _create_start_and_end_span("foo", span_processor, resource) - self.assertTrue(export_event.wait(2)) - - # give some time for exporter to loop - # since wait is mocked it should return immediately - time.sleep(0.1) - mock_wait_calls = list(mock_wait.mock_calls) - - # find the index of the call that processed the singular span - for idx, wait_call in enumerate(mock_wait_calls): - _, args, __ = wait_call - if args[0] <= 0: - after_calls = mock_wait_calls[idx + 1 :] - break - - self.assertTrue( - all(args[0] >= 0.05 for _, args, __ in after_calls) - ) - - span_processor.shutdown() - def test_batch_span_processor_parameters(self): # zero max_queue_size self.assertRaises( @@ -587,23 +316,6 @@ def test_batch_span_processor_parameters(self): max_export_batch_size=512, ) - def test_batch_span_processor_gc(self): - # Given a BatchSpanProcessor - exporter = MySpanExporter(destination=[]) - processor = export.BatchSpanProcessor(exporter) - weak_ref = weakref.ref(processor) - processor.shutdown() - - # When the processor is garbage collected - del processor - gc.collect() - - # Then the reference to the processor should no longer exist - self.assertIsNone( - weak_ref(), - "The BatchSpanProcessor object created by this test wasn't garbage collected", - ) - class TestConsoleSpanExporter(unittest.TestCase): def test_export(self): # pylint: disable=no-self-use From 4658bdae66af3309b7a96c71e3f05157a3ba01e1 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Mon, 12 May 2025 19:57:18 +0000 Subject: [PATCH 02/10] Update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d247aadff22..b1203b445b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) - and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)). + [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535), and + [#4580](https://github.com/open-telemetry/opentelemetry-python/pull/4580)). ## Version 1.33.0/0.54b0 (2025-05-09) From 4b0a4587d6ecc9744e42b747fa10a8f5ed5c6a43 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 13 May 2025 14:47:41 +0000 Subject: [PATCH 03/10] fork does not exist on windows. --- .../tests/shared_internal/test_batch_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index 7d4e35b4eff..e1474e844da 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -48,7 +48,8 @@ instrumentation_scope=InstrumentationScope("example", "example"), ) -multiprocessing.set_start_method("fork") +if system() != "Windows": + multiprocessing.set_start_method("fork") # BatchLogRecodProcessor/BatchSpanProcessor initialize and use BatchProcessor. From b190d85e095b979860e8afb8dc399827a21530d6 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 14 May 2025 15:18:52 +0000 Subject: [PATCH 04/10] Update force_flush to return a bool. Currently force_flush ignores it's timeout which is bad, but the behavior before made even less sense.. --- .../opentelemetry/sdk/_logs/_internal/export/__init__.py | 2 +- .../src/opentelemetry/sdk/_shared_internal/__init__.py | 6 ++++-- .../src/opentelemetry/sdk/trace/export/__init__.py | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index f748e2f1f9b..71cd2ed931b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -192,7 +192,7 @@ def emit(self, log_data: LogData) -> None: def shutdown(self): return self._batch_processor.shutdown() - def force_flush(self, timeout_millis: Optional[int] = None): + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: return self._batch_processor.force_flush(timeout_millis) @staticmethod diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index 105abe466ae..d8548a1b564 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -191,8 +191,10 @@ def shutdown(self): self._worker_thread.join() self._exporter.shutdown() - def force_flush(self, timeout_millis: Optional[int] = None): + # TODO: Fix force flush so the timeout is used https://github.com/open-telemetry/opentelemetry-python/issues/4568. + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: if self._shutdown: - return + return False # Blocking call to export. self._export(BatchExportStrategy.EXPORT_ALL) + return True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index c02dc8a227e..475a3501d11 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -169,7 +169,7 @@ def __init__( ) self._batch_processor = BatchProcessor( - span_exporter, + span_exporter, # type: ignore [reportArgumentType] schedule_delay_millis, max_export_batch_size, export_timeout_millis, @@ -190,7 +190,7 @@ def on_end(self, span: ReadableSpan) -> None: def shutdown(self): return self._batch_processor.shutdown() - def force_flush(self, timeout_millis: typing.Optional[int] = None): + def force_flush(self, timeout_millis: typing.Optional[int] = None) -> bool: return self._batch_processor.force_flush(timeout_millis) @staticmethod From 2eb9fd1463f6642bb154faaa0ed140a336206939 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 14 May 2025 19:26:02 +0000 Subject: [PATCH 05/10] Fix changelog --- CHANGELOG.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 960e160a4cf..8cea07f82a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,11 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased -- Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code and make the control flow more - typecheck: add sdk/resources and drop mypy ([#4578](https://github.com/open-telemetry/opentelemetry-python/pull/4578)) -- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more - clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) +- Refactor `BatchLogRecordProcessor` and `BatchSpanProcessor` to simplify code + and make the control flow more clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/) [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535), and [#4580](https://github.com/open-telemetry/opentelemetry-python/pull/4580)). From af153d01d8abb04f621c683ae7ed25010c610ab3 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 14 May 2025 20:34:19 +0000 Subject: [PATCH 06/10] Add backtic's around BatchProcessor --- .../src/opentelemetry/sdk/trace/export/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 475a3501d11..857446e63d5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -134,7 +134,7 @@ class BatchSpanProcessor(SpanProcessor): - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` - :envvar:`OTEL_BSP_EXPORT_TIMEOUT` - All the logic for emitting spans, shutting down etc. resides in the BatchProcessor class. + All the logic for emitting spans, shutting down etc. resides in the `BatchProcessor` class. """ def __init__( From 224ba646344be006f7a7b6ec6edcbbe2f825614b Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Fri, 16 May 2025 18:07:54 +0000 Subject: [PATCH 07/10] Require export get called by position only --- .../src/opentelemetry/sdk/_shared_internal/__init__.py | 2 +- .../src/opentelemetry/sdk/trace/export/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index d8548a1b564..d365cbd207c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -48,7 +48,7 @@ class BatchExportStrategy(enum.Enum): class Exporter(Protocol[Telemetry]): @abstractmethod - def export(self, batch: list[Telemetry]): + def export(self, batch: list[Telemetry], /): raise NotImplementedError @abstractmethod diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 857446e63d5..3520c6a9f6a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -169,7 +169,7 @@ def __init__( ) self._batch_processor = BatchProcessor( - span_exporter, # type: ignore [reportArgumentType] + span_exporter, schedule_delay_millis, max_export_batch_size, export_timeout_millis, From 09f08d2adb2a4ba63fd5fda01b5e73e27f30ab1a Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Tue, 20 May 2025 18:42:29 +0000 Subject: [PATCH 08/10] Add comment that there are additional tests for the BatchSpan/LogProcessor in the shared_internal directory. --- opentelemetry-sdk/tests/logs/test_export.py | 2 ++ opentelemetry-sdk/tests/trace/export/test_export.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index 721bf7df665..99bdbe33c66 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -329,6 +329,8 @@ def test_simple_log_record_processor_different_msg_types_with_formatter( class TestBatchLogRecordProcessor(unittest.TestCase): + # Many more test cases for the BatchLogRecordProcessor exist under + # opentelemetry-sdk/tests/shared_internal/test_batch_processor.py. def test_emit_call_log_record(self): exporter = InMemoryLogExporter() log_record_processor = Mock(wraps=BatchLogRecordProcessor(exporter)) diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 61a4f1c330a..77e28f9f989 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -145,6 +145,8 @@ def test_simple_span_processor_not_sampled(self): class TestBatchSpanProcessor(unittest.TestCase): + # Many more test cases for the BatchSpanProcessor exist under + # opentelemetry-sdk/tests/shared_internal/test_batch_processor.py. @mock.patch.dict( "os.environ", { From eb098ecd0a0870936d620650bc7aad3785e473f4 Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Tue, 20 May 2025 22:16:44 +0000 Subject: [PATCH 09/10] Empty commit to bump From 0ead3d949755ebd154d73d402ed34e2afcc25c66 Mon Sep 17 00:00:00 2001 From: Dylan Russell Date: Wed, 21 May 2025 14:23:21 +0000 Subject: [PATCH 10/10] Fix broken test --- opentelemetry-sdk/tests/shared_internal/test_batch_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py index a474f5bfb32..37d1e11a27e 100644 --- a/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py +++ b/opentelemetry-sdk/tests/shared_internal/test_batch_processor.py @@ -116,7 +116,7 @@ def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown( assert batch_processor._batch_processor._shutdown is True # This should not be flushed. - batch_processor.emit(telemetry) + batch_processor._batch_processor.emit(telemetry) exporter.export.assert_called_once() # pylint: disable=no-self-use