1
0
Fork 0
mirror of https://github.com/documize/community.git synced 2025-07-19 13:19:43 +02:00

Introduce transaction isolation customization

Search related indexing transactions requires better TX begin/commit.

New helpers provide TX isolation control.
This commit is contained in:
McMatts 2019-03-08 15:50:55 +00:00
parent 36d7136210
commit f062005946
5 changed files with 63 additions and 65 deletions

59
core/env/runtime.go vendored
View file

@ -13,25 +13,25 @@
package env package env
import ( import (
"context" "context"
"database/sql" "database/sql"
"github.com/documize/community/domain" "github.com/documize/community/domain"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
const ( const (
// SiteModeNormal serves app // SiteModeNormal serves app
SiteModeNormal = "" SiteModeNormal = ""
// SiteModeOffline serves offline.html // SiteModeOffline serves offline.html
SiteModeOffline = "1" SiteModeOffline = "1"
// SiteModeSetup tells Ember to serve setup route // SiteModeSetup tells Ember to serve setup route
SiteModeSetup = "2" SiteModeSetup = "2"
// SiteModeBadDB redirects to db-error.html page // SiteModeBadDB redirects to db-error.html page
SiteModeBadDB = "3" SiteModeBadDB = "3"
) )
// Runtime provides access to database, logger and other server-level scoped objects. // Runtime provides access to database, logger and other server-level scoped objects.
@ -44,40 +44,37 @@ type Runtime struct {
Product domain.Product Product domain.Product
} }
var ctx = context.Background()
// StartTx beings database transaction with application defined // StartTx beings database transaction with application defined
// database transaction isolation level. // database transaction isolation level.
// Any error encountered during this operation is logged to runtime logger. // Any error encountered during this operation is logged to runtime logger.
func (r *Runtime) StartTx() (tx *sqlx.Tx, ok bool) { func (r *Runtime) StartTx(i sql.IsolationLevel) (tx *sqlx.Tx, ok bool) {
tx, err := r.Db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelReadUncommitted}) tx, err := r.Db.BeginTxx(context.Background(), &sql.TxOptions{Isolation: i})
if err != nil { if err != nil {
r.Log.Error("unable to start database transaction", err) r.Log.Error("unable to start database transaction", err)
return nil, false return nil, false
} }
return tx, true return tx, true
} }
// Rollback aborts active database transaction. // Rollback aborts active database transaction.
// Any error encountered during this operation is logged to runtime logger. // Any error encountered during this operation is logged to runtime logger.
func (r *Runtime) Rollback(tx *sqlx.Tx) bool { func (r *Runtime) Rollback(tx *sqlx.Tx) bool {
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
r.Log.Error("unable to commit database transaction", err) r.Log.Error("unable to commit database transaction", err)
return false return false
} }
return true return true
} }
// Commit flushes pending changes to database. // Commit flushes pending changes to database.
// Any error encountered during this operation is logged to runtime logger. // Any error encountered during this operation is logged to runtime logger.
func (r *Runtime) Commit(tx *sqlx.Tx) bool { func (r *Runtime) Commit(tx *sqlx.Tx) bool {
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
r.Log.Error("unable to commit database transaction", err) r.Log.Error("unable to commit database transaction", err)
return false return false
} }
return true return true
} }

View file

@ -12,12 +12,14 @@
package search package search
import ( import (
"database/sql"
"github.com/documize/community/domain" "github.com/documize/community/domain"
"github.com/documize/community/model/attachment" "github.com/documize/community/model/attachment"
"github.com/documize/community/model/category" "github.com/documize/community/model/category"
"github.com/documize/community/model/doc" "github.com/documize/community/model/doc"
"github.com/documize/community/model/page" "github.com/documize/community/model/page"
sm "github.com/documize/community/model/search" sm "github.com/documize/community/model/search"
"github.com/documize/community/model/workflow"
) )
// IndexDocument adds search indesd entries for document inserting title, tags and attachments as // IndexDocument adds search indesd entries for document inserting title, tags and attachments as
@ -26,20 +28,21 @@ func (m *Indexer) IndexDocument(ctx domain.RequestContext, d doc.Document, a []a
method := "search.IndexDocument" method := "search.IndexDocument"
var err error var err error
ctx.Transaction, err = m.runtime.Db.Beginx() ok := true
if err != nil { ctx.Transaction, ok = m.runtime.StartTx(sql.LevelReadUncommitted)
m.runtime.Log.Error(method, err) if !ok {
m.runtime.Log.Info("unable to start TX for " + method)
return return
} }
err = m.store.Search.IndexDocument(ctx, d, a) err = m.store.Search.IndexDocument(ctx, d, a)
if err != nil { if err != nil {
ctx.Transaction.Rollback() m.runtime.Rollback(ctx.Transaction)
m.runtime.Log.Error(method, err) m.runtime.Log.Error(method, err)
return return
} }
ctx.Transaction.Commit() m.runtime.Commit(ctx.Transaction)
} }
// DeleteDocument removes all search entries for document. // DeleteDocument removes all search entries for document.
@ -47,20 +50,21 @@ func (m *Indexer) DeleteDocument(ctx domain.RequestContext, ID string) {
method := "search.DeleteDocument" method := "search.DeleteDocument"
var err error var err error
ctx.Transaction, err = m.runtime.Db.Beginx() ok := true
if err != nil { ctx.Transaction, ok = m.runtime.StartTx(sql.LevelReadUncommitted)
m.runtime.Log.Error(method, err) if !ok {
m.runtime.Log.Info("unable to start TX for " + method)
return return
} }
err = m.store.Search.DeleteDocument(ctx, ID) err = m.store.Search.DeleteDocument(ctx, ID)
if err != nil { if err != nil {
ctx.Transaction.Rollback() m.runtime.Rollback(ctx.Transaction)
m.runtime.Log.Error(method, err) m.runtime.Log.Error(method, err)
return return
} }
ctx.Transaction.Commit() m.runtime.Commit(ctx.Transaction)
} }
// IndexContent adds search index entry for document context. // IndexContent adds search index entry for document context.
@ -69,25 +73,26 @@ func (m *Indexer) IndexContent(ctx domain.RequestContext, p page.Page) {
method := "search.IndexContent" method := "search.IndexContent"
var err error var err error
ctx.Transaction, err = m.runtime.Db.Beginx() // we do not index pending pages
if err != nil { if p.Status == workflow.ChangePending || p.Status == workflow.ChangePendingNew {
m.runtime.Log.Error(method, err) return
}
ok := true
ctx.Transaction, ok = m.runtime.StartTx(sql.LevelReadUncommitted)
if !ok {
m.runtime.Log.Info("unable to start TX for " + method)
return return
} }
err = m.store.Search.IndexContent(ctx, p) err = m.store.Search.IndexContent(ctx, p)
if err != nil { if err != nil {
ctx.Transaction.Rollback() m.runtime.Rollback(ctx.Transaction)
m.runtime.Log.Error(method, err) m.runtime.Log.Error(method, err)
return return
} }
err = ctx.Transaction.Commit() m.runtime.Commit(ctx.Transaction)
if err != nil {
ctx.Transaction.Rollback()
m.runtime.Log.Error(method, err)
return
}
} }
// DeleteContent removes all search entries for specific document content. // DeleteContent removes all search entries for specific document content.
@ -95,20 +100,21 @@ func (m *Indexer) DeleteContent(ctx domain.RequestContext, pageID string) {
method := "search.DeleteContent" method := "search.DeleteContent"
var err error var err error
ctx.Transaction, err = m.runtime.Db.Beginx() ok := true
if err != nil { ctx.Transaction, ok = m.runtime.StartTx(sql.LevelReadUncommitted)
m.runtime.Log.Error(method, err) if !ok {
m.runtime.Log.Info("unable to start TX for " + method)
return return
} }
err = m.store.Search.DeleteContent(ctx, pageID) err = m.store.Search.DeleteContent(ctx, pageID)
if err != nil { if err != nil {
ctx.Transaction.Rollback() m.runtime.Rollback(ctx.Transaction)
m.runtime.Log.Error(method, err) m.runtime.Log.Error(method, err)
return return
} }
ctx.Transaction.Commit() m.runtime.Commit(ctx.Transaction)
} }
// FilterCategoryProtected removes search results that cannot be seen by user // FilterCategoryProtected removes search results that cannot be seen by user

View file

@ -24,7 +24,6 @@ import (
"github.com/documize/community/model/doc" "github.com/documize/community/model/doc"
"github.com/documize/community/model/page" "github.com/documize/community/model/page"
"github.com/documize/community/model/search" "github.com/documize/community/model/search"
"github.com/documize/community/model/workflow"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -125,11 +124,6 @@ func (s Store) DeleteDocument(ctx domain.RequestContext, ID string) (err error)
func (s Store) IndexContent(ctx domain.RequestContext, p page.Page) (err error) { func (s Store) IndexContent(ctx domain.RequestContext, p page.Page) (err error) {
method := "search.IndexContent" method := "search.IndexContent"
// we do not index pending pages
if p.Status == workflow.ChangePending || p.Status == workflow.ChangePendingNew {
return
}
// remove previous search entries // remove previous search entries
_, err = ctx.Transaction.Exec(s.Bind("DELETE FROM dmz_search WHERE c_orgid=? AND c_docid=? AND c_itemid=? AND c_itemtype='page'"), _, err = ctx.Transaction.Exec(s.Bind("DELETE FROM dmz_search WHERE c_orgid=? AND c_docid=? AND c_itemid=? AND c_itemtype='page'"),
ctx.OrgID, p.DocumentID, p.RefID) ctx.OrgID, p.DocumentID, p.RefID)

View file

@ -696,7 +696,7 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) {
// Delete the space first. // Delete the space first.
ok := true ok := true
ctx.Transaction, ok = h.Runtime.StartTx() ctx.Transaction, ok = h.Runtime.StartTx(sql.LevelReadUncommitted)
if !ok { if !ok {
response.WriteError(w, method) response.WriteError(w, method)
return return
@ -712,7 +712,7 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) {
h.Runtime.Commit(ctx.Transaction) h.Runtime.Commit(ctx.Transaction)
// Delete data associated with this space. // Delete data associated with this space.
ctx.Transaction, ok = h.Runtime.StartTx() ctx.Transaction, ok = h.Runtime.StartTx(sql.LevelReadUncommitted)
if !ok { if !ok {
response.WriteError(w, method) response.WriteError(w, method)
return return
@ -756,7 +756,7 @@ func (h *Handler) Delete(w http.ResponseWriter, r *http.Request) {
h.Runtime.Commit(ctx.Transaction) h.Runtime.Commit(ctx.Transaction)
// Record this action. // Record this action.
ctx.Transaction, ok = h.Runtime.StartTx() ctx.Transaction, ok = h.Runtime.StartTx(sql.LevelReadUncommitted)
if !ok { if !ok {
response.WriteError(w, method) response.WriteError(w, method)
return return

View file

@ -184,6 +184,7 @@ type DocumentStorer interface {
TemplatesBySpace(ctx domain.RequestContext, spaceID string) (documents []doc.Document, err error) TemplatesBySpace(ctx domain.RequestContext, spaceID string) (documents []doc.Document, err error)
PublicDocuments(ctx domain.RequestContext, orgID string) (documents []doc.SitemapDocument, err error) PublicDocuments(ctx domain.RequestContext, orgID string) (documents []doc.SitemapDocument, err error)
Update(ctx domain.RequestContext, document doc.Document) (err error) Update(ctx domain.RequestContext, document doc.Document) (err error)
UpdateRevised(ctx domain.RequestContext, docID string) (err error)
UpdateGroup(ctx domain.RequestContext, document doc.Document) (err error) UpdateGroup(ctx domain.RequestContext, document doc.Document) (err error)
ChangeDocumentSpace(ctx domain.RequestContext, document, space string) (err error) ChangeDocumentSpace(ctx domain.RequestContext, document, space string) (err error)
MoveDocumentSpace(ctx domain.RequestContext, id, move string) (err error) MoveDocumentSpace(ctx domain.RequestContext, id, move string) (err error)