* Dropped unused codekit config * Integrated dynamic and static bindata for public * Ignore public bindata * Add a general generate make task * Integrated flexible public assets into web command * Updated vendoring, added all missing govendor deps * Made the linter happy with the bindata and dynamic code * Moved public bindata definition to modules directory * Ignoring the new bindata path now * Updated to the new public modules import path * Updated public bindata command and drop the new prefix
		
			
				
	
	
		
			239 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			239 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2015 PingCAP, Inc.
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package hbasekv
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"net/url"
 | |
| 	"path/filepath"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/juju/errors"
 | |
| 	"github.com/ngaut/log"
 | |
| 	"github.com/pingcap/go-hbase"
 | |
| 	"github.com/pingcap/go-themis"
 | |
| 	"github.com/pingcap/go-themis/oracle"
 | |
| 	"github.com/pingcap/go-themis/oracle/oracles"
 | |
| 	"github.com/pingcap/tidb/kv"
 | |
| )
 | |
| 
 | |
// Naming of the single HBase column used by this store, plus the size of
// the per-store connection pool.
const (
	// hbaseColFamily is the hbase column family name.
	hbaseColFamily = "f"
	// hbaseQualifier is the hbase column name.
	hbaseQualifier = "q"
	// hbaseFmlAndQual is the "family:qualifier" shortcut built from the two
	// names above.
	hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier
	// hbaseConnPoolSize is the fixed number of HBase connections created
	// for every store.
	hbaseConnPoolSize = 10
)

// Byte-slice forms of the column family and qualifier names.
var (
	hbaseColFamilyBytes = []byte(hbaseColFamily)
	hbaseQualifierBytes = []byte(hbaseQualifier)
)
 | |
| 
 | |
| var (
 | |
| 	_ kv.Storage = (*hbaseStore)(nil)
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	// ErrInvalidDSN is returned when store dsn is invalid.
 | |
| 	ErrInvalidDSN = errors.New("invalid dsn")
 | |
| )
 | |
| 
 | |
| type storeCache struct {
 | |
| 	mu    sync.Mutex
 | |
| 	cache map[string]*hbaseStore
 | |
| }
 | |
| 
 | |
| var mc storeCache
 | |
| 
 | |
| func init() {
 | |
| 	mc.cache = make(map[string]*hbaseStore)
 | |
| 	rand.Seed(time.Now().UnixNano())
 | |
| }
 | |
| 
 | |
| type hbaseStore struct {
 | |
| 	mu        sync.Mutex
 | |
| 	uuid      string
 | |
| 	storeName string
 | |
| 	oracle    oracle.Oracle
 | |
| 	conns     []hbase.HBaseClient
 | |
| }
 | |
| 
 | |
| func (s *hbaseStore) getHBaseClient() hbase.HBaseClient {
 | |
| 	// return hbase connection randomly
 | |
| 	return s.conns[rand.Intn(hbaseConnPoolSize)]
 | |
| }
 | |
| 
 | |
| func (s *hbaseStore) Begin() (kv.Transaction, error) {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	hbaseCli := s.getHBaseClient()
 | |
| 	t, err := themis.NewTxn(hbaseCli, s.oracle)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 	txn := newHbaseTxn(t, s.storeName)
 | |
| 	return txn, nil
 | |
| }
 | |
| 
 | |
| func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) {
 | |
| 	hbaseCli := s.getHBaseClient()
 | |
| 	t, err := themis.NewTxn(hbaseCli, s.oracle)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 	return newHbaseSnapshot(t, s.storeName), nil
 | |
| }
 | |
| 
 | |
| func (s *hbaseStore) Close() error {
 | |
| 	mc.mu.Lock()
 | |
| 	defer mc.mu.Unlock()
 | |
| 
 | |
| 	delete(mc.cache, s.uuid)
 | |
| 
 | |
| 	var err error
 | |
| 	for _, conn := range s.conns {
 | |
| 		err = conn.Close()
 | |
| 		if err != nil {
 | |
| 			log.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| 	// return last error
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (s *hbaseStore) UUID() string {
 | |
| 	return s.uuid
 | |
| }
 | |
| 
 | |
| func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
 | |
| 	hbaseCli := s.getHBaseClient()
 | |
| 	t, err := themis.NewTxn(hbaseCli, s.oracle)
 | |
| 	if err != nil {
 | |
| 		return kv.Version{Ver: 0}, errors.Trace(err)
 | |
| 	}
 | |
| 	defer t.Release()
 | |
| 
 | |
| 	return kv.Version{Ver: t.GetStartTS()}, nil
 | |
| }
 | |
| 
 | |
// Driver implements engine Driver. It carries no state; its Open method
// opens or creates HBase-backed stores.
type Driver struct{}
 | |
| 
 | |
// Accepted values of the "tso" DSN query parameter, and the path handed to
// the remote oracle.
const (
	// tsoTypeLocal makes Open use oracles.NewLocalOracle.
	tsoTypeLocal = "local"
	// tsoTypeZK makes Open use oracles.NewRemoteOracle.
	tsoTypeZK = "zk"

	// tsoZKPath is the path passed to oracles.NewRemoteOracle.
	tsoZKPath = "/zk/tso"
)
 | |
| 
 | |
| // Open opens or creates an HBase storage with given path.
 | |
| //
 | |
| // The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=local|zk]'.
 | |
| // If tso is not provided, it will use a local oracle instead. (for test only)
 | |
| func (d Driver) Open(path string) (kv.Storage, error) {
 | |
| 	mc.mu.Lock()
 | |
| 	defer mc.mu.Unlock()
 | |
| 
 | |
| 	zks, tso, tableName, err := parsePath(path)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 	if tso != tsoTypeLocal && tso != tsoTypeZK {
 | |
| 		return nil, errors.Trace(ErrInvalidDSN)
 | |
| 	}
 | |
| 
 | |
| 	uuid := fmt.Sprintf("hbase-%v-%v", zks, tableName)
 | |
| 	if tso == tsoTypeLocal {
 | |
| 		log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid)
 | |
| 	}
 | |
| 	if store, ok := mc.cache[uuid]; ok {
 | |
| 		return store, nil
 | |
| 	}
 | |
| 
 | |
| 	// create buffered HBase connections, HBaseClient is goroutine-safe, so
 | |
| 	// it's OK to redistribute to transactions.
 | |
| 	conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize)
 | |
| 	for i := 0; i < hbaseConnPoolSize; i++ {
 | |
| 		var c hbase.HBaseClient
 | |
| 		c, err = hbase.NewClient(strings.Split(zks, ","), "/hbase")
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Trace(err)
 | |
| 		}
 | |
| 		conns = append(conns, c)
 | |
| 	}
 | |
| 
 | |
| 	c := conns[0]
 | |
| 	var b bool
 | |
| 	b, err = c.TableExists(tableName)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 	if !b {
 | |
| 		// Create new hbase table for store.
 | |
| 		t := hbase.NewTableDesciptor(tableName)
 | |
| 		cf := hbase.NewColumnFamilyDescriptor(hbaseColFamily)
 | |
| 		cf.AddAttr("THEMIS_ENABLE", "true")
 | |
| 		t.AddColumnDesc(cf)
 | |
| 		//TODO: specify split?
 | |
| 		if err := c.CreateTable(t, nil); err != nil {
 | |
| 			return nil, errors.Trace(err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var ora oracle.Oracle
 | |
| 	switch tso {
 | |
| 	case tsoTypeLocal:
 | |
| 		ora = oracles.NewLocalOracle()
 | |
| 	case tsoTypeZK:
 | |
| 		ora = oracles.NewRemoteOracle(zks, tsoZKPath)
 | |
| 	}
 | |
| 
 | |
| 	s := &hbaseStore{
 | |
| 		uuid:      uuid,
 | |
| 		storeName: tableName,
 | |
| 		oracle:    ora,
 | |
| 		conns:     conns,
 | |
| 	}
 | |
| 	mc.cache[uuid] = s
 | |
| 	return s, nil
 | |
| }
 | |
| 
 | |
| func parsePath(path string) (zks, tso, tableName string, err error) {
 | |
| 	u, err := url.Parse(path)
 | |
| 	if err != nil {
 | |
| 		return "", "", "", errors.Trace(err)
 | |
| 	}
 | |
| 	if strings.ToLower(u.Scheme) != "hbase" {
 | |
| 		return "", "", "", errors.Trace(ErrInvalidDSN)
 | |
| 	}
 | |
| 	p, tableName := filepath.Split(u.Path)
 | |
| 	if p != "/" {
 | |
| 		return "", "", "", errors.Trace(ErrInvalidDSN)
 | |
| 	}
 | |
| 	zks = u.Host
 | |
| 	tso = u.Query().Get("tso")
 | |
| 	if tso == "" {
 | |
| 		tso = tsoTypeLocal
 | |
| 	}
 | |
| 	return zks, tso, tableName, nil
 | |
| }
 |