313 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			313 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package nodb
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"bytes"
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/lunny/log"
 | |
| 	"github.com/lunny/nodb/store/driver"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	maxReplBatchNum = 100
 | |
| 	maxReplLogSize  = 1 * 1024 * 1024
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	ErrSkipEvent = errors.New("skip to next event")
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	errInvalidBinLogEvent = errors.New("invalid binglog event")
 | |
| 	errInvalidBinLogFile  = errors.New("invalid binlog file")
 | |
| )
 | |
| 
 | |
| type replBatch struct {
 | |
| 	wb     driver.IWriteBatch
 | |
| 	events [][]byte
 | |
| 	l      *Nodb
 | |
| 
 | |
| 	lastHead *BinLogHead
 | |
| }
 | |
| 
 | |
| func (b *replBatch) Commit() error {
 | |
| 	b.l.commitLock.Lock()
 | |
| 	defer b.l.commitLock.Unlock()
 | |
| 
 | |
| 	err := b.wb.Commit()
 | |
| 	if err != nil {
 | |
| 		b.Rollback()
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if b.l.binlog != nil {
 | |
| 		if err = b.l.binlog.Log(b.events...); err != nil {
 | |
| 			b.Rollback()
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	b.events = [][]byte{}
 | |
| 	b.lastHead = nil
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *replBatch) Rollback() error {
 | |
| 	b.wb.Rollback()
 | |
| 	b.events = [][]byte{}
 | |
| 	b.lastHead = nil
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *Nodb) replicateEvent(b *replBatch, event []byte) error {
 | |
| 	if len(event) == 0 {
 | |
| 		return errInvalidBinLogEvent
 | |
| 	}
 | |
| 
 | |
| 	b.events = append(b.events, event)
 | |
| 
 | |
| 	logType := uint8(event[0])
 | |
| 	switch logType {
 | |
| 	case BinLogTypePut:
 | |
| 		return l.replicatePutEvent(b, event)
 | |
| 	case BinLogTypeDeletion:
 | |
| 		return l.replicateDeleteEvent(b, event)
 | |
| 	default:
 | |
| 		return errInvalidBinLogEvent
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error {
 | |
| 	key, value, err := decodeBinLogPut(event)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	b.wb.Put(key, value)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error {
 | |
| 	key, err := decodeBinLogDelete(event)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	b.wb.Delete(key)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
 | |
| 	head := &BinLogHead{}
 | |
| 	var err error
 | |
| 
 | |
| 	for {
 | |
| 		if err = head.Read(rb); err != nil {
 | |
| 			if err == io.EOF {
 | |
| 				break
 | |
| 			} else {
 | |
| 				return err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		var dataBuf bytes.Buffer
 | |
| 
 | |
| 		if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		err = f(head, dataBuf.Bytes())
 | |
| 		if err != nil && err != ErrSkipEvent {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *Nodb) ReplicateFromReader(rb io.Reader) error {
 | |
| 	b := new(replBatch)
 | |
| 
 | |
| 	b.wb = l.ldb.NewWriteBatch()
 | |
| 	b.l = l
 | |
| 
 | |
| 	f := func(head *BinLogHead, event []byte) error {
 | |
| 		if b.lastHead == nil {
 | |
| 			b.lastHead = head
 | |
| 		} else if !b.lastHead.InSameBatch(head) {
 | |
| 			if err := b.Commit(); err != nil {
 | |
| 				log.Fatal("replication error %s, skip to next", err.Error())
 | |
| 				return ErrSkipEvent
 | |
| 			}
 | |
| 			b.lastHead = head
 | |
| 		}
 | |
| 
 | |
| 		err := l.replicateEvent(b, event)
 | |
| 		if err != nil {
 | |
| 			log.Fatal("replication error %s, skip to next", err.Error())
 | |
| 			return ErrSkipEvent
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	err := ReadEventFromReader(rb, f)
 | |
| 	if err != nil {
 | |
| 		b.Rollback()
 | |
| 		return err
 | |
| 	}
 | |
| 	return b.Commit()
 | |
| }
 | |
| 
 | |
| func (l *Nodb) ReplicateFromData(data []byte) error {
 | |
| 	rb := bytes.NewReader(data)
 | |
| 
 | |
| 	err := l.ReplicateFromReader(rb)
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (l *Nodb) ReplicateFromBinLog(filePath string) error {
 | |
| 	f, err := os.Open(filePath)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	rb := bufio.NewReaderSize(f, 4096)
 | |
| 
 | |
| 	err = l.ReplicateFromReader(rb)
 | |
| 
 | |
| 	f.Close()
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // try to read events, if no events read, try to wait the new event singal until timeout seconds
 | |
| func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
 | |
| 	lastIndex := info.LogFileIndex
 | |
| 	lastPos := info.LogPos
 | |
| 
 | |
| 	n = 0
 | |
| 	if l.binlog == nil {
 | |
| 		//binlog not supported
 | |
| 		info.LogFileIndex = 0
 | |
| 		info.LogPos = 0
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	n, err = l.ReadEventsTo(info, w)
 | |
| 	if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
 | |
| 		//no events read
 | |
| 		select {
 | |
| 		case <-l.binlog.Wait():
 | |
| 		case <-time.After(time.Duration(timeout) * time.Second):
 | |
| 		}
 | |
| 		return l.ReadEventsTo(info, w)
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
 | |
| 	n = 0
 | |
| 	if l.binlog == nil {
 | |
| 		//binlog not supported
 | |
| 		info.LogFileIndex = 0
 | |
| 		info.LogPos = 0
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	index := info.LogFileIndex
 | |
| 	offset := info.LogPos
 | |
| 
 | |
| 	filePath := l.binlog.FormatLogFilePath(index)
 | |
| 
 | |
| 	var f *os.File
 | |
| 	f, err = os.Open(filePath)
 | |
| 	if os.IsNotExist(err) {
 | |
| 		lastIndex := l.binlog.LogFileIndex()
 | |
| 
 | |
| 		if index == lastIndex {
 | |
| 			//no binlog at all
 | |
| 			info.LogPos = 0
 | |
| 		} else {
 | |
| 			//slave binlog info had lost
 | |
| 			info.LogFileIndex = -1
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err != nil {
 | |
| 		if os.IsNotExist(err) {
 | |
| 			err = nil
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	defer f.Close()
 | |
| 
 | |
| 	var fileSize int64
 | |
| 	st, _ := f.Stat()
 | |
| 	fileSize = st.Size()
 | |
| 
 | |
| 	if fileSize == info.LogPos {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
 | |
| 		//may be invliad seek offset
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	var lastHead *BinLogHead = nil
 | |
| 
 | |
| 	head := &BinLogHead{}
 | |
| 
 | |
| 	batchNum := 0
 | |
| 
 | |
| 	for {
 | |
| 		if err = head.Read(f); err != nil {
 | |
| 			if err == io.EOF {
 | |
| 				//we will try to use next binlog
 | |
| 				if index < l.binlog.LogFileIndex() {
 | |
| 					info.LogFileIndex += 1
 | |
| 					info.LogPos = 0
 | |
| 				}
 | |
| 				err = nil
 | |
| 				return
 | |
| 			} else {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 		}
 | |
| 
 | |
| 		if lastHead == nil {
 | |
| 			lastHead = head
 | |
| 			batchNum++
 | |
| 		} else if !lastHead.InSameBatch(head) {
 | |
| 			lastHead = head
 | |
| 			batchNum++
 | |
| 			if batchNum > maxReplBatchNum || n > maxReplLogSize {
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if err = head.Write(w); err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		n += (head.Len() + int(head.PayloadLen))
 | |
| 		info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 |