forked from Shiloh/githaven
7f8e3192cd
* Allow common redis and leveldb connections Prevents multiple reopening of redis and leveldb connections to the same place by sharing connections. Further allows for more configurable redis connection type using the redisURI and a leveldbURI scheme. Signed-off-by: Andrew Thornton <art27@cantab.net> * add unit-test Signed-off-by: Andrew Thornton <art27@cantab.net> * as per @lunny Signed-off-by: Andrew Thornton <art27@cantab.net> * add test Signed-off-by: Andrew Thornton <art27@cantab.net> * Update modules/cache/cache_redis.go * Update modules/queue/queue_disk.go * Update modules/cache/cache_redis.go * Update modules/cache/cache_redis.go * Update modules/queue/unique_queue_disk.go * Update modules/queue/queue_disk.go * Update modules/queue/unique_queue_disk.go * Update modules/session/redis.go Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: Lauris BH <lauris@nix.lv>
127 lines
3.4 KiB
Go
127 lines
3.4 KiB
Go
// Copyright 2019 The Gitea Authors. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package queue
|
|
|
|
import "github.com/go-redis/redis/v7"
|
|
|
|
// RedisUniqueQueueType is the type for redis queue
|
|
const RedisUniqueQueueType Type = "unique-redis"
|
|
|
|
// RedisUniqueQueue redis queue
|
|
type RedisUniqueQueue struct {
|
|
*ByteFIFOUniqueQueue
|
|
}
|
|
|
|
// RedisUniqueQueueConfiguration is the configuration for the redis queue
|
|
type RedisUniqueQueueConfiguration struct {
|
|
ByteFIFOQueueConfiguration
|
|
RedisUniqueByteFIFOConfiguration
|
|
}
|
|
|
|
// NewRedisUniqueQueue creates single redis or cluster redis queue.
|
|
//
|
|
// Please note that this Queue does not guarantee that a particular
|
|
// task cannot be processed twice or more at the same time. Uniqueness is
|
|
// only guaranteed whilst the task is waiting in the queue.
|
|
func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
|
configInterface, err := toConfig(RedisUniqueQueueConfiguration{}, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
config := configInterface.(RedisUniqueQueueConfiguration)
|
|
|
|
byteFIFO, err := NewRedisUniqueByteFIFO(config.RedisUniqueByteFIFOConfiguration)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(byteFIFO.setName) == 0 {
|
|
byteFIFO.setName = byteFIFO.queueName + "_unique"
|
|
}
|
|
|
|
byteFIFOQueue, err := NewByteFIFOUniqueQueue(RedisUniqueQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queue := &RedisUniqueQueue{
|
|
ByteFIFOUniqueQueue: byteFIFOQueue,
|
|
}
|
|
|
|
queue.qid = GetManager().Add(queue, RedisUniqueQueueType, config, exemplar)
|
|
|
|
return queue, nil
|
|
}
|
|
|
|
var _ (UniqueByteFIFO) = &RedisUniqueByteFIFO{}
|
|
|
|
// RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient
|
|
type RedisUniqueByteFIFO struct {
|
|
RedisByteFIFO
|
|
setName string
|
|
}
|
|
|
|
// RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO
|
|
type RedisUniqueByteFIFOConfiguration struct {
|
|
RedisByteFIFOConfiguration
|
|
SetName string
|
|
}
|
|
|
|
// NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient
|
|
func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error) {
|
|
internal, err := NewRedisByteFIFO(config.RedisByteFIFOConfiguration)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fifo := &RedisUniqueByteFIFO{
|
|
RedisByteFIFO: *internal,
|
|
setName: config.SetName,
|
|
}
|
|
|
|
return fifo, nil
|
|
}
|
|
|
|
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
|
|
func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error {
|
|
added, err := fifo.client.SAdd(fifo.setName, data).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if added == 0 {
|
|
return ErrAlreadyInQueue
|
|
}
|
|
if fn != nil {
|
|
if err := fn(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return fifo.client.RPush(fifo.queueName, data).Err()
|
|
}
|
|
|
|
// Pop pops data from the start of the fifo
|
|
func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) {
|
|
data, err := fifo.client.LPop(fifo.queueName).Bytes()
|
|
if err != nil && err != redis.Nil {
|
|
return data, err
|
|
}
|
|
|
|
if len(data) == 0 {
|
|
return data, nil
|
|
}
|
|
|
|
err = fifo.client.SRem(fifo.setName, data).Err()
|
|
return data, err
|
|
}
|
|
|
|
// Has returns whether the fifo contains this data
|
|
func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) {
|
|
return fifo.client.SIsMember(fifo.setName, data).Result()
|
|
}
|
|
|
|
func init() {
|
|
queuesMap[RedisUniqueQueueType] = NewRedisUniqueQueue
|
|
}
|