mirror of
https://github.com/documize/community.git
synced 2025-07-21 22:29:41 +02:00
wip lock/unlock DB for migration
This commit is contained in:
parent
66dfc8c6ca
commit
c551647340
3 changed files with 84 additions and 30 deletions
|
@ -50,15 +50,8 @@ func init() {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
environment.GetString(&connectionString, "db", true,
|
environment.GetString(&connectionString, "db", true,
|
||||||
`'username:password@protocol(hostname:port)/databasename" for example "fred:bloggs@tcp(localhost:3306)/documize", or if a follower instance "-fred:bloggs@tcp(localhost:3306)/documize"`,
|
`'username:password@protocol(hostname:port)/databasename" for example "fred:bloggs@tcp(localhost:3306)/documize", or if a follower instance "fred:bloggs@tcp(localhost:3306)/documize"`,
|
||||||
func(*string, string) bool {
|
func(*string, string) bool {
|
||||||
isLeader := true
|
|
||||||
followerPrefix := "-"
|
|
||||||
if strings.HasPrefix(connectionString, followerPrefix) {
|
|
||||||
isLeader = false
|
|
||||||
connectionString = strings.TrimPrefix(connectionString, followerPrefix)
|
|
||||||
log.Info("This is a follower instance, no DB schema updates will be performed")
|
|
||||||
}
|
|
||||||
|
|
||||||
Db, err = sqlx.Open("mysql", stdConn(connectionString))
|
Db, err = sqlx.Open("mysql", stdConn(connectionString))
|
||||||
|
|
||||||
|
@ -81,7 +74,7 @@ func init() {
|
||||||
// go into setup mode if required
|
// go into setup mode if required
|
||||||
if web.SiteMode != web.SiteModeOffline {
|
if web.SiteMode != web.SiteModeOffline {
|
||||||
if database.Check(Db, connectionString) {
|
if database.Check(Db, connectionString) {
|
||||||
if err := database.Migrate(true /* the config table exists */, isLeader); err != nil {
|
if err := database.Migrate(true /* the config table exists */); err != nil {
|
||||||
log.Error("Unable to run database migration: ", err)
|
log.Error("Unable to run database migration: ", err)
|
||||||
os.Exit(2)
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,8 +116,7 @@ func Create(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = Migrate(false, /* no tables exist yet */
|
if err = Migrate(false /* no tables exist yet */); err != nil {
|
||||||
true /* and we must be the only instance 1st time through */); err != nil {
|
|
||||||
log.Error("database.Create()", err)
|
log.Error("database.Create()", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,8 +13,10 @@ package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/rand"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"fmt"
|
||||||
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -96,7 +98,61 @@ func (m migrationsT) migrate(tx *sqlx.Tx) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func migrateEnd(tx *sqlx.Tx, err error) error {
|
func lockDB() (bool, error) {
|
||||||
|
b := make([]byte, 2)
|
||||||
|
_, err := rand.Read(b)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(b[0]) * time.Duration(b[1]) * time.Millisecond / 10) // up to 6.5 secs wait
|
||||||
|
|
||||||
|
tx, err := (*dbPtr).Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("LOCK TABLE `config` WRITE;")
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
_, err = tx.Exec("UNLOCK TABLES;")
|
||||||
|
log.IfErr(err)
|
||||||
|
log.IfErr(tx.Commit())
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = tx.Exec("INSERT INTO `config` (`key`,`config`) " +
|
||||||
|
fmt.Sprintf(`VALUES ('DBLOCK','{"pid": "%d"}');`, os.Getpid()))
|
||||||
|
if err != nil {
|
||||||
|
// good error might be "Error 1062: Duplicate entry 'DBLOCK' for key 'idx_config_area'"
|
||||||
|
if strings.HasPrefix(err.Error(), "Error 1062:") {
|
||||||
|
log.Info("Database locked by annother Documize instance")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Database locked by this Documize instance")
|
||||||
|
return true, err // success!
|
||||||
|
}
|
||||||
|
|
||||||
|
func unlockDB() error {
|
||||||
|
tx, err := (*dbPtr).Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = tx.Exec("DELETE FROM `config` WHERE `key`='DBLOCK';")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func migrateEnd(tx *sqlx.Tx, err error, amLeader bool) error {
|
||||||
|
if amLeader {
|
||||||
|
defer func() { log.IfErr(unlockDB()) }()
|
||||||
|
}
|
||||||
if tx != nil {
|
if tx != nil {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
log.IfErr(tx.Commit())
|
log.IfErr(tx.Commit())
|
||||||
|
@ -130,56 +186,62 @@ func getLastMigration(tx *sqlx.Tx) (lastMigration string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrate the database as required, consolidated action.
|
// Migrate the database as required, consolidated action.
|
||||||
func Migrate(ConfigTableExists, amLeader bool) error {
|
func Migrate(ConfigTableExists bool) error {
|
||||||
|
|
||||||
if !ConfigTableExists && !amLeader {
|
amLeader := false
|
||||||
return errors.New("database.Migrate() does not work on empty databases for follower instances")
|
|
||||||
|
if ConfigTableExists {
|
||||||
|
var err error
|
||||||
|
amLeader, err = lockDB()
|
||||||
|
log.IfErr(err)
|
||||||
|
} else {
|
||||||
|
amLeader = true // what else can you do?
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err := (*dbPtr).Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return migrateEnd(tx, err, amLeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
lastMigration := ""
|
lastMigration := ""
|
||||||
|
|
||||||
tx, err := (*dbPtr).Beginx()
|
|
||||||
if err != nil {
|
|
||||||
return migrateEnd(tx, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ConfigTableExists {
|
if ConfigTableExists {
|
||||||
lastMigration, err = getLastMigration(tx)
|
lastMigration, err = getLastMigration(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return migrateEnd(tx, err)
|
return migrateEnd(tx, err, amLeader)
|
||||||
}
|
}
|
||||||
log.Info("Database checks: last previously applied file was " + lastMigration)
|
log.Info("Database checks: last previously applied file was " + lastMigration)
|
||||||
}
|
}
|
||||||
|
|
||||||
mig, err := migrations(lastMigration)
|
mig, err := migrations(lastMigration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return migrateEnd(tx, err)
|
return migrateEnd(tx, err, amLeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(mig) == 0 {
|
if len(mig) == 0 {
|
||||||
log.Info("Database checks: no updates to perform")
|
log.Info("Database checks: no updates to perform")
|
||||||
return migrateEnd(tx, nil) // no migrations to perform
|
return migrateEnd(tx, nil, amLeader) // no migrations to perform
|
||||||
}
|
}
|
||||||
|
|
||||||
if amLeader {
|
if amLeader {
|
||||||
log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", "))
|
log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", "))
|
||||||
return migrateEnd(tx, mig.migrate(tx))
|
return migrateEnd(tx, mig.migrate(tx), amLeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
// a follower instance
|
// a follower instance
|
||||||
targetMigration := string(mig[len(mig)-1])
|
targetMigration := string(mig[len(mig)-1])
|
||||||
for targetMigration != lastMigration {
|
for targetMigration != lastMigration {
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
log.Info("Waiting for migration process to complete")
|
log.Info("Waiting for database migration process to complete")
|
||||||
tx.Rollback() // ignore error
|
tx.Rollback() // ignore error
|
||||||
tx, err := (*dbPtr).Beginx()
|
tx, err := (*dbPtr).Beginx() // need this in order to see the changed situation since last tx
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return migrateEnd(tx, err)
|
return migrateEnd(tx, err, amLeader)
|
||||||
}
|
}
|
||||||
lastMigration, _ = getLastMigration(tx)
|
lastMigration, _ = getLastMigration(tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
return migrateEnd(tx, nil)
|
return migrateEnd(tx, nil, amLeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func processSQLfile(tx *sqlx.Tx, buf []byte) error {
|
func processSQLfile(tx *sqlx.Tx, buf []byte) error {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue