Skip to content

Commit ad5fc6e

Browse files
committed
added dedupe to syncer
1 parent 3db738e commit ad5fc6e

File tree

6 files changed

+143
-3
lines changed

6 files changed

+143
-3
lines changed

cmd/sync.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,17 @@ func performSync(s *syncer.Syncer) error {
115115
s.Log.Info("Finished syncs!")
116116
}
117117

118+
/* Sync */
119+
if len(s.Config.Remotes.Dedupe) > 0 {
120+
s.Log.Info("Running dedupes...")
121+
122+
if err := s.Dedupe(nil); err != nil {
123+
return errors.WithMessage(err, "failed performing all dedupes")
124+
}
125+
126+
s.Log.Info("Finished dedupes!")
127+
}
128+
118129
/* Move Server Side */
119130
if len(s.Config.Remotes.MoveServerSide) > 0 {
120131
s.Log.Info("Running move server-sides...")

config/syncer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package config
33
type SyncerRemotes struct {
44
Copy []string
55
Sync []string
6+
Dedupe []string
67
MoveServerSide []UploaderRemotesMoveServerSide `mapstructure:"move_server_side"`
78
}
89

910
type SyncerRcloneParams struct {
1011
Copy []string
1112
Sync []string
13+
Dedupe []string
1214
MoveServerSide []string `mapstructure:"move_server_side"`
1315
}
1416

rclone/dedupe.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package rclone
2+
3+
import (
4+
"github.com/go-cmd/cmd"
5+
"github.com/pkg/errors"
6+
"github.com/sirupsen/logrus"
7+
)
8+
9+
/* Public */
10+
11+
func Dedupe(remotePath string, additionalRcloneParams []string) (bool, int, error) {
12+
// set variables
13+
rLog := log.WithFields(logrus.Fields{
14+
"action": CMD_DEDUPE,
15+
"remote_path": remotePath,
16+
})
17+
result := false
18+
19+
// generate required rclone parameters
20+
params := []string{
21+
CMD_DEDUPE,
22+
remotePath,
23+
}
24+
25+
if baseParams, err := getBaseParams(); err != nil {
26+
return false, 1, errors.Wrapf(err, "failed generating baseParams to %q: %q", CMD_DEDUPE,
27+
remotePath)
28+
} else {
29+
params = append(params, baseParams...)
30+
}
31+
32+
if additionalParams, err := getAdditionalParams(CMD_DEDUPE, additionalRcloneParams); err != nil {
33+
return false, 1, errors.Wrapf(err, "failed generating additionalParams to %s: %q",
34+
CMD_DEDUPE, remotePath)
35+
} else {
36+
params = append(params, additionalParams...)
37+
}
38+
39+
rLog.Debugf("Generated params: %v", params)
40+
41+
// setup cmd
42+
cmdOptions := cmd.Options{
43+
Buffered: false,
44+
Streaming: true,
45+
}
46+
rcloneCmd := cmd.NewCmdOptions(cmdOptions, cfg.Rclone.Path, params...)
47+
48+
// live stream logs
49+
doneChan := make(chan struct{})
50+
go func() {
51+
defer close(doneChan)
52+
53+
for rcloneCmd.Stdout != nil || rcloneCmd.Stderr != nil {
54+
select {
55+
case line, open := <-rcloneCmd.Stdout:
56+
if !open {
57+
rcloneCmd.Stdout = nil
58+
continue
59+
}
60+
log.Info(line)
61+
case line, open := <-rcloneCmd.Stderr:
62+
if !open {
63+
rcloneCmd.Stderr = nil
64+
continue
65+
}
66+
log.Info(line)
67+
}
68+
}
69+
}()
70+
71+
// run command
72+
rLog.Debug("Starting...")
73+
74+
status := <-rcloneCmd.Start()
75+
<-doneChan
76+
77+
// check status
78+
switch status.Exit {
79+
case EXIT_SUCCESS:
80+
result = true
81+
default:
82+
break
83+
}
84+
85+
rLog.WithField("exit_code", status.Exit).Debug("Finished")
86+
return result, status.Exit, status.Error
87+
}

rclone/param.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ func getAdditionalParams(cmd string, extraParams []string) ([]string, error) {
5858
break
5959
case CMD_DEDUPE:
6060
params = append(params,
61-
// keep newest duplicate file
62-
"--dedupe-mode", "newest",
61+
// tpslimit
62+
"--tpslimit", "5",
6363
)
6464
default:
6565
break

syncer/dedupe.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package syncer
2+
3+
import (
4+
"fmt"
5+
"github.com/l3uddz/crop/rclone"
6+
"github.com/pkg/errors"
7+
"github.com/sirupsen/logrus"
8+
)
9+
10+
func (s *Syncer) Dedupe(additionalRcloneParams []string) error {
11+
extraParams := s.Config.RcloneParams.Dedupe
12+
if additionalRcloneParams != nil {
13+
extraParams = append(extraParams, additionalRcloneParams...)
14+
}
15+
16+
// iterate all remotes and run dedupe
17+
for _, dedupeRemote := range s.Config.Remotes.Dedupe {
18+
// set variables
19+
rLog := s.Log.WithFields(logrus.Fields{
20+
"dedupe_remote": dedupeRemote,
21+
})
22+
23+
// dedupe remote
24+
rLog.Info("Deduping...")
25+
success, exitCode, err := rclone.Dedupe(dedupeRemote, extraParams)
26+
27+
// check result
28+
if err != nil {
29+
rLog.WithError(err).Errorf("Failed unexpectedly...")
30+
return errors.WithMessagef(err, "dedupe failed unexpectedly with exit code: %v", exitCode)
31+
} else if success {
32+
// successful exit code
33+
continue
34+
}
35+
36+
return fmt.Errorf("dedupe failed with exit code: %v", exitCode)
37+
}
38+
39+
return nil
40+
}

syncer/move.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (s *Syncer) Move(additionalRcloneParams []string) error {
3030
extraParams = append(extraParams, additionalRcloneParams...)
3131
}
3232

33-
// iterate all remotes and run copy
33+
// iterate all remotes and run move
3434
for _, move := range moveRemotes {
3535
// set variables
3636
attempts := 1

0 commit comments

Comments
 (0)