2512ecb701
New PeerManager, Allocator, Informer components have been added along with a new "replication_factor" configuration option. First, cluster peers regularly collect metrics and push them (Informer) to the Cluster leader. The Informer is an interface that can be implemented in custom ways to support custom metrics. Second, on a pin operation, using the information from the collected metrics, an Allocator can provide a list of preferences as to where the new pin should be assigned. The Allocator is an interface that allows different allocation strategies to be provided. Both Allocator and Informer are Cluster Components, and have access to the RPC API. The allocations are kept in the shared state. Cluster peer failure detection and re-allocation are still missing, although re-pinning something when a node is down/metrics missing does re-allocate the pin somewhere else. License: MIT Signed-off-by: Hector Sanjuan <hector@protocol.ai>
106 lines
2.3 KiB
Go
106 lines
2.3 KiB
Go
package ipfscluster
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ipfs/ipfs-cluster/api"
|
|
"github.com/ipfs/ipfs-cluster/state/mapstate"
|
|
"github.com/ipfs/ipfs-cluster/test"
|
|
|
|
cid "github.com/ipfs/go-cid"
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
)
|
|
|
|
func cleanRaft() {
|
|
os.RemoveAll(testingConfig().ConsensusDataFolder)
|
|
}
|
|
|
|
func testingConsensus(t *testing.T) *Consensus {
|
|
//logging.SetDebugLogging()
|
|
cfg := testingConfig()
|
|
ctx := context.Background()
|
|
h, err := makeHost(ctx, cfg)
|
|
if err != nil {
|
|
t.Fatal("cannot create host:", err)
|
|
}
|
|
st := mapstate.NewMapState()
|
|
cc, err := NewConsensus([]peer.ID{cfg.ID}, h, cfg.ConsensusDataFolder, st)
|
|
if err != nil {
|
|
t.Fatal("cannot create Consensus:", err)
|
|
}
|
|
cc.SetClient(test.NewMockRPCClient(t))
|
|
<-cc.Ready()
|
|
return cc
|
|
}
|
|
|
|
func TestShutdownConsensus(t *testing.T) {
|
|
// Bring it up twice to make sure shutdown cleans up properly
|
|
// but also to make sure raft comes up ok when re-initialized
|
|
defer cleanRaft()
|
|
cc := testingConsensus(t)
|
|
err := cc.Shutdown()
|
|
if err != nil {
|
|
t.Fatal("Consensus cannot shutdown:", err)
|
|
}
|
|
cc.Shutdown()
|
|
cc = testingConsensus(t)
|
|
err = cc.Shutdown()
|
|
if err != nil {
|
|
t.Fatal("Consensus cannot shutdown:", err)
|
|
}
|
|
}
|
|
|
|
func TestConsensusPin(t *testing.T) {
|
|
cc := testingConsensus(t)
|
|
defer cleanRaft() // Remember defer runs in LIFO order
|
|
defer cc.Shutdown()
|
|
|
|
c, _ := cid.Decode(test.TestCid1)
|
|
err := cc.LogPin(api.CidArg{Cid: c, Everywhere: true})
|
|
if err != nil {
|
|
t.Error("the operation did not make it to the log:", err)
|
|
}
|
|
|
|
time.Sleep(250 * time.Millisecond)
|
|
st, err := cc.State()
|
|
if err != nil {
|
|
t.Fatal("error gettinng state:", err)
|
|
}
|
|
|
|
pins := st.List()
|
|
if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
|
|
t.Error("the added pin should be in the state")
|
|
}
|
|
}
|
|
|
|
func TestConsensusUnpin(t *testing.T) {
|
|
cc := testingConsensus(t)
|
|
defer cleanRaft()
|
|
defer cc.Shutdown()
|
|
|
|
c, _ := cid.Decode(test.TestCid2)
|
|
err := cc.LogUnpin(api.CidArgCid(c))
|
|
if err != nil {
|
|
t.Error("the operation did not make it to the log:", err)
|
|
}
|
|
}
|
|
|
|
func TestConsensusLeader(t *testing.T) {
|
|
cc := testingConsensus(t)
|
|
cfg := testingConfig()
|
|
pID := cfg.ID
|
|
defer cleanRaft()
|
|
defer cc.Shutdown()
|
|
l, err := cc.Leader()
|
|
if err != nil {
|
|
t.Fatal("No leader:", err)
|
|
}
|
|
|
|
if l != pID {
|
|
t.Errorf("expected %s but the leader appears as %s", pID, l)
|
|
}
|
|
}
|