Skip to content

Commit 872091a

Browse files
authored
Merge pull request #111 from smola/context-logging
logging: propagate logger instance and context
2 parents 9a5fa22 + 23f8b83 commit 872091a

File tree

11 files changed

+81
-203
lines changed

11 files changed

+81
-203
lines changed

archiver.go

Lines changed: 30 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,7 @@ var (
3333
//
3434
// See borges documentation for more details about the archiving rules.
3535
type Archiver struct {
36-
Notifiers struct {
37-
// Start function, if set, is called whenever a job is started.
38-
Start func(*Job)
39-
// Stop function, if set, is called whenever a job stops. If
40-
// there was an error, it is passed as second parameter,
41-
// otherwise, it is nil.
42-
Stop func(*Job, error)
43-
// Warn function, if set, is called whenever there is a warning
44-
// during the processing of a repository.
45-
Warn func(*Job, error)
46-
}
36+
log log15.Logger
4737

4838
// TemporaryCloner is used to clone repositories into temporary storage.
4939
TemporaryCloner TemporaryCloner
@@ -60,9 +50,11 @@ type Archiver struct {
6050
LockSession lock.Session
6151
}
6252

63-
func NewArchiver(r *model.RepositoryStore, tx repository.RootedTransactioner,
64-
tc TemporaryCloner, ls lock.Session) *Archiver {
53+
func NewArchiver(log log15.Logger, r *model.RepositoryStore,
54+
tx repository.RootedTransactioner, tc TemporaryCloner,
55+
ls lock.Session) *Archiver {
6556
return &Archiver{
57+
log: log,
6658
TemporaryCloner: tc,
6759
RepositoryStorage: r,
6860
RootedTransactioner: tx,
@@ -72,14 +64,19 @@ func NewArchiver(r *model.RepositoryStore, tx repository.RootedTransactioner,
7264

7365
// Do archives a repository according to a job.
7466
func (a *Archiver) Do(j *Job) error {
75-
a.notifyStart(j)
76-
err := a.do(j)
77-
a.notifyStop(j, err)
78-
return err
67+
log := a.log.New("job", j.RepositoryID)
68+
log.Info("job started")
69+
if err := a.do(log, j); err != nil {
70+
log.Error("job finished with error", "error", err)
71+
return err
72+
}
73+
74+
log.Info("job finished successfully")
75+
return nil
7976
}
8077

81-
func (a *Archiver) do(j *Job) (err error) {
82-
log := log.New("job", j.RepositoryID)
78+
func (a *Archiver) do(log log15.Logger, j *Job) (err error) {
79+
8380
now := time.Now()
8481

8582
r, err := a.getRepositoryModel(j)
@@ -96,7 +93,8 @@ func (a *Archiver) do(j *Job) (err error) {
9693
if err != nil {
9794
return err
9895
}
99-
log.Debug("endpoint selected", "endpoint", endpoint)
96+
97+
log = log.New("endpoint", endpoint)
10098

10199
gr, err := a.TemporaryCloner.Clone(
102100
context.TODO(),
@@ -157,30 +155,6 @@ func (a *Archiver) getRepositoryModel(j *Job) (*model.Repository, error) {
157155
return r, nil
158156
}
159157

160-
func (a *Archiver) notifyStart(j *Job) {
161-
if a.Notifiers.Start == nil {
162-
return
163-
}
164-
165-
a.Notifiers.Start(j)
166-
}
167-
168-
func (a *Archiver) notifyStop(j *Job, err error) {
169-
if a.Notifiers.Stop == nil {
170-
return
171-
}
172-
173-
a.Notifiers.Stop(j, err)
174-
}
175-
176-
func (a *Archiver) notifyWarn(j *Job, err error) {
177-
if a.Notifiers.Warn == nil {
178-
return
179-
}
180-
181-
a.Notifiers.Warn(j, err)
182-
}
183-
184158
func selectEndpoint(endpoints []string) (string, error) {
185159
if len(endpoints) == 0 {
186160
return "", ErrEndpointsEmpty.New()
@@ -190,13 +164,14 @@ func selectEndpoint(endpoints []string) (string, error) {
190164
return endpoints[0], nil
191165
}
192166

193-
func (a *Archiver) pushChangesToRootedRepositories(log log15.Logger,
167+
func (a *Archiver) pushChangesToRootedRepositories(ctxLog log15.Logger,
194168
j *Job, r *model.Repository, tr TemporaryRepository, changes Changes,
195169
now time.Time) error {
196170

197171
var failedInits []model.SHA1
198172
for ic, cs := range changes {
199-
lock := a.LockSession.NewLocker(ic.String())
173+
log := ctxLog.New("root", ic.String())
174+
lock := a.LockSession.NewLocker(fmt.Sprintf("borges/%s", ic.String()))
200175
ch, err := lock.Lock()
201176
if err != nil {
202177
failedInits = append(failedInits, ic)
@@ -206,7 +181,7 @@ func (a *Archiver) pushChangesToRootedRepositories(log log15.Logger,
206181

207182
if err := a.pushChangesToRootedRepository(r, tr, ic, cs); err != nil {
208183
err = ErrPushToRootedRepository.Wrap(err, ic.String())
209-
a.notifyWarn(j, err)
184+
log.Error("error pushing changes to rooted repository", "error", err)
210185
failedInits = append(failedInits, ic)
211186
if err := lock.Unlock(); err != nil {
212187
log.Warn("failed to release lock", "root", ic.String(), "error", err)
@@ -218,7 +193,7 @@ func (a *Archiver) pushChangesToRootedRepositories(log log15.Logger,
218193
r.References = updateRepositoryReferences(r.References, cs, ic)
219194
if err := a.dbUpdateRepository(r, now); err != nil {
220195
err = ErrPushToRootedRepository.Wrap(err, ic.String())
221-
a.notifyWarn(j, err)
196+
log.Error("error updating repository in database", "error", err)
222197
failedInits = append(failedInits, ic)
223198
}
224199

@@ -380,42 +355,22 @@ func checkFailedInits(changes Changes, failed []model.SHA1) error {
380355
// NewArchiverWorkerPool creates a new WorkerPool that uses an Archiver to
381356
// process jobs. It takes optional start, stop and warn notifier functions that
382357
// are equal to the Archiver notifiers but with additional WorkerContext.
383-
func NewArchiverWorkerPool(r *model.RepositoryStore,
358+
func NewArchiverWorkerPool(
359+
log log15.Logger,
360+
r *model.RepositoryStore,
384361
tx repository.RootedTransactioner,
385362
tc TemporaryCloner,
386-
ls lock.Service,
387-
start func(*WorkerContext, *Job),
388-
stop func(*WorkerContext, *Job, error),
389-
warn func(*WorkerContext, *Job, error)) *WorkerPool {
363+
ls lock.Service) *WorkerPool {
390364

391-
do := func(ctx *WorkerContext, j *Job) error {
365+
do := func(log log15.Logger, j *Job) error {
392366
lsess, err := ls.NewSession(&lock.SessionConfig{TTL: 10 * time.Second})
393367
if err != nil {
394368
return err
395369
}
396370

397-
a := NewArchiver(r, tx, tc, lsess)
398-
399-
if start != nil {
400-
a.Notifiers.Start = func(j *Job) {
401-
start(ctx, j)
402-
}
403-
}
404-
405-
if stop != nil {
406-
a.Notifiers.Stop = func(j *Job, err error) {
407-
stop(ctx, j, err)
408-
}
409-
}
410-
411-
if warn != nil {
412-
a.Notifiers.Warn = func(j *Job, err error) {
413-
warn(ctx, j, err)
414-
}
415-
}
416-
371+
a := NewArchiver(log, r, tx, tc, lsess)
417372
return a.Do(j)
418373
}
419374

420-
return NewWorkerPool(do)
375+
return NewWorkerPool(log, do)
421376
}

archiver_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/inconshreveable/log15"
1213
"github.com/satori/go.uuid"
1314
"github.com/src-d/go-git-fixtures"
1415
"github.com/stretchr/testify/require"
@@ -67,10 +68,7 @@ func (s *ArchiverSuite) SetupTest() {
6768
})
6869
s.NoError(err)
6970

70-
s.a = NewArchiver(s.store, s.tx, NewTemporaryCloner(s.tmpFs), ls)
71-
s.a.Notifiers.Warn = func(j *Job, err error) {
72-
s.NoError(err, "job: %v", j)
73-
}
71+
s.a = NewArchiver(log15.New(), s.store, s.tx, NewTemporaryCloner(s.tmpFs), ls)
7472
}
7573

7674
func (s *ArchiverSuite) TearDownTest() {

cli/borges/consumer.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,16 @@ func (c *consumerCmd) Execute(args []string) error {
2929
}
3030

3131
wp := borges.NewArchiverWorkerPool(
32+
log,
3233
core.ModelRepositoryStore(),
3334
core.RootedTransactioner(),
3435
borges.NewTemporaryCloner(core.TemporaryFilesystem()),
3536
core.Locking(),
36-
c.startNotifier, c.stopNotifier, c.warnNotifier)
37+
)
3738
wp.SetWorkerCount(c.WorkersCount)
3839

3940
ac := borges.NewConsumer(q, wp)
40-
ac.Notifiers.QueueError = c.queueErrorNotifier
4141
ac.Start()
4242

4343
return nil
4444
}
45-
46-
func (c *consumerCmd) startNotifier(ctx *borges.WorkerContext, j *borges.Job) {
47-
log.Debug("job started", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID)
48-
}
49-
50-
func (c *consumerCmd) stopNotifier(ctx *borges.WorkerContext, j *borges.Job, err error) {
51-
if err != nil {
52-
log.Error("job errored", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID, "error", err)
53-
} else {
54-
log.Info("job done", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID)
55-
}
56-
}
57-
58-
func (c *consumerCmd) warnNotifier(ctx *borges.WorkerContext, j *borges.Job, err error) {
59-
log.Warn("job warning", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID, "error", err)
60-
}
61-
62-
func (c *consumerCmd) queueErrorNotifier(err error) {
63-
log.Error("queue error", "error", err)
64-
}

cli/borges/producer.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ func (c *producerCmd) Execute(args []string) error {
4141
}
4242
defer ioutil.CheckClose(ji, &err)
4343

44-
p := borges.NewProducer(ji, q)
45-
p.Notifiers.Done = c.notifier
44+
p := borges.NewProducer(log, ji, q)
4645
p.Start()
4746

4847
return err
@@ -68,11 +67,3 @@ func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) {
6867
return nil, fmt.Errorf("invalid source: %s", c.Source)
6968
}
7069
}
71-
72-
func (c *producerCmd) notifier(j *borges.Job, err error) {
73-
if err != nil {
74-
log.Error("job queue error", "RepositoryID", j.RepositoryID, "error", err)
75-
} else {
76-
log.Info("job queued", "RepositoryID", j.RepositoryID)
77-
}
78-
}

common.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"io"
77
"strings"
88

9-
"github.com/inconshreveable/log15"
109
"github.com/satori/go.uuid"
1110
"gopkg.in/src-d/core-retrieval.v0"
1211
"gopkg.in/src-d/core-retrieval.v0/model"
@@ -15,8 +14,6 @@ import (
1514
)
1615

1716
var (
18-
log = log15.New()
19-
2017
// ErrAlreadyStopped signals that an operation cannot be done because
2118
// the entity is already sopped.
2219
ErrAlreadyStopped = errors.NewKind("already stopped: %s")

consumer_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/inconshreveable/log15"
910
"github.com/satori/go.uuid"
1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
@@ -22,7 +23,7 @@ type ConsumerSuite struct {
2223
}
2324

2425
func (s *ConsumerSuite) newConsumer() *Consumer {
25-
wp := NewWorkerPool(func(*WorkerContext, *Job) error { return nil })
26+
wp := NewWorkerPool(log15.New(), func(log15.Logger, *Job) error { return nil })
2627
return NewConsumer(s.queue, wp)
2728
}
2829

@@ -37,7 +38,7 @@ func (s *ConsumerSuite) TestConsumer_StartStop_FailedJob() {
3738

3839
processed := 0
3940
done := make(chan struct{}, 1)
40-
c.WorkerPool.do = func(w *WorkerContext, j *Job) error {
41+
c.WorkerPool.do = func(log log15.Logger, j *Job) error {
4142
defer func() { done <- struct{}{} }()
4243
processed++
4344
if processed == 2 {
@@ -88,7 +89,7 @@ func (s *ConsumerSuite) TestConsumer_StartStop() {
8889

8990
processed := 0
9091
done := make(chan struct{}, 1)
91-
c.WorkerPool.do = func(*WorkerContext, *Job) error {
92+
c.WorkerPool.do = func(log15.Logger, *Job) error {
9293
processed++
9394
if processed > 1 {
9495
assert.Fail("too many jobs processed")

git.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strconv"
1010
"time"
1111

12+
"github.com/inconshreveable/log15"
1213
"gopkg.in/src-d/core-retrieval.v0/model"
1314
"gopkg.in/src-d/go-billy.v3"
1415
"gopkg.in/src-d/go-billy.v3/util"
@@ -123,7 +124,7 @@ func ResolveCommit(r *git.Repository, h plumbing.Hash) (*object.Commit, error) {
123124
case *object.Tag:
124125
return ResolveCommit(r, o.Target)
125126
default:
126-
log.Warn("referenced object not supported", "type", o.Type())
127+
log15.Warn("referenced object not supported", "hash", h.String(), "type", o.Type())
127128
return nil, ErrReferencedObjectTypeNotSupported
128129
}
129130
}

0 commit comments

Comments
 (0)