17
17
package org .springframework .integration .jdbc .store ;
18
18
19
19
import java .sql .Types ;
20
- import java .util .HashMap ;
21
20
import java .util .HashSet ;
22
21
import java .util .List ;
23
22
import java .util .Map ;
24
23
import java .util .Set ;
25
24
import java .util .UUID ;
25
+ import java .util .concurrent .ConcurrentHashMap ;
26
26
import java .util .concurrent .locks .Lock ;
27
27
import java .util .concurrent .locks .ReadWriteLock ;
28
28
import java .util .concurrent .locks .ReentrantReadWriteLock ;
29
+ import java .util .function .Supplier ;
29
30
30
31
import javax .sql .DataSource ;
31
32
@@ -105,6 +106,18 @@ public class JdbcChannelMessageStore implements PriorityCapableChannelMessageSto
105
106
*/
106
107
public static final String DEFAULT_TABLE_PREFIX = "INT_" ;
107
108
109
+ private enum Query {
110
+ CREATE_MESSAGE ,
111
+ COUNT_GROUPS ,
112
+ GROUP_SIZE ,
113
+ DELETE_GROUP ,
114
+ POLL ,
115
+ POLL_WITH_EXCLUSIONS ,
116
+ PRIORITY ,
117
+ PRIORITY_WITH_EXCLUSIONS ,
118
+ DELETE_MESSAGE
119
+ }
120
+
108
121
/**
109
122
* The name of the message header that stores a flag to indicate that the message has been saved. This is an
110
123
* optimization for the put method.
@@ -146,7 +159,7 @@ public class JdbcChannelMessageStore implements PriorityCapableChannelMessageSto
146
159
147
160
private ChannelMessageStorePreparedStatementSetter preparedStatementSetter ;
148
161
149
- private Map <String , String > queryCache = new HashMap <>();
162
+ private final Map <Query , String > queryCache = new ConcurrentHashMap <>();
150
163
151
164
private MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory ();
152
165
@@ -409,7 +422,8 @@ public void afterPropertiesSet() throws Exception {
409
422
@ Override
410
423
public MessageGroup addMessageToGroup (Object groupId , final Message <?> message ) {
411
424
try {
412
- this .jdbcTemplate .update (getQuery (this .channelMessageStoreQueryProvider .getCreateMessageQuery ()),
425
+ this .jdbcTemplate .update (getQuery (Query .CREATE_MESSAGE ,
426
+ () -> this .channelMessageStoreQueryProvider .getCreateMessageQuery ()),
413
427
ps -> this .preparedStatementSetter .setValues (ps , message , groupId , this .region ,
414
428
this .priorityEnabled ));
415
429
}
@@ -448,26 +462,22 @@ public MessageGroup getMessageGroup(Object groupId) {
448
462
@ ManagedAttribute
449
463
public int getMessageGroupCount () {
450
464
return this .jdbcTemplate .queryForObject (
451
- getQuery ("SELECT COUNT(DISTINCT GROUP_KEY) from %PREFIX%CHANNEL_MESSAGE where REGION = ?" ),
465
+ getQuery (Query .COUNT_GROUPS ,
466
+ () -> "SELECT COUNT(DISTINCT GROUP_KEY) from %PREFIX%CHANNEL_MESSAGE where REGION = ?" ),
452
467
Integer .class , this .region );
453
468
}
454
469
455
470
/**
456
471
* Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a
457
472
* simple map-based cache, only replacing the table prefix on the first access to a named query. Further
458
473
* accesses will be resolved from the cache.
459
- * @param sqlQuery The SQL query to be transformed.
474
+ * @param queryName The {@link Query} to be transformed.
475
+ * @param queryProvider a supplier to provide the query template.
460
476
* @return A transformed query with replacements.
461
477
*/
462
- protected String getQuery (String sqlQuery ) {
463
- String query = this .queryCache .get (sqlQuery );
464
-
465
- if (query == null ) {
466
- query = StringUtils .replace (sqlQuery , "%PREFIX%" , this .tablePrefix );
467
- this .queryCache .put (sqlQuery , query );
468
- }
469
-
470
- return query ;
478
+ protected String getQuery (Query queryName , Supplier <String > queryProvider ) {
479
+ return this .queryCache .computeIfAbsent (queryName ,
480
+ k -> StringUtils .replace (queryProvider .get (), "%PREFIX%" , this .tablePrefix ));
471
481
}
472
482
473
483
/**
@@ -479,13 +489,17 @@ protected String getQuery(String sqlQuery) {
479
489
@ ManagedAttribute
480
490
public int messageGroupSize (Object groupId ) {
481
491
final String key = getKey (groupId );
482
- return this .jdbcTemplate .queryForObject (getQuery (this .channelMessageStoreQueryProvider .getCountAllMessagesInGroupQuery ()),
492
+ return this .jdbcTemplate .queryForObject (
493
+ getQuery (Query .GROUP_SIZE ,
494
+ () -> this .channelMessageStoreQueryProvider .getCountAllMessagesInGroupQuery ()),
483
495
Integer .class , key , this .region );
484
496
}
485
497
486
498
@ Override
487
499
public void removeMessageGroup (Object groupId ) {
488
- this .jdbcTemplate .update (this .getQuery (this .channelMessageStoreQueryProvider .getDeleteMessageGroupQuery ()),
500
+ this .jdbcTemplate .update (
501
+ this .getQuery (Query .DELETE_GROUP ,
502
+ () -> this .channelMessageStoreQueryProvider .getDeleteMessageGroupQuery ()),
489
503
this .getKey (groupId ), this .region );
490
504
}
491
505
@@ -531,19 +545,22 @@ protected Message<?> doPollForMessage(String groupIdKey) {
531
545
try {
532
546
if (this .usingIdCache && !this .idCache .isEmpty ()) {
533
547
if (this .priorityEnabled ) {
534
- query = getQuery (this .channelMessageStoreQueryProvider .getPriorityPollFromGroupExcludeIdsQuery ());
548
+ query = getQuery (Query .PRIORITY_WITH_EXCLUSIONS ,
549
+ () -> this .channelMessageStoreQueryProvider .getPriorityPollFromGroupExcludeIdsQuery ());
535
550
}
536
551
else {
537
- query = getQuery (this .channelMessageStoreQueryProvider .getPollFromGroupExcludeIdsQuery ());
552
+ query = getQuery (Query .POLL_WITH_EXCLUSIONS ,
553
+ () -> this .channelMessageStoreQueryProvider .getPollFromGroupExcludeIdsQuery ());
538
554
}
539
555
parameters .addValue ("message_ids" , this .idCache );
540
556
}
541
557
else {
542
558
if (this .priorityEnabled ) {
543
- query = getQuery (this .channelMessageStoreQueryProvider .getPriorityPollFromGroupQuery ());
559
+ query = getQuery (Query .PRIORITY ,
560
+ () -> this .channelMessageStoreQueryProvider .getPriorityPollFromGroupQuery ());
544
561
}
545
562
else {
546
- query = getQuery (this .channelMessageStoreQueryProvider .getPollFromGroupQuery ());
563
+ query = getQuery (Query . POLL , () -> this .channelMessageStoreQueryProvider .getPollFromGroupQuery ());
547
564
}
548
565
}
549
566
messages = namedParameterJdbcTemplate .query (query , parameters , this .messageRowMapper );
@@ -582,7 +599,8 @@ protected Message<?> doPollForMessage(String groupIdKey) {
582
599
private boolean doRemoveMessageFromGroup (Object groupId , Message <?> messageToRemove ) {
583
600
final UUID id = messageToRemove .getHeaders ().getId ();
584
601
585
- int updated = this .jdbcTemplate .update (getQuery (this .channelMessageStoreQueryProvider .getDeleteMessageQuery ()),
602
+ int updated = this .jdbcTemplate .update (
603
+ getQuery (Query .DELETE_MESSAGE , () -> this .channelMessageStoreQueryProvider .getDeleteMessageQuery ()),
586
604
new Object [] { getKey (id ), getKey (groupId ), this .region },
587
605
new int [] { Types .VARCHAR , Types .VARCHAR , Types .VARCHAR });
588
606
0 commit comments