ipfs-cluster/cluster.go
Hector Sanjuan 073c43e291 Issue #131: Make sure peers are moved to bootstrap when leaving
Also, do not shutdown when seeing our own departure during bootstrap.

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-11-01 13:58:57 +01:00


package ipfscluster
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
pnet "github.com/libp2p/go-libp2p-pnet"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
)
// Cluster is the main IPFS cluster component. It provides the Go API
// for the cluster and orchestrates the components that make up the system.
type Cluster struct {
ctx context.Context
cancel func()
id peer.ID
config *Config
host host.Host
rpcServer *rpc.Server
rpcClient *rpc.Client
peerManager *peerManager
consensus Consensus
api API
ipfs IPFSConnector
state state.State
tracker PinTracker
monitor PeerMonitor
allocator PinAllocator
informer Informer
shutdownLock sync.Mutex
shutdownB bool
removed bool
doneCh chan struct{}
readyCh chan struct{}
readyB bool
wg sync.WaitGroup
paMux sync.Mutex
}
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
// creates an RPC Server and client, and sets up all components.
//
// The new cluster peer may still be performing initialization tasks when
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
func NewCluster(
cfg *Config,
consensusCfg *raft.Config,
api API,
ipfs IPFSConnector,
st state.State,
tracker PinTracker,
monitor PeerMonitor,
allocator PinAllocator,
informer Informer) (*Cluster, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
host, err := makeHost(ctx, cfg)
if err != nil {
cancel()
return nil, err
}
logger.Infof("IPFS Cluster v%s listening on:", Version)
for _, addr := range host.Addrs() {
logger.Infof(" %s/ipfs/%s", addr, host.ID().Pretty())
}
c := &Cluster{
ctx: ctx,
cancel: cancel,
id: host.ID(),
config: cfg,
host: host,
api: api,
ipfs: ipfs,
state: st,
tracker: tracker,
monitor: monitor,
allocator: allocator,
informer: informer,
shutdownB: false,
removed: false,
doneCh: make(chan struct{}),
readyCh: make(chan struct{}),
readyB: false,
}
c.setupPeerManager()
err = c.setupRPC()
if err != nil {
c.Shutdown()
return nil, err
}
err = c.setupConsensus(consensusCfg)
if err != nil {
c.Shutdown()
return nil, err
}
c.setupRPCClients()
ok := c.bootstrap()
if !ok {
logger.Error("Bootstrap unsuccessful")
c.Shutdown()
return nil, errors.New("bootstrap unsuccessful")
}
go func() {
c.ready()
c.run()
}()
return c, nil
}
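
// Usage sketch (illustrative, not part of the original file): an embedding
// program wires up the components from their subpackages and waits for the
// peer to be ready. loadConfigs() and the component values (api, ipfs, st,
// tracker, mon, alloc, inf) are assumptions about the caller.
//
//	cfg, consensusCfg := loadConfigs()
//	c, err := NewCluster(cfg, consensusCfg, api, ipfs, st, tracker, mon, alloc, inf)
//	if err != nil {
//		log.Fatal(err)
//	}
//	<-c.Ready() // blocks until consensus has bootstrapped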
func (c *Cluster) setupPeerManager() {
pm := newPeerManager(c)
c.peerManager = pm
if len(c.config.Peers) > 0 {
c.peerManager.addFromMultiaddrs(c.config.Peers, false)
} else {
c.peerManager.addFromMultiaddrs(c.config.Bootstrap, false)
}
}
func (c *Cluster) setupRPC() error {
rpcServer := rpc.NewServer(c.host, RPCProtocol)
err := rpcServer.RegisterName("Cluster", &RPCAPI{c})
if err != nil {
return err
}
c.rpcServer = rpcServer
rpcClient := rpc.NewClientWithServer(c.host, RPCProtocol, rpcServer)
c.rpcClient = rpcClient
return nil
}
func (c *Cluster) setupConsensus(consensuscfg *raft.Config) error {
var startPeers []peer.ID
if len(c.config.Peers) > 0 {
startPeers = peersFromMultiaddrs(c.config.Peers)
} else {
		// start as a single-peer cluster before being added
		// to the bootstrapper peers' cluster.
startPeers = []peer.ID{}
}
consensus, err := raft.NewConsensus(
append(startPeers, c.id),
c.host,
consensuscfg,
c.state)
if err != nil {
logger.Errorf("error creating consensus: %s", err)
return err
}
c.consensus = consensus
return nil
}
func (c *Cluster) setupRPCClients() {
c.tracker.SetClient(c.rpcClient)
c.ipfs.SetClient(c.rpcClient)
c.api.SetClient(c.rpcClient)
c.consensus.SetClient(c.rpcClient)
c.monitor.SetClient(c.rpcClient)
c.allocator.SetClient(c.rpcClient)
c.informer.SetClient(c.rpcClient)
}
// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
func (c *Cluster) syncWatcher() {
stateSyncTicker := time.NewTicker(c.config.StateSyncInterval)
syncTicker := time.NewTicker(c.config.IPFSSyncInterval)
for {
select {
case <-stateSyncTicker.C:
logger.Debug("auto-triggering StateSync()")
c.StateSync()
case <-syncTicker.C:
logger.Debug("auto-triggering SyncAllLocal()")
c.SyncAllLocal()
		case <-c.ctx.Done():
			stateSyncTicker.Stop()
			syncTicker.Stop()
			return
}
}
}
func (c *Cluster) broadcastMetric(m api.Metric) error {
peers := c.peerManager.peers()
leader, err := c.consensus.Leader()
if err != nil {
return err
}
	// If a peer is down, the rpc call will block. Therefore,
	// we need to do it async. This way we keep broadcasting
	// even if someone is down. Eventually those requests will
	// time out in libp2p and the errors will be logged.
go func() {
if leader == c.id {
// Leader needs to broadcast its metric to everyone
// in case it goes down (new leader will have to detect this node went down)
logger.Debugf("Leader %s about to broadcast metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire)
errs := c.multiRPC(peers,
"Cluster",
"PeerMonitorLogMetric",
m,
copyEmptyStructToIfaces(make([]struct{}, len(peers), len(peers))))
for i, e := range errs {
if e != nil {
logger.Errorf("error pushing metric to %s: %s", peers[i].Pretty(), e)
}
}
logger.Debugf("Leader %s broadcasted metric %s to %s. Expires: %s", c.id, m.Name, peers, m.Expire)
} else {
// non-leaders just need to forward their metrics to the leader
logger.Debugf("Peer %s about to send metric %s to %s. Expires: %s", c.id, m.Name, leader, m.Expire)
err := c.rpcClient.Call(leader,
"Cluster", "PeerMonitorLogMetric",
m, &struct{}{})
if err != nil {
logger.Error(err)
}
logger.Debugf("Peer %s sent metric %s to %s. Expires: %s", c.id, m.Name, leader, m.Expire)
}
}()
return nil
}
// pushInformerMetrics loops and pushes informer metrics to the leader's monitor
func (c *Cluster) pushInformerMetrics() {
timer := time.NewTimer(0) // fire immediately first
	// The following control how often a failed broadcast is retried
	// and how often a retry warning is logged.
retries := 0
retryDelay := 500 * time.Millisecond
retryWarnMod := 60
for {
select {
case <-c.ctx.Done():
return
case <-timer.C:
// wait
}
metric := c.informer.GetMetric()
metric.Peer = c.id
err := c.broadcastMetric(metric)
		if err != nil {
			if (retries % retryWarnMod) == 0 {
				logger.Errorf("error broadcasting metric: %s", err)
			}
			retries++
// retry in retryDelay
timer.Reset(retryDelay)
continue
}
retries = 0
// send metric again in TTL/2
timer.Reset(metric.GetTTL() / 2)
}
}
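
// Timing sketch (with illustrative numbers, not values taken from any
// configuration): if the informer metric carries a 10s TTL, a healthy peer
// re-broadcasts it every TTL/2 = 5s; while broadcasts fail, the peer retries
// every 500ms, so retryWarnMod = 60 surfaces a warning roughly once per 30s
// of continuous failure.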
func (c *Cluster) pushPingMetrics() {
ticker := time.NewTicker(c.config.MonitorPingInterval)
for {
metric := api.Metric{
Name: "ping",
Peer: c.id,
Valid: true,
}
metric.SetTTLDuration(c.config.MonitorPingInterval * 2)
c.broadcastMetric(metric)
select {
case <-c.ctx.Done():
return
case <-ticker.C:
}
}
logger.Debugf("Peer %s. Finished pushPingMetrics", c.id)
}
// alertsHandler reads the alerts channel from the monitor and triggers repins
func (c *Cluster) alertsHandler() {
for {
select {
case <-c.ctx.Done():
return
case alrt := <-c.monitor.Alerts():
// only the leader handles alerts
leader, err := c.consensus.Leader()
if err == nil && leader == c.id {
logger.Warningf("Peer %s received alert for %s in %s", c.id, alrt.MetricName, alrt.Peer.Pretty())
switch alrt.MetricName {
case "ping":
c.repinFromPeer(alrt.Peer)
}
}
}
}
}
// repinFromPeer finds all Cids allocated to a given peer and triggers re-pins on them.
func (c *Cluster) repinFromPeer(p peer.ID) {
cState, err := c.consensus.State()
if err != nil {
logger.Warning(err)
return
}
list := cState.List()
for _, pin := range list {
if containsPeer(pin.Allocations, p) {
logger.Infof("repinning %s out of %s", pin.Cid, p.Pretty())
c.pin(pin, []peer.ID{p}) // pin blacklisting this peer
}
}
}
// run launches the long-running background goroutines: the sync watcher,
// the metric pushers and the alerts handler.
func (c *Cluster) run() {
go c.syncWatcher()
go c.pushPingMetrics()
go c.pushInformerMetrics()
go c.alertsHandler()
}
func (c *Cluster) ready() {
	// We bootstrapped first because, with a dirty state, consensus may
	// already have a peerset and be unable to find a leader, so we
	// cannot wait for it before bootstrapping.
timer := time.NewTimer(30 * time.Second)
select {
case <-timer.C:
logger.Error("consensus start timed out")
c.Shutdown()
return
case <-c.consensus.Ready():
case <-c.ctx.Done():
return
}
// Cluster is ready.
logger.Info("Cluster Peers (not including ourselves):")
peers := c.peerManager.peersAddrs()
if len(peers) == 0 {
logger.Info(" - No other peers")
}
	for _, a := range peers {
logger.Infof(" - %s", a)
}
close(c.readyCh)
c.readyB = true
logger.Info("IPFS Cluster is ready")
}
func (c *Cluster) bootstrap() bool {
// Cases in which we do not bootstrap
if len(c.config.Bootstrap) == 0 || len(c.config.Peers) > 0 {
return true
}
for _, b := range c.config.Bootstrap {
logger.Infof("Bootstrapping to %s", b)
err := c.Join(b)
if err == nil {
return true
}
logger.Error(err)
}
return false
}
// Ready returns a channel which signals when this peer is
// fully initialized (including consensus).
func (c *Cluster) Ready() <-chan struct{} {
return c.readyCh
}
// Shutdown stops the IPFS cluster components
func (c *Cluster) Shutdown() error {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdownB {
logger.Debug("Cluster is already shutdown")
return nil
}
logger.Info("shutting down Cluster")
// Only attempt to leave if:
// - consensus is initialized
// - cluster was ready (no bootstrapping error)
	// - We have not already been removed (removed means PeerRemove() was called on us)
if c.consensus != nil && c.config.LeaveOnShutdown && c.readyB && !c.removed {
c.removed = true
// best effort
logger.Warning("attempting to leave the cluster. This may take some seconds")
err := c.consensus.LogRmPeer(c.id)
if err != nil {
logger.Error("leaving cluster: " + err.Error())
}
// save peers as bootstrappers
c.config.Bootstrap = c.peerManager.peersAddrs()
c.peerManager.resetPeers()
c.peerManager.savePeers()
}
// Cancel contexts
c.cancel()
if con := c.consensus; con != nil {
if err := con.Shutdown(); err != nil {
logger.Errorf("error stopping consensus: %s", err)
return err
}
}
// Do not save anything if we were not ready
if c.readyB {
// peers are saved usually on addPeer/rmPeer
// c.peerManager.savePeers()
c.backupState()
}
// We left the cluster or were removed. Destroy the Raft state.
if c.removed && c.readyB {
err := c.consensus.Clean()
if err != nil {
logger.Error("cleaning consensus: ", err)
}
}
if err := c.monitor.Shutdown(); err != nil {
logger.Errorf("error stopping monitor: %s", err)
return err
}
if err := c.api.Shutdown(); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
}
if err := c.ipfs.Shutdown(); err != nil {
logger.Errorf("error stopping IPFS Connector: %s", err)
return err
}
if err := c.tracker.Shutdown(); err != nil {
logger.Errorf("error stopping PinTracker: %s", err)
return err
}
c.wg.Wait()
c.host.Close() // Shutdown all network services
c.shutdownB = true
close(c.doneCh)
return nil
}
// Done provides a way to learn if the Peer has been shutdown
// (for example, because it has been removed from the Cluster)
func (c *Cluster) Done() <-chan struct{} {
return c.doneCh
}
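
// Usage sketch (illustrative): a typical shutdown sequence for an embedding
// program. The signal handling shown is an assumption about the caller, not
// something this package provides.
//
//	go func() {
//		sig := make(chan os.Signal, 1)
//		signal.Notify(sig, os.Interrupt)
//		<-sig
//		c.Shutdown()
//	}()
//	<-c.Done() // returns once the peer has been fully shut down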
// ID returns information about the Cluster peer
func (c *Cluster) ID() api.ID {
// ignore error since it is included in response object
ipfsID, _ := c.ipfs.ID()
var addrs []ma.Multiaddr
addrsSet := make(map[string]struct{}) // to filter dups
for _, addr := range c.host.Addrs() {
addrsSet[addr.String()] = struct{}{}
}
for k := range addrsSet {
addr, _ := ma.NewMultiaddr(k)
addrs = append(addrs, multiaddrJoin(addr, c.id))
}
return api.ID{
ID: c.id,
//PublicKey: c.host.Peerstore().PubKey(c.id),
Addresses: addrs,
ClusterPeers: c.peerManager.peersAddrs(),
Version: Version,
Commit: Commit,
RPCProtocolVersion: RPCProtocol,
IPFS: ipfsID,
}
}
// PeerAdd adds a new peer to this Cluster.
//
// The new peer must be reachable. It will be added to the
// consensus and will receive the shared state (including the
// list of peers). The new peer should be a single-peer cluster,
// preferably without any relevant state.
func (c *Cluster) PeerAdd(addr ma.Multiaddr) (api.ID, error) {
	// starting 10 nodes on the same box for testing
	// causes deadlocks, and a global lock here
	// seems to help.
c.paMux.Lock()
defer c.paMux.Unlock()
logger.Debugf("peerAdd called with %s", addr)
pid, decapAddr, err := multiaddrSplit(addr)
if err != nil {
id := api.ID{
Error: err.Error(),
}
return id, err
}
// Figure out its real address if we have one
remoteAddr := getRemoteMultiaddr(c.host, pid, decapAddr)
err = c.peerManager.addPeer(remoteAddr, false)
if err != nil {
logger.Error(err)
id := api.ID{ID: pid, Error: err.Error()}
return id, err
}
// Figure out our address to that peer. This also
// ensures that it is reachable
var addrSerial api.MultiaddrSerial
err = c.rpcClient.Call(pid, "Cluster",
"RemoteMultiaddrForPeer", c.id, &addrSerial)
if err != nil {
logger.Error(err)
id := api.ID{ID: pid, Error: err.Error()}
c.peerManager.rmPeer(pid, false)
return id, err
}
	// Log the new peer in the consensus log so everyone gets it.
err = c.consensus.LogAddPeer(remoteAddr) // this will save
if err != nil {
logger.Error(err)
id := api.ID{ID: pid, Error: err.Error()}
c.peerManager.rmPeer(pid, false)
return id, err
}
// Send cluster peers to the new peer.
clusterPeers := append(c.peerManager.peersAddrs(),
addrSerial.ToMultiaddr())
err = c.rpcClient.Call(pid,
"Cluster",
"PeerManagerAddFromMultiaddrs",
api.MultiaddrsToSerial(clusterPeers),
&struct{}{})
if err != nil {
logger.Error(err)
}
// Ask the new peer to connect its IPFS daemon to the rest
err = c.rpcClient.Call(pid,
"Cluster",
"IPFSConnectSwarms",
struct{}{},
&struct{}{})
if err != nil {
logger.Error(err)
}
id, err := c.getIDForPeer(pid)
return id, nil
}
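
// Usage sketch (illustrative): the caller provides the new peer's cluster
// multiaddress, including its peer ID. The address below is a placeholder,
// not a real value.
//
//	addr, _ := ma.NewMultiaddr("/ip4/192.0.2.10/tcp/9096/ipfs/<peer-id>")
//	newID, err := c.PeerAdd(addr)
//	if err != nil {
//		logger.Error(err)
//	}
//	_ = newID // contains the remote peer's ID information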
// PeerRemove removes a peer from this Cluster.
//
// The peer will be removed from the consensus peer set
// and will be shut down after this happens.
func (c *Cluster) PeerRemove(pid peer.ID) error {
if !c.peerManager.isPeer(pid) {
return fmt.Errorf("%s is not a peer", pid.Pretty())
}
// We need to repin before removing the peer, otherwise, it won't
// be able to submit the pins.
logger.Infof("re-allocating all CIDs directly associated to %s", pid)
c.repinFromPeer(pid)
err := c.consensus.LogRmPeer(pid)
if err != nil {
logger.Error(err)
return err
}
return nil
}
// Join adds this peer to an existing cluster. The calling peer should
// be a single-peer cluster node. This is almost equivalent to calling
// PeerAdd on the destination cluster.
func (c *Cluster) Join(addr ma.Multiaddr) error {
logger.Debugf("Join(%s)", addr)
//if len(c.peerManager.peers()) > 1 {
// logger.Error(c.peerManager.peers())
// return errors.New("only single-node clusters can be joined")
//}
pid, _, err := multiaddrSplit(addr)
if err != nil {
logger.Error(err)
return err
}
// Bootstrap to myself
if pid == c.id {
return nil
}
// Add peer to peerstore so we can talk to it
c.peerManager.addPeer(addr, false)
// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
// ListenAddr).
var myID api.IDSerial
err = c.rpcClient.Call(pid,
"Cluster",
"PeerAdd",
api.MultiaddrToSerial(
multiaddrJoin(c.config.ListenAddr, c.id)),
&myID)
if err != nil {
logger.Error(err)
return err
}
// wait for leader and for state to catch up
// then sync
err = c.consensus.WaitForSync()
if err != nil {
logger.Error(err)
return err
}
c.StateSync()
logger.Infof("joined %s's cluster", addr)
return nil
}
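
// Usage sketch (illustrative): bootstrapping a fresh single-peer cluster to
// an existing one. The multiaddress is a placeholder for a real cluster
// peer's address.
//
//	bootstrapAddr, _ := ma.NewMultiaddr("/ip4/192.0.2.20/tcp/9096/ipfs/<peer-id>")
//	if err := c.Join(bootstrapAddr); err != nil {
//		logger.Error(err)
//	}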
// StateSync syncs the consensus state to the Pin Tracker, ensuring
// that every Cid that should be tracked is tracked. It returns
// PinInfo for Cids which were added or deleted.
func (c *Cluster) StateSync() ([]api.PinInfo, error) {
cState, err := c.consensus.State()
if err != nil {
return nil, err
}
logger.Debug("syncing state to tracker")
clusterPins := cState.List()
var changed []*cid.Cid
// Track items which are not tracked
for _, pin := range clusterPins {
if c.tracker.Status(pin.Cid).Status == api.TrackerStatusUnpinned {
changed = append(changed, pin.Cid)
go c.tracker.Track(pin)
}
}
// Untrack items which should not be tracked
for _, p := range c.tracker.StatusAll() {
if !cState.Has(p.Cid) {
changed = append(changed, p.Cid)
go c.tracker.Untrack(p.Cid)
}
}
var infos []api.PinInfo
for _, h := range changed {
infos = append(infos, c.tracker.Status(h))
}
return infos, nil
}
// StatusAll returns the GlobalPinInfo for all tracked Cids. If an error
// happens, the slice will contain as much information as could be fetched.
func (c *Cluster) StatusAll() ([]api.GlobalPinInfo, error) {
return c.globalPinInfoSlice("TrackerStatusAll")
}
// Status returns the GlobalPinInfo for a given Cid. If an error happens,
// the GlobalPinInfo should contain as much information as could be fetched.
func (c *Cluster) Status(h *cid.Cid) (api.GlobalPinInfo, error) {
return c.globalPinInfoCid("TrackerStatus", h)
}
// SyncAllLocal makes sure that the current state for all tracked items
// matches the state reported by the IPFS daemon.
//
// SyncAllLocal returns the list of PinInfo that were updated because of
// the operation, along with those in error states.
func (c *Cluster) SyncAllLocal() ([]api.PinInfo, error) {
syncedItems, err := c.tracker.SyncAll()
	// Despite errors, the tracker provides synced items that we can return.
	// They encapsulate the error.
if err != nil {
logger.Error("tracker.Sync() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
}
return syncedItems, err
}
// SyncLocal performs a local sync operation for the given Cid. This will
// tell the tracker to verify the status of the Cid against the IPFS daemon.
// It returns the updated PinInfo for the Cid.
func (c *Cluster) SyncLocal(h *cid.Cid) (api.PinInfo, error) {
var err error
pInfo, err := c.tracker.Sync(h)
	// Despite errors, the tracker provides an updated PinInfo, so
	// we just log the error.
if err != nil {
logger.Error("tracker.SyncCid() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
}
return pInfo, err
}
// SyncAll triggers SyncAllLocal() operations in all cluster peers.
func (c *Cluster) SyncAll() ([]api.GlobalPinInfo, error) {
return c.globalPinInfoSlice("SyncAllLocal")
}
// Sync triggers a SyncLocal() operation for a given Cid
// in all cluster peers.
func (c *Cluster) Sync(h *cid.Cid) (api.GlobalPinInfo, error) {
return c.globalPinInfoCid("SyncLocal", h)
}
// RecoverLocal triggers a recover operation for a given Cid
func (c *Cluster) RecoverLocal(h *cid.Cid) (api.PinInfo, error) {
return c.tracker.Recover(h)
}
// Recover triggers a recover operation for a given Cid in all
// cluster peers.
func (c *Cluster) Recover(h *cid.Cid) (api.GlobalPinInfo, error) {
return c.globalPinInfoCid("TrackerRecover", h)
}
// Pins returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
// pins are managed and their allocation, but does not indicate if
// the item is successfully pinned. For that, use StatusAll().
func (c *Cluster) Pins() []api.Pin {
cState, err := c.consensus.State()
if err != nil {
logger.Error(err)
return []api.Pin{}
}
return cState.List()
}
// PinGet returns information for a single Cid managed by Cluster.
// The information is obtained from the current global state. The
// returned api.Pin provides information about the allocations
// assigned for the requested Cid, but does not indicate whether
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(h *cid.Cid) (api.Pin, error) {
cState, err := c.consensus.State()
if err != nil {
logger.Error(err)
return api.Pin{}, err
}
pin := cState.Get(h)
if pin.Cid == nil {
return pin, errors.New("Cid is not part of the global state")
}
return pin, nil
}
// Pin makes the cluster Pin a Cid. This implies adding the Cid
// to the IPFS Cluster peers' shared state. Depending on the cluster
// pinning strategy, the PinTracker may then request the IPFS daemon
// to pin the Cid.
//
// Pin returns an error if the operation could not be persisted
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
func (c *Cluster) Pin(pin api.Pin) error {
return c.pin(pin, []peer.ID{})
}
// pin performs the actual pinning and supports a blacklist of peers
// so that a node can be evacuated.
func (c *Cluster) pin(pin api.Pin, blacklist []peer.ID) error {
rpl := pin.ReplicationFactor
if rpl == 0 {
rpl = c.config.ReplicationFactor
pin.ReplicationFactor = rpl
}
switch {
case rpl == 0:
return errors.New("replication factor is 0")
case rpl < 0:
pin.Allocations = []peer.ID{}
logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
case rpl > 0:
allocs, err := c.allocate(pin.Cid, pin.ReplicationFactor, blacklist)
if err != nil {
return err
}
pin.Allocations = allocs
logger.Infof("IPFS cluster pinning %s on %s:", pin.Cid, pin.Allocations)
}
err := c.consensus.LogPin(pin)
if err != nil {
return err
}
return nil
}
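
// Usage sketch (illustrative): pinning a CID. Leaving ReplicationFactor at 0
// falls back to the configured default, while -1 means "pin everywhere".
// The hash below is a placeholder.
//
//	h, _ := cid.Decode("<some-cid>")
//	err := c.Pin(api.Pin{Cid: h, ReplicationFactor: 2})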
// Unpin makes the cluster Unpin a Cid. This implies removing the Cid
// from the IPFS Cluster peers' shared state.
//
// Unpin returns an error if the operation could not be persisted
// to the global state. Unpin does not reflect the success or failure
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(h *cid.Cid) error {
logger.Info("IPFS cluster unpinning:", h)
pin := api.Pin{
Cid: h,
}
err := c.consensus.LogUnpin(pin)
if err != nil {
return err
}
return nil
}
// Version returns the current IPFS Cluster version
func (c *Cluster) Version() string {
return Version
}
// Peers returns the IDs of the members of this Cluster
func (c *Cluster) Peers() []api.ID {
members := c.peerManager.peers()
peersSerial := make([]api.IDSerial, len(members), len(members))
peers := make([]api.ID, len(members), len(members))
errs := c.multiRPC(members, "Cluster", "ID", struct{}{},
copyIDSerialsToIfaces(peersSerial))
for i, err := range errs {
if err != nil {
peersSerial[i].ID = peer.IDB58Encode(members[i])
peersSerial[i].Error = err.Error()
}
}
for i, ps := range peersSerial {
peers[i] = ps.ToID()
}
return peers
}
// makeHost makes a libp2p-host
func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
ps := peerstore.NewPeerstore()
privateKey := cfg.PrivateKey
publicKey := privateKey.GetPublic()
var protec ipnet.Protector
if len(cfg.Secret) != 0 {
var err error
clusterKey, err := clusterSecretToKey(cfg.Secret)
if err != nil {
return nil, err
}
protec, err = pnet.NewProtector(strings.NewReader(clusterKey))
if err != nil {
return nil, err
}
// this is in go-ipfs, not sure whether we want something like it here
/* go func() {
t := time.NewTicker(30 * time.Second)
<-t.C // swallow one tick
for {
select {
case <-t.C:
if ph := cfg.Host; ph != nil {
if len(ph.Network().Peers()) == 0 {
log.Warning("We are in a private network and have no peers.")
log.Warning("This might be a configuration mistake.")
}
}
case <-n.Process().Closing:
t.Stop()
return
}
}
}()*/
}
if err := ps.AddPubKey(cfg.ID, publicKey); err != nil {
return nil, err
}
if err := ps.AddPrivKey(cfg.ID, privateKey); err != nil {
return nil, err
}
network, err := swarm.NewNetworkWithProtector(
ctx,
[]ma.Multiaddr{cfg.ListenAddr},
cfg.ID,
ps,
protec,
nil,
)
if err != nil {
return nil, err
}
bhost := basichost.New(network)
return bhost, nil
}
// multiRPC performs an RPC request against multiple destinations in parallel.
func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args interface{}, reply []interface{}) []error {
if len(dests) != len(reply) {
panic("must have matching dests and replies")
}
var wg sync.WaitGroup
errs := make([]error, len(dests), len(dests))
for i := range dests {
wg.Add(1)
go func(i int) {
defer wg.Done()
err := c.rpcClient.Call(
dests[i],
svcName,
svcMethod,
args,
reply[i])
errs[i] = err
}(i)
}
wg.Wait()
return errs
}
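
// Usage sketch (mirroring how Peers() uses multiRPC): the reply slice must
// contain exactly one entry per destination, each wrapped as an interface{}
// pointer by one of the copy*ToIfaces helpers.
//
//	members := c.peerManager.peers()
//	replies := make([]api.IDSerial, len(members))
//	errs := c.multiRPC(members, "Cluster", "ID", struct{}{},
//		copyIDSerialsToIfaces(replies))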
func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (api.GlobalPinInfo, error) {
pin := api.GlobalPinInfo{
Cid: h,
PeerMap: make(map[peer.ID]api.PinInfo),
}
members := c.peerManager.peers()
replies := make([]api.PinInfoSerial, len(members), len(members))
arg := api.Pin{
Cid: h,
}
errs := c.multiRPC(members,
"Cluster",
method, arg.ToSerial(),
copyPinInfoSerialToIfaces(replies))
for i, rserial := range replies {
e := errs[i]
// Potentially rserial is empty. But ToPinInfo ignores all
// errors from underlying libraries. In that case .Status
// will be TrackerStatusBug (0)
r := rserial.ToPinInfo()
// No error. Parse and continue
if e == nil {
pin.PeerMap[members[i]] = r
continue
}
// Deal with error cases (err != nil): wrap errors in PinInfo
// In this case, we had no answer at all. The contacted peer
// must be offline or unreachable.
if r.Status == api.TrackerStatusBug {
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e)
pin.PeerMap[members[i]] = api.PinInfo{
Cid: h,
Peer: members[i],
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: e.Error(),
}
} else { // there was an rpc error, but got a valid response :S
r.Error = e.Error()
pin.PeerMap[members[i]] = r
// unlikely to come down this path
}
}
return pin, nil
}
func (c *Cluster) globalPinInfoSlice(method string) ([]api.GlobalPinInfo, error) {
var infos []api.GlobalPinInfo
fullMap := make(map[string]api.GlobalPinInfo)
members := c.peerManager.peers()
replies := make([][]api.PinInfoSerial, len(members), len(members))
errs := c.multiRPC(members,
"Cluster",
method, struct{}{},
copyPinInfoSerialSliceToIfaces(replies))
mergePins := func(pins []api.PinInfoSerial) {
for _, pserial := range pins {
p := pserial.ToPinInfo()
item, ok := fullMap[pserial.Cid]
if !ok {
fullMap[pserial.Cid] = api.GlobalPinInfo{
Cid: p.Cid,
PeerMap: map[peer.ID]api.PinInfo{
p.Peer: p,
},
}
} else {
item.PeerMap[p.Peer] = p
}
}
}
erroredPeers := make(map[peer.ID]string)
for i, r := range replies {
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e)
erroredPeers[members[i]] = e.Error()
} else {
mergePins(r)
}
}
// Merge any errors
for p, msg := range erroredPeers {
for cidStr := range fullMap {
c, _ := cid.Decode(cidStr)
fullMap[cidStr].PeerMap[p] = api.PinInfo{
Cid: c,
Peer: p,
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
}
}
}
for _, v := range fullMap {
infos = append(infos, v)
}
return infos, nil
}
func (c *Cluster) getIDForPeer(pid peer.ID) (api.ID, error) {
idSerial := api.ID{ID: pid}.ToSerial()
err := c.rpcClient.Call(
pid, "Cluster", "ID", struct{}{}, &idSerial)
id := idSerial.ToID()
if err != nil {
logger.Error(err)
id.Error = err.Error()
}
return id, err
}
// allocate finds peers to allocate a hash to, using the informer and the
// monitor. It should only be used with a positive replication factor.
func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer.ID, error) {
if repl <= 0 {
return nil, errors.New("cannot decide allocation for replication factor <= 0")
}
// Figure out who is currently holding this
var pinAllocations []peer.ID
st, err := c.consensus.State()
if err != nil {
		// If there is no state, we assume it is empty. If there was
		// some other problem, we would fail to commit anyway.
pinAllocations = []peer.ID{}
} else {
pin := st.Get(hash)
pinAllocations = pin.Allocations
}
// Get the LastMetrics from the leading monitor. They are the last
// valid metrics from current cluster peers
var metrics []api.Metric
metricName := c.informer.Name()
l, err := c.consensus.Leader()
if err != nil {
return nil, errors.New("cannot determine leading Monitor")
}
err = c.rpcClient.Call(l,
"Cluster", "PeerMonitorLastMetrics",
metricName,
&metrics)
if err != nil {
return nil, err
}
// We must divide the metrics between current and candidates
current := make(map[peer.ID]api.Metric)
candidates := make(map[peer.ID]api.Metric)
validAllocations := make([]peer.ID, 0, len(pinAllocations))
for _, m := range metrics {
if m.Discard() || containsPeer(blacklist, m.Peer) {
// blacklisted peers do not exist for us
continue
} else if containsPeer(pinAllocations, m.Peer) {
current[m.Peer] = m
validAllocations = append(validAllocations, m.Peer)
} else {
candidates[m.Peer] = m
}
}
currentValid := len(validAllocations)
candidatesValid := len(candidates)
needed := repl - currentValid
logger.Debugf("allocate: Valid allocations: %d", currentValid)
logger.Debugf("allocate: Valid candidates: %d", candidatesValid)
logger.Debugf("allocate: Needed: %d", needed)
// If needed == 0, we don't need anything. If needed < 0, we are
// reducing the replication factor
switch {
case needed <= 0: // set the allocations to the needed ones
return validAllocations[0 : len(validAllocations)+needed], nil
case candidatesValid < needed:
err = logError(
"not enough candidates to allocate %s. Needed: %d. Got: %d (%s)",
hash, needed, candidatesValid, candidates)
return nil, err
default:
// this will return candidate peers in order of
// preference according to the allocator.
candidateAllocs, err := c.allocator.Allocate(hash, current, candidates)
if err != nil {
return nil, logError(err.Error())
}
logger.Debugf("allocate: candidate allocations: %s", candidateAllocs)
// we don't have enough peers to pin
if got := len(candidateAllocs); got < needed {
err = logError(
"cannot find enough allocations for %s. Needed: %d. Got: %d (%s)",
hash, needed, got, candidateAllocs)
return nil, err
}
// the new allocations = the valid ones we had + the needed ones
return append(validAllocations, candidateAllocs[0:needed]...), nil
}
}
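
// Worked example (illustrative numbers): with repl = 3, two still-valid
// current allocations and four candidates, needed = 3 - 2 = 1, so the
// allocator picks one candidate and it is appended to the two valid
// allocations. With repl = 1 and two valid allocations, needed = -1 and the
// result is validAllocations[0:1], i.e. the replication factor shrinks.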
func (c *Cluster) backupState() {
if c.config.BaseDir == "" {
logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
return
}
folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
}
defer f.Close()
err = c.state.Snapshot(f)
if err != nil {
logger.Error(err)
return
}
}