From a89592d4abfef01e68e3c53a3cdb3846b03abd2b Mon Sep 17 00:00:00 2001
From: Ethan Koenig <ethantkoenig@gmail.com>
Date: Mon, 5 Feb 2018 10:29:17 -0800
Subject: [PATCH] Reduce repo indexer disk usage (#3452)

---
 models/issue_indexer.go                       |   4 +-
 models/repo_indexer.go                        |  16 +-
 modules/indexer/indexer.go                    |  59 +++---
 modules/indexer/issue.go                      |  59 +++---
 modules/indexer/repo.go                       |  76 ++++----
 .../bleve/analysis/token/unique/unique.go     |  53 ++++++
 .../ethantkoenig/rupture/Gopkg.lock           | 173 ++++++++++++++++++
 .../ethantkoenig/rupture/Gopkg.toml           |  34 ++++
 .../github.com/ethantkoenig/rupture/LICENSE   |  21 +++
 .../github.com/ethantkoenig/rupture/README.md |  13 ++
 .../ethantkoenig/rupture/flushing_batch.go    |  67 +++++++
 .../ethantkoenig/rupture/metadata.go          |  68 +++++++
 .../ethantkoenig/rupture/sharded_index.go     | 146 +++++++++++++++
 vendor/vendor.json                            |  12 ++
 14 files changed, 704 insertions(+), 97 deletions(-)
 create mode 100644 vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go
 create mode 100644 vendor/github.com/ethantkoenig/rupture/Gopkg.lock
 create mode 100644 vendor/github.com/ethantkoenig/rupture/Gopkg.toml
 create mode 100644 vendor/github.com/ethantkoenig/rupture/LICENSE
 create mode 100644 vendor/github.com/ethantkoenig/rupture/README.md
 create mode 100644 vendor/github.com/ethantkoenig/rupture/flushing_batch.go
 create mode 100644 vendor/github.com/ethantkoenig/rupture/metadata.go
 create mode 100644 vendor/github.com/ethantkoenig/rupture/sharded_index.go

diff --git a/models/issue_indexer.go b/models/issue_indexer.go
index 922b66f95..b94ba5f2d 100644
--- a/models/issue_indexer.go
+++ b/models/issue_indexer.go
@@ -53,7 +53,7 @@ func populateIssueIndexer() error {
 				return err
 			}
 			for _, issue := range issues {
-				if err := batch.Add(issue.update()); err != nil {
+				if err := issue.update().AddToFlushingBatch(batch); err != nil {
 					return err
 				}
 			}
@@ -78,7 +78,7 @@ func processIssueIndexerUpdateQueue() {
 		issue, err := GetIssueByID(issueID)
 		if err != nil {
 			log.Error(4, "GetIssueByID: %v", err)
-		} else if err = batch.Add(issue.update()); err != nil {
+		} else if err = issue.update().AddToFlushingBatch(batch); err != nil {
 			log.Error(4, "IssueIndexer: %v", err)
 		}
 	}
diff --git a/models/repo_indexer.go b/models/repo_indexer.go
index fee478479..ecd629587 100644
--- a/models/repo_indexer.go
+++ b/models/repo_indexer.go
@@ -14,6 +14,8 @@ import (
 	"code.gitea.io/gitea/modules/indexer"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/setting"
+
+	"github.com/ethantkoenig/rupture"
 )
 
 // RepoIndexerStatus status of a repo's entry in the repo indexer
@@ -187,7 +189,7 @@ func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) {
 	return nonGenesisChanges(repo, revision)
 }
 
-func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error {
+func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error {
 	stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
 		RunInDir(repo.RepoPath())
 	if err != nil {
@@ -206,24 +208,26 @@ func addUpdate(update fileUpdate, repo *Repository, batch *indexer.Batch) error
 	} else if !base.IsTextFile(fileContents) {
 		return nil
 	}
-	return batch.Add(indexer.RepoIndexerUpdate{
+	indexerUpdate := indexer.RepoIndexerUpdate{
 		Filepath: update.Filename,
 		Op:       indexer.RepoIndexerOpUpdate,
 		Data: &indexer.RepoIndexerData{
 			RepoID:  repo.ID,
 			Content: string(fileContents),
 		},
-	})
+	}
+	return indexerUpdate.AddToFlushingBatch(batch)
 }
 
-func addDelete(filename string, repo *Repository, batch *indexer.Batch) error {
-	return batch.Add(indexer.RepoIndexerUpdate{
+func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error {
+	indexerUpdate := indexer.RepoIndexerUpdate{
 		Filepath: filename,
 		Op:       indexer.RepoIndexerOpDelete,
 		Data: &indexer.RepoIndexerData{
 			RepoID: repo.ID,
 		},
-	})
+	}
+	return indexerUpdate.AddToFlushingBatch(batch)
 }
 
 // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command
diff --git a/modules/indexer/indexer.go b/modules/indexer/indexer.go
index d5bdd51f9..9e12a7f50 100644
--- a/modules/indexer/indexer.go
+++ b/modules/indexer/indexer.go
@@ -6,12 +6,17 @@ package indexer
 
 import (
 	"fmt"
+	"os"
 	"strconv"
 
+	"code.gitea.io/gitea/modules/setting"
+
 	"github.com/blevesearch/bleve"
 	"github.com/blevesearch/bleve/analysis/token/unicodenorm"
+	"github.com/blevesearch/bleve/index/upsidedown"
 	"github.com/blevesearch/bleve/mapping"
 	"github.com/blevesearch/bleve/search/query"
+	"github.com/ethantkoenig/rupture"
 )
 
 // indexerID a bleve-compatible unique identifier for an integer id
@@ -53,40 +58,36 @@ func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error {
 	})
 }
 
-// Update represents an update to an indexer
-type Update interface {
-	addToBatch(batch *bleve.Batch) error
-}
-
 const maxBatchSize = 16
 
-// Batch batch of indexer updates that automatically flushes once it
-// reaches a certain size
-type Batch struct {
-	batch *bleve.Batch
-	index bleve.Index
-}
-
-// Add add update to batch, possibly flushing
-func (batch *Batch) Add(update Update) error {
-	if err := update.addToBatch(batch.batch); err != nil {
-		return err
+// openIndexer open the index at the specified path, checking for metadata
+// updates and bleve version updates.  If index needs to be created (or
+// re-created), returns (nil, nil)
+func openIndexer(path string, latestVersion int) (bleve.Index, error) {
+	_, err := os.Stat(setting.Indexer.IssuePath)
+	if err != nil && os.IsNotExist(err) {
+		return nil, nil
+	} else if err != nil {
+		return nil, err
 	}
-	return batch.flushIfFull()
-}
 
-func (batch *Batch) flushIfFull() error {
-	if batch.batch.Size() >= maxBatchSize {
-		return batch.Flush()
+	metadata, err := rupture.ReadIndexMetadata(path)
+	if err != nil {
+		return nil, err
+	}
+	if metadata.Version < latestVersion {
+		// the indexer is using a previous version, so we should delete it and
+		// re-populate
+		return nil, os.RemoveAll(path)
 	}
-	return nil
-}
 
-// Flush manually flush the batch, regardless of its size
-func (batch *Batch) Flush() error {
-	if err := batch.index.Batch(batch.batch); err != nil {
-		return err
+	index, err := bleve.Open(path)
+	if err != nil && err == upsidedown.IncompatibleVersion {
+		// the indexer was built with a previous version of bleve, so we should
+		// delete it and re-populate
+		return nil, os.RemoveAll(path)
+	} else if err != nil {
+		return nil, err
 	}
-	batch.batch.Reset()
-	return nil
+	return index, nil
 }
diff --git a/modules/indexer/issue.go b/modules/indexer/issue.go
index 62a18e2b3..b0d231a7c 100644
--- a/modules/indexer/issue.go
+++ b/modules/indexer/issue.go
@@ -5,8 +5,6 @@
 package indexer
 
 import (
-	"os"
-
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/setting"
 
@@ -14,12 +12,19 @@ import (
 	"github.com/blevesearch/bleve/analysis/analyzer/custom"
 	"github.com/blevesearch/bleve/analysis/token/lowercase"
 	"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
-	"github.com/blevesearch/bleve/index/upsidedown"
+	"github.com/ethantkoenig/rupture"
 )
 
 // issueIndexer (thread-safe) index for searching issues
 var issueIndexer bleve.Index
 
+const (
+	issueIndexerAnalyzer = "issueIndexer"
+	issueIndexerDocType  = "issueIndexerDocType"
+
+	issueIndexerLatestVersion = 1
+)
+
 // IssueIndexerData data stored in the issue indexer
 type IssueIndexerData struct {
 	RepoID   int64
@@ -28,35 +33,33 @@ type IssueIndexerData struct {
 	Comments []string
 }
 
+// Type returns the document type, for bleve's mapping.Classifier interface.
+func (i *IssueIndexerData) Type() string {
+	return issueIndexerDocType
+}
+
 // IssueIndexerUpdate an update to the issue indexer
 type IssueIndexerUpdate struct {
 	IssueID int64
 	Data    *IssueIndexerData
 }
 
-func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error {
-	return batch.Index(indexerID(update.IssueID), update.Data)
+// AddToFlushingBatch adds the update to the given flushing batch.
+func (i IssueIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
+	return batch.Index(indexerID(i.IssueID), i.Data)
 }
 
-const issueIndexerAnalyzer = "issueIndexer"
-
 // InitIssueIndexer initialize issue indexer
 func InitIssueIndexer(populateIndexer func() error) {
-	_, err := os.Stat(setting.Indexer.IssuePath)
-	if err != nil && !os.IsNotExist(err) {
+	var err error
+	issueIndexer, err = openIndexer(setting.Indexer.IssuePath, issueIndexerLatestVersion)
+	if err != nil {
 		log.Fatal(4, "InitIssueIndexer: %v", err)
-	} else if err == nil {
-		issueIndexer, err = bleve.Open(setting.Indexer.IssuePath)
-		if err == nil {
-			return
-		} else if err != upsidedown.IncompatibleVersion {
-			log.Fatal(4, "InitIssueIndexer, open index: %v", err)
-		}
-		log.Warn("Incompatible bleve version, deleting and recreating issue indexer")
-		if err = os.RemoveAll(setting.Indexer.IssuePath); err != nil {
-			log.Fatal(4, "InitIssueIndexer: remove index, %v", err)
-		}
 	}
+	if issueIndexer != nil {
+		return
+	}
+
 	if err = createIssueIndexer(); err != nil {
 		log.Fatal(4, "InitIssuesIndexer: create index, %v", err)
 	}
@@ -70,9 +73,13 @@ func createIssueIndexer() error {
 	mapping := bleve.NewIndexMapping()
 	docMapping := bleve.NewDocumentMapping()
 
-	docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping())
+	numericFieldMapping := bleve.NewNumericFieldMapping()
+	numericFieldMapping.IncludeInAll = false
+	docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
 
 	textFieldMapping := bleve.NewTextFieldMapping()
+	textFieldMapping.Store = false
+	textFieldMapping.IncludeInAll = false
 	docMapping.AddFieldMappingsAt("Title", textFieldMapping)
 	docMapping.AddFieldMappingsAt("Content", textFieldMapping)
 	docMapping.AddFieldMappingsAt("Comments", textFieldMapping)
@@ -89,7 +96,8 @@ func createIssueIndexer() error {
 	}
 
 	mapping.DefaultAnalyzer = issueIndexerAnalyzer
-	mapping.AddDocumentMapping("issues", docMapping)
+	mapping.AddDocumentMapping(issueIndexerDocType, docMapping)
+	mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
 
 	var err error
 	issueIndexer, err = bleve.New(setting.Indexer.IssuePath, mapping)
@@ -97,11 +105,8 @@ func createIssueIndexer() error {
 }
 
 // IssueIndexerBatch batch to add updates to
-func IssueIndexerBatch() *Batch {
-	return &Batch{
-		batch: issueIndexer.NewBatch(),
-		index: issueIndexer,
-	}
+func IssueIndexerBatch() rupture.FlushingBatch {
+	return rupture.NewFlushingBatch(issueIndexer, maxBatchSize)
 }
 
 // SearchIssuesByKeyword searches for issues by given conditions.
diff --git a/modules/indexer/repo.go b/modules/indexer/repo.go
index 226e565e3..ffb1dc1e6 100644
--- a/modules/indexer/repo.go
+++ b/modules/indexer/repo.go
@@ -5,7 +5,6 @@
 package indexer
 
 import (
-	"os"
 	"strings"
 
 	"code.gitea.io/gitea/modules/log"
@@ -15,10 +14,17 @@ import (
 	"github.com/blevesearch/bleve/analysis/analyzer/custom"
 	"github.com/blevesearch/bleve/analysis/token/camelcase"
 	"github.com/blevesearch/bleve/analysis/token/lowercase"
+	"github.com/blevesearch/bleve/analysis/token/unique"
 	"github.com/blevesearch/bleve/analysis/tokenizer/unicode"
+	"github.com/ethantkoenig/rupture"
 )
 
-const repoIndexerAnalyzer = "repoIndexerAnalyzer"
+const (
+	repoIndexerAnalyzer = "repoIndexerAnalyzer"
+	repoIndexerDocType  = "repoIndexerDocType"
+
+	repoIndexerLatestVersion = 1
+)
 
 // repoIndexer (thread-safe) index for repository contents
 var repoIndexer bleve.Index
@@ -40,6 +46,11 @@ type RepoIndexerData struct {
 	Content string
 }
 
+// Type returns the document type, for bleve's mapping.Classifier interface.
+func (d *RepoIndexerData) Type() string {
+	return repoIndexerDocType
+}
+
 // RepoIndexerUpdate an update to the repo indexer
 type RepoIndexerUpdate struct {
 	Filepath string
@@ -47,13 +58,14 @@ type RepoIndexerUpdate struct {
 	Data     *RepoIndexerData
 }
 
-func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error {
+// AddToFlushingBatch adds the update to the given flushing batch.
+func (update RepoIndexerUpdate) AddToFlushingBatch(batch rupture.FlushingBatch) error {
 	id := filenameIndexerID(update.Data.RepoID, update.Filepath)
 	switch update.Op {
 	case RepoIndexerOpUpdate:
 		return batch.Index(id, update.Data)
 	case RepoIndexerOpDelete:
-		batch.Delete(id)
+		return batch.Delete(id)
 	default:
 		log.Error(4, "Unrecognized repo indexer op: %d", update.Op)
 	}
@@ -62,48 +74,50 @@ func (update RepoIndexerUpdate) addToBatch(batch *bleve.Batch) error {
 
 // InitRepoIndexer initialize repo indexer
 func InitRepoIndexer(populateIndexer func() error) {
-	_, err := os.Stat(setting.Indexer.RepoPath)
+	var err error
+	repoIndexer, err = openIndexer(setting.Indexer.RepoPath, repoIndexerLatestVersion)
 	if err != nil {
-		if os.IsNotExist(err) {
-			if err = createRepoIndexer(); err != nil {
-				log.Fatal(4, "CreateRepoIndexer: %v", err)
-			}
-			if err = populateIndexer(); err != nil {
-				log.Fatal(4, "PopulateRepoIndex: %v", err)
-			}
-		} else {
-			log.Fatal(4, "InitRepoIndexer: %v", err)
-		}
-	} else {
-		repoIndexer, err = bleve.Open(setting.Indexer.RepoPath)
-		if err != nil {
-			log.Fatal(4, "InitRepoIndexer, open index: %v", err)
-		}
+		log.Fatal(4, "InitRepoIndexer: %v", err)
+	}
+	if repoIndexer != nil {
+		return
+	}
+
+	if err = createRepoIndexer(); err != nil {
+		log.Fatal(4, "CreateRepoIndexer: %v", err)
+	}
+	if err = populateIndexer(); err != nil {
+		log.Fatal(4, "PopulateRepoIndex: %v", err)
 	}
 }
 
 // createRepoIndexer create a repo indexer if one does not already exist
 func createRepoIndexer() error {
+	var err error
 	docMapping := bleve.NewDocumentMapping()
-	docMapping.AddFieldMappingsAt("RepoID", bleve.NewNumericFieldMapping())
+	numericFieldMapping := bleve.NewNumericFieldMapping()
+	numericFieldMapping.IncludeInAll = false
+	docMapping.AddFieldMappingsAt("RepoID", numericFieldMapping)
 
 	textFieldMapping := bleve.NewTextFieldMapping()
+	textFieldMapping.IncludeInAll = false
 	docMapping.AddFieldMappingsAt("Content", textFieldMapping)
 
 	mapping := bleve.NewIndexMapping()
-	if err := addUnicodeNormalizeTokenFilter(mapping); err != nil {
+	if err = addUnicodeNormalizeTokenFilter(mapping); err != nil {
 		return err
-	} else if err := mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
+	} else if err = mapping.AddCustomAnalyzer(repoIndexerAnalyzer, map[string]interface{}{
 		"type":          custom.Name,
 		"char_filters":  []string{},
 		"tokenizer":     unicode.Name,
-		"token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name},
+		"token_filters": []string{unicodeNormalizeName, camelcase.Name, lowercase.Name, unique.Name},
 	}); err != nil {
 		return err
 	}
 	mapping.DefaultAnalyzer = repoIndexerAnalyzer
-	mapping.AddDocumentMapping("repo", docMapping)
-	var err error
+	mapping.AddDocumentMapping(repoIndexerDocType, docMapping)
+	mapping.AddDocumentMapping("_all", bleve.NewDocumentDisabledMapping())
+
 	repoIndexer, err = bleve.New(setting.Indexer.RepoPath, mapping)
 	return err
 }
@@ -121,11 +135,8 @@ func filenameOfIndexerID(indexerID string) string {
 }
 
 // RepoIndexerBatch batch to add updates to
-func RepoIndexerBatch() *Batch {
-	return &Batch{
-		batch: repoIndexer.NewBatch(),
-		index: repoIndexer,
-	}
+func RepoIndexerBatch() rupture.FlushingBatch {
+	return rupture.NewFlushingBatch(repoIndexer, maxBatchSize)
 }
 
 // DeleteRepoFromIndexer delete all of a repo's files from indexer
@@ -138,8 +149,7 @@ func DeleteRepoFromIndexer(repoID int64) error {
 	}
 	batch := RepoIndexerBatch()
 	for _, hit := range result.Hits {
-		batch.batch.Delete(hit.ID)
-		if err = batch.flushIfFull(); err != nil {
+		if err = batch.Delete(hit.ID); err != nil {
 			return err
 		}
 	}
diff --git a/vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go b/vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go
new file mode 100644
index 000000000..f0d96c504
--- /dev/null
+++ b/vendor/github.com/blevesearch/bleve/analysis/token/unique/unique.go
@@ -0,0 +1,53 @@
+//  Copyright (c) 2018 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// 		http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package unique
+
+import (
+	"github.com/blevesearch/bleve/analysis"
+	"github.com/blevesearch/bleve/registry"
+)
+
+const Name = "unique"
+
+// UniqueTermFilter retains only the tokens which mark the first occurence of
+// a term. Tokens whose term appears in a preceding token are dropped.
+type UniqueTermFilter struct{}
+
+func NewUniqueTermFilter() *UniqueTermFilter {
+	return &UniqueTermFilter{}
+}
+
+func (f *UniqueTermFilter) Filter(input analysis.TokenStream) analysis.TokenStream {
+	encounteredTerms := make(map[string]struct{}, len(input)/4)
+	j := 0
+	for _, token := range input {
+		term := string(token.Term)
+		if _, ok := encounteredTerms[term]; ok {
+			continue
+		}
+		encounteredTerms[term] = struct{}{}
+		input[j] = token
+		j++
+	}
+	return input[:j]
+}
+
+func UniqueTermFilterConstructor(config map[string]interface{}, cache *registry.Cache) (analysis.TokenFilter, error) {
+	return NewUniqueTermFilter(), nil
+}
+
+func init() {
+	registry.RegisterTokenFilter(Name, UniqueTermFilterConstructor)
+}
diff --git a/vendor/github.com/ethantkoenig/rupture/Gopkg.lock b/vendor/github.com/ethantkoenig/rupture/Gopkg.lock
new file mode 100644
index 000000000..86e495e78
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/Gopkg.lock
@@ -0,0 +1,173 @@
+# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
+
+
+[[projects]]
+  name = "github.com/RoaringBitmap/roaring"
+  packages = ["."]
+  revision = "84551f0e309d6f9bafa428ef39b31ab7f16ff7b8"
+  version = "v0.4.1"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/Smerity/govarint"
+  packages = ["."]
+  revision = "7265e41f48f15fd61751e16da866af3c704bb3ab"
+
+[[projects]]
+  name = "github.com/blevesearch/bleve"
+  packages = [
+    ".",
+    "analysis",
+    "analysis/analyzer/standard",
+    "analysis/datetime/flexible",
+    "analysis/datetime/optional",
+    "analysis/lang/en",
+    "analysis/token/lowercase",
+    "analysis/token/porter",
+    "analysis/token/stop",
+    "analysis/tokenizer/unicode",
+    "document",
+    "geo",
+    "index",
+    "index/scorch",
+    "index/scorch/mergeplan",
+    "index/scorch/segment",
+    "index/scorch/segment/mem",
+    "index/scorch/segment/zap",
+    "index/store",
+    "index/store/boltdb",
+    "index/store/gtreap",
+    "index/upsidedown",
+    "mapping",
+    "numeric",
+    "registry",
+    "search",
+    "search/collector",
+    "search/facet",
+    "search/highlight",
+    "search/highlight/format/html",
+    "search/highlight/fragmenter/simple",
+    "search/highlight/highlighter/html",
+    "search/highlight/highlighter/simple",
+    "search/query",
+    "search/scorer",
+    "search/searcher"
+  ]
+  revision = "a3b125508b4443344b596888ca58467b6c9310b9"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/blevesearch/go-porterstemmer"
+  packages = ["."]
+  revision = "23a2c8e5cf1f380f27722c6d2ae8896431dc7d0e"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/blevesearch/segment"
+  packages = ["."]
+  revision = "762005e7a34fd909a84586299f1dd457371d36ee"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/boltdb/bolt"
+  packages = ["."]
+  revision = "9da31745363232bc1e27dbab3569e77383a51585"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/couchbase/vellum"
+  packages = [
+    ".",
+    "regexp",
+    "utf8"
+  ]
+  revision = "ed84a675e24ed0a0bf6859b1ddec7e7c858354bd"
+
+[[projects]]
+  name = "github.com/davecgh/go-spew"
+  packages = ["spew"]
+  revision = "346938d642f2ec3594ed81d874461961cd0faa76"
+  version = "v1.1.0"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/edsrzf/mmap-go"
+  packages = ["."]
+  revision = "0bce6a6887123b67a60366d2c9fe2dfb74289d2e"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/glycerine/go-unsnap-stream"
+  packages = ["."]
+  revision = "62a9a9eb44fd8932157b1a8ace2149eff5971af6"
+
+[[projects]]
+  name = "github.com/golang/protobuf"
+  packages = ["proto"]
+  revision = "925541529c1fa6821df4e44ce2723319eb2be768"
+  version = "v1.0.0"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/golang/snappy"
+  packages = ["."]
+  revision = "553a641470496b2327abcac10b36396bd98e45c9"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/mschoch/smat"
+  packages = ["."]
+  revision = "90eadee771aeab36e8bf796039b8c261bebebe4f"
+
+[[projects]]
+  name = "github.com/philhofer/fwd"
+  packages = ["."]
+  revision = "bb6d471dc95d4fe11e432687f8b70ff496cf3136"
+  version = "v1.0.0"
+
+[[projects]]
+  name = "github.com/pmezard/go-difflib"
+  packages = ["difflib"]
+  revision = "792786c7400a136282c1664665ae0a8db921c6c2"
+  version = "v1.0.0"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/steveyen/gtreap"
+  packages = ["."]
+  revision = "0abe01ef9be25c4aedc174758ec2d917314d6d70"
+
+[[projects]]
+  name = "github.com/stretchr/testify"
+  packages = ["assert"]
+  revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71"
+  version = "v1.2.1"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/tinylib/msgp"
+  packages = ["msgp"]
+  revision = "03a79185462ad029a6e7e05b2f3f3e0498d0a6c0"
+
+[[projects]]
+  branch = "master"
+  name = "github.com/willf/bitset"
+  packages = ["."]
+  revision = "1a37ad96e8c1a11b20900a232874843b5174221f"
+
+[[projects]]
+  name = "golang.org/x/net"
+  packages = ["context"]
+  revision = "309822c5b9b9f80db67f016069a12628d94fad34"
+
+[[projects]]
+  name = "golang.org/x/sys"
+  packages = ["unix"]
+  revision = "3dbebcf8efb6a5011a60c2b4591c1022a759af8a"
+
+[solve-meta]
+  analyzer-name = "dep"
+  analyzer-version = 1
+  inputs-digest = "61c759f0c1136cadf86ae8a30bb78edf33fc844cdcb2316469b4ae14a8d051b0"
+  solver-name = "gps-cdcl"
+  solver-version = 1
diff --git a/vendor/github.com/ethantkoenig/rupture/Gopkg.toml b/vendor/github.com/ethantkoenig/rupture/Gopkg.toml
new file mode 100644
index 000000000..55dbd3b23
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/Gopkg.toml
@@ -0,0 +1,34 @@
+# Gopkg.toml example
+#
+# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
+# for detailed Gopkg.toml documentation.
+#
+# required = ["github.com/user/thing/cmd/thing"]
+# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
+#
+# [[constraint]]
+#   name = "github.com/user/project"
+#   version = "1.0.0"
+#
+# [[constraint]]
+#   name = "github.com/user/project2"
+#   branch = "dev"
+#   source = "github.com/myfork/project2"
+#
+# [[override]]
+#   name = "github.com/x/y"
+#   version = "2.4.0"
+#
+# [prune]
+#   non-go = false
+#   go-tests = true
+#   unused-packages = true
+
+
+[[constraint]]
+  name = "github.com/stretchr/testify"
+  version = "1.2.1"
+
+[prune]
+  go-tests = true
+  unused-packages = true
diff --git a/vendor/github.com/ethantkoenig/rupture/LICENSE b/vendor/github.com/ethantkoenig/rupture/LICENSE
new file mode 100644
index 000000000..30adfac94
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Ethan Koenig
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/ethantkoenig/rupture/README.md b/vendor/github.com/ethantkoenig/rupture/README.md
new file mode 100644
index 000000000..da76681e3
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/README.md
@@ -0,0 +1,13 @@
+# rupture
+
+[![Build Status](https://travis-ci.org/ethantkoenig/rupture.svg?branch=master)](https://travis-ci.org/ethantkoenig/rupture) [![GoDoc](https://godoc.org/github.com/ethantkoenig/rupture?status.svg)](https://godoc.org/github.com/ethantkoenig/rupture) [![Go Report Card](https://goreportcard.com/badge/blevesearch/bleve)](https://goreportcard.com/report/blevesearch/bleve)
+
+An explosive companion to the [bleve indexing library](https://www.github.com/blevesearch/bleve)
+
+## Features
+
+`rupture` includes the following additions to `bleve`:
+
+- __Flushing batches__: Batches of operation which automatically flush to the underlying bleve index.
+- __Sharded indices__: An index-like abstraction built on top of several underlying indices. Sharded indices provide lower write latencies for indices with large amounts of data.
+- __Index metadata__: Track index version for easily managing migrations and schema changes.
diff --git a/vendor/github.com/ethantkoenig/rupture/flushing_batch.go b/vendor/github.com/ethantkoenig/rupture/flushing_batch.go
new file mode 100644
index 000000000..b4948f674
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/flushing_batch.go
@@ -0,0 +1,67 @@
+package rupture
+
+import (
+	"github.com/blevesearch/bleve"
+)
+
+// FlushingBatch is a batch of operations that automatically flushes to the
+// underlying index once it reaches a certain size.
+type FlushingBatch interface {
+	// Index adds the specified index operation batch, possibly triggering a
+	// flush.
+	Index(id string, data interface{}) error
+	// Remove adds the specified delete operation to the batch, possibly
+	// triggering a flush.
+	Delete(id string) error
+	// Flush flushes the batch's contents.
+	Flush() error
+}
+
+type singleIndexFlushingBatch struct {
+	maxBatchSize int
+	batch        *bleve.Batch
+	index        bleve.Index
+}
+
+func newFlushingBatch(index bleve.Index, maxBatchSize int) *singleIndexFlushingBatch {
+	return &singleIndexFlushingBatch{
+		maxBatchSize: maxBatchSize,
+		batch:        index.NewBatch(),
+		index:        index,
+	}
+}
+
+// NewFlushingBatch creates a new flushing batch for the specified index. Once
+// the number of operations in the batch reaches the specified limit, the batch
+// automatically flushes its operations to the index.
+func NewFlushingBatch(index bleve.Index, maxBatchSize int) FlushingBatch {
+	return newFlushingBatch(index, maxBatchSize)
+}
+
+func (b *singleIndexFlushingBatch) Index(id string, data interface{}) error {
+	if err := b.batch.Index(id, data); err != nil {
+		return err
+	}
+	return b.flushIfFull()
+}
+
+func (b *singleIndexFlushingBatch) Delete(id string) error {
+	b.batch.Delete(id)
+	return b.flushIfFull()
+}
+
+func (b *singleIndexFlushingBatch) flushIfFull() error {
+	if b.batch.Size() < b.maxBatchSize {
+		return nil
+	}
+	return b.Flush()
+}
+
+func (b *singleIndexFlushingBatch) Flush() error {
+	err := b.index.Batch(b.batch)
+	if err != nil {
+		return err
+	}
+	b.batch.Reset()
+	return nil
+}
diff --git a/vendor/github.com/ethantkoenig/rupture/metadata.go b/vendor/github.com/ethantkoenig/rupture/metadata.go
new file mode 100644
index 000000000..f26b53d96
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/metadata.go
@@ -0,0 +1,68 @@
+package rupture
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+)
+
+const metaFilename = "rupture_meta.json"
+
+func indexMetadataPath(dir string) string {
+	return filepath.Join(dir, metaFilename)
+}
+
+// IndexMetadata contains metadata about a bleve index.
+type IndexMetadata struct {
+	// The version of the data in the index. This can be useful for tracking
+	// schema changes or data migrations.
+	Version int `json:"version"`
+}
+
+// in addition to the user-exposed metadata, we keep additional, internal-only
+// metadata for sharded indices.
+const shardedMetadataFilename = "rupture_sharded_meta.json"
+
+func shardedIndexMetadataPath(dir string) string {
+	return filepath.Join(dir, shardedMetadataFilename)
+}
+
+type shardedIndexMetadata struct {
+	NumShards int `json:"num_shards"`
+}
+
+func readJSON(path string, meta interface{}) error {
+	metaBytes, err := ioutil.ReadFile(path)
+	if err != nil {
+		return err
+	}
+	return json.Unmarshal(metaBytes, meta)
+}
+
+func writeJSON(path string, meta interface{}) error {
+	metaBytes, err := json.Marshal(meta)
+	if err != nil {
+		return err
+	}
+	return ioutil.WriteFile(path, metaBytes, 0666)
+}
+
+// ReadIndexMetadata returns the metadata for the index at the specified path.
+// If no such index metadata exists, an empty metadata and a nil error are
+// returned.
+func ReadIndexMetadata(path string) (*IndexMetadata, error) {
+	meta := &IndexMetadata{}
+	metaPath := indexMetadataPath(path)
+	if _, err := os.Stat(metaPath); os.IsNotExist(err) {
+		return meta, nil
+	} else if err != nil {
+		return nil, err
+	}
+	return meta, readJSON(metaPath, meta)
+}
+
+// WriteIndexMetadata writes metadata for the index at the specified path.
+func WriteIndexMetadata(path string, meta *IndexMetadata) error {
+	return writeJSON(indexMetadataPath(path), meta)
+}
diff --git a/vendor/github.com/ethantkoenig/rupture/sharded_index.go b/vendor/github.com/ethantkoenig/rupture/sharded_index.go
new file mode 100644
index 000000000..8e4cb9338
--- /dev/null
+++ b/vendor/github.com/ethantkoenig/rupture/sharded_index.go
@@ -0,0 +1,146 @@
+package rupture
+
+import (
+	"fmt"
+	"hash/fnv"
+	"path/filepath"
+	"strconv"
+
+	"github.com/blevesearch/bleve"
+	"github.com/blevesearch/bleve/document"
+	"github.com/blevesearch/bleve/mapping"
+)
+
+// ShardedIndex an index that is built onto of multiple underlying bleve
+// indices (i.e. shards). Similar to bleve's index aliases, some methods may
+// not be supported.
+type ShardedIndex interface {
+	bleve.Index
+	shards() []bleve.Index
+}
+
+// a type alias for bleve.Index, so that the anonymous field of
+// shardedIndex does not conflict with the Index(..) method.
+type bleveIndex bleve.Index
+
+type shardedIndex struct {
+	bleveIndex
+	indices []bleve.Index
+}
+
+func hash(id string, n int) uint64 {
+	fnvHash := fnv.New64()
+	fnvHash.Write([]byte(id))
+	return fnvHash.Sum64() % uint64(n)
+}
+
+func childIndexerPath(rootPath string, i int) string {
+	return filepath.Join(rootPath, strconv.Itoa(i))
+}
+
+// NewShardedIndex creates a sharded index at the specified path, with the
+// specified mapping and number of shards.
+func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) {
+	if numShards <= 0 {
+		return nil, fmt.Errorf("Invalid number of shards: %d", numShards)
+	}
+	err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards})
+	if err != nil {
+		return nil, err
+	}
+
+	s := &shardedIndex{
+		indices: make([]bleve.Index, numShards),
+	}
+	for i := 0; i < numShards; i++ {
+		s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping)
+		if err != nil {
+			return nil, err
+		}
+	}
+	s.bleveIndex = bleve.NewIndexAlias(s.indices...)
+	return s, nil
+}
+
+// OpenShardedIndex opens a sharded index at the specified path.
+func OpenShardedIndex(path string) (ShardedIndex, error) {
+	var meta shardedIndexMetadata
+	var err error
+	if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil {
+		return nil, err
+	}
+
+	s := &shardedIndex{
+		indices: make([]bleve.Index, meta.NumShards),
+	}
+	for i := 0; i < meta.NumShards; i++ {
+		s.indices[i], err = bleve.Open(childIndexerPath(path, i))
+		if err != nil {
+			return nil, err
+		}
+	}
+	s.bleveIndex = bleve.NewIndexAlias(s.indices...)
+	return s, nil
+}
+
+func (s *shardedIndex) Index(id string, data interface{}) error {
+	return s.indices[hash(id, len(s.indices))].Index(id, data)
+}
+
+func (s *shardedIndex) Delete(id string) error {
+	return s.indices[hash(id, len(s.indices))].Delete(id)
+}
+
+func (s *shardedIndex) Document(id string) (*document.Document, error) {
+	return s.indices[hash(id, len(s.indices))].Document(id)
+}
+
+func (s *shardedIndex) Close() error {
+	if err := s.bleveIndex.Close(); err != nil {
+		return err
+	}
+	for _, index := range s.indices {
+		if err := index.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (s *shardedIndex) shards() []bleve.Index {
+	return s.indices
+}
+
+type shardedIndexFlushingBatch struct {
+	batches []*singleIndexFlushingBatch
+}
+
+// NewShardedFlushingBatch creates a flushing batch with the specified batch
+// size for the specified sharded index.
+func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch {
+	indices := index.shards()
+	b := &shardedIndexFlushingBatch{
+		batches: make([]*singleIndexFlushingBatch, len(indices)),
+	}
+	for i, index := range indices {
+		b.batches[i] = newFlushingBatch(index, maxBatchSize)
+	}
+	return b
+}
+
+func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error {
+	return b.batches[hash(id, len(b.batches))].Index(id, data)
+}
+
+func (b *shardedIndexFlushingBatch) Delete(id string) error {
+	return b.batches[hash(id, len(b.batches))].Delete(id)
+}
+
+func (b *shardedIndexFlushingBatch) Flush() error {
+	for _, batch := range b.batches {
+		if err := batch.Flush(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index a108bbbab..520cf8e22 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -128,6 +128,12 @@
 			"revision": "174f8ed44a0bf65e7c8fb228b60b58de62654cd2",
 			"revisionTime": "2017-06-28T17:18:15Z"
 		},
+		{
+			"checksumSHA1": "unacAFTLwgpg7wyI/mYf7Zd9eaU=",
+			"path": "github.com/blevesearch/bleve/analysis/token/unique",
+			"revision": "ff210fbc6d348ad67aa5754eaea11a463fcddafd",
+			"revisionTime": "2018-02-01T18:20:06Z"
+		},
 		{
 			"checksumSHA1": "q7C04nlJLxKmemXLop0oyJhfi5M=",
 			"path": "github.com/blevesearch/bleve/analysis/tokenizer/unicode",
@@ -347,6 +353,12 @@
 			"revision": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2",
 			"revisionTime": "2015-12-24T04:54:52Z"
 		},
+		{
+			"checksumSHA1": "06ofBxeJ9c4LS2p31PCMIj7IjJU=",
+			"path": "github.com/ethantkoenig/rupture",
+			"revision": "0a76f03a811abcca2e6357329b673e9bb8ef9643",
+			"revisionTime": "2018-02-03T18:25:44Z"
+		},
 		{
 			"checksumSHA1": "imR2wF388/0fBU6RRWx8RvTi8Q8=",
 			"path": "github.com/facebookgo/clock",