Skip to content

Commit 50da0a9

Browse files
committed
Fixed race conditions in multithreaded log parsing.
Multiple threads were accessing shared state without proper synchronization, causing data races detected by ThreadSanitizer. Race conditions fixed: - job->cnt: Multiple threads incrementing without atomics - glog->log_erridx and glog->errors[]: Unsynchronized array access - glog->invalid: Non-atomic increments across threads - glog->read: Non-atomic increments across threads - conf.stop_processing: Concurrent reads/writes without synchronization Changes: - Convert shared counters to atomic types (cnt, read, invalid, log_erridx) - Add mutex protection for glog->errors[] array access - Use local counters in read_lines_thread to minimize atomic contention - Reset conf.stop_processing atomically at start of read_log() - Initialize and cleanup error_mutex properly in GLog lifecycle This ensures thread-safe access to shared state during parallel log parsing while maintaining performance through the use of atomics and minimal lock contention.
1 parent 2f33bae commit 50da0a9

File tree

5 files changed

+68
-41
lines changed

5 files changed

+68
-41
lines changed

Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ AM_CFLAGS += -Wall -Wextra -Wnested-externs -Wformat=2 -g
251251
AM_CFLAGS += -Wmissing-prototypes -Wstrict-prototypes -Wmissing-declarations
252252
AM_CFLAGS += -Wwrite-strings -Wshadow -Wpointer-arith -Wsign-compare
253253
AM_CFLAGS += -Wbad-function-cast -Wcast-align
254-
AM_CFLAGS += -Wdeclaration-after-statement -Wshadow -Wold-style-definition
254+
AM_CFLAGS += -Wdeclaration-after-statement -Wshadow -Wold-style-definition -fsanitize=thread
255255

256256
if WITH_ASAN
257257
AM_CFLAGS += -fsanitize=address

src/gstorage.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#endif
3434
#include <stdlib.h>
3535
#include <string.h>
36+
#include <stdatomic.h>
3637

3738
#include "gstorage.h"
3839

@@ -559,15 +560,23 @@ count_bw (int numdate, uint64_t resp_size) {
559560
/* Keep track of all invalid log strings. */
560561
static void
561562
count_invalid (GLog *glog, GLogItem *logitem, const char *line) {
562-
glog->invalid++;
563+
atomic_fetch_add (&glog->invalid, 1);
563564
ht_inc_cnt_overall ("failed_requests", 1);
564565

565566
if (conf.invalid_requests_log) {
566567
LOG_INVALID (("%s", line));
567568
}
568569

569-
if (logitem->errstr && glog->log_erridx < MAX_LOG_ERRORS) {
570-
glog->errors[glog->log_erridx++] = xstrdup (logitem->errstr);
570+
if (logitem->errstr) {
571+
pthread_mutex_lock (&glog->error_mutex);
572+
573+
uint8_t idx = atomic_load (&glog->log_erridx);
574+
if (idx < MAX_LOG_ERRORS) {
575+
glog->errors[idx] = xstrdup (logitem->errstr);
576+
atomic_store (&glog->log_erridx, idx + 1);
577+
}
578+
579+
pthread_mutex_unlock (&glog->error_mutex);
571580
}
572581
}
573582

@@ -578,10 +587,9 @@ count_invalid (GLog *glog, GLogItem *logitem, const char *line) {
578587
*/
579588
void
580589
uncount_invalid (GLog *glog) {
581-
if (glog->invalid > conf.num_tests)
582-
glog->invalid -= conf.num_tests;
583-
else
584-
glog->invalid = 0;
590+
uint64_t current = atomic_load (&glog->invalid);
591+
uint64_t new_val = (current > conf.num_tests) ? (current - conf.num_tests) : 0;
592+
atomic_store (&glog->invalid, new_val);
585593
}
586594

587595
/* Count down the number of processed hits.

src/parser.c

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -229,20 +229,18 @@ void
229229
free_logs (Logs *logs) {
230230
GLog *glog = NULL;
231231
int i;
232-
233232
for (i = 0; i < logs->size; ++i) {
234233
glog = &logs->glog[i];
235-
236234
free (glog->props.filename);
237235
free (glog->props.fname);
238236
free (glog->fname_as_vhost);
239237
free_logerrors (glog);
240238
free (glog->errors);
239+
pthread_mutex_destroy (&glog->error_mutex); // Destroy mutex
241240
if (glog->pipe) {
242241
fclose (glog->pipe);
243242
}
244243
}
245-
246244
free (logs->glog);
247245
free (logs);
248246
}
@@ -2006,8 +2004,16 @@ validate_and_parse_line (char *line, GLogItem *logitem) {
20062004
/* Collect error messages without incrementing counters (for dry run/testing) */
20072005
static void
20082006
collect_invalid_errors (GLog *glog, GLogItem *logitem) {
2009-
if (logitem->errstr && glog->log_erridx < MAX_LOG_ERRORS) {
2010-
glog->errors[glog->log_erridx++] = xstrdup (logitem->errstr);
2007+
if (logitem->errstr) {
2008+
pthread_mutex_lock (&glog->error_mutex);
2009+
2010+
uint8_t idx = atomic_load (&glog->log_erridx);
2011+
if (idx < MAX_LOG_ERRORS) {
2012+
glog->errors[idx] = xstrdup (logitem->errstr);
2013+
atomic_store (&glog->log_erridx, idx + 1);
2014+
}
2015+
2016+
pthread_mutex_unlock (&glog->error_mutex);
20112017
}
20122018
}
20132019

@@ -2140,26 +2146,20 @@ read_line (GLog *glog, char *line, int *test, uint32_t *cnt, int dry_run) {
21402146
GLogItem *logitem = NULL;
21412147
int ret = 0;
21422148

2143-
/* Begin processing the log line - in case of an invalid log format, flip
2144-
* the test only if there's at least one valid record discovered during the log
2145-
* format test. This condition applies solely when reading a log from the
2146-
* beginning, not when tailing an ongoing log. */
21472149
if ((ret = parse_line (glog, line, dry_run, &logitem)) == 0)
21482150
*test = 0;
21492151

2150-
/* soft ignore these lines from parse_line */
21512152
if (ret == -1)
21522153
return NULL;
21532154

2154-
/* reached num of lines to test and no valid records were found, log
2155-
* format is likely not matching */
2155+
/* Increment local counter (will be synchronized later) */
21562156
if (conf.num_tests && ++(*cnt) >= conf.num_tests && *test) {
21572157
uncount_processed (glog);
21582158
uncount_invalid (glog);
21592159
return NULL;
21602160
}
2161-
glog->read++;
21622161

2162+
atomic_fetch_add (&glog->read, 1); // Only 2 arguments!
21632163
return logitem;
21642164
}
21652165

@@ -2168,19 +2168,29 @@ static void *
21682168
read_lines_thread (void *arg) {
21692169
GJob *job = (GJob *) arg;
21702170
int i = 0;
2171+
uint32_t local_cnt = atomic_load (&job->cnt);
21712172

21722173
for (i = 0; i < job->p; i++) {
2173-
/* ensure we don't process more than we should when testing for log format,
2174-
* else free chunk and stop processing threads */
2175-
if (!job->test || (job->test && job->cnt < conf.num_tests))
2176-
job->logitems[i] = read_line (job->glog, job->lines[i], &job->test, &job->cnt, job->dry_run);
2177-
else
2178-
conf.stop_processing = 1;
2174+
/* Check stop_processing atomically */
2175+
if (atomic_load (&conf.stop_processing))
2176+
break;
2177+
2178+
/* ensure we don't process more than we should when testing for log format */
2179+
if (!job->test || (job->test && local_cnt < conf.num_tests)) {
2180+
job->logitems[i] = read_line (job->glog, job->lines[i], &job->test, &local_cnt, job->dry_run);
2181+
} else {
2182+
atomic_store (&conf.stop_processing, 1);
2183+
break;
2184+
}
21792185

21802186
#ifdef WITH_GETLINE
21812187
free (job->lines[i]);
21822188
#endif
21832189
}
2190+
2191+
/* Update shared counter atomically */
2192+
atomic_store (&job->cnt, local_cnt);
2193+
21842194
return (void *) 0;
21852195
}
21862196

@@ -2257,10 +2267,17 @@ init_jobs (GJob jobs[2][conf.jobs], GLog *glog, int dry_run, int test) {
22572267
int i = 0;
22582268
#endif
22592269

2270+
/* Initialize error mutex once */
2271+
static int mutex_initialized = 0;
2272+
if (!mutex_initialized) {
2273+
pthread_mutex_init (&glog->error_mutex, NULL);
2274+
mutex_initialized = 1;
2275+
}
2276+
22602277
for (b = 0; b < 2; b++) {
22612278
for (k = 0; k < conf.jobs; k++) {
22622279
jobs[b][k].p = 0;
2263-
jobs[b][k].cnt = 0;
2280+
atomic_store (&jobs[b][k].cnt, 0); // Use atomic store
22642281
jobs[b][k].glog = glog;
22652282
jobs[b][k].test = test;
22662283
jobs[b][k].dry_run = dry_run;
@@ -2299,11 +2316,13 @@ read_lines_from_file (FILE *fp, GLog *glog, GJob jobs[2][conf.jobs], int b, char
22992316
static void
23002317
process_lines (GJob jobs[2][conf.jobs], uint32_t *cnt, int *test, int b) {
23012318
int k = 0;
2302-
23032319
for (k = 0; k < conf.jobs; k++) {
23042320
process_lines_thread (&jobs[b][k]);
2305-
*cnt += jobs[b][k].cnt;
2306-
jobs[b][k].cnt = 0;
2321+
2322+
/* Read atomic counter */
2323+
*cnt += atomic_load (&jobs[b][k].cnt);
2324+
atomic_store (&jobs[b][k].cnt, 0);
2325+
23072326
*test &= jobs[b][k].test;
23082327
jobs[b][k].p = 0;
23092328
}
@@ -2488,7 +2507,7 @@ read_log (GLog *glog, int dry_run) {
24882507
struct stat fdstat;
24892508

24902509
/* Reset stop_processing flag for new parse attempt */
2491-
conf.stop_processing = 0;
2510+
atomic_store (&conf.stop_processing, 0);
24922511

24932512
/* Ensure we have a valid pipe to read from stdin. Only checking for
24942513
* conf.read_stdin without verifying for a valid FILE pointer would certainly

src/parser.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@
5454

5555

5656
#include <stdio.h>
57+
#include <stdatomic.h>
58+
#include <pthread.h>
59+
5760
#include "commons.h"
5861
#include "gslist.h"
5962

@@ -126,25 +129,22 @@ typedef struct GLastParse_ {
126129
/* Overall parsed log properties */
127130
typedef struct GLog_ {
128131
uint8_t piping:1;
129-
uint8_t log_erridx;
130-
uint32_t read; /* lines read/parsed */
132+
_Atomic uint8_t log_erridx;
133+
_Atomic uint32_t read; /* lines read/parsed */
131134
uint64_t bytes; /* bytes read on each iteration */
132135
uint64_t length; /* length read from the log so far */
133-
uint64_t invalid; /* invalid lines for this log */
136+
_Atomic uint64_t invalid; /* invalid lines for this log */
134137
uint64_t processed; /* lines proceeded for this log */
135138

136-
/* file test for persisted/restored data */
137139
uint16_t snippetlen;
138140
char snippet[READ_BYTES + 1];
139-
140141
GLastParse lp;
141142
GLogProp props;
142143
struct tm start_time;
143-
144144
char *fname_as_vhost;
145145
char **errors;
146-
147146
FILE *pipe;
147+
pthread_mutex_t error_mutex; // Add mutex for error array
148148
} GLog;
149149

150150
/* Container for all logs */
@@ -161,7 +161,7 @@ typedef struct Logs_ {
161161

162162
/* Pthread jobs for multi-thread */
163163
typedef struct GJob_ {
164-
uint32_t cnt;
164+
_Atomic uint32_t cnt; // Make atomic
165165
int p, test, dry_run, running;
166166
GLog *glog;
167167
GLogItem **logitems;

src/settings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ typedef struct GConf_
213213
int hour_spec_min; /* hour specificity - min */
214214
int read_stdin; /* read from stdin */
215215
int serve_usecs; /* is there time served within req line */
216-
int stop_processing; /* stop all processing */
216+
_Atomic int stop_processing; /* stop all processing */
217217
int tailing_mode; /* in tailing-mode? */
218218

219219
/* Array indices */

0 commit comments

Comments
 (0)