Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/java_unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ jobs:
distribution: 'temurin'
- name: setup OpenGemini
uses: hezhangjian/setup-opengemini-action@main
- name: unit tests
run: mvn -B clean test
- name: wait opengemini started
run: sleep 5
- name: setup go-env
uses: actions/setup-go@v6
with:
go-version: 'stable'
- name: setup ts-trace
run: go install github.com/openGemini/observability/trace/cmd/ts-trace@latest && ts-trace &
- name: unit tests
run: mvn -B clean test
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2025 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.interceptor;

import io.github.openfacade.http.HttpClientConfig;
import io.opengemini.client.api.Address;
import io.opengemini.client.api.Configuration;
import io.opengemini.client.api.Query;
import io.opengemini.client.api.Write;
import io.opengemini.client.impl.OpenGeminiClient;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* Example demonstrating OpenGemini client usage with interceptors.
*/

public class TraceFailureToleranceTest {

private OpenGeminiClient openGeminiClient;

@BeforeEach
void setUp() {
HttpClientConfig httpConfig = new HttpClientConfig.Builder()
.connectTimeout(Duration.ofSeconds(3))
.timeout(Duration.ofSeconds(3))
.build();
Configuration configuration = Configuration.builder()
.addresses(Collections.singletonList(new Address("127.0.0.1", 8086)))
.httpConfig(httpConfig)
.gzipEnabled(false)
.build();
this.openGeminiClient = new OpenGeminiClient(configuration);

OtelInterceptor otelInterceptor = new OtelInterceptor();

otelInterceptor.setTracer(getErrTracer());
openGeminiClient.addInterceptors(otelInterceptor);
}

private Tracer getErrTracer() {
OpenTelemetry openTelemetry;
OtlpGrpcSpanExporter otlpGrpcSpanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint("http://127.0.0.1:38086") // error endpoiont to test the failure tolerance
.build();

BatchSpanProcessor spanProcessor = BatchSpanProcessor.builder(otlpGrpcSpanExporter)
.setScheduleDelay(100, TimeUnit.MILLISECONDS)
.build();

SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(spanProcessor)
.setResource(Resource.create(
Attributes.of(ResourceAttributes.SERVICE_NAME, "opengemini-client-java")
))
.build();

openTelemetry = OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.build();

return openTelemetry.getTracer("opengemini-client-java");
}

@Test
void testTracingIntegration() throws ExecutionException, InterruptedException {
String databaseTestName = "tracing_test_db";
CompletableFuture<Void> createdb = openGeminiClient.createDatabase(databaseTestName);
createdb.get();

Assertions.assertDoesNotThrow(() -> {
Write write = new Write(
"tracing_test_db",
"autogen",
"tracing_measurement,tag=test value=8 " + System.currentTimeMillis(),
"ns"
);

openGeminiClient.executeWrite(
write.getDatabase(),
write.getRetentionPolicy(),
write.getLineProtocol()
).get(10, TimeUnit.SECONDS);

Query query = new Query("SELECT * FROM tracing_measurement");
openGeminiClient.query(query).get(10, TimeUnit.SECONDS);

}, "Tracing integration should not throw an exception");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
Expand All @@ -39,6 +39,7 @@

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -51,8 +52,14 @@ public class TracingIntegrationTest {

private OpenGeminiClient openGeminiClient;

private String databaseName = "jaeger_storage";

private String rpName = "trace";

private String measurementName = "opengemini-client-java";

@BeforeEach
void setUp() {
void setUp() throws ExecutionException, InterruptedException {
HttpClientConfig httpConfig = new HttpClientConfig.Builder()
.connectTimeout(Duration.ofSeconds(3))
.timeout(Duration.ofSeconds(3))
Expand All @@ -68,6 +75,8 @@ void setUp() {

otelInterceptor.setTracer(getTestTracer());
openGeminiClient.addInterceptors(otelInterceptor);

cleanTrace();
}

@AfterEach
Expand All @@ -79,12 +88,13 @@ void setDown() throws InterruptedException {
private Tracer getTestTracer() {
OpenTelemetry openTelemetry;
try {
JaegerGrpcSpanExporter jaegerExporter = JaegerGrpcSpanExporter.builder()
.setEndpoint("http://localhost:14250")
OtlpGrpcSpanExporter otlpGrpcSpanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint("http://127.0.0.1:18086")
.addHeader("Authentication", "")
.build();

BatchSpanProcessor spanProcessor = BatchSpanProcessor.builder(jaegerExporter)
.setScheduleDelay(100, java.util.concurrent.TimeUnit.MILLISECONDS)
BatchSpanProcessor spanProcessor = BatchSpanProcessor.builder(otlpGrpcSpanExporter)
.setScheduleDelay(100, TimeUnit.MILLISECONDS)
.build();

SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
Expand All @@ -100,57 +110,79 @@ private Tracer getTestTracer() {

return openTelemetry.getTracer("opengemini-client-java");
} catch (Exception e) {
// Fallback to no-op implementation
openTelemetry = OpenTelemetry.noop();
return openTelemetry.getTracer("opengemini-client-java");
}

}

private void cleanTrace() throws ExecutionException, InterruptedException {
Query dropMeasurement = new Query("DROP MEASUREMENT \"%s\"".formatted(measurementName), databaseName, rpName);
openGeminiClient.query(dropMeasurement).get();
}

private void checkLastTraceCommand(String command) throws ExecutionException, InterruptedException {
String queryTraceCommand = "SELECT command FROM \"%s\"".formatted(measurementName);
Query checkTraceQuery = new Query(queryTraceCommand, databaseName, rpName);
QueryResult checkRst = openGeminiClient.query(checkTraceQuery).get();
List<List<Object>> queryValues = checkRst.getResults().get(0).getSeries().get(0).getValues();
int recordNum = queryValues.size();
Assertions.assertEquals(command, queryValues.get(recordNum - 1).get(1));
}

@Test
void testDatabaseCreation() {
void testDatabaseCreation() throws ExecutionException, InterruptedException {
String command = "CREATE DATABASE test_db";
Assertions.assertDoesNotThrow(() -> {
Query createDbQuery = new Query("CREATE DATABASE test_db");
Query createDbQuery = new Query(command);
openGeminiClient.query(createDbQuery).get(10, TimeUnit.SECONDS);
}, "Database creation should not throw an exception");

Thread.sleep(3000);
checkLastTraceCommand(command);
}

@Test
void testQueryOperation() {
void testQueryOperation() throws InterruptedException, ExecutionException {
Configuration config = new Configuration();
config.setAddresses(java.util.Collections.singletonList(new Address("localhost", 8086)));
config.setAddresses(Collections.singletonList(new Address("localhost", 8086)));
if (config.getHttpConfig() == null) {
config.setHttpConfig(new HttpClientConfig.Builder().build());
}

String command = "SHOW DATABASES";
Assertions.assertDoesNotThrow(() -> {
Query createDbQuery = new Query("CREATE DATABASE test_db");
openGeminiClient.query(createDbQuery).get(10, TimeUnit.SECONDS);

Query showDbQuery = new Query("SHOW DATABASES");
Query showDbQuery = new Query(command);
QueryResult result = openGeminiClient.query(showDbQuery).get(10, TimeUnit.SECONDS);
Assertions.assertNotNull(result, "Query result should not be null");
}, "Query operation should not throw an exception");

Thread.sleep(3000);
checkLastTraceCommand(command);
}

@Test
void testWriteOperation() throws InterruptedException {
void testWriteOperation() throws InterruptedException, ExecutionException {
Configuration config = new Configuration();
config.setAddresses(java.util.Collections.singletonList(
config.setAddresses(Collections.singletonList(
new Address("localhost", 8086)));

if (config.getHttpConfig() == null) {
config.setHttpConfig(new HttpClientConfig.Builder().build());
}

String lineProtocol = "temperature,location=room1 value=25.5";
Assertions.assertDoesNotThrow(() -> {
Query createDbQuery = new Query("CREATE DATABASE test_db");
openGeminiClient.query(createDbQuery).get(10, TimeUnit.SECONDS);

Write write = new Write(
"test_db",
"autogen",
"temperature,location=room1 value=25.5 " + System.currentTimeMillis(),
lineProtocol,
"ns"
);

Expand All @@ -161,6 +193,9 @@ void testWriteOperation() throws InterruptedException {
).get(10, TimeUnit.SECONDS);

}, "Write operation should not throw an exception");

Thread.sleep(3000);
checkLastTraceCommand(lineProtocol);
}

@Test
Expand All @@ -169,6 +204,7 @@ void testTracingIntegration() throws ExecutionException, InterruptedException {
CompletableFuture<Void> createdb = openGeminiClient.createDatabase(databaseTestName);
createdb.get();

String command = "SELECT * FROM tracing_measurement";
Assertions.assertDoesNotThrow(() -> {

Write write = new Write(
Expand All @@ -184,9 +220,12 @@ void testTracingIntegration() throws ExecutionException, InterruptedException {
write.getLineProtocol()
).get(10, TimeUnit.SECONDS);

Query query = new Query("SELECT * FROM tracing_measurement");
Query query = new Query(command);
openGeminiClient.query(query).get(10, TimeUnit.SECONDS);

}, "Tracing integration should not throw an exception");

Thread.sleep(3000);
checkLastTraceCommand(command);
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<version>${opentelemetry-exporter.version}</version>
</dependency>
<dependency>
Expand Down
Loading