* Dropped unused codekit config * Integrated dynamic and static bindata for public * Ignore public bindata * Add a general generate make task * Integrated flexible public assets into web command * Updated vendoring, added all missiong govendor deps * Made the linter happy with the bindata and dynamic code * Moved public bindata definition to modules directory * Ignoring the new bindata path now * Updated to the new public modules import path * Updated public bindata command and drop the new prefix
		
			
				
	
	
		
			244 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			244 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package client
 | |
| 
 | |
| import (
 | |
| 	"container/list"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/juju/errors"
 | |
| 	"github.com/ngaut/log"
 | |
| 	"github.com/ngaut/tso/proto"
 | |
| 	"github.com/ngaut/tso/util"
 | |
| 	"github.com/ngaut/zkhelper"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	maxPipelineRequest = 100000
 | |
| )
 | |
| 
 | |
| // Client is a timestamp oracle client.
 | |
| type Client struct {
 | |
| 	requests chan *PipelineRequest
 | |
| 
 | |
| 	pending *list.List
 | |
| 	conf    *Conf
 | |
| 
 | |
| 	addr string
 | |
| 
 | |
| 	leaderCh chan string
 | |
| }
 | |
| 
 | |
| // Conf is the configuration.
 | |
| type Conf struct {
 | |
| 	// tso server address, it will be deprecated later.
 | |
| 	ServerAddr string
 | |
| 
 | |
| 	// ZKAddr is for zookeeper address, if set, client will ignore ServerAddr
 | |
| 	// and find the leader tso server address in zookeeper.
 | |
| 	// Later ServerAddr is just for simple test and backward compatibility.
 | |
| 	ZKAddr string
 | |
| 
 | |
| 	// root path is the tso server saving in zookeeper, like /zk/tso.
 | |
| 	RootPath string
 | |
| }
 | |
| 
 | |
| // PipelineRequest let you get the timestamp with pipeline.
 | |
| type PipelineRequest struct {
 | |
| 	done  chan error
 | |
| 	reply *proto.Response
 | |
| }
 | |
| 
 | |
| func newPipelineRequest() *PipelineRequest {
 | |
| 	return &PipelineRequest{
 | |
| 		done: make(chan error, 1),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // MarkDone sets the repsone for current request.
 | |
| func (pr *PipelineRequest) MarkDone(reply *proto.Response, err error) {
 | |
| 	if err != nil {
 | |
| 		pr.reply = nil
 | |
| 	}
 | |
| 	pr.reply = reply
 | |
| 	pr.done <- errors.Trace(err)
 | |
| }
 | |
| 
 | |
| // GetTS gets the timestamp.
 | |
| func (pr *PipelineRequest) GetTS() (*proto.Timestamp, error) {
 | |
| 	err := <-pr.done
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	return &pr.reply.Timestamp, nil
 | |
| }
 | |
| 
 | |
| // NewClient creates a timestamp oracle client.
 | |
| func NewClient(conf *Conf) *Client {
 | |
| 	c := &Client{
 | |
| 		requests: make(chan *PipelineRequest, maxPipelineRequest),
 | |
| 		pending:  list.New(),
 | |
| 		conf:     conf,
 | |
| 		leaderCh: make(chan string, 1),
 | |
| 	}
 | |
| 
 | |
| 	if len(conf.ZKAddr) == 0 {
 | |
| 		c.leaderCh <- conf.ServerAddr
 | |
| 	} else {
 | |
| 		go c.watchLeader()
 | |
| 	}
 | |
| 
 | |
| 	go c.workerLoop()
 | |
| 
 | |
| 	return c
 | |
| }
 | |
| 
 | |
| func (c *Client) cleanupPending(err error) {
 | |
| 	log.Warn(err)
 | |
| 	length := c.pending.Len()
 | |
| 	for i := 0; i < length; i++ {
 | |
| 		e := c.pending.Front()
 | |
| 		c.pending.Remove(e)
 | |
| 		e.Value.(*PipelineRequest).MarkDone(nil, err)
 | |
| 	}
 | |
| 
 | |
| 	// clear request in channel too
 | |
| 	length = len(c.requests)
 | |
| 	for i := 0; i < length; i++ {
 | |
| 		req := <-c.requests
 | |
| 		req.MarkDone(nil, err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Client) notifyOne(reply *proto.Response) {
 | |
| 	e := c.pending.Front()
 | |
| 	c.pending.Remove(e)
 | |
| 	req := e.Value.(*PipelineRequest)
 | |
| 	req.MarkDone(reply, nil)
 | |
| }
 | |
| 
 | |
| func (c *Client) writeRequests(session *Conn) error {
 | |
| 	var protoHdr [1]byte
 | |
| 	for i := 0; i < c.pending.Len(); i++ {
 | |
| 		session.Write(protoHdr[:])
 | |
| 	}
 | |
| 	return session.Flush()
 | |
| }
 | |
| 
 | |
| func (c *Client) handleResponse(session *Conn) error {
 | |
| 	length := c.pending.Len()
 | |
| 	for i := 0; i < length; i++ {
 | |
| 		var resp proto.Response
 | |
| 		err := resp.Decode(session)
 | |
| 		if err != nil {
 | |
| 			return errors.Trace(err)
 | |
| 		}
 | |
| 		c.notifyOne(&resp)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (c *Client) do() error {
 | |
| 	session, err := NewConnection(c.addr, time.Duration(1*time.Second))
 | |
| 	if err != nil {
 | |
| 		return errors.Trace(err)
 | |
| 	}
 | |
| 
 | |
| 	log.Debugf("connect tso server %s ok", c.addr)
 | |
| 
 | |
| 	defer session.Close()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case req := <-c.requests:
 | |
| 			c.pending.PushBack(req)
 | |
| 			length := len(c.requests)
 | |
| 			for i := 0; i < length; i++ {
 | |
| 				req = <-c.requests
 | |
| 				c.pending.PushBack(req)
 | |
| 			}
 | |
| 
 | |
| 			err = c.writeRequests(session)
 | |
| 			if err != nil {
 | |
| 				return errors.Trace(err)
 | |
| 			}
 | |
| 			err = c.handleResponse(session)
 | |
| 			if err != nil {
 | |
| 				return errors.Trace(err)
 | |
| 			}
 | |
| 		case addr := <-c.leaderCh:
 | |
| 			oldAddr := c.addr
 | |
| 			c.addr = addr
 | |
| 			return errors.Errorf("leader change %s -> %s", oldAddr, addr)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Client) workerLoop() {
 | |
| 	// first get tso leader
 | |
| 	c.addr = <-c.leaderCh
 | |
| 	log.Debugf("try to connect tso server %s", c.addr)
 | |
| 
 | |
| 	for {
 | |
| 		err := c.do()
 | |
| 		if err != nil {
 | |
| 			c.cleanupPending(err)
 | |
| 		}
 | |
| 		select {
 | |
| 		case <-time.After(1 * time.Second):
 | |
| 		case addr := <-c.leaderCh:
 | |
| 			// If old tso server down, NewConnection will fail and return immediately in do function,
 | |
| 			// so we must check leader change here.
 | |
| 			log.Warnf("leader change %s -> %s", c.addr, addr)
 | |
| 			c.addr = addr
 | |
| 			// Wait some time to let tso server allow accepting connections.
 | |
| 			time.Sleep(1 * time.Second)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Client) watchLeader() {
 | |
| 	var (
 | |
| 		conn zkhelper.Conn
 | |
| 		err  error
 | |
| 	)
 | |
| 
 | |
| 	for {
 | |
| 		conn, err = zkhelper.ConnectToZkWithTimeout(c.conf.ZKAddr, time.Second)
 | |
| 		if err != nil {
 | |
| 			log.Errorf("connect zk err %v, retry later", err)
 | |
| 			time.Sleep(3 * time.Second)
 | |
| 			continue
 | |
| 		}
 | |
| 		break
 | |
| 	}
 | |
| 
 | |
| 	defer conn.Close()
 | |
| 
 | |
| 	var lastAddr string
 | |
| 
 | |
| 	for {
 | |
| 		addr, watcher, err := util.GetWatchLeader(conn, c.conf.RootPath)
 | |
| 		if err != nil {
 | |
| 			log.Errorf("get tso leader err %v, retry later", err)
 | |
| 			time.Sleep(3 * time.Second)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if lastAddr != addr {
 | |
| 			log.Warnf("leader change %s -> %s", lastAddr, addr)
 | |
| 			lastAddr = addr
 | |
| 			c.leaderCh <- addr
 | |
| 		}
 | |
| 
 | |
| 		// watch the leader changes.
 | |
| 		<-watcher
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GoGetTimestamp returns a PipelineRequest so you can get the timestamp later.
 | |
| func (c *Client) GoGetTimestamp() *PipelineRequest {
 | |
| 	pr := newPipelineRequest()
 | |
| 	c.requests <- pr
 | |
| 	return pr
 | |
| }
 |