From 4d8f975d9b9931b06bbd55690affb5146aee0fdf Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 24 May 2018 16:40:40 +0200 Subject: [PATCH] StateSync(): some improvements This commit: * Does not collect and return changed items when doing StateSync (they are not used) * Removes the StateSync RPC method (no longer used) * Uses tracker.StatusAll() rather than requesting Status on each Cid (should be faster with upcoming pintracker) * Does not launch a go-routine to track every item. Track is an async operation. This likely causes 1000s goroutines to be started with no good reason. License: MIT Signed-off-by: Hector Sanjuan --- cluster.go | 38 ++++++++++++++++------------------ cluster_test.go | 6 +++--- ipfs-cluster-service/daemon.go | 2 +- rpc_api.go | 7 ------- test/rpc_api_mock.go | 5 ----- 5 files changed, 22 insertions(+), 36 deletions(-) diff --git a/cluster.go b/cluster.go index 80a0f36e..05cc8bb0 100644 --- a/cluster.go +++ b/cluster.go @@ -726,31 +726,36 @@ func (c *Cluster) Join(addr ma.Multiaddr) error { } // StateSync syncs the consensus state to the Pin Tracker, ensuring -// that every Cid that should be tracked is tracked. It returns -// PinInfo for Cids which were added or deleted. -func (c *Cluster) StateSync() ([]api.PinInfo, error) { +// that every Cid in the shared state is tracked and that the Pin Tracker +// is not tracking more Cids than it should. +func (c *Cluster) StateSync() error { cState, err := c.consensus.State() if err != nil { - return nil, err + return err } logger.Debug("syncing state to tracker") clusterPins := cState.List() - var changed []*cid.Cid + + trackedPins := c.tracker.StatusAll() + trackedPinsMap := make(map[string]int) + for i, tpin := range trackedPins { + trackedPinsMap[tpin.Cid.String()] = i + } // Track items which are not tracked for _, pin := range clusterPins { - if c.tracker.Status(pin.Cid).Status == api.TrackerStatusUnpinned { + _, tracked := trackedPinsMap[pin.Cid.String()] + if !tracked { logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid) - changed = append(changed, pin.Cid) - go c.tracker.Track(pin) + c.tracker.Track(pin) } } // a. Untrack items which should not be tracked // b. Track items which should not be remote as local // c. Track items which should not be local as remote - for _, p := range c.tracker.StatusAll() { + for _, p := range trackedPins { pCid := p.Cid currentPin := cState.Get(pCid) has := cState.Has(pCid) @@ -759,24 +764,17 @@ func (c *Cluster) StateSync() ([]api.PinInfo, error) { switch { case !has: logger.Debugf("StateSync: Untracking %s, is not part of shared state", pCid) - changed = append(changed, pCid) - go c.tracker.Untrack(pCid) + c.tracker.Untrack(pCid) case p.Status == api.TrackerStatusRemote && allocatedHere: logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid) - changed = append(changed, pCid) - go c.tracker.Track(currentPin) + c.tracker.Track(currentPin) case p.Status == api.TrackerStatusPinned && !allocatedHere: logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid) - changed = append(changed, pCid) - go c.tracker.Track(currentPin) + c.tracker.Track(currentPin) } } - var infos []api.PinInfo - for _, h := range changed { - infos = append(infos, c.tracker.Status(h)) - } - return infos, nil + return nil } // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers. diff --git a/cluster_test.go b/cluster_test.go index 19053251..9c2e7a68 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -160,7 +160,7 @@ func TestClusterStateSync(t *testing.T) { cl, _, _, st, _ := testingCluster(t) defer cleanRaft() defer cl.Shutdown() - _, err := cl.StateSync() + err := cl.StateSync() if err == nil { t.Fatal("expected an error as there is no state to sync") } @@ -171,7 +171,7 @@ func TestClusterStateSync(t *testing.T) { t.Fatal("pin should have worked:", err) } - _, err = cl.StateSync() + err = cl.StateSync() if err != nil { t.Fatal("sync after pinning should have worked:", err) } @@ -179,7 +179,7 @@ func TestClusterStateSync(t *testing.T) { // Modify state on the side so the sync does not // happen on an empty slide st.Rm(c) - _, err = cl.StateSync() + err = cl.StateSync() if err != nil { t.Fatal("sync with recover should have worked:", err) } diff --git a/ipfs-cluster-service/daemon.go b/ipfs-cluster-service/daemon.go index f5f53ea6..2fe1e4b3 100644 --- a/ipfs-cluster-service/daemon.go +++ b/ipfs-cluster-service/daemon.go @@ -125,7 +125,7 @@ func createCluster( ) checkErr("creating consensus component", err) - tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, cfgs.clusterCfg.ID) + tracker := maptracker.NewMapPinTracker(cfgs.trackerCfg, host.ID()) mon := setupMonitor(c.String("monitor"), host, cfgs.monCfg, cfgs.pubsubmonCfg) informer, alloc := setupAllocation(c.String("alloc"), cfgs.diskInfCfg, cfgs.numpinInfCfg) diff --git a/rpc_api.go b/rpc_api.go index 0ceba48d..5336def7 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -192,13 +192,6 @@ func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *a return err } -// StateSync runs Cluster.StateSync(). -func (rpcapi *RPCAPI) StateSync(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error { - pinfos, err := rpcapi.c.StateSync() - *out = pinInfoSliceToSerial(pinfos) - return err -} - /* Tracker component methods */ diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 0b0406d7..9bf32a90 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -226,11 +226,6 @@ func (mock *mockService) SyncLocal(ctx context.Context, in api.PinSerial, out *a return mock.StatusLocal(ctx, in, out) } -func (mock *mockService) StateSync(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error { - *out = make([]api.PinInfoSerial, 0, 0) - return nil -} - func (mock *mockService) RecoverAllLocal(ctx context.Context, in struct{}, out *[]api.PinInfoSerial) error { return mock.TrackerRecoverAll(ctx, in, out) }