Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 30 additions & 75 deletions archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,7 @@ var (
//
// See borges documentation for more details about the archiving rules.
type Archiver struct {
Notifiers struct {
// Start function, if set, is called whenever a job is started.
Start func(*Job)
// Stop function, if set, is called whenever a job stops. If
// there was an error, it is passed as second parameter,
// otherwise, it is nil.
Stop func(*Job, error)
// Warn function, if set, is called whenever there is a warning
// during the processing of a repository.
Warn func(*Job, error)
}
log log15.Logger

// TemporaryCloner is used to clone repositories into temporary storage.
TemporaryCloner TemporaryCloner
Expand All @@ -60,9 +50,11 @@ type Archiver struct {
LockSession lock.Session
}

func NewArchiver(r *model.RepositoryStore, tx repository.RootedTransactioner,
tc TemporaryCloner, ls lock.Session) *Archiver {
func NewArchiver(log log15.Logger, r *model.RepositoryStore,
tx repository.RootedTransactioner, tc TemporaryCloner,
ls lock.Session) *Archiver {
return &Archiver{
log: log,
TemporaryCloner: tc,
RepositoryStorage: r,
RootedTransactioner: tx,
Expand All @@ -72,14 +64,19 @@ func NewArchiver(r *model.RepositoryStore, tx repository.RootedTransactioner,

// Do archives a repository according to a job.
func (a *Archiver) Do(j *Job) error {
a.notifyStart(j)
err := a.do(j)
a.notifyStop(j, err)
return err
log := a.log.New("job", j.RepositoryID)
log.Info("job started")
if err := a.do(log, j); err != nil {
log.Error("job finished with error", "error", err)
return err
}

log.Info("job finished successfully")
return nil
}

func (a *Archiver) do(j *Job) (err error) {
log := log.New("job", j.RepositoryID)
func (a *Archiver) do(log log15.Logger, j *Job) (err error) {

now := time.Now()

r, err := a.getRepositoryModel(j)
Expand All @@ -96,7 +93,8 @@ func (a *Archiver) do(j *Job) (err error) {
if err != nil {
return err
}
log.Debug("endpoint selected", "endpoint", endpoint)

log = log.New("endpoint", endpoint)

gr, err := a.TemporaryCloner.Clone(
context.TODO(),
Expand Down Expand Up @@ -157,30 +155,6 @@ func (a *Archiver) getRepositoryModel(j *Job) (*model.Repository, error) {
return r, nil
}

func (a *Archiver) notifyStart(j *Job) {
if a.Notifiers.Start == nil {
return
}

a.Notifiers.Start(j)
}

func (a *Archiver) notifyStop(j *Job, err error) {
if a.Notifiers.Stop == nil {
return
}

a.Notifiers.Stop(j, err)
}

func (a *Archiver) notifyWarn(j *Job, err error) {
if a.Notifiers.Warn == nil {
return
}

a.Notifiers.Warn(j, err)
}

func selectEndpoint(endpoints []string) (string, error) {
if len(endpoints) == 0 {
return "", ErrEndpointsEmpty.New()
Expand All @@ -190,13 +164,14 @@ func selectEndpoint(endpoints []string) (string, error) {
return endpoints[0], nil
}

func (a *Archiver) pushChangesToRootedRepositories(log log15.Logger,
func (a *Archiver) pushChangesToRootedRepositories(ctxLog log15.Logger,
j *Job, r *model.Repository, tr TemporaryRepository, changes Changes,
now time.Time) error {

var failedInits []model.SHA1
for ic, cs := range changes {
lock := a.LockSession.NewLocker(ic.String())
log := ctxLog.New("root", ic.String())
lock := a.LockSession.NewLocker(fmt.Sprintf("borges/%s", ic.String()))
ch, err := lock.Lock()
if err != nil {
failedInits = append(failedInits, ic)
Expand All @@ -206,7 +181,7 @@ func (a *Archiver) pushChangesToRootedRepositories(log log15.Logger,

if err := a.pushChangesToRootedRepository(r, tr, ic, cs); err != nil {
err = ErrPushToRootedRepository.Wrap(err, ic.String())
a.notifyWarn(j, err)
log.Error("error pushing changes to rooted repository", "error", err)
failedInits = append(failedInits, ic)
if err := lock.Unlock(); err != nil {
log.Warn("failed to release lock", "root", ic.String(), "error", err)
Expand All @@ -218,7 +193,7 @@ func (a *Archiver) pushChangesToRootedRepositories(log log15.Logger,
r.References = updateRepositoryReferences(r.References, cs, ic)
if err := a.dbUpdateRepository(r, now); err != nil {
err = ErrPushToRootedRepository.Wrap(err, ic.String())
a.notifyWarn(j, err)
log.Error("error updating repository in database", "error", err)
failedInits = append(failedInits, ic)
}

Expand Down Expand Up @@ -380,42 +355,22 @@ func checkFailedInits(changes Changes, failed []model.SHA1) error {
// NewArchiverWorkerPool creates a new WorkerPool that uses an Archiver to
// process jobs. It takes optional start, stop and warn notifier functions that
// are equal to the Archiver notifiers but with additional WorkerContext.
func NewArchiverWorkerPool(r *model.RepositoryStore,
func NewArchiverWorkerPool(
log log15.Logger,
r *model.RepositoryStore,
tx repository.RootedTransactioner,
tc TemporaryCloner,
ls lock.Service,
start func(*WorkerContext, *Job),
stop func(*WorkerContext, *Job, error),
warn func(*WorkerContext, *Job, error)) *WorkerPool {
ls lock.Service) *WorkerPool {

do := func(ctx *WorkerContext, j *Job) error {
do := func(log log15.Logger, j *Job) error {
lsess, err := ls.NewSession(&lock.SessionConfig{TTL: 10 * time.Second})
if err != nil {
return err
}

a := NewArchiver(r, tx, tc, lsess)

if start != nil {
a.Notifiers.Start = func(j *Job) {
start(ctx, j)
}
}

if stop != nil {
a.Notifiers.Stop = func(j *Job, err error) {
stop(ctx, j, err)
}
}

if warn != nil {
a.Notifiers.Warn = func(j *Job, err error) {
warn(ctx, j, err)
}
}

a := NewArchiver(log, r, tx, tc, lsess)
return a.Do(j)
}

return NewWorkerPool(do)
return NewWorkerPool(log, do)
}
6 changes: 2 additions & 4 deletions archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/inconshreveable/log15"
"github.com/satori/go.uuid"
"github.com/src-d/go-git-fixtures"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -67,10 +68,7 @@ func (s *ArchiverSuite) SetupTest() {
})
s.NoError(err)

s.a = NewArchiver(s.store, s.tx, NewTemporaryCloner(s.tmpFs), ls)
s.a.Notifiers.Warn = func(j *Job, err error) {
s.NoError(err, "job: %v", j)
}
s.a = NewArchiver(log15.New(), s.store, s.tx, NewTemporaryCloner(s.tmpFs), ls)
}

func (s *ArchiverSuite) TearDownTest() {
Expand Down
24 changes: 2 additions & 22 deletions cli/borges/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,16 @@ func (c *consumerCmd) Execute(args []string) error {
}

wp := borges.NewArchiverWorkerPool(
log,
core.ModelRepositoryStore(),
core.RootedTransactioner(),
borges.NewTemporaryCloner(core.TemporaryFilesystem()),
core.Locking(),
c.startNotifier, c.stopNotifier, c.warnNotifier)
)
wp.SetWorkerCount(c.WorkersCount)

ac := borges.NewConsumer(q, wp)
ac.Notifiers.QueueError = c.queueErrorNotifier
ac.Start()

return nil
}

func (c *consumerCmd) startNotifier(ctx *borges.WorkerContext, j *borges.Job) {
log.Debug("job started", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID)
}

func (c *consumerCmd) stopNotifier(ctx *borges.WorkerContext, j *borges.Job, err error) {
if err != nil {
log.Error("job errored", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID, "error", err)
} else {
log.Info("job done", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID)
}
}

func (c *consumerCmd) warnNotifier(ctx *borges.WorkerContext, j *borges.Job, err error) {
log.Warn("job warning", "WorkerID", ctx.ID, "RepositoryID", j.RepositoryID, "error", err)
}

func (c *consumerCmd) queueErrorNotifier(err error) {
log.Error("queue error", "error", err)
}
11 changes: 1 addition & 10 deletions cli/borges/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func (c *producerCmd) Execute(args []string) error {
}
defer ioutil.CheckClose(ji, &err)

p := borges.NewProducer(ji, q)
p.Notifiers.Done = c.notifier
p := borges.NewProducer(log, ji, q)
p.Start()

return err
Expand All @@ -68,11 +67,3 @@ func (c *producerCmd) jobIter(b queue.Broker) (borges.JobIter, error) {
return nil, fmt.Errorf("invalid source: %s", c.Source)
}
}

func (c *producerCmd) notifier(j *borges.Job, err error) {
if err != nil {
log.Error("job queue error", "RepositoryID", j.RepositoryID, "error", err)
} else {
log.Info("job queued", "RepositoryID", j.RepositoryID)
}
}
3 changes: 0 additions & 3 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"strings"

"github.com/inconshreveable/log15"
"github.com/satori/go.uuid"
"gopkg.in/src-d/core-retrieval.v0"
"gopkg.in/src-d/core-retrieval.v0/model"
Expand All @@ -15,8 +14,6 @@ import (
)

var (
log = log15.New()

// ErrAlreadyStopped signals that an operation cannot be done because
// the entity is already sopped.
ErrAlreadyStopped = errors.NewKind("already stopped: %s")
Expand Down
7 changes: 4 additions & 3 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/inconshreveable/log15"
"github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -22,7 +23,7 @@ type ConsumerSuite struct {
}

func (s *ConsumerSuite) newConsumer() *Consumer {
wp := NewWorkerPool(func(*WorkerContext, *Job) error { return nil })
wp := NewWorkerPool(log15.New(), func(log15.Logger, *Job) error { return nil })
return NewConsumer(s.queue, wp)
}

Expand All @@ -37,7 +38,7 @@ func (s *ConsumerSuite) TestConsumer_StartStop_FailedJob() {

processed := 0
done := make(chan struct{}, 1)
c.WorkerPool.do = func(w *WorkerContext, j *Job) error {
c.WorkerPool.do = func(log log15.Logger, j *Job) error {
defer func() { done <- struct{}{} }()
processed++
if processed == 2 {
Expand Down Expand Up @@ -88,7 +89,7 @@ func (s *ConsumerSuite) TestConsumer_StartStop() {

processed := 0
done := make(chan struct{}, 1)
c.WorkerPool.do = func(*WorkerContext, *Job) error {
c.WorkerPool.do = func(log15.Logger, *Job) error {
processed++
if processed > 1 {
assert.Fail("too many jobs processed")
Expand Down
3 changes: 2 additions & 1 deletion git.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"time"

"github.com/inconshreveable/log15"
"gopkg.in/src-d/core-retrieval.v0/model"
"gopkg.in/src-d/go-billy.v3"
"gopkg.in/src-d/go-billy.v3/util"
Expand Down Expand Up @@ -123,7 +124,7 @@ func ResolveCommit(r *git.Repository, h plumbing.Hash) (*object.Commit, error) {
case *object.Tag:
return ResolveCommit(r, o.Target)
default:
log.Warn("referenced object not supported", "type", o.Type())
log15.Warn("referenced object not supported", "hash", h.String(), "type", o.Type())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this log is missing the context too, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the way it is structured right now, this function doesn't get context. We might want to fix in a future PR?

return nil, ErrReferencedObjectTypeNotSupported
}
}
Expand Down
Loading