
search indexing process coupling

Harvey Kandola 2017-08-01 10:39:07 +01:00
parent 65390ab67d
commit 7faf6d6cff
8 changed files with 201 additions and 184 deletions

13  core/env/database.go vendored

@@ -1,13 +0,0 @@
// Copyright 2016 Documize Inc. <legal@documize.com>. All rights reserved.
//
// This software (Documize Community Edition) is licensed under
// GNU AGPL v3 http://www.gnu.org/licenses/agpl-3.0.en.html
//
// You can operate outside the AGPL restrictions by purchasing
// Documize Enterprise Edition and obtaining a commercial license
// by contacting <sales@documize.com>.
//
// https://documize.com
// Package env provides runtime, server level setup and configuration
package env


@@ -24,12 +24,12 @@ import (
"github.com/documize/community/core/streamutil"
"github.com/documize/community/core/stringutil"
"github.com/documize/community/domain"
indexer "github.com/documize/community/domain/search"
"github.com/documize/community/domain/space"
"github.com/documize/community/model/activity"
"github.com/documize/community/model/audit"
"github.com/documize/community/model/doc"
"github.com/documize/community/model/link"
"github.com/documize/community/model/page"
"github.com/documize/community/model/search"
)
@@ -37,39 +37,7 @@ import (
type Handler struct {
Runtime *env.Runtime
Store *domain.Store
}
// SearchDocuments endpoint takes a list of keywords and returns a list of document references matching those keywords.
func (h *Handler) SearchDocuments(w http.ResponseWriter, r *http.Request) {
method := "document.search"
ctx := domain.GetRequestContext(r)
keywords := request.Query(r, "keywords")
decoded, err := url.QueryUnescape(keywords)
if err != nil {
response.WriteBadRequestError(w, method, err.Error())
return
}
results, err := h.Store.Search.Documents(ctx, decoded)
if err != nil {
h.Runtime.Log.Error("search failed", err)
}
// Put in slugs for easy UI display of search URL
for key, result := range results {
result.DocumentSlug = stringutil.MakeSlug(result.DocumentTitle)
result.FolderSlug = stringutil.MakeSlug(result.LabelName)
results[key] = result
}
if len(results) == 0 {
results = []search.DocumentSearch{}
}
h.Store.Audit.Record(ctx, audit.EventTypeSearch)
response.WriteJSON(w, results)
Indexer indexer.Indexer
}
// Get is an endpoint that returns the document-level information for a given documentID.
@@ -269,11 +237,10 @@ func (h *Handler) Update(w http.ResponseWriter, r *http.Request) {
h.Store.Audit.Record(ctx, audit.EventTypeDocumentUpdate)
p := page.Page{DocumentID: documentID, OrgID: ctx.OrgID, Body: d.Slug, Title: d.Title}
h.Store.Search.UpdateDocument(ctx, p)
ctx.Transaction.Commit()
h.Indexer.UpdateDocument(ctx, d)
response.WriteEmpty(w)
}
@@ -330,10 +297,42 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) {
h.Store.Audit.Record(ctx, audit.EventTypeDocumentDelete)
p := page.Page{DocumentID: documentID, OrgID: ctx.OrgID}
h.Store.Search.DeleteDocument(ctx, p)
ctx.Transaction.Commit()
h.Indexer.DeleteDocument(ctx, documentID)
response.WriteEmpty(w)
}
// SearchDocuments endpoint takes a list of keywords and returns a list of document references matching those keywords.
func (h *Handler) SearchDocuments(w http.ResponseWriter, r *http.Request) {
method := "document.search"
ctx := domain.GetRequestContext(r)
keywords := request.Query(r, "keywords")
decoded, err := url.QueryUnescape(keywords)
if err != nil {
response.WriteBadRequestError(w, method, err.Error())
return
}
results, err := h.Store.Search.Documents(ctx, decoded)
if err != nil {
h.Runtime.Log.Error("search failed", err)
}
// Put in slugs for easy UI display of search URL
for key, result := range results {
result.DocumentSlug = stringutil.MakeSlug(result.DocumentTitle)
result.FolderSlug = stringutil.MakeSlug(result.LabelName)
results[key] = result
}
if len(results) == 0 {
results = []search.DocumentSearch{}
}
h.Store.Audit.Record(ctx, audit.EventTypeSearch)
response.WriteJSON(w, results)
}
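
The handler changes above are the heart of the decoupling: the request transaction is committed first, and only afterwards is the change handed to the injected background indexer, instead of the handler writing to the search tables inline. A minimal, self-contained sketch of that flow (type and function names here are illustrative, not the Documize API):

package main

import "fmt"

// indexer stands in for the injected background search indexer; in the
// commit the real field type is the concrete search.Indexer from domain/search.
type indexer interface {
	UpdateDocument(documentID, title string)
}

// handler mirrors the new shape of the document handler: it carries an
// indexer rather than writing to the search tables itself.
type handler struct {
	Indexer indexer
}

// update sketches the decoupled flow: persist and commit the document change
// first, then queue the index refresh so request latency no longer depends
// on search-table writes.
func (h handler) update(documentID, title string) {
	// ...persist the document inside the request transaction...
	// ...commit the transaction...
	h.Indexer.UpdateDocument(documentID, title)
}

// loggingIndexer is a trivial stand-in that just records the hand-off.
type loggingIndexer struct{}

func (loggingIndexer) UpdateDocument(documentID, title string) {
	fmt.Printf("queued index update for %s (%q)\n", documentID, title)
}

func main() {
	h := handler{Indexer: loggingIndexer{}}
	h.update("doc-123", "Release notes")
}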

142  domain/search/queue.go (new file)

@@ -0,0 +1,142 @@
// Copyright 2016 Documize Inc. <legal@documize.com>. All rights reserved.
//
// This software (Documize Community Edition) is licensed under
// GNU AGPL v3 http://www.gnu.org/licenses/agpl-3.0.en.html
//
// You can operate outside the AGPL restrictions by purchasing
// Documize Enterprise Edition and obtaining a commercial license
// by contacting <sales@documize.com>.
//
// https://documize.com
package search
import (
"errors"
"fmt"
"sync"
"github.com/documize/community/core/env"
"github.com/documize/community/domain"
"github.com/documize/community/model/page"
)
// Indexer type provides the datastructure for the queues of activity to be serialized through a single background goroutine.
// NOTE if the queue becomes full, the system will trigger the rebuilding entire files in order to clear the backlog.
type Indexer struct {
queue chan queueEntry
rebuild map[string]bool
rebuildLock sync.RWMutex
givenWarning bool
runtime *env.Runtime
store *domain.Store
}
type queueEntry struct {
action func(domain.RequestContext, page.Page) error
isRebuild bool
page.Page
ctx domain.RequestContext
}
const searchQueueLength = 2048 // NOTE the largest 15Mb docx in the test set generates 2142 queue entries, but the queue is constantly emptied
// NewIndexer provides background search indexer
func NewIndexer(rt *env.Runtime, s *domain.Store) (i Indexer) {
i = Indexer{}
i.queue = make(chan queueEntry, searchQueueLength) // provide some decoupling
i.rebuild = make(map[string]bool)
i.runtime = rt
i.store = s
go i.processQueue()
return
}
// processQueue is run as a goroutine, it processes the queue of search index update requests.
func (m *Indexer) processQueue() {
for {
//fmt.Println("DEBUG queue length=", len(Searches.queue))
if len(m.queue) <= searchQueueLength/20 { // on a busy server, the queue may never get to zero - so use 5%
m.rebuildLock.Lock()
for docid := range m.rebuild {
m.queue <- queueEntry{
action: m.store.Search.Rebuild,
isRebuild: true,
Page: page.Page{DocumentID: docid},
}
delete(m.rebuild, docid)
}
m.rebuildLock.Unlock()
}
qe := <-m.queue
doit := true
if len(qe.DocumentID) > 0 {
m.rebuildLock.RLock()
if m.rebuild[qe.DocumentID] {
doit = false // don't execute an action on a document queued to be rebuilt
}
m.rebuildLock.RUnlock()
}
if doit {
tx, err := m.runtime.Db.Beginx()
if err != nil {
} else {
ctx := qe.ctx
ctx.Transaction = tx
err = qe.action(ctx, qe.Page)
if err != nil {
tx.Rollback()
// This action has failed, so re-build indexes for the entire document,
// provided it was not a re-build command that failed and we know the documentId.
if !qe.isRebuild && len(qe.DocumentID) > 0 {
m.rebuildLock.Lock()
m.rebuild[qe.DocumentID] = true
m.rebuildLock.Unlock()
}
} else {
tx.Commit()
}
}
}
}
}
func (m *Indexer) addQueue(qe queueEntry) error {
lsq := len(m.queue)
if lsq >= (searchQueueLength - 1) {
if qe.DocumentID != "" {
m.rebuildLock.Lock()
if !m.rebuild[qe.DocumentID] {
m.runtime.Log.Info(fmt.Sprintf("WARNING: Search Queue Has No Space! Marked rebuild index for document id %s", qe.DocumentID))
}
m.rebuild[qe.DocumentID] = true
m.rebuildLock.Unlock()
} else {
m.runtime.Log.Error("addQueue", errors.New("WARNING: Search Queue Has No Space! But unable to index unknown document id"))
}
return nil
}
if lsq > ((8 * searchQueueLength) / 10) {
if !m.givenWarning {
m.runtime.Log.Info(fmt.Sprintf("WARNING: Searches.queue length %d exceeds 80%% of capacity", lsq))
m.givenWarning = true
}
} else {
if m.givenWarning {
m.runtime.Log.Info(fmt.Sprintf("INFO: Searches.queue length %d now below 80%% of capacity", lsq))
m.givenWarning = false
}
}
m.queue <- qe
return nil
}
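
The NOTE at the top of queue.go summarises the backpressure strategy: page-level updates flow through a bounded channel drained by a single goroutine, and when the channel has no room the affected document is flagged for a full index rebuild, which the worker schedules once the queue falls back below a low-water mark (5% of capacity). A compact, self-contained sketch of that pattern follows; it uses a non-blocking select where the real code compares the queue length against capacity, and all names are illustrative:

package main

import (
	"fmt"
	"sync"
)

// miniIndexer condenses the queue/rebuild fallback used in queue.go above.
type miniIndexer struct {
	queue   chan string // pending per-page index updates, keyed by document ID
	mu      sync.Mutex
	rebuild map[string]bool // documents flagged for a full re-index
}

// enqueue never blocks the request path: when the buffered channel is full,
// the document is flagged for a rebuild instead, since the rebuild will
// regenerate that document's whole index anyway.
func (m *miniIndexer) enqueue(docID string) {
	select {
	case m.queue <- docID:
	default:
		m.mu.Lock()
		m.rebuild[docID] = true
		m.mu.Unlock()
	}
}

// drainRebuilds is what the worker does once the queue is nearly empty:
// promote each flagged document back onto the queue as a rebuild job.
func (m *miniIndexer) drainRebuilds() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for docID := range m.rebuild {
		m.queue <- "rebuild " + docID
		delete(m.rebuild, docID)
	}
}

func main() {
	m := &miniIndexer{queue: make(chan string, 2), rebuild: map[string]bool{}}
	for _, id := range []string{"a", "b", "c"} { // "c" overflows the tiny queue
		m.enqueue(id)
	}
	for len(m.queue) > 0 { // the worker drains the queued page updates
		fmt.Println("index", <-m.queue)
	}
	m.drainRebuilds() // queue now has room, so the flagged rebuild is promoted
	for len(m.queue) > 0 {
		fmt.Println(<-m.queue)
	}
}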


@@ -12,137 +12,12 @@
package search
import (
"errors"
"fmt"
"sync"
"github.com/documize/community/core/env"
"github.com/documize/community/domain"
"github.com/documize/community/model"
"github.com/documize/community/model/doc"
"github.com/documize/community/model/page"
)
// Indexer type provides the datastructure for the queues of activity to be serialized through a single background goroutine.
// NOTE if the queue becomes full, the system will trigger the rebuilding entire files in order to clear the backlog.
type Indexer struct {
queue chan queueEntry
rebuild map[string]bool
rebuildLock sync.RWMutex
givenWarning bool
runtime *env.Runtime
store *domain.Store
}
type queueEntry struct {
action func(domain.RequestContext, page.Page) error
isRebuild bool
page.Page
ctx domain.RequestContext
}
var searches *Indexer
const searchQueueLength = 2048 // NOTE the largest 15Mb docx in the test set generates 2142 queue entries, but the queue is constantly emptied
// Start the background indexer
func Start(rt *env.Runtime, s *domain.Store) {
searches = &Indexer{}
searches.queue = make(chan queueEntry, searchQueueLength) // provide some decoupling
searches.rebuild = make(map[string]bool)
searches.runtime = rt
searches.store = s
go searches.searchProcessQueue()
}
// searchProcessQueue is run as a goroutine, it processes the queue of search index update requests.
func (m *Indexer) searchProcessQueue() {
for {
//fmt.Println("DEBUG queue length=", len(Searches.queue))
if len(m.queue) <= searchQueueLength/20 { // on a busy server, the queue may never get to zero - so use 5%
m.rebuildLock.Lock()
for docid := range m.rebuild {
m.queue <- queueEntry{
action: m.store.Search.Rebuild,
isRebuild: true,
Page: page.Page{DocumentID: docid},
}
delete(m.rebuild, docid)
}
m.rebuildLock.Unlock()
}
qe := <-m.queue
doit := true
if len(qe.DocumentID) > 0 {
m.rebuildLock.RLock()
if m.rebuild[qe.DocumentID] {
doit = false // don't execute an action on a document queued to be rebuilt
}
m.rebuildLock.RUnlock()
}
if doit {
tx, err := m.runtime.Db.Beginx()
if err != nil {
} else {
ctx := qe.ctx
ctx.Transaction = tx
err = qe.action(ctx, qe.Page)
if err != nil {
tx.Rollback()
// This action has failed, so re-build indexes for the entire document,
// provided it was not a re-build command that failed and we know the documentId.
if !qe.isRebuild && len(qe.DocumentID) > 0 {
m.rebuildLock.Lock()
m.rebuild[qe.DocumentID] = true
m.rebuildLock.Unlock()
}
} else {
tx.Commit()
}
}
}
}
}
func (m *Indexer) addQueue(qe queueEntry) error {
lsq := len(m.queue)
if lsq >= (searchQueueLength - 1) {
if qe.DocumentID != "" {
m.rebuildLock.Lock()
if !m.rebuild[qe.DocumentID] {
m.runtime.Log.Info(fmt.Sprintf("WARNING: Search Queue Has No Space! Marked rebuild index for document id %s", qe.DocumentID))
}
m.rebuild[qe.DocumentID] = true
m.rebuildLock.Unlock()
} else {
m.runtime.Log.Error("addQueue", errors.New("WARNING: Search Queue Has No Space! But unable to index unknown document id"))
}
return nil
}
if lsq > ((8 * searchQueueLength) / 10) {
if !m.givenWarning {
m.runtime.Log.Info(fmt.Sprintf("WARNING: Searches.queue length %d exceeds 80%% of capacity", lsq))
m.givenWarning = true
}
} else {
if m.givenWarning {
m.runtime.Log.Info(fmt.Sprintf("INFO: Searches.queue length %d now below 80%% of capacity", lsq))
m.givenWarning = false
}
}
m.queue <- qe
return nil
}
// Add should be called when a new page is added to a document.
func (m *Indexer) Add(ctx domain.RequestContext, page page.Page, id string) (err error) {
page.RefID = id


@@ -42,6 +42,7 @@ type Store struct {
Page PageStorer
Activity ActivityStorer
Search SearchStorer
Indexer Indexer
}
// SpaceStorer defines required methods for space management
@@ -193,3 +194,14 @@ type SearchStorer interface {
Delete(ctx RequestContext, page page.Page) (err error)
Documents(ctx RequestContext, keywords string) (results []search.DocumentSearch, err error)
}
// Indexer defines required methods for managing search indexing process
type Indexer interface {
Add(ctx RequestContext, page page.Page, id string) (err error)
Update(ctx RequestContext, page page.Page) (err error)
UpdateDocument(ctx RequestContext, page page.Page) (err error)
DeleteDocument(ctx RequestContext, documentID string) (err error)
UpdateSequence(ctx RequestContext, documentID, pageID string, sequence float64) (err error)
UpdateLevel(ctx RequestContext, documentID, pageID string, level int) (err error)
Delete(ctx RequestContext, documentID, pageID string) (err error)
}
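
The Store now carries the indexer behind this interface, so an alternative implementation could be substituted without importing domain/search, for example a no-op stub in tests. A hypothetical sketch written against the method set above (the concrete implementation added by this commit is the search.Indexer returned by NewIndexer):

package stub

import (
	"github.com/documize/community/domain"
	"github.com/documize/community/model/page"
)

// noopIndexer is a hypothetical stub satisfying the Indexer interface above;
// it accepts every call and does nothing, which is handy when exercising
// code that should not touch the search index.
type noopIndexer struct{}

func (noopIndexer) Add(ctx domain.RequestContext, p page.Page, id string) error       { return nil }
func (noopIndexer) Update(ctx domain.RequestContext, p page.Page) error               { return nil }
func (noopIndexer) UpdateDocument(ctx domain.RequestContext, p page.Page) error       { return nil }
func (noopIndexer) DeleteDocument(ctx domain.RequestContext, documentID string) error { return nil }
func (noopIndexer) UpdateSequence(ctx domain.RequestContext, documentID, pageID string, sequence float64) error {
	return nil
}
func (noopIndexer) UpdateLevel(ctx domain.RequestContext, documentID, pageID string, level int) error {
	return nil
}
func (noopIndexer) Delete(ctx domain.RequestContext, documentID, pageID string) error { return nil }

// Compile-time check that the stub matches the interface.
var _ domain.Indexer = noopIndexer{}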


@@ -19,7 +19,6 @@ import (
"github.com/documize/community/core/api/request"
"github.com/documize/community/core/env"
"github.com/documize/community/domain"
"github.com/documize/community/domain/search"
"github.com/documize/community/domain/section"
"github.com/documize/community/edition/boot"
"github.com/documize/community/edition/logging"
@@ -69,9 +68,6 @@ func main() {
// Register smart sections
section.Register(rt)
// Search indexer queue
search.Start(&rt, &s)
ready := make(chan struct{}, 1) // channel signals router ready
server.Start(&rt, &s, ready)
}


@@ -11,7 +11,9 @@
package search
import "time"
import (
"time"
)
// Search holds raw search results.
type Search struct {


@@ -24,6 +24,7 @@ import (
"github.com/documize/community/domain/meta"
"github.com/documize/community/domain/organization"
"github.com/documize/community/domain/pin"
"github.com/documize/community/domain/search"
"github.com/documize/community/domain/setting"
"github.com/documize/community/domain/space"
"github.com/documize/community/domain/user"
@@ -32,6 +33,9 @@ import (
// RegisterEndpoints register routes for serving API endpoints
func RegisterEndpoints(rt *env.Runtime, s *domain.Store) {
// base services
indexer := search.NewIndexer(rt, s)
// Pass server/application level contextual requirements into HTTP handlers
// DO NOT pass in per request context (that is done by auth middleware per request)
pin := pin.Handler{Runtime: rt, Store: s}
@@ -41,7 +45,7 @@ func RegisterEndpoints(rt *env.Runtime, s *domain.Store) {
link := link.Handler{Runtime: rt, Store: s}
space := space.Handler{Runtime: rt, Store: s}
setting := setting.Handler{Runtime: rt, Store: s}
document := document.Handler{Runtime: rt, Store: s}
document := document.Handler{Runtime: rt, Store: s, Indexer: indexer}
attachment := attachment.Handler{Runtime: rt, Store: s}
organization := organization.Handler{Runtime: rt, Store: s}