* Dropped unused codekit config * Integrated dynamic and static bindata for public * Ignore public bindata * Add a general generate make task * Integrated flexible public assets into web command * Updated vendoring, added all missiong govendor deps * Made the linter happy with the bindata and dynamic code * Moved public bindata definition to modules directory * Ignoring the new bindata path now * Updated to the new public modules import path * Updated public bindata command and drop the new prefix
		
			
				
	
	
		
			148 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			148 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package themis
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"encoding/binary"
 | |
| 	"math"
 | |
| 	"strings"
 | |
| 
 | |
| 	"github.com/juju/errors"
 | |
| 	"github.com/ngaut/log"
 | |
| 	"github.com/pingcap/go-hbase"
 | |
| )
 | |
| 
 | |
| var _ LockManager = (*themisLockManager)(nil)
 | |
| 
 | |
| type themisLockManager struct {
 | |
| 	rpc         *themisRPC
 | |
| 	hbaseClient hbase.HBaseClient
 | |
| }
 | |
| 
 | |
| func newThemisLockManager(rpc *themisRPC, hbaseCli hbase.HBaseClient) LockManager {
 | |
| 	return &themisLockManager{
 | |
| 		rpc:         rpc,
 | |
| 		hbaseClient: hbaseCli,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func getDataColFromMetaCol(lockOrWriteCol hbase.Column) hbase.Column {
 | |
| 	// get data column from lock column
 | |
| 	// key is like => L:family#qual, #p:family#qual
 | |
| 	parts := strings.Split(string(lockOrWriteCol.Qual), "#")
 | |
| 	if len(parts) != 2 {
 | |
| 		return lockOrWriteCol
 | |
| 	}
 | |
| 	c := hbase.Column{
 | |
| 		Family: []byte(parts[0]),
 | |
| 		Qual:   []byte(parts[1]),
 | |
| 	}
 | |
| 	return c
 | |
| }
 | |
| 
 | |
| func getLocksFromResults(tbl []byte, lockKvs []*hbase.Kv, client *themisRPC) ([]Lock, error) {
 | |
| 	var locks []Lock
 | |
| 	for _, kv := range lockKvs {
 | |
| 		col := &hbase.ColumnCoordinate{
 | |
| 			Table: tbl,
 | |
| 			Row:   kv.Row,
 | |
| 			Column: hbase.Column{
 | |
| 				Family: kv.Family,
 | |
| 				Qual:   kv.Qual,
 | |
| 			},
 | |
| 		}
 | |
| 		if !isLockColumn(col.Column) {
 | |
| 			return nil, errors.New("invalid lock")
 | |
| 		}
 | |
| 		l, err := parseLockFromBytes(kv.Value)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Trace(err)
 | |
| 		}
 | |
| 		cc := &hbase.ColumnCoordinate{
 | |
| 			Table:  tbl,
 | |
| 			Row:    kv.Row,
 | |
| 			Column: getDataColFromMetaCol(col.Column),
 | |
| 		}
 | |
| 		l.SetCoordinate(cc)
 | |
| 		client.checkAndSetLockIsExpired(l)
 | |
| 		locks = append(locks, l)
 | |
| 	}
 | |
| 	return locks, nil
 | |
| }
 | |
| 
 | |
| func (m *themisLockManager) IsLockExists(cc *hbase.ColumnCoordinate, startTs, endTs uint64) (bool, error) {
 | |
| 	get := hbase.NewGet(cc.Row)
 | |
| 	get.AddTimeRange(startTs, endTs+1)
 | |
| 	get.AddStringColumn(string(LockFamilyName), string(cc.Family)+"#"+string(cc.Qual))
 | |
| 	// check if lock exists
 | |
| 	rs, err := m.hbaseClient.Get(string(cc.Table), get)
 | |
| 	if err != nil {
 | |
| 		return false, errors.Trace(err)
 | |
| 	}
 | |
| 	// primary lock has been released
 | |
| 	if rs == nil {
 | |
| 		return false, nil
 | |
| 	}
 | |
| 	return true, nil
 | |
| }
 | |
| 
 | |
| func (m *themisLockManager) GetCommitTimestamp(cc *hbase.ColumnCoordinate, prewriteTs uint64) (uint64, error) {
 | |
| 	g := hbase.NewGet(cc.Row)
 | |
| 	// add put write column
 | |
| 	qual := string(cc.Family) + "#" + string(cc.Qual)
 | |
| 	g.AddStringColumn("#p", qual)
 | |
| 	// add del write column
 | |
| 	g.AddStringColumn("#d", qual)
 | |
| 	// time range => [ours startTs, +Inf)
 | |
| 	g.AddTimeRange(prewriteTs, math.MaxInt64)
 | |
| 	g.SetMaxVersion(math.MaxInt32)
 | |
| 	r, err := m.hbaseClient.Get(string(cc.Table), g)
 | |
| 	if err != nil {
 | |
| 		return 0, errors.Trace(err)
 | |
| 	}
 | |
| 	// may delete by other client
 | |
| 	if r == nil {
 | |
| 		return 0, nil
 | |
| 	}
 | |
| 	for _, kv := range r.SortedColumns {
 | |
| 		for commitTs, val := range kv.Values {
 | |
| 			var ts uint64
 | |
| 			binary.Read(bytes.NewBuffer(val), binary.BigEndian, &ts)
 | |
| 			if ts == prewriteTs {
 | |
| 				// get this commit's commitTs
 | |
| 				return commitTs, nil
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	// no such transction
 | |
| 	return 0, nil
 | |
| }
 | |
| 
 | |
| func (m *themisLockManager) CleanLock(cc *hbase.ColumnCoordinate, prewriteTs uint64) (uint64, Lock, error) {
 | |
| 	l, err := m.rpc.getLockAndErase(cc, prewriteTs)
 | |
| 	if err != nil {
 | |
| 		return 0, nil, errors.Trace(err)
 | |
| 	}
 | |
| 	pl, _ := l.(*themisPrimaryLock)
 | |
| 	// if primary lock is nil, means someothers have already committed
 | |
| 	if pl == nil {
 | |
| 		commitTs, err := m.GetCommitTimestamp(cc, prewriteTs)
 | |
| 		if err != nil {
 | |
| 			return 0, nil, errors.Trace(err)
 | |
| 		}
 | |
| 		return commitTs, nil, nil
 | |
| 	}
 | |
| 	return 0, pl, nil
 | |
| }
 | |
| 
 | |
| func (m *themisLockManager) EraseLockAndData(cc *hbase.ColumnCoordinate, prewriteTs uint64) error {
 | |
| 	log.Debugf("erase row=%q txn=%d", cc.Row, prewriteTs)
 | |
| 	d := hbase.NewDelete(cc.Row)
 | |
| 	d.AddColumnWithTimestamp(LockFamilyName, []byte(string(cc.Family)+"#"+string(cc.Qual)), prewriteTs)
 | |
| 	d.AddColumnWithTimestamp(cc.Family, cc.Qual, prewriteTs)
 | |
| 	ok, err := m.hbaseClient.Delete(string(cc.Table), d)
 | |
| 	if !ok {
 | |
| 		log.Error(err)
 | |
| 	}
 | |
| 	return errors.Trace(err)
 | |
| }
 |