StateSync(): some improvements
This commit: * Does not collect and return changed items when doing StateSync (they are not used) * Removes the StateSync RPC method (no longer used) * Uses tracker.StatusAll() rather than requesting Status on each Cid (should be faster with upcoming pintracker) * Does not launch a go-routine to track every item. Track is an async operation. This likely causes 1000s goroutines to be started with no good reason. License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
parent
9336097791
commit
4d8f975d9b
38
cluster.go
38
cluster.go
|
@ -726,31 +726,36 @@ func (c *Cluster) Join(addr ma.Multiaddr) error {
|
|||
}
|
||||
|
||||
// StateSync syncs the consensus state to the Pin Tracker, ensuring
|
||||
// that every Cid that should be tracked is tracked. It returns
|
||||
// PinInfo for Cids which were added or deleted.
|
||||
func (c *Cluster) StateSync() ([]api.PinInfo, error) {
|
||||
// that every Cid in the shared state is tracked and that the Pin Tracker
|
||||
// is not tracking more Cids than it should.
|
||||
func (c *Cluster) StateSync() error {
|
||||
cState, err := c.consensus.State()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Debug("syncing state to tracker")
|
||||
clusterPins := cState.List()
|
||||
var changed []*cid.Cid
|
||||
|
||||
trackedPins := c.tracker.StatusAll()
|
||||
trackedPinsMap := make(map[string]int)
|
||||
for i, tpin := range trackedPins {
|
||||
trackedPinsMap[tpin.Cid.String()] = i
|
||||
}
|
||||
|
||||
// Track items which are not tracked
|
||||
for _, pin := range clusterPins {
|
||||
if c.tracker.Status(pin.Cid).Status == api.TrackerStatusUnpinned {
|
||||
_, tracked := trackedPinsMap[pin.Cid.String()]
|
||||
if !tracked {
|
||||
logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid)
|
||||
changed = append(changed, pin.Cid)
|
||||
go c.tracker.Track(pin)
|
||||
c.tracker.Track(pin)
|
||||
}
|
||||
}
|
||||
|
||||
// a. Untrack items which should not be tracked
|
||||
// b. Track items which should not be remote as local
|
||||
// c. Track items which should not be local as remote
|
||||
for _, p := range c.tracker.StatusAll() {
|
||||
for _, p := range trackedPins {
|
||||
pCid := p.Cid
|
||||
currentPin := cState.Get(pCid)
|
||||
has := cState.Has(pCid)
|
||||
|
@ -759,24 +764,17 @@ func (c *Cluster) StateSync() ([]api.PinInfo, error) {
|
|||
switch {
|
||||
case !has:
|
||||
logger.Debugf("StateSync: Untracking %s, is not part of shared state", pCid)
|
||||
changed = append(changed, pCid)
|
||||
go c.tracker.Untrack(pCid)
|
||||
c.tracker.Untrack(pCid)
|
||||
case p.Status == api.TrackerStatusRemote && allocatedHere:
|
||||
logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid)
|
||||
changed = append(changed, pCid)
|
||||
go c.tracker.Track(currentPin)
|
||||
c.tracker.Track(currentPin)
|
||||
case p.Status == api.TrackerStatusPinned && !allocatedHere:
|
||||
logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid)
|
||||
changed = append(changed, pCid)
|
||||
go c.tracker.Track(currentPin)
|
||||
c.tracker.Track(currentPin)
|
||||
}
|
||||
}
|
||||
|
||||
var infos []api.PinInfo
|
||||
for _, h := range changed {
|
||||
infos = append(infos, c.tracker.Status(h))
|
||||
}
|
||||
return infos, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
|
||||
|
|
|
@ -160,7 +160,7 @@ func TestClusterStateSync(t *testing.T) {
|
|||
cl, _, _, st, _ := testingCluster(t)
|
||||
defer cleanRaft()
|
||||
defer cl.Shutdown()
|
||||
_, err := cl.StateSync()
|
||||
err := cl.StateSync()
|
||||
if err == nil {
|
||||
t.Fatal("expected an error as there is no state to sync")
|
||||
}
|
||||
|
@ -171,7 +171,7 @@ func TestClusterStateSync(t *testing.T) {
|
|||
t.Fatal("pin should have worked:", err)
|
||||
}
|
||||
|
||||
_, err = cl.StateSync()
|
||||
err = cl.StateSync()
|
||||
if err != nil {
|
||||
t.Fatal("sync after pinning should have worked:", err)
|
||||
}
|
||||
|
@ -179,7 +179,7 @@ func TestClusterStateSync(t *testing.T) {
|
|||
// Modify state on the side so the sync does not
|
||||
// happen on an empty slide
|
||||
st.Rm(c)
|
||||
_, err = cl.StateSync()
|
||||
err = cl.StateSync()
|
||||
if err != nil {
|
||||
t.Fatal("sync with recover should have worked:", err)
|
||||
}
|
||||
|
|
|
@ -125,7 +125,7 @@ func createCluster(
|
|||
)
|
||||
checkErr("creating consensus component", err)
|
||||
|
||||
tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, cfgs.clusterCfg.ID)
|
||||
tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, host.ID())
|
||||
mon := setupMonitor(c.String("monitor"), host, cfgs.monCfg, cfgs.pubsubmonCfg)
|
||||
informer, alloc := setupAllocation(c.String("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg)
|
||||
|
||||
|
|
|
@ -192,13 +192,6 @@ func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *a
|
|||
return err
|
||||
}
|
||||
|
||||
// StateSync runs Cluster.StateSync().
|
||||
func (rpcapi *RPCAPI) StateSync(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error {
|
||||
pinfos, err := rpcapi.c.StateSync()
|
||||
*out = pinInfoSliceToSerial(pinfos)
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
Tracker component methods
|
||||
*/
|
||||
|
|
|
@ -226,11 +226,6 @@ func (mock *mockService) SyncLocal(ctx context.Context, in api.PinSerial, out *a
|
|||
return mock.StatusLocal(ctx, in, out)
|
||||
}
|
||||
|
||||
func (mock *mockService) StateSync(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error {
|
||||
*out = make([]api.PinInfoSerial, 0, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mock *mockService) RecoverAllLocal(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error {
|
||||
return mock.TrackerRecoverAll(ctx, in, out)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user