Skip to content

Commit 4c8a8cd

Browse files
committed
change(syncer): parallel via waitgroup
1 parent 4475685 commit 4c8a8cd

File tree

2 files changed

+71
-11
lines changed

2 files changed

+71
-11
lines changed

cmd/sync.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
"github.com/sirupsen/logrus"
1111
"github.com/spf13/cobra"
1212
"strings"
13+
"sync"
1314
)
1415

1516
var (
16-
flagSyncer string
17+
flagSyncer string
18+
flagParallelism int
1719
)
1820

1921
var syncCmd = &cobra.Command{
@@ -27,7 +29,12 @@ var syncCmd = &cobra.Command{
2729
defer cache.Close()
2830

2931
// iterate syncer's
32+
var wg sync.WaitGroup
33+
pos := 0
34+
3035
for _, syncerConfig := range config.Config.Syncer {
36+
syncerConfig := syncerConfig
37+
3138
log := log.WithField("syncer", syncerConfig.Name)
3239

3340
// skip disabled syncer(s)
@@ -43,54 +50,71 @@ var syncCmd = &cobra.Command{
4350
}
4451

4552
// create syncer
46-
sync, err := syncer.New(config.Config, &syncerConfig, syncerConfig.Name)
53+
syncr, err := syncer.New(config.Config, &syncerConfig, syncerConfig.Name)
4754
if err != nil {
4855
log.WithError(err).Error("Failed initializing syncer, skipping...")
4956
continue
5057
}
5158

52-
serviceAccountCount := sync.RemoteServiceAccountFiles.ServiceAccountsCount()
59+
serviceAccountCount := syncr.RemoteServiceAccountFiles.ServiceAccountsCount()
5360
if serviceAccountCount > 0 {
54-
sync.Log.WithField("found_files", serviceAccountCount).Info("Loaded service accounts")
61+
syncr.Log.WithField("found_files", serviceAccountCount).Info("Loaded service accounts")
5562
} else {
5663
// no service accounts were loaded
5764
// check to see if any of the copy or sync remote(s) are banned
58-
banned, expiry := rclone.AnyRemotesBanned(sync.Config.Remotes.Copy)
65+
banned, expiry := rclone.AnyRemotesBanned(syncr.Config.Remotes.Copy)
5966
if banned && !expiry.IsZero() {
6067
// one of the copy remotes is banned, abort
61-
sync.Log.WithFields(logrus.Fields{
68+
syncr.Log.WithFields(logrus.Fields{
6269
"expires_time": expiry,
6370
"expires_in": humanize.Time(expiry),
6471
}).Warn("Cannot proceed with sync as a copy remote is banned")
6572
continue
6673
}
6774

68-
banned, expiry = rclone.AnyRemotesBanned(sync.Config.Remotes.Sync)
75+
banned, expiry = rclone.AnyRemotesBanned(syncr.Config.Remotes.Sync)
6976
if banned && !expiry.IsZero() {
7077
// one of the sync remotes is banned, abort
71-
sync.Log.WithFields(logrus.Fields{
78+
syncr.Log.WithFields(logrus.Fields{
7279
"expires_time": expiry,
7380
"expires_in": humanize.Time(expiry),
7481
}).Warn("Cannot proceed with sync as a sync remote is banned")
7582
continue
7683
}
7784
}
7885

86+
pos += 1
7987
log.Info("Syncer commencing...")
8088

8189
// perform sync
82-
if err := performSync(sync); err != nil {
83-
sync.Log.WithError(err).Error("Error occurred while running syncer, skipping...")
84-
continue
90+
wg.Add(1)
91+
go func(s *syncer.Syncer, w *sync.WaitGroup) {
92+
defer w.Done()
93+
94+
// run syncer
95+
if err := performSync(s); err != nil {
96+
s.Log.WithError(err).Error("Error occurred while running syncer, skipping...")
97+
}
98+
}(syncr, &wg)
99+
100+
// parallelism limit reached?
101+
if pos%flagParallelism == 0 {
102+
log.Info("Waiting before starting next syncer")
103+
wg.Wait()
85104
}
86105
}
106+
107+
// wait for all syncers to finish
108+
log.Info("Waiting for syncer(s) to finish")
109+
wg.Wait()
87110
},
88111
}
89112

90113
func init() {
91114
rootCmd.AddCommand(syncCmd)
92115

93116
syncCmd.Flags().StringVarP(&flagSyncer, "syncer", "s", "", "Run for a specific syncer")
117+
syncCmd.Flags().IntVarP(&flagParallelism, "parallelism", "p", 1, "Max parallel syncers")
94118
}
95119

96120
func performSync(s *syncer.Syncer) error {

rclone/sa.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sort"
1414
"strconv"
1515
"strings"
16+
"sync"
1617
"time"
1718
)
1819

@@ -34,6 +35,17 @@ type ServiceAccountManager struct {
3435
remoteServiceAccounts map[string]RemoteServiceAccounts
3536
}
3637

38+
var (
39+
mtx sync.Mutex
40+
psac map[string]time.Time
41+
)
42+
43+
/* Private */
44+
45+
func init() {
46+
psac = make(map[string]time.Time)
47+
}
48+
3749
/* Public */
3850

3951
func NewServiceAccountManager(serviceAccountFolders map[string]string) *ServiceAccountManager {
@@ -123,6 +135,10 @@ func (m *ServiceAccountManager) GetServiceAccount(remotePaths ...string) ([]*Rem
123135
var err error
124136
successfulRemotes := make(map[string]*types.Nil)
125137

138+
// acquire global lock
139+
mtx.Lock()
140+
defer mtx.Unlock()
141+
126142
for _, remotePath := range remotePaths {
127143
saFound := false
128144

@@ -155,6 +171,19 @@ func (m *ServiceAccountManager) GetServiceAccount(remotePaths ...string) ([]*Rem
155171
continue
156172
}
157173

174+
// has this service account been issued within N seconds?
175+
expiry, exists := psac[sa.RealPath]
176+
switch {
177+
case exists && expiry.Before(time.Now().UTC()):
178+
// it was issued before, but it was not within N seconds
179+
delete(psac, sa.RealPath)
180+
case exists:
181+
// it was issued before and it has not expired yet
182+
continue
183+
default:
184+
break
185+
}
186+
158187
// this service account is unbanned
159188
serviceAccounts = append(serviceAccounts, &RemoteServiceAccount{
160189
RemoteEnvVar: remote.RemoteEnvVar,
@@ -177,6 +206,13 @@ func (m *ServiceAccountManager) GetServiceAccount(remotePaths ...string) ([]*Rem
177206
break
178207
}
179208

209+
// were service accounts found?
210+
if len(serviceAccounts) > 0 {
211+
for _, sa := range serviceAccounts {
212+
psac[sa.ServiceAccountPath] = time.Now().UTC().Add(10 * time.Second)
213+
}
214+
}
215+
180216
return serviceAccounts, err
181217
}
182218

0 commit comments

Comments
 (0)