1
1
package io .searchbox .client ;
2
2
3
3
4
+ import com .google .common .collect .ImmutableSet ;
5
+ import com .google .common .collect .Iterators ;
4
6
import com .google .gson .Gson ;
5
7
import com .google .gson .GsonBuilder ;
6
- import io .searchbox .client .config .RoundRobinServerList ;
7
- import io .searchbox .client .config .ServerList ;
8
8
import io .searchbox .client .config .discovery .NodeChecker ;
9
9
import io .searchbox .client .config .exception .NoServerConfiguredException ;
10
10
import io .searchbox .client .config .idle .IdleConnectionReaper ;
11
- import io . searchbox . client . util . PaddedAtomicReference ;
11
+ import org . apache . commons . lang3 . tuple . Pair ;
12
12
import org .slf4j .Logger ;
13
13
import org .slf4j .LoggerFactory ;
14
14
15
- import java .util .LinkedHashSet ;
15
+ import java .util .Iterator ;
16
16
import java .util .Set ;
17
+ import java .util .concurrent .atomic .AtomicReference ;
17
18
18
19
/**
19
20
* @author Dogukan Sonmez
20
21
*/
21
22
public abstract class AbstractJestClient implements JestClient {
22
23
23
24
final static Logger log = LoggerFactory .getLogger (AbstractJestClient .class );
25
+
24
26
public static final String ELASTIC_SEARCH_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ" ;
25
- private final PaddedAtomicReference < ServerList > listOfServers = new PaddedAtomicReference < ServerList >();
27
+
26
28
protected Gson gson = new GsonBuilder ()
27
29
.setDateFormat (ELASTIC_SEARCH_DATE_FORMAT )
28
30
.create ();
29
31
32
+ // server pool = Pair of (pool size, pool iterator)
33
+ private final AtomicReference <Pair <Integer , Iterator <String >>> serverPoolReference =
34
+ new AtomicReference <Pair <Integer , Iterator <String >>>(Pair .<Integer , Iterator <String >>of (0 , ImmutableSet .<String >of ().iterator ()));
30
35
private NodeChecker nodeChecker ;
31
36
private IdleConnectionReaper idleConnectionReaper ;
32
37
@@ -38,23 +43,13 @@ public void setIdleConnectionReaper(IdleConnectionReaper idleConnectionReaper) {
38
43
this .idleConnectionReaper = idleConnectionReaper ;
39
44
}
40
45
41
- public LinkedHashSet <String > getServers () {
42
- ServerList server = listOfServers .get ();
43
- if (server != null ) return new LinkedHashSet <String >(server .getServers ());
44
- else return null ;
45
- }
46
-
47
- public void setServers (ServerList list ) {
48
- listOfServers .set (list );
49
- }
50
-
51
46
public void setServers (Set <String > servers ) {
52
- try {
53
- RoundRobinServerList serverList = new RoundRobinServerList ( servers );
54
- listOfServers . set ( serverList );
55
- } catch ( NoServerConfiguredException noServers ) {
56
- listOfServers . set ( null );
57
- log .warn ( "No servers are currently available for the client to talk to." );
47
+ serverPoolReference . set ( Pair . of ( servers . size (), Iterators . cycle ( servers )));
48
+
49
+ if ( servers . isEmpty ()) {
50
+ log . warn ( "No servers are currently available to connect." );
51
+ } else if ( log . isDebugEnabled ()) {
52
+ log .debug ( "Server pool was updated to contain {} servers." , servers . size () );
58
53
}
59
54
}
60
55
@@ -69,12 +64,19 @@ public void shutdownClient() {
69
64
}
70
65
}
71
66
72
- protected String getElasticSearchServer () {
73
- ServerList serverList = listOfServers .get ();
74
- if (serverList != null ) return serverList .getServer ();
67
+ /**
68
+ * @throws io.searchbox.client.config.exception.NoServerConfiguredException
69
+ */
70
+ protected String getNextServer () {
71
+ Iterator <String > iterator = serverPoolReference .get ().getValue ();
72
+ if (iterator .hasNext ()) return iterator .next ();
75
73
else throw new NoServerConfiguredException ("No Server is assigned to client to connect" );
76
74
}
77
75
76
+ protected int getServerPoolSize () {
77
+ return serverPoolReference .get ().getKey ();
78
+ }
79
+
78
80
protected String getRequestURL (String elasticSearchServer , String uri ) {
79
81
StringBuilder sb = new StringBuilder (elasticSearchServer );
80
82
0 commit comments