State import: allow replication factor and allocations overwrite on import
This commit is contained in:
parent
cdc01ebf3c
commit
f4c3fc4ce5
|
@ -13,6 +13,7 @@ import (
|
|||
"strings"
|
||||
|
||||
ipfscluster "github.com/ipfs/ipfs-cluster"
|
||||
"github.com/ipfs/ipfs-cluster/api"
|
||||
"github.com/ipfs/ipfs-cluster/cmdutils"
|
||||
"github.com/ipfs/ipfs-cluster/pstoremgr"
|
||||
"github.com/ipfs/ipfs-cluster/version"
|
||||
|
@ -506,6 +507,20 @@ to import. If no argument is provided, stdin will be used.
|
|||
Name: "force, f",
|
||||
Usage: "skips confirmation prompt",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "replication-min, rmin",
|
||||
Value: 0,
|
||||
Usage: "Overwrite replication-factor-min for all pins on import",
|
||||
},
|
||||
cli.IntFlag{
|
||||
Name: "replication-max, rmax",
|
||||
Value: 0,
|
||||
Usage: "Overwrite replication-factor-max for all pins on import",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "allocations, allocs",
|
||||
Usage: "Overwrite allocations for all pins on import. Comma-separated list of peer IDs",
|
||||
},
|
||||
},
|
||||
Action: func(c *cli.Context) error {
|
||||
locker.lock()
|
||||
|
@ -517,6 +532,13 @@ to import. If no argument is provided, stdin will be used.
|
|||
return nil
|
||||
}
|
||||
|
||||
// importState allows overwriting of some options on import
|
||||
opts := api.PinOptions{
|
||||
ReplicationFactorMin: c.Int("replication-min"),
|
||||
ReplicationFactorMax: c.Int("replication-max"),
|
||||
UserAllocations: api.StringsToPeers(strings.Split(c.String("allocations"), ",")),
|
||||
}
|
||||
|
||||
mgr := getStateManager()
|
||||
|
||||
// Get the importing file path
|
||||
|
@ -532,7 +554,7 @@ to import. If no argument is provided, stdin will be used.
|
|||
}
|
||||
defer r.Close()
|
||||
|
||||
checkErr("importing state", mgr.ImportState(r))
|
||||
checkErr("importing state", mgr.ImportState(r, opts))
|
||||
logger.Info("state successfully imported. Make sure all peers have consistent states")
|
||||
return nil
|
||||
},
|
||||
|
|
|
@ -24,7 +24,7 @@ import (
|
|||
// StateManager is the interface that allows to import, export and clean
|
||||
// different cluster states depending on the consensus component used.
|
||||
type StateManager interface {
|
||||
ImportState(io.Reader) error
|
||||
ImportState(io.Reader, api.PinOptions) error
|
||||
ExportState(io.Writer) error
|
||||
GetStore() (ds.Datastore, error)
|
||||
GetOfflineState(ds.Datastore) (state.State, error)
|
||||
|
@ -73,7 +73,7 @@ func (raftsm *raftStateManager) GetOfflineState(store ds.Datastore) (state.State
|
|||
return raft.OfflineState(raftsm.cfgs.Raft, store)
|
||||
}
|
||||
|
||||
func (raftsm *raftStateManager) ImportState(r io.Reader) error {
|
||||
func (raftsm *raftStateManager) ImportState(r io.Reader, opts api.PinOptions) error {
|
||||
err := raftsm.Clean()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -88,7 +88,7 @@ func (raftsm *raftStateManager) ImportState(r io.Reader) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = importState(r, st)
|
||||
err = importState(r, st, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -138,7 +138,7 @@ func (crdtsm *crdtStateManager) GetOfflineState(store ds.Datastore) (state.State
|
|||
return crdt.OfflineState(crdtsm.cfgs.Crdt, store)
|
||||
}
|
||||
|
||||
func (crdtsm *crdtStateManager) ImportState(r io.Reader) error {
|
||||
func (crdtsm *crdtStateManager) ImportState(r io.Reader, opts api.PinOptions) error {
|
||||
err := crdtsm.Clean()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -155,7 +155,7 @@ func (crdtsm *crdtStateManager) ImportState(r io.Reader) error {
|
|||
}
|
||||
batchingSt := st.(state.BatchingState)
|
||||
|
||||
err = importState(r, batchingSt)
|
||||
err = importState(r, batchingSt, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -185,7 +185,7 @@ func (crdtsm *crdtStateManager) Clean() error {
|
|||
return crdt.Clean(context.Background(), crdtsm.cfgs.Crdt, store)
|
||||
}
|
||||
|
||||
func importState(r io.Reader, st state.State) error {
|
||||
func importState(r io.Reader, st state.State, opts api.PinOptions) error {
|
||||
ctx := context.Background()
|
||||
dec := json.NewDecoder(r)
|
||||
for {
|
||||
|
@ -197,6 +197,22 @@ func importState(r io.Reader, st state.State) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if opts.ReplicationFactorMax > 0 {
|
||||
pin.ReplicationFactorMax = opts.ReplicationFactorMax
|
||||
}
|
||||
|
||||
if opts.ReplicationFactorMin > 0 {
|
||||
pin.ReplicationFactorMin = opts.ReplicationFactorMin
|
||||
}
|
||||
|
||||
if len(opts.UserAllocations) > 0 {
|
||||
// We are injecting directly to the state.
|
||||
// UserAllocation option is not stored in the state.
|
||||
// We need to set Allocations directly.
|
||||
pin.Allocations = opts.UserAllocations
|
||||
}
|
||||
|
||||
err = st.Add(ctx, &pin)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue
Block a user