Skip to content

Commit 9090b16

Browse files
committed
Add asynchronous task API (#7781)
* Revert dd183aa * Revert 4418a7a * Cherry pick cfdf7a5 * Cherry pick 60bec16 * Cherry pick 9e1337b * Cherry pick 6b188f2 (cherry picked from commit 7e2e860)
1 parent 8259e24 commit 9090b16

39 files changed

Lines changed: 2105 additions & 1188 deletions

dgraph/cmd/alpha/admin.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ func getAdminMux() *http.ServeMux {
8383
http.MethodPut: true,
8484
http.MethodPost: true,
8585
}, adminAuthHandler(http.HandlerFunc(drainingHandler))))
86-
adminMux.Handle("/admin/export", allowedMethodsHandler(allowedMethods{http.MethodGet: true},
87-
adminAuthHandler(http.HandlerFunc(exportHandler))))
8886
adminMux.Handle("/admin/config/cache_mb", allowedMethodsHandler(allowedMethods{
8987
http.MethodGet: true,
9088
http.MethodPut: true,
@@ -160,46 +158,6 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
160158
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Server is shutting down"}`)))
161159
}
162160

163-
func exportHandler(w http.ResponseWriter, r *http.Request) {
164-
if err := r.ParseForm(); err != nil {
165-
x.SetHttpStatus(w, http.StatusBadRequest, "Parse of export request failed.")
166-
return
167-
}
168-
169-
format := worker.DefaultExportFormat
170-
if vals, ok := r.Form["format"]; ok {
171-
if len(vals) > 1 {
172-
x.SetHttpStatus(w, http.StatusBadRequest,
173-
"Only one export format may be specified.")
174-
return
175-
}
176-
format = worker.NormalizeExportFormat(vals[0])
177-
if format == "" {
178-
x.SetHttpStatus(w, http.StatusBadRequest, "Invalid export format.")
179-
return
180-
}
181-
}
182-
183-
gqlReq := &schema.Request{
184-
Query: `
185-
mutation export($format: String) {
186-
export(input: {format: $format}) {
187-
response {
188-
code
189-
}
190-
}
191-
}`,
192-
Variables: map[string]interface{}{},
193-
}
194-
195-
if resp := resolveWithAdminServer(gqlReq, r, adminServer); len(resp.Errors) != 0 {
196-
x.SetStatus(w, resp.Errors[0].Message, "Export failed.")
197-
return
198-
}
199-
w.Header().Set("Content-Type", "application/json")
200-
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Export completed."}`)))
201-
}
202-
203161
func memoryLimitHandler(w http.ResponseWriter, r *http.Request) {
204162
switch r.Method {
205163
case http.MethodGet:

dgraph/cmd/alpha/admin_backup.go

Lines changed: 0 additions & 68 deletions
This file was deleted.

dgraph/cmd/alpha/run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,7 @@ func run() {
755755
glog.Infof("worker.Config: %+v", worker.Config)
756756

757757
worker.InitServerState()
758+
worker.InitTasks()
758759

759760
if Alpha.Conf.GetBool("expose_trace") {
760761
// TODO: Remove this once we get rid of event logs.

dgraph/cmd/bulk/mapper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ type shardState struct {
5757

5858
func newMapperBuffer(opt *options) *z.Buffer {
5959
sz := float64(opt.MapBufSize) * 1.1
60-
buf, err := z.NewBufferWithDir(int(sz), 2*int(opt.MapBufSize), z.UseMmap,
61-
filepath.Join(opt.TmpDir, bufferDir), "Mapper.Buffer")
60+
tmpDir := filepath.Join(opt.TmpDir, bufferDir)
61+
buf, err := z.NewBufferTmp(tmpDir, int(sz))
6262
x.Check(err)
63-
return buf
63+
return buf.WithMaxSize(2 * int(opt.MapBufSize))
6464
}
6565

6666
func newMapper(st *state) *mapper {

dgraph/cmd/bulk/reduce.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -434,11 +434,9 @@ func bufferStats(cbuf *z.Buffer) {
434434
}
435435

436436
func getBuf(dir string) *z.Buffer {
437-
cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc,
438-
filepath.Join(dir, bufferDir), "Reducer.GetBuf")
439-
x.Check(err)
440-
cbuf.AutoMmapAfter(1 << 30)
441-
return cbuf
437+
return z.NewBuffer(64<<20, "Reducer.GetBuf").
438+
WithAutoMmap(1<<30, filepath.Join(dir, bufferDir)).
439+
WithMaxSize(64 << 30)
442440
}
443441

444442
func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *countIndexer) {

dgraph/cmd/zero/oracle.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,16 @@ func (o *Oracle) Init() {
5959
o.commits = make(map[uint64]uint64)
6060
// Remove the older btree file, before creating NewTree, as it may contain stale data leading
6161
// to wrong results.
62-
o.keyCommit = z.NewTree()
62+
o.keyCommit = z.NewTree("oracle")
6363
o.subscribers = make(map[int]chan pb.OracleDelta)
6464
o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
6565
o.doneUntil.Init(nil)
6666
go o.sendDeltasToSubscribers()
6767
}
6868

69-
// oracle close releases the memory associated with btree used for keycommit.
69+
// close releases the memory associated with btree used for keycommit.
7070
func (o *Oracle) close() {
71+
o.keyCommit.Close()
7172
}
7273

7374
func (o *Oracle) updateStartTxnTs(ts uint64) {

dgraph/cmd/zero/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ instances to achieve high-availability.
127127
Head("Audit options").
128128
Flag("output",
129129
`[stdout, /path/to/dir] This specifies where audit logs should be output to.
130-
"stdout" is for standard output. You can also specify the directory where audit logs
130+
"stdout" is for standard output. You can also specify the directory where audit logs
131131
will be saved. When stdout is specified as output other fields will be ignored.`).
132132
Flag("compress",
133133
"Enables the compression of old audit logs.").

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/dgraph-io/gqlgen v0.13.2
2020
github.com/dgraph-io/gqlparser/v2 v2.2.0
2121
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
22-
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90
22+
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3
2323
github.com/dgraph-io/simdjson-go v0.3.0
2424
github.com/dgrijalva/jwt-go v3.2.0+incompatible
2525
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
8989
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
9090
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
9191
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
92+
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
93+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
9294
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
9395
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
9496
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
@@ -129,8 +131,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P
129131
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
130132
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
131133
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
132-
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90 h1:arWVlUO9NhZ/2vWprIqpe825GISUPpgJhU/b0ep3j/M=
133-
github.com/dgraph-io/ristretto v0.0.4-0.20210310100713-a4346e5d1f90/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
134+
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
135+
github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
134136
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
135137
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
136138
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=

graphql/admin/admin.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ const (
5555
"""
5656
The UInt64 scalar type represents an unsigned 64‐bit numeric non‐fractional value.
5757
UInt64 can represent values in range [0,(2^64 - 1)].
58-
"""
58+
"""
5959
scalar UInt64
60-
60+
6161
"""
6262
The DateTime scalar type represents date and time as a string in RFC3339 format.
6363
For example: "1985-04-12T23:20:50.52Z" represents 20 minutes and 50.52 seconds after the 23rd hour of April 12th, 1985 in UTC.
@@ -243,14 +243,19 @@ const (
243243
anonymous: Boolean
244244
}
245245
246+
input TaskInput {
247+
id: String!
248+
}
249+
246250
type Response {
247251
code: String
248252
message: String
249253
}
250254
251255
type ExportPayload {
252-
response: Response
253256
exportedFiles: [String]
257+
response: Response
258+
taskId: String
254259
}
255260
256261
type DrainingPayload {
@@ -261,6 +266,26 @@ const (
261266
response: Response
262267
}
263268
269+
type TaskPayload {
270+
kind: TaskKind
271+
status: TaskStatus
272+
lastUpdated: DateTime
273+
}
274+
275+
enum TaskStatus {
276+
Queued
277+
Running
278+
Failed
279+
Success
280+
Unknown
281+
}
282+
283+
enum TaskKind {
284+
Backup
285+
Export
286+
Unknown
287+
}
288+
264289
input ConfigInput {
265290
"""
266291
Estimated memory the caches can take. Actual usage by the process would be
@@ -367,6 +392,7 @@ const (
367392
health: [NodeState]
368393
state: MembershipState
369394
config: Config
395+
task(input: TaskInput!): TaskPayload
370396
` + adminQueries + `
371397
}
372398
@@ -724,7 +750,6 @@ func newAdminResolver(
724750
}
725751

726752
func newAdminResolverFactory() resolve.ResolverFactory {
727-
728753
adminMutationResolvers := map[string]resolve.MutationResolverFunc{
729754
"addNamespace": resolveAddNamespace,
730755
"backup": resolveBackup,
@@ -757,18 +782,21 @@ func newAdminResolverFactory() resolve.ResolverFactory {
757782
WithQueryResolver("listBackups", func(q schema.Query) resolve.QueryResolver {
758783
return resolve.QueryResolverFunc(resolveListBackups)
759784
}).
760-
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
761-
return resolve.MutationResolverFunc(
762-
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
763-
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: m},
764-
false
765-
})
785+
WithQueryResolver("task", func(q schema.Query) resolve.QueryResolver {
786+
return resolve.QueryResolverFunc(resolveTask)
766787
}).
767788
WithQueryResolver("getGQLSchema", func(q schema.Query) resolve.QueryResolver {
768789
return resolve.QueryResolverFunc(
769790
func(ctx context.Context, query schema.Query) *resolve.Resolved {
770791
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: q}
771792
})
793+
}).
794+
WithMutationResolver("updateGQLSchema", func(m schema.Mutation) resolve.MutationResolver {
795+
return resolve.MutationResolverFunc(
796+
func(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
797+
return &resolve.Resolved{Err: errors.Errorf(errMsgServerNotReady), Field: m},
798+
false
799+
})
772800
})
773801
for gqlMut, resolver := range adminMutationResolvers {
774802
// gotta force go to evaluate the right function at each loop iteration

0 commit comments

Comments
 (0)