diff --git a/core/env/database.go b/core/env/database.go deleted file mode 100644 index 5ca4e4b9..00000000 --- a/core/env/database.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2016 Documize Inc. . All rights reserved. -// -// This software (Documize Community Edition) is licensed under -// GNU AGPL v3 http://www.gnu.org/licenses/agpl-3.0.en.html -// -// You can operate outside the AGPL restrictions by purchasing -// Documize Enterprise Edition and obtaining a commercial license -// by contacting . -// -// https://documize.com - -// Package env provides runtime, server level setup and configuration -package env diff --git a/domain/document/endpoint.go b/domain/document/endpoint.go index beba03d1..835e2b38 100644 --- a/domain/document/endpoint.go +++ b/domain/document/endpoint.go @@ -24,12 +24,12 @@ import ( "github.com/documize/community/core/streamutil" "github.com/documize/community/core/stringutil" "github.com/documize/community/domain" + indexer "github.com/documize/community/domain/search" "github.com/documize/community/domain/space" "github.com/documize/community/model/activity" "github.com/documize/community/model/audit" "github.com/documize/community/model/doc" "github.com/documize/community/model/link" - "github.com/documize/community/model/page" "github.com/documize/community/model/search" ) @@ -37,39 +37,7 @@ import ( type Handler struct { Runtime *env.Runtime Store *domain.Store -} - -// SearchDocuments endpoint takes a list of keywords and returns a list of document references matching those keywords. -func (h *Handler) SearchDocuments(w http.ResponseWriter, r *http.Request) { - method := "document.search" - ctx := domain.GetRequestContext(r) - - keywords := request.Query(r, "keywords") - decoded, err := url.QueryUnescape(keywords) - if err != nil { - response.WriteBadRequestError(w, method, err.Error()) - return - } - - results, err := h.Store.Search.Documents(ctx, decoded) - if err != nil { - h.Runtime.Log.Error("search failed", err) - } - - // Put in slugs for easy UI display of search URL - for key, result := range results { - result.DocumentSlug = stringutil.MakeSlug(result.DocumentTitle) - result.FolderSlug = stringutil.MakeSlug(result.LabelName) - results[key] = result - } - - if len(results) == 0 { - results = []search.DocumentSearch{} - } - - h.Store.Audit.Record(ctx, audit.EventTypeSearch) - - response.WriteJSON(w, results) + Indexer indexer.Indexer } // Get is an endpoint that returns the document-level information for a given documentID. @@ -269,11 +237,10 @@ func (h *Handler) Update(w http.ResponseWriter, r *http.Request) { h.Store.Audit.Record(ctx, audit.EventTypeDocumentUpdate) - p := page.Page{DocumentID: documentID, OrgID: ctx.OrgID, Body: d.Slug, Title: d.Title} - h.Store.Search.UpdateDocument(ctx, p) - ctx.Transaction.Commit() + h.Indexer.UpdateDocument(ctx, d) + response.WriteEmpty(w) } @@ -330,10 +297,42 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) { h.Store.Audit.Record(ctx, audit.EventTypeDocumentDelete) - p := page.Page{DocumentID: documentID, OrgID: ctx.OrgID} - h.Store.Search.DeleteDocument(ctx, p) - ctx.Transaction.Commit() + h.Indexer.DeleteDocument(ctx, documentID) + response.WriteEmpty(w) } + +// SearchDocuments endpoint takes a list of keywords and returns a list of document references matching those keywords. +func (h *Handler) SearchDocuments(w http.ResponseWriter, r *http.Request) { + method := "document.search" + ctx := domain.GetRequestContext(r) + + keywords := request.Query(r, "keywords") + decoded, err := url.QueryUnescape(keywords) + if err != nil { + response.WriteBadRequestError(w, method, err.Error()) + return + } + + results, err := h.Store.Search.Documents(ctx, decoded) + if err != nil { + h.Runtime.Log.Error("search failed", err) + } + + // Put in slugs for easy UI display of search URL + for key, result := range results { + result.DocumentSlug = stringutil.MakeSlug(result.DocumentTitle) + result.FolderSlug = stringutil.MakeSlug(result.LabelName) + results[key] = result + } + + if len(results) == 0 { + results = []search.DocumentSearch{} + } + + h.Store.Audit.Record(ctx, audit.EventTypeSearch) + + response.WriteJSON(w, results) +} diff --git a/domain/search/queue.go b/domain/search/queue.go new file mode 100644 index 00000000..4a2c5aff --- /dev/null +++ b/domain/search/queue.go @@ -0,0 +1,142 @@ +// Copyright 2016 Documize Inc. . All rights reserved. +// +// This software (Documize Community Edition) is licensed under +// GNU AGPL v3 http://www.gnu.org/licenses/agpl-3.0.en.html +// +// You can operate outside the AGPL restrictions by purchasing +// Documize Enterprise Edition and obtaining a commercial license +// by contacting . +// +// https://documize.com + +package search + +import ( + "errors" + "fmt" + "sync" + + "github.com/documize/community/core/env" + "github.com/documize/community/domain" + "github.com/documize/community/model/page" +) + +// Indexer type provides the datastructure for the queues of activity to be serialized through a single background goroutine. +// NOTE if the queue becomes full, the system will trigger the rebuilding entire files in order to clear the backlog. +type Indexer struct { + queue chan queueEntry + rebuild map[string]bool + rebuildLock sync.RWMutex + givenWarning bool + runtime *env.Runtime + store *domain.Store +} + +type queueEntry struct { + action func(domain.RequestContext, page.Page) error + isRebuild bool + page.Page + ctx domain.RequestContext +} + +const searchQueueLength = 2048 // NOTE the largest 15Mb docx in the test set generates 2142 queue entries, but the queue is constantly emptied + +// NewIndexer provides background search indexer +func NewIndexer(rt *env.Runtime, s *domain.Store) (i Indexer) { + i = Indexer{} + i.queue = make(chan queueEntry, searchQueueLength) // provide some decoupling + i.rebuild = make(map[string]bool) + i.runtime = rt + i.store = s + + go i.processQueue() + + return +} + +// processQueue is run as a goroutine, it processes the queue of search index update requests. +func (m *Indexer) processQueue() { + for { + //fmt.Println("DEBUG queue length=", len(Searches.queue)) + if len(m.queue) <= searchQueueLength/20 { // on a busy server, the queue may never get to zero - so use 5% + m.rebuildLock.Lock() + for docid := range m.rebuild { + m.queue <- queueEntry{ + action: m.store.Search.Rebuild, + isRebuild: true, + Page: page.Page{DocumentID: docid}, + } + delete(m.rebuild, docid) + } + m.rebuildLock.Unlock() + } + + qe := <-m.queue + doit := true + + if len(qe.DocumentID) > 0 { + m.rebuildLock.RLock() + if m.rebuild[qe.DocumentID] { + doit = false // don't execute an action on a document queued to be rebuilt + } + m.rebuildLock.RUnlock() + } + + if doit { + tx, err := m.runtime.Db.Beginx() + if err != nil { + } else { + ctx := qe.ctx + ctx.Transaction = tx + err = qe.action(ctx, qe.Page) + if err != nil { + tx.Rollback() + // This action has failed, so re-build indexes for the entire document, + // provided it was not a re-build command that failed and we know the documentId. + if !qe.isRebuild && len(qe.DocumentID) > 0 { + m.rebuildLock.Lock() + m.rebuild[qe.DocumentID] = true + m.rebuildLock.Unlock() + } + } else { + tx.Commit() + } + } + } + } +} + +func (m *Indexer) addQueue(qe queueEntry) error { + lsq := len(m.queue) + + if lsq >= (searchQueueLength - 1) { + if qe.DocumentID != "" { + m.rebuildLock.Lock() + if !m.rebuild[qe.DocumentID] { + m.runtime.Log.Info(fmt.Sprintf("WARNING: Search Queue Has No Space! Marked rebuild index for document id %s", qe.DocumentID)) + } + m.rebuild[qe.DocumentID] = true + m.rebuildLock.Unlock() + } else { + m.runtime.Log.Error("addQueue", errors.New("WARNING: Search Queue Has No Space! But unable to index unknown document id")) + } + + return nil + } + + if lsq > ((8 * searchQueueLength) / 10) { + if !m.givenWarning { + m.runtime.Log.Info(fmt.Sprintf("WARNING: Searches.queue length %d exceeds 80%% of capacity", lsq)) + m.givenWarning = true + } + } else { + if m.givenWarning { + m.runtime.Log.Info(fmt.Sprintf("INFO: Searches.queue length %d now below 80%% of capacity", lsq)) + m.givenWarning = false + } + } + + m.queue <- qe + + return nil +} diff --git a/domain/search/search.go b/domain/search/search.go index 6add2126..ac17683a 100644 --- a/domain/search/search.go +++ b/domain/search/search.go @@ -12,137 +12,12 @@ package search import ( - "errors" - "fmt" - "sync" - - "github.com/documize/community/core/env" "github.com/documize/community/domain" "github.com/documize/community/model" "github.com/documize/community/model/doc" "github.com/documize/community/model/page" ) -// Indexer type provides the datastructure for the queues of activity to be serialized through a single background goroutine. -// NOTE if the queue becomes full, the system will trigger the rebuilding entire files in order to clear the backlog. -type Indexer struct { - queue chan queueEntry - rebuild map[string]bool - rebuildLock sync.RWMutex - givenWarning bool - runtime *env.Runtime - store *domain.Store -} - -type queueEntry struct { - action func(domain.RequestContext, page.Page) error - isRebuild bool - page.Page - ctx domain.RequestContext -} - -var searches *Indexer - -const searchQueueLength = 2048 // NOTE the largest 15Mb docx in the test set generates 2142 queue entries, but the queue is constantly emptied - -// Start the background indexer -func Start(rt *env.Runtime, s *domain.Store) { - searches = &Indexer{} - searches.queue = make(chan queueEntry, searchQueueLength) // provide some decoupling - searches.rebuild = make(map[string]bool) - searches.runtime = rt - searches.store = s - - go searches.searchProcessQueue() -} - -// searchProcessQueue is run as a goroutine, it processes the queue of search index update requests. -func (m *Indexer) searchProcessQueue() { - for { - //fmt.Println("DEBUG queue length=", len(Searches.queue)) - if len(m.queue) <= searchQueueLength/20 { // on a busy server, the queue may never get to zero - so use 5% - m.rebuildLock.Lock() - for docid := range m.rebuild { - m.queue <- queueEntry{ - action: m.store.Search.Rebuild, - isRebuild: true, - Page: page.Page{DocumentID: docid}, - } - delete(m.rebuild, docid) - } - m.rebuildLock.Unlock() - } - - qe := <-m.queue - doit := true - - if len(qe.DocumentID) > 0 { - m.rebuildLock.RLock() - if m.rebuild[qe.DocumentID] { - doit = false // don't execute an action on a document queued to be rebuilt - } - m.rebuildLock.RUnlock() - } - - if doit { - tx, err := m.runtime.Db.Beginx() - if err != nil { - } else { - ctx := qe.ctx - ctx.Transaction = tx - err = qe.action(ctx, qe.Page) - if err != nil { - tx.Rollback() - // This action has failed, so re-build indexes for the entire document, - // provided it was not a re-build command that failed and we know the documentId. - if !qe.isRebuild && len(qe.DocumentID) > 0 { - m.rebuildLock.Lock() - m.rebuild[qe.DocumentID] = true - m.rebuildLock.Unlock() - } - } else { - tx.Commit() - } - } - } - } -} - -func (m *Indexer) addQueue(qe queueEntry) error { - lsq := len(m.queue) - - if lsq >= (searchQueueLength - 1) { - if qe.DocumentID != "" { - m.rebuildLock.Lock() - if !m.rebuild[qe.DocumentID] { - m.runtime.Log.Info(fmt.Sprintf("WARNING: Search Queue Has No Space! Marked rebuild index for document id %s", qe.DocumentID)) - } - m.rebuild[qe.DocumentID] = true - m.rebuildLock.Unlock() - } else { - m.runtime.Log.Error("addQueue", errors.New("WARNING: Search Queue Has No Space! But unable to index unknown document id")) - } - - return nil - } - - if lsq > ((8 * searchQueueLength) / 10) { - if !m.givenWarning { - m.runtime.Log.Info(fmt.Sprintf("WARNING: Searches.queue length %d exceeds 80%% of capacity", lsq)) - m.givenWarning = true - } - } else { - if m.givenWarning { - m.runtime.Log.Info(fmt.Sprintf("INFO: Searches.queue length %d now below 80%% of capacity", lsq)) - m.givenWarning = false - } - } - - m.queue <- qe - - return nil -} - // Add should be called when a new page is added to a document. func (m *Indexer) Add(ctx domain.RequestContext, page page.Page, id string) (err error) { page.RefID = id diff --git a/domain/storer.go b/domain/storer.go index f89d2937..0c3593c6 100644 --- a/domain/storer.go +++ b/domain/storer.go @@ -42,6 +42,7 @@ type Store struct { Page PageStorer Activity ActivityStorer Search SearchStorer + Indexer Indexer } // SpaceStorer defines required methods for space management @@ -193,3 +194,14 @@ type SearchStorer interface { Delete(ctx RequestContext, page page.Page) (err error) Documents(ctx RequestContext, keywords string) (results []search.DocumentSearch, err error) } + +// Indexer defines required methods for managing search indexing process +type Indexer interface { + Add(ctx RequestContext, page page.Page, id string) (err error) + Update(ctx RequestContext, page page.Page) (err error) + UpdateDocument(ctx RequestContext, page page.Page) (err error) + DeleteDocument(ctx RequestContext, documentID string) (err error) + UpdateSequence(ctx RequestContext, documentID, pageID string, sequence float64) (err error) + UpdateLevel(ctx RequestContext, documentID, pageID string, level int) (err error) + Delete(ctx RequestContext, documentID, pageID string) (err error) +} diff --git a/edition/community.go b/edition/community.go index 4b3c0e0d..e3583c9d 100644 --- a/edition/community.go +++ b/edition/community.go @@ -19,7 +19,6 @@ import ( "github.com/documize/community/core/api/request" "github.com/documize/community/core/env" "github.com/documize/community/domain" - "github.com/documize/community/domain/search" "github.com/documize/community/domain/section" "github.com/documize/community/edition/boot" "github.com/documize/community/edition/logging" @@ -69,9 +68,6 @@ func main() { // Register smart sections section.Register(rt) - // Search indexer queue - search.Start(&rt, &s) - ready := make(chan struct{}, 1) // channel signals router ready server.Start(&rt, &s, ready) } diff --git a/model/search/search.go b/model/search/search.go index d52dd4a5..6e726c57 100644 --- a/model/search/search.go +++ b/model/search/search.go @@ -11,7 +11,9 @@ package search -import "time" +import ( + "time" +) // Search holds raw search results. type Search struct { diff --git a/server/routing/entries.go b/server/routing/entries.go index 064d7fc0..f6d795e6 100644 --- a/server/routing/entries.go +++ b/server/routing/entries.go @@ -24,6 +24,7 @@ import ( "github.com/documize/community/domain/meta" "github.com/documize/community/domain/organization" "github.com/documize/community/domain/pin" + "github.com/documize/community/domain/search" "github.com/documize/community/domain/setting" "github.com/documize/community/domain/space" "github.com/documize/community/domain/user" @@ -32,6 +33,9 @@ import ( // RegisterEndpoints register routes for serving API endpoints func RegisterEndpoints(rt *env.Runtime, s *domain.Store) { + // base services + indexer := search.NewIndexer(rt, s) + // Pass server/application level contextual requirements into HTTP handlers // DO NOT pass in per request context (that is done by auth middleware per request) pin := pin.Handler{Runtime: rt, Store: s} @@ -41,7 +45,7 @@ func RegisterEndpoints(rt *env.Runtime, s *domain.Store) { link := link.Handler{Runtime: rt, Store: s} space := space.Handler{Runtime: rt, Store: s} setting := setting.Handler{Runtime: rt, Store: s} - document := document.Handler{Runtime: rt, Store: s} + document := document.Handler{Runtime: rt, Store: s, Indexer: indexer} attachment := attachment.Handler{Runtime: rt, Store: s} organization := organization.Handler{Runtime: rt, Store: s}