githaven-fork/vendor/github.com/couchbase/go-couchbase/streaming.go

225 lines
5.2 KiB
Go
Raw Normal View History

2019-02-05 16:52:51 +00:00
package couchbase
import (
"encoding/json"
"fmt"
"github.com/couchbase/goutils/logging"
"io"
"io/ioutil"
"net"
"net/http"
"time"
"unsafe"
)
// Bucket auto-updater gets the latest version of the bucket config from
// the server. If the configuration has changed then updated the local
// bucket information. If the bucket has been deleted then notify anyone
// who is holding a reference to this bucket
const MAX_RETRY_COUNT = 5
const DISCONNECT_PERIOD = 120 * time.Second
type NotifyFn func(bucket string, err error)
type StreamingFn func(bucket *Bucket)
2019-02-05 16:52:51 +00:00
// Use TCP keepalive to detect half close sockets
var updaterTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
}
var updaterHTTPClient = &http.Client{Transport: updaterTransport}
func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) {
var err error
var res *http.Response
for i := 0; i < HTTP_MAX_RETRY; i++ {
res, err = updaterHTTPClient.Do(req)
if err != nil && isHttpConnError(err) {
continue
}
break
}
if err != nil {
return nil, err
}
return res, err
}
func (b *Bucket) RunBucketUpdater(notify NotifyFn) {
b.RunBucketUpdater2(nil, notify)
}
func (b *Bucket) RunBucketUpdater2(streamingFn StreamingFn, notify NotifyFn) {
2019-02-05 16:52:51 +00:00
go func() {
err := b.UpdateBucket2(streamingFn)
2019-02-05 16:52:51 +00:00
if err != nil {
if notify != nil {
notify(b.GetName(), err)
}
logging.Errorf(" Bucket Updater exited with err %v", err)
}
}()
}
func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) {
if !bucketLocked {
b.Lock()
defer b.Unlock()
}
old := b.connPools
b.connPools = unsafe.Pointer(&with)
if old != nil {
for _, pool := range *(*[]*connectionPool)(old) {
if pool != nil && pool.inUse == false {
pool.Close()
}
}
}
return
}
func (b *Bucket) UpdateBucket() error {
return b.UpdateBucket2(nil)
}
2019-02-05 16:52:51 +00:00
func (b *Bucket) UpdateBucket2(streamingFn StreamingFn) error {
2019-02-05 16:52:51 +00:00
var failures int
var returnErr error
var poolServices PoolServices
2019-02-05 16:52:51 +00:00
for {
if failures == MAX_RETRY_COUNT {
logging.Errorf(" Maximum failures reached. Exiting loop...")
return fmt.Errorf("Max failures reached. Last Error %v", returnErr)
}
nodes := b.Nodes()
if len(nodes) < 1 {
return fmt.Errorf("No healthy nodes found")
}
2021-02-28 23:08:33 +00:00
streamUrl := fmt.Sprintf("%s/pools/default/bucketsStreaming/%s", b.pool.client.BaseURL, uriAdj(b.GetName()))
2019-02-05 16:52:51 +00:00
logging.Infof(" Trying with %s", streamUrl)
req, err := http.NewRequest("GET", streamUrl, nil)
if err != nil {
return err
}
// Lock here to avoid having pool closed under us.
b.RLock()
err = maybeAddAuth(req, b.pool.client.ah)
b.RUnlock()
if err != nil {
return err
}
res, err := doHTTPRequestForUpdate(req)
if err != nil {
return err
}
if res.StatusCode != 200 {
bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512))
logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod)
res.Body.Close()
returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod)
failures++
continue
}
dec := json.NewDecoder(res.Body)
tmpb := &Bucket{}
for {
err := dec.Decode(&tmpb)
if err != nil {
returnErr = err
res.Body.Close()
break
}
// if we got here, reset failure count
failures = 0
if b.pool.client.tlsConfig != nil {
poolServices, err = b.pool.client.GetPoolServices("default")
if err != nil {
returnErr = err
res.Body.Close()
break
}
}
2019-02-05 16:52:51 +00:00
b.Lock()
// mark all the old connection pools for deletion
pools := b.getConnPools(true /* already locked */)
for _, pool := range pools {
if pool != nil {
pool.inUse = false
}
}
newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList))
for i := range newcps {
// get the old connection pool and check if it is still valid
pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */)
if pool != nil && pool.inUse == false && pool.tlsConfig == b.pool.client.tlsConfig {
2019-02-05 16:52:51 +00:00
// if the hostname and index is unchanged then reuse this pool
newcps[i] = pool
pool.inUse = true
continue
}
// else create a new pool
var encrypted bool
hostport := tmpb.VBSMJson.ServerList[i]
if b.pool.client.tlsConfig != nil {
hostport, encrypted, err = MapKVtoSSL(hostport, &poolServices)
if err != nil {
b.Unlock()
return err
}
}
2019-02-05 16:52:51 +00:00
if b.ah != nil {
newcps[i] = newConnectionPool(hostport,
b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
2019-02-05 16:52:51 +00:00
} else {
newcps[i] = newConnectionPool(hostport,
2019-02-05 16:52:51 +00:00
b.authHandler(true /* bucket already locked */),
false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name, encrypted)
2019-02-05 16:52:51 +00:00
}
}
b.replaceConnPools2(newcps, true /* bucket already locked */)
tmpb.ah = b.ah
b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson)
b.nodeList = unsafe.Pointer(&tmpb.NodesJSON)
b.Unlock()
if streamingFn != nil {
streamingFn(tmpb)
}
logging.Debugf("Got new configuration for bucket %s", b.GetName())
2019-02-05 16:52:51 +00:00
}
// we are here because of an error
failures++
continue
}
return nil
}