githaven/modules/queue/queue_channel.go
zeripath a609cae9fb
Correctly handle select on multiple channels in Queues (#22146)
There are a few places in FlushQueueWithContext which make an incorrect
assumption about how `select` on multiple channels works.

The problem is best expressed by looking at the following example:

```go
package main

import "fmt"

func main() {
    closedChan := make(chan struct{})
    close(closedChan)
    toClose := make(chan struct{})
    count := 0

    for {
        select {
        case <-closedChan:
            count++
            fmt.Println(count)
            if count == 2 {
                close(toClose)
            }
        case <-toClose:
            return
        }
    }
}
```

This PR double-checks that the contexts are closed outside of checking
if there is data in the dataChan. It also rationalises the WorkerPool
FlushWithContext because the previous implementation failed to handle
pausing correctly. This will probably fix the underlying problem in
 #22145

Fix #22145

Signed-off-by: Andrew Thornton <art27@cantab.net>

Signed-off-by: Andrew Thornton <art27@cantab.net>
2022-12-30 02:06:47 +02:00

158 lines
4.2 KiB
Go

// Copyright 2019 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package queue
import (
"context"
"fmt"
"runtime/pprof"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
)
// ChannelQueueType is the type for channel queue
const ChannelQueueType Type = "channel"
// ChannelQueueConfiguration is the configuration for a ChannelQueue
type ChannelQueueConfiguration struct {
WorkerPoolConfiguration
Workers int
}
// ChannelQueue implements Queue
//
// A channel queue is not persistable and does not shutdown or terminate cleanly
// It is basically a very thin wrapper around a WorkerPool
type ChannelQueue struct {
*WorkerPool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
terminateCtxCancel context.CancelFunc
exemplar interface{}
workers int
name string
}
// NewChannelQueue creates a memory channel queue
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
if err != nil {
return nil, err
}
config := configInterface.(ChannelQueueConfiguration)
if config.BatchLength == 0 {
config.BatchLength = 1
}
terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
queue := &ChannelQueue{
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
terminateCtxCancel: terminateCtxCancel,
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data {
unhandled := handle(data...)
if len(unhandled) > 0 {
// We can only pushback to the channel if we're paused.
if queue.IsPaused() {
atomic.AddInt64(&queue.numInQueue, int64(len(unhandled)))
go func() {
for _, datum := range data {
queue.dataChan <- datum
}
}()
return nil
}
}
return unhandled
}, config.WorkerPoolConfiguration)
queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
return queue, nil
}
// Run starts to run the queue
func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
pprof.SetGoroutineLabels(q.baseCtx)
atShutdown(q.Shutdown)
atTerminate(q.Terminate)
log.Debug("ChannelQueue: %s Starting", q.name)
_ = q.AddWorkers(q.workers, 0)
}
// Push will push data into the queue
func (q *ChannelQueue) Push(data Data) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
q.WorkerPool.Push(data)
return nil
}
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
func (q *ChannelQueue) Flush(timeout time.Duration) error {
if q.IsPaused() {
return nil
}
ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
defer cancel()
return q.FlushWithContext(ctx)
}
// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
defer q.lock.Unlock()
select {
case <-q.shutdownCtx.Done():
log.Trace("ChannelQueue: %s Already Shutting down", q.name)
return
default:
}
log.Trace("ChannelQueue: %s Shutting down", q.name)
go func() {
log.Trace("ChannelQueue: %s Flushing", q.name)
// We can't use Cleanup here because that will close the channel
if err := q.FlushWithContext(q.terminateCtx); err != nil {
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
return
}
log.Debug("ChannelQueue: %s Flushed", q.name)
}()
q.shutdownCtxCancel()
log.Debug("ChannelQueue: %s Shutdown", q.name)
}
// Terminate this queue and close the queue
func (q *ChannelQueue) Terminate() {
log.Trace("ChannelQueue: %s Terminating", q.name)
q.Shutdown()
select {
case <-q.terminateCtx.Done():
return
default:
}
q.terminateCtxCancel()
q.baseCtxFinished()
log.Debug("ChannelQueue: %s Terminated", q.name)
}
// Name returns the name of this queue
func (q *ChannelQueue) Name() string {
return q.name
}
func init() {
queuesMap[ChannelQueueType] = NewChannelQueue
}