diff --git a/.travis.yml b/.travis.yml
index 237144e9..39e0d0a8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,7 +21,6 @@ script:
 env:
   global:
   - secure: M3K3y9+D933tCda7+blW3qqVV8fA6PBDRdJoQvmQc1f0XYbWinJ+bAziFp6diKkF8sMQ+cPwLMONYJuaNT2h7/PkG+sIwF0PuUo5VVCbhGmSDrn2qOjmSnfawNs8wW31f44FQA8ICka1EFZcihohoIMf0e5xZ0tXA9jqw+ngPJiRnv4zyzC3r6t4JMAZcbS9w4KTYpIev5Yj72eCvk6lGjadSVCDVXo2sVs27tNt+BSgtMXiH6Sv8GLOnN2kFspGITgivHgB/jtU6QVtFXB+cbBJJAs3lUYnzmQZ5INecbjweYll07ilwFiCVNCX67+L15gpymKGJbQggloIGyTWrAOa2TMaB/bvblzwwQZ8wE5P3Rss5L0TFkUAcdU+3BUHM+TwV4e8F9x10v1PjgWNBRJQzd1sjKKgGUBCeyCY7VeYDKn9AXI5llISgY/AAfCZwm2cbckMHZZJciMjm+U3Q1FCF+rfhlvUcMG1VEj8r9cGpmWIRjFYVm0NmpUDDNjlC3/lUfTCOOJJyM254EUw63XxabbK6EtDN1yQe8kYRcXH//2rtEwgtMBgqHVY+OOkekzGz8Ra3EBkh6jXrAQL3zKu/GwRlK7/a1OU5MQ7dWcTjbx1AQ6Zfyjg5bZ+idqPgMbqM9Zn2+OaSby8HEEXS0QeZVooDVf/6wdYO4MQ/0A=
-  - verbose=t
 
 after_success:
 - openssl aes-256-cbc -K $encrypted_5a1cb914c6c9_key -iv $encrypted_5a1cb914c6c9_iv -in .snapcraft/travis_snapcraft.cfg -out .snapcraft/snapcraft.cfg -d
diff --git a/api/rest/restapi.go b/api/rest/restapi.go
index 3392286e..7b7eea34 100644
--- a/api/rest/restapi.go
+++ b/api/rest/restapi.go
@@ -379,23 +379,28 @@ func (api *API) peerRemoveHandler(w http.ResponseWriter, r *http.Request) {
 
 func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
 	if ps := parseCidOrError(w, r); ps.Cid != "" {
+		logger.Debugf("rest api pinHandler: %s", ps.Cid)
+
 		err := api.rpcClient.Call("",
 			"Cluster",
 			"Pin",
 			ps,
 			&struct{}{})
 		sendAcceptedResponse(w, err)
+		logger.Debug("rest api pinHandler done")
 	}
 }
 
 func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
 	if ps := parseCidOrError(w, r); ps.Cid != "" {
+		logger.Debugf("rest api unpinHandler: %s", ps.Cid)
 		err := api.rpcClient.Call("",
 			"Cluster",
 			"Unpin",
 			ps,
 			&struct{}{})
 		sendAcceptedResponse(w, err)
+		logger.Debug("rest api unpinHandler done")
 	}
 }
 
diff --git a/consensus/raft/raft.go b/consensus/raft/raft.go
index dde2db51..7c6b2e0c 100644
--- a/consensus/raft/raft.go
+++ b/consensus/raft/raft.go
@@ -40,6 +40,13 @@ var RaftLogCacheSize = 512
 // This is used below because raft Observers panic on 32-bit.
 const sixtyfour = uint64(^uint(0)) == ^uint64(0)
 
+// How long we wait for updates during shutdown before snapshotting
+var waitForUpdatesShutdownTimeout = 5 * time.Second
+var waitForUpdatesInterval = 100 * time.Millisecond
+
+// How many times to retry snapshotting when shutting down
+var maxShutdownSnapshotRetries = 5
+
 // raftWrapper performs all Raft-specific operations which are needed by
 // Cluster but are not fulfilled by the consensus interface. It should contain
 // most of the Raft-related stuff so it can be easily replaced in the future,
@@ -286,7 +293,7 @@ func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
 			if lai == li {
 				return nil
 			}
-			time.Sleep(500 * time.Millisecond)
+			time.Sleep(waitForUpdatesInterval)
 		}
 	}
 }
@@ -333,13 +340,54 @@ func (rw *raftWrapper) Snapshot() error {
 	return nil
 }
 
+// snapshotOnShutdown attempts to take a snapshot before a shutdown.
+// Snapshotting might fail if the raft applied index is not the last index.
+// This waits for the updates and tries to take a snapshot when the
+// applied index is up to date.
+// It will retry if the snapshot still fails, in case more updates have arrived.
+// If waiting for updates times out, it will not try anymore, since something
+// is wrong. This is a best-effort solution as there is no way to tell Raft
+// to stop processing entries because we want to take a snapshot before
+// shutting down.
+//
+// It returns nil when a snapshot was taken, or the error from the last
+// failed snapshot attempt.
+func (rw *raftWrapper) snapshotOnShutdown() error {
+	var err error
+	for i := 0; i < maxShutdownSnapshotRetries; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
+		waitErr := rw.WaitForUpdates(ctx)
+		cancel()
+		if waitErr != nil {
+			logger.Warning("timed out waiting for state updates before shutdown. Snapshotting may fail")
+		}
+
+		// Assign to the outer err (never ":=" here): a shadowed variable
+		// would make this function return nil even when every attempt failed.
+		err = rw.Snapshot()
+		if err == nil {
+			return nil
+		}
+		err = errors.New("could not snapshot raft: " + err.Error())
+
+		if waitErr != nil {
+			// Waiting already timed out once; do not wait for updates again.
+			break
+		}
+		logger.Warningf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
+	}
+	return err
+}
+
 // Shutdown shutdown Raft and closes the BoltDB.
 func (rw *raftWrapper) Shutdown() error {
 	errMsgs := ""
-	err := rw.Snapshot()
+
+	err := rw.snapshotOnShutdown()
 	if err != nil {
-		errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
+		errMsgs += err.Error() + ".\n"
 	}
+
 	future := rw.raft.Shutdown()
 	err = future.Error()
 	if err != nil {
diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go
index ab5f844d..fac8c429 100644
--- a/state/mapstate/map_state.go
+++ b/state/mapstate/map_state.go
@@ -125,7 +125,7 @@ func (st *MapState) Marshal() ([]byte, error) {
 	vCodec := make([]byte, 1)
 	vCodec[0] = byte(st.Version)
 	ret := append(vCodec, buf.Bytes()...)
-	logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
+	// logger.Debugf("Marshal-- The final marshaled bytes: %x", ret)
 	return ret, nil
 }
 
@@ -136,7 +136,7 @@ func (st *MapState) Marshal() ([]byte, error) {
 // version is not an error
 func (st *MapState) Unmarshal(bs []byte) error {
 	// Check version byte
-	logger.Debugf("The incoming bytes to unmarshal: %x", bs)
+	// logger.Debugf("The incoming bytes to unmarshal: %x", bs)
 	v := int(bs[0])
 	logger.Debugf("The interpreted version: %d", v)
 	if v != Version { // snapshot is out of date