Skip to content

Commit cb36f84

Browse files
authored
Merge pull request #199 from RichardKnop/feature/add-async-worker-launch-method
Add a non blocking LaunchAsync method to worker
2 parents 362bcfb + 72dd392 commit cb36f84

4 files changed

Lines changed: 63 additions & 41 deletions

File tree

example/machinery.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"errors"
55
"fmt"
66
"os"
7-
"reflect"
8-
"strings"
97
"time"
108

119
"github.com/RichardKnop/machinery/v1"
@@ -197,7 +195,7 @@ func send() error {
197195
if err != nil {
198196
return fmt.Errorf("Getting task result failed with error: %s", err.Error())
199197
}
200-
log.INFO.Printf("1 + 1 = %v\n", humanReadableResults(results))
198+
log.INFO.Printf("1 + 1 = %v\n", tasks.HumanReadableResults(results))
201199

202200
/*
203201
* Now let's explore ways of sending multiple tasks
@@ -222,7 +220,7 @@ func send() error {
222220
"%v + %v = %v\n",
223221
asyncResult.Signature.Args[0].Value,
224222
asyncResult.Signature.Args[1].Value,
225-
humanReadableResults(results),
223+
tasks.HumanReadableResults(results),
226224
)
227225
}
228226

@@ -241,7 +239,7 @@ func send() error {
241239
if err != nil {
242240
return fmt.Errorf("Getting chord result failed with error: %s", err.Error())
243241
}
244-
log.INFO.Printf("(1 + 1) * (2 + 2) * (5 + 6) = %v\n", humanReadableResults(results))
242+
log.INFO.Printf("(1 + 1) * (2 + 2) * (5 + 6) = %v\n", tasks.HumanReadableResults(results))
245243

246244
// Now let's try chaining task results
247245
initTasks()
@@ -257,7 +255,7 @@ func send() error {
257255
if err != nil {
258256
return fmt.Errorf("Getting chain result failed with error: %s", err.Error())
259257
}
260-
log.INFO.Printf("(((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = %v\n", humanReadableResults(results))
258+
log.INFO.Printf("(((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = %v\n", tasks.HumanReadableResults(results))
261259

262260
// Let's try a task which throws panic to make sure stack trace is not lost
263261
initTasks()
@@ -283,20 +281,7 @@ func send() error {
283281
if err != nil {
284282
return fmt.Errorf("Getting long running task result failed with error: %s", err.Error())
285283
}
286-
log.INFO.Printf("Long running task returned = %v\n", humanReadableResults(results))
284+
log.INFO.Printf("Long running task returned = %v\n", tasks.HumanReadableResults(results))
287285

288286
return nil
289287
}
290-
291-
func humanReadableResults(results []reflect.Value) string {
292-
if len(results) == 1 {
293-
return fmt.Sprintf("%v", results[0].Interface())
294-
}
295-
296-
readableResults := make([]string, len(results))
297-
for i := 0; i < len(results); i++ {
298-
readableResults[i] = fmt.Sprintf("%v", results[i].Interface())
299-
}
300-
301-
return fmt.Sprintf("[%s]", strings.Join(readableResults, ", "))
302-
}

v1/backends/async_result.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,7 @@ func (asyncResult *AsyncResult) Touch() ([]reflect.Value, error) {
8787
}
8888

8989
if asyncResult.taskState.IsSuccess() {
90-
resultValues := make([]reflect.Value, len(asyncResult.taskState.Results))
91-
for i, result := range asyncResult.taskState.Results {
92-
resultValue, err := tasks.ReflectValue(result.Type, result.Value)
93-
if err != nil {
94-
return nil, err
95-
}
96-
resultValues[i] = resultValue
97-
}
98-
return resultValues, nil
90+
return tasks.ReflectTaskResults(asyncResult.taskState.Results)
9991
}
10092

10193
return nil, nil

v1/tasks/result.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,40 @@
11
package tasks
22

3+
import (
4+
"fmt"
5+
"reflect"
6+
"strings"
7+
)
8+
39
// TaskResult represents an actual return value of a processed task
410
type TaskResult struct {
511
Type string `bson:"type"`
612
Value interface{} `bson:"value"`
713
}
14+
15+
// ReflectTaskResults ...
16+
func ReflectTaskResults(taskResults []*TaskResult) ([]reflect.Value, error) {
17+
resultValues := make([]reflect.Value, len(taskResults))
18+
for i, taskResult := range taskResults {
19+
resultValue, err := ReflectValue(taskResult.Type, taskResult.Value)
20+
if err != nil {
21+
return nil, err
22+
}
23+
resultValues[i] = resultValue
24+
}
25+
return resultValues, nil
26+
}
27+
28+
// HumanReadableResults ...
29+
func HumanReadableResults(results []reflect.Value) string {
30+
if len(results) == 1 {
31+
return fmt.Sprintf("%v", results[0].Interface())
32+
}
33+
34+
readableResults := make([]string, len(results))
35+
for i := 0; i < len(results); i++ {
36+
readableResults[i] = fmt.Sprintf("%v", results[i].Interface())
37+
}
38+
39+
return fmt.Sprintf("[%s]", strings.Join(readableResults, ", "))
40+
}

v1/worker.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"os"
77
"os/signal"
8-
"strings"
98
"syscall"
109
"time"
1110

@@ -25,9 +24,19 @@ type Worker struct {
2524
// Launch starts a new worker process. The worker subscribes
2625
// to the default queue and processes incoming registered tasks
2726
func (worker *Worker) Launch() error {
27+
errorsChan := make(chan error)
28+
29+
worker.LaunchAsync(errorsChan)
30+
31+
return <-errorsChan
32+
}
33+
34+
// LaunchAsync is a non blocking version of Launch
35+
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
2836
cnf := worker.server.GetConfig()
2937
broker := worker.server.GetBroker()
3038

39+
// Log some useful information about woorker configuration
3140
log.INFO.Printf("Launching a worker with the following settings:")
3241
log.INFO.Printf("- Broker: %s", cnf.Broker)
3342
log.INFO.Printf("- DefaultQueue: %s", cnf.DefaultQueue)
@@ -40,11 +49,7 @@ func (worker *Worker) Launch() error {
4049
log.INFO.Printf(" - PrefetchCount: %d", cnf.AMQP.PrefetchCount)
4150
}
4251

43-
errorsChan := make(chan error)
44-
sig := make(chan os.Signal, 1)
45-
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
46-
var signalsReceived uint
47-
52+
// Goroutine to start broker consumption and handle retries when broker connection dies
4853
go func() {
4954
for {
5055
retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)
@@ -58,6 +63,11 @@ func (worker *Worker) Launch() error {
5863
}
5964
}()
6065

66+
sig := make(chan os.Signal, 1)
67+
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
68+
var signalsReceived uint
69+
70+
// Goroutine Handle SIGINT and SIGTERM signals
6171
go func() {
6272
for {
6373
select {
@@ -79,8 +89,6 @@ func (worker *Worker) Launch() error {
7989
}
8090
}
8191
}()
82-
83-
return <-errorsChan
8492
}
8593

8694
// Quit tears down the running worker process
@@ -166,11 +174,15 @@ func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*t
166174
return fmt.Errorf("Set state success error: %s", err)
167175
}
168176

169-
debugResults := make([]string, len(taskResults))
170-
for i, taskResult := range taskResults {
171-
debugResults[i] = fmt.Sprintf("%v", taskResult.Value)
177+
// Log human readable results of the processed task
178+
var debugResults = "[]"
179+
results, err := tasks.ReflectTaskResults(taskResults)
180+
if err != nil {
181+
log.WARNING.Print(err)
182+
} else {
183+
debugResults = tasks.HumanReadableResults(results)
172184
}
173-
log.INFO.Printf("Processed task %s. Results = [%v]", signature.UUID, strings.Join(debugResults, ", "))
185+
log.INFO.Printf("Processed task %s. Results = %s", signature.UUID, debugResults)
174186

175187
// Trigger success callbacks
176188

0 commit comments

Comments
 (0)