Skip to content

Commit 24c701d

Browse files
committed
refactoring and start on syncer package
1 parent 3de7303 commit 24c701d

File tree

9 files changed

+217
-19
lines changed

9 files changed

+217
-19
lines changed

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type BuildVars struct {
2525
type Configuration struct {
2626
Rclone RcloneConfig
2727
Uploader map[string]UploaderConfig
28+
Syncer map[string]SyncerConfig
2829
}
2930

3031
/* Vars */

config/syncer.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package config
2+
3+
type SyncerRemotes struct {
4+
Copy []string
5+
Sync []string
6+
MoveServerSide []UploaderRemotesMoveServerSide `mapstructure:"move_server_side"`
7+
}
8+
9+
type SyncerRcloneParams struct {
10+
Copy []string
11+
Move []string
12+
MoveServerSide []string `mapstructure:"move_server_side"`
13+
}
14+
15+
type SyncerConfig struct {
16+
Enabled bool
17+
ServiceAccountFolder string `mapstructure:"sa_folder"`
18+
SourceRemote string `mapstructure:"source_remote"`
19+
Remotes SyncerRemotes
20+
RcloneParams SyncerRcloneParams `mapstructure:"rclone_params"`
21+
}

config/uploader.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ type UploaderRemotes struct {
2525
Copy []string
2626
Move string
2727
MoveServerSide []UploaderRemotesMoveServerSide `mapstructure:"move_server_side"`
28-
Dedupe []string
2928
}
3029

3130
type UploaderRcloneParams struct {

rclone/copy.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package rclone
22

33
import (
44
"github.com/go-cmd/cmd"
5-
"github.com/l3uddz/crop/config"
65
"github.com/l3uddz/crop/pathutils"
76
"github.com/pkg/errors"
87
"github.com/sirupsen/logrus"
98
)
109

1110
/* Public */
1211

13-
func Copy(u *config.UploaderConfig, from string, to string, serviceAccountFile *pathutils.Path,
12+
func Copy(from string, to string, serviceAccountFile *pathutils.Path,
1413
additionalRcloneParams []string) (bool, int, error) {
1514
// set variables
1615
rLog := log.WithFields(logrus.Fields{
@@ -34,10 +33,7 @@ func Copy(u *config.UploaderConfig, from string, to string, serviceAccountFile *
3433
params = append(params, baseParams...)
3534
}
3635

37-
extraParams := u.RcloneParams.Copy
38-
if additionalRcloneParams != nil {
39-
extraParams = append(extraParams, additionalRcloneParams...)
40-
}
36+
extraParams := additionalRcloneParams
4137

4238
if additionalParams, err := getAdditionalParams(CMD_COPY, extraParams); err != nil {
4339
return false, 1, errors.WithMessagef(err, "failed generating additionalParams to %q: %q -> %q",

rclone/move.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package rclone
22

33
import (
44
"github.com/go-cmd/cmd"
5-
"github.com/l3uddz/crop/config"
65
"github.com/l3uddz/crop/pathutils"
76
"github.com/pkg/errors"
87
"github.com/sirupsen/logrus"
98
)
109

1110
/* Public */
1211

13-
func Move(u *config.UploaderConfig, from string, to string, serviceAccountFile *pathutils.Path, serverSide bool,
12+
func Move(from string, to string, serviceAccountFile *pathutils.Path, serverSide bool,
1413
additionalRcloneParams []string) (bool, int, error) {
1514
// set variables
1615
rLog := log.WithFields(logrus.Fields{
@@ -34,15 +33,10 @@ func Move(u *config.UploaderConfig, from string, to string, serviceAccountFile *
3433
params = append(params, baseParams...)
3534
}
3635

37-
extraParams := u.RcloneParams.Move
36+
extraParams := additionalRcloneParams
3837
if serverSide {
39-
// this is a server side move, so add any additional configured params
40-
extraParams = append(extraParams, u.RcloneParams.MoveServerSide...)
4138
// add server side parameter
4239
extraParams = append(extraParams, "--drive-server-side-across-configs")
43-
} else if additionalRcloneParams != nil {
44-
// add additional params from parameters
45-
extraParams = append(extraParams, additionalRcloneParams...)
4640
}
4741

4842
if additionalParams, err := getAdditionalParams(CMD_MOVE, extraParams); err != nil {

syncer/copy.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package syncer
2+
3+
import (
4+
"fmt"
5+
"github.com/l3uddz/crop/cache"
6+
"github.com/l3uddz/crop/pathutils"
7+
"github.com/l3uddz/crop/rclone"
8+
"github.com/l3uddz/crop/stringutils"
9+
"github.com/pkg/errors"
10+
"github.com/sirupsen/logrus"
11+
"time"
12+
)
13+
14+
func (s *Syncer) Copy(additionalRcloneParams []string) error {
15+
// set variables
16+
copyParams := s.Config.RcloneParams.Copy
17+
if additionalRcloneParams != nil {
18+
copyParams = append(copyParams, additionalRcloneParams...)
19+
}
20+
21+
// iterate all remotes and run copy
22+
for _, remotePath := range s.Config.Remotes.Copy {
23+
// set variables
24+
attempts := 1
25+
rLog := s.Log.WithFields(logrus.Fields{
26+
"copy_remote": remotePath,
27+
"source_remote": s.Config.SourceRemote,
28+
"attempts": attempts,
29+
})
30+
31+
// copy to remote
32+
for {
33+
// get service account file
34+
var serviceAccount *pathutils.Path
35+
var err error
36+
37+
if s.ServiceAccountCount > 0 {
38+
serviceAccount, err = rclone.GetAvailableServiceAccount(s.ServiceAccountFiles)
39+
if err != nil {
40+
return errors.WithMessagef(err,
41+
"aborting further copy attempts of %q due to serviceAccount exhaustion",
42+
s.Config.SourceRemote)
43+
}
44+
45+
// reset log
46+
rLog = s.Log.WithFields(logrus.Fields{
47+
"copy_remote": remotePath,
48+
"source_remote": s.Config.SourceRemote,
49+
"attempts": attempts,
50+
"service_account": serviceAccount.RealPath,
51+
})
52+
}
53+
54+
// copy
55+
rLog.Info("Copying...")
56+
success, exitCode, err := rclone.Copy(s.Config.SourceRemote, remotePath, serviceAccount, copyParams)
57+
58+
// check result
59+
if err != nil {
60+
rLog.WithError(err).Errorf("Failed unexpectedly...")
61+
return errors.WithMessagef(err, "copy failed unexpectedly with exit code: %v", exitCode)
62+
} else if success {
63+
// successful exit code
64+
break
65+
}
66+
67+
// is this an exit code we can retry?
68+
switch exitCode {
69+
case rclone.EXIT_FATAL_ERROR:
70+
// are we using service accounts?
71+
if s.ServiceAccountCount == 0 {
72+
// we are not using service accounts, so mark this remote as banned
73+
if err := cache.Set(stringutils.FromLeftUntil(remotePath, ":"),
74+
time.Now().UTC().Add(25*time.Hour)); err != nil {
75+
rLog.WithError(err).Errorf("Failed banning remote")
76+
}
77+
78+
return fmt.Errorf("copy failed with exit code: %v", exitCode)
79+
}
80+
81+
// ban this service account
82+
if err := cache.Set(serviceAccount.RealPath, time.Now().UTC().Add(25*time.Hour)); err != nil {
83+
rLog.WithError(err).Error("Failed banning service account, cannot try again...")
84+
return fmt.Errorf("failed banning service account: %v", serviceAccount.RealPath)
85+
}
86+
87+
// attempt copy again
88+
rLog.Warnf("Copy failed with retryable exit code %v, trying again...", exitCode)
89+
attempts++
90+
continue
91+
default:
92+
return fmt.Errorf("failed and cannot proceed with exit code: %v", exitCode)
93+
}
94+
}
95+
}
96+
97+
return nil
98+
}

syncer/syncer.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package syncer
2+
3+
import (
4+
"github.com/l3uddz/crop/config"
5+
"github.com/l3uddz/crop/logger"
6+
"github.com/l3uddz/crop/pathutils"
7+
"github.com/l3uddz/crop/stringutils"
8+
"github.com/sirupsen/logrus"
9+
"regexp"
10+
"sort"
11+
"strconv"
12+
"strings"
13+
)
14+
15+
type Syncer struct {
16+
// Public
17+
Log *logrus.Entry
18+
GlobalConfig *config.Configuration
19+
Config *config.SyncerConfig
20+
Name string
21+
22+
ServiceAccountFiles []pathutils.Path
23+
ServiceAccountCount int
24+
}
25+
26+
func New(config *config.Configuration, syncerConfig *config.SyncerConfig, syncerName string) (*Syncer, error) {
27+
// init syncer dependencies
28+
// - service account files
29+
var serviceAccountFiles []pathutils.Path
30+
if syncerConfig.ServiceAccountFolder != "" {
31+
serviceAccountFiles, _ = pathutils.GetPathsInFolder(syncerConfig.ServiceAccountFolder, true,
32+
false, func(path string) *string {
33+
lowerPath := strings.ToLower(path)
34+
35+
// ignore non json files
36+
if !strings.HasSuffix(lowerPath, ".json") {
37+
return nil
38+
}
39+
40+
return &path
41+
})
42+
43+
// sort service files
44+
if len(serviceAccountFiles) > 0 {
45+
re := regexp.MustCompile("[0-9]+")
46+
sort.SliceStable(serviceAccountFiles, func(i, j int) bool {
47+
is := stringutils.NewOrExisting(re.FindString(serviceAccountFiles[i].RealPath), "0")
48+
js := stringutils.NewOrExisting(re.FindString(serviceAccountFiles[j].RealPath), "0")
49+
50+
in, err := strconv.Atoi(is)
51+
if err != nil {
52+
return false
53+
}
54+
jn, err := strconv.Atoi(js)
55+
if err != nil {
56+
return false
57+
}
58+
59+
return in < jn
60+
})
61+
}
62+
}
63+
64+
// init uploader
65+
syncer := &Syncer{
66+
Log: logger.GetLogger(syncerName),
67+
GlobalConfig: config,
68+
Config: syncerConfig,
69+
Name: syncerName,
70+
ServiceAccountFiles: serviceAccountFiles,
71+
ServiceAccountCount: len(serviceAccountFiles),
72+
}
73+
74+
return syncer, nil
75+
}

uploader/copy.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ import (
1212
)
1313

1414
func (u *Uploader) Copy(additionalRcloneParams []string) error {
15+
// set variables
16+
copyParams := u.Config.RcloneParams.Copy
17+
if additionalRcloneParams != nil {
18+
copyParams = append(copyParams, additionalRcloneParams...)
19+
}
20+
1521
// iterate all remotes and run copy
1622
for _, remotePath := range u.Config.Remotes.Copy {
1723
// set variables
@@ -47,8 +53,7 @@ func (u *Uploader) Copy(additionalRcloneParams []string) error {
4753

4854
// copy
4955
rLog.Info("Copying...")
50-
success, exitCode, err := rclone.Copy(u.Config, u.Config.LocalFolder, remotePath, serviceAccount,
51-
additionalRcloneParams)
56+
success, exitCode, err := rclone.Copy(u.Config.LocalFolder, remotePath, serviceAccount, copyParams)
5257

5358
// check result
5459
if err != nil {

uploader/move.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type MoveInstruction struct {
1919

2020
func (u *Uploader) Move(serverSide bool, additionalRcloneParams []string) error {
2121
var moveRemotes []MoveInstruction
22+
var extraParams []string
2223

2324
// create move instructions
2425
if serverSide {
@@ -30,13 +31,22 @@ func (u *Uploader) Move(serverSide bool, additionalRcloneParams []string) error
3031
ServerSide: true,
3132
})
3233
}
34+
35+
extraParams = u.Config.RcloneParams.MoveServerSide
3336
} else {
3437
// this is a normal move (to only one location)
3538
moveRemotes = append(moveRemotes, MoveInstruction{
3639
From: u.Config.LocalFolder,
3740
To: u.Config.Remotes.Move,
3841
ServerSide: false,
3942
})
43+
44+
extraParams = u.Config.RcloneParams.Move
45+
}
46+
47+
// set variables
48+
if additionalRcloneParams != nil {
49+
extraParams = append(extraParams, additionalRcloneParams...)
4050
}
4151

4252
// iterate all remotes and run copy
@@ -75,8 +85,7 @@ func (u *Uploader) Move(serverSide bool, additionalRcloneParams []string) error
7585

7686
// move
7787
rLog.Info("Moving...")
78-
success, exitCode, err := rclone.Move(u.Config, move.From, move.To, serviceAccount, serverSide,
79-
additionalRcloneParams)
88+
success, exitCode, err := rclone.Move(move.From, move.To, serviceAccount, serverSide, extraParams)
8089

8190
// check result
8291
if err != nil {

0 commit comments

Comments
 (0)