diff --git a/core/api/request/init.go b/core/api/request/init.go index 216e8577..54f988ce 100644 --- a/core/api/request/init.go +++ b/core/api/request/init.go @@ -50,15 +50,8 @@ func init() { var err error environment.GetString(&connectionString, "db", true, - `'username:password@protocol(hostname:port)/databasename" for example "fred:bloggs@tcp(localhost:3306)/documize", or if a follower instance "-fred:bloggs@tcp(localhost:3306)/documize"`, + `'username:password@protocol(hostname:port)/databasename" for example "fred:bloggs@tcp(localhost:3306)/documize", or if a follower instance "fred:bloggs@tcp(localhost:3306)/documize"`, func(*string, string) bool { - isLeader := true - followerPrefix := "-" - if strings.HasPrefix(connectionString, followerPrefix) { - isLeader = false - connectionString = strings.TrimPrefix(connectionString, followerPrefix) - log.Info("This is a follower instance, no DB schema updates will be performed") - } Db, err = sqlx.Open("mysql", stdConn(connectionString)) @@ -81,7 +74,7 @@ func init() { // go into setup mode if required if web.SiteMode != web.SiteModeOffline { if database.Check(Db, connectionString) { - if err := database.Migrate(true /* the config table exists */, isLeader); err != nil { + if err := database.Migrate(true /* the config table exists */); err != nil { log.Error("Unable to run database migration: ", err) os.Exit(2) } diff --git a/core/database/create.go b/core/database/create.go index ba1fa87c..4c8c953b 100644 --- a/core/database/create.go +++ b/core/database/create.go @@ -116,8 +116,7 @@ func Create(w http.ResponseWriter, r *http.Request) { return } - if err = Migrate(false, /* no tables exist yet */ - true /* and we must be the only instance 1st time through */); err != nil { + if err = Migrate(false /* no tables exist yet */); err != nil { log.Error("database.Create()", err) return } diff --git a/core/database/migrate.go b/core/database/migrate.go index fdf70f61..e9b6dea3 100644 --- a/core/database/migrate.go +++ b/core/database/migrate.go @@ -13,8 +13,10 @@ package database import ( "bytes" + "crypto/rand" "database/sql" - "errors" + "fmt" + "os" "regexp" "sort" "strings" @@ -96,7 +98,61 @@ func (m migrationsT) migrate(tx *sqlx.Tx) error { return nil } -func migrateEnd(tx *sqlx.Tx, err error) error { +func lockDB() (bool, error) { + b := make([]byte, 2) + _, err := rand.Read(b) + if err != nil { + return false, err + } + time.Sleep(time.Duration(b[0]) * time.Duration(b[1]) * time.Millisecond / 10) // up to 6.5 secs wait + + tx, err := (*dbPtr).Beginx() + if err != nil { + return false, err + } + + _, err = tx.Exec("LOCK TABLE `config` WRITE;") + if err != nil { + return false, err + } + + defer func() { + _, err = tx.Exec("UNLOCK TABLES;") + log.IfErr(err) + log.IfErr(tx.Commit()) + }() + + _, err = tx.Exec("INSERT INTO `config` (`key`,`config`) " + + fmt.Sprintf(`VALUES ('DBLOCK','{"pid": "%d"}');`, os.Getpid())) + if err != nil { + // good error might be "Error 1062: Duplicate entry 'DBLOCK' for key 'idx_config_area'" + if strings.HasPrefix(err.Error(), "Error 1062:") { + log.Info("Database locked by annother Documize instance") + return false, nil + } + return false, err + } + + log.Info("Database locked by this Documize instance") + return true, err // success! +} + +func unlockDB() error { + tx, err := (*dbPtr).Beginx() + if err != nil { + return err + } + _, err = tx.Exec("DELETE FROM `config` WHERE `key`='DBLOCK';") + if err != nil { + return err + } + return tx.Commit() +} + +func migrateEnd(tx *sqlx.Tx, err error, amLeader bool) error { + if amLeader { + defer func() { log.IfErr(unlockDB()) }() + } if tx != nil { if err == nil { log.IfErr(tx.Commit()) @@ -130,56 +186,62 @@ func getLastMigration(tx *sqlx.Tx) (lastMigration string, err error) { } // Migrate the database as required, consolidated action. -func Migrate(ConfigTableExists, amLeader bool) error { +func Migrate(ConfigTableExists bool) error { - if !ConfigTableExists && !amLeader { - return errors.New("database.Migrate() does not work on empty databases for follower instances") + amLeader := false + + if ConfigTableExists { + var err error + amLeader, err = lockDB() + log.IfErr(err) + } else { + amLeader = true // what else can you do? + } + + tx, err := (*dbPtr).Beginx() + if err != nil { + return migrateEnd(tx, err, amLeader) } lastMigration := "" - tx, err := (*dbPtr).Beginx() - if err != nil { - return migrateEnd(tx, err) - } - if ConfigTableExists { lastMigration, err = getLastMigration(tx) if err != nil { - return migrateEnd(tx, err) + return migrateEnd(tx, err, amLeader) } log.Info("Database checks: last previously applied file was " + lastMigration) } mig, err := migrations(lastMigration) if err != nil { - return migrateEnd(tx, err) + return migrateEnd(tx, err, amLeader) } if len(mig) == 0 { log.Info("Database checks: no updates to perform") - return migrateEnd(tx, nil) // no migrations to perform + return migrateEnd(tx, nil, amLeader) // no migrations to perform } if amLeader { log.Info("Database checks: will execute the following update files: " + strings.Join([]string(mig), ", ")) - return migrateEnd(tx, mig.migrate(tx)) + return migrateEnd(tx, mig.migrate(tx), amLeader) } // a follower instance targetMigration := string(mig[len(mig)-1]) for targetMigration != lastMigration { time.Sleep(time.Second) - log.Info("Waiting for migration process to complete") - tx.Rollback() // ignore error - tx, err := (*dbPtr).Beginx() + log.Info("Waiting for database migration process to complete") + tx.Rollback() // ignore error + tx, err := (*dbPtr).Beginx() // need this in order to see the changed situation since last tx if err != nil { - return migrateEnd(tx, err) + return migrateEnd(tx, err, amLeader) } lastMigration, _ = getLastMigration(tx) } - return migrateEnd(tx, nil) + return migrateEnd(tx, nil, amLeader) } func processSQLfile(tx *sqlx.Tx, buf []byte) error {