githaven-fork/vendor/github.com/couchbase/gomemcached/client/upr_event.go

404 lines
12 KiB
Go
Raw Normal View History

package memcached
import (
"encoding/binary"
"fmt"
"github.com/couchbase/gomemcached"
"math"
)
// SystemEventType identifies the kind of DCP system event carried by a
// UprEvent.
type SystemEventType int

// InvalidSysEvent marks an event whose system event type has not been set.
const InvalidSysEvent SystemEventType = -1

// System event identifiers, numbered consecutively from 0.
const (
	CollectionCreate SystemEventType = iota
	CollectionDrop
	CollectionFlush // KV did not implement
	ScopeCreate
	ScopeDrop
	CollectionChanged
)
// ScopeCreateEvent groups the accessors offered for a scope-creation system
// event: the event name, the scope ID and the manifest ID. Each accessor
// returns an error alongside its value.
type ScopeCreateEvent interface {
GetSystemEventName() (string, error)
GetScopeId() (uint32, error)
GetManifestId() (uint64, error)
}
// CollectionCreateEvent groups the accessors offered for a collection-creation
// system event: the event name, scope ID, collection ID, manifest ID and
// max TTL. Each accessor returns an error alongside its value.
type CollectionCreateEvent interface {
GetSystemEventName() (string, error)
GetScopeId() (uint32, error)
GetCollectionId() (uint32, error)
GetManifestId() (uint64, error)
GetMaxTTL() (uint32, error)
}
// CollectionDropEvent groups the accessors offered for a collection-drop
// system event: the scope ID, collection ID and manifest ID. Each accessor
// returns an error alongside its value.
type CollectionDropEvent interface {
GetScopeId() (uint32, error)
GetCollectionId() (uint32, error)
GetManifestId() (uint64, error)
}
// ScopeDropEvent groups the accessors offered for a scope-drop system event:
// the scope ID and the manifest ID. Each accessor returns an error alongside
// its value.
type ScopeDropEvent interface {
GetScopeId() (uint32, error)
GetManifestId() (uint64, error)
}
// CollectionChangedEvent groups the accessors offered for a collection-changed
// system event: the collection ID, manifest ID and max TTL. Each accessor
// returns an error alongside its value.
type CollectionChangedEvent interface {
GetCollectionId() (uint32, error)
GetManifestId() (uint64, error)
GetMaxTTL() (uint32, error)
}
// Sentinel errors returned by the UprEvent accessors.
var (
	// ErrorInvalidOp is returned when an accessor does not apply to the event.
	ErrorInvalidOp = fmt.Errorf("Invalid Operation")
	// ErrorInvalidVersion is returned when the system event version is not
	// one the accessor knows how to parse.
	ErrorInvalidVersion = fmt.Errorf("Invalid version for parsing")
	// ErrorValueTooShort is returned when the event value is shorter than the
	// field being read requires.
	ErrorValueTooShort = fmt.Errorf("Value length is too short")
	// ErrorNoMaxTTL is returned when the event carries no max TTL.
	ErrorNoMaxTTL = fmt.Errorf("This event has no max TTL")
)
// UprEvent is a single memcached event received on a UPR stream.
// Which fields are meaningful depends on Opcode; fields that are not decoded
// for a given opcode keep their zero value, except SystemEvent and
// CollectionId, which makeUprEvent initialises to InvalidSysEvent and
// math.MaxUint32 respectively.
type UprEvent struct {
Opcode gomemcached.CommandCode // Type of event
Status gomemcached.Status // Response status
VBucket uint16 // VBucket this event applies to
DataType uint8 // Data type
Opaque uint16 // 16 MSB of opaque
VBuuid uint64 // This field is set by downstream
Flags uint32 // Item flags
Expiry uint32 // Item expiration time
Key, Value []byte // Item key/value
OldValue []byte // TODO: TBD: old document value
Cas uint64 // CAS value of the item
Seqno uint64 // Sequence number of the mutation
RevSeqno uint64 // Rev sequence number: deletions
LockTime uint32 // Lock time
MetadataSize uint16 // Metadata size
SnapstartSeq uint64 // Start sequence number of this snapshot
SnapendSeq uint64 // End sequence number of the snapshot
SnapshotType uint32 // 0: disk 1: memory
FailoverLog *FailoverLog // Failover log containing vbuuid and sequence number
Error error // Error value in case of a failure
ExtMeta []byte // Extended metadata
AckSize uint32 // The number of bytes that can be acked to DCP
SystemEvent SystemEventType // Only valid if IsSystemEvent() is true
SysEventVersion uint8 // Based on the version, the way the Extras bytes are parsed is different
ValueLen int // Cached length of Value, to avoid len() calls for performance
CollectionId uint32 // Valid if Collection is in use
StreamId *uint16 // Nil if not in use
}
// FailoverLog is a list of pairs, each containing a vbuuid and a sequence number.
type FailoverLog [][2]uint64
// VBSeqnos is a list of pairs, each containing a vbucket number and its high seqno.
type VBSeqnos [][2]uint64
// makeUprEvent converts a raw DCP request received on stream into a UprEvent.
//
// Common fields are copied from rq and stream, the key (and, on
// collection-aware streams, the collection ID and stream ID) is filled in by
// PopulateFieldsBasedOnStreamType, and the opcode-specific fields are decoded
// from rq.Extras. Extras are decoded only when they are at least as long as
// the layout for the opcode requires; otherwise the opcode-specific fields
// keep their initial values.
//
// bytesReceivedFromDCP is stored in AckSize for opcodes listed in
// gomemcached.BufferedCommandCodeMap, i.e. events that must be acked to DCP.
func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
	event := &UprEvent{
		Opcode:       rq.Opcode,
		VBucket:      stream.Vbucket,
		VBuuid:       stream.Vbuuid,
		Value:        rq.Body,
		Cas:          rq.Cas,
		ExtMeta:      rq.ExtMeta,
		DataType:     rq.DataType,
		ValueLen:     len(rq.Body),
		SystemEvent:  InvalidSysEvent,
		CollectionId: math.MaxUint32,
	}

	event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType)

	// set AckSize for events that need to be acked to DCP,
	// i.e., events with CommandCodes that need to be buffered in DCP
	if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
		event.AckSize = uint32(bytesReceivedFromDCP)
	}

	// 16 LSBits are used by client library to encode vbucket number.
	// 16 MSBits are left for application to multiplex on opaque value.
	event.Opaque = appOpaque(rq.Opaque)

	extrasLen := len(rq.Extras)
	switch {
	case event.Opcode == gomemcached.UPR_MUTATION && extrasLen >= uprMutationExtraLen:
		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
		event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
		event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
		event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
		event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])

	case event.Opcode == gomemcached.UPR_DELETION && extrasLen >= uprDeletetionWithDeletionTimeExtraLen:
		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
		event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])

	// The length check must guard both opcodes. Without the parentheses
	// `&&` binds tighter than `||`, so an expiration with short extras
	// would be sliced unchecked.
	case (event.Opcode == gomemcached.UPR_DELETION || event.Opcode == gomemcached.UPR_EXPIRATION) &&
		extrasLen >= uprDeletetionExtraLen:
		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
		event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])

	case event.Opcode == gomemcached.UPR_SNAPSHOT && extrasLen >= uprSnapshotExtraLen:
		event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
		event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])

	case event.IsSystemEvent():
		event.PopulateEvent(rq.Extras)

	case event.IsSeqnoAdv():
		event.PopulateSeqnoAdv(rq.Extras)

	case event.IsOsoSnapshot():
		event.PopulateOso(rq.Extras)
	}

	return event
}
// PopulateFieldsBasedOnStreamType fills in Key, and where applicable StreamId
// and CollectionId, according to the kind of stream the request arrived on.
//
//   - CollectionsStreamId: StreamId is taken from the framing extras (the last
//     extra that yields a stream ID wins), then the key is handled as for
//     CollectionsNonStreamId.
//   - CollectionsNonStreamId: for mutations, deletions and expirations the key
//     starts with a ULEB128-encoded collection ID, which is decoded into
//     CollectionId and stripped from Key; other opcodes keep the key as is.
//   - Any other stream type: Key is rq.Key unchanged.
func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) {
	if streamType == CollectionsStreamId {
		for _, framingExtra := range rq.FramingExtras {
			if id, err := framingExtra.GetStreamId(); err == nil {
				event.StreamId = &id
			}
		}
	}

	// Only document-level opcodes on collection-aware streams carry the
	// collection ID inside the key.
	keyCarriesCollectionId := false
	if streamType == CollectionsStreamId || streamType == CollectionsNonStreamId {
		switch rq.Opcode {
		case gomemcached.UPR_MUTATION, gomemcached.UPR_DELETION, gomemcached.UPR_EXPIRATION:
			keyCarriesCollectionId = true
		}
	}

	if !keyCarriesCollectionId {
		event.Key = rq.Key
		return
	}

	collectionId, prefixLen := Uleb128(rq.Key).ToUint32(rq.Keylen)
	event.CollectionId = collectionId
	event.Key = rq.Key[prefixLen:]
}
// String returns the command name registered for the event's opcode in
// gomemcached.CommandNames, or "#<opcode>" when no name is registered.
func (event *UprEvent) String() string {
	if name := gomemcached.CommandNames[event.Opcode]; name != "" {
		return name
	}
	return fmt.Sprintf("#%d", event.Opcode)
}
// IsSnappyDataType reports whether the event is a UPR_MUTATION whose DataType
// has any SnappyDataType bit set.
func (event *UprEvent) IsSnappyDataType() bool {
	if event.Opcode != gomemcached.UPR_MUTATION {
		return false
	}
	snappyBits := event.DataType & SnappyDataType
	return snappyBits > 0
}
// IsCollectionType reports whether the event is collection-related: either it
// is a system event, or a collection ID has been decoded for it.
//
// CollectionId is a uint32 that makeUprEvent initialises to math.MaxUint32 to
// mean "no collection ID". A `<= math.MaxUint32` comparison is therefore
// always true; the sentinel has to be excluded explicitly.
func (event *UprEvent) IsCollectionType() bool {
	return event.IsSystemEvent() || event.CollectionId != math.MaxUint32
}
// IsSystemEvent reports whether the event's opcode is DCP_SYSTEM_EVENT.
func (event *UprEvent) IsSystemEvent() bool {
return event.Opcode == gomemcached.DCP_SYSTEM_EVENT
}
// IsSeqnoAdv reports whether the event's opcode is DCP_SEQNO_ADV.
func (event *UprEvent) IsSeqnoAdv() bool {
return event.Opcode == gomemcached.DCP_SEQNO_ADV
}
// IsOsoSnapshot reports whether the event's opcode is DCP_OSO_SNAPSHOT.
func (event *UprEvent) IsOsoSnapshot() bool {
return event.Opcode == gomemcached.DCP_OSO_SNAPSHOT
}
// PopulateEvent decodes the extras of a system event into Seqno, SystemEvent
// and SysEventVersion.
//
// Layout read from extras (big endian): bytes 0-7 sequence number, bytes 8-11
// system event type, byte 12 version. If extras is shorter than
// dcpSystemEventExtraLen, or too short to contain the version byte, nothing is
// parsed and the event is left unchanged.
func (event *UprEvent) PopulateEvent(extras []byte) {
	// Offset of the one-byte version field, the last byte this method reads.
	const versionOffset = 12

	if len(extras) < dcpSystemEventExtraLen || len(extras) <= versionOffset {
		// Wrong length, don't parse
		return
	}

	event.Seqno = binary.BigEndian.Uint64(extras[:8])
	event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12]))
	// Only the byte at versionOffset is the version. Reading it directly
	// (rather than as the high byte of a 16-bit word) avoids touching the
	// byte after it, which the length check does not guarantee to exist.
	event.SysEventVersion = extras[versionOffset]
}
// PopulateSeqnoAdv decodes the extras of a seqno-advanced event: the first
// eight bytes (big endian) become Seqno. Extras shorter than
// dcpSeqnoAdvExtraLen are ignored.
func (event *UprEvent) PopulateSeqnoAdv(extras []byte) {
	if len(extras) >= dcpSeqnoAdvExtraLen {
		event.Seqno = binary.BigEndian.Uint64(extras[:8])
	}
}
// PopulateOso decodes the extras of an OSO snapshot event: the first four
// bytes (big endian) become Flags. Extras shorter than dcpOsoExtraLen are
// ignored.
func (event *UprEvent) PopulateOso(extras []byte) {
	if len(extras) >= dcpOsoExtraLen {
		event.Flags = binary.BigEndian.Uint32(extras[:4])
	}
}
// GetSystemEventName returns the event key as a string for CollectionCreate
// and ScopeCreate system events, and ErrorInvalidOp for every other event.
func (event *UprEvent) GetSystemEventName() (string, error) {
	if event.SystemEvent == CollectionCreate || event.SystemEvent == ScopeCreate {
		return string(event.Key), nil
	}
	return "", ErrorInvalidOp
}
// GetManifestId returns the manifest ID stored big endian in the first eight
// bytes of the event value.
//
// It applies to CollectionCreate (versions 0 and 1) and to CollectionDrop,
// ScopeCreate, ScopeDrop and CollectionChanged (version 0 only). It returns
// ErrorInvalidOp for any other event, ErrorInvalidVersion for an unsupported
// version, and ErrorValueTooShort when ValueLen is below 8.
func (event *UprEvent) GetManifestId() (uint64, error) {
	var highestVersion uint8
	switch event.SystemEvent {
	case CollectionCreate:
		// CollectionCreate supports version 1
		highestVersion = 1
	case CollectionDrop, ScopeCreate, ScopeDrop, CollectionChanged:
		highestVersion = 0
	default:
		return 0, ErrorInvalidOp
	}

	if event.SysEventVersion > highestVersion {
		return 0, ErrorInvalidVersion
	}
	if event.ValueLen < 8 {
		return 0, ErrorValueTooShort
	}
	return binary.BigEndian.Uint64(event.Value[0:8]), nil
}
// GetCollectionId returns the collection ID stored big endian in the event
// value.
//
// For CollectionCreate (versions 0 and 1) and CollectionDrop (version 0) the
// ID occupies bytes 12-15; for CollectionChanged (version 0) it occupies bytes
// 8-11. It returns ErrorInvalidOp for any other event, ErrorInvalidVersion for
// an unsupported version, and ErrorValueTooShort when ValueLen does not cover
// the field.
func (event *UprEvent) GetCollectionId() (uint32, error) {
	var (
		highestVersion uint8
		start          int
	)
	switch event.SystemEvent {
	case CollectionCreate:
		highestVersion, start = 1, 12
	case CollectionDrop:
		highestVersion, start = 0, 12
	case CollectionChanged:
		highestVersion, start = 0, 8
	default:
		return 0, ErrorInvalidOp
	}

	if event.SysEventVersion > highestVersion {
		return 0, ErrorInvalidVersion
	}
	end := start + 4
	if event.ValueLen < end {
		return 0, ErrorValueTooShort
	}
	return binary.BigEndian.Uint32(event.Value[start:end]), nil
}
// GetScopeId returns the scope ID stored big endian in bytes 8-11 of the event
// value.
//
// It applies to CollectionCreate (versions 0 and 1) and to ScopeCreate,
// ScopeDrop and CollectionDrop (version 0 only). It returns ErrorInvalidOp for
// any other event, ErrorInvalidVersion for an unsupported version, and
// ErrorValueTooShort when ValueLen is below 12.
func (event *UprEvent) GetScopeId() (uint32, error) {
	var highestVersion uint8
	switch event.SystemEvent {
	case CollectionCreate:
		// CollectionCreate could be either 0 or 1
		highestVersion = 1
	case ScopeCreate, ScopeDrop, CollectionDrop:
		highestVersion = 0
	default:
		return 0, ErrorInvalidOp
	}

	if event.SysEventVersion > highestVersion {
		return 0, ErrorInvalidVersion
	}
	if event.ValueLen < 12 {
		return 0, ErrorValueTooShort
	}
	return binary.BigEndian.Uint32(event.Value[8:12]), nil
}
// GetMaxTTL returns the max TTL stored big endian in the event value.
//
// For CollectionCreate the field occupies bytes 16-19 and is only present
// from version 1 onwards; version 0 yields ErrorNoMaxTTL. For
// CollectionChanged the field occupies bytes 12-15 and only version 0 is
// accepted; any other version yields ErrorInvalidVersion. Other events yield
// ErrorInvalidOp, and a value too short to hold the field yields
// ErrorValueTooShort.
func (event *UprEvent) GetMaxTTL() (uint32, error) {
	var start int
	switch event.SystemEvent {
	case CollectionCreate:
		if event.SysEventVersion == 0 {
			return 0, ErrorNoMaxTTL
		}
		start = 16
	case CollectionChanged:
		if event.SysEventVersion != 0 {
			return 0, ErrorInvalidVersion
		}
		start = 12
	default:
		return 0, ErrorInvalidOp
	}

	end := start + 4
	if event.ValueLen < end {
		return 0, ErrorValueTooShort
	}
	return binary.BigEndian.Uint32(event.Value[start:end]), nil
}
// GetOsoBegin interprets the Flags of an OSO snapshot event.
//
// The boolean is meaningful only when the error is nil: true means the event
// states that OSO begins (Flags == 1), false that it ends (Flags == 2).
// ErrorInvalidOp is returned when the event is not an OSO snapshot or Flags
// holds any other value.
func (event *UprEvent) GetOsoBegin() (bool, error) {
	if !event.IsOsoSnapshot() {
		return false, ErrorInvalidOp
	}

	switch event.Flags {
	case 1:
		return true, nil
	case 2:
		return false, nil
	default:
		return false, ErrorInvalidOp
	}
}
// Uleb128 is a byte slice that starts with an unsigned LEB128-encoded value.
type Uleb128 []byte

// ToUint32 decodes the unsigned LEB128 value at the start of u.
//
// At most cachedLen bytes are examined, and never more than len(u), so a
// cachedLen larger than the slice or an encoding whose last byte still has the
// continuation bit set cannot index out of range. Decoding stops after the
// first byte whose high bit is clear.
//
// It returns the decoded value and the number of bytes consumed. When no bytes
// are available (empty slice or cachedLen <= 0) it returns (0, 0). Bits that
// do not fit in 32 bits are discarded.
func (u Uleb128) ToUint32(cachedLen int) (result uint32, bytesShifted int) {
	limit := cachedLen
	if limit > len(u) {
		limit = len(u)
	}

	var shift uint
	for curByte := 0; curByte < limit; curByte++ {
		oneByte := u[curByte]
		// The low seven bits carry payload, least significant group first.
		result |= uint32(oneByte&0x7f) << shift
		bytesShifted++
		// A clear high bit marks the final byte of the encoding.
		if oneByte&0x80 == 0 {
			break
		}
		shift += 7
	}
	return result, bytesShifted
}