Skip to content

opentelemetry: general fixes for log handling #10693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 7, 2025
4 changes: 4 additions & 0 deletions cmake/msgpack.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ option(MSGPACK_BUILD_EXAMPLES OFF)
include_directories(
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MSGPACK}/include
)

# define preprocessor MSGPACK_EMBED_STACK_SIZE to 1024
add_compile_definitions(MSGPACK_EMBED_STACK_SIZE=64)

add_subdirectory(${FLB_PATH_LIB_MSGPACK} EXCLUDE_FROM_ALL)
set(MSGPACK_LIBRARIES "msgpack-c-static")
5 changes: 5 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck,
result = otel_pack_bytes(mp_pck, body->bytes_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE__NOT_SET:
/* treat an unset value as null */
result = msgpack_pack_nil(mp_pck);
break;

default:
break;
}
Expand Down
5 changes: 4 additions & 1 deletion src/flb_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ int flb_json_tokenise(const char *js, size_t len,
state->tokens = tmp;
state->tokens_size += new_tokens;

/* Reset parser to reprocess the JSON data from the beginning */
jsmn_init(&state->parser);

ret = jsmn_parse(&state->parser, js, len,
state->tokens, state->tokens_size);
}
Expand Down Expand Up @@ -217,7 +220,7 @@ static char *tokens_to_msgpack(struct flb_pack_state *state,
for (i = 0; i < arr_size ; i++) {
t = &tokens[i];

if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) {
if (t->start < 0 || t->end <= 0) {
msgpack_sbuffer_destroy(&sbuf);
return NULL;
}
Expand Down
45 changes: 33 additions & 12 deletions src/opentelemetry/flb_opentelemetry_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ int flb_otel_utils_json_payload_get_wrapped_value(msgpack_object *wrapper,
internal_type = MSGPACK_OBJECT_BIN;
}
else if (strncasecmp(kv_key->ptr, "arrayValue", kv_key->size) == 0) {
if (kv_value->type != MSGPACK_OBJECT_ARRAY) {
/* If the value is not an array, we cannot process it */
if (kv_value->type != MSGPACK_OBJECT_ARRAY &&
kv_value->type != MSGPACK_OBJECT_MAP) {
/* If the value is not an array or map, we cannot process it */
return -2;
}
internal_type = MSGPACK_OBJECT_ARRAY;
Expand Down Expand Up @@ -233,7 +234,6 @@ int flb_otel_utils_json_payload_append_converted_value(
target_field,
(char *) object->via.str.ptr,
object->via.str.size);

break;
case MSGPACK_OBJECT_NIL:
/* Append a null value */
Expand Down Expand Up @@ -262,7 +262,6 @@ int flb_otel_utils_json_payload_append_converted_value(
encoder,
target_field,
object);

break;
default:
break;
Expand Down Expand Up @@ -375,11 +374,8 @@ int flb_otel_utils_json_payload_append_converted_map(
object,
&encoder_result);

if (result == 0 && encoder_result == FLB_EVENT_ENCODER_SUCCESS) {
return result;
}
else if (result != 0) {
return result;
if (result == 0) {
return encoder_result;
}

result = flb_log_event_encoder_begin_map(encoder, target_field);
Expand Down Expand Up @@ -451,6 +447,9 @@ int flb_otel_utils_json_payload_append_converted_kvlist(
int value_index;
int key_index;
int result;
int pack_null_value = FLB_FALSE;
int pack_string_value = FLB_FALSE;
int pack_value = FLB_FALSE;
size_t index;
msgpack_object_array *array;
msgpack_object_map *entry;
Expand All @@ -476,15 +475,33 @@ int flb_otel_utils_json_payload_append_converted_kvlist(
result = FLB_EVENT_ENCODER_ERROR_INVALID_ARGUMENT;
}

value_index = -1;
pack_null_value = FLB_FALSE;
pack_string_value = FLB_FALSE;
pack_value = FLB_FALSE;

if (result == FLB_EVENT_ENCODER_SUCCESS) {
value_index = flb_otel_utils_find_map_entry_by_key(entry, "value", 0, FLB_TRUE);

if (value_index >= 0 &&
entry->ptr[value_index].val.type == MSGPACK_OBJECT_MAP &&
entry->ptr[value_index].val.via.map.size == 0) {
/*
* if value is an empty map it represents an unset value, pack as NULL
*/
pack_null_value = FLB_TRUE;
}
}

if (value_index == -1) {
/*
* if value is missing basically is 'unset' and handle as Empty() in OTel world, in
* this case we just pack an empty string value
*/
pack_string_value = FLB_TRUE;
}
else if (!pack_null_value) {
pack_value = FLB_TRUE;
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -495,14 +512,18 @@ int flb_otel_utils_json_payload_append_converted_kvlist(
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
/* if the value is not set, register an empty string as value */
if (value_index == -1) {
if (pack_null_value) {
/* pack NULL for unset values (empty maps) */
result = flb_log_event_encoder_append_null(encoder, target_field);
}
else if (pack_string_value) {
/* if the value is not set, register an empty string as value */
result = flb_log_event_encoder_append_string(
encoder,
target_field,
"", 0);
}
else {
else if (pack_value) {
/* expected value must come in a map */
if (entry->ptr[value_index].val.type != MSGPACK_OBJECT_MAP) {
result = -1;
Expand Down
97 changes: 97 additions & 0 deletions tests/internal/data/opentelemetry/test_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1225,5 +1225,102 @@
"log_metadata": {"otlp":{}},
"log_body": {"test_key": "", "normal_key": "normal_value"}
}
},
"issue_10672_empty_map_as_null": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1722904465173450100",
"body": {
"kvlistValue": {
"values": [
{"key": "empty_map_key", "value": {}},
{"key": "normal_key", "value": {"stringValue": "normal_value"}},
{"key": "missing_value_key"}
]
}
}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"empty_map_key": null, "normal_key": "normal_value", "missing_value_key": ""}
}
},
"arrayvalue_wrapped_map": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1640995200000000000",
"body": {
"kvlistValue": {
"values": [{
"key": "a",
"value": {
"kvlistValue": {
"values": [{
"key": "d",
"value": {
"arrayValue": {
"values": [{
"kvlistValue": {
"values": [{
"key": "e",
"value": {"stringValue": "g"}
}]
}
}]
}
}
}]
}
}
}]
}
}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"a":{"d":[{"e":"g"}]}}
}
},
"unwrapped_plain_map": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1640995200000000000",
"body": {
"kvlistValue": {
"values": [{
"key": "plain",
"value": {
"key1": {"stringValue": "v1"},
"key2": {"intValue": 2}
}
}]
}
}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"plain": {"key1": "v1", "key2": 2}}
}
}
}
35 changes: 35 additions & 0 deletions tests/internal/pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,40 @@ void test_json_pack_bug5336()
}
}

/* Ensure empty arrays inside nested objects are handled */
void test_json_pack_empty_array()
{
int ret;
int root_type;
size_t out_size;
char *out_buf;
char *json = "{\"resourceLogs\":[{\"resource\":{},\"scopeLogs\":[{\"scope\":{},\"logRecords\":[{\"observedTimeUnixNano\":\"1754059282910920618\",\"body\":{\"kvlistValue\":{\"values\":[{\"key\":\"a\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"b\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"c\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"d\",\"value\":{\"arrayValue\":{\"values\":[{\"kvlistValue\":{\"values\":[{\"key\":\"e\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"f\",\"value\":{\"stringValue\":\"g\"}}]}}}]}}]}}}]}}}]}}}]}}}]}}}]}]}]}";

ret = flb_pack_json( (char*) json, strlen((char *) json), &out_buf, &out_size, &root_type, NULL);
TEST_CHECK(ret == 0);

if (ret != 0) {
printf("flb_pack_json failed: %d\n", ret);
exit(EXIT_FAILURE);
}

/* unpack just to validate the msgpack buffer */
msgpack_unpacked result;
msgpack_object obj;
size_t off = 0;
msgpack_unpacked_init(&result);
off = 0;
TEST_CHECK(msgpack_unpack_next(&result, out_buf, out_size, &off) == MSGPACK_UNPACK_SUCCESS);

printf("\nmsgpack---:\n");
msgpack_object_print(stdout, result.data);
printf("\n");

msgpack_unpacked_destroy(&result);

flb_free(out_buf);
}

const char input_msgpack[] = {0x92,/* array 2 */
0xd7, 0x00, /* event time*/
0x07, 0x5b, 0xcd, 0x15, /* second = 123456789 = 1973/11/29 21:33:09 */
Expand Down Expand Up @@ -1115,6 +1149,7 @@ TEST_LIST = {
{ "json_pack_bug1278" , test_json_pack_bug1278},
{ "json_pack_nan" , test_json_pack_nan},
{ "json_pack_bug5336" , test_json_pack_bug5336},
{ "json_pack_empty_array", test_json_pack_empty_array},
{ "json_date_iso8601" , test_json_date_iso8601},
{ "json_date_double" , test_json_date_double},
{ "json_date_java_sql" , test_json_date_java_sql},
Expand Down
Loading