mirror of
https://github.com/documize/community.git
synced 2025-07-24 23:59:47 +02:00
Merge pull request #29 from documize/db-auto-upgrade
DB auto-upgrade multi-instance fix
This commit is contained in:
commit
615aca8040
4 changed files with 731 additions and 741 deletions
|
@ -50,8 +50,9 @@ func init() {
|
|||
var err error
|
||||
|
||||
environment.GetString(&connectionString, "db", true,
|
||||
`"username:password@protocol(hostname:port)/databasename" for example "fred:bloggs@tcp(localhost:3306)/documize"`,
|
||||
`'username:password@protocol(hostname:port)/databasename" for example "fred:bloggs@tcp(localhost:3306)/documize"`,
|
||||
func(*string, string) bool {
|
||||
|
||||
Db, err = sqlx.Open("mysql", stdConn(connectionString))
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -19,9 +19,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/documize/community/core/api/util"
|
||||
"github.com/documize/community/core/web"
|
||||
"github.com/documize/community/core/log"
|
||||
"github.com/documize/community/core/utility"
|
||||
"github.com/documize/community/core/web"
|
||||
)
|
||||
|
||||
// runSQL creates a transaction per call
|
||||
|
|
|
@ -13,16 +13,20 @@ package database
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"github.com/documize/community/core/web"
|
||||
"github.com/documize/community/core/log"
|
||||
"github.com/documize/community/core/utility"
|
||||
"github.com/documize/community/core/web"
|
||||
)
|
||||
|
||||
const migrationsDir = "bindata/scripts"
|
||||
|
@ -69,97 +73,176 @@ func migrations(lastMigration string) (migrationsT, error) {
|
|||
func (m migrationsT) migrate(tx *sqlx.Tx) error {
|
||||
for _, v := range m {
|
||||
log.Info("Processing migration file: " + v)
|
||||
|
||||
buf, err := web.Asset(migrationsDir + "/" + v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//fmt.Println("DEBUG database.Migrate() ", v, ":\n", string(buf)) // TODO actually run the SQL
|
||||
|
||||
err = processSQLfile(tx, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
json := `{"database":"` + v + `"}`
|
||||
sql := "INSERT INTO `config` (`key`,`config`) " +
|
||||
"VALUES ('META','" + json +
|
||||
"') ON DUPLICATE KEY UPDATE `config`='" + json + "';"
|
||||
|
||||
_, err = tx.Exec(sql)
|
||||
_, err = tx.Exec(sql) // add a record in the config file to say we have done the upgrade
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//fmt.Println("DEBUG insert 10s wait for testing")
|
||||
//time.Sleep(10 * time.Second)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func migrateEnd(tx *sqlx.Tx, err error) error {
|
||||
if tx != nil {
|
||||
_, ulerr := tx.Exec("UNLOCK TABLES;")
|
||||
log.IfErr(ulerr)
|
||||
if err == nil {
|
||||
log.IfErr(tx.Commit())
|
||||
log.Info("Database checks: completed")
|
||||
return nil
|
||||
}
|
||||
log.IfErr(tx.Rollback())
|
||||
func lockDB() (bool, error) {
|
||||
b := make([]byte, 2)
|
||||
_, err := rand.Read(b)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
log.Error("Database checks: failed: ", err)
|
||||
return err
|
||||
wait := ((time.Duration(b[0]) << 8) | time.Duration(b[1])) * time.Millisecond / 10 // up to 6.5 secs wait
|
||||
time.Sleep(wait)
|
||||
|
||||
tx, err := (*dbPtr).Beginx()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
_, err = tx.Exec("LOCK TABLE `config` WRITE;")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_, err = tx.Exec("UNLOCK TABLES;")
|
||||
log.IfErr(err)
|
||||
log.IfErr(tx.Commit())
|
||||
}()
|
||||
|
||||
_, err = tx.Exec("INSERT INTO `config` (`key`,`config`) " +
|
||||
fmt.Sprintf(`VALUES ('DBLOCK','{"pid": "%d"}');`, os.Getpid()))
|
||||
if err != nil {
|
||||
// good error would be "Error 1062: Duplicate entry 'DBLOCK' for key 'idx_config_area'"
|
||||
if strings.HasPrefix(err.Error(), "Error 1062:") {
|
||||
log.Info("Database locked by annother Documize instance")
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
log.Info("Database locked by this Documize instance")
|
||||
return true, err // success!
|
||||
}
|
||||
|
||||
func unlockDB() error {
|
||||
tx, err := (*dbPtr).Beginx()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = tx.Exec("DELETE FROM `config` WHERE `key`='DBLOCK';")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func migrateEnd(tx *sqlx.Tx, err error, amLeader bool) error {
|
||||
if amLeader {
|
||||
defer func() { log.IfErr(unlockDB()) }()
|
||||
if tx != nil {
|
||||
if err == nil {
|
||||
log.IfErr(tx.Commit())
|
||||
log.Info("Database checks: completed")
|
||||
return nil
|
||||
}
|
||||
log.IfErr(tx.Rollback())
|
||||
}
|
||||
log.Error("Database checks: failed: ", err)
|
||||
return err
|
||||
}
|
||||
return nil // not the leader, so ignore errors
|
||||
}
|
||||
|
||||
func getLastMigration(tx *sqlx.Tx) (lastMigration string, err error) {
|
||||
var stmt *sql.Stmt
|
||||
stmt, err = tx.Prepare("SELECT JSON_EXTRACT(`config`,'$.database') FROM `config` WHERE `key` = 'META';")
|
||||
if err == nil {
|
||||
defer utility.Close(stmt)
|
||||
var item = make([]uint8, 0)
|
||||
|
||||
row := stmt.QueryRow()
|
||||
|
||||
err = row.Scan(&item)
|
||||
if err == nil {
|
||||
if len(item) > 1 {
|
||||
q := []byte(`"`)
|
||||
lastMigration = string(bytes.TrimPrefix(bytes.TrimSuffix(item, q), q))
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Migrate the database as required, consolidated action.
|
||||
func Migrate(ConfigTableExists bool) error {
|
||||
|
||||
lastMigration := ""
|
||||
amLeader := false
|
||||
|
||||
if ConfigTableExists {
|
||||
var err error
|
||||
amLeader, err = lockDB()
|
||||
log.IfErr(err)
|
||||
} else {
|
||||
amLeader = true // what else can you do?
|
||||
}
|
||||
|
||||
tx, err := (*dbPtr).Beginx()
|
||||
if err != nil {
|
||||
return migrateEnd(tx, err)
|
||||
return migrateEnd(tx, err, amLeader)
|
||||
}
|
||||
|
||||
lastMigration := ""
|
||||
|
||||
if ConfigTableExists {
|
||||
_, err = tx.Exec("LOCK TABLE `config` WRITE;")
|
||||
lastMigration, err = getLastMigration(tx)
|
||||
if err != nil {
|
||||
return migrateEnd(tx, err)
|
||||
}
|
||||
|
||||
log.Info("Database checks: lock taken")
|
||||
|
||||
var stmt *sql.Stmt
|
||||
stmt, err = tx.Prepare("SELECT JSON_EXTRACT(`config`,'$.database') FROM `config` WHERE `key` = 'META';")
|
||||
if err == nil {
|
||||
defer utility.Close(stmt)
|
||||
var item = make([]uint8, 0)
|
||||
|
||||
row := stmt.QueryRow()
|
||||
|
||||
err = row.Scan(&item)
|
||||
if err != nil {
|
||||
return migrateEnd(tx, err)
|
||||
}
|
||||
|
||||
if len(item) > 1 {
|
||||
q := []byte(`"`)
|
||||
lastMigration = string(bytes.TrimPrefix(bytes.TrimSuffix(item, q), q))
|
||||
}
|
||||
return migrateEnd(tx, err, amLeader)
|
||||
}
|
||||
log.Info("Database checks: last previously applied file was " + lastMigration)
|
||||
}
|
||||
|
||||
mig, err := migrations(lastMigration)
|
||||
if err != nil {
|
||||
return migrateEnd(tx, err)
|
||||
return migrateEnd(tx, err, amLeader)
|
||||
}
|
||||
|
||||
if len(mig) == 0 {
|
||||
log.Info("Database checks: no updates to perform")
|
||||
return migrateEnd(tx, nil) // no migrations to perform
|
||||
return migrateEnd(tx, nil, amLeader) // no migrations to perform
|
||||
}
|
||||
log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", "))
|
||||
|
||||
return migrateEnd(tx, mig.migrate(tx))
|
||||
if amLeader {
|
||||
log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", "))
|
||||
return migrateEnd(tx, mig.migrate(tx), amLeader)
|
||||
}
|
||||
|
||||
// a follower instance
|
||||
targetMigration := string(mig[len(mig)-1])
|
||||
for targetMigration != lastMigration {
|
||||
time.Sleep(time.Second)
|
||||
log.Info("Waiting for database migration process to complete")
|
||||
tx.Rollback() // ignore error
|
||||
tx, err := (*dbPtr).Beginx() // need this in order to see the changed situation since last tx
|
||||
if err != nil {
|
||||
return migrateEnd(tx, err, amLeader)
|
||||
}
|
||||
lastMigration, _ = getLastMigration(tx)
|
||||
}
|
||||
|
||||
return migrateEnd(tx, nil, amLeader)
|
||||
}
|
||||
|
||||
func processSQLfile(tx *sqlx.Tx, buf []byte) error {
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Add table
Add a link
Reference in a new issue