zeripath 2c903383b5
Add Unique Queue infrastructure and move TestPullRequests to this (#9856)
* Upgrade levelqueue to version 0.2.0

This adds functionality for Unique Queues

* Add UniqueQueue interface and functions to create them

* Add UniqueQueue implementations

* Move TestPullRequests over to use UniqueQueue

* Reduce code duplication

* Add bytefifos

* Ensure invalid types are logged

* Fix close race in PersistableChannelQueue Shutdown
2020-02-02 23:19:58 +00:00

299 lines
6.7 KiB
Go

// Copyright 2019 Lunny Xiao. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package levelqueue
import (
"bytes"
"encoding/binary"
"sync"
"github.com/syndtr/goleveldb/leveldb"
)
// Names of the two bookkeeping keys. NewQueue passes them through withPrefix
// to build the database keys under which the low and high ids are persisted.
const (
// lowKeyStr names the key holding the id of the left end of the queue.
lowKeyStr = "low"
// highKeyStr names the key holding the id of the right end of the queue.
highKeyStr = "high"
)
// Queue defines a double-ended queue persisted in a LevelDB database.
// Elements are stored under keys derived from consecutive int64 ids; the ids
// of both ends are kept in memory and persisted under lowKey and highKey.
type Queue struct {
// db is the LevelDB handle used for every read and write.
db *leveldb.DB
// highLock is held by the right-end operations (RPush, RPop, RHandle) and by Len.
highLock sync.Mutex
// lowLock is held by the left-end operations (LPush, LPop, LHandle) and by Len.
lowLock sync.Mutex
// low is the id read by LPop/LHandle; NewQueue starts a new queue at 1.
low int64
// high is the id read by RPop/RHandle; NewQueue starts a new queue at 0.
high int64
// lowKey is the database key under which low is persisted.
lowKey []byte
// highKey is the database key under which high is persisted.
highKey []byte
// prefix is a private copy of the key prefix given to NewQueue.
prefix []byte
// closeUnderlyingDB tells Close whether it should also close db.
closeUnderlyingDB bool
}
// Open opens a queue from the db path or creates a
// queue if it doesn't exist.
// The keys will not be prefixed by default, and closing the returned queue
// also closes the database opened here.
//
// If the database opens but the queue cannot be initialised, the database is
// closed again before the error is returned, so a failed Open does not leave
// an open handle behind.
func Open(dataDir string) (*Queue, error) {
	db, err := leveldb.OpenFile(dataDir, nil)
	if err != nil {
		return nil, err
	}

	queue, err := NewQueue(db, []byte{}, true)
	if err != nil {
		// Nobody else holds a reference to db at this point, so release it
		// here. The initialisation error is the one worth reporting; a
		// failure while closing is deliberately dropped.
		_ = db.Close()
		return nil, err
	}
	return queue, nil
}
// NewQueue builds a queue on top of an already opened db. Every key written
// by the queue is prefixed with prefix (when it is non-empty), and Close will
// close db only if closeUnderlyingDB is true.
//
// The persisted low and high ids are loaded from db; when one of them is
// missing it is initialised (low to 1, high to 0) and written back.
func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
	q := &Queue{
		db:                db,
		closeUnderlyingDB: closeUnderlyingDB,
		lowKey:            withPrefix(prefix, []byte(lowKeyStr)),
		highKey:           withPrefix(prefix, []byte(highKeyStr)),
	}

	// Keep a private copy so later changes to the caller's slice cannot
	// affect the keys this queue generates.
	q.prefix = make([]byte, len(prefix))
	copy(q.prefix, prefix)

	low, err := q.readID(q.lowKey)
	switch {
	case err == leveldb.ErrNotFound:
		// Fresh queue: the left end starts at id 1.
		low = 1
		if err = db.Put(q.lowKey, id2bytes(1), nil); err != nil {
			return nil, err
		}
	case err != nil:
		return nil, err
	}
	q.low = low

	high, err := q.readID(q.highKey)
	switch {
	case err == leveldb.ErrNotFound:
		// Fresh queue: the right end starts at id 0, one below low.
		if err = db.Put(q.highKey, id2bytes(0), nil); err != nil {
			return nil, err
		}
	case err != nil:
		return nil, err
	}
	q.high = high

	return q, nil
}
// readID fetches the value stored under key and decodes it as an id.
// Any error from the database lookup is returned unchanged with a zero id.
func (queue *Queue) readID(key []byte) (int64, error) {
	raw, err := queue.db.Get(key, nil)
	if err == nil {
		return bytes2id(raw)
	}
	return 0, err
}
// highincrement advances the high id by one, persists it under highKey and
// returns the new id. If the write fails the in-memory value is restored and
// the error is returned with a zero id.
// It takes no lock itself; RPush calls it with highLock held.
func (queue *Queue) highincrement() (int64, error) {
	next := queue.high + 1
	queue.high = next
	if err := queue.db.Put(queue.highKey, id2bytes(next), nil); err != nil {
		// Undo the in-memory change so it keeps matching the database.
		queue.high = next - 1
		return 0, err
	}
	return next, nil
}
// highdecrement moves the high id back by one, persists it under highKey and
// returns the new id. If the write fails the in-memory value is restored and
// the error is returned with a zero id.
// It takes no lock itself; RPop and RHandle call it with highLock held.
func (queue *Queue) highdecrement() (int64, error) {
	prev := queue.high - 1
	queue.high = prev
	if err := queue.db.Put(queue.highKey, id2bytes(prev), nil); err != nil {
		// Undo the in-memory change so it keeps matching the database.
		queue.high = prev + 1
		return 0, err
	}
	return prev, nil
}
// lowincrement advances the low id by one, persists it under lowKey and
// returns the new id. If the write fails the in-memory value is restored and
// the error is returned with a zero id.
// It takes no lock itself; LPop and LHandle call it with lowLock held.
func (queue *Queue) lowincrement() (int64, error) {
	next := queue.low + 1
	queue.low = next
	if err := queue.db.Put(queue.lowKey, id2bytes(next), nil); err != nil {
		// Undo the in-memory change so it keeps matching the database.
		queue.low = next - 1
		return 0, err
	}
	return next, nil
}
// lowdecrement moves the low id back by one, persists it under lowKey and
// returns the new id. If the write fails the in-memory value is restored and
// the error is returned with a zero id.
// It takes no lock itself; LPush calls it with lowLock held.
func (queue *Queue) lowdecrement() (int64, error) {
	prev := queue.low - 1
	queue.low = prev
	if err := queue.db.Put(queue.lowKey, id2bytes(prev), nil); err != nil {
		// Undo the in-memory change so it keeps matching the database.
		queue.low = prev + 1
		return 0, err
	}
	return prev, nil
}
// Len returns the length of the queue, computed as high - low + 1 while both
// end locks are held (low first, then high).
func (queue *Queue) Len() int64 {
	queue.lowLock.Lock()
	defer queue.lowLock.Unlock()

	queue.highLock.Lock()
	defer queue.highLock.Unlock()

	return queue.high - queue.low + 1
}
// id2bytes encodes id as a signed varint for use as (part of) a database key.
//
// The result is zero-padded to 8 bytes, which is the layout every key written
// so far uses. A varint of an int64 can need up to binary.MaxVarintLen64 (10)
// bytes, though, and writing one of those into an 8-byte buffer would make
// binary.PutVarint panic; such ids are returned at their full 9 or 10 byte
// length instead. bytes2id decodes both forms.
func id2bytes(id int64) []byte {
	// Width of the padded encoding used for all ids that fit into it.
	const paddedLen = 8

	var scratch [binary.MaxVarintLen64]byte
	n := binary.PutVarint(scratch[:], id)
	if n < paddedLen {
		// scratch is zero-initialised, so the extra bytes are the padding.
		n = paddedLen
	}

	buf := make([]byte, n)
	copy(buf, scratch[:n])
	return buf
}
// bytes2id decodes the signed varint at the start of b, as produced by
// id2bytes. Trailing bytes after the varint are ignored; the decoder's error
// is returned unchanged when b does not hold a complete varint.
func bytes2id(b []byte) (int64, error) {
	reader := bytes.NewReader(b)
	id, err := binary.ReadVarint(reader)
	return id, err
}
// withPrefix returns the key for value inside the namespace given by prefix:
// prefix, a '-' separator, then value. With an empty prefix, value itself is
// returned (not a copy).
func withPrefix(prefix []byte, value []byte) []byte {
	if len(prefix) == 0 {
		return value
	}

	key := make([]byte, 0, len(prefix)+1+len(value))
	key = append(key, prefix...)
	key = append(key, '-')
	return append(key, value...)
}
// RPush pushes data onto the right end of the queue.
//
// The high id is advanced and persisted first, then data is stored under the
// new id. If storing data fails, the high id is moved back again so that it
// does not point at an element that was never written. The error returned is
// the one from the failed step.
func (queue *Queue) RPush(data []byte) error {
	queue.highLock.Lock()
	defer queue.highLock.Unlock()

	id, err := queue.highincrement()
	if err != nil {
		return err
	}

	if err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil); err != nil {
		// Best-effort rollback: without it RPop would look for the missing
		// element and report the queue as empty. If the rollback write fails
		// as well, the state is what it would have been without the attempt,
		// so its error is dropped in favour of the original one.
		_, _ = queue.highdecrement()
		return err
	}
	return nil
}
// LPush pushes data onto the left end of the queue.
//
// The low id is moved back and persisted first, then data is stored under the
// new id. If storing data fails, the low id is advanced again so that it does
// not point at an element that was never written. The error returned is the
// one from the failed step.
func (queue *Queue) LPush(data []byte) error {
	queue.lowLock.Lock()
	defer queue.lowLock.Unlock()

	id, err := queue.lowdecrement()
	if err != nil {
		return err
	}

	if err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil); err != nil {
		// Best-effort rollback: without it LPop would look for the missing
		// element and report the queue as empty. If the rollback write fails
		// as well, the state is what it would have been without the attempt,
		// so its error is dropped in favour of the original one.
		_, _ = queue.lowincrement()
		return err
	}
	return nil
}
// RPop removes and returns the element at the right end of the queue.
// ErrNotFound is returned when no element is stored under the current high id.
func (queue *Queue) RPop() ([]byte, error) {
	queue.highLock.Lock()
	defer queue.highLock.Unlock()

	// The key is built before the high id moves, so the same key serves the
	// read and the later delete.
	key := withPrefix(queue.prefix, id2bytes(queue.high))

	value, err := queue.db.Get(key, nil)
	switch {
	case err == leveldb.ErrNotFound:
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}

	if _, err = queue.highdecrement(); err != nil {
		return nil, err
	}
	if err = queue.db.Delete(key, nil); err != nil {
		return nil, err
	}
	return value, nil
}
// RHandle passes the element at the right end of the queue to the callback h.
// If h returns nil the element is removed from the queue; if h returns an
// error the element is kept and that error is returned.
// ErrNotFound is returned when no element is stored under the current high id.
func (queue *Queue) RHandle(h func([]byte) error) error {
	queue.highLock.Lock()
	defer queue.highLock.Unlock()

	// The key is built before the high id moves, so the same key serves the
	// read and the later delete.
	key := withPrefix(queue.prefix, id2bytes(queue.high))

	value, err := queue.db.Get(key, nil)
	switch {
	case err == leveldb.ErrNotFound:
		return ErrNotFound
	case err != nil:
		return err
	}

	// The callback runs while highLock is held.
	if err = h(value); err != nil {
		return err
	}

	if _, err = queue.highdecrement(); err != nil {
		return err
	}
	return queue.db.Delete(key, nil)
}
// LPop removes and returns the element at the left end of the queue.
// ErrNotFound is returned when no element is stored under the current low id.
func (queue *Queue) LPop() ([]byte, error) {
	queue.lowLock.Lock()
	defer queue.lowLock.Unlock()

	// The key is built before the low id moves, so the same key serves the
	// read and the later delete.
	key := withPrefix(queue.prefix, id2bytes(queue.low))

	value, err := queue.db.Get(key, nil)
	switch {
	case err == leveldb.ErrNotFound:
		return nil, ErrNotFound
	case err != nil:
		return nil, err
	}

	if _, err = queue.lowincrement(); err != nil {
		return nil, err
	}
	if err = queue.db.Delete(key, nil); err != nil {
		return nil, err
	}
	return value, nil
}
// LHandle passes the element at the left end of the queue to the callback h.
// If h returns nil the element is removed from the queue; if h returns an
// error the element is kept and that error is returned.
// ErrNotFound is returned when no element is stored under the current low id.
func (queue *Queue) LHandle(h func([]byte) error) error {
	queue.lowLock.Lock()
	defer queue.lowLock.Unlock()

	// The key is built before the low id moves, so the same key serves the
	// read and the later delete.
	key := withPrefix(queue.prefix, id2bytes(queue.low))

	value, err := queue.db.Get(key, nil)
	switch {
	case err == leveldb.ErrNotFound:
		return ErrNotFound
	case err != nil:
		return err
	}

	// The callback runs while lowLock is held.
	if err = h(value); err != nil {
		return err
	}

	if _, err = queue.lowincrement(); err != nil {
		return err
	}
	return queue.db.Delete(key, nil)
}
// Close closes the queue. The underlying db is closed as well when the queue
// was created with closeUnderlyingDB set; otherwise the queue only drops its
// reference to it.
//
// Calling Close on an already closed queue is a no-op that returns nil,
// whichever way the queue was created.
func (queue *Queue) Close() error {
	db := queue.db
	if db == nil {
		// Already closed; there is nothing left to release.
		return nil
	}
	queue.db = nil

	if !queue.closeUnderlyingDB {
		return nil
	}
	return db.Close()
}