@@ -540,8 +540,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
540
540
541
541
private int nackIndex ;
542
542
543
- private Object sample ;
544
-
545
543
private volatile boolean consumerPaused ;
546
544
547
545
private volatile long lastPoll = System .currentTimeMillis ();
@@ -618,17 +616,7 @@ else if (listener instanceof MessageListener) {
618
616
this .containerProperties .setSyncCommitTimeout (this .syncCommitTimeout );
619
617
}
620
618
this .maxPollInterval = obtainMaxPollInterval (consumerProperties );
621
- MicrometerHolder holder = null ;
622
- try {
623
- if (MICROMETER_PRESENT && this .containerProperties .isMicrometerEnabled ()) {
624
- holder = new MicrometerHolder (getApplicationContext (), getBeanName (),
625
- this .containerProperties .getMicrometerTags ());
626
- }
627
- }
628
- catch (@ SuppressWarnings ("unused" ) IllegalStateException ex ) {
629
- // NOSONAR - no micrometer or meter registry
630
- }
631
- this .micrometerHolder = holder ;
619
+ this .micrometerHolder = obtainMicrometerHolder ();
632
620
}
633
621
634
622
private long obtainMaxPollInterval (Properties consumerProperties ) {
@@ -793,6 +781,21 @@ protected ErrorHandler determineErrorHandler(GenericErrorHandler<?> errHandler)
793
781
: this .transactionManager != null ? null : new LoggingErrorHandler ();
794
782
}
795
783
784
+ @ Nullable
785
+ private MicrometerHolder obtainMicrometerHolder () {
786
+ MicrometerHolder holder = null ;
787
+ try {
788
+ if (MICROMETER_PRESENT && this .containerProperties .isMicrometerEnabled ()) {
789
+ holder = new MicrometerHolder (getApplicationContext (), getBeanName (),
790
+ this .containerProperties .getMicrometerTags ());
791
+ }
792
+ }
793
+ catch (@ SuppressWarnings ("unused" ) IllegalStateException ex ) {
794
+ // NOSONAR - no micrometer or meter registry
795
+ }
796
+ return holder ;
797
+ }
798
+
796
799
private void seekPartitions (Collection <TopicPartition > partitions , boolean idle ) {
797
800
this .consumerSeekAwareListener .registerSeekCallback (this );
798
801
Map <TopicPartition , Long > current = new HashMap <>();
@@ -1196,19 +1199,13 @@ private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V>
1196
1199
private RuntimeException doInvokeBatchListener (final ConsumerRecords <K , V > records ,
1197
1200
List <ConsumerRecord <K , V >> recordList , @ SuppressWarnings (RAW_TYPES ) Producer producer ) {
1198
1201
1202
+ Object sample = startMicrometerSample ();
1199
1203
try {
1200
- if (this .micrometerHolder != null ) {
1201
- this .sample = this .micrometerHolder .start ();
1202
- }
1203
1204
invokeBatchOnMessage (records , recordList , producer );
1204
- if (this .sample != null ) {
1205
- this .micrometerHolder .success (this .sample );
1206
- }
1205
+ successTimer (sample );
1207
1206
}
1208
1207
catch (RuntimeException e ) {
1209
- if (this .sample != null ) {
1210
- this .micrometerHolder .failure (this .sample );
1211
- }
1208
+ failureTimer (sample );
1212
1209
if (this .containerProperties .isAckOnError () && !this .autoCommit && producer == null ) {
1213
1210
this .acks .addAll (getHighestOffsetRecords (records ));
1214
1211
}
@@ -1233,6 +1230,26 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
1233
1230
return null ;
1234
1231
}
1235
1232
1233
+ @ Nullable
1234
+ private Object startMicrometerSample () {
1235
+ if (this .micrometerHolder != null ) {
1236
+ return this .micrometerHolder .start ();
1237
+ }
1238
+ return null ;
1239
+ }
1240
+
1241
+ private void successTimer (@ Nullable Object sample ) {
1242
+ if (sample != null ) {
1243
+ this .micrometerHolder .success (sample );
1244
+ }
1245
+ }
1246
+
1247
+ private void failureTimer (@ Nullable Object sample ) {
1248
+ if (sample != null ) {
1249
+ this .micrometerHolder .failure (sample );
1250
+ }
1251
+ }
1252
+
1236
1253
private void invokeBatchOnMessage (final ConsumerRecords <K , V > records , // NOSONAR - Cyclomatic Complexity
1237
1254
List <ConsumerRecord <K , V >> recordList , @ SuppressWarnings (RAW_TYPES ) Producer producer ) throws InterruptedException {
1238
1255
@@ -1453,19 +1470,14 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
1453
1470
@ SuppressWarnings (RAW_TYPES ) Producer producer ,
1454
1471
Iterator <ConsumerRecord <K , V >> iterator ) {
1455
1472
1473
+ Object sample = startMicrometerSample ();
1474
+
1456
1475
try {
1457
- if (this .micrometerHolder != null ) {
1458
- this .sample = this .micrometerHolder .start ();
1459
- }
1460
1476
invokeOnMessage (record , producer );
1461
- if (this .sample != null ) {
1462
- this .micrometerHolder .success (this .sample );
1463
- }
1477
+ successTimer (sample );
1464
1478
}
1465
1479
catch (RuntimeException e ) {
1466
- if (this .sample != null ) {
1467
- this .micrometerHolder .failure (this .sample );
1468
- }
1480
+ failureTimer (sample );
1469
1481
if (this .containerProperties .isAckOnError () && !this .autoCommit && producer == null ) {
1470
1482
ackCurrent (record );
1471
1483
}
0 commit comments