1
0
Fork 0
mirror of https://github.com/documize/community.git synced 2025-07-20 05:39:42 +02:00

refactored routing/web serving logic

This commit is contained in:
Harvey Kandola 2017-07-21 18:14:19 +01:00
parent 76d77bef9b
commit d888962082
16 changed files with 410 additions and 382 deletions

View file

@ -22,9 +22,9 @@ import (
"strings"
"time"
"github.com/documize/community/core/log"
"github.com/documize/community/core/env"
"github.com/documize/community/core/streamutil"
"github.com/documize/community/core/web"
"github.com/documize/community/server/web"
"github.com/jmoiron/sqlx"
)
@ -69,9 +69,9 @@ func migrations(lastMigration string) (migrationsT, error) {
}
// migrate the database as required, by applying the migrations.
func (m migrationsT) migrate(tx *sqlx.Tx) error {
func (m migrationsT) migrate(runtime env.Runtime, tx *sqlx.Tx) error {
for _, v := range m {
log.Info("Processing migration file: " + v)
runtime.Log.Info("Processing migration file: " + v)
buf, err := web.Asset(migrationsDir + "/" + v)
if err != nil {
@ -96,7 +96,7 @@ func (m migrationsT) migrate(tx *sqlx.Tx) error {
return nil
}
func lockDB() (bool, error) {
func lockDB(runtime env.Runtime) (bool, error) {
b := make([]byte, 2)
_, err := rand.Read(b)
if err != nil {
@ -117,8 +117,10 @@ func lockDB() (bool, error) {
defer func() {
_, err = tx.Exec("UNLOCK TABLES;")
log.IfErr(err)
log.IfErr(tx.Commit())
if err != nil {
runtime.Log.Error("unable to unlock tables", err)
}
tx.Commit()
}()
_, err = tx.Exec("INSERT INTO `config` (`key`,`config`) " +
@ -126,13 +128,13 @@ func lockDB() (bool, error) {
if err != nil {
// good error would be "Error 1062: Duplicate entry 'DBLOCK' for key 'idx_config_area'"
if strings.HasPrefix(err.Error(), "Error 1062:") {
log.Info("Database locked by annother Documize instance")
runtime.Log.Info("Database locked by annother Documize instance")
return false, nil
}
return false, err
}
log.Info("Database locked by this Documize instance")
runtime.Log.Info("Database locked by this Documize instance")
return true, err // success!
}
@ -148,18 +150,18 @@ func unlockDB() error {
return tx.Commit()
}
func migrateEnd(tx *sqlx.Tx, err error, amLeader bool) error {
func migrateEnd(runtime env.Runtime, tx *sqlx.Tx, err error, amLeader bool) error {
if amLeader {
defer func() { log.IfErr(unlockDB()) }()
defer func() { unlockDB() }()
if tx != nil {
if err == nil {
log.IfErr(tx.Commit())
log.Info("Database checks: completed")
tx.Commit()
runtime.Log.Info("Database checks: completed")
return nil
}
log.IfErr(tx.Rollback())
tx.Rollback()
}
log.Error("Database checks: failed: ", err)
runtime.Log.Error("Database checks: failed: ", err)
return err
}
return nil // not the leader, so ignore errors
@ -186,21 +188,22 @@ func getLastMigration(tx *sqlx.Tx) (lastMigration string, err error) {
}
// Migrate the database as required, consolidated action.
func Migrate(ConfigTableExists bool) error {
func Migrate(runtime env.Runtime, ConfigTableExists bool) error {
amLeader := false
if ConfigTableExists {
var err error
amLeader, err = lockDB()
log.IfErr(err)
amLeader, err = lockDB(runtime)
if err != nil {
runtime.Log.Error("unable to lock DB", err)
}
} else {
amLeader = true // what else can you do?
}
tx, err := (*dbPtr).Beginx()
if err != nil {
return migrateEnd(tx, err, amLeader)
return migrateEnd(runtime, tx, err, amLeader)
}
lastMigration := ""
@ -208,53 +211,50 @@ func Migrate(ConfigTableExists bool) error {
if ConfigTableExists {
lastMigration, err = getLastMigration(tx)
if err != nil {
return migrateEnd(tx, err, amLeader)
return migrateEnd(runtime, tx, err, amLeader)
}
log.Info("Database checks: last applied " + lastMigration)
runtime.Log.Info("Database checks: last applied " + lastMigration)
}
mig, err := migrations(lastMigration)
if err != nil {
return migrateEnd(tx, err, amLeader)
return migrateEnd(runtime, tx, err, amLeader)
}
if len(mig) == 0 {
log.Info("Database checks: no updates required")
return migrateEnd(tx, nil, amLeader) // no migrations to perform
runtime.Log.Info("Database checks: no updates required")
return migrateEnd(runtime, tx, nil, amLeader) // no migrations to perform
}
if amLeader {
log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", "))
return migrateEnd(tx, mig.migrate(tx), amLeader)
runtime.Log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", "))
return migrateEnd(runtime, tx, mig.migrate(runtime, tx), amLeader)
}
// a follower instance
targetMigration := string(mig[len(mig)-1])
for targetMigration != lastMigration {
time.Sleep(time.Second)
log.Info("Waiting for database migration completion")
runtime.Log.Info("Waiting for database migration completion")
tx.Rollback() // ignore error
tx, err := (*dbPtr).Beginx() // need this in order to see the changed situation since last tx
if err != nil {
return migrateEnd(tx, err, amLeader)
return migrateEnd(runtime, tx, err, amLeader)
}
lastMigration, _ = getLastMigration(tx)
}
return migrateEnd(tx, nil, amLeader)
return migrateEnd(runtime, tx, nil, amLeader)
}
func processSQLfile(tx *sqlx.Tx, buf []byte) error {
stmts := getStatements(buf)
for _, stmt := range stmts {
_, err := tx.Exec(stmt)
if err != nil {
return err
}
}
return nil