-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Expand file tree
/
Copy pathonline_restore_ee.go
More file actions
331 lines (288 loc) · 9.53 KB
/
online_restore_ee.go
File metadata and controls
331 lines (288 loc) · 9.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
// +build !oss
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Dgraph Community License (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
*/
package worker
import (
"compress/gzip"
"context"
"io"
"net/url"
"strings"
"time"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
// errRestoreProposal is the message used to wrap a failure to propose a restore
// request. retriableRestoreError looks for this text to decide that such a
// failure must not be retried.
const errRestoreProposal = "cannot propose restore request"
// ProcessRestoreRequest verifies the backup data and sends a restore proposal to each group.
//
// The membership state is refreshed first, so the backup is verified against the
// groups this node currently knows about. After verification, the request is
// completed via FillRestoreCredentials and stamped with State.GetTimestamp(false)
// as its RestoreTs. A per-group copy (with GroupId set) is then proposed to every
// group concurrently through tryRestoreProposal. The first proposal error received
// is returned. The other goroutines can still finish because the result channel
// is buffered for every group.
func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest) error {
	if req == nil {
		return errors.Errorf("restore request cannot be nil")
	}
	if err := UpdateMembershipState(ctx); err != nil {
		return errors.Wrapf(err, "cannot update membership state before restore")
	}

	knownGroups := GetMembershipState().GetGroups()
	gids := make([]uint32, 0, len(knownGroups))
	for gid := range knownGroups {
		gids = append(gids, gid)
	}

	creds := &Credentials{
		AccessKey:    req.AccessKey,
		SecretKey:    req.SecretKey,
		SessionToken: req.SessionToken,
		Anonymous:    req.Anonymous,
	}
	if err := VerifyBackup(req.Location, req.BackupId, creds, gids); err != nil {
		return errors.Wrapf(err, "failed to verify backup")
	}
	if err := FillRestoreCredentials(req.Location, req); err != nil {
		return errors.Wrapf(err, "cannot fill restore proposal with the right credentials")
	}
	req.RestoreTs = State.GetTimestamp(false)

	// TODO: prevent partial restores when proposeRestoreOrSend only sends the restore
	// request to a subset of groups.
	results := make(chan error, len(gids))
	for _, gid := range gids {
		groupReq := proto.Clone(req).(*pb.RestoreRequest)
		groupReq.GroupId = gid
		go func(r *pb.RestoreRequest) {
			results <- tryRestoreProposal(ctx, r)
		}(groupReq)
	}
	for i := 0; i < len(gids); i++ {
		if err := <-results; err != nil {
			return errors.Wrapf(err, "cannot complete restore proposal")
		}
	}
	return nil
}
// proposeRestoreOrSend delivers req to the leader of req's group.
//
// When this node serves the group and is its leader, the restore is handled
// locally through grpcWorker.Restore. Otherwise the request is forwarded over
// gRPC to the group's leader. conn.ErrNoConnection is returned when no leader
// connection is available.
func proposeRestoreOrSend(ctx context.Context, req *pb.RestoreRequest) error {
	gid := req.GetGroupId()
	if !groups().ServesGroup(gid) || !groups().Node.AmLeader() {
		leader := groups().Leader(gid)
		if leader == nil {
			return conn.ErrNoConnection
		}
		client := pb.NewWorkerClient(leader.Get())
		_, err := client.Restore(ctx, req)
		return err
	}

	_, err := (&grpcWorker{}).Restore(ctx, req)
	return err
}
// retriableRestoreError reports whether a failure from proposeRestoreOrSend is
// worth retrying. err must be non-nil.
//
// A missing leader connection and an uninitialized Raft node are retried. A
// failure that carries the errRestoreProposal message is not retried. Every other
// error (e.g. wrong group, waiting for a timestamp) is assumed to be transient
// and is retried.
func retriableRestoreError(err error) bool {
	if err == conn.ErrNoConnection {
		// Temporary connection issue.
		return true
	}

	msg := err.Error()
	if strings.Contains(msg, "Raft isn't initialized yet") {
		// Raft may come up shortly.
		return true
	}
	// Proposal failures are final; everything else is retried.
	return !strings.Contains(msg, errRestoreProposal)
}
// tryRestoreProposal delivers req through proposeRestoreOrSend, retrying up to
// maxAttempts times with a retryDelay pause between attempts while
// retriableRestoreError classifies the failure as retriable.
//
// It returns:
//   - nil on success,
//   - the first non-retriable error,
//   - the last error once all attempts are exhausted,
//   - ctx.Err() if ctx is done while waiting to retry.
//
// No pause is taken after the final attempt.
func tryRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
	const (
		maxAttempts = 10
		retryDelay  = time.Second
	)

	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = proposeRestoreOrSend(ctx, req); err == nil {
			return nil
		}
		if !retriableRestoreError(err) {
			return err
		}
		if attempt == maxAttempts {
			break
		}

		// Wait before retrying, but stop immediately if the caller gives up.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return err
}
// Restore implements the Worker interface.
//
// It rejects requests for groups this server does not serve. It then waits until
// the oracle has reached req.RestoreTs, so every update up to the restore's start
// timestamp has been seen. Finally it proposes the restore through
// proposeAndWait. Proposal failures are wrapped with errRestoreProposal, which
// retriableRestoreError treats as non-retriable. A non-nil empty status is
// returned on every path.
func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.Status, error) {
	var emptyRes pb.Status
	if !groups().ServesGroup(req.GroupId) {
		return &emptyRes, errors.Errorf("this server doesn't serve group id: %v", req.GroupId)
	}

	// We should wait to ensure that we have seen all the updates until the StartTs
	// of this restore transaction.
	if err := posting.Oracle().WaitForTs(ctx, req.RestoreTs); err != nil {
		return &emptyRes, errors.Wrapf(err, "cannot wait for restore ts %d", req.RestoreTs)
	}

	if err := groups().Node.proposeAndWait(ctx, &pb.Proposal{Restore: req}); err != nil {
		// errRestoreProposal is a plain message, not a format string.
		return &emptyRes, errors.Wrap(err, errRestoreProposal)
	}
	return &emptyRes, nil
}
// handleRestoreProposal applies a restore proposal to the group named in req.
//
// The steps run in this order:
//  1. Drop all current data, which also cancels existing transactions.
//  2. Read the backup manifests at req.Location (for req.BackupId) and force
//     every predicate that the most recent manifest lists for req.GroupId onto
//     this group.
//  3. Write the restored data to disk and update the UID lease (writeBackup).
//  4. Reload the schema from the database.
//  5. Propose a snapshot so the restore proposal is not replayed.
//  6. Update the membership state so the group checksums are recomputed.
//
// Any failure aborts the remaining steps and is returned with context.
//
// TODO(DGRAPH-1232): Ensure all groups receive the restore proposal.
func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
	if req == nil {
		return errors.Errorf("nil restore request")
	}

	// Drop all the current data. This also cancels all existing transactions.
	dropProposal := pb.Proposal{
		Mutations: &pb.Mutations{
			GroupId: req.GroupId,
			StartTs: req.RestoreTs,
			DropOp:  pb.Mutations_ALL,
		},
	}
	if err := groups().Node.applyMutations(ctx, &dropProposal); err != nil {
		return errors.Wrapf(err, "cannot drop existing data before restore")
	}

	// TODO: after the drop, the tablets for the predicates stored in this group's
	// backup could be in a different group. The tablets need to be moved.

	// Reset tablets and set correct tablets to match the restored backup.
	creds := &Credentials{
		AccessKey:    req.AccessKey,
		SecretKey:    req.SecretKey,
		SessionToken: req.SessionToken,
		Anonymous:    req.Anonymous,
	}
	uri, err := url.Parse(req.Location)
	if err != nil {
		return errors.Wrapf(err, "cannot parse backup location")
	}
	handler, err := NewUriHandler(uri, creds)
	if err != nil {
		return errors.Wrapf(err, "cannot create backup handler")
	}
	manifests, err := handler.GetManifests(uri, req.BackupId)
	if err != nil {
		return errors.Wrapf(err, "cannot get backup manifests")
	}
	if len(manifests) == 0 {
		return errors.Errorf("no backup manifests found at location %s", req.Location)
	}

	// The most recent manifest decides which predicates belong to this group.
	lastManifest := manifests[len(manifests)-1]
	preds, ok := lastManifest.Groups[req.GroupId]
	if !ok {
		return errors.Errorf("backup manifest does not contain information for group ID %d",
			req.GroupId)
	}
	for _, pred := range preds {
		// Force the tablet to be moved to this group, even if it's currently being served
		// by another group.
		tablet, err := groups().ForceTablet(pred)
		if err != nil {
			return errors.Wrapf(err, "cannot create tablet for restored predicate %s", pred)
		}
		if tablet.GetGroupId() != req.GroupId {
			return errors.Errorf("cannot assign tablet for pred %s to group %d", pred, req.GroupId)
		}
	}

	// Write restored values to disk and update the UID lease.
	if err := writeBackup(ctx, req); err != nil {
		return errors.Wrapf(err, "cannot write backup")
	}

	// Load schema back.
	if err := schema.LoadFromDb(); err != nil {
		return errors.Wrapf(err, "cannot load schema after restore")
	}

	// Propose a snapshot immediately after all the work is done to prevent the restore
	// from being replayed.
	if err := groups().Node.proposeSnapshot(1); err != nil {
		return errors.Wrapf(err, "cannot propose snapshot after processing restore proposal")
	}

	// Update the membership state to re-compute the group checksums.
	if err := UpdateMembershipState(ctx); err != nil {
		return errors.Wrapf(err, "cannot update membership state after restore")
	}
	return nil
}
// getEncConfig builds a viper config for the enc package from the encryption and
// Vault settings carried by req.
//
// The enc flags are registered on a fresh FlagSet and bound to the config. Keys
// not set below therefore fall back to the registered flag defaults.
//   - The encryption key file and the Vault role-ID / secret-ID file paths are
//     always copied from req.
//   - The Vault address, path, field and format override the defaults only when
//     req sets them to a non-empty value.
//
// An error is returned only if binding the flags fails.
func getEncConfig(req *pb.RestoreRequest) (*viper.Viper, error) {
	config := viper.New()
	flags := &pflag.FlagSet{}
	enc.RegisterFlags(flags)
	if err := config.BindPFlags(flags); err != nil {
		return nil, errors.Wrapf(err, "bad config bind")
	}

	// Copy from the request.
	config.Set("encryption_key_file", req.EncryptionKeyFile)
	config.Set("vault_roleid_file", req.VaultRoleidFile)
	config.Set("vault_secretid_file", req.VaultSecretidFile)

	// Override only if non-empty.
	if req.VaultAddr != "" {
		config.Set("vault_addr", req.VaultAddr)
	}
	if req.VaultPath != "" {
		config.Set("vault_path", req.VaultPath)
	}
	if req.VaultField != "" {
		config.Set("vault_field", req.VaultField)
	}
	if req.VaultFormat != "" {
		config.Set("vault_format", req.VaultFormat)
	}
	return config, nil
}
// writeBackup loads the data for req.GroupId from the backup at req.Location
// (selected by req.BackupId) into pstore at version req.RestoreTs. It then asks
// the Zero leader (AssignUids with the largest UID seen) to update the UID lease
// so it covers the restored UIDs.
//
// LoadBackup invokes the callback once per group in the backup; only the group
// named by the request is processed. That group's stream is processed in order:
//  1. Wrap it with enc.GetReader, using the key obtained through getEncConfig
//     and enc.ReadKey.
//  2. Gunzip it.
//  3. Hand it to loadFromBackup.
//
// Any failure is returned with context.
func writeBackup(ctx context.Context, req *pb.RestoreRequest) error {
	res := LoadBackup(req.Location, req.BackupId,
		func(r io.Reader, groupId uint32, preds predicateSet) (uint64, error) {
			if groupId != req.GroupId {
				// LoadBackup will try to call the backup function for every group.
				// Exit here if the group is not the one indicated by the request.
				return 0, nil
			}

			cfg, err := getEncConfig(req)
			if err != nil {
				return 0, errors.Wrapf(err, "unable to get encryption config")
			}
			key, err := enc.ReadKey(cfg)
			if err != nil {
				return 0, errors.Wrapf(err, "unable to read key")
			}
			r, err = enc.GetReader(key, r)
			if err != nil {
				return 0, errors.Wrapf(err, "cannot get encrypted reader")
			}
			gzReader, err := gzip.NewReader(r)
			if err != nil {
				return 0, errors.Wrapf(err, "couldn't create gzip reader")
			}
			// Close does not close the underlying reader. Its error is ignored
			// because read errors already surface through loadFromBackup.
			defer gzReader.Close()

			maxUid, err := loadFromBackup(pstore, gzReader, req.RestoreTs, preds)
			if err != nil {
				return 0, errors.Wrapf(err, "cannot load data from backup")
			}
			if maxUid == 0 {
				// No need to update the lease, return here.
				return 0, nil
			}

			// Use the value of maxUid to update the uid lease.
			pl := groups().connToZeroLeader()
			if pl == nil {
				return 0, errors.Errorf(
					"cannot update uid lease due to no connection to zero leader")
			}
			zc := pb.NewZeroClient(pl.Get())
			if _, err = zc.AssignUids(ctx, &pb.Num{Val: maxUid}); err != nil {
				return 0, errors.Wrapf(err, "cannot update max uid lease after restore")
			}

			// We return the maxUid to enforce the signature of the method but it will
			// be ignored as the uid lease was updated above.
			return maxUid, nil
		})
	if res.Err != nil {
		return errors.Wrapf(res.Err, "cannot write backup")
	}
	return nil
}