Merge pull request #285 from ipfs/fix/snapshot-wait-for-index

Fix #275: Attempt to wait for updates when shutting down
This commit is contained in:
Hector Sanjuan 2018-01-09 15:39:37 +01:00 committed by GitHub
commit bda2674c4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 6 deletions

View File

@ -21,7 +21,6 @@ script:
env:
global:
- secure: M3K3y9+D933tCda7+blW3qqVV8fA6PBDRdJoQvmQc1f0XYbWinJ+bAziFp6diKkF8sMQ+cPwLMONYJuaNT2h7/PkG+sIwF0PuUo5VVCbhGmSDrn2qOjmSnfawNs8wW31f44FQA8ICka1EFZcihohoIMf0e5xZ0tXA9jqw+ngPJiRnv4zyzC3r6t4JMAZcbS9w4KTYpIev5Yj72eCvk6lGjadSVCDVXo2sVs27tNt+BSgtMXiH6Sv8GLOnN2kFspGITgivHgB/jtU6QVtFXB+cbBJJAs3lUYnzmQZ5INecbjweYll07ilwFiCVNCX67+L15gpymKGJbQggloIGyTWrAOa2TMaB/bvblzwwQZ8wE5P3Rss5L0TFkUAcdU+3BUHM+TwV4e8F9x10v1PjgWNBRJQzd1sjKKgGUBCeyCY7VeYDKn9AXI5llISgY/AAfCZwm2cbckMHZZJciMjm+U3Q1FCF+rfhlvUcMG1VEj8r9cGpmWIRjFYVm0NmpUDDNjlC3/lUfTCOOJJyM254EUw63XxabbK6EtDN1yQe8kYRcXH//2rtEwgtMBgqHVY+OOkekzGz8Ra3EBkh6jXrAQL3zKu/GwRlK7/a1OU5MQ7dWcTjbx1AQ6Zfyjg5bZ+idqPgMbqM9Zn2+OaSby8HEEXS0QeZVooDVf/6wdYO4MQ/0A=
- verbose=t
after_success:
- openssl aes-256-cbc -K $encrypted_5a1cb914c6c9_key -iv $encrypted_5a1cb914c6c9_iv
-in .snapcraft/travis_snapcraft.cfg -out .snapcraft/snapcraft.cfg -d

View File

@ -379,23 +379,28 @@ func (api *API) peerRemoveHandler(w http.ResponseWriter, r *http.Request) {
func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
if ps := parseCidOrError(w, r); ps.Cid != "" {
logger.Debugf("rest api pinHandler: %s", ps.Cid)
err := api.rpcClient.Call("",
"Cluster",
"Pin",
ps,
&struct{}{})
sendAcceptedResponse(w, err)
logger.Debug("rest api pinHandler done")
}
}
func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
if ps := parseCidOrError(w, r); ps.Cid != "" {
logger.Debugf("rest api unpinHandler: %s", ps.Cid)
err := api.rpcClient.Call("",
"Cluster",
"Unpin",
ps,
&struct{}{})
sendAcceptedResponse(w, err)
logger.Debug("rest api unpinHandler done")
}
}

View File

@ -40,6 +40,13 @@ var RaftLogCacheSize = 512
// This is used below because raft Observers panic on 32-bit.
const sixtyfour = uint64(^uint(0)) == ^uint64(0)
// How long we wait for updates during shutdown before snapshotting
var waitForUpdatesShutdownTimeout = 5 * time.Second
var waitForUpdatesInterval = 100 * time.Millisecond
// How many times to retry snapshotting when shutting down
var maxShutdownSnapshotRetries = 5
// raftWrapper performs all Raft-specific operations which are needed by
// Cluster but are not fulfilled by the consensus interface. It should contain
// most of the Raft-related stuff so it can be easily replaced in the future,
@ -286,7 +293,7 @@ func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
if lai == li {
return nil
}
time.Sleep(500 * time.Millisecond)
time.Sleep(waitForUpdatesInterval)
}
}
}
@ -333,13 +340,52 @@ func (rw *raftWrapper) Snapshot() error {
return nil
}
// snapshotOnShutdown attempts to take a snapshot before a shutdown.
// Snapshotting might fail if the raft applied index is not the last index.
// This waits for the updates and tries to take a snapshot when the
// applied index is up to date.
// It will retry if the snapshot still fails, in case more updates have arrived.
// If waiting for updates times-out, it will not try anymore, since something
// is wrong. This is a best-effort solution as there is no way to tell Raft
// to stop processing entries because we want to take a snapshot before
// shutting down.
func (rw *raftWrapper) snapshotOnShutdown() error {
var err error
for i := 0; i < maxShutdownSnapshotRetries; i++ {
done := false
ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
err := rw.WaitForUpdates(ctx)
cancel()
if err != nil {
logger.Warning("timed out waiting for state updates before shutdown. Snapshotting may fail")
done = true // let's not wait for updates again
}
err = rw.Snapshot()
if err != nil {
err = errors.New("could not snapshot raft: " + err.Error())
} else {
err = nil
done = true
}
if done {
break
}
logger.Warningf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
}
return err
}
// Shutdown shutdown Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown() error {
errMsgs := ""
err := rw.Snapshot()
err := rw.snapshotOnShutdown()
if err != nil {
errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
errMsgs += err.Error() + ".\n"
}
future := rw.raft.Shutdown()
err = future.Error()
if err != nil {

View File

@ -125,7 +125,7 @@ func (st *MapState) Marshal() ([]byte, error) {
vCodec := make([]byte, 1)
vCodec[0] = byte(st.Version)
ret := append(vCodec, buf.Bytes()...)
logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
// logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
return ret, nil
}
@ -136,7 +136,7 @@ func (st *MapState) Marshal() ([]byte, error) {
// version is not an error
func (st *MapState) Unmarshal(bs []byte) error {
// Check version byte
logger.Debugf("The incoming bytes to unmarshal: %x", bs)
// logger.Debugf("The incoming bytes to unmarshal: %x", bs)
v := int(bs[0])
logger.Debugf("The interpreted version: %d", v)
if v != Version { // snapshot is out of date