githaven/vendor/github.com/couchbaselabs/go-couchbase/observe.go
2019-02-05 11:52:51 -05:00

301 lines
6.1 KiB
Go

package couchbase
import (
"fmt"
"github.com/couchbase/goutils/logging"
"sync"
)
type PersistTo uint8
const (
PersistNone = PersistTo(0x00)
PersistMaster = PersistTo(0x01)
PersistOne = PersistTo(0x02)
PersistTwo = PersistTo(0x03)
PersistThree = PersistTo(0x04)
PersistFour = PersistTo(0x05)
)
type ObserveTo uint8
const (
ObserveNone = ObserveTo(0x00)
ObserveReplicateOne = ObserveTo(0x01)
ObserveReplicateTwo = ObserveTo(0x02)
ObserveReplicateThree = ObserveTo(0x03)
ObserveReplicateFour = ObserveTo(0x04)
)
type JobType uint8
const (
OBSERVE = JobType(0x00)
PERSIST = JobType(0x01)
)
type ObservePersistJob struct {
vb uint16
vbuuid uint64
hostname string
jobType JobType
failover uint8
lastPersistedSeqNo uint64
currentSeqNo uint64
resultChan chan *ObservePersistJob
errorChan chan *OPErrResponse
}
type OPErrResponse struct {
vb uint16
vbuuid uint64
err error
job *ObservePersistJob
}
var ObservePersistPool = NewPool(1024)
var OPJobChan = make(chan *ObservePersistJob, 1024)
var OPJobDone = make(chan bool)
var wg sync.WaitGroup
func (b *Bucket) StartOPPollers(maxWorkers int) {
for i := 0; i < maxWorkers; i++ {
go b.OPJobPoll()
wg.Add(1)
}
wg.Wait()
}
func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
numNodes := len(b.Nodes())
if int(nPersist) > numNodes || int(nObserve) > numNodes {
return fmt.Errorf("Not enough healthy nodes in the cluster")
}
if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
return fmt.Errorf("Not enough replicas in the cluster")
}
if EnableMutationToken == false {
return fmt.Errorf("Mutation Tokens not enabled ")
}
b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
return
}
func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
b.RLock()
ds := b.ds
b.RUnlock()
if ds == nil {
return
}
nj := 0 // total number of jobs
resultChan := make(chan *ObservePersistJob, 10)
errChan := make(chan *OPErrResponse, 10)
nodes := b.GetNodeList(vb)
if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
return fmt.Errorf("Not enough healthy nodes in the cluster"), false
}
logging.Infof("Node list %v", nodes)
if ds.Observe >= ObserveReplicateOne {
// create a job for each host
for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
opJob := ObservePersistPool.Get()
opJob.vb = vb
opJob.vbuuid = vbuuid
opJob.jobType = OBSERVE
opJob.hostname = nodes[i]
opJob.resultChan = resultChan
opJob.errorChan = errChan
OPJobChan <- opJob
nj++
}
}
if ds.Persist >= PersistMaster {
for i := PersistMaster; i < ds.Persist+1; i++ {
opJob := ObservePersistPool.Get()
opJob.vb = vb
opJob.vbuuid = vbuuid
opJob.jobType = PERSIST
opJob.hostname = nodes[i]
opJob.resultChan = resultChan
opJob.errorChan = errChan
OPJobChan <- opJob
nj++
}
}
ok := true
for ok {
select {
case res := <-resultChan:
jobDone := false
if res.failover == 0 {
// no failover
if res.jobType == PERSIST {
if res.lastPersistedSeqNo >= seqNo {
jobDone = true
}
} else {
if res.currentSeqNo >= seqNo {
jobDone = true
}
}
if jobDone == true {
nj--
ObservePersistPool.Put(res)
} else {
// requeue this job
OPJobChan <- res
}
} else {
// Not currently handling failover scenarios TODO
nj--
ObservePersistPool.Put(res)
failover = true
}
if nj == 0 {
// done with all the jobs
ok = false
close(resultChan)
close(errChan)
}
case Err := <-errChan:
logging.Errorf("Error in Observe/Persist %v", Err.err)
err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
nj--
ObservePersistPool.Put(Err.job)
if nj == 0 {
close(resultChan)
close(errChan)
ok = false
}
}
}
return
}
func (b *Bucket) OPJobPoll() {
ok := true
for ok == true {
select {
case job := <-OPJobChan:
pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
if pool == nil {
errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
errRes.job = job
job.errorChan <- errRes
continue
}
conn, err := pool.Get()
if err != nil {
errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
errRes.job = job
job.errorChan <- errRes
continue
}
res, err := conn.ObserveSeq(job.vb, job.vbuuid)
if err != nil {
errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
errRes.err = fmt.Errorf("Command failed %v", err)
errRes.job = job
job.errorChan <- errRes
continue
}
pool.Return(conn)
job.lastPersistedSeqNo = res.LastPersistedSeqNo
job.currentSeqNo = res.CurrentSeqNo
job.failover = res.Failover
job.resultChan <- job
case <-OPJobDone:
logging.Infof("Observe Persist Poller exitting")
ok = false
}
}
wg.Done()
}
func (b *Bucket) GetNodeList(vb uint16) []string {
vbm := b.VBServerMap()
if len(vbm.VBucketMap) < int(vb) {
logging.Infof("vbmap smaller than vblist")
return nil
}
nodes := make([]string, len(vbm.VBucketMap[vb]))
for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
n := vbm.VBucketMap[vb][i]
if n < 0 {
continue
}
node := b.getMasterNode(n)
if len(node) > 1 {
nodes[i] = node
}
continue
}
return nodes
}
//pool of ObservePersist Jobs
type OPpool struct {
pool chan *ObservePersistJob
}
// NewPool creates a new pool of jobs
func NewPool(max int) *OPpool {
return &OPpool{
pool: make(chan *ObservePersistJob, max),
}
}
// Borrow a Client from the pool.
func (p *OPpool) Get() *ObservePersistJob {
var o *ObservePersistJob
select {
case o = <-p.pool:
default:
o = &ObservePersistJob{}
}
return o
}
// Return returns a Client to the pool.
func (p *OPpool) Put(o *ObservePersistJob) {
select {
case p.pool <- o:
default:
// let it go, let it go...
}
}