33
44using System ;
55using System . Threading ;
6+ using System . Threading . Tasks ;
67using Garnet . common ;
8+ using Garnet . server ;
79using Garnet . server . TLS ;
810using Microsoft . Extensions . Logging ;
911using Tsavorite . core ;
@@ -30,32 +32,42 @@ internal sealed partial class ClusterManager : IDisposable
3032 /// </summary>
3133 readonly IGarnetTlsOptions tlsOptions ;
3234
35+ /// <summary>
36+ /// Garnet server options
37+ /// </summary>
38+ readonly GarnetServerOptions serverOptions ;
39+
3340 /// <summary>
3441 /// ClusterProvider
3542 /// </summary>
3643 public readonly ClusterProvider clusterProvider ;
3744
45+ /// <summary>
46+ /// Flush count used to indicate a pending flush operation.
47+ /// </summary>
48+ int flushCount = 0 ;
49+
3850 /// <summary>
3951 /// Constructor
4052 /// </summary>
41- public unsafe ClusterManager ( ClusterProvider clusterProvider , ILogger logger = null )
53+ public ClusterManager ( ClusterProvider clusterProvider , ILogger logger = null )
4254 {
4355 this . clusterProvider = clusterProvider ;
44- var opts = clusterProvider . serverOptions ;
56+ this . serverOptions = clusterProvider . serverOptions ;
4557 var clusterFolder = "/cluster" ;
46- var clusterDataPath = opts . CheckpointDir + clusterFolder ;
47- var deviceFactory = opts . GetInitializedDeviceFactory ( clusterDataPath ) ;
58+ var clusterDataPath = serverOptions . CheckpointDir + clusterFolder ;
59+ var deviceFactory = serverOptions . GetInitializedDeviceFactory ( clusterDataPath ) ;
4860
4961 clusterConfigDevice = deviceFactory . Get ( new FileDescriptor ( directoryName : "" , fileName : "nodes.conf" ) ) ;
5062 pool = new ( 1 , ( int ) clusterConfigDevice . SectorSize ) ;
5163
5264 var clusterEndpoint = clusterProvider . storeWrapper . GetClusterEndpoint ( ) ;
5365
5466 this . logger = logger ;
55- var recoverConfig = clusterConfigDevice . GetFileSize ( 0 ) > 0 && ! opts . CleanClusterConfig ;
67+ var recoverConfig = clusterConfigDevice . GetFileSize ( 0 ) > 0 && ! serverOptions . CleanClusterConfig ;
5668
57- tlsOptions = opts . TlsOptions ;
58- if ( ! opts . CleanClusterConfig )
69+ tlsOptions = serverOptions . TlsOptions ;
70+ if ( ! serverOptions . CleanClusterConfig )
5971 logger ? . LogInformation ( "Attempt to recover cluster config from: {configFilename}" , clusterConfigDevice . FileName ) ;
6072 else
6173 logger ? . LogInformation ( "Skipping recovery of local config due to CleanClusterConfig flag set" ) ;
@@ -84,10 +96,37 @@ public unsafe ClusterManager(ClusterProvider clusterProvider, ILogger logger = n
8496 clusterConnectionStore = new GarnetClusterConnectionStore ( logger : logger ) ;
8597 InitLocal ( clusterEndpoint . Address . ToString ( ) , clusterEndpoint . Port , recoverConfig ) ;
8698 logger ? . LogInformation ( "{NodeInfoStartup}" , CurrentConfig . GetClusterInfo ( clusterProvider ) . TrimEnd ( '\n ' ) ) ;
87- gossipDelay = TimeSpan . FromSeconds ( opts . GossipDelay ) ;
88- clusterTimeout = opts . ClusterTimeout <= 0 ? Timeout . InfiniteTimeSpan : TimeSpan . FromSeconds ( opts . ClusterTimeout ) ;
99+ gossipDelay = TimeSpan . FromSeconds ( serverOptions . GossipDelay ) ;
100+ clusterTimeout = serverOptions . ClusterTimeout <= 0 ? Timeout . InfiniteTimeSpan : TimeSpan . FromSeconds ( serverOptions . ClusterTimeout ) ;
89101 numActiveTasks = 0 ;
90- GossipSamplePercent = opts . GossipSamplePercent ;
102+ GossipSamplePercent = serverOptions . GossipSamplePercent ;
103+
104+ // Run Background task
105+ if ( serverOptions . ClusterConfigFlushFrequencyMs > 0 )
106+ Task . Run ( ( ) => FlushTask ( ) ) ;
107+
108+ async Task FlushTask ( )
109+ {
110+ var flushConfigFrequency = TimeSpan . FromMilliseconds ( serverOptions . ClusterConfigFlushFrequencyMs ) ;
111+ try
112+ {
113+ Interlocked . Increment ( ref numActiveTasks ) ;
114+ while ( true )
115+ {
116+ if ( ctsGossip . IsCancellationRequested )
117+ return ;
118+
119+ if ( Interlocked . CompareExchange ( ref flushCount , 0 , 0 ) > 0 )
120+ ClusterUtils . WriteInto ( clusterConfigDevice , pool , 0 , currentConfig . ToByteArray ( ) , logger : logger ) ;
121+
122+ await Task . Delay ( flushConfigFrequency , ctsGossip . Token ) . ConfigureAwait ( false ) ;
123+ }
124+ }
125+ finally
126+ {
127+ Interlocked . Decrement ( ref numActiveTasks ) ;
128+ }
129+ }
91130 }
92131
93132 /// <summary>
@@ -118,10 +157,23 @@ public void DisposeBackgroundTasks()
118157 public void Start ( )
119158 => TryStartGossipTasks ( ) ;
120159
160+ /// <summary>
161+ /// Flush current config to disk
162+ /// </summary>
121163 public void FlushConfig ( )
122164 {
123- lock ( this )
124- ClusterUtils . WriteInto ( clusterConfigDevice , pool , 0 , currentConfig . ToByteArray ( ) , logger : logger ) ;
165+ if ( serverOptions . ClusterConfigFlushFrequencyMs == - 1 )
166+ return ;
167+
168+ if ( serverOptions . ClusterConfigFlushFrequencyMs > 0 )
169+ Interlocked . Increment ( ref flushCount ) ;
170+ else
171+ {
172+ lock ( this )
173+ {
174+ ClusterUtils . WriteInto ( clusterConfigDevice , pool , 0 , currentConfig . ToByteArray ( ) , logger : logger ) ;
175+ }
176+ }
125177 }
126178
127179 /// <summary>
@@ -157,6 +209,10 @@ private void InitLocal(string address, int port, bool recoverConfig)
157209 }
158210 }
159211
212+ /// <summary>
213+ /// Implements CLUSTER INFO command
214+ /// </summary>
215+ /// <returns></returns>
160216 public string GetInfo ( )
161217 {
162218 var current = CurrentConfig ;
@@ -175,6 +231,11 @@ public string GetInfo()
175231 return ClusterInfo ;
176232 }
177233
234+ /// <summary>
235+ /// Return range of slots from provided array of slots
236+ /// </summary>
237+ /// <param name="slots"></param>
238+ /// <returns></returns>
178239 public static string GetRange ( int [ ] slots )
179240 {
180241 var range = "> " ;
0 commit comments