forked from open-telemetry/opentelemetry-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathruntime_tests.rs
More file actions
160 lines (142 loc) · 6.09 KB
/
runtime_tests.rs
File metadata and controls
160 lines (142 loc) · 6.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Note that all tests here should be marked as ignore so that it won't be picked up by default We
// need to run those tests one by one as the GlobalTracerProvider is a shared object between
// threads Use cargo test -- --ignored --test-threads=1 to run those tests.
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::export::trace::{ExportResult, SpanExporter};
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::runtime;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use crate::runtime::RuntimeChannel;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use futures_util::future::BoxFuture;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use opentelemetry::global::*;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use opentelemetry::trace::Tracer;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use std::fmt::Debug;
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
use std::sync::Arc;
#[derive(Debug)]
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
struct SpanCountExporter {
span_count: Arc<AtomicUsize>,
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl SpanExporter for SpanCountExporter {
fn export(
&mut self,
batch: Vec<crate::export::trace::SpanData>,
) -> BoxFuture<'static, ExportResult> {
self.span_count.fetch_add(batch.len(), Ordering::SeqCst);
Box::pin(async { Ok(()) })
}
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl SpanCountExporter {
fn new() -> SpanCountExporter {
SpanCountExporter {
span_count: Arc::new(AtomicUsize::new(0)),
}
}
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_batch_tracer_provider<R: RuntimeChannel>(
exporter: SpanCountExporter,
runtime: R,
) -> crate::trace::TracerProvider {
use crate::trace::TracerProvider;
TracerProvider::builder()
.with_batch_exporter(exporter, runtime)
.build()
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_simple_tracer_provider(exporter: SpanCountExporter) -> crate::trace::TracerProvider {
use crate::trace::TracerProvider;
TracerProvider::builder()
.with_simple_exporter(exporter)
.build()
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
async fn test_set_provider_in_tokio<R: RuntimeChannel>(
runtime: R,
) -> (Arc<AtomicUsize>, crate::trace::TracerProvider) {
let exporter = SpanCountExporter::new();
let span_count = exporter.span_count.clone();
let tracer_provider = build_batch_tracer_provider(exporter, runtime);
let _ = set_tracer_provider(tracer_provider.clone());
let tracer = tracer("opentelemetery");
tracer.in_span("test", |_cx| {});
(span_count, tracer_provider)
}
// When using `tokio::spawn` to spawn the worker task in batch processor
//
// multiple -> no shut down -> not export
// multiple -> shut down -> export
// single -> no shutdown -> not export
// single -> shutdown -> hang forever
// When using |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut))
// to spawn the worker task in batch processor
//
// multiple -> no shutdown -> hang forever
// multiple -> shut down -> export
// single -> shut down -> export
// single -> no shutdown -> hang forever
// Test if the multiple thread tokio runtime could exit successfully when not force flushing spans
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio")]
async fn test_set_provider_multiple_thread_tokio() {
let (span_count, _) = test_set_provider_in_tokio(runtime::Tokio).await;
assert_eq!(span_count.load(Ordering::SeqCst), 0);
}
// Test if the multiple thread tokio runtime could exit successfully when force flushing spans
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio")]
async fn test_set_provider_multiple_thread_tokio_shutdown() {
let (span_count, tracer_provider) = test_set_provider_in_tokio(runtime::Tokio).await;
tracer_provider
.shutdown()
.expect("TracerProvider should shutdown properly");
assert!(span_count.load(Ordering::SeqCst) > 0);
}
// Test use simple processor in single thread tokio runtime.
// Expected to see the spans being exported to buffer
#[tokio::test]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio")]
async fn test_set_provider_single_thread_tokio_with_simple_processor() {
let exporter = SpanCountExporter::new();
let span_count = exporter.span_count.clone();
let tracer_provider = build_simple_tracer_provider(exporter);
let _ = set_tracer_provider(tracer_provider.clone());
let tracer = tracer("opentelemetry");
tracer.in_span("test", |_cx| {});
tracer_provider
.shutdown()
.expect("TracerProvider should shutdown properly");
assert!(span_count.load(Ordering::SeqCst) > 0);
}
// Test if the single thread tokio runtime could exit successfully when not force flushing spans
#[tokio::test]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio-current-thread")]
async fn test_set_provider_single_thread_tokio() {
let (span_count, _) = test_set_provider_in_tokio(runtime::TokioCurrentThread).await;
assert_eq!(span_count.load(Ordering::SeqCst), 0)
}
// Test if the single thread tokio runtime could exit successfully when force flushing spans.
#[tokio::test]
#[ignore = "requires --test-threads=1"]
#[cfg(feature = "rt-tokio-current-thread")]
async fn test_set_provider_single_thread_tokio_shutdown() {
let (span_count, tracer_provider) =
test_set_provider_in_tokio(runtime::TokioCurrentThread).await;
tracer_provider
.shutdown()
.expect("TracerProvider should shutdown properly");
assert!(span_count.load(Ordering::SeqCst) > 0)
}