Issue #51: Save a backup on shutdown
This adds snapshot and restore methods to state and uses the snapshot one to save a copy of the state when shutting down. Right now, this is not used for anything else. Some lines performing a migration, but this is only an idea of how it could work. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
parent
a3c705a01d
commit
718b2177ce
30
cluster.go
30
cluster.go
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -407,6 +409,7 @@ func (c *Cluster) Shutdown() error {
|
|||
}
|
||||
|
||||
c.peerManager.savePeers()
|
||||
c.backupState()
|
||||
|
||||
if err := c.monitor.Shutdown(); err != nil {
|
||||
logger.Errorf("error stopping monitor: %s", err)
|
||||
|
@ -1086,3 +1089,30 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int) ([]peer.ID, error) {
|
|||
return append(validAllocations, candidateAllocs[0:needed]...), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cluster) backupState() {
|
||||
if c.config.path == "" {
|
||||
logger.Warning("Config.path unset. Skipping backup")
|
||||
return
|
||||
}
|
||||
|
||||
folder := filepath.Dir(c.config.path)
|
||||
err := os.MkdirAll(filepath.Join(folder, "backups"), 0700)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
logger.Error("skipping backup")
|
||||
return
|
||||
}
|
||||
fname := time.Now().UTC().Format("20060102_15:04:05")
|
||||
f, err := os.Create(filepath.Join(folder, "backups", fname))
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return
|
||||
}
|
||||
err = c.state.Snapshot(f)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
}
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
package ipfscluster
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
|
@ -68,6 +70,10 @@ type State interface {
|
|||
Has(*cid.Cid) bool
|
||||
// Get returns the information attacthed to this pin
|
||||
Get(*cid.Cid) api.Pin
|
||||
// Snapshot writes a snapshot of the state to a writer
|
||||
Snapshot(w io.Writer) error
|
||||
// Restore restores a snapshot from a reader
|
||||
Restore(r io.Reader) error
|
||||
}
|
||||
|
||||
// PinTracker represents a component which tracks the status of
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package mapstate
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
|
@ -10,7 +13,7 @@ import (
|
|||
|
||||
// Version is the map state Version. States with old versions should
|
||||
// perform an upgrade before.
|
||||
const Version = 1
|
||||
const Version = 2
|
||||
|
||||
// MapState is a very simple database to store the state of the system
|
||||
// using a Go map. It is thread safe. It implements the State interface.
|
||||
|
@ -23,7 +26,8 @@ type MapState struct {
|
|||
// NewMapState initializes the internal map and returns a new MapState object.
|
||||
func NewMapState() *MapState {
|
||||
return &MapState{
|
||||
PinMap: make(map[string]api.PinSerial),
|
||||
PinMap: make(map[string]api.PinSerial),
|
||||
Version: Version,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -68,7 +72,39 @@ func (st *MapState) List() []api.Pin {
|
|||
defer st.pinMux.RUnlock()
|
||||
cids := make([]api.Pin, 0, len(st.PinMap))
|
||||
for _, v := range st.PinMap {
|
||||
if v.Cid == "" {
|
||||
continue
|
||||
}
|
||||
cids = append(cids, v.ToPin())
|
||||
}
|
||||
return cids
|
||||
}
|
||||
|
||||
// Snapshot dumps the MapState to the given writer, in pretty json
|
||||
// format.
|
||||
func (st *MapState) Snapshot(w io.Writer) error {
|
||||
st.pinMux.RLock()
|
||||
defer st.pinMux.RUnlock()
|
||||
enc := json.NewEncoder(w)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(st)
|
||||
}
|
||||
|
||||
func (st *MapState) Restore(r io.Reader) error {
|
||||
snap, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var vonly struct{ Version int }
|
||||
err = json.Unmarshal(snap, &vonly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if vonly.Version == Version {
|
||||
// we are good
|
||||
err := json.Unmarshal(snap, st)
|
||||
return err
|
||||
} else {
|
||||
return st.migrateFrom(vonly.Version, snap)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package mapstate
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
|
@ -66,3 +68,43 @@ func TestList(t *testing.T) {
|
|||
t.Error("returned something different")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotRestore(t *testing.T) {
|
||||
ms := NewMapState()
|
||||
ms.Add(c)
|
||||
var buf bytes.Buffer
|
||||
err := ms.Snapshot(&buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ms2 := NewMapState()
|
||||
err = ms2.Restore(&buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
get := ms2.Get(c.Cid)
|
||||
if get.Allocations[0] != testPeerID1 {
|
||||
t.Error("expected different peer id")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrateFromV1(t *testing.T) {
|
||||
v1 := []byte(fmt.Sprintf(`{
|
||||
"Version": 1,
|
||||
"PinMap": {
|
||||
"%s": {}
|
||||
}
|
||||
}
|
||||
`, c.Cid))
|
||||
buf := bytes.NewBuffer(v1)
|
||||
ms := NewMapState()
|
||||
err := ms.Restore(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
get := ms.Get(c.Cid)
|
||||
if get.ReplicationFactor != -1 || !get.Cid.Equals(c.Cid) {
|
||||
t.Error("expected something different")
|
||||
t.Logf("%+v", get)
|
||||
}
|
||||
}
|
||||
|
|
34
state/mapstate/migrate.go
Normal file
34
state/mapstate/migrate.go
Normal file
|
@ -0,0 +1,34 @@
|
|||
package mapstate
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
)
|
||||
|
||||
type mapStateV1 struct {
|
||||
Version int
|
||||
PinMap map[string]struct{}
|
||||
}
|
||||
|
||||
func (st *MapState) migrateFrom(version int, snap []byte) error {
|
||||
switch version {
|
||||
case 1:
|
||||
var mstv1 mapStateV1
|
||||
err := json.Unmarshal(snap, &mstv1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for k := range mstv1.PinMap {
|
||||
st.PinMap[k] = api.PinSerial{
|
||||
Cid: k,
|
||||
Allocations: []string{},
|
||||
ReplicationFactor: -1,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return errors.New("version migration not supported")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user