ipfs-cluster/consensus_test.go
Hector Sanjuan 2512ecb701 Issue #41: Add Replication factor
New PeerManager, Allocator, Informer components have been added along
with a new "replication_factor" configuration option.

First, cluster peers regularly collect and push metrics (via the Informer)
to the Cluster leader. The Informer is an interface that can be implemented
in custom ways to support custom metrics.
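
As a rough illustration, a custom Informer could look like the sketch below.
This is not the exact interface from the codebase: the method names and the
simplified Metric type are assumptions based on the description above.

// Sketch of a custom Informer. Names and types are illustrative only; the
// real component also participates in the cluster RPC machinery and uses
// the cluster's own metric type.
type Metric struct {
	Name  string
	Value string
}

type Informer interface {
	Name() string      // identifies the metric this informer produces
	GetMetric() Metric // current value of the metric for this peer
}

// numPinsInformer is a hypothetical implementation reporting how many
// objects the local IPFS daemon has pinned.
type numPinsInformer struct {
	pinCount int
}

func (i *numPinsInformer) Name() string { return "numpins" }

func (i *numPinsInformer) GetMetric() Metric {
	// A real implementation would query the IPFS daemon through the RPC API
	// and report the current pin count here.
	return Metric{Name: i.Name(), Value: "0"}
}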

Second, on a pin operation, an Allocator uses the information from the
collected metrics to provide a list of preferences as to where the new pin
should be assigned. The Allocator is an interface that allows different
allocation strategies to be plugged in.
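
To make the allocation step concrete, here is a hedged sketch of one possible
strategy; the function name, signature and types are assumptions rather than
the codebase's actual allocator interface. It relies on the standard sort
package and the go-libp2p-peer package already imported by the test file
below: candidate peers are ranked by a numeric metric and the best ones are
kept, up to the replication factor.

// allocate sketches one allocation strategy: peers with the lowest metric
// value (e.g. fewest pins) are preferred, truncated to the replication
// factor. The signature is illustrative, not the real interface.
func allocate(candidates map[peer.ID]int, replicationFactor int) []peer.ID {
	type candidate struct {
		id    peer.ID
		value int
	}
	ranked := make([]candidate, 0, len(candidates))
	for id, v := range candidates {
		ranked = append(ranked, candidate{id, v})
	}
	// A lower metric value means more free capacity, so sort ascending
	// and keep the front of the list.
	sort.Slice(ranked, func(i, j int) bool { return ranked[i].value < ranked[j].value })
	allocations := make([]peer.ID, 0, replicationFactor)
	for _, c := range ranked {
		if len(allocations) == replicationFactor {
			break
		}
		allocations = append(allocations, c.id)
	}
	return allocations
}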

Both Allocator and Informer are Cluster Components and have access
to the RPC API.
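
The consensus tests below exercise that component contract directly:
SetClient hands the component an RPC client (a mock one in the tests) and
Shutdown releases its resources. A minimal sketch of the contract, inferred
from those calls rather than copied from the codebase, could be:

// Sketch of the shared component contract, inferred from the SetClient and
// Shutdown calls used in the tests below. RPCClient is a stand-in for the
// actual RPC client type; the real interface may differ.
type RPCClient interface{}

type Component interface {
	SetClient(c RPCClient) // gives the component access to the cluster RPC API
	Shutdown() error       // releases resources; safe to call more than once
}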

The allocations are kept in the shared state. Cluster peer failure
detection and automatic re-allocation are still missing, although
re-pinning an item when a node is down or its metrics are missing does
re-allocate the pin somewhere else.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-02-14 19:13:08 +01:00

package ipfscluster

import (
	"context"
	"os"
	"testing"
	"time"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/state/mapstate"
	"github.com/ipfs/ipfs-cluster/test"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"
)

// cleanRaft removes the Raft data folder used by the tests.
func cleanRaft() {
	os.RemoveAll(testingConfig().ConsensusDataFolder)
}

// testingConsensus builds a single-peer Consensus backed by a mock RPC
// client and blocks until it is ready to use.
func testingConsensus(t *testing.T) *Consensus {
	//logging.SetDebugLogging()
	cfg := testingConfig()
	ctx := context.Background()
	h, err := makeHost(ctx, cfg)
	if err != nil {
		t.Fatal("cannot create host:", err)
	}
	st := mapstate.NewMapState()
	cc, err := NewConsensus([]peer.ID{cfg.ID}, h, cfg.ConsensusDataFolder, st)
	if err != nil {
		t.Fatal("cannot create Consensus:", err)
	}
	cc.SetClient(test.NewMockRPCClient(t))
	<-cc.Ready()
	return cc
}

func TestShutdownConsensus(t *testing.T) {
	// Bring it up twice to make sure shutdown cleans up properly
	// but also to make sure raft comes up ok when re-initialized
	defer cleanRaft()
	cc := testingConsensus(t)
	err := cc.Shutdown()
	if err != nil {
		t.Fatal("Consensus cannot shutdown:", err)
	}
	cc.Shutdown()
	cc = testingConsensus(t)
	err = cc.Shutdown()
	if err != nil {
		t.Fatal("Consensus cannot shutdown:", err)
	}
}

func TestConsensusPin(t *testing.T) {
	cc := testingConsensus(t)
	defer cleanRaft() // Remember defer runs in LIFO order
	defer cc.Shutdown()
	c, _ := cid.Decode(test.TestCid1)
	err := cc.LogPin(api.CidArg{Cid: c, Everywhere: true})
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}
	time.Sleep(250 * time.Millisecond)
	st, err := cc.State()
	if err != nil {
		t.Fatal("error getting state:", err)
	}
	pins := st.List()
	if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
		t.Error("the added pin should be in the state")
	}
}

func TestConsensusUnpin(t *testing.T) {
	cc := testingConsensus(t)
	defer cleanRaft()
	defer cc.Shutdown()
	c, _ := cid.Decode(test.TestCid2)
	err := cc.LogUnpin(api.CidArgCid(c))
	if err != nil {
		t.Error("the operation did not make it to the log:", err)
	}
}

func TestConsensusLeader(t *testing.T) {
	cc := testingConsensus(t)
	cfg := testingConfig()
	pID := cfg.ID
	defer cleanRaft()
	defer cc.Shutdown()
	l, err := cc.Leader()
	if err != nil {
		t.Fatal("No leader:", err)
	}
	if l != pID {
		t.Errorf("expected %s but the leader appears as %s", pID, l)
	}
}