384 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			384 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // The `fwd` package provides a buffered reader
 | |
| // and writer. Each has methods that help improve
 | |
| // the encoding/decoding performance of some binary
 | |
| // protocols.
 | |
| //
 | |
| // The `fwd.Writer` and `fwd.Reader` type provide similar
 | |
| // functionality to their counterparts in `bufio`, plus
 | |
| // a few extra utility methods that simplify read-ahead
 | |
| // and write-ahead. I wrote this package to improve serialization
 | |
| // performance for http://github.com/tinylib/msgp,
 | |
| // where it provided about a 2x speedup over `bufio` for certain
 | |
| // workloads. However, care must be taken to understand the semantics of the
 | |
| // extra methods provided by this package, as they allow
 | |
| // the user to access and manipulate the buffer memory
 | |
| // directly.
 | |
| //
 | |
| // The extra methods for `fwd.Reader` are `Peek`, `Skip`
 | |
| // and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
 | |
| // will re-allocate the read buffer in order to accommodate arbitrarily
 | |
| // large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
 | |
| // in the stream, and uses the `io.Seeker` interface if the underlying
 | |
| // stream implements it. `(*fwd.Reader).Next` returns a slice pointing
 | |
| // to the next `n` bytes in the read buffer (like `Peek`), but also
 | |
| // increments the read position. This allows users to process streams
 | |
| // in arbitrary block sizes without having to manage appropriately-sized
 | |
| // slices. Additionally, obviating the need to copy the data from the
 | |
| // buffer to another location in memory can improve performance dramatically
 | |
| // in CPU-bound applications.
 | |
| //
 | |
| // `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which
 | |
| // returns a slice pointing to the next `n` bytes of the writer, and increments
 | |
| // the write position by the length of the returned slice. This allows users
 | |
| // to write directly to the end of the buffer.
 | |
| //
 | |
| package fwd
 | |
| 
 | |
| import "io"
 | |
| 
 | |
| const (
 | |
| 	// DefaultReaderSize is the default size of the read buffer
 | |
| 	DefaultReaderSize = 2048
 | |
| 
 | |
| 	// minimum read buffer; straight from bufio
 | |
| 	minReaderSize = 16
 | |
| )
 | |
| 
 | |
| // NewReader returns a new *Reader that reads from 'r'
 | |
| func NewReader(r io.Reader) *Reader {
 | |
| 	return NewReaderSize(r, DefaultReaderSize)
 | |
| }
 | |
| 
 | |
| // NewReaderSize returns a new *Reader that
 | |
| // reads from 'r' and has a buffer size 'n'
 | |
| func NewReaderSize(r io.Reader, n int) *Reader {
 | |
| 	rd := &Reader{
 | |
| 		r:    r,
 | |
| 		data: make([]byte, 0, max(minReaderSize, n)),
 | |
| 	}
 | |
| 	if s, ok := r.(io.Seeker); ok {
 | |
| 		rd.rs = s
 | |
| 	}
 | |
| 	return rd
 | |
| }
 | |
| 
 | |
| // Reader is a buffered look-ahead reader
 | |
| type Reader struct {
 | |
| 	r io.Reader // underlying reader
 | |
| 
 | |
| 	// data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space
 | |
| 	data  []byte // data
 | |
| 	n     int    // read offset
 | |
| 	state error  // last read error
 | |
| 
 | |
| 	// if the reader past to NewReader was
 | |
| 	// also an io.Seeker, this is non-nil
 | |
| 	rs io.Seeker
 | |
| }
 | |
| 
 | |
| // Reset resets the underlying reader
 | |
| // and the read buffer.
 | |
| func (r *Reader) Reset(rd io.Reader) {
 | |
| 	r.r = rd
 | |
| 	r.data = r.data[0:0]
 | |
| 	r.n = 0
 | |
| 	r.state = nil
 | |
| 	if s, ok := rd.(io.Seeker); ok {
 | |
| 		r.rs = s
 | |
| 	} else {
 | |
| 		r.rs = nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // more() does one read on the underlying reader
 | |
| func (r *Reader) more() {
 | |
| 	// move data backwards so that
 | |
| 	// the read offset is 0; this way
 | |
| 	// we can supply the maximum number of
 | |
| 	// bytes to the reader
 | |
| 	if r.n != 0 {
 | |
| 		if r.n < len(r.data) {
 | |
| 			r.data = r.data[:copy(r.data[0:], r.data[r.n:])]
 | |
| 		} else {
 | |
| 			r.data = r.data[:0]
 | |
| 		}
 | |
| 		r.n = 0
 | |
| 	}
 | |
| 	var a int
 | |
| 	a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)])
 | |
| 	if a == 0 && r.state == nil {
 | |
| 		r.state = io.ErrNoProgress
 | |
| 		return
 | |
| 	} else if a > 0 && r.state == io.EOF {
 | |
| 		// discard the io.EOF if we read more than 0 bytes.
 | |
| 		// the next call to Read should return io.EOF again.
 | |
| 		r.state = nil
 | |
| 	}
 | |
| 	r.data = r.data[:len(r.data)+a]
 | |
| }
 | |
| 
 | |
| // pop error
 | |
| func (r *Reader) err() (e error) {
 | |
| 	e, r.state = r.state, nil
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // pop error; EOF -> io.ErrUnexpectedEOF
 | |
| func (r *Reader) noEOF() (e error) {
 | |
| 	e, r.state = r.state, nil
 | |
| 	if e == io.EOF {
 | |
| 		e = io.ErrUnexpectedEOF
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // buffered bytes
 | |
| func (r *Reader) buffered() int { return len(r.data) - r.n }
 | |
| 
 | |
| // Buffered returns the number of bytes currently in the buffer
 | |
| func (r *Reader) Buffered() int { return len(r.data) - r.n }
 | |
| 
 | |
| // BufferSize returns the total size of the buffer
 | |
| func (r *Reader) BufferSize() int { return cap(r.data) }
 | |
| 
 | |
| // Peek returns the next 'n' buffered bytes,
 | |
| // reading from the underlying reader if necessary.
 | |
| // It will only return a slice shorter than 'n' bytes
 | |
| // if it also returns an error. Peek does not advance
 | |
| // the reader. EOF errors are *not* returned as
 | |
| // io.ErrUnexpectedEOF.
 | |
| func (r *Reader) Peek(n int) ([]byte, error) {
 | |
| 	// in the degenerate case,
 | |
| 	// we may need to realloc
 | |
| 	// (the caller asked for more
 | |
| 	// bytes than the size of the buffer)
 | |
| 	if cap(r.data) < n {
 | |
| 		old := r.data[r.n:]
 | |
| 		r.data = make([]byte, n+r.buffered())
 | |
| 		r.data = r.data[:copy(r.data, old)]
 | |
| 		r.n = 0
 | |
| 	}
 | |
| 
 | |
| 	// keep filling until
 | |
| 	// we hit an error or
 | |
| 	// read enough bytes
 | |
| 	for r.buffered() < n && r.state == nil {
 | |
| 		r.more()
 | |
| 	}
 | |
| 
 | |
| 	// we must have hit an error
 | |
| 	if r.buffered() < n {
 | |
| 		return r.data[r.n:], r.err()
 | |
| 	}
 | |
| 
 | |
| 	return r.data[r.n : r.n+n], nil
 | |
| }
 | |
| 
 | |
| // Skip moves the reader forward 'n' bytes.
 | |
| // Returns the number of bytes skipped and any
 | |
| // errors encountered. It is analogous to Seek(n, 1).
 | |
| // If the underlying reader implements io.Seeker, then
 | |
| // that method will be used to skip forward.
 | |
| //
 | |
| // If the reader encounters
 | |
| // an EOF before skipping 'n' bytes, it
 | |
| // returns io.ErrUnexpectedEOF. If the
 | |
| // underlying reader implements io.Seeker, then
 | |
| // those rules apply instead. (Many implementations
 | |
| // will not return `io.EOF` until the next call
 | |
| // to Read.)
 | |
| func (r *Reader) Skip(n int) (int, error) {
 | |
| 
 | |
| 	// fast path
 | |
| 	if r.buffered() >= n {
 | |
| 		r.n += n
 | |
| 		return n, nil
 | |
| 	}
 | |
| 
 | |
| 	// use seeker implementation
 | |
| 	// if we can
 | |
| 	if r.rs != nil {
 | |
| 		return r.skipSeek(n)
 | |
| 	}
 | |
| 
 | |
| 	// loop on filling
 | |
| 	// and then erasing
 | |
| 	o := n
 | |
| 	for r.buffered() < n && r.state == nil {
 | |
| 		r.more()
 | |
| 		// we can skip forward
 | |
| 		// up to r.buffered() bytes
 | |
| 		step := min(r.buffered(), n)
 | |
| 		r.n += step
 | |
| 		n -= step
 | |
| 	}
 | |
| 	// at this point, n should be
 | |
| 	// 0 if everything went smoothly
 | |
| 	return o - n, r.noEOF()
 | |
| }
 | |
| 
 | |
| // Next returns the next 'n' bytes in the stream.
 | |
| // Unlike Peek, Next advances the reader position.
 | |
| // The returned bytes point to the same
 | |
| // data as the buffer, so the slice is
 | |
| // only valid until the next reader method call.
 | |
| // An EOF is considered an unexpected error.
 | |
| // If an the returned slice is less than the
 | |
| // length asked for, an error will be returned,
 | |
| // and the reader position will not be incremented.
 | |
| func (r *Reader) Next(n int) ([]byte, error) {
 | |
| 
 | |
| 	// in case the buffer is too small
 | |
| 	if cap(r.data) < n {
 | |
| 		old := r.data[r.n:]
 | |
| 		r.data = make([]byte, n+r.buffered())
 | |
| 		r.data = r.data[:copy(r.data, old)]
 | |
| 		r.n = 0
 | |
| 	}
 | |
| 
 | |
| 	// fill at least 'n' bytes
 | |
| 	for r.buffered() < n && r.state == nil {
 | |
| 		r.more()
 | |
| 	}
 | |
| 
 | |
| 	if r.buffered() < n {
 | |
| 		return r.data[r.n:], r.noEOF()
 | |
| 	}
 | |
| 	out := r.data[r.n : r.n+n]
 | |
| 	r.n += n
 | |
| 	return out, nil
 | |
| }
 | |
| 
 | |
| // skipSeek uses the io.Seeker to seek forward.
 | |
| // only call this function when n > r.buffered()
 | |
| func (r *Reader) skipSeek(n int) (int, error) {
 | |
| 	o := r.buffered()
 | |
| 	// first, clear buffer
 | |
| 	n -= o
 | |
| 	r.n = 0
 | |
| 	r.data = r.data[:0]
 | |
| 
 | |
| 	// then seek forward remaning bytes
 | |
| 	i, err := r.rs.Seek(int64(n), 1)
 | |
| 	return int(i) + o, err
 | |
| }
 | |
| 
 | |
| // Read implements `io.Reader`
 | |
| func (r *Reader) Read(b []byte) (int, error) {
 | |
| 	// if we have data in the buffer, just
 | |
| 	// return that.
 | |
| 	if r.buffered() != 0 {
 | |
| 		x := copy(b, r.data[r.n:])
 | |
| 		r.n += x
 | |
| 		return x, nil
 | |
| 	}
 | |
| 	var n int
 | |
| 	// we have no buffered data; determine
 | |
| 	// whether or not to buffer or call
 | |
| 	// the underlying reader directly
 | |
| 	if len(b) >= cap(r.data) {
 | |
| 		n, r.state = r.r.Read(b)
 | |
| 	} else {
 | |
| 		r.more()
 | |
| 		n = copy(b, r.data)
 | |
| 		r.n = n
 | |
| 	}
 | |
| 	if n == 0 {
 | |
| 		return 0, r.err()
 | |
| 	}
 | |
| 	return n, nil
 | |
| }
 | |
| 
 | |
| // ReadFull attempts to read len(b) bytes into
 | |
| // 'b'. It returns the number of bytes read into
 | |
| // 'b', and an error if it does not return len(b).
 | |
| // EOF is considered an unexpected error.
 | |
| func (r *Reader) ReadFull(b []byte) (int, error) {
 | |
| 	var n int  // read into b
 | |
| 	var nn int // scratch
 | |
| 	l := len(b)
 | |
| 	// either read buffered data,
 | |
| 	// or read directly for the underlying
 | |
| 	// buffer, or fetch more buffered data.
 | |
| 	for n < l && r.state == nil {
 | |
| 		if r.buffered() != 0 {
 | |
| 			nn = copy(b[n:], r.data[r.n:])
 | |
| 			n += nn
 | |
| 			r.n += nn
 | |
| 		} else if l-n > cap(r.data) {
 | |
| 			nn, r.state = r.r.Read(b[n:])
 | |
| 			n += nn
 | |
| 		} else {
 | |
| 			r.more()
 | |
| 		}
 | |
| 	}
 | |
| 	if n < l {
 | |
| 		return n, r.noEOF()
 | |
| 	}
 | |
| 	return n, nil
 | |
| }
 | |
| 
 | |
| // ReadByte implements `io.ByteReader`
 | |
| func (r *Reader) ReadByte() (byte, error) {
 | |
| 	for r.buffered() < 1 && r.state == nil {
 | |
| 		r.more()
 | |
| 	}
 | |
| 	if r.buffered() < 1 {
 | |
| 		return 0, r.err()
 | |
| 	}
 | |
| 	b := r.data[r.n]
 | |
| 	r.n++
 | |
| 	return b, nil
 | |
| }
 | |
| 
 | |
| // WriteTo implements `io.WriterTo`
 | |
| func (r *Reader) WriteTo(w io.Writer) (int64, error) {
 | |
| 	var (
 | |
| 		i   int64
 | |
| 		ii  int
 | |
| 		err error
 | |
| 	)
 | |
| 	// first, clear buffer
 | |
| 	if r.buffered() > 0 {
 | |
| 		ii, err = w.Write(r.data[r.n:])
 | |
| 		i += int64(ii)
 | |
| 		if err != nil {
 | |
| 			return i, err
 | |
| 		}
 | |
| 		r.data = r.data[0:0]
 | |
| 		r.n = 0
 | |
| 	}
 | |
| 	for r.state == nil {
 | |
| 		// here we just do
 | |
| 		// 1:1 reads and writes
 | |
| 		r.more()
 | |
| 		if r.buffered() > 0 {
 | |
| 			ii, err = w.Write(r.data)
 | |
| 			i += int64(ii)
 | |
| 			if err != nil {
 | |
| 				return i, err
 | |
| 			}
 | |
| 			r.data = r.data[0:0]
 | |
| 			r.n = 0
 | |
| 		}
 | |
| 	}
 | |
| 	if r.state != io.EOF {
 | |
| 		return i, r.err()
 | |
| 	}
 | |
| 	return i, nil
 | |
| }
 | |
| 
 | |
| func min(a int, b int) int {
 | |
| 	if a < b {
 | |
| 		return a
 | |
| 	}
 | |
| 	return b
 | |
| }
 | |
| 
 | |
| func max(a int, b int) int {
 | |
| 	if a < b {
 | |
| 		return b
 | |
| 	}
 | |
| 	return a
 | |
| }
 |