Close queue.JobIter on error and set Queue Notifier #383

@jfontan

The consumer calls a notifier each time a queue error is raised:

// Consumer consumes jobs from a queue and uses multiple workers to process
// them.
type Consumer struct {
	Notifiers struct {
		QueueError func(error)
	}
	[...]
}

func (c *Consumer) notifyQueueError(err error) {
	if c.Notifiers.QueueError == nil {
		return
	}

	c.Notifiers.QueueError(err)
}

In the current version (0.19.0) this notifier is not set, so queue errors are never logged.
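A minimal sketch of wiring the notifier so queue errors get logged. It only mirrors the fields quoted above; `newLoggingConsumer`, its `logf` parameter, `errExample` and the use of the standard `log` package are assumptions for illustration, not what borges does today:

```go
package main

import (
	"errors"
	"log"
)

// Consumer mirrors only the fields quoted in this issue; the real type has more.
type Consumer struct {
	Notifiers struct {
		QueueError func(error)
	}
}

func (c *Consumer) notifyQueueError(err error) {
	if c.Notifiers.QueueError == nil {
		return
	}

	c.Notifiers.QueueError(err)
}

// errExample is a made-up error used only to exercise the notifier.
var errExample = errors.New("example queue failure")

// newLoggingConsumer is a hypothetical helper: it returns a Consumer whose
// QueueError notifier forwards every error to the given printf-style logger.
func newLoggingConsumer(logf func(format string, args ...interface{})) *Consumer {
	c := &Consumer{}
	c.Notifiers.QueueError = func(err error) {
		logf("queue error: %v", err)
	}
	return c
}

// demo shows the intended wiring with the standard library logger.
func demo() {
	c := newLoggingConsumer(log.Printf)
	c.notifyQueueError(errExample)
}
```

With something like this in place, every call to `notifyQueueError` would leave a trace in the logs instead of being silently dropped.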

This may be related to borges getting stuck and no longer fetching jobs from the queue. After some hours it stops, leaving a lot of goroutines trying to read from the queue:

goroutine 65788793 [select, 5500 minutes]:
github.com/src-d/borges/vendor/github.com/streadway/amqp.(*consumers).buffer(0xc5f24e2480, 0xc180f31860, 0xc180f317a0)
        /home/travis/gopath/src/github.com/src-d/borges/vendor/github.com/streadway/amqp/consumers.go:65 +0x1e2
created by github.com/src-d/borges/vendor/github.com/streadway/amqp.(*consumers).add
        /home/travis/gopath/src/github.com/src-d/borges/vendor/github.com/streadway/amqp/consumers.go:97 +0x166

In two of these events the number of these goroutines was 2045 and 2046. It seems that we have reached the maximum number of channels per connection.

There seems to be some kind of error with the queue that we are not logging. Each time the iterator gets an error, a new channel is created but the previous one is not closed, which may explain this huge number of goroutines. We should close iter before returning on error in consumeJobIter:

func (c *Consumer) consumeJobIter(iter queue.JobIter) error {
	for {
		j, err := iter.Next()
		if queue.ErrEmptyJob.Is(err) {
			c.notifyQueueError(err)
			continue
		}

		if queue.ErrAlreadyClosed.Is(err) {
			return nil
		}

		if err != nil {
			return err
		}

		if err := c.consumeJob(j); err != nil {
			c.notifyQueueError(err)
		}
	}
}
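A possible shape for the fix, as a self-contained sketch rather than a patch. The `jobIter` interface (with a `Close` method), the `consumer` struct, the sentinel errors checked with `errors.Is`, and the `scriptedIter` test double are all stand-ins I made up so the example compiles on its own; the real code uses `queue.JobIter` and `queue.ErrEmptyJob.Is` / `queue.ErrAlreadyClosed.Is`:

```go
package main

import (
	"errors"
	"io"
)

// Stand-ins for queue.ErrEmptyJob and queue.ErrAlreadyClosed, plus a
// hypothetical unexpected failure.
var (
	errEmptyJob      = errors.New("empty job")
	errAlreadyClosed = errors.New("already closed")
	errBroken        = errors.New("broken channel")
)

type job struct{ id string }

// jobIter is an assumed minimal view of queue.JobIter.
type jobIter interface {
	Next() (*job, error)
	io.Closer
}

type consumer struct {
	notify func(error)      // plays the role of Notifiers.QueueError
	handle func(*job) error // plays the role of consumeJob
}

func (c *consumer) notifyQueueError(err error) {
	if c.notify != nil {
		c.notify(err)
	}
}

func (c *consumer) consumeJobIter(iter jobIter) error {
	for {
		j, err := iter.Next()
		if errors.Is(err, errEmptyJob) {
			c.notifyQueueError(err)
			continue
		}

		if errors.Is(err, errAlreadyClosed) {
			return nil
		}

		if err != nil {
			// Release the iterator (and its channel) before giving up,
			// reporting a failed Close instead of hiding it.
			if cerr := iter.Close(); cerr != nil {
				c.notifyQueueError(cerr)
			}
			return err
		}

		if err := c.handle(j); err != nil {
			c.notifyQueueError(err)
		}
	}
}

// scriptedIter is a test double: each step is the error Next returns,
// where nil means "return a job". Once the steps run out it reports
// errAlreadyClosed. It counts how many times Close was called.
type scriptedIter struct {
	steps  []error
	closed int
}

func (s *scriptedIter) Next() (*job, error) {
	if len(s.steps) == 0 {
		return nil, errAlreadyClosed
	}
	err := s.steps[0]
	s.steps = s.steps[1:]
	if err != nil {
		return nil, err
	}
	return &job{id: "example"}, nil
}

func (s *scriptedIter) Close() error {
	s.closed++
	return nil
}
```

The only behavioural change compared with the function above is the `Close` call in the `err != nil` branch, so a caller that builds a new iterator after a failure would no longer leave the old channel open.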
