@@ -2,8 +2,10 @@ package test_helpers
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
6
7
"reflect"
8
+ "sync"
7
9
"time"
8
10
9
11
"github.com/tarantool/go-tarantool/v2"
@@ -206,6 +208,45 @@ func SetInstanceRO(ctx context.Context, dialer tarantool.Dialer, connOpts tarant
206
208
return err
207
209
}
208
210
211
+ checkRole := func (conn * tarantool.Connection , isReplica bool ) string {
212
+ data , err := conn .Do (tarantool .NewCallRequest ("box.info" )).Get ()
213
+ switch {
214
+ case err != nil :
215
+ return fmt .Sprintf ("failed to get box.info: %s" , err )
216
+ case len (data ) < 1 :
217
+ return "box.info is empty"
218
+ }
219
+
220
+ status , statusFound := data [0 ].(map [interface {}]interface {})["status" ]
221
+ readonly , readonlyFound := data [0 ].(map [interface {}]interface {})["ro" ]
222
+ switch {
223
+ case ! statusFound :
224
+ return "box.info.status is missing"
225
+ case status != "running" :
226
+ return fmt .Sprintf ("box.info.status='%s' (waiting for 'running')" , status )
227
+ case ! readonlyFound :
228
+ return "box.info.ro is missing"
229
+ case readonly != isReplica :
230
+ return fmt .Sprintf ("box.info.ro='%v' (waiting for '%v')" , readonly , isReplica )
231
+ default :
232
+ return ""
233
+ }
234
+ }
235
+
236
+ problem := "not checked yet"
237
+
238
+ // Wait for the role to be applied.
239
+ for len (problem ) != 0 {
240
+ select {
241
+ case <- time .After (10 * time .Millisecond ):
242
+ case <- ctx .Done ():
243
+ return fmt .Errorf ("%w: failed to apply role, the last problem: %s" ,
244
+ ctx .Err (), problem )
245
+ }
246
+
247
+ problem = checkRole (conn , isReplica )
248
+ }
249
+
209
250
return nil
210
251
}
211
252
@@ -215,16 +256,23 @@ func SetClusterRO(dialers []tarantool.Dialer, connOpts tarantool.Opts,
215
256
return fmt .Errorf ("number of servers should be equal to number of roles" )
216
257
}
217
258
259
+ ctx , cancel := GetConnectContext ()
260
+ defer cancel ()
261
+
262
+ // Apply roles in parallel.
263
+ errs := make ([]error , len (dialers ))
264
+ var wg sync.WaitGroup
265
+ wg .Add (len (dialers ))
218
266
for i , dialer := range dialers {
219
- ctx , cancel := GetConnectContext ()
220
- err := SetInstanceRO (ctx , dialer , connOpts , roles [i ])
221
- cancel ()
222
- if err != nil {
223
- return err
224
- }
267
+ // Pass loop variables to avoid its closure.
268
+ go func (i int , dialer tarantool.Dialer ) {
269
+ defer wg .Done ()
270
+ errs [i ] = SetInstanceRO (ctx , dialer , connOpts , roles [i ])
271
+ }(i , dialer )
225
272
}
273
+ wg .Wait ()
226
274
227
- return nil
275
+ return errors . Join ( errs ... )
228
276
}
229
277
230
278
func StartTarantoolInstances (instsOpts []StartOpts ) ([]* TarantoolInstance , error ) {
0 commit comments