Issue #41: Add Replication factor

New PeerManager, Allocator, Informer components have been added along
with a new "replication_factor" configuration option.
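
For illustration (this sketch is not part of the diff), the new option can be set programmatically as well as through the JSON service configuration; the exampleConfig helper below is hypothetical and relies only on the Config changes shown in config.go further down:

package example

import (
    ipfscluster "github.com/ipfs/ipfs-cluster"
)

// exampleConfig builds a cluster Config which keeps two copies of every pin.
func exampleConfig() (*ipfscluster.Config, error) {
    // NewDefaultConfig sets ReplicationFactor to -1 ("pin everywhere").
    cfg, err := ipfscluster.NewDefaultConfig()
    if err != nil {
        return nil, err
    }
    // Equivalent to "replication_factor": 2 in the JSON configuration.
    // A value of 0 on disk is normalized to -1 when the config is loaded.
    cfg.ReplicationFactor = 2
    return cfg, nil
}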

First, cluster peers regularly collect and push metrics (Informer) to the
Cluster leader. The Informer is an interface that can be implemented in
custom ways to support custom metrics.
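
As a hedged illustration of that interface (not part of this diff), a custom Informer only needs the Component methods (SetClient, Shutdown) plus Name and GetMetric; the "diskfree" metric and the freeBytes helper below are hypothetical:

package diskfree

import (
    "fmt"

    "github.com/ipfs/ipfs-cluster/api"

    rpc "github.com/hsanjuan/go-libp2p-gorpc"
)

// MetricName is the (hypothetical) name of the metric produced here.
const MetricName = "diskfree"

// Informer reports how many bytes are free on the local disk.
type Informer struct {
    rpcClient *rpc.Client
}

// NewInformer returns an initialized Informer.
func NewInformer() *Informer {
    return &Informer{}
}

// SetClient stores the RPC client, which gives access to other components.
func (inf *Informer) SetClient(c *rpc.Client) {
    inf.rpcClient = c
}

// Shutdown invalidates any future metrics from this informer.
func (inf *Informer) Shutdown() error {
    inf.rpcClient = nil
    return nil
}

// Name returns the name of the metric issued by this informer.
func (inf *Informer) Name() string {
    return MetricName
}

// GetMetric reports the free space as an opaque string Value. Cluster does
// not interpret it; only the matching allocator needs to understand it.
func (inf *Informer) GetMetric() api.Metric {
    m := api.Metric{
        Name:  MetricName,
        Value: fmt.Sprintf("%d", freeBytes()),
        Valid: inf.rpcClient != nil,
    }
    m.SetTTL(30) // valid for 30 seconds
    return m
}

// freeBytes is a placeholder for a real free-space measurement.
func freeBytes() uint64 {
    return 0
}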

Second, on a pin operation, using the information from the collected
metrics, an Allocator provides a list of preferences as to where the new
pin should be assigned. The Allocator is an interface that allows plugging
in different allocation strategies.
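
For example (a sketch outside this diff, assuming only the PinAllocator interface added below), an allocator pairing with the hypothetical "diskfree" informer could simply prefer the candidates reporting the most free space:

package diskfreealloc

import (
    "sort"
    "strconv"

    "github.com/ipfs/ipfs-cluster/api"

    rpc "github.com/hsanjuan/go-libp2p-gorpc"
    cid "github.com/ipfs/go-cid"
    peer "github.com/libp2p/go-libp2p-peer"
)

// Allocator prefers the candidate peers reporting the largest metric value.
type Allocator struct{}

// NewAllocator returns an initialized Allocator.
func NewAllocator() *Allocator {
    return &Allocator{}
}

// SetClient does nothing: this allocator needs no RPC access.
func (alloc *Allocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator.
func (alloc *Allocator) Shutdown() error { return nil }

// Allocate returns the candidates ordered from most to least free space,
// ignoring the metrics of the peers which already hold the pin.
func (alloc *Allocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
    type entry struct {
        p peer.ID
        v int
    }
    entries := make([]entry, 0, len(candidates))
    for p, m := range candidates {
        if m.Discard() {
            continue // skip invalid or expired metrics
        }
        v, err := strconv.Atoi(m.Value)
        if err != nil {
            continue // skip values we cannot parse
        }
        entries = append(entries, entry{p: p, v: v})
    }
    // Largest reported value (most free space) first.
    sort.Slice(entries, func(i, j int) bool {
        return entries[i].v > entries[j].v
    })
    peers := make([]peer.ID, len(entries))
    for i, e := range entries {
        peers[i] = e.p
    }
    return peers, nil
}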

Both the Allocator and the Informer are Cluster Components and have access
to the RPC API.
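
For instance (a sketch, not part of the diff), a component holding the rpc.Client handed to SetClient can query other components; the pinCount helper is hypothetical, but the "Cluster"/"IPFSPinLs" call mirrors the one made by the numpin informer below:

package example

import (
    "github.com/ipfs/ipfs-cluster/api"

    rpc "github.com/hsanjuan/go-libp2p-gorpc"
)

// pinCount asks the local IPFS connector how many items it pins.
func pinCount(c *rpc.Client) (int, error) {
    pinMap := make(map[string]api.IPFSPinStatus)
    err := c.Call(
        "",          // empty destination: a local call
        "Cluster",   // RPC service name
        "IPFSPinLs", // RPC method name
        "recursive", // input argument
        &pinMap)     // output argument
    if err != nil {
        return 0, err
    }
    return len(pinMap), nil
}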

The allocations are kept in the shared state. Cluster peer failure
detection and automatic re-allocation are still missing, although
re-pinning an item when a node is down (or its metrics are missing) does
re-allocate the pin somewhere else.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
Hector Sanjuan 2017-02-13 16:46:53 +01:00
parent 9deb56b762
commit 2512ecb701
33 changed files with 2058 additions and 361 deletions

View File

@ -55,7 +55,7 @@ deps: gx
$(gx_bin) --verbose install --global
$(gx-go_bin) rewrite
test: deps
go test -tags silent -v -covermode count -coverprofile=coverage.out ./...
go test -tags silent -v ./...
rw: gx
$(gx-go_bin) rewrite
rwundo: gx

View File

@ -0,0 +1,93 @@
// Package numpinalloc implements an ipfscluster.Allocator based on the "numpin"
// Informer. It is a simple example on how an allocator is implemented.
package numpinalloc
import (
"sort"
"strconv"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)
var logger = logging.Logger("numpinalloc")
// Allocator implements ipfscluster.Allocate.
type Allocator struct{}
func NewAllocator() *Allocator {
return &Allocator{}
}
// SetClient does nothing in this allocator
func (alloc *Allocator) SetClient(c *rpc.Client) {}
// Shutdown does nothing in this allocator
func (alloc *Allocator) Shutdown() error { return nil }
// Allocate returns where to allocate a pin request based on "numpin"-Informer
// metrics. In this simple case, we do not pay attention to the metrics
// of the currently allocated peers; we just sort the candidates by their
// number of pins.
func (alloc *Allocator) Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
numpins := newMetricsSorter(candidates)
sort.Sort(numpins)
return numpins.peers, nil
}
// metricsSorter attaches sort.Interface methods to our metrics and sorts
// a slice of peers in the way that interests us
type metricsSorter struct {
peers []peer.ID
m map[peer.ID]int
}
func newMetricsSorter(m map[peer.ID]api.Metric) *metricsSorter {
vMap := make(map[peer.ID]int)
peers := make([]peer.ID, 0, len(m))
for k, v := range m {
if v.Name != numpin.MetricName || v.Discard() {
continue
}
val, err := strconv.Atoi(v.Value)
if err != nil {
continue
}
peers = append(peers, k)
vMap[k] = val
}
sorter := &metricsSorter{
m: vMap,
peers: peers,
}
return sorter
}
// Len returns the number of metrics
func (s metricsSorter) Len() int {
return len(s.peers)
}
// Less reports if the element in position i is less than the element in j
func (s metricsSorter) Less(i, j int) bool {
peeri := s.peers[i]
peerj := s.peers[j]
x := s.m[peeri]
y := s.m[peerj]
return x < y
}
// Swap swaps the elements in positions i and j
func (s metricsSorter) Swap(i, j int) {
temp := s.peers[i]
s.peers[i] = s.peers[j]
s.peers[j] = temp
}

View File

@ -0,0 +1,134 @@
package numpinalloc
import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
type testcase struct {
candidates map[peer.ID]api.Metric
current map[peer.ID]api.Metric
expected []peer.ID
}
var (
peer0 = peer.ID("QmUQ6Nsejt1SuZAu8yL8WgqQZHHAYreLVYYa4VPsLUCed7")
peer1 = peer.ID("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
peer2 = peer.ID("QmPrSBATWGAN56fiiEWEhKX3L1F3mTghEQR7vQwaeo7zHi")
peer3 = peer.ID("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
testCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
)
var inAMinute = time.Now().Add(time.Minute).Format(time.RFC1123)
var testCases = []testcase{
{ // regular sort
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer2: api.Metric{
Name: numpin.MetricName,
Value: "3",
Expire: inAMinute,
Valid: true,
},
peer3: api.Metric{
Name: numpin.MetricName,
Value: "2",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1, peer3, peer2, peer0},
},
{ // filter invalid
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Value: "1",
Expire: inAMinute,
Valid: false,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad metric name
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: "lalala",
Value: "1",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
{ // filter bad value
candidates: map[peer.ID]api.Metric{
peer0: api.Metric{
Name: numpin.MetricName,
Value: "abc",
Expire: inAMinute,
Valid: true,
},
peer1: api.Metric{
Name: numpin.MetricName,
Value: "5",
Expire: inAMinute,
Valid: true,
},
},
current: map[peer.ID]api.Metric{},
expected: []peer.ID{peer1},
},
}
func Test(t *testing.T) {
alloc := &Allocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates)
if err != nil {
t.Fatal(err)
}
if len(res) == 0 {
t.Fatal("0 allocations")
}
for i, r := range res {
if e := tc.expected[i]; r != e {
t.Errorf("Expect r[%d]=%s but got %s", i, r, e)
}
}
}
}

View File

@ -36,7 +36,7 @@ const (
// The IPFS daemon is not pinning the item
TrackerStatusUnpinned
// The IPFS daemon is not pinning the item but it is being tracked
TrackerStatusRemotePin
TrackerStatusRemote
)
// TrackerStatus represents the status of a tracked Cid in the PinTracker
@ -51,7 +51,7 @@ var trackerStatusString = map[TrackerStatus]string{
TrackerStatusPinning: "pinning",
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemotePin: "remote",
TrackerStatusRemote: "remote",
}
// String converts a TrackerStatus into a readable string.
@ -335,25 +335,91 @@ func (addrsS MultiaddrsSerial) ToMultiaddrs() []ma.Multiaddr {
// CidArg is an argument that carries a Cid. It may carry more things in the
// future.
type CidArg struct {
Cid *cid.Cid
Cid *cid.Cid
Allocations []peer.ID
Everywhere bool
}
// CidArgCid is a shortcut to create a CidArg only with a Cid.
func CidArgCid(c *cid.Cid) CidArg {
return CidArg{
Cid: c,
}
}
// CidArgSerial is a serializable version of CidArg
type CidArgSerial struct {
Cid string `json:"cid"`
Cid string `json:"cid"`
Allocations []string `json:"allocations"`
Everywhere bool `json:"everywhere"`
}
// ToSerial converts a CidArg to CidArgSerial.
func (carg CidArg) ToSerial() CidArgSerial {
lenAllocs := len(carg.Allocations)
allocs := make([]string, lenAllocs, lenAllocs)
for i, p := range carg.Allocations {
allocs[i] = peer.IDB58Encode(p)
}
return CidArgSerial{
Cid: carg.Cid.String(),
Cid: carg.Cid.String(),
Allocations: allocs,
Everywhere: carg.Everywhere,
}
}
// ToCidArg converts a CidArgSerial to its native form.
func (cargs CidArgSerial) ToCidArg() CidArg {
c, _ := cid.Decode(cargs.Cid)
lenAllocs := len(cargs.Allocations)
allocs := make([]peer.ID, lenAllocs, lenAllocs)
for i, p := range cargs.Allocations {
allocs[i], _ = peer.IDB58Decode(p)
}
return CidArg{
Cid: c,
Cid: c,
Allocations: allocs,
Everywhere: cargs.Everywhere,
}
}
// Metric transports information about a peer.ID. It is used to decide
// pin allocations by a PinAllocator. IPFS cluster is agnostic to
// the Value, which should be interpreted by the PinAllocator.
type Metric struct {
Name string
Peer peer.ID // filled-in by Cluster.
Value string
Expire string // RFC1123
Valid bool // if the metric is not valid it will be discarded
}
// SetTTL sets Metric to expire after the given seconds
func (m *Metric) SetTTL(seconds int) {
exp := time.Now().Add(time.Duration(seconds) * time.Second)
m.Expire = exp.Format(time.RFC1123)
}
// GetTTL returns the time left before the Metric expires
func (m *Metric) GetTTL() time.Duration {
exp, _ := time.Parse(time.RFC1123, m.Expire)
return exp.Sub(time.Now())
}
// Expired returns if the Metric has expired
func (m *Metric) Expired() bool {
exp, _ := time.Parse(time.RFC1123, m.Expire)
return time.Now().After(exp)
}
// Discard returns whether the metric is invalid or has expired
func (m *Metric) Discard() bool {
return !m.Valid || m.Expired()
}
// Alert carries alerting information about a peer. WIP.
type Alert struct {
Peer peer.ID
MetricName string
}

195
api/types_test.go Normal file
View File

@ -0,0 +1,195 @@
package api
import (
"testing"
"time"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
var testTime = time.Date(2017, 12, 31, 15, 45, 50, 0, time.UTC)
var testMAddr, _ = ma.NewMultiaddr("/ip4/1.2.3.4")
var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
var testPeerID2, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabd")
func TestTrackerFromString(t *testing.T) {
testcases := []string{"bug", "cluster_error", "pin_error", "unpin_error", "pinned", "pinning", "unpinning", "unpinned", "remote"}
for i, tc := range testcases {
if TrackerStatusFromString(tc).String() != TrackerStatus(i).String() {
t.Errorf("%s does not match %s", tc, i)
}
}
}
func TestIPFSPinStatusFromString(t *testing.T) {
testcases := []string{"direct", "recursive", "indirect"}
for i, tc := range testcases {
if IPFSPinStatusFromString(tc) != IPFSPinStatus(i+2) {
t.Errorf("%s does not match IPFSPinStatus %d", tc, i+2)
}
}
}
func TestGlobalPinInfoConv(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatal("paniced")
}
}()
gpi := GlobalPinInfo{
Cid: testCid1,
PeerMap: map[peer.ID]PinInfo{
testPeerID1: {
Cid: testCid1,
Peer: testPeerID1,
Status: TrackerStatusPinned,
TS: testTime,
},
},
}
newgpi := gpi.ToSerial().ToGlobalPinInfo()
if gpi.Cid.String() != newgpi.Cid.String() {
t.Error("mismatching CIDs")
}
if gpi.PeerMap[testPeerID1].Cid.String() != newgpi.PeerMap[testPeerID1].Cid.String() {
t.Error("mismatching PinInfo CIDs")
}
if !gpi.PeerMap[testPeerID1].TS.Equal(newgpi.PeerMap[testPeerID1].TS) {
t.Error("bad time")
}
}
func TestIDConv(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatal("paniced")
}
}()
id := ID{
ID: testPeerID1,
Addresses: []ma.Multiaddr{testMAddr},
ClusterPeers: []ma.Multiaddr{testMAddr},
Version: "testv",
Commit: "ab",
RPCProtocolVersion: "testp",
Error: "teste",
IPFS: IPFSID{
ID: testPeerID2,
Addresses: []ma.Multiaddr{testMAddr},
Error: "abc",
},
}
newid := id.ToSerial().ToID()
if id.ID != newid.ID {
t.Error("mismatching Peer IDs")
}
if !id.Addresses[0].Equal(newid.Addresses[0]) {
t.Error("mismatching addresses")
}
if !id.ClusterPeers[0].Equal(newid.ClusterPeers[0]) {
t.Error("mismatching clusterPeers")
}
if id.Version != newid.Version ||
id.Commit != newid.Commit ||
id.RPCProtocolVersion != newid.RPCProtocolVersion ||
id.Error != newid.Error {
t.Error("some field didn't survive")
}
if id.IPFS.ID != newid.IPFS.ID {
t.Error("ipfs daemon id mismatch")
}
if !id.IPFS.Addresses[0].Equal(newid.IPFS.Addresses[0]) {
t.Error("mismatching addresses")
}
if id.IPFS.Error != newid.IPFS.Error {
t.Error("ipfs error mismatch")
}
}
func TestMultiaddrConv(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatal("paniced")
}
}()
addrs := []ma.Multiaddr{testMAddr}
new := MultiaddrsToSerial(addrs).ToMultiaddrs()
if !addrs[0].Equal(new[0]) {
t.Error("mismatch")
}
}
func TestCidArgConv(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatal("paniced")
}
}()
c := CidArg{
Cid: testCid1,
Allocations: []peer.ID{testPeerID1},
Everywhere: true,
}
newc := c.ToSerial().ToCidArg()
if c.Cid.String() != newc.Cid.String() ||
c.Allocations[0] != newc.Allocations[0] ||
c.Everywhere != newc.Everywhere {
t.Error("mismatch")
}
}
func TestMetric(t *testing.T) {
m := Metric{
Name: "hello",
Value: "abc",
}
if !m.Expired() {
t.Error("metric should be expire")
}
m.SetTTL(1)
if m.Expired() {
t.Error("metric should not be expired")
}
// let it expire
time.Sleep(1500 * time.Millisecond)
if !m.Expired() {
t.Error("metric should be expired")
}
m.SetTTL(30)
m.Valid = true
if m.Discard() {
t.Error("metric should be valid")
}
m.Valid = false
if !m.Discard() {
t.Error("metric should be invalid")
}
ttl := m.GetTTL()
if ttl > 30*time.Second || ttl < 29*time.Second {
t.Error("looks like a bad ttl")
}
}

View File

@ -2,6 +2,7 @@ package ipfscluster
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -36,6 +37,9 @@ type Cluster struct {
ipfs IPFSConnector
state State
tracker PinTracker
monitor PeerMonitor
allocator PinAllocator
informer Informer
shutdownLock sync.Mutex
shutdown bool
@ -52,7 +56,16 @@ type Cluster struct {
// The new cluster peer may still be performing initialization tasks when
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) {
func NewCluster(
cfg *Config,
api API,
ipfs IPFSConnector,
state State,
tracker PinTracker,
monitor PeerMonitor,
allocator PinAllocator,
informer Informer) (*Cluster, error) {
ctx, cancel := context.WithCancel(context.Background())
host, err := makeHost(ctx, cfg)
if err != nil {
@ -65,17 +78,20 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
}
c := &Cluster{
ctx: ctx,
cancel: cancel,
id: host.ID(),
config: cfg,
host: host,
api: api,
ipfs: ipfs,
state: state,
tracker: tracker,
doneCh: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
id: host.ID(),
config: cfg,
host: host,
api: api,
ipfs: ipfs,
state: state,
tracker: tracker,
monitor: monitor,
allocator: allocator,
informer: informer,
doneCh: make(chan struct{}),
readyCh: make(chan struct{}),
}
c.setupPeerManager()
@ -91,7 +107,17 @@ func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker P
return nil, err
}
c.setupRPCClients()
c.run()
c.bootstrap()
ok := c.bootstrap()
if !ok {
logger.Error("Bootstrap unsuccessful")
c.Shutdown()
return nil, errors.New("bootstrap unsuccessful")
}
go func() {
c.ready()
c.run()
}()
return c, nil
}
@ -144,8 +170,12 @@ func (c *Cluster) setupRPCClients() {
c.ipfs.SetClient(c.rpcClient)
c.api.SetClient(c.rpcClient)
c.consensus.SetClient(c.rpcClient)
c.monitor.SetClient(c.rpcClient)
c.allocator.SetClient(c.rpcClient)
c.informer.SetClient(c.rpcClient)
}
// stateSyncWatcher loops and triggers StateSync from time to time
func (c *Cluster) stateSyncWatcher() {
stateSyncTicker := time.NewTicker(
time.Duration(c.config.StateSyncSeconds) * time.Second)
@ -160,21 +190,52 @@ func (c *Cluster) stateSyncWatcher() {
}
}
// push metrics loops and pushes metrics to the leader's monitor
func (c *Cluster) pushInformerMetrics() {
timer := time.NewTimer(0) // fire immediately first
for {
select {
case <-c.ctx.Done():
return
case <-timer.C:
// wait
}
leader, err := c.consensus.Leader()
if err != nil {
// retry in 1 second
timer.Stop()
timer.Reset(1 * time.Second)
continue
}
metric := c.informer.GetMetric()
metric.Peer = c.id
err = c.rpcClient.Call(
leader,
"Cluster", "PeerMonitorLogMetric",
metric, &struct{}{})
if err != nil {
logger.Errorf("error pushing metric to %s", leader.Pretty())
}
logger.Debugf("pushed metric %s to %s", metric.Name, metric.Peer.Pretty())
timer.Stop() // no need to drain C if we are here
timer.Reset(metric.GetTTL() / 2)
}
}
// run provides a cancellable context and launches some goroutines
// before signaling readyCh
func (c *Cluster) run() {
go c.stateSyncWatcher()
go c.bootstrapAndReady()
go c.pushInformerMetrics()
}
func (c *Cluster) bootstrapAndReady() {
ok := c.bootstrap()
if !ok {
logger.Error("Bootstrap unsuccessful")
c.Shutdown()
return
}
func (c *Cluster) ready() {
// We bootstrapped first because with dirty state consensus
// may have a peerset and not find a leader so we cannot wait
// for it.
@ -190,8 +251,6 @@ func (c *Cluster) bootstrapAndReady() {
}
// Cluster is ready.
c.readyCh <- struct{}{}
logger.Info("IPFS Cluster is ready")
logger.Info("Cluster Peers (not including ourselves):")
peers := c.peerManager.peersAddrs()
if len(peers) == 0 {
@ -200,6 +259,8 @@ func (c *Cluster) bootstrapAndReady() {
for _, a := range c.peerManager.peersAddrs() {
logger.Infof(" - %s", a)
}
close(c.readyCh)
logger.Info("IPFS Cluster is ready")
}
func (c *Cluster) bootstrap() bool {
@ -467,25 +528,20 @@ func (c *Cluster) StateSync() ([]api.PinInfo, error) {
}
logger.Debug("syncing state to tracker")
clusterPins := cState.ListPins()
clusterPins := cState.List()
var changed []*cid.Cid
// For the moment we run everything in parallel.
// The PinTracker should probably decide if it can
// pin in parallel or queues everything and does it
// one by one
// Track items which are not tracked
for _, h := range clusterPins {
if c.tracker.Status(h).Status == api.TrackerStatusUnpinned {
changed = append(changed, h)
go c.tracker.Track(h)
for _, carg := range clusterPins {
if c.tracker.Status(carg.Cid).Status == api.TrackerStatusUnpinned {
changed = append(changed, carg.Cid)
go c.tracker.Track(carg)
}
}
// Untrack items which should not be tracked
for _, p := range c.tracker.StatusAll() {
if !cState.HasPin(p.Cid) {
if !cState.Has(p.Cid) {
changed = append(changed, p.Cid)
go c.tracker.Untrack(p.Cid)
}
@ -566,13 +622,14 @@ func (c *Cluster) Recover(h *cid.Cid) (api.GlobalPinInfo, error) {
// Pins returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
// pins are managed, but does not indicate if the item is successfully pinned.
func (c *Cluster) Pins() []*cid.Cid {
func (c *Cluster) Pins() []api.CidArg {
cState, err := c.consensus.State()
if err != nil {
logger.Error(err)
return []*cid.Cid{}
return []api.CidArg{}
}
return cState.ListPins()
return cState.List()
}
// Pin makes the cluster Pin a Cid. This implies adding the Cid
@ -585,7 +642,26 @@ func (c *Cluster) Pins() []*cid.Cid {
// of underlying IPFS daemon pinning operations.
func (c *Cluster) Pin(h *cid.Cid) error {
logger.Info("pinning:", h)
err := c.consensus.LogPin(h)
cidArg := api.CidArg{
Cid: h,
}
rpl := c.config.ReplicationFactor
switch {
case rpl == 0:
return errors.New("replication factor is 0")
case rpl < 0:
cidArg.Everywhere = true
case rpl > 0:
allocs, err := c.allocate(h)
if err != nil {
return err
}
cidArg.Allocations = allocs
}
err := c.consensus.LogPin(cidArg)
if err != nil {
return err
}
@ -600,7 +676,12 @@ func (c *Cluster) Pin(h *cid.Cid) error {
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(h *cid.Cid) error {
logger.Info("unpinning:", h)
err := c.consensus.LogUnpin(h)
carg := api.CidArg{
Cid: h,
}
err := c.consensus.LogUnpin(carg)
if err != nil {
return err
}
@ -799,3 +880,102 @@ func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) {
}
return id, err
}
// allocate finds peers to allocate a hash using the informer and the monitor
// it should only be used with a positive replication factor
func (c *Cluster) allocate(hash *cid.Cid) ([]peer.ID, error) {
if c.config.ReplicationFactor <= 0 {
return nil, errors.New("cannot decide allocation for replication factor <= 0")
}
// Figure out who is currently holding this
var currentlyAllocatedPeers []peer.ID
st, err := c.consensus.State()
if err != nil {
// if there is no state, we assume it is empty. If there were another
// problem, we would fail to commit anyway.
currentlyAllocatedPeers = []peer.ID{}
} else {
carg := st.Get(hash)
currentlyAllocatedPeers = carg.Allocations
}
// initialize a candidate metrics map with all current clusterPeers
// (albeit with invalid metrics)
clusterPeers := c.peerManager.peers()
metricsMap := make(map[peer.ID]api.Metric)
for _, cp := range clusterPeers {
metricsMap[cp] = api.Metric{Valid: false}
}
// Request latest metrics logged by informers from the leader
metricName := c.informer.Name()
l, err := c.consensus.Leader()
if err != nil {
return nil, errors.New("cannot determine leading Monitor")
}
var metrics []api.Metric
err = c.rpcClient.Call(l,
"Cluster", "PeerMonitorLastMetrics",
metricName,
&metrics)
if err != nil {
return nil, err
}
// put metrics in the metricsMap if they belong to a current clusterPeer
for _, m := range metrics {
_, ok := metricsMap[m.Peer]
if !ok {
continue
}
metricsMap[m.Peer] = m
}
// Remove any invalid metric. This will clear any cluster peers
// for which we did not receive metrics.
for p, m := range metricsMap {
if m.Discard() {
delete(metricsMap, p)
}
}
// Move metrics from currentlyAllocatedPeers to a new map
currentlyAllocatedPeersMetrics := make(map[peer.ID]api.Metric)
for _, p := range currentlyAllocatedPeers {
m, ok := metricsMap[p]
if !ok {
continue
}
currentlyAllocatedPeersMetrics[p] = m
delete(metricsMap, p)
}
// how many allocations do we need (note we will re-allocate if we did
// not receive good metrics for currently allocated peers)
needed := c.config.ReplicationFactor - len(currentlyAllocatedPeersMetrics)
// if we are already good (note invalid metrics would trigger
// re-allocations as they are not included in currentAllocMetrics)
if needed <= 0 {
return nil, fmt.Errorf("CID is already correctly allocated to %s", currentlyAllocatedPeers)
}
// Allocate is called with currentAllocMetrics which contains
// only currentlyAllocatedPeers when they have provided valid metrics.
candidateAllocs, err := c.allocator.Allocate(hash, currentlyAllocatedPeersMetrics, metricsMap)
if err != nil {
return nil, logError(err.Error())
}
// we don't have enough peers to pin
if len(candidateAllocs) < needed {
err = logError("cannot find enough allocations for this CID: needed: %d. Got: %s",
needed, candidateAllocs)
return nil, err
}
// return as many as needed
return candidateAllocs[0:needed], nil
}

View File

@ -4,7 +4,9 @@ import (
"errors"
"testing"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
@ -78,6 +80,9 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
cfg := testingConfig()
st := mapstate.NewMapState()
tracker := NewMapPinTracker(cfg)
mon := NewStdPeerMonitor(5)
alloc := numpinalloc.NewAllocator()
inf := numpin.NewInformer()
cl, err := NewCluster(
cfg,
@ -85,7 +90,9 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
ipfs,
st,
tracker,
)
mon,
alloc,
inf)
if err != nil {
t.Fatal("cannot create cluster:", err)
}
@ -129,7 +136,7 @@ func TestClusterStateSync(t *testing.T) {
// Modify state on the side so the sync does not
// happen on an empty slate
st.RmPin(c)
st.Rm(c)
_, err = cl.StateSync()
if err != nil {
t.Fatal("sync with recover should have worked:", err)

View File

@ -69,6 +69,9 @@ type Config struct {
// Number of seconds between StateSync() operations
StateSyncSeconds int
// ReplicationFactor is the number of copies we keep for each pin
ReplicationFactor int
// if a config has been loaded from disk, track the path
// so it can be saved to the same place.
path string
@ -125,6 +128,12 @@ type JSONConfig struct {
// tracker state. Normally states are synced anyway, but this helps
// when new nodes are joining the cluster
StateSyncSeconds int `json:"state_sync_seconds"`
// ReplicationFactor indicates the number of nodes that must pin content.
// For example, a replication_factor of 2 will prompt cluster to choose
// two nodes for each pinned hash. A replication_factor of -1 will
// use every available node for each pin.
ReplicationFactor int `json:"replication_factor"`
}
// ToJSONConfig converts a Config object to its JSON representation which
@ -164,6 +173,7 @@ func (cfg *Config) ToJSONConfig() (j *JSONConfig, err error) {
IPFSNodeMultiaddress: cfg.IPFSNodeAddr.String(),
ConsensusDataFolder: cfg.ConsensusDataFolder,
StateSyncSeconds: cfg.StateSyncSeconds,
ReplicationFactor: cfg.ReplicationFactor,
}
return
}
@ -232,6 +242,11 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
return
}
if jcfg.ReplicationFactor == 0 {
logger.Warning("Replication factor set to -1 (pin everywhere)")
jcfg.ReplicationFactor = -1
}
if jcfg.StateSyncSeconds <= 0 {
jcfg.StateSyncSeconds = DefaultStateSyncSeconds
}
@ -248,6 +263,7 @@ func (jcfg *JSONConfig) ToConfig() (c *Config, err error) {
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: jcfg.ConsensusDataFolder,
StateSyncSeconds: jcfg.StateSyncSeconds,
ReplicationFactor: jcfg.ReplicationFactor,
}
return
}
@ -331,5 +347,6 @@ func NewDefaultConfig() (*Config, error) {
IPFSNodeAddr: ipfsNodeAddr,
ConsensusDataFolder: "ipfscluster-data",
StateSyncSeconds: DefaultStateSyncSeconds,
ReplicationFactor: -1,
}, nil
}

View File

@ -9,7 +9,6 @@ import (
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
@ -17,14 +16,6 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
// Type of pin operation
const (
LogOpPin = iota + 1
LogOpUnpin
LogOpAddPeer
LogOpRmPeer
)
// LeaderTimeout specifies how long to wait before failing an operation
// because there is no leader
var LeaderTimeout = 15 * time.Second
@ -33,95 +24,6 @@ var LeaderTimeout = 15 * time.Second
// we give up
var CommitRetries = 2
type clusterLogOpType int
// clusterLogOp represents an operation for the OpLogConsensus system.
// It implements the consensus.Op interface.
type clusterLogOp struct {
Arg string
Type clusterLogOpType
ctx context.Context
rpcClient *rpc.Client
}
// ApplyTo applies the operation to the State
func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(State)
var err error
if !ok {
// Should never be here
panic("received unexpected state type")
}
switch op.Type {
case LogOpPin:
c, err := cid.Decode(op.Arg)
if err != nil {
panic("could not decode a CID we ourselves encoded")
}
err = state.AddPin(c)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.rpcClient.Go("",
"Cluster",
"Track",
api.CidArg{c}.ToSerial(),
&struct{}{},
nil)
case LogOpUnpin:
c, err := cid.Decode(op.Arg)
if err != nil {
panic("could not decode a CID we ourselves encoded")
}
err = state.RmPin(c)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.rpcClient.Go("",
"Cluster",
"Untrack",
api.CidArg{c}.ToSerial(),
&struct{}{},
nil)
case LogOpAddPeer:
addr, err := ma.NewMultiaddr(op.Arg)
if err != nil {
panic("could not decode a multiaddress we ourselves encoded")
}
op.rpcClient.Call("",
"Cluster",
"PeerManagerAddPeer",
api.MultiaddrToSerial(addr),
&struct{}{})
// TODO rebalance ops
case LogOpRmPeer:
pid, err := peer.IDB58Decode(op.Arg)
if err != nil {
panic("could not decode a PID we ourselves encoded")
}
op.rpcClient.Call("",
"Cluster",
"PeerManagerRmPeer",
pid,
&struct{}{})
// TODO rebalance ops
default:
logger.Error("unknown clusterLogOp type. Ignoring")
}
return state, nil
ROLLBACK:
// We failed to apply the operation to the state
// and therefore we need to request a rollback to the
// cluster to the previous state. This operation can only be performed
// by the cluster leader.
logger.Error("Rollbacks are not implemented")
return nil, errors.New("a rollback may be necessary. Reason: " + err.Error())
}
// Consensus handles the work of keeping a shared-state between
// the peers of an IPFS Cluster, as well as modifying that state and
// applying any updates in a thread-safe manner.
@ -132,7 +34,7 @@ type Consensus struct {
consensus consensus.OpLogConsensus
actor consensus.Actor
baseOp *clusterLogOp
baseOp *LogOp
raft *Raft
rpcClient *rpc.Client
@ -150,7 +52,7 @@ type Consensus struct {
// is discarded.
func NewConsensus(clusterPeers []peer.ID, host host.Host, dataFolder string, state State) (*Consensus, error) {
ctx := context.Background()
op := &clusterLogOp{
op := &LogOp{
ctx: context.Background(),
}
@ -297,22 +199,21 @@ func (cc *Consensus) Ready() <-chan struct{} {
return cc.readyCh
}
func (cc *Consensus) op(argi interface{}, t clusterLogOpType) *clusterLogOp {
var arg string
func (cc *Consensus) op(argi interface{}, t LogOpType) *LogOp {
switch argi.(type) {
case *cid.Cid:
arg = argi.(*cid.Cid).String()
case peer.ID:
arg = peer.IDB58Encode(argi.(peer.ID))
case api.CidArg:
return &LogOp{
Cid: argi.(api.CidArg).ToSerial(),
Type: t,
}
case ma.Multiaddr:
arg = argi.(ma.Multiaddr).String()
return &LogOp{
Peer: api.MultiaddrToSerial(argi.(ma.Multiaddr)),
Type: t,
}
default:
panic("bad type")
}
return &clusterLogOp{
Arg: arg,
Type: t,
}
}
// returns true if the operation was redirected to the leader
@ -339,12 +240,12 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
return true, err
}
func (cc *Consensus) logOpCid(rpcOp string, opType clusterLogOpType, c *cid.Cid) error {
func (cc *Consensus) logOpCid(rpcOp string, opType LogOpType, carg api.CidArg) error {
var finalErr error
for i := 0; i < CommitRetries; i++ {
logger.Debugf("Try %d", i)
redirected, err := cc.redirectToLeader(
rpcOp, api.CidArg{c}.ToSerial())
rpcOp, carg.ToSerial())
if err != nil {
finalErr = err
continue
@ -356,8 +257,7 @@ func (cc *Consensus) logOpCid(rpcOp string, opType clusterLogOpType, c *cid.Cid)
// It seems WE are the leader.
// Create pin operation for the log
op := cc.op(c, opType)
op := cc.op(carg, opType)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log
@ -374,21 +274,21 @@ func (cc *Consensus) logOpCid(rpcOp string, opType clusterLogOpType, c *cid.Cid)
switch opType {
case LogOpPin:
logger.Infof("pin committed to global state: %s", c)
logger.Infof("pin committed to global state: %s", carg.Cid)
case LogOpUnpin:
logger.Infof("unpin committed to global state: %s", c)
logger.Infof("unpin committed to global state: %s", carg.Cid)
}
return nil
}
// LogPin submits a Cid to the shared state of the cluster. It will forward
// the operation to the leader if this is not it.
func (cc *Consensus) LogPin(c *cid.Cid) error {
func (cc *Consensus) LogPin(c api.CidArg) error {
return cc.logOpCid("ConsensusLogPin", LogOpPin, c)
}
// LogUnpin removes a Cid from the shared state of the cluster.
func (cc *Consensus) LogUnpin(c *cid.Cid) error {
func (cc *Consensus) LogUnpin(c api.CidArg) error {
return cc.logOpCid("ConsensusLogUnpin", LogOpUnpin, c)
}
@ -458,7 +358,11 @@ func (cc *Consensus) LogRmPeer(pid peer.ID) error {
// It seems WE are the leader.
// Create pin operation for the log
op := cc.op(pid, LogOpRmPeer)
addr, err := ma.NewMultiaddr("/ipfs/" + peer.IDB58Encode(pid))
if err != nil {
return err
}
op := cc.op(addr, LogOpRmPeer)
_, err = cc.consensus.CommitOp(op)
if err != nil {
// This means the op did not make it to the log

View File

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
@ -13,76 +14,6 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)
func TestApplyToPin(t *testing.T) {
op := &clusterLogOp{
Arg: test.TestCid1,
Type: LogOpPin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
st := mapstate.NewMapState()
op.ApplyTo(st)
pins := st.ListPins()
if len(pins) != 1 || pins[0].String() != test.TestCid1 {
t.Error("the state was not modified correctly")
}
}
func TestApplyToUnpin(t *testing.T) {
op := &clusterLogOp{
Arg: test.TestCid1,
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
st := mapstate.NewMapState()
c, _ := cid.Decode(test.TestCid1)
st.AddPin(c)
op.ApplyTo(st)
pins := st.ListPins()
if len(pins) != 0 {
t.Error("the state was not modified correctly")
}
}
func TestApplyToBadState(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("should have recovered an error")
}
}()
op := &clusterLogOp{
Arg: test.TestCid1,
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
var st interface{}
op.ApplyTo(st)
}
func TestApplyToBadCid(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("should have recovered an error")
}
}()
op := &clusterLogOp{
Arg: "agadfaegf",
Type: LogOpPin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
st := mapstate.NewMapState()
op.ApplyTo(st)
}
func cleanRaft() {
os.RemoveAll(testingConfig().ConsensusDataFolder)
}
@ -128,7 +59,7 @@ func TestConsensusPin(t *testing.T) {
defer cc.Shutdown()
c, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(c)
err := cc.LogPin(api.CidArg{Cid: c, Everywhere: true})
if err != nil {
t.Error("the operation did not make it to the log:", err)
}
@ -139,8 +70,8 @@ func TestConsensusPin(t *testing.T) {
t.Fatal("error gettinng state:", err)
}
pins := st.ListPins()
if len(pins) != 1 || pins[0].String() != test.TestCid1 {
pins := st.List()
if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
t.Error("the added pin should be in the state")
}
}
@ -151,7 +82,7 @@ func TestConsensusUnpin(t *testing.T) {
defer cc.Shutdown()
c, _ := cid.Decode(test.TestCid2)
err := cc.LogUnpin(c)
err := cc.LogUnpin(api.CidArgCid(c))
if err != nil {
t.Error("the operation did not make it to the log:", err)
}

77
informer/numpin/numpin.go Normal file
View File

@ -0,0 +1,77 @@
// Package numpin implements an ipfs-cluster informer which determines how many
// items this peer is pinning and returns it as api.Metric
package numpin
import (
"fmt"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/api"
)
// MetricTTL specifies, in seconds, how long our reported metric is valid.
var MetricTTL = 10
// MetricName specifies the name of our metric
var MetricName = "numpin"
// Informer is a simple object to implement the ipfscluster.Informer
// and Component interfaces
type Informer struct {
rpcClient *rpc.Client
}
func NewInformer() *Informer {
return &Informer{}
}
// SetClient provides us with an rpc.Client which allows
// contacting other components in the cluster.
func (npi *Informer) SetClient(c *rpc.Client) {
npi.rpcClient = c
}
// Shutdown is called on cluster shutdown. We just invalidate
// any metrics from this point.
func (npi *Informer) Shutdown() error {
npi.rpcClient = nil
return nil
}
// Name returns the name of this informer
func (npi *Informer) Name() string {
return MetricName
}
// GetMetric contacts the IPFSConnector component and
// requests the `pin ls` command. We return the number
// of pins in IPFS.
func (npi *Informer) GetMetric() api.Metric {
if npi.rpcClient == nil {
return api.Metric{
Valid: false,
}
}
pinMap := make(map[string]api.IPFSPinStatus)
// make use of the RPC API to obtain information
// about the number of pins in IPFS. See RPCAPI docs.
err := npi.rpcClient.Call("", // Local call
"Cluster", // Service name
"IPFSPinLs", // Method name
"recursive", // in arg
&pinMap) // out arg
valid := err == nil
m := api.Metric{
Name: MetricName,
Value: fmt.Sprintf("%d", len(pinMap)),
Valid: valid,
}
m.SetTTL(MetricTTL)
return m
}

View File

@ -0,0 +1,45 @@
package numpin
import (
"testing"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
)
type mockService struct{}
func mockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("Cluster", &mockService{})
if err != nil {
t.Fatal(err)
}
return c
}
func (mock *mockService) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus) error {
*out = map[string]api.IPFSPinStatus{
"QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa": api.IPFSPinStatusRecursive,
"QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6": api.IPFSPinStatusRecursive,
}
return nil
}
func Test(t *testing.T) {
inf := NewInformer()
m := inf.GetMetric()
if m.Valid {
t.Error("metric should be invalid")
}
inf.SetClient(mockRPCClient(t))
m = inf.GetMetric()
if !m.Valid {
t.Error("metric should be valid")
}
if m.Value != "2" {
t.Error("bad metric value")
}
}

View File

@ -14,6 +14,7 @@ const (
formatGPInfo
formatString
formatVersion
formatCidArg
)
type format int
@ -45,6 +46,10 @@ func textFormatObject(body []byte, format int) {
var obj api.Version
textFormatDecodeOn(body, &obj)
textFormatPrintVersion(&obj)
case formatCidArg:
var obj api.CidArgSerial
textFormatDecodeOn(body, &obj)
textFormatPrintCidArg(&obj)
default:
var obj interface{}
textFormatDecodeOn(body, &obj)
@ -99,3 +104,12 @@ func textFormatPrintGPinfo(obj *api.GlobalPinInfoSerial) {
func textFormatPrintVersion(obj *api.Version) {
fmt.Println(obj.Version)
}
func textFormatPrintCidArg(obj *api.CidArgSerial) {
fmt.Printf("%s | Allocations: ", obj.Cid)
if obj.Everywhere {
fmt.Printf("[everywhere]\n")
} else {
fmt.Printf("%s", obj.Allocations)
}
}

View File

@ -225,9 +225,10 @@ in the cluster and should be part of the list offered by "pin ls".
cidStr := c.Args().First()
_, err := cid.Decode(cidStr)
checkErr("parsing cid", err)
request("POST", "/pins/"+cidStr, nil)
resp := request("POST", "/pins/"+cidStr, nil)
formatResponse(c, resp)
time.Sleep(500 * time.Millisecond)
resp := request("GET", "/pins/"+cidStr, nil)
resp = request("GET", "/pins/"+cidStr, nil)
formatResponse(c, resp)
return nil
},
@ -260,12 +261,13 @@ although unpinning operations in the cluster may take longer or fail.
Name: "ls",
Usage: "List tracked CIDs",
UsageText: `
This command will list the CIDs which are tracked by IPFS Cluster. This
list does not include information about tracking status or location, it
This command will list the CIDs which are tracked by IPFS Cluster and to
which peers they are currently allocated. This list does not include
any monitoring or status information about these items; it
merely represents the list of pins which are part of the global state of
the cluster. For specific information, use "status".
`,
Flags: []cli.Flag{parseFlag(formatString)},
Flags: []cli.Flag{parseFlag(formatCidArg)},
Action: func(c *cli.Context) error {
resp := request("GET", "/pinlist", nil)
formatResponse(c, resp)

View File

@ -13,6 +13,8 @@ import (
"github.com/urfave/cli"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/state/mapstate"
)
@ -237,12 +239,19 @@ func run(c *cli.Context) error {
state := mapstate.NewMapState()
tracker := ipfscluster.NewMapPinTracker(cfg)
mon := ipfscluster.NewStdPeerMonitor(5)
informer := numpin.NewInformer()
alloc := numpinalloc.NewAllocator()
cluster, err := ipfscluster.NewCluster(
cfg,
api,
proxy,
state,
tracker)
tracker,
mon,
alloc,
informer)
checkErr("starting cluster", err)
signalChan := make(chan os.Signal, 20)

View File

@ -241,7 +241,9 @@ func (ipfs *IPFSHTTPConnector) pinOpHandler(op string, w http.ResponseWriter, r
err = ipfs.rpcClient.Call("",
"Cluster",
op,
api.CidArgSerial{arg},
api.CidArgSerial{
Cid: arg,
},
&struct{}{})
if err != nil {
@ -270,7 +272,7 @@ func (ipfs *IPFSHTTPConnector) pinLsHandler(w http.ResponseWriter, r *http.Reque
pinLs := ipfsPinLsResp{}
pinLs.Keys = make(map[string]ipfsPinType)
var pins []string
var pins []api.CidArgSerial
err := ipfs.rpcClient.Call("",
"Cluster",
"PinList",
@ -283,7 +285,7 @@ func (ipfs *IPFSHTTPConnector) pinLsHandler(w http.ResponseWriter, r *http.Reque
}
for _, pin := range pins {
pinLs.Keys[pin] = ipfsPinType{
pinLs.Keys[pin.Cid] = ipfsPinType{
Type: "recursive",
}
}
@ -507,8 +509,8 @@ func (ipfs *IPFSHTTPConnector) get(path string) ([]byte, error) {
msg = fmt.Sprintf("IPFS unsuccessful: %d: %s",
resp.StatusCode, ipfsErr.Message)
} else {
msg = fmt.Sprintf("IPFS-get unsuccessful: %d: %s",
resp.StatusCode, body)
msg = fmt.Sprintf("IPFS-get '%s' unsuccessful: %d: %s",
path, resp.StatusCode, body)
}
logger.Warning(msg)
return body, errors.New(msg)

View File

@ -58,15 +58,16 @@ type Peered interface {
// is used by the Consensus component to keep track of
// which objects are pinned. This component should be thread safe.
type State interface {
// AddPin adds a pin to the State
AddPin(*cid.Cid) error
// RmPin removes a pin from the State
RmPin(*cid.Cid) error
// ListPins lists all the pins in the state
ListPins() []*cid.Cid
// HasPin returns true if the state is holding a Cid
HasPin(*cid.Cid) bool
// AddPeer adds a peer to the shared state
// Add adds a pin to the State
Add(api.CidArg) error
// Rm removes a pin from the State
Rm(*cid.Cid) error
// List lists all the pins in the state
List() []api.CidArg
// Has returns true if the state is holding information for a Cid
Has(*cid.Cid) bool
// Get returns the information attached to this pin
Get(*cid.Cid) api.CidArg
}
// PinTracker represents a component which tracks the status of
@ -76,7 +77,7 @@ type PinTracker interface {
Component
// Track tells the tracker that a Cid is now under its supervision
// The tracker may decide to perform an IPFS pin.
Track(*cid.Cid) error
Track(api.CidArg) error
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(*cid.Cid) error
@ -93,3 +94,43 @@ type PinTracker interface {
// Recover retriggers a Pin/Unpin operation in Cids with error status.
Recover(*cid.Cid) (api.PinInfo, error)
}
// Informer returns Metric information in a peer. The metrics produced by
// informers are then passed to a PinAllocator which will use them to
// determine where to pin content.
type Informer interface {
Component
Name() string
GetMetric() api.Metric
}
// PinAllocator decides where to pin certain content. In order to make such
// decision, it receives the pin arguments, the peers which are currently
// allocated to the content and metrics available for all peers which could
// allocate the content.
type PinAllocator interface {
Component
// Allocate returns the list of peers that should be assigned to
// Pin content in order of preference (from the most preferred to the
// least). The current map contains the metrics for all peers
// which are currently pinning the content. The candidates map
// contains the metrics for all peers which are eligible for pinning
// the content.
Allocate(c *cid.Cid, current, candidates map[peer.ID]api.Metric) ([]peer.ID, error)
}
// PeerMonitor is a component in charge of monitoring the peers in the cluster
// and providing candidates to the PinAllocator when a pin request arrives.
type PeerMonitor interface {
Component
// LogMetric stores a metric. Metrics are pushed regularly from each peer
// to the active PeerMonitor.
LogMetric(api.Metric)
// LastMetrics returns a map with the latest metrics of matching name
// for the current cluster peers.
LastMetrics(name string) []api.Metric
// Alerts delivers alerts generated when this peer monitor detects
// a problem (i.e. metrics not arriving as expected). Alerts are used to
// trigger rebalancing operations.
Alerts() <-chan api.Alert
}
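
For illustration only, a hedged sketch of a naive PeerMonitor satisfying the interface above (it is not the StdPeerMonitor used elsewhere in this commit; the naivemon package name is hypothetical). It keeps the latest metric per peer and metric name and never emits alerts:

package naivemon

import (
    "sync"

    "github.com/ipfs/ipfs-cluster/api"

    rpc "github.com/hsanjuan/go-libp2p-gorpc"
    peer "github.com/libp2p/go-libp2p-peer"
)

// Monitor keeps only the latest metric received per (name, peer).
type Monitor struct {
    mux      sync.RWMutex
    latest   map[string]map[peer.ID]api.Metric
    alertsCh chan api.Alert
}

// NewMonitor returns an initialized Monitor.
func NewMonitor() *Monitor {
    return &Monitor{
        latest:   make(map[string]map[peer.ID]api.Metric),
        alertsCh: make(chan api.Alert),
    }
}

// SetClient does nothing: this sketch never calls other components.
func (m *Monitor) SetClient(c *rpc.Client) {}

// Shutdown closes the alerts channel.
func (m *Monitor) Shutdown() error {
    close(m.alertsCh)
    return nil
}

// LogMetric stores a metric, replacing any previous one from the same peer.
func (m *Monitor) LogMetric(metric api.Metric) {
    m.mux.Lock()
    defer m.mux.Unlock()
    byPeer, ok := m.latest[metric.Name]
    if !ok {
        byPeer = make(map[peer.ID]api.Metric)
        m.latest[metric.Name] = byPeer
    }
    byPeer[metric.Peer] = metric
}

// LastMetrics returns the stored metrics of the given name which have not
// expired or been invalidated.
func (m *Monitor) LastMetrics(name string) []api.Metric {
    m.mux.RLock()
    defer m.mux.RUnlock()
    var out []api.Metric
    for _, metric := range m.latest[name] {
        if metric.Discard() {
            continue
        }
        out = append(out, metric)
    }
    return out
}

// Alerts returns a channel which, in this sketch, never fires.
func (m *Monitor) Alerts() <-chan api.Alert {
    return m.alertsCh
}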

View File

@ -4,11 +4,14 @@ import (
"fmt"
"math/rand"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/ipfs/ipfs-cluster/allocator/numpinalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/informer/numpin"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
@ -51,7 +54,7 @@ func randomBytes() []byte {
return bs
}
func createComponents(t *testing.T, i int) (*Config, *RESTAPI, *IPFSHTTPConnector, *mapstate.MapState, *MapPinTracker, *test.IpfsMock) {
func createComponents(t *testing.T, i int) (*Config, API, IPFSConnector, State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) {
mock := test.NewIpfsMock()
clusterAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", clusterPort+i))
apiAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", apiPort+i))
@ -72,6 +75,7 @@ func createComponents(t *testing.T, i int) (*Config, *RESTAPI, *IPFSHTTPConnecto
cfg.IPFSNodeAddr = nodeAddr
cfg.ConsensusDataFolder = "./e2eTestRaft/" + pid.Pretty()
cfg.LeaveOnShutdown = false
cfg.ReplicationFactor = -1
api, err := NewRESTAPI(cfg)
checkErr(t, err)
@ -79,41 +83,51 @@ func createComponents(t *testing.T, i int) (*Config, *RESTAPI, *IPFSHTTPConnecto
checkErr(t, err)
state := mapstate.NewMapState()
tracker := NewMapPinTracker(cfg)
mon := NewStdPeerMonitor(5)
alloc := numpinalloc.NewAllocator()
numpin.MetricTTL = 1 // second
inf := numpin.NewInformer()
return cfg, api, ipfs, state, tracker, mock
return cfg, api, ipfs, state, tracker, mon, alloc, inf, mock
}
func createCluster(t *testing.T, cfg *Config, api *RESTAPI, ipfs *IPFSHTTPConnector, state *mapstate.MapState, tracker *MapPinTracker) *Cluster {
cl, err := NewCluster(cfg, api, ipfs, state, tracker)
func createCluster(t *testing.T, cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
cl, err := NewCluster(cfg, api, ipfs, state, tracker, mon, alloc, inf)
checkErr(t, err)
<-cl.Ready()
return cl
}
func createOnePeerCluster(t *testing.T, nth int) (*Cluster, *test.IpfsMock) {
cfg, api, ipfs, state, tracker, mock := createComponents(t, nth)
cl := createCluster(t, cfg, api, ipfs, state, tracker)
cfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth)
cl := createCluster(t, cfg, api, ipfs, state, tracker, mon, alloc, inf)
return cl, mock
}
func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
os.RemoveAll("./e2eTestRaft")
cfgs := make([]*Config, nClusters, nClusters)
apis := make([]*RESTAPI, nClusters, nClusters)
ipfss := make([]*IPFSHTTPConnector, nClusters, nClusters)
states := make([]*mapstate.MapState, nClusters, nClusters)
trackers := make([]*MapPinTracker, nClusters, nClusters)
apis := make([]API, nClusters, nClusters)
ipfss := make([]IPFSConnector, nClusters, nClusters)
states := make([]State, nClusters, nClusters)
trackers := make([]PinTracker, nClusters, nClusters)
mons := make([]PeerMonitor, nClusters, nClusters)
allocs := make([]PinAllocator, nClusters, nClusters)
infs := make([]Informer, nClusters, nClusters)
ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters)
clusters := make([]*Cluster, nClusters, nClusters)
clusterPeers := make([]ma.Multiaddr, nClusters, nClusters)
for i := 0; i < nClusters; i++ {
cfg, api, ipfs, state, tracker, mock := createComponents(t, i)
cfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, i)
cfgs[i] = cfg
apis[i] = api
ipfss[i] = ipfs
states[i] = state
trackers[i] = tracker
mons[i] = mon
allocs[i] = alloc
infs[i] = inf
ipfsMocks[i] = mock
addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d/ipfs/%s",
clusterPort+i,
@ -143,7 +157,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
for i := 0; i < nClusters; i++ {
wg.Add(1)
go func(i int) {
clusters[i] = createCluster(t, cfgs[i], apis[i], ipfss[i], states[i], trackers[i])
clusters[i] = createCluster(t, cfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
wg.Done()
}(i)
}
@ -283,12 +297,12 @@ func TestClustersPin(t *testing.T) {
for i := 0; i < nPins; i++ {
j := rand.Intn(nClusters) // choose a random cluster peer
err := clusters[j].Unpin(pinList[i])
err := clusters[j].Unpin(pinList[i].Cid)
if err != nil {
t.Errorf("error unpinning %s: %s", pinList[i], err)
}
// test re-unpin
err = clusters[j].Unpin(pinList[i])
err = clusters[j].Unpin(pinList[i].Cid)
if err != nil {
t.Errorf("error re-unpinning %s: %s", pinList[i], err)
}
@ -606,3 +620,212 @@ func TestClustersShutdown(t *testing.T) {
runF(t, clusters, f)
runF(t, clusters, f)
}
func TestClustersReplication(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
}
// Why is replication factor nClusters - 1?
// Because that way we know that pinning nClusters
// pins with a strategy like numpin (which tries
// to make everyone pin the same number of things)
// will result in each peer holding locally exactly
// nClusters - 1 pins.
// Let some metrics arrive
time.Sleep(time.Second)
tmpCid, _ := cid.Decode(test.TestCid1)
prefix := tmpCid.Prefix()
for i := 0; i < nClusters; i++ {
// Pick a random cluster and hash
j := rand.Intn(nClusters) // choose a random cluster peer
h, err := prefix.Sum(randomBytes()) // create random cid
checkErr(t, err)
err = clusters[j].Pin(h)
if err != nil {
t.Error(err)
}
time.Sleep(time.Second / 2)
// check that it is held by exactly nClusters -1 peers
gpi, err := clusters[j].Status(h)
if err != nil {
t.Fatal(err)
}
numLocal := 0
numRemote := 0
for _, v := range gpi.PeerMap {
if v.Status == api.TrackerStatusPinned {
numLocal++
} else if v.Status == api.TrackerStatusRemote {
numRemote++
}
}
if numLocal != nClusters-1 {
t.Errorf("We wanted replication %d but it's only %d",
nClusters-1, numLocal)
}
if numRemote != 1 {
t.Errorf("We wanted 1 peer track as remote but %d do", numRemote)
}
time.Sleep(time.Second / 2) // this is for metric to be up to date
}
f := func(t *testing.T, c *Cluster) {
pinfos := c.tracker.StatusAll()
if len(pinfos) != nClusters {
t.Error("Pinfos does not have the expected pins")
}
numRemote := 0
numLocal := 0
for _, pi := range pinfos {
switch pi.Status {
case api.TrackerStatusPinned:
numLocal++
case api.TrackerStatusRemote:
numRemote++
}
}
if numLocal != nClusters-1 {
t.Errorf("Expected %d local pins but got %d", nClusters-1, numLocal)
}
if numRemote != 1 {
t.Errorf("Expected 1 remote pin but got %d", numRemote)
}
pins := c.Pins()
for _, pin := range pins {
allocs := pin.Allocations
if len(allocs) != nClusters-1 {
t.Errorf("Allocations are [%s]", allocs)
}
for _, a := range allocs {
if a == c.id {
pinfo := c.tracker.Status(pin.Cid)
if pinfo.Status != api.TrackerStatusPinned {
t.Errorf("Peer %s was allocated but it is not pinning cid", c.id)
}
}
}
}
}
runF(t, clusters, f)
}
// In this test we check that repinning something
// when a node has gone down will re-assign the pin
func TestClustersReplicationRealloc(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
}
// Let some metrics arrive
time.Sleep(time.Second)
j := rand.Intn(nClusters)
h, _ := cid.Decode(test.TestCid1)
err := clusters[j].Pin(h)
if err != nil {
t.Error(err)
}
// Let the pin arrive
time.Sleep(time.Second / 2)
// Re-pin should fail as it is allocated already
err = clusters[j].Pin(h)
if err == nil {
t.Fatal("expected an error")
}
t.Log(err)
var killedClusterIndex int
// find someone that pinned it and kill that cluster
for i, c := range clusters {
pinfo := c.tracker.Status(h)
if pinfo.Status == api.TrackerStatusPinned {
killedClusterIndex = i
c.Shutdown()
break
}
}
// let metrics expire
time.Sleep(2 * time.Second)
// now pin should succeed
err = clusters[j].Pin(h)
if err != nil {
t.Fatal(err)
}
numPinned := 0
for i, c := range clusters {
if i == killedClusterIndex {
continue
}
pinfo := c.tracker.Status(h)
if pinfo.Status == api.TrackerStatusPinned {
numPinned++
}
}
if numPinned != nClusters-1 {
t.Error("pin should have been correctly re-assigned")
}
}
// In this test we try to pin something when there are not
// as many available peers as we need. It's like before, except
// more peers are killed.
func TestClustersReplicationNotEnoughPeers(t *testing.T) {
if nClusters < 5 {
t.Skip("Need at least 5 peers")
}
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
for _, c := range clusters {
c.config.ReplicationFactor = nClusters - 1
}
// Let some metrics arrive
time.Sleep(time.Second)
j := rand.Intn(nClusters)
h, _ := cid.Decode(test.TestCid1)
err := clusters[j].Pin(h)
if err != nil {
t.Error(err)
}
// Let the pin arrive
time.Sleep(time.Second / 2)
clusters[1].Shutdown()
clusters[2].Shutdown()
// Time for consensus to catch up again in case we hit the leader.
delay()
err = clusters[j].Pin(h)
if err == nil {
t.Fatal("expected an error")
}
if !strings.Contains(err.Error(), "enough allocations") {
t.Error("different error than expected")
t.Error(err)
}
t.Log(err)
}

109
log_op.go Normal file
View File

@ -0,0 +1,109 @@
package ipfscluster
import (
"context"
"errors"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
consensus "github.com/libp2p/go-libp2p-consensus"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
// Type of consensus operation
const (
LogOpPin = iota + 1
LogOpUnpin
LogOpAddPeer
LogOpRmPeer
)
// LogOpType expresses the type of a consensus Operation
type LogOpType int
// LogOp represents an operation for the OpLogConsensus system.
// It implements the consensus.Op interface and it is used by the
// Consensus component.
type LogOp struct {
Cid api.CidArgSerial
Peer api.MultiaddrSerial
Type LogOpType
ctx context.Context
rpcClient *rpc.Client
}
// ApplyTo applies the operation to the State
func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(State)
var err error
if !ok {
// Should never be here
panic("received unexpected state type")
}
switch op.Type {
case LogOpPin:
arg := op.Cid.ToCidArg()
err = state.Add(arg)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.rpcClient.Go("",
"Cluster",
"Track",
arg.ToSerial(),
&struct{}{},
nil)
case LogOpUnpin:
arg := op.Cid.ToCidArg()
err = state.Rm(arg.Cid)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.rpcClient.Go("",
"Cluster",
"Untrack",
arg.ToSerial(),
&struct{}{},
nil)
case LogOpAddPeer:
addr := op.Peer.ToMultiaddr()
op.rpcClient.Call("",
"Cluster",
"PeerManagerAddPeer",
api.MultiaddrToSerial(addr),
&struct{}{})
// TODO rebalance ops
case LogOpRmPeer:
addr := op.Peer.ToMultiaddr()
pidstr, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
panic("peer badly encoded")
}
pid, err := peer.IDB58Decode(pidstr)
if err != nil {
panic("could not decode a PID we ourselves encoded")
}
op.rpcClient.Call("",
"Cluster",
"PeerManagerRmPeer",
pid,
&struct{}{})
// TODO rebalance ops
default:
logger.Error("unknown LogOp type. Ignoring")
}
return state, nil
ROLLBACK:
// We failed to apply the operation to the state
// and therefore we need to request a rollback to the
// cluster to the previous state. This operation can only be performed
// by the cluster leader.
logger.Error("Rollbacks are not implemented")
return nil, errors.New("a rollback may be necessary. Reason: " + err.Error())
}

82
log_op_test.go Normal file
View File

@ -0,0 +1,82 @@
package ipfscluster
import (
"context"
"testing"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
)
func TestApplyToPin(t *testing.T) {
op := &LogOp{
Cid: api.CidArgSerial{Cid: test.TestCid1},
Type: LogOpPin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
st := mapstate.NewMapState()
op.ApplyTo(st)
pins := st.List()
if len(pins) != 1 || pins[0].Cid.String() != test.TestCid1 {
t.Error("the state was not modified correctly")
}
}
func TestApplyToUnpin(t *testing.T) {
op := &LogOp{
Cid: api.CidArgSerial{Cid: test.TestCid1},
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
st := mapstate.NewMapState()
c, _ := cid.Decode(test.TestCid1)
st.Add(api.CidArg{Cid: c, Everywhere: true})
op.ApplyTo(st)
pins := st.List()
if len(pins) != 0 {
t.Error("the state was not modified correctly")
}
}
func TestApplyToBadState(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("should have recovered an error")
}
}()
op := &LogOp{
Cid: api.CidArgSerial{Cid: test.TestCid1},
Type: LogOpUnpin,
ctx: context.Background(),
rpcClient: test.NewMockRPCClient(t),
}
var st interface{}
op.ApplyTo(st)
}
// func TestApplyToBadCid(t *testing.T) {
// defer func() {
// if r := recover(); r == nil {
// t.Error("should have recovered an error")
// }
// }()
// op := &LogOp{
// Cid: api.CidArgSerial{Cid: "agadfaegf"},
// Type: LogOpPin,
// ctx: context.Background(),
// rpcClient: test.NewMockRPCClient(t),
// }
// st := mapstate.NewMapState()
// op.ApplyTo(st)
// }

View File

@ -21,6 +21,11 @@ var (
UnpinningTimeout = 10 * time.Second
)
// PinQueueSize specifies the maximum number of pin operations waiting
// to be performed. If the queue is full, pins/unpins will be set to
// pinError/unpinError.
var PinQueueSize = 1024
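As a toy illustration of the behaviour described above (not part of the commit): Track and Untrack further down enqueue with a non-blocking send, so once the buffered channel holds PinQueueSize items the operation is immediately flagged as an error instead of blocking the caller. A hedged sketch of that pattern, written as a hypothetical in-package helper:

// tryEnqueue is a hypothetical helper illustrating the non-blocking
// enqueue used by Track/Untrack below: a full queue becomes an error
// instead of a blocked caller.
func tryEnqueue(queue chan api.CidArg, c api.CidArg) error {
    select {
    case queue <- c:
        return nil
    default:
        return errors.New("queue is full")
    }
}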
var (
errUnpinningTimeout = errors.New("unpinning operation is taking too long")
errPinningTimeout = errors.New("pinning operation is taking too long")
@ -34,45 +39,62 @@ type MapPinTracker struct {
mux sync.RWMutex
status map[string]api.PinInfo
ctx context.Context
ctx context.Context
cancel func()
rpcClient *rpc.Client
rpcReady chan struct{}
peerID peer.ID
peerID peer.ID
pinCh chan api.CidArg
unpinCh chan api.CidArg
shutdownLock sync.Mutex
shutdown bool
shutdownCh chan struct{}
wg sync.WaitGroup
}
// NewMapPinTracker returns a new object which has been correctly
// initialized with the given configuration.
func NewMapPinTracker(cfg *Config) *MapPinTracker {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
mpt := &MapPinTracker{
ctx: ctx,
status: make(map[string]api.PinInfo),
rpcReady: make(chan struct{}, 1),
peerID: cfg.ID,
shutdownCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
status: make(map[string]api.PinInfo),
rpcReady: make(chan struct{}, 1),
peerID: cfg.ID,
pinCh: make(chan api.CidArg, PinQueueSize),
unpinCh: make(chan api.CidArg, PinQueueSize),
}
mpt.run()
go mpt.pinWorker()
go mpt.unpinWorker()
return mpt
}
// run does nothing other than give MapPinTracker a cancellable context.
func (mpt *MapPinTracker) run() {
mpt.wg.Add(1)
go func() {
defer mpt.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mpt.ctx = ctx
<-mpt.rpcReady
logger.Info("PinTracker ready")
<-mpt.shutdownCh
}()
// pinWorker reads the pin queue and issues pin requests to the IPFS daemon one by one.
func (mpt *MapPinTracker) pinWorker() {
for {
select {
case p := <-mpt.pinCh:
mpt.pin(p)
case <-mpt.ctx.Done():
return
}
}
}
// unpinWorker reads the unpin queue and issues unpin requests to the IPFS daemon one by one.
func (mpt *MapPinTracker) unpinWorker() {
for {
select {
case p := <-mpt.unpinCh:
mpt.unpin(p)
case <-mpt.ctx.Done():
return
}
}
}
// Shutdown finishes the services provided by the MapPinTracker and cancels
@ -87,8 +109,8 @@ func (mpt *MapPinTracker) Shutdown() error {
}
logger.Info("stopping MapPinTracker")
mpt.cancel()
close(mpt.rpcReady)
mpt.shutdownCh <- struct{}{}
mpt.wg.Wait()
mpt.shutdown = true
return nil
@ -164,47 +186,83 @@ func (mpt *MapPinTracker) unsafeSetError(c *cid.Cid, err error) {
}
}
func (mpt *MapPinTracker) pin(c *cid.Cid) error {
mpt.set(c, api.TrackerStatusPinning)
func (mpt *MapPinTracker) isRemote(c api.CidArg) bool {
if c.Everywhere {
return false
}
for _, p := range c.Allocations {
if p == mpt.peerID {
return false
}
}
return true
}
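A possible unit test for isRemote (a sketch only; it assumes it sits in the same package, with the testing, api, peer and test imports used elsewhere in this commit): a pin allocated exclusively to other peers is remote for this tracker, while pins allocated to this peer or pinned everywhere are not.

func TestIsRemote(t *testing.T) {
    // Hypothetical test; MapPinTracker is built directly because
    // isRemote only consults peerID and the CidArg allocations.
    mpt := &MapPinTracker{peerID: test.TestPeerID1}
    local := api.CidArg{Allocations: []peer.ID{test.TestPeerID1}}
    remote := api.CidArg{Allocations: []peer.ID{test.TestPeerID2}}
    everywhere := api.CidArg{Everywhere: true}
    if mpt.isRemote(local) || mpt.isRemote(everywhere) || !mpt.isRemote(remote) {
        t.Error("unexpected isRemote result")
    }
}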
func (mpt *MapPinTracker) pin(c api.CidArg) error {
mpt.set(c.Cid, api.TrackerStatusPinning)
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSPin",
api.CidArg{c}.ToSerial(),
c.ToSerial(),
&struct{}{})
if err != nil {
mpt.setError(c, err)
mpt.setError(c.Cid, err)
return err
}
mpt.set(c, api.TrackerStatusPinned)
mpt.set(c.Cid, api.TrackerStatusPinned)
return nil
}
func (mpt *MapPinTracker) unpin(c *cid.Cid) error {
mpt.set(c, api.TrackerStatusUnpinning)
func (mpt *MapPinTracker) unpin(c api.CidArg) error {
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSUnpin",
api.CidArg{c}.ToSerial(),
c.ToSerial(),
&struct{}{})
if err != nil {
mpt.setError(c, err)
mpt.setError(c.Cid, err)
return err
}
mpt.set(c, api.TrackerStatusUnpinned)
mpt.set(c.Cid, api.TrackerStatusUnpinned)
return nil
}
// Track tells the MapPinTracker to start managing a Cid,
// possibly triggering Pin operations on the IPFS daemon.
func (mpt *MapPinTracker) Track(c *cid.Cid) error {
return mpt.pin(c)
func (mpt *MapPinTracker) Track(c api.CidArg) error {
if mpt.isRemote(c) {
if mpt.get(c.Cid).Status == api.TrackerStatusPinned {
mpt.unpin(c)
}
mpt.set(c.Cid, api.TrackerStatusRemote)
return nil
}
mpt.set(c.Cid, api.TrackerStatusPinning)
select {
case mpt.pinCh <- c:
default:
mpt.setError(c.Cid, errors.New("pin queue is full"))
return logError("map_pin_tracker pin queue is full")
}
return nil
}
// Untrack tells the MapPinTracker to stop managing a Cid.
// If the Cid is pinned locally, it will be unpinned.
func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
return mpt.unpin(c)
mpt.set(c, api.TrackerStatusUnpinning)
select {
case mpt.unpinCh <- api.CidArgCid(c):
default:
mpt.setError(c, errors.New("unpin queue is full"))
return logError("map_pin_tracker unpin queue is full")
}
return nil
}
// Status returns information for a Cid tracked by this
@ -238,7 +296,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) (api.PinInfo, error) {
err := mpt.rpcClient.Call("",
"Cluster",
"IPFSPinLsCid",
api.CidArg{c}.ToSerial(),
api.CidArgCid(c).ToSerial(),
&ips)
if err != nil {
mpt.setError(c, err)
@ -308,7 +366,7 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI
case api.TrackerStatusUnpinned:
mpt.setError(c, errPinned)
case api.TrackerStatusUnpinError: // nothing, keep error as it was
default:
default: //remote
}
} else {
switch p.Status {
@ -323,7 +381,7 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI
case api.TrackerStatusUnpinning, api.TrackerStatusUnpinError:
mpt.set(c, api.TrackerStatusUnpinned)
case api.TrackerStatusUnpinned: // nothing
default:
default: // remote
}
}
return mpt.get(c)
@ -331,7 +389,8 @@ func (mpt *MapPinTracker) syncStatus(c *cid.Cid, ips api.IPFSPinStatus) api.PinI
// Recover will re-track or re-untrack a Cid in error state,
// possibly retriggering an IPFS pinning operation and returning
// only when it is done.
// only when it is done. The pinning/unpinning operation happens
// synchronously, jumping the queues.
func (mpt *MapPinTracker) Recover(c *cid.Cid) (api.PinInfo, error) {
p := mpt.get(c)
if p.Status != api.TrackerStatusPinError &&
@ -342,9 +401,9 @@ func (mpt *MapPinTracker) Recover(c *cid.Cid) (api.PinInfo, error) {
var err error
switch p.Status {
case api.TrackerStatusPinError:
err = mpt.Track(c)
err = mpt.pin(api.CidArg{Cid: c})
case api.TrackerStatusUnpinError:
err = mpt.Untrack(c)
err = mpt.unpin(api.CidArg{Cid: c})
}
if err != nil {
logger.Errorf("error recovering %s: %s", c, err)

View File

@ -229,7 +229,7 @@ func TestClustersPeerJoin(t *testing.T) {
t.Error("all peers should be connected")
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Equals(hash) {
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
t.Error("all peers should have pinned the cid")
}
}
@ -262,7 +262,7 @@ func TestClustersPeerJoinAllAtOnce(t *testing.T) {
t.Error("all peers should be connected")
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Equals(hash) {
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
t.Error("all peers should have pinned the cid")
}
}
@ -304,7 +304,7 @@ func TestClustersPeerJoinAllAtOnceWithRandomBootstrap(t *testing.T) {
t.Error("all peers should be connected")
}
pins := c.Pins()
if len(pins) != 1 || !pins[0].Equals(hash) {
if len(pins) != 1 || !pins[0].Cid.Equals(hash) {
t.Error("all peers should have pinned the cid")
}
}

220
peer_monitor.go Normal file
View File

@ -0,0 +1,220 @@
package ipfscluster
import (
"context"
"errors"
"sync"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
)
// AlertChannelCap specifies the capacity of the alerts channel buffer.
var AlertChannelCap = 256
// peerMetrics keeps a circular window of the last metrics received from a peer
type peerMetrics struct {
last int
window []api.Metric
// mux sync.RWMutex
}
func newPeerMetrics(windowCap int) *peerMetrics {
w := make([]api.Metric, 0, windowCap)
return &peerMetrics{0, w}
}
func (pmets *peerMetrics) add(m api.Metric) {
// pmets.mux.Lock()
// defer pmets.mux.Unlock()
if len(pmets.window) < cap(pmets.window) {
pmets.window = append(pmets.window, m)
pmets.last = len(pmets.window) - 1
return
}
// len == cap
pmets.last = (pmets.last + 1) % cap(pmets.window)
pmets.window[pmets.last] = m
return
}
func (pmets *peerMetrics) latest() (api.Metric, error) {
// pmets.mux.RLock()
// defer pmets.mux.RUnlock()
if len(pmets.window) == 0 {
return api.Metric{}, errors.New("no metrics")
}
return pmets.window[pmets.last], nil
}
// all returns the window contents ordered from newest to oldest
func (pmets *peerMetrics) all() []api.Metric {
// pmets.mux.RLock()
// pmets.mux.RUnlock()
wlen := len(pmets.window)
res := make([]api.Metric, 0, wlen)
if wlen == 0 {
return res
}
for i := pmets.last; i >= 0; i-- {
res = append(res, pmets.window[i])
}
for i := wlen - 1; i > pmets.last; i-- {
res = append(res, pmets.window[i])
}
return res
}
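A quick check of the window semantics above (a sketch of a test that would live next to peerMetrics in the same package, assuming the testing, strconv and api imports; the metric values are arbitrary): with a capacity of 3, the fourth add overwrites the oldest slot, latest returns the newest entry, and all walks from newest to oldest.

func TestPeerMetricsWindow(t *testing.T) {
    pm := newPeerMetrics(3)
    for i := 0; i < 4; i++ {
        pm.add(api.Metric{Name: "test", Value: strconv.Itoa(i), Valid: true})
    }
    if latest, err := pm.latest(); err != nil || latest.Value != "3" {
        t.Error("latest should return the most recently added metric")
    }
    // The window now holds values 3, 1, 2 with last pointing at 3,
    // so all() should yield "3", "2", "1".
    if all := pm.all(); len(all) != 3 || all[0].Value != "3" || all[2].Value != "1" {
        t.Error("all should return the window contents newest to oldest")
    }
}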
type metricsByPeer map[peer.ID]*peerMetrics
// StdPeerMonitor is a component in charge of monitoring peers, logging
// metrics and detecting failures
type StdPeerMonitor struct {
ctx context.Context
cancel func()
rpcClient *rpc.Client
rpcReady chan struct{}
metrics map[string]metricsByPeer
metricsMux sync.RWMutex
windowCap int
alerts chan api.Alert
shutdownLock sync.Mutex
shutdown bool
wg sync.WaitGroup
}
// NewStdPeerMonitor creates a new monitor.
func NewStdPeerMonitor(windowCap int) *StdPeerMonitor {
if windowCap <= 0 {
panic("windowCap too small")
}
ctx, cancel := context.WithCancel(context.Background())
mon := &StdPeerMonitor{
ctx: ctx,
cancel: cancel,
rpcReady: make(chan struct{}, 1),
metrics: make(map[string]metricsByPeer),
windowCap: windowCap,
alerts: make(chan api.Alert, AlertChannelCap),
}
go mon.run()
return mon
}
func (mon *StdPeerMonitor) run() {
select {
case <-mon.rpcReady:
//go mon.Heartbeat()
case <-mon.ctx.Done():
}
}
// SetClient saves the given rpc.Client for later use
func (mon *StdPeerMonitor) SetClient(c *rpc.Client) {
mon.rpcClient = c
mon.rpcReady <- struct{}{}
}
// Shutdown stops the peer monitor. In particular, it will
// no longer deliver any alerts.
func (mon *StdPeerMonitor) Shutdown() error {
mon.shutdownLock.Lock()
defer mon.shutdownLock.Unlock()
if mon.shutdown {
logger.Warning("StdPeerMonitor already shut down")
return nil
}
logger.Info("stopping StdPeerMonitor")
close(mon.rpcReady)
mon.cancel()
mon.wg.Wait()
mon.shutdown = true
return nil
}
// LogMetric stores a metric so it can later be retrieved.
func (mon *StdPeerMonitor) LogMetric(m api.Metric) {
mon.metricsMux.Lock()
defer mon.metricsMux.Unlock()
name := m.Name
peer := m.Peer
mbyp, ok := mon.metrics[name]
if !ok {
mbyp = make(metricsByPeer)
mon.metrics[name] = mbyp
}
pmets, ok := mbyp[peer]
if !ok {
pmets = newPeerMetrics(mon.windowCap)
mbyp[peer] = pmets
}
logger.Debugf("logged '%s' metric from '%s'", name, peer)
pmets.add(m)
}
// func (mon *StdPeerMonitor) getLastMetric(name string, p peer.ID) api.Metric {
// mon.metricsMux.RLock()
// defer mon.metricsMux.RUnlock()
// emptyMetric := api.Metric{
// Name: name,
// Peer: p,
// Valid: false,
// }
// mbyp, ok := mon.metrics[name]
// if !ok {
// return emptyMetric
// }
// pmets, ok := mbyp[p]
// if !ok {
// return emptyMetric
// }
// metric, err := pmets.latest()
// if err != nil {
// return emptyMetric
// }
// return metric
// }
// LastMetrics returns the latest valid metric of the given type for each peer that has reported one.
func (mon *StdPeerMonitor) LastMetrics(name string) []api.Metric {
mon.metricsMux.RLock()
defer mon.metricsMux.RUnlock()
mbyp, ok := mon.metrics[name]
if !ok {
return []api.Metric{}
}
metrics := make([]api.Metric, 0, len(mbyp))
for _, peerMetrics := range mbyp {
last, err := peerMetrics.latest()
if err != nil || last.Discard() {
continue
}
metrics = append(metrics, last)
}
return metrics
}
// Alerts returns a channel on which alerts are sent when the
// monitor detects a failure.
func (mon *StdPeerMonitor) Alerts() <-chan api.Alert {
return mon.alerts
}
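For context on how metrics reach this monitor, here is a hedged sketch of a heartbeat loop that an informer-driven component could run, pushing a metric through the PeerMonitorLogMetric RPC endpoint added in rpc_api.go below. The helper name, interval and TTL value are illustrative, not part of the commit; context, time, rpc and api imports are assumed as elsewhere in the package.

// pushMetricLoop is a hypothetical helper: it periodically refreshes a
// metric's TTL and logs it on the local peer's monitor via RPC.
func pushMetricLoop(ctx context.Context, client *rpc.Client, m api.Metric) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            m.SetTTL(60) // assumed to be seconds, as in the tests
            err := client.Call("", "Cluster", "PeerMonitorLogMetric",
                m, &struct{}{})
            if err != nil {
                logger.Errorf("error pushing metric: %s", err)
            }
        case <-ctx.Done():
            return
        }
    }
}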

100
peer_monitor_test.go Normal file
View File

@ -0,0 +1,100 @@
package ipfscluster
import (
"fmt"
"testing"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
var metricCounter = 0
func testPeerMonitor(t *testing.T) *StdPeerMonitor {
mock := test.NewMockRPCClient(t)
mon := NewStdPeerMonitor(2)
mon.SetClient(mock)
return mon
}
func newMetric(n string, p peer.ID) api.Metric {
m := api.Metric{
Name: n,
Peer: p,
Value: fmt.Sprintf("%d", metricCounter),
Valid: true,
}
m.SetTTL(5)
metricCounter++
return m
}
func TestPeerMonitorShutdown(t *testing.T) {
pm := testPeerMonitor(t)
err := pm.Shutdown()
if err != nil {
t.Error(err)
}
err = pm.Shutdown()
if err != nil {
t.Error(err)
}
}
func TestPeerMonitorLogMetric(t *testing.T) {
pm := testPeerMonitor(t)
defer pm.Shutdown()
metricCounter = 0
// don't fill the window
pm.LogMetric(newMetric("test", test.TestPeerID1))
pm.LogMetric(newMetric("test", test.TestPeerID2))
pm.LogMetric(newMetric("test", test.TestPeerID3))
// fill window
pm.LogMetric(newMetric("test2", test.TestPeerID3))
pm.LogMetric(newMetric("test2", test.TestPeerID3))
pm.LogMetric(newMetric("test2", test.TestPeerID3))
pm.LogMetric(newMetric("test2", test.TestPeerID3))
lastMetrics := pm.LastMetrics("testbad")
if len(lastMetrics) != 0 {
t.Logf("%+v", lastMetrics)
t.Error("metrics should be empty")
}
lastMetrics = pm.LastMetrics("test")
if len(lastMetrics) != 3 {
t.Error("metrics should correspond to 3 hosts")
}
for _, v := range lastMetrics {
switch v.Peer {
case test.TestPeerID1:
if v.Value != "0" {
t.Error("bad metric value")
}
case test.TestPeerID2:
if v.Value != "1" {
t.Error("bad metric value")
}
case test.TestPeerID3:
if v.Value != "2" {
t.Error("bad metric value")
}
default:
t.Error("bad peer")
}
}
lastMetrics = pm.LastMetrics("test2")
if len(lastMetrics) != 1 {
t.Fatal("should only be one metric")
}
if lastMetrics[0].Value != fmt.Sprintf("%d", metricCounter-1) {
t.Error("metric is not last")
}
}

View File

@ -353,7 +353,7 @@ func (rest *RESTAPI) unpinHandler(w http.ResponseWriter, r *http.Request) {
}
func (rest *RESTAPI) pinListHandler(w http.ResponseWriter, r *http.Request) {
var pins []string
var pins []api.CidArgSerial
err := rest.rpcClient.Call("",
"Cluster",
"PinList",
@ -424,9 +424,9 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) api.CidArgSerial {
_, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
return api.CidArgSerial{""}
return api.CidArgSerial{Cid: ""}
}
return api.CidArgSerial{hash}
return api.CidArgSerial{Cid: hash}
}
func parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID {

View File

@ -190,11 +190,11 @@ func TestRESTAPIPinListEndpoint(t *testing.T) {
rest := testRESTAPI(t)
defer rest.Shutdown()
var resp []string
var resp []api.CidArgSerial
makeGet(t, "/pinlist", &resp)
if len(resp) != 3 ||
resp[0] != test.TestCid1 || resp[1] != test.TestCid2 ||
resp[2] != test.TestCid3 {
resp[0].Cid != test.TestCid1 || resp[1].Cid != test.TestCid2 ||
resp[2].Cid != test.TestCid3 {
t.Error("unexpected pin list: ", resp)
}
}

View File

@ -43,13 +43,13 @@ func (rpcapi *RPCAPI) Unpin(in api.CidArgSerial, out *struct{}) error {
}
// PinList runs Cluster.Pins().
func (rpcapi *RPCAPI) PinList(in struct{}, out *[]string) error {
func (rpcapi *RPCAPI) PinList(in struct{}, out *[]api.CidArgSerial) error {
cidList := rpcapi.c.Pins()
cidStrList := make([]string, 0, len(cidList))
cidSerialList := make([]api.CidArgSerial, 0, len(cidList))
for _, c := range cidList {
cidStrList = append(cidStrList, c.String())
cidSerialList = append(cidSerialList, c.ToSerial())
}
*out = cidStrList
*out = cidSerialList
return nil
}
@ -156,8 +156,7 @@ func (rpcapi *RPCAPI) Recover(in api.CidArgSerial, out *api.GlobalPinInfoSerial)
// Track runs PinTracker.Track().
func (rpcapi *RPCAPI) Track(in api.CidArgSerial, out *struct{}) error {
c := in.ToCidArg().Cid
return rpcapi.c.tracker.Track(c)
return rpcapi.c.tracker.Track(in.ToCidArg())
}
// Untrack runs PinTracker.Untrack().
@ -225,13 +224,13 @@ func (rpcapi *RPCAPI) IPFSPinLs(in string, out *map[string]api.IPFSPinStatus) er
// ConsensusLogPin runs Consensus.LogPin().
func (rpcapi *RPCAPI) ConsensusLogPin(in api.CidArgSerial, out *struct{}) error {
c := in.ToCidArg().Cid
c := in.ToCidArg()
return rpcapi.c.consensus.LogPin(c)
}
// ConsensusLogUnpin runs Consensus.LogUnpin().
func (rpcapi *RPCAPI) ConsensusLogUnpin(in api.CidArgSerial, out *struct{}) error {
c := in.ToCidArg().Cid
c := in.ToCidArg()
return rpcapi.c.consensus.LogUnpin(c)
}
@ -274,6 +273,28 @@ func (rpcapi *RPCAPI) PeerManagerRmPeer(in peer.ID, out *struct{}) error {
return rpcapi.c.peerManager.rmPeer(in, false)
}
// PeerManagerPeers runs peerManager.peers().
func (rpcapi *RPCAPI) PeerManagerPeers(in struct{}, out *[]peer.ID) error {
*out = rpcapi.c.peerManager.peers()
return nil
}
/*
PeerMonitor
*/
// PeerMonitorLogMetric runs PeerMonitor.LogMetric().
func (rpcapi *RPCAPI) PeerMonitorLogMetric(in api.Metric, out *struct{}) error {
rpcapi.c.monitor.LogMetric(in)
return nil
}
// PeerMonitorLastMetrics runs PeerMonitor.LastMetrics().
func (rpcapi *RPCAPI) PeerMonitorLastMetrics(in string, out *[]api.Metric) error {
*out = rpcapi.c.monitor.LastMetrics(in)
return nil
}
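These two endpoints connect monitoring to allocation: before deciding where a pin goes, a caller can pull the latest valid metrics for a given name and index them by peer. A hedged sketch of such a helper (the function itself is hypothetical; the metric name would be whatever the configured informer reports):

// lastMetricsByPeer is a hypothetical helper: it fetches the last valid
// metrics of the given name from the local monitor and keys them by peer.
func lastMetricsByPeer(client *rpc.Client, name string) (map[peer.ID]api.Metric, error) {
    var metrics []api.Metric
    err := client.Call("", "Cluster", "PeerMonitorLastMetrics", name, &metrics)
    if err != nil {
        return nil, err
    }
    byPeer := make(map[peer.ID]api.Metric, len(metrics))
    for _, m := range metrics {
        byPeer[m.Peer] = m
    }
    return byPeer, nil
}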
/*
Other
*/

View File

@ -3,56 +3,69 @@ package mapstate
import (
"sync"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
)
const Version = 1
// MapState is a very simple database to store the state of the system
// using a Go map. It is thread safe. It implements the State interface.
type MapState struct {
pinMux sync.RWMutex
PinMap map[string]struct{}
pinMux sync.RWMutex
PinMap map[string]api.CidArgSerial
Version int
}
// NewMapState initializes the internal map and returns a new MapState object.
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]struct{}),
PinMap: make(map[string]api.CidArgSerial),
}
}
// AddPin adds a Cid to the internal map.
func (st *MapState) AddPin(c *cid.Cid) error {
// Add adds a CidArg to the internal map.
func (st *MapState) Add(c api.CidArg) error {
st.pinMux.Lock()
defer st.pinMux.Unlock()
var a struct{}
st.PinMap[c.String()] = a
st.PinMap[c.Cid.String()] = c.ToSerial()
return nil
}
// RmPin removes a Cid from the internal map.
func (st *MapState) RmPin(c *cid.Cid) error {
// Rm removes a Cid from the internal map.
func (st *MapState) Rm(c *cid.Cid) error {
st.pinMux.Lock()
defer st.pinMux.Unlock()
delete(st.PinMap, c.String())
return nil
}
// HasPin returns true if the Cid belongs to the State.
func (st *MapState) HasPin(c *cid.Cid) bool {
// Get returns the CidArg tracked under the given Cid, or an empty
// CidArg if it is not part of the State.
func (st *MapState) Get(c *cid.Cid) api.CidArg {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
cargs, ok := st.PinMap[c.String()]
if !ok { // make sure no panics
return api.CidArg{}
}
return cargs.ToCidArg()
}
// Has returns true if the Cid belongs to the State.
func (st *MapState) Has(c *cid.Cid) bool {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
_, ok := st.PinMap[c.String()]
return ok
}
// ListPins provides a list of Cids in the State.
func (st *MapState) ListPins() []*cid.Cid {
// List provides the list of tracked CidArgs.
func (st *MapState) List() []api.CidArg {
st.pinMux.RLock()
defer st.pinMux.RUnlock()
cids := make([]*cid.Cid, 0, len(st.PinMap))
for k := range st.PinMap {
c, _ := cid.Decode(k)
cids = append(cids, c)
cids := make([]api.CidArg, 0, len(st.PinMap))
for _, v := range st.PinMap {
cids = append(cids, v.ToCidArg())
}
return cids
}

View File

@ -0,0 +1,68 @@
package mapstate
import (
"testing"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"
)
var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
var c = api.CidArg{
Cid: testCid1,
Allocations: []peer.ID{testPeerID1},
Everywhere: false,
}
func TestAdd(t *testing.T) {
ms := NewMapState()
ms.Add(c)
if !ms.Has(c.Cid) {
t.Error("should have added it")
}
}
func TestRm(t *testing.T) {
ms := NewMapState()
ms.Add(c)
ms.Rm(c.Cid)
if ms.Has(c.Cid) {
t.Error("should have removed it")
}
}
func TestGet(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatal("paniced")
}
}()
ms := NewMapState()
ms.Add(c)
get := ms.Get(c.Cid)
if get.Cid.String() != c.Cid.String() ||
get.Allocations[0] != c.Allocations[0] ||
get.Everywhere != c.Everywhere {
t.Error("returned something different")
}
}
func TestList(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatal("paniced")
}
}()
ms := NewMapState()
ms.Add(c)
list := ms.List()
if list[0].Cid.String() != c.Cid.String() ||
list[0].Allocations[0] != c.Allocations[0] ||
list[0].Everywhere != c.Everywhere {
t.Error("returned something different")
}
}

View File

@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state/mapstate"
cid "github.com/ipfs/go-cid"
@ -92,7 +93,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
if err != nil {
goto ERROR
}
m.pinMap.AddPin(c)
m.pinMap.Add(api.CidArgCid(c))
resp := mockPinResp{
Pins: []string{cidStr},
}
@ -109,7 +110,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
if err != nil {
goto ERROR
}
m.pinMap.RmPin(c)
m.pinMap.Rm(c)
resp := mockPinResp{
Pins: []string{cidStr},
}
@ -120,9 +121,9 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
arg, ok := query["arg"]
if !ok {
rMap := make(map[string]mockPinType)
pins := m.pinMap.ListPins()
pins := m.pinMap.List()
for _, p := range pins {
rMap[p.String()] = mockPinType{"recursive"}
rMap[p.Cid.String()] = mockPinType{"recursive"}
}
j, _ := json.Marshal(mockPinLsResp{rMap})
w.Write(j)
@ -137,7 +138,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
if err != nil {
goto ERROR
}
ok = m.pinMap.HasPin(c)
ok = m.pinMap.Has(c)
if ok {
rMap := make(map[string]mockPinType)
rMap[cidStr] = mockPinType{"recursive"}

View File

@ -42,8 +42,18 @@ func (mock *mockService) Unpin(in api.CidArgSerial, out *struct{}) error {
return nil
}
func (mock *mockService) PinList(in struct{}, out *[]string) error {
*out = []string{TestCid1, TestCid2, TestCid3}
func (mock *mockService) PinList(in struct{}, out *[]api.CidArgSerial) error {
*out = []api.CidArgSerial{
{
Cid: TestCid1,
},
{
Cid: TestCid2,
},
{
Cid: TestCid3,
},
}
return nil
}
@ -180,3 +190,8 @@ func (mock *mockService) Track(in api.CidArgSerial, out *struct{}) error {
func (mock *mockService) Untrack(in api.CidArgSerial, out *struct{}) error {
return nil
}
func (mock *mockService) PeerManagerPeers(in struct{}, out *[]peer.ID) error {
*out = []peer.ID{TestPeerID1, TestPeerID2, TestPeerID3}
return nil
}

62
test/test_test.go Normal file
View File

@ -0,0 +1,62 @@
package test
import (
"reflect"
"testing"
ipfscluster "github.com/ipfs/ipfs-cluster"
)
func TestIpfsMock(t *testing.T) {
ipfsmock := NewIpfsMock()
defer ipfsmock.Close()
}
// Test that our RPC mock resembles the original
func TestRPCMockValid(t *testing.T) {
mock := &mockService{}
real := &ipfscluster.RPCAPI{}
mockT := reflect.TypeOf(mock)
realT := reflect.TypeOf(real)
// Make sure all the methods we have match the original
for i := 0; i < mockT.NumMethod(); i++ {
method := mockT.Method(i)
name := method.Name
origMethod, ok := realT.MethodByName(name)
if !ok {
t.Fatalf("%s method not found in real RPC", name)
}
mType := method.Type
oType := origMethod.Type
if nout := mType.NumOut(); nout != 1 || nout != oType.NumOut() {
t.Errorf("%s: more than 1 out parameter", name)
}
if mType.Out(0).Name() != "error" {
t.Errorf("%s out param should be an error", name)
}
if nin := mType.NumIn(); nin != oType.NumIn() || nin != 3 {
t.Errorf("%s: num in parameter mismatch: %d vs. %d", name, nin, oType.NumIn())
}
for j := 1; j < 3; j++ {
mn := mType.In(j).String()
on := oType.In(j).String()
if mn != on {
t.Errorf("%s: name mismatch: %s vs %s", name, mn, on)
}
}
}
for i := 0; i < realT.NumMethod(); i++ {
name := realT.Method(i).Name
_, ok := mockT.MethodByName(name)
if !ok {
t.Logf("Warning: %s: unimplemented in mock rpc", name)
}
}
}

View File

@ -1,6 +1,7 @@
package ipfscluster
import (
"errors"
"fmt"
"github.com/ipfs/ipfs-cluster/api"
@ -137,3 +138,9 @@ func globalPinInfoSliceToSerial(gpi []api.GlobalPinInfo) []api.GlobalPinInfoSeri
}
return gpis
}
func logError(fmtstr string, args ...interface{}) error {
msg := fmt.Sprintf(fmtstr, args...)
logger.Error(msg)
return errors.New(msg)
}