forked from Shiloh/githaven
313 lines
5.4 KiB
Go
313 lines
5.4 KiB
Go
|
package nodb
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"bytes"
|
||
|
"errors"
|
||
|
"io"
|
||
|
"os"
|
||
|
"time"
|
||
|
|
||
|
"github.com/lunny/log"
|
||
|
"github.com/lunny/nodb/store/driver"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
maxReplBatchNum = 100
|
||
|
maxReplLogSize = 1 * 1024 * 1024
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
ErrSkipEvent = errors.New("skip to next event")
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
errInvalidBinLogEvent = errors.New("invalid binglog event")
|
||
|
errInvalidBinLogFile = errors.New("invalid binlog file")
|
||
|
)
|
||
|
|
||
|
type replBatch struct {
|
||
|
wb driver.IWriteBatch
|
||
|
events [][]byte
|
||
|
l *Nodb
|
||
|
|
||
|
lastHead *BinLogHead
|
||
|
}
|
||
|
|
||
|
func (b *replBatch) Commit() error {
|
||
|
b.l.commitLock.Lock()
|
||
|
defer b.l.commitLock.Unlock()
|
||
|
|
||
|
err := b.wb.Commit()
|
||
|
if err != nil {
|
||
|
b.Rollback()
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if b.l.binlog != nil {
|
||
|
if err = b.l.binlog.Log(b.events...); err != nil {
|
||
|
b.Rollback()
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
b.events = [][]byte{}
|
||
|
b.lastHead = nil
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (b *replBatch) Rollback() error {
|
||
|
b.wb.Rollback()
|
||
|
b.events = [][]byte{}
|
||
|
b.lastHead = nil
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) replicateEvent(b *replBatch, event []byte) error {
|
||
|
if len(event) == 0 {
|
||
|
return errInvalidBinLogEvent
|
||
|
}
|
||
|
|
||
|
b.events = append(b.events, event)
|
||
|
|
||
|
logType := uint8(event[0])
|
||
|
switch logType {
|
||
|
case BinLogTypePut:
|
||
|
return l.replicatePutEvent(b, event)
|
||
|
case BinLogTypeDeletion:
|
||
|
return l.replicateDeleteEvent(b, event)
|
||
|
default:
|
||
|
return errInvalidBinLogEvent
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) replicatePutEvent(b *replBatch, event []byte) error {
|
||
|
key, value, err := decodeBinLogPut(event)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
b.wb.Put(key, value)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) replicateDeleteEvent(b *replBatch, event []byte) error {
|
||
|
key, err := decodeBinLogDelete(event)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
b.wb.Delete(key)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func ReadEventFromReader(rb io.Reader, f func(head *BinLogHead, event []byte) error) error {
|
||
|
head := &BinLogHead{}
|
||
|
var err error
|
||
|
|
||
|
for {
|
||
|
if err = head.Read(rb); err != nil {
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
} else {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var dataBuf bytes.Buffer
|
||
|
|
||
|
if _, err = io.CopyN(&dataBuf, rb, int64(head.PayloadLen)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = f(head, dataBuf.Bytes())
|
||
|
if err != nil && err != ErrSkipEvent {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) ReplicateFromReader(rb io.Reader) error {
|
||
|
b := new(replBatch)
|
||
|
|
||
|
b.wb = l.ldb.NewWriteBatch()
|
||
|
b.l = l
|
||
|
|
||
|
f := func(head *BinLogHead, event []byte) error {
|
||
|
if b.lastHead == nil {
|
||
|
b.lastHead = head
|
||
|
} else if !b.lastHead.InSameBatch(head) {
|
||
|
if err := b.Commit(); err != nil {
|
||
|
log.Fatal("replication error %s, skip to next", err.Error())
|
||
|
return ErrSkipEvent
|
||
|
}
|
||
|
b.lastHead = head
|
||
|
}
|
||
|
|
||
|
err := l.replicateEvent(b, event)
|
||
|
if err != nil {
|
||
|
log.Fatal("replication error %s, skip to next", err.Error())
|
||
|
return ErrSkipEvent
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
err := ReadEventFromReader(rb, f)
|
||
|
if err != nil {
|
||
|
b.Rollback()
|
||
|
return err
|
||
|
}
|
||
|
return b.Commit()
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) ReplicateFromData(data []byte) error {
|
||
|
rb := bytes.NewReader(data)
|
||
|
|
||
|
err := l.ReplicateFromReader(rb)
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) ReplicateFromBinLog(filePath string) error {
|
||
|
f, err := os.Open(filePath)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
rb := bufio.NewReaderSize(f, 4096)
|
||
|
|
||
|
err = l.ReplicateFromReader(rb)
|
||
|
|
||
|
f.Close()
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// try to read events, if no events read, try to wait the new event singal until timeout seconds
|
||
|
func (l *Nodb) ReadEventsToTimeout(info *BinLogAnchor, w io.Writer, timeout int) (n int, err error) {
|
||
|
lastIndex := info.LogFileIndex
|
||
|
lastPos := info.LogPos
|
||
|
|
||
|
n = 0
|
||
|
if l.binlog == nil {
|
||
|
//binlog not supported
|
||
|
info.LogFileIndex = 0
|
||
|
info.LogPos = 0
|
||
|
return
|
||
|
}
|
||
|
|
||
|
n, err = l.ReadEventsTo(info, w)
|
||
|
if err == nil && info.LogFileIndex == lastIndex && info.LogPos == lastPos {
|
||
|
//no events read
|
||
|
select {
|
||
|
case <-l.binlog.Wait():
|
||
|
case <-time.After(time.Duration(timeout) * time.Second):
|
||
|
}
|
||
|
return l.ReadEventsTo(info, w)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (l *Nodb) ReadEventsTo(info *BinLogAnchor, w io.Writer) (n int, err error) {
|
||
|
n = 0
|
||
|
if l.binlog == nil {
|
||
|
//binlog not supported
|
||
|
info.LogFileIndex = 0
|
||
|
info.LogPos = 0
|
||
|
return
|
||
|
}
|
||
|
|
||
|
index := info.LogFileIndex
|
||
|
offset := info.LogPos
|
||
|
|
||
|
filePath := l.binlog.FormatLogFilePath(index)
|
||
|
|
||
|
var f *os.File
|
||
|
f, err = os.Open(filePath)
|
||
|
if os.IsNotExist(err) {
|
||
|
lastIndex := l.binlog.LogFileIndex()
|
||
|
|
||
|
if index == lastIndex {
|
||
|
//no binlog at all
|
||
|
info.LogPos = 0
|
||
|
} else {
|
||
|
//slave binlog info had lost
|
||
|
info.LogFileIndex = -1
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
if os.IsNotExist(err) {
|
||
|
err = nil
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
defer f.Close()
|
||
|
|
||
|
var fileSize int64
|
||
|
st, _ := f.Stat()
|
||
|
fileSize = st.Size()
|
||
|
|
||
|
if fileSize == info.LogPos {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
|
||
|
//may be invliad seek offset
|
||
|
return
|
||
|
}
|
||
|
|
||
|
var lastHead *BinLogHead = nil
|
||
|
|
||
|
head := &BinLogHead{}
|
||
|
|
||
|
batchNum := 0
|
||
|
|
||
|
for {
|
||
|
if err = head.Read(f); err != nil {
|
||
|
if err == io.EOF {
|
||
|
//we will try to use next binlog
|
||
|
if index < l.binlog.LogFileIndex() {
|
||
|
info.LogFileIndex += 1
|
||
|
info.LogPos = 0
|
||
|
}
|
||
|
err = nil
|
||
|
return
|
||
|
} else {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
if lastHead == nil {
|
||
|
lastHead = head
|
||
|
batchNum++
|
||
|
} else if !lastHead.InSameBatch(head) {
|
||
|
lastHead = head
|
||
|
batchNum++
|
||
|
if batchNum > maxReplBatchNum || n > maxReplLogSize {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err = head.Write(w); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if _, err = io.CopyN(w, f, int64(head.PayloadLen)); err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
n += (head.Len() + int(head.PayloadLen))
|
||
|
info.LogPos = info.LogPos + int64(head.Len()) + int64(head.PayloadLen)
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|