Fix #275: Wait for Raft updates before snapshotting on shutdown
Raft will fail to take a snapshot when applied index is different from the last index. Therefore, we wait for all updates to be aplied before snapshotting. If still it doesn't work, we retry a few times. License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
parent
abebe498cb
commit
89b8fe106e
|
@ -21,7 +21,6 @@ script:
|
|||
env:
|
||||
global:
|
||||
- secure: M3K3y9+D933tCda7+blW3qqVV8fA6PBDRdJoQvmQc1f0XYbWinJ+bAziFp6diKkF8sMQ+cPwLMONYJuaNT2h7/PkG+sIwF0PuUo5VVCbhGmSDrn2qOjmSnfawNs8wW31f44FQA8ICka1EFZcihohoIMf0e5xZ0tXA9jqw+ngPJiRnv4zyzC3r6t4JMAZcbS9w4KTYpIev5Yj72eCvk6lGjadSVCDVXo2sVs27tNt+BSgtMXiH6Sv8GLOnN2kFspGITgivHgB/jtU6QVtFXB+cbBJJAs3lUYnzmQZ5INecbjweYll07ilwFiCVNCX67+L15gpymKGJbQggloIGyTWrAOa2TMaB/bvblzwwQZ8wE5P3Rss5L0TFkUAcdU+3BUHM+TwV4e8F9x10v1PjgWNBRJQzd1sjKKgGUBCeyCY7VeYDKn9AXI5llISgY/AAfCZwm2cbckMHZZJciMjm+U3Q1FCF+rfhlvUcMG1VEj8r9cGpmWIRjFYVm0NmpUDDNjlC3/lUfTCOOJJyM254EUw63XxabbK6EtDN1yQe8kYRcXH//2rtEwgtMBgqHVY+OOkekzGz8Ra3EBkh6jXrAQL3zKu/GwRlK7/a1OU5MQ7dWcTjbx1AQ6Zfyjg5bZ+idqPgMbqM9Zn2+OaSby8HEEXS0QeZVooDVf/6wdYO4MQ/0A=
|
||||
- verbose=t
|
||||
after_success:
|
||||
- openssl aes-256-cbc -K $encrypted_5a1cb914c6c9_key -iv $encrypted_5a1cb914c6c9_iv
|
||||
-in .snapcraft/travis_snapcraft.cfg -out .snapcraft/snapcraft.cfg -d
|
||||
|
|
|
@ -379,23 +379,28 @@ func (api *API) peerRemoveHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if ps := parseCidOrError(w, r); ps.Cid != "" {
|
||||
logger.Debugf("rest api pinHandler: %s", ps.Cid)
|
||||
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"Pin",
|
||||
ps,
|
||||
&struct{}{})
|
||||
sendAcceptedResponse(w, err)
|
||||
logger.Debug("rest api pinHandler done")
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if ps := parseCidOrError(w, r); ps.Cid != "" {
|
||||
logger.Debugf("rest api unpinHandler: %s", ps.Cid)
|
||||
err := api.rpcClient.Call("",
|
||||
"Cluster",
|
||||
"Unpin",
|
||||
ps,
|
||||
&struct{}{})
|
||||
sendAcceptedResponse(w, err)
|
||||
logger.Debug("rest api unpinHandler done")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,6 +40,13 @@ var RaftLogCacheSize = 512
|
|||
// This is used below because raft Observers panic on 32-bit.
|
||||
const sixtyfour = uint64(^uint(0)) == ^uint64(0)
|
||||
|
||||
// How long we wait for updates during shutdown before snapshotting
|
||||
var waitForUpdatesShutdownTimeout = 5 * time.Second
|
||||
var waitForUpdatesInterval = 100 * time.Millisecond
|
||||
|
||||
// How many times to retry snapshotting when shutting down
|
||||
var maxShutdownSnapshotRetries = 5
|
||||
|
||||
// raftWrapper performs all Raft-specific operations which are needed by
|
||||
// Cluster but are not fulfilled by the consensus interface. It should contain
|
||||
// most of the Raft-related stuff so it can be easily replaced in the future,
|
||||
|
@ -286,7 +293,7 @@ func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
|
|||
if lai == li {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
time.Sleep(waitForUpdatesInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -333,13 +340,52 @@ func (rw *raftWrapper) Snapshot() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// snapshotOnShutdown attempts to take a snapshot before a shutdown.
|
||||
// Snapshotting might fail if the raft applied index is not the last index.
|
||||
// This waits for the updates and tries to take a snapshot when the
|
||||
// applied index is up to date.
|
||||
// It will retry if the snapshot still fails, in case more updates have arrived.
|
||||
// If waiting for updates times-out, it will not try anymore, since something
|
||||
// is wrong. This is a best-effort solution as there is no way to tell Raft
|
||||
// to stop processing entries because we want to take a snapshot before
|
||||
// shutting down.
|
||||
func (rw *raftWrapper) snapshotOnShutdown() error {
|
||||
var err error
|
||||
for i := 0; i < maxShutdownSnapshotRetries; i++ {
|
||||
done := false
|
||||
ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
|
||||
err := rw.WaitForUpdates(ctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
logger.Warning("timed out waiting for state updates before shutdown. Snapshotting may fail")
|
||||
done = true // let's not wait for updates again
|
||||
}
|
||||
|
||||
err = rw.Snapshot()
|
||||
if err != nil {
|
||||
err = errors.New("could not snapshot raft: " + err.Error())
|
||||
} else {
|
||||
err = nil
|
||||
done = true
|
||||
}
|
||||
|
||||
if done {
|
||||
break
|
||||
}
|
||||
logger.Warningf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown shutdown Raft and closes the BoltDB.
|
||||
func (rw *raftWrapper) Shutdown() error {
|
||||
errMsgs := ""
|
||||
err := rw.Snapshot()
|
||||
|
||||
err := rw.snapshotOnShutdown()
|
||||
if err != nil {
|
||||
errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
|
||||
errMsgs += err.Error() + ".\n"
|
||||
}
|
||||
|
||||
future := rw.raft.Shutdown()
|
||||
err = future.Error()
|
||||
if err != nil {
|
||||
|
|
|
@ -125,7 +125,7 @@ func (st *MapState) Marshal() ([]byte, error) {
|
|||
vCodec := make([]byte, 1)
|
||||
vCodec[0] = byte(st.Version)
|
||||
ret := append(vCodec, buf.Bytes()...)
|
||||
logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
|
||||
// logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ func (st *MapState) Marshal() ([]byte, error) {
|
|||
// version is not an error
|
||||
func (st *MapState) Unmarshal(bs []byte) error {
|
||||
// Check version byte
|
||||
logger.Debugf("The incoming bytes to unmarshal: %x", bs)
|
||||
// logger.Debugf("The incoming bytes to unmarshal: %x", bs)
|
||||
v := int(bs[0])
|
||||
logger.Debugf("The interpreted version: %d", v)
|
||||
if v != Version { // snapshot is out of date
|
||||
|
|
Loading…
Reference in New Issue
Block a user