githaven-fork/modules/queue/manager.go
zeripath 06b9d553bc
Timeout on flush in testing (#16864)
* Timeout on flush in testing

At the end of each test the queues are flushed. At present there is no limit on the
length of time a flush can take which can lead to long flushes.

However, if the CI task is cancelled we lose the log information as to where the long
flush was taking place.

This PR simply adds a default time limit of 2 minutes - at which point an error will
be produced. This should allow us to more easily find the culprit.

Signed-off-by: Andrew Thornton <art27@cantab.net>

* return better error

Signed-off-by: Andrew Thornton <art27@cantab.net>

Co-authored-by: 6543 <6543@obermui.de>
2021-08-30 00:27:51 -04:00

390 lines
10 KiB
Go

// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
// manager is the package-level Manager instance that GetManager creates on first use and returns.
var manager *Manager
// Manager is a queue manager
//
// It keeps a registry of ManagedQueues keyed by their QID.
type Manager struct {
// mutex is held by the Manager methods while they read or modify counter and Queues
mutex sync.Mutex
// counter is incremented for every queue added; the new value becomes that queue's QID
counter int64
// Queues maps a QID to the ManagedQueue registered under it
Queues map[int64]*ManagedQueue
}
// ManagedQueue represents a working queue with a Pool of workers.
//
// Although a ManagedQueue should really represent a Queue this does not
// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
type ManagedQueue struct {
// mutex is held while counter and PoolWorkers are read or modified
mutex sync.Mutex
// QID is the identifier assigned by Manager.Add
QID int64
// Type is the queue Type given to Manager.Add
Type Type
// Name is "queue-<QID>" unless the managed value implements Named and reports a non-empty name
Name string
// Configuration holds the JSON encoding of the configuration given to Manager.Add, stored as a string
Configuration interface{}
// ExemplarType is the string form of the reflected type of the exemplar given to Manager.Add
ExemplarType string
// Managed is the value given to Manager.Add; the methods of ManagedQueue type-assert it
// to Flushable or ManagedPool before delegating to it
Managed interface{}
// counter is incremented for every RegisterWorkers call; the new value becomes the group's PID
counter int64
// PoolWorkers maps a PID to the group of workers registered under it
PoolWorkers map[int64]*PoolWorkers
}
// Flushable represents a pool or queue that is flushable
type Flushable interface {
// Flush will add a flush worker to the pool - the worker should be autoregistered with the manager.
// ManagedQueue.Flush passes its timeout as the duration argument.
Flush(time.Duration) error
// FlushWithContext is very similar to Flush but takes a context instead of a duration.
// NB: The worker will not be registered with the manager.
FlushWithContext(ctx context.Context) error
// IsEmpty will return if the managed pool is empty and has no work
IsEmpty() bool
}
// ManagedPool is a simple interface to get certain details from a worker pool
type ManagedPool interface {
// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
AddWorkers(number int, timeout time.Duration) context.CancelFunc
// NumberOfWorkers returns the total number of workers in the pool
NumberOfWorkers() int
// MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
MaxNumberOfWorkers() int
// SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
SetMaxNumberOfWorkers(int)
// BoostTimeout returns the current timeout for worker groups created during a boost
BoostTimeout() time.Duration
// BlockTimeout returns the timeout the internal channel can block for before a boost would occur
BlockTimeout() time.Duration
// BoostWorkers returns the number of workers to be created during a boost
BoostWorkers() int
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
}
// ManagedQueueList implements the sort.Interface, ordering the queues by ascending Name
type ManagedQueueList []*ManagedQueue
// PoolWorkers represents a group of workers working on a queue
type PoolWorkers struct {
// PID is the identifier assigned by ManagedQueue.RegisterWorkers
PID int64
// Workers is the number of workers in the group
Workers int
// Start is the start time supplied when the group was registered
Start time.Time
// Timeout is the timeout time supplied when the group was registered.
// Manager.FlushAll sets it equal to Start when HasTimeout is false.
Timeout time.Time
// HasTimeout reports whether the group was registered with a timeout
HasTimeout bool
// Cancel is the function supplied at registration; CancelWorkers and RemoveWorkers call it
Cancel context.CancelFunc
// IsFlusher is true when the group was registered as a flusher, as Manager.FlushAll does
IsFlusher bool
}
// PoolWorkersList implements the sort.Interface for PoolWorkers, ordering the groups by ascending Start time
type PoolWorkersList []*PoolWorkers
// init makes sure the singleton Manager exists as soon as the package is loaded.
func init() {
	GetManager()
}
// GetManager returns the singleton Manager, creating it with an empty
// queue registry the first time it is asked for.
func GetManager() *Manager {
	if manager != nil {
		return manager
	}
	manager = &Manager{Queues: map[int64]*ManagedQueue{}}
	return manager
}
// Add adds a queue to this manager
//
// managed is the queue or pool being registered, t is its Type, configuration
// is stored JSON-encoded for later inspection and exemplar is a value whose
// type name is recorded as the ExemplarType. A nil exemplar is recorded as
// "<nil>" rather than causing a panic.
//
// The returned value is the QID under which the queue has been registered.
// The queue is named "queue-<QID>" unless managed implements Named and
// reports a non-empty name.
func (m *Manager) Add(managed interface{},
	t Type,
	configuration,
	exemplar interface{}) int64 {
	cfg, err := json.Marshal(configuration)
	if err != nil {
		// The encoded configuration is only stored on the ManagedQueue, so
		// registration carries on - but the failure is no longer hidden.
		log.Error("Queue Manager unable to marshal queue configuration: %v", err)
	}

	// reflect.TypeOf returns a nil Type for a nil interface value and calling
	// String on that would panic.
	exemplarType := "<nil>"
	if typ := reflect.TypeOf(exemplar); typ != nil {
		exemplarType = typ.String()
	}

	mq := &ManagedQueue{
		Type:          t,
		Configuration: string(cfg),
		ExemplarType:  exemplarType,
		PoolWorkers:   make(map[int64]*PoolWorkers),
		Managed:       managed,
	}

	m.mutex.Lock()
	m.counter++
	mq.QID = m.counter
	mq.Name = fmt.Sprintf("queue-%d", mq.QID)
	if named, ok := managed.(Named); ok {
		if name := named.Name(); len(name) > 0 {
			mq.Name = name
		}
	}
	m.Queues[mq.QID] = mq
	m.mutex.Unlock()

	log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
	return mq.QID
}
// Remove drops the queue registered under qid from the Manager.
// Removing an unknown qid is a no-op apart from the trace log entry.
func (m *Manager) Remove(qid int64) {
	func() {
		m.mutex.Lock()
		defer m.mutex.Unlock()
		delete(m.Queues, qid)
	}()

	log.Trace("Queue Manager removed: QID: %d", qid)
}
// GetManagedQueue looks up the queue registered under qid.
// It returns nil when no queue is registered under that id.
func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
	m.mutex.Lock()
	found := m.Queues[qid]
	m.mutex.Unlock()

	return found
}
// FlushAll flushes all the flushable queues attached to this manager
//
// The flush works in rounds: every round starts one flusher per non-empty
// flushable queue, waits for them to finish and then looks at the queues
// again. It stops once a round finds every queue empty.
//
// baseCtx bounds the whole operation. If timeout is greater than zero the
// operation is additionally limited to that duration. Should the context be
// done at the start of a round, an error naming the queues that are still
// non-empty is returned - or nil if there are none. A flusher failing with
// anything but the context's own error cancels the remaining flushers.
func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
	var ctx context.Context
	var cancel context.CancelFunc
	start := time.Now()
	end := start
	hasTimeout := false
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(baseCtx, timeout)
		end = start.Add(timeout)
		hasTimeout = true
	} else {
		ctx, cancel = context.WithCancel(baseCtx)
	}
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			// We ran out of time (or were cancelled): report where work is left
			// so that the culprit can be found from the logs.
			var nonEmptyQueues []string
			for _, mq := range m.ManagedQueues() {
				if !mq.IsEmpty() {
					nonEmptyQueues = append(nonEmptyQueues, mq.Name)
				}
			}
			if len(nonEmptyQueues) > 0 {
				return fmt.Errorf("flush timeout with non-empty queues: %s", strings.Join(nonEmptyQueues, ", "))
			}
			return nil
		default:
		}

		mqs := m.ManagedQueues()
		log.Debug("Found %d Managed Queues", len(mqs))

		var wg sync.WaitGroup
		allEmpty := true
		for _, mq := range mqs {
			if mq.IsEmpty() {
				continue
			}
			allEmpty = false

			flushable, ok := mq.Managed.(Flushable)
			if !ok {
				log.Debug("Queue: %s is non-empty but is not flushable", mq.Name)
				continue
			}

			log.Debug("Flushing (flushable) queue: %s", mq.Name)
			wg.Add(1)
			go func(q *ManagedQueue, flushable Flushable) {
				// Deferred so that the wait group is released and the
				// registration cleaned up on every exit path.
				defer wg.Done()

				localCtx, localCtxCancel := context.WithCancel(ctx)
				defer localCtxCancel()

				pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true)
				// RemoveWorkers rather than CancelWorkers: the flusher must also be
				// deleted from the queue's PoolWorkers otherwise every round would
				// leave a stale entry behind.
				defer q.RemoveWorkers(pid)

				if err := flushable.FlushWithContext(localCtx); err != nil && err != ctx.Err() {
					// A genuine failure - stop the other flushers too.
					cancel()
				}
			}(mq, flushable)
		}

		if allEmpty {
			log.Debug("All queues are empty")
			break
		}

		// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
		// but don't delay cancellation here.
		select {
		case <-ctx.Done():
		case <-time.After(100 * time.Millisecond):
		}
		wg.Wait()
	}
	return nil
}
// ManagedQueues returns a snapshot of the registered queues sorted by name.
// The returned slice is freshly allocated on each call.
func (m *Manager) ManagedQueues() []*ManagedQueue {
	m.mutex.Lock()
	snapshot := make(ManagedQueueList, len(m.Queues))
	next := 0
	for _, queue := range m.Queues {
		snapshot[next] = queue
		next++
	}
	m.mutex.Unlock()

	// Sorting is done outside of the lock as it only touches the copy.
	sort.Sort(snapshot)
	return snapshot
}
// Workers returns a snapshot of the worker groups registered on this queue
// sorted by their start time. The returned slice is freshly allocated on each call.
func (q *ManagedQueue) Workers() []*PoolWorkers {
	q.mutex.Lock()
	groups := make(PoolWorkersList, len(q.PoolWorkers))
	next := 0
	for _, group := range q.PoolWorkers {
		groups[next] = group
		next++
	}
	q.mutex.Unlock()

	// Sorting is done outside of the lock as it only touches the copy.
	sort.Sort(groups)
	return groups
}
// RegisterWorkers records a new group of workers on this queue and returns
// the PID assigned to the group. The supplied values are stored unchanged
// on the resulting PoolWorkers entry.
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
	group := &PoolWorkers{
		Workers:    number,
		Start:      start,
		Timeout:    timeout,
		HasTimeout: hasTimeout,
		Cancel:     cancel,
		IsFlusher:  isFlusher,
	}

	q.mutex.Lock()
	defer q.mutex.Unlock()

	// PIDs are handed out from a per-queue counter.
	q.counter++
	group.PID = q.counter
	q.PoolWorkers[group.PID] = group
	return group.PID
}
// CancelWorkers cancels pooled workers with pid
//
// The group stays registered on the queue; only its Cancel function is
// called. Unknown pids and groups registered without a Cancel function are
// silently ignored.
func (q *ManagedQueue) CancelWorkers(pid int64) {
	q.mutex.Lock()
	pw, ok := q.PoolWorkers[pid]
	q.mutex.Unlock()

	// Cancel is called outside of the lock. A group may have been registered
	// with a nil cancel function - calling that would panic.
	if ok && pw.Cancel != nil {
		pw.Cancel()
	}
}
// RemoveWorkers unregisters the group of workers with the given pid and,
// if the group was known and has a Cancel function, cancels it.
func (q *ManagedQueue) RemoveWorkers(pid int64) {
	q.mutex.Lock()
	group, found := q.PoolWorkers[pid]
	delete(q.PoolWorkers, pid)
	q.mutex.Unlock()

	// The cancel function is invoked after the lock has been released.
	if !found || group.Cancel == nil {
		return
	}
	group.Cancel()
}
// AddWorkers asks the managed value to add a group of workers, passing
// number and timeout through, and returns the CancelFunc it hands back.
// nil is returned when the managed value is not a ManagedPool.
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return nil
	}
	return pool.AddWorkers(number, timeout)
}
// Flush asks the managed value to flush, passing timeout through, and
// returns its result. nil is returned when the managed value is not Flushable.
func (q *ManagedQueue) Flush(timeout time.Duration) error {
	flushable, ok := q.Managed.(Flushable)
	if !ok {
		return nil
	}
	return flushable.Flush(timeout)
}
// IsEmpty reports whether the managed value says it is empty.
// A managed value that is not Flushable is always reported as empty.
func (q *ManagedQueue) IsEmpty() bool {
	flushable, ok := q.Managed.(Flushable)
	return !ok || flushable.IsEmpty()
}
// NumberOfWorkers reports the number of workers the managed pool says it has.
// -1 is returned when the managed value is not a ManagedPool.
func (q *ManagedQueue) NumberOfWorkers() int {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return -1
	}
	return pool.NumberOfWorkers()
}
// MaxNumberOfWorkers reports the maximum number of workers of the managed pool.
// 0 is returned when the managed value is not a ManagedPool.
func (q *ManagedQueue) MaxNumberOfWorkers() int {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return 0
	}
	return pool.MaxNumberOfWorkers()
}
// BoostWorkers reports the boost worker count of the managed pool.
// -1 is returned when the managed value is not a ManagedPool.
func (q *ManagedQueue) BoostWorkers() int {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return -1
	}
	return pool.BoostWorkers()
}
// BoostTimeout reports the boost timeout of the managed pool.
// 0 is returned when the managed value is not a ManagedPool.
func (q *ManagedQueue) BoostTimeout() time.Duration {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return 0
	}
	return pool.BoostTimeout()
}
// BlockTimeout reports the block timeout of the managed pool.
// 0 is returned when the managed value is not a ManagedPool.
func (q *ManagedQueue) BlockTimeout() time.Duration {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return 0
	}
	return pool.BlockTimeout()
}
// SetPoolSettings forwards the given settings to the managed pool.
// Nothing happens when the managed value is not a ManagedPool.
func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
	pool, ok := q.Managed.(ManagedPool)
	if !ok {
		return
	}
	pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
}
// Len is the number of queues in the list (part of sort.Interface).
func (l ManagedQueueList) Len() int { return len(l) }

// Less orders the queues by ascending Name (part of sort.Interface).
func (l ManagedQueueList) Less(i, j int) bool {
	first, second := l[i], l[j]
	return first.Name < second.Name
}

// Swap exchanges the queues at positions i and j (part of sort.Interface).
func (l ManagedQueueList) Swap(i, j int) {
	moved := l[i]
	l[i] = l[j]
	l[j] = moved
}
// Len is the number of worker groups in the list (part of sort.Interface).
func (l PoolWorkersList) Len() int { return len(l) }

// Less orders the worker groups by ascending Start time (part of sort.Interface).
func (l PoolWorkersList) Less(i, j int) bool {
	first, second := l[i], l[j]
	return first.Start.Before(second.Start)
}

// Swap exchanges the worker groups at positions i and j (part of sort.Interface).
func (l PoolWorkersList) Swap(i, j int) {
	moved := l[i]
	l[i] = l[j]
	l[j] = moved
}