ipfs-cluster/cluster.go
Hector Sanjuan 11124ee224 Fix: repinning does not re-allocate as needed
Long story: Since #1768 there has been a recurring repinning test failure with
Raft consensus.

Per the test, if a pin is allocated to a peer that has been shutdown,
submitting the pin again should re-allocate it to a peer that is still
running.

Investigation on why this test fails and why it fails only in Raft lead to
realizing that this and other similar tests, were passing by chance. The
needed re-allocations were made not by the new submission of the pin, but by
the automatic-repinning feature. The actual resubmitted pin was carrying the
same allocations (one of them being the peer that was down), but it was
silently failing because the RedirectToLeader() code path was using
cc.ctx and hitting the peer that had been shutdown, which caused it to error.

Fixing the context propagation, meant that we would re-overwrite the pin with
the old allocations, thus the actual behaviour did not pass the test.

So, on one side, this fix an number of tests that had not disabled automatic
repinning and was probably getting in the way of things. On the other side,
this removes a condition that prevents re-allocation of pins if they exists
and options have not changed.

I don't fully understand why this was there though, since the Allocate() code
does return the old allocations anyways when they are enough, so it should not
re-allocate randomly. I suspect this was preventing some misbehaviour in the
Allocate() code from the time before it was improved with multiple allocators
etc.
2022-09-27 12:31:24 +02:00

2282 lines
60 KiB
Go

package ipfscluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"mime/multipart"
"sync"
"time"
"github.com/ipfs-cluster/ipfs-cluster/adder"
"github.com/ipfs-cluster/ipfs-cluster/adder/sharding"
"github.com/ipfs-cluster/ipfs-cluster/adder/single"
"github.com/ipfs-cluster/ipfs-cluster/api"
"github.com/ipfs-cluster/ipfs-cluster/pstoremgr"
"github.com/ipfs-cluster/ipfs-cluster/rpcutil"
"github.com/ipfs-cluster/ipfs-cluster/state"
"github.com/ipfs-cluster/ipfs-cluster/version"
"go.uber.org/multierr"
ds "github.com/ipfs/go-datastore"
rpc "github.com/libp2p/go-libp2p-gorpc"
dual "github.com/libp2p/go-libp2p-kad-dht/dual"
host "github.com/libp2p/go-libp2p/core/host"
peer "github.com/libp2p/go-libp2p/core/peer"
peerstore "github.com/libp2p/go-libp2p/core/peerstore"
mdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
ma "github.com/multiformats/go-multiaddr"
ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
trace "go.opencensus.io/trace"
)
// ReadyTimeout specifies the time before giving up
// during startup (waiting for consensus to be ready)
// It may need adjustment according to timeouts in the
// consensus layer.
var ReadyTimeout = 30 * time.Second
const (
pingMetricName = "ping"
bootstrapCount = 3
reBootstrapInterval = 30 * time.Second
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
maxAlerts = 1000
)
var errFollowerMode = errors.New("this peer is configured to be in follower mode. Write operations are disabled")
// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
type Cluster struct {
ctx context.Context
cancel func()
id peer.ID
config *Config
host host.Host
dht *dual.DHT
discovery mdns.Service
datastore ds.Datastore
rpcServer *rpc.Server
rpcClient *rpc.Client
peerManager *pstoremgr.Manager
consensus Consensus
apis []API
ipfs IPFSConnector
tracker PinTracker
monitor PeerMonitor
allocator PinAllocator
informers []Informer
tracer Tracer
alerts []api.Alert
alertsMux sync.Mutex
doneCh chan struct{}
readyCh chan struct{}
readyB bool
wg sync.WaitGroup
// peerAdd
paMux sync.Mutex
// shutdown function and related variables
shutdownLock sync.RWMutex
shutdownB bool
removed bool
curPingVal pingValue
}
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
// creates and RPC Server and client and sets up all components.
//
// The new cluster peer may still be performing initialization tasks when
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
func NewCluster(
ctx context.Context,
host host.Host,
dht *dual.DHT,
cfg *Config,
datastore ds.Datastore,
consensus Consensus,
apis []API,
ipfs IPFSConnector,
tracker PinTracker,
monitor PeerMonitor,
allocator PinAllocator,
informers []Informer,
tracer Tracer,
) (*Cluster, error) {
err := cfg.Validate()
if err != nil {
return nil, err
}
if host == nil {
return nil, errors.New("cluster host is nil")
}
if len(informers) == 0 {
return nil, errors.New("no informers are passed")
}
ctx, cancel := context.WithCancel(ctx)
listenAddrs := ""
for _, addr := range host.Addrs() {
listenAddrs += fmt.Sprintf(" %s/p2p/%s\n", addr, host.ID().Pretty())
}
logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)
peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())
var mdnsSvc mdns.Service
if cfg.MDNSInterval > 0 {
mdnsSvc = mdns.NewMdnsService(host, mdnsServiceTag, peerManager)
err = mdnsSvc.Start()
if err != nil {
logger.Warnf("mDNS could not be started: %s", err)
}
}
c := &Cluster{
ctx: ctx,
cancel: cancel,
id: host.ID(),
config: cfg,
host: host,
dht: dht,
discovery: mdnsSvc,
datastore: datastore,
consensus: consensus,
apis: apis,
ipfs: ipfs,
tracker: tracker,
monitor: monitor,
allocator: allocator,
informers: informers,
tracer: tracer,
alerts: []api.Alert{},
peerManager: peerManager,
shutdownB: false,
removed: false,
doneCh: make(chan struct{}),
readyCh: make(chan struct{}),
readyB: false,
}
// Import known cluster peers from peerstore file and config. Set
// a non permanent TTL.
c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
c.peerManager.ImportPeers(c.config.PeerAddresses, false, peerstore.AddressTTL)
// Attempt to connect to some peers (up to bootstrapCount)
connectedPeers := c.peerManager.Bootstrap(bootstrapCount)
// We cannot warn when count is low as this as this is normal if going
// to Join() later.
logger.Debugf("bootstrap count %d", len(connectedPeers))
// Log a ping metric for every connected peer. This will make them
// visible as peers without having to wait for them to send one.
for _, p := range connectedPeers {
if err := c.logPingMetric(ctx, p); err != nil {
logger.Warn(err)
}
}
// After setupRPC components can do their tasks with a fully operative
// routed libp2p host with some connections and a working DHT (hopefully).
err = c.setupRPC()
if err != nil {
c.Shutdown(ctx)
return nil, err
}
c.setupRPCClients()
// Note: It is very important to first call Add() once in a non-racy
// place
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.ready(ReadyTimeout)
c.run()
}()
return c, nil
}
func (c *Cluster) setupRPC() error {
rpcServer, err := newRPCServer(c)
if err != nil {
return err
}
c.rpcServer = rpcServer
var rpcClient *rpc.Client
if c.config.Tracing {
csh := &ocgorpc.ClientHandler{}
rpcClient = rpc.NewClientWithServer(
c.host,
version.RPCProtocol,
rpcServer,
rpc.WithClientStatsHandler(csh),
)
} else {
rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer)
}
c.rpcClient = rpcClient
return nil
}
func (c *Cluster) setupRPCClients() {
c.ipfs.SetClient(c.rpcClient)
c.tracker.SetClient(c.rpcClient)
for _, api := range c.apis {
api.SetClient(c.rpcClient)
}
c.consensus.SetClient(c.rpcClient)
c.monitor.SetClient(c.rpcClient)
c.allocator.SetClient(c.rpcClient)
for _, informer := range c.informers {
informer.SetClient(c.rpcClient)
}
}
// watchPinset triggers recurrent operations that loop on the pinset.
func (c *Cluster) watchPinset() {
ctx, span := trace.StartSpan(c.ctx, "cluster/watchPinset")
defer span.End()
stateSyncTimer := time.NewTimer(c.config.StateSyncInterval)
// Upon start, every item in the state that is not pinned will appear
// as PinError when doing a Status, we should proceed to recover
// (try pinning) all of those right away.
recoverTimer := time.NewTimer(0) // 0 so that it does an initial recover right away
// This prevents doing an StateSync while doing a RecoverAllLocal,
// which is intended behavior as for very large pinsets
for {
select {
case <-stateSyncTimer.C:
logger.Debug("auto-triggering StateSync()")
c.StateSync(ctx)
stateSyncTimer.Reset(c.config.StateSyncInterval)
case <-recoverTimer.C:
logger.Debug("auto-triggering RecoverAllLocal()")
out := make(chan api.PinInfo, 1024)
go func() {
for range out {
}
}()
err := c.RecoverAllLocal(ctx, out)
if err != nil {
logger.Error(err)
}
recoverTimer.Reset(c.config.PinRecoverInterval)
case <-c.ctx.Done():
if !stateSyncTimer.Stop() {
<-stateSyncTimer.C
}
if !recoverTimer.Stop() {
<-recoverTimer.C
}
return
}
}
}
// returns the smallest ttl from the metrics pushed by the informer.
func (c *Cluster) sendInformerMetrics(ctx context.Context, informer Informer) (time.Duration, error) {
ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
defer span.End()
var minTTL time.Duration
var errors error
metrics := informer.GetMetrics(ctx)
if len(metrics) == 0 {
logger.Errorf("informer %s produced no metrics", informer.Name())
return minTTL, nil
}
for _, metric := range metrics {
if metric.Discard() { // do not publish invalid metrics
// the tags informer creates an invalid metric
// when no tags are defined.
continue
}
metric.Peer = c.id
ttl := metric.GetTTL()
if ttl > 0 && (ttl < minTTL || minTTL == 0) {
minTTL = ttl
}
err := c.monitor.PublishMetric(ctx, metric)
if multierr.AppendInto(&errors, err) {
logger.Warnf("error sending metric %s: %s", metric.Name, err)
}
}
return minTTL, errors
}
func (c *Cluster) sendInformersMetrics(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "cluster/sendInformersMetrics")
defer span.End()
var errors error
for _, informer := range c.informers {
_, err := c.sendInformerMetrics(ctx, informer)
if multierr.AppendInto(&errors, err) {
logger.Warnf("informer %s did not send all metrics", informer.Name())
}
}
return errors
}
// pushInformerMetrics loops and publishes informers metrics using the
// cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error
// occurs, they are pushed at a TTL/4 rate.
func (c *Cluster) pushInformerMetrics(ctx context.Context, informer Informer) {
ctx, span := trace.StartSpan(ctx, "cluster/pushInformerMetrics")
defer span.End()
timer := time.NewTimer(0) // fire immediately first
// retries counts how many retries we have made
retries := 0
// retryWarnMod controls how often do we log
// "error broadcasting metric".
// It will do it in the first error, and then on every
// 10th.
retryWarnMod := 10
for {
select {
case <-ctx.Done():
return
case <-timer.C:
// wait
}
minTTL, err := c.sendInformerMetrics(ctx, informer)
if minTTL == 0 {
minTTL = 30 * time.Second
}
if err != nil {
if (retries % retryWarnMod) == 0 {
logger.Errorf("error broadcasting metric: %s", err)
retries++
}
// retry sooner
timer.Reset(minTTL / 4)
continue
}
retries = 0
// send metric again in TTL/2
timer.Reset(minTTL / 2)
}
}
func (c *Cluster) sendPingMetric(ctx context.Context) (api.Metric, error) {
ctx, span := trace.StartSpan(ctx, "cluster/sendPingMetric")
defer span.End()
id := c.ID(ctx)
newPingVal := pingValue{
Peername: id.Peername,
IPFSID: id.IPFS.ID,
IPFSAddresses: publicIPFSAddresses(id.IPFS.Addresses),
}
if c.curPingVal.Valid() &&
!newPingVal.Valid() { // i.e. ipfs down
newPingVal = c.curPingVal // use last good value
}
c.curPingVal = newPingVal
v, err := json.Marshal(newPingVal)
if err != nil {
logger.Error(err)
// continue anyways
}
metric := api.Metric{
Name: pingMetricName,
Peer: c.id,
Valid: true,
Value: string(v),
}
metric.SetTTL(c.config.MonitorPingInterval * 2)
return metric, c.monitor.PublishMetric(ctx, metric)
}
// logPingMetric logs a ping metric as if it had been sent from PID. It is
// used to make peers appear available as soon as we connect to them (without
// having to wait for them to broadcast a metric).
//
// We avoid specifically sending a metric to a peer when we "connect" to it
// because: a) this requires an extra. OPEN RPC endpoint (LogMetric) that can
// be called by everyone b) We have no way of verifying that the peer ID in a
// metric pushed is actually the issuer of the metric (something the regular
// "pubsub" way of pushing metrics allows (by verifying the signature on the
// message). Thus, this reduces chances of abuse until we have something
// better.
func (c *Cluster) logPingMetric(ctx context.Context, pid peer.ID) error {
m := api.Metric{
Name: pingMetricName,
Peer: pid,
Valid: true,
}
m.SetTTL(c.config.MonitorPingInterval * 2)
return c.monitor.LogMetric(ctx, m)
}
func (c *Cluster) pushPingMetrics(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "cluster/pushPingMetrics")
defer span.End()
ticker := time.NewTicker(c.config.MonitorPingInterval)
for {
select {
case <-ctx.Done():
return
default:
}
c.sendPingMetric(ctx)
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}
// Alerts returns the last alerts recorded by this cluster peer with the most
// recent first.
func (c *Cluster) Alerts() []api.Alert {
c.alertsMux.Lock()
alerts := make([]api.Alert, len(c.alerts))
{
total := len(alerts)
for i, a := range c.alerts {
alerts[total-1-i] = a
}
}
c.alertsMux.Unlock()
return alerts
}
// read the alerts channel from the monitor and triggers repins
func (c *Cluster) alertsHandler() {
for {
select {
case <-c.ctx.Done():
return
case alrt := <-c.monitor.Alerts():
// Follower peers do not care about alerts.
// They can do nothing about them.
if c.config.FollowerMode {
continue
}
logger.Warnf("metric alert for %s: Peer: %s.", alrt.Name, alrt.Peer)
c.alertsMux.Lock()
{
if len(c.alerts) > maxAlerts {
c.alerts = c.alerts[:0]
}
c.alerts = append(c.alerts, alrt)
}
c.alertsMux.Unlock()
if alrt.Name != pingMetricName {
continue // only handle ping alerts
}
if c.config.DisableRepinning {
logger.Debugf("repinning is disabled. Will not re-allocate pins on alerts")
return
}
cState, err := c.consensus.State(c.ctx)
if err != nil {
logger.Warn(err)
return
}
distance, err := c.distances(c.ctx, alrt.Peer)
if err != nil {
logger.Warn(err)
return
}
pinCh := make(chan api.Pin, 1024)
go func() {
err = cState.List(c.ctx, pinCh)
if err != nil {
logger.Warn(err)
}
}()
for pin := range pinCh {
if containsPeer(pin.Allocations, alrt.Peer) && distance.isClosest(pin.Cid) {
c.repinFromPeer(c.ctx, alrt.Peer, pin)
}
}
}
}
}
// detects any changes in the peerset and saves the configuration. When it
// detects that we have been removed from the peerset, it shuts down this peer.
func (c *Cluster) watchPeers() {
ticker := time.NewTicker(c.config.PeerWatchInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
default:
}
select {
case <-c.ctx.Done():
return
case <-ticker.C:
//logger.Debugf("%s watching peers", c.id)
hasMe := false
peers, err := c.consensus.Peers(c.ctx)
if err != nil {
logger.Error(err)
continue
}
for _, p := range peers {
if p == c.id {
hasMe = true
break
}
}
if !hasMe {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
logger.Info("peer no longer in peerset. Initiating shutdown")
c.removed = true
go c.Shutdown(c.ctx)
return
}
}
}
}
// reBootstrap regularly attempts to bootstrap (re-connect to peers from the
// peerstore). This should ensure that we auto-recover from situations in
// which the network was completely gone and we lost all peers.
func (c *Cluster) reBootstrap() {
ticker := time.NewTicker(reBootstrapInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
connected := c.peerManager.Bootstrap(bootstrapCount)
for _, p := range connected {
logger.Infof("reconnected to %s", p)
}
}
}
}
// find all Cids pinned to a given peer and triggers re-pins on them.
func (c *Cluster) vacatePeer(ctx context.Context, p peer.ID) {
ctx, span := trace.StartSpan(ctx, "cluster/vacatePeer")
defer span.End()
if c.config.DisableRepinning {
logger.Warnf("repinning is disabled. Will not re-allocate cids from %s", p.Pretty())
return
}
cState, err := c.consensus.State(ctx)
if err != nil {
logger.Warn(err)
return
}
pinCh := make(chan api.Pin, 1024)
go func() {
err = cState.List(ctx, pinCh)
if err != nil {
logger.Warn(err)
}
}()
for pin := range pinCh {
if containsPeer(pin.Allocations, p) {
c.repinFromPeer(ctx, p, pin)
}
}
}
// repinFromPeer triggers a repin on a given pin object blacklisting one of the
// allocations.
func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID, pin api.Pin) {
ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
defer span.End()
logger.Debugf("repinning %s from peer %s", pin.Cid, p)
pin.Allocations = nil // force re-allocations
// note that pin() should not result in different allocations
// if we are not under the replication-factor min.
_, ok, err := c.pin(ctx, pin, []peer.ID{p})
if ok && err == nil {
logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
}
}
// run launches some go-routines which live throughout the cluster's life
func (c *Cluster) run() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.watchPinset()
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.pushPingMetrics(c.ctx)
}()
c.wg.Add(len(c.informers))
for _, informer := range c.informers {
go func(inf Informer) {
defer c.wg.Done()
c.pushInformerMetrics(c.ctx, inf)
}(informer)
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.watchPeers()
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.alertsHandler()
}()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.reBootstrap()
}()
}
func (c *Cluster) ready(timeout time.Duration) {
ctx, span := trace.StartSpan(c.ctx, "cluster/ready")
defer span.End()
// We bootstrapped first because with dirty state consensus
// may have a peerset and not find a leader so we cannot wait
// for it.
timer := time.NewTimer(timeout)
select {
case <-timer.C:
logger.Error("***** ipfs-cluster consensus start timed out (tips below) *****")
logger.Error(`
**************************************************
This peer was not able to become part of the cluster.
This might be due to one or several causes:
- Check the logs above this message for errors
- Check that there is connectivity to the "peers" multiaddresses
- Check that all cluster peers are using the same "secret"
- Check that this peer is reachable on its "listen_multiaddress" by all peers
- Check that the current cluster is healthy (has a leader). Otherwise make
sure to start enough peers so that a leader election can happen.
- Check that the peer(s) you are trying to connect to is running the
same version of IPFS-cluster.
**************************************************
`)
c.Shutdown(ctx)
return
case <-c.consensus.Ready(ctx):
// Consensus ready means the state is up to date.
case <-c.ctx.Done():
return
}
// Cluster is ready.
peers, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
c.Shutdown(ctx)
return
}
logger.Info("Cluster Peers (without including ourselves):")
if len(peers) == 1 {
logger.Info(" - No other peers")
}
for _, p := range peers {
if p != c.id {
logger.Infof(" - %s", p.Pretty())
}
}
// Wait for ipfs
logger.Info("Waiting for IPFS to be ready...")
select {
case <-ctx.Done():
return
case <-c.ipfs.Ready(ctx):
ipfsid, err := c.ipfs.ID(ctx)
if err != nil {
logger.Error("IPFS signaled ready but ID() errored: ", err)
} else {
logger.Infof("IPFS is ready. Peer ID: %s", ipfsid.ID)
}
}
close(c.readyCh)
c.shutdownLock.Lock()
c.readyB = true
c.shutdownLock.Unlock()
logger.Info("** IPFS Cluster is READY **")
}
// Ready returns a channel which signals when this peer is
// fully initialized (including consensus).
func (c *Cluster) Ready() <-chan struct{} {
return c.readyCh
}
// Shutdown performs all the necessary operations to shutdown
// the IPFS Cluster peer:
// * Save peerstore with the current peers
// * Remove itself from consensus when LeaveOnShutdown is set
// * It Shutdowns all the components
// * Collects all goroutines
//
// Shutdown does not close the libp2p host, the DHT, the datastore or
// generally anything that Cluster did not create.
func (c *Cluster) Shutdown(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "cluster/Shutdown")
defer span.End()
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdownB {
logger.Debug("Cluster is already shutdown")
return nil
}
logger.Info("shutting down Cluster")
// Shutdown APIs first, avoids more requests coming through.
for _, api := range c.apis {
if err := api.Shutdown(ctx); err != nil {
logger.Errorf("error stopping API: %s", err)
return err
}
}
// Cancel discovery service (this shutdowns announcing). Handling
// entries is canceled along with the context below.
if c.discovery != nil {
c.discovery.Close()
}
// Try to store peerset file for all known peers whatsoever
// if we got ready (otherwise, don't overwrite anything)
if c.readyB {
// Ignoring error since it's a best-effort
c.peerManager.SavePeerstoreForPeers(c.host.Peerstore().Peers())
}
// Only attempt to leave if:
// - consensus is initialized
// - cluster was ready (no bootstrapping error)
// - We are not removed already (means watchPeers() called us)
if c.consensus != nil && c.config.LeaveOnShutdown && c.readyB && !c.removed {
c.removed = true
_, err := c.consensus.Peers(ctx)
if err == nil {
// best effort
logger.Warn("attempting to leave the cluster. This may take some seconds")
err := c.consensus.RmPeer(ctx, c.id)
if err != nil {
logger.Error("leaving cluster: " + err.Error())
}
}
}
if con := c.consensus; con != nil {
if err := con.Shutdown(ctx); err != nil {
logger.Errorf("error stopping consensus: %s", err)
return err
}
}
// We left the cluster or were removed. Remove any consensus-specific
// state.
if c.removed && c.readyB {
err := c.consensus.Clean(ctx)
if err != nil {
logger.Error("cleaning consensus: ", err)
}
}
if err := c.monitor.Shutdown(ctx); err != nil {
logger.Errorf("error stopping monitor: %s", err)
return err
}
if err := c.ipfs.Shutdown(ctx); err != nil {
logger.Errorf("error stopping IPFS Connector: %s", err)
return err
}
if err := c.tracker.Shutdown(ctx); err != nil {
logger.Errorf("error stopping PinTracker: %s", err)
return err
}
for _, inf := range c.informers {
if err := inf.Shutdown(ctx); err != nil {
logger.Errorf("error stopping informer: %s", err)
return err
}
}
if err := c.tracer.Shutdown(ctx); err != nil {
logger.Errorf("error stopping Tracer: %s", err)
return err
}
c.cancel()
c.wg.Wait()
c.shutdownB = true
close(c.doneCh)
return nil
}
// Done provides a way to learn if the Peer has been shutdown
// (for example, because it has been removed from the Cluster)
func (c *Cluster) Done() <-chan struct{} {
return c.doneCh
}
// ID returns information about the Cluster peer
func (c *Cluster) ID(ctx context.Context) api.ID {
ctx, span := trace.StartSpan(ctx, "cluster/ID")
defer span.End()
// ignore error since it is included in response object
ipfsID, err := c.ipfs.ID(ctx)
if err != nil {
ipfsID = api.IPFSID{
Error: err.Error(),
}
}
var addrs []api.Multiaddr
mAddrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: c.id, Addrs: c.host.Addrs()})
if err == nil {
for _, mAddr := range mAddrs {
addrs = append(addrs, api.NewMultiaddrWithValue(mAddr))
}
}
peers := []peer.ID{}
// This method might get called very early by a remote peer
// and might catch us when consensus is not set
if c.consensus != nil {
peers, _ = c.consensus.Peers(ctx)
}
clusterPeerInfos := c.peerManager.PeerInfos(peers)
addresses := []api.Multiaddr{}
for _, pinfo := range clusterPeerInfos {
addrs, err := peer.AddrInfoToP2pAddrs(&pinfo)
if err != nil {
continue
}
for _, a := range addrs {
addresses = append(addresses, api.NewMultiaddrWithValue(a))
}
}
id := api.ID{
ID: c.id,
// PublicKey: c.host.Peerstore().PubKey(c.id),
Addresses: addrs,
ClusterPeers: peers,
ClusterPeersAddresses: addresses,
Version: version.Version.String(),
RPCProtocolVersion: version.RPCProtocol,
IPFS: ipfsID,
Peername: c.config.Peername,
}
if err != nil {
id.Error = err.Error()
}
return id
}
// PeerAdd adds a new peer to this Cluster.
//
// For it to work well, the new peer should be discoverable
// (part of our peerstore or connected to one of the existing peers)
// and reachable. Since PeerAdd allows to add peers which are
// not running, or reachable, it is recommended to call Join() from the
// new peer instead.
//
// The new peer ID will be passed to the consensus
// component to be added to the peerset.
func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/PeerAdd")
defer span.End()
c.shutdownLock.RLock()
defer c.shutdownLock.RUnlock()
if c.shutdownB {
return nil, errors.New("cluster is shutdown")
}
// starting 10 nodes on the same box for testing
// causes deadlock and a global lock here
// seems to help.
c.paMux.Lock()
defer c.paMux.Unlock()
logger.Debugf("peerAdd called with %s", pid.Pretty())
// Let the consensus layer be aware of this peer
err := c.consensus.AddPeer(ctx, pid)
if err != nil {
logger.Error(err)
id := &api.ID{ID: pid, Error: err.Error()}
return id, err
}
logger.Info("Peer added ", pid.Pretty())
addedID, err := c.getIDForPeer(ctx, pid)
if err != nil {
return addedID, err
}
if !containsPeer(addedID.ClusterPeers, c.id) {
addedID.ClusterPeers = append(addedID.ClusterPeers, c.id)
}
return addedID, nil
}
// PeerRemove removes a peer from this Cluster.
//
// The peer will be removed from the consensus peerset.
// This may first trigger repinnings for all content if not disabled.
func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
ctx, span := trace.StartSpan(ctx, "cluster/PeerRemove")
defer span.End()
// We need to repin before removing the peer, otherwise, it won't
// be able to submit the pins.
logger.Infof("re-allocating all CIDs directly associated to %s", pid)
c.vacatePeer(ctx, pid)
err := c.consensus.RmPeer(ctx, pid)
if err != nil {
logger.Error(err)
return err
}
logger.Info("Peer removed ", pid.Pretty())
return nil
}
// Join adds this peer to an existing cluster by bootstrapping to a
// given multiaddress. It works by calling PeerAdd on the destination
// cluster and making sure that the new peer is ready to discover and contact
// the rest.
func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
ctx, span := trace.StartSpan(ctx, "cluster/Join")
defer span.End()
logger.Debugf("Join(%s)", addr)
// Add peer to peerstore so we can talk to it
pid, err := c.peerManager.ImportPeer(addr, false, peerstore.PermanentAddrTTL)
if err != nil {
return err
}
if pid == c.id {
return nil
}
// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
// ListenAddr).
var myID api.ID
err = c.rpcClient.CallContext(
ctx,
pid,
"Cluster",
"PeerAdd",
c.id,
&myID,
)
if err != nil {
logger.Error(err)
return err
}
// Log a fake but valid metric from the peer we are
// contacting. This will signal a CRDT component that
// we know that peer since we have metrics for it without
// having to wait for the next metric round.
if err := c.logPingMetric(ctx, pid); err != nil {
logger.Warn(err)
}
// Broadcast our metrics to the world
err = c.sendInformersMetrics(ctx)
if err != nil {
logger.Warn(err)
}
_, err = c.sendPingMetric(ctx)
if err != nil {
logger.Warn(err)
}
// We need to trigger a DHT bootstrap asap for this peer to not be
// lost if the peer it bootstrapped to goes down. We do this manually
// by triggering 1 round of bootstrap in the background.
// Note that our regular bootstrap process is still running in the
// background since we created the cluster.
c.wg.Add(1)
go func() {
defer c.wg.Done()
select {
case err := <-c.dht.LAN.RefreshRoutingTable():
if err != nil {
// this error is quite chatty
// on single peer clusters
logger.Debug(err)
}
case <-c.ctx.Done():
return
}
select {
case err := <-c.dht.WAN.RefreshRoutingTable():
if err != nil {
// this error is quite chatty
// on single peer clusters
logger.Debug(err)
}
case <-c.ctx.Done():
return
}
}()
// ConnectSwarms in the background after a while, when we have likely
// received some metrics.
time.AfterFunc(c.config.MonitorPingInterval, func() {
c.ipfs.ConnectSwarms(c.ctx)
})
// wait for leader and for state to catch up
// then sync
err = c.consensus.WaitForSync(ctx)
if err != nil {
logger.Error(err)
return err
}
// Start pinning items in the state that are not on IPFS yet.
out := make(chan api.PinInfo, 1024)
// discard outputs
go func() {
for range out {
}
}()
go c.RecoverAllLocal(c.ctx, out)
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
return nil
}
// Distances returns a distance checker using current trusted peers.
// It can optionally receive a peer ID to exclude from the checks.
func (c *Cluster) distances(ctx context.Context, exclude peer.ID) (*distanceChecker, error) {
trustedPeers, err := c.getTrustedPeers(ctx, exclude)
if err != nil {
logger.Error("could not get trusted peers:", err)
return nil, err
}
return &distanceChecker{
local: c.id,
otherPeers: trustedPeers,
cache: make(map[peer.ID]distance, len(trustedPeers)+1),
}, nil
}
// StateSync performs maintenance tasks on the global state that require
// looping through all the items. It is triggered automatically on
// StateSyncInterval. Currently it:
// - Sends unpin for expired items for which this peer is "closest"
// (skipped for follower peers)
func (c *Cluster) StateSync(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "cluster/StateSync")
defer span.End()
logger.Debug("StateSync")
if c.config.FollowerMode {
return nil
}
cState, err := c.consensus.State(ctx)
if err != nil {
return err
}
timeNow := time.Now()
// Only trigger pin operations if we are the closest with respect to
// other trusted peers. We cannot know if our peer ID is trusted by
// other peers in the Cluster. This assumes yes. Setting FollowerMode
// is a way to assume the opposite and skip this completely.
distance, err := c.distances(ctx, "")
if err != nil {
return err // could not list peers
}
clusterPins := make(chan api.Pin, 1024)
go func() {
err = cState.List(ctx, clusterPins)
if err != nil {
logger.Error(err)
}
}()
// Unpin expired items when we are the closest peer to them.
for p := range clusterPins {
if p.ExpiredAt(timeNow) && distance.isClosest(p.Cid) {
logger.Infof("Unpinning %s: pin expired at %s", p.Cid, p.ExpireAt)
if _, err := c.Unpin(ctx, p.Cid); err != nil {
logger.Error(err)
}
}
}
return nil
}
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers on
// the out channel. This is done by broacasting a StatusAll to all peers. If
// an error happens, it is returned. This method blocks until it finishes. The
// operation can be aborted by canceling the context.
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus, out chan<- api.GlobalPinInfo) error {
ctx, span := trace.StartSpan(ctx, "cluster/StatusAll")
defer span.End()
in := make(chan api.TrackerStatus, 1)
in <- filter
close(in)
return c.globalPinInfoStream(ctx, "PinTracker", "StatusAll", in, out)
}
// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer on
// the out channel. It blocks until finished.
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, out chan<- api.PinInfo) error {
ctx, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
defer span.End()
return c.tracker.StatusAll(ctx, filter, out)
}
// Status returns the GlobalPinInfo for a given Cid as fetched from all
// current peers. If an error happens, the GlobalPinInfo should contain
// as much information as could be fetched from the other peers.
func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/Status")
defer span.End()
return c.globalPinInfoCid(ctx, "PinTracker", "Status", h)
}
// StatusLocal returns this peer's PinInfo for a given Cid.
func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo {
ctx, span := trace.StartSpan(ctx, "cluster/StatusLocal")
defer span.End()
return c.tracker.Status(ctx, h)
}
// used for RecoverLocal and SyncLocal.
func (c *Cluster) localPinInfoOp(
ctx context.Context,
h api.Cid,
f func(context.Context, api.Cid) (api.PinInfo, error),
) (pInfo api.PinInfo, err error) {
ctx, span := trace.StartSpan(ctx, "cluster/localPinInfoOp")
defer span.End()
cids, err := c.cidsFromMetaPin(ctx, h)
if err != nil {
return api.PinInfo{}, err
}
for _, ci := range cids {
pInfo, err = f(ctx, ci)
if err != nil {
logger.Error("tracker.SyncCid() returned with error: ", err)
logger.Error("Is the ipfs daemon running?")
break
}
}
// return the last pInfo/err, should be the root Cid if everything ok
return pInfo, err
}
// RecoverAll triggers a RecoverAllLocal operation on all peers and returns
// GlobalPinInfo objets for all recovered items. This method blocks until
// finished. Operation can be aborted by canceling the context.
func (c *Cluster) RecoverAll(ctx context.Context, out chan<- api.GlobalPinInfo) error {
ctx, span := trace.StartSpan(ctx, "cluster/RecoverAll")
defer span.End()
return c.globalPinInfoStream(ctx, "Cluster", "RecoverAllLocal", nil, out)
}
// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
// by this peer.
//
// Recover operations ask IPFS to pin or unpin items in error state. Recover
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
//
// It returns the list of pins that were re-queued for pinning on the out
// channel. It blocks until done.
//
// RecoverAllLocal is called automatically every PinRecoverInterval.
func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) error {
ctx, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
defer span.End()
return c.tracker.RecoverAll(ctx, out)
}
// Recover triggers a recover operation for a given Cid in all
// cluster peers.
//
// Recover operations ask IPFS to pin or unpin items in error state. Recover
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/Recover")
defer span.End()
return c.globalPinInfoCid(ctx, "PinTracker", "Recover", h)
}
// RecoverLocal triggers a recover operation for a given Cid in this peer only.
// It returns the updated PinInfo, after recovery.
//
// Recover operations ask IPFS to pin or unpin items in error state. Recover
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
defer span.End()
return c.localPinInfoOp(ctx, h, c.tracker.Recover)
}
// Pins sends pins on the given out channel as it iterates the full
// pinset (current global state). This is the source of truth as to which pins
// are managed and their allocation, but does not indicate if the item is
// successfully pinned. For that, use the Status*() methods.
//
// The operation can be aborted by canceling the context. This methods blocks
// until the operation has completed.
func (c *Cluster) Pins(ctx context.Context, out chan<- api.Pin) error {
ctx, span := trace.StartSpan(ctx, "cluster/Pins")
defer span.End()
cState, err := c.consensus.State(ctx)
if err != nil {
logger.Error(err)
return err
}
return cState.List(ctx, out)
}
// pinsSlice returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
// pins are managed and their allocation, but does not indicate if
// the item is successfully pinned. For that, use StatusAll().
//
// It is recommended to use PinsChannel(), as this method is equivalent to
// loading the full pinset in memory!
func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) {
out := make(chan api.Pin, 1024)
var err error
go func() {
err = c.Pins(ctx, out)
}()
var pins []api.Pin
for pin := range out {
pins = append(pins, pin)
}
return pins, err
}
// PinGet returns information for a single Cid managed by Cluster.
// The information is obtained from the current global state. The
// returned api.Pin provides information about the allocations
// assigned for the requested Cid, but does not indicate if
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "cluster/PinGet")
defer span.End()
st, err := c.consensus.State(ctx)
if err != nil {
return api.Pin{}, err
}
pin, err := st.Get(ctx, h)
if err != nil {
return api.Pin{}, err
}
return pin, nil
}
// Pin makes the cluster Pin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state. Depending on the cluster
// pinning strategy, the PinTracker may then request the IPFS daemon
// to pin the Cid.
//
// Pin returns the Pin as stored in the global state (with the given
// allocations and an error if the operation could not be persisted. Pin does
// not reflect the success or failure of underlying IPFS daemon pinning
// operations which happen in async fashion.
//
// If the options UserAllocations are non-empty then these peers are pinned
// with priority over other peers in the cluster. If the max repl factor is
// less than the size of the specified peerset then peers are chosen from this
// set in allocation order. If the minimum repl factor is greater than the
// size of this set then the remaining peers are allocated in order from the
// rest of the cluster. Priority allocations are best effort. If any priority
// peers are unavailable then Pin will simply allocate from the rest of the
// cluster.
//
// If the Update option is set, the pin options (including allocations) will
// be copied from an existing one. This is equivalent to running PinUpdate.
func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
return result, err
}
// sets the default replication factor in a pin when it's set to 0
func (c *Cluster) setupReplicationFactor(pin api.Pin) (api.Pin, error) {
rplMin := pin.ReplicationFactorMin
rplMax := pin.ReplicationFactorMax
if rplMin == 0 {
rplMin = c.config.ReplicationFactorMin
pin.ReplicationFactorMin = rplMin
}
if rplMax == 0 {
rplMax = c.config.ReplicationFactorMax
pin.ReplicationFactorMax = rplMax
}
// When pinning everywhere, remove all allocations.
// Allocations may have been preset by the adder
// for the cases when the replication factor is > -1.
// Fixes part of #1319: allocations when adding
// are kept.
if pin.IsPinEverywhere() {
pin.Allocations = nil
}
return pin, isReplicationFactorValid(rplMin, rplMax)
}
// basic checks on the pin type to check it's well-formed.
func checkPinType(pin api.Pin) error {
switch pin.Type {
case api.DataType:
if pin.Reference != nil {
return errors.New("data pins should not reference other pins")
}
case api.ShardType:
if pin.MaxDepth != 1 {
return errors.New("must pin shards go depth 1")
}
// FIXME: indirect shard pins could have max-depth 2
// FIXME: repinning a shard type will overwrite replication
// factor from previous:
// if existing.ReplicationFactorMin != rplMin ||
// existing.ReplicationFactorMax != rplMax {
// return errors.New("shard update with wrong repl factors")
//}
case api.ClusterDAGType:
if pin.MaxDepth != 0 {
return errors.New("must pin roots directly")
}
if pin.Reference == nil {
return errors.New("clusterDAG pins should reference a Meta pin")
}
case api.MetaType:
if len(pin.Allocations) != 0 {
return errors.New("meta pin should not specify allocations")
}
if pin.Reference == nil {
return errors.New("metaPins should reference a ClusterDAG")
}
default:
return errors.New("unrecognized pin type")
}
return nil
}
// setupPin ensures that the Pin object is fit for pinning. We check
// and set the replication factors and ensure that the pinType matches the
// metadata consistently.
func (c *Cluster) setupPin(ctx context.Context, pin, existing api.Pin) (api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/setupPin")
defer span.End()
var err error
pin, err = c.setupReplicationFactor(pin)
if err != nil {
return pin, err
}
if !pin.ExpireAt.IsZero() && pin.ExpireAt.Before(time.Now()) {
return pin, errors.New("pin.ExpireAt set before current time")
}
if !existing.Defined() {
return pin, nil
}
// If an pin CID is already pin, we do a couple more checks
if existing.Type != pin.Type {
msg := "cannot repin CID with different tracking method, "
msg += "clear state with pin rm to proceed. "
msg += "New: %s. Was: %s"
return pin, fmt.Errorf(msg, pin.Type, existing.Type)
}
if existing.Mode == api.PinModeRecursive && pin.Mode != api.PinModeRecursive {
msg := "cannot repin a CID which is already pinned in "
msg += "recursive mode (new pin is pinned as %s). Unpin it first."
return pin, fmt.Errorf(msg, pin.Mode)
}
return pin, checkPinType(pin)
}
// pin performs the actual pinning and supports a blacklist to be able to
// evacuate a node and returns the pin object that it tried to pin, whether
// the pin was submitted to the consensus layer or skipped (due to error or to
// the fact that it was already valid) and error.
//
// This is the method called by the Cluster.Pin RPC endpoint.
func (c *Cluster) pin(
ctx context.Context,
pin api.Pin,
blacklist []peer.ID,
) (api.Pin, bool, error) {
ctx, span := trace.StartSpan(ctx, "cluster/pin")
defer span.End()
if c.config.FollowerMode {
return api.Pin{}, false, errFollowerMode
}
if !pin.Cid.Defined() {
return pin, false, errors.New("bad pin object")
}
// Handle pin updates when the option is set
if update := pin.PinUpdate; update.Defined() && !update.Equals(pin.Cid) {
pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions)
return pin, true, err
}
existing, err := c.PinGet(ctx, pin.Cid)
if err != nil && err != state.ErrNotFound {
return pin, false, err
}
pin, err = c.setupPin(ctx, pin, existing)
if err != nil {
return pin, false, err
}
// Set the Pin timestamp to now(). This is not an user-controllable
// "option".
pin.Timestamp = time.Now()
if pin.Type == api.MetaType {
return pin, true, c.consensus.LogPin(ctx, pin)
}
// Usually allocations are unset when pinning normally, however, the
// allocations may have been preset by the adder in which case they
// need to be respected. Whenever allocations are set. We don't
// re-allocate. repinFromPeer() unsets allocations for this reason.
// allocate() will check which peers are currently allocated
// and try to respect them.
if len(pin.Allocations) == 0 {
// If replication factor is -1, this will return empty
// allocations.
allocs, err := c.allocate(
ctx,
pin.Cid,
existing,
pin.ReplicationFactorMin,
pin.ReplicationFactorMax,
blacklist,
pin.UserAllocations,
)
if err != nil {
return pin, false, err
}
pin.Allocations = allocs
}
// If this is true, replication factor should be -1.
if len(pin.Allocations) == 0 {
logger.Infof("pinning %s everywhere:", pin.Cid)
} else {
logger.Infof("pinning %s on %s:", pin.Cid, pin.Allocations)
}
return pin, true, c.consensus.LogPin(ctx, pin)
}
// Unpin removes a previously pinned Cid from Cluster. It returns
// the global state Pin object as it was stored before removal, or
// an error if it was not possible to update the global state.
//
// Unpin does not reflect the success or failure of underlying IPFS daemon
// unpinning operations, which happen in async fashion.
func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "cluster/Unpin")
defer span.End()
if c.config.FollowerMode {
return api.Pin{}, errFollowerMode
}
logger.Info("IPFS cluster unpinning:", h)
pin, err := c.PinGet(ctx, h)
if err != nil {
return api.Pin{}, err
}
switch pin.Type {
case api.DataType:
return pin, c.consensus.LogUnpin(ctx, pin)
case api.ShardType:
err := "cannot unpin a shard directly. Unpin content root CID instead"
return pin, errors.New(err)
case api.MetaType:
// Unpin cluster dag and referenced shards
err := c.unpinClusterDag(ctx, pin)
if err != nil {
return pin, err
}
return pin, c.consensus.LogUnpin(ctx, pin)
case api.ClusterDAGType:
err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead"
return pin, errors.New(err)
default:
return pin, errors.New("unrecognized pin type")
}
}
// unpinClusterDag unpins the clusterDAG metadata node and the shard metadata
// nodes that it references. It handles the case where multiple parents
// reference the same metadata node, only unpinning those nodes without
// existing references
func (c *Cluster) unpinClusterDag(ctx context.Context, metaPin api.Pin) error {
cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid)
if err != nil {
return err
}
// TODO: FIXME: potentially unpinning shards which are referenced
// by other clusterDAGs.
for _, ci := range cids {
err = c.consensus.LogUnpin(ctx, api.PinCid(ci))
if err != nil {
return err
}
}
return nil
}
// PinUpdate pins a new CID based on an existing cluster Pin. The allocations
// and most pin options (replication factors) are copied from the existing
// Pin. The options object can be used to set the Name for the new pin and
// might support additional options in the future.
//
// The from pin is NOT unpinned upon completion. The new pin might take
// advantage of efficient pin/update operation on IPFS-side (if the
// IPFSConnector supports it - the default one does). This may offer
// significant speed when pinning items which are similar to previously pinned
// content.
func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts api.PinOptions) (api.Pin, error) {
existing, err := c.PinGet(ctx, from)
if err != nil { // including when the existing pin is not found
return api.Pin{}, err
}
// Hector: I am not sure whether it has any point to update something
// like a MetaType.
if existing.Type != api.DataType {
return api.Pin{}, errors.New("this pin type cannot be updated")
}
existing.Cid = to
existing.PinUpdate = from
existing.Timestamp = time.Now()
if opts.Name != "" {
existing.Name = opts.Name
}
if !opts.ExpireAt.IsZero() && opts.ExpireAt.After(time.Now()) {
existing.ExpireAt = opts.ExpireAt
}
return existing, c.consensus.LogPin(ctx, existing)
}
// PinPath pins an CID resolved from its IPFS Path. It returns the resolved
// Pin object.
func (c *Cluster) PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "cluster/PinPath")
defer span.End()
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return api.Pin{}, err
}
return c.Pin(ctx, ci, opts)
}
// UnpinPath unpins a CID resolved from its IPFS Path. If returns the
// previously pinned Pin object.
func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
ctx, span := trace.StartSpan(ctx, "cluster/UnpinPath")
defer span.End()
ci, err := c.ipfs.Resolve(ctx, path)
if err != nil {
return api.Pin{}, err
}
return c.Unpin(ctx, ci)
}
// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (api.Cid, error) {
// TODO: add context param and tracing
var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(ctx, c.rpcClient, params, nil)
} else {
dags = single.New(ctx, c.rpcClient, params, params.Local)
}
defer dags.Close()
add := adder.New(dags, params, nil)
return add.FromMultipart(ctx, reader)
}
// Version returns the current IPFS Cluster version.
func (c *Cluster) Version() string {
return version.Version.String()
}
// Peers returns the IDs of the members of this Cluster on the out channel.
// This method blocks until it has finished.
func (c *Cluster) Peers(ctx context.Context, out chan<- api.ID) {
ctx, span := trace.StartSpan(ctx, "cluster/Peers")
defer span.End()
peers, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
logger.Error("an empty list of peers will be returned")
close(out)
return
}
c.peersWithFilter(ctx, peers, out)
}
// requests IDs from a given number of peers.
func (c *Cluster) peersWithFilter(ctx context.Context, peers []peer.ID, out chan<- api.ID) {
defer close(out)
// We should be done relatively quickly with this call. Otherwise
// report errors.
timeout := 15 * time.Second
ctxCall, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
in := make(chan struct{})
close(in)
idsOut := make(chan api.ID, len(peers))
errCh := make(chan []error, 1)
go func() {
defer close(errCh)
errCh <- c.rpcClient.MultiStream(
ctxCall,
peers,
"Cluster",
"IDStream",
in,
idsOut,
)
}()
// Unfortunately, we need to use idsOut as intermediary channel
// because it is closed when MultiStream ends and we cannot keep
// adding things on it (the errors below).
for id := range idsOut {
select {
case <-ctx.Done():
logger.Errorf("Peers call aborted: %s", ctx.Err())
return
case out <- id:
}
}
// ErrCh will always be closed on context cancellation too.
errs := <-errCh
for i, err := range errs {
if err == nil {
continue
}
if rpc.IsAuthorizationError(err) {
continue
}
select {
case <-ctx.Done():
logger.Errorf("Peers call aborted: %s", ctx.Err())
case out <- api.ID{
ID: peers[i],
Error: err.Error(),
}:
}
}
}
// getTrustedPeers gives listed of trusted peers except the current peer and
// the excluded peer if provided.
func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer.ID, error) {
peers, err := c.consensus.Peers(ctx)
if err != nil {
return nil, err
}
trustedPeers := make([]peer.ID, 0, len(peers))
for _, p := range peers {
if p == c.id || p == exclude || !c.consensus.IsTrustedPeer(ctx, p) {
continue
}
trustedPeers = append(trustedPeers, p)
}
return trustedPeers, nil
}
func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h api.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) {
for _, p := range peers {
pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p))
gpin.Add(api.PinInfo{
Cid: h,
Name: pin.Name,
Allocations: pin.Allocations,
Origins: pin.Origins,
Created: pin.Timestamp,
Metadata: pin.Metadata,
Peer: p,
PinInfoShort: api.PinInfoShort{
PeerName: pv.Peername,
IPFS: pv.IPFSID,
IPFSAddresses: pv.IPFSAddresses,
Status: status,
TS: t,
},
})
}
}
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h api.Cid) (api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End()
// The object we will return
gpin := api.GlobalPinInfo{}
// allocated peers, we will contact them through rpc
var dests []peer.ID
// un-allocated peers, we will set remote status
var remote []peer.ID
timeNow := time.Now()
// If pin is not part of the pinset, mark it unpinned
pin, err := c.PinGet(ctx, h)
if err != nil && err != state.ErrNotFound {
logger.Error(err)
return api.GlobalPinInfo{}, err
}
// When NotFound return directly with an unpinned
// status.
if err == state.ErrNotFound {
var members []peer.ID
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return api.GlobalPinInfo{}, err
}
}
c.setTrackerStatus(
&gpin,
h,
members,
api.TrackerStatusUnpinned,
api.PinCid(h),
timeNow,
)
return gpin, nil
}
// The pin exists.
gpin.Cid = h
gpin.Name = pin.Name
// Make the list of peers that will receive the request.
if c.config.FollowerMode {
// during follower mode return only local status.
dests = []peer.ID{c.host.ID()}
remote = []peer.ID{}
} else {
members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return api.GlobalPinInfo{}, err
}
if !pin.IsPinEverywhere() {
dests = pin.Allocations
remote = peersSubtract(members, dests)
} else {
dests = members
remote = []peer.ID{}
}
}
// set status remote on un-allocated peers
c.setTrackerStatus(&gpin, h, remote, api.TrackerStatusRemote, pin, timeNow)
lenDests := len(dests)
replies := make([]api.PinInfo, lenDests)
// a globalPinInfo type of request should be relatively fast. We
// cannot block response indefinitely due to an unresponsive node.
timeout := 15 * time.Second
ctxs, cancels := rpcutil.CtxsWithTimeout(ctx, lenDests, timeout)
defer rpcutil.MultiCancel(cancels)
errs := c.rpcClient.MultiCall(
ctxs,
dests,
comp,
method,
h,
rpcutil.CopyPinInfoToIfaces(replies),
)
for i, r := range replies {
e := errs[i]
// No error. Parse and continue
if e == nil {
gpin.Add(r)
continue
}
if rpc.IsAuthorizationError(e) {
logger.Debug("rpc auth error:", e)
continue
}
// Deal with error cases (err != nil): wrap errors in PinInfo
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, dests[i]))
gpin.Add(api.PinInfo{
Cid: h,
Name: pin.Name,
Peer: dests[i],
Allocations: pin.Allocations,
Origins: pin.Origins,
Created: pin.Timestamp,
Metadata: pin.Metadata,
PinInfoShort: api.PinInfoShort{
PeerName: pv.Peername,
IPFS: pv.IPFSID,
IPFSAddresses: pv.IPFSAddresses,
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
},
})
}
return gpin, nil
}
func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, inChan interface{}, out chan<- api.GlobalPinInfo) error {
defer close(out)
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoStream")
defer span.End()
if inChan == nil {
emptyChan := make(chan struct{})
close(emptyChan)
inChan = emptyChan
}
fullMap := make(map[api.Cid]api.GlobalPinInfo)
var members []peer.ID
var err error
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return err
}
}
// We don't have a good timeout proposal for this. Depending on the
// size of the state and the peformance of IPFS and the network, this
// may take moderately long.
// If we did, this is the place to put it.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msOut := make(chan api.PinInfo)
errsCh := make(chan []error, 1)
go func() {
defer close(errsCh)
errsCh <- c.rpcClient.MultiStream(
ctx,
members,
comp,
method,
inChan,
msOut,
)
}()
setPinInfo := func(p api.PinInfo) {
if !p.Defined() {
return
}
info, ok := fullMap[p.Cid]
if !ok {
info = api.GlobalPinInfo{}
}
info.Add(p)
// Set the new/updated info
fullMap[p.Cid] = info
}
// make the big collection.
for pin := range msOut {
setPinInfo(pin)
}
// This WAITs until MultiStream is DONE.
erroredPeers := make(map[peer.ID]string)
errs, ok := <-errsCh
if ok {
for i, err := range errs {
if err == nil {
continue
}
if rpc.IsAuthorizationError(err) {
logger.Debug("rpc auth error", err)
continue
}
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], err)
erroredPeers[members[i]] = err.Error()
}
}
// Merge any errors
for p, msg := range erroredPeers {
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, p))
for c := range fullMap {
setPinInfo(api.PinInfo{
Cid: c,
Name: "",
Peer: p,
Allocations: nil,
Origins: nil,
// Created: // leave unitialized
Metadata: nil,
PinInfoShort: api.PinInfoShort{
PeerName: pv.Peername,
IPFS: pv.IPFSID,
IPFSAddresses: pv.IPFSAddresses,
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
},
})
}
}
for _, v := range fullMap {
select {
case <-ctx.Done():
err := fmt.Errorf("%s.%s aborted: %w", comp, method, ctx.Err())
logger.Error(err)
return err
case out <- v:
}
}
return nil
}
func (c *Cluster) getIDForPeer(ctx context.Context, pid peer.ID) (*api.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/getIDForPeer")
defer span.End()
var id api.ID
err := c.rpcClient.CallContext(
ctx,
pid,
"Cluster",
"ID",
struct{}{},
&id,
)
if err != nil {
logger.Error(err)
id.ID = pid
id.Error = err.Error()
}
return &id, err
}
// cidsFromMetaPin expands a meta-pin and returns a list of Cids that
// Cluster handles for it: the ShardPins, the ClusterDAG and the MetaPin, in
// that order (the MetaPin is the last element).
// It returns a slice with only the given Cid if it's not a known Cid or not a
// MetaPin.
func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, error) {
ctx, span := trace.StartSpan(ctx, "cluster/cidsFromMetaPin")
defer span.End()
cState, err := c.consensus.State(ctx)
if err != nil {
return nil, err
}
list := []api.Cid{h}
pin, err := cState.Get(ctx, h)
if err != nil {
return nil, err
}
if pin.Type != api.MetaType {
return list, nil
}
if pin.Reference == nil {
return nil, errors.New("metaPin.Reference is unset")
}
list = append([]api.Cid{*pin.Reference}, list...)
clusterDagPin, err := c.PinGet(ctx, *pin.Reference)
if err != nil {
return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err)
}
clusterDagBlock, err := c.ipfs.BlockGet(ctx, clusterDagPin.Cid)
if err != nil {
return list, fmt.Errorf("error reading clusterDAG block from ipfs: %s", err)
}
clusterDagNode, err := sharding.CborDataToNode(clusterDagBlock, "cbor")
if err != nil {
return list, fmt.Errorf("error parsing clusterDAG block: %s", err)
}
for _, l := range clusterDagNode.Links() {
list = append([]api.Cid{api.NewCid(l.Cid)}, list...)
}
return list, nil
}
// // diffPeers returns the peerIDs added and removed from peers2 in relation to
// // peers1
// func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
// m1 := make(map[peer.ID]struct{})
// m2 := make(map[peer.ID]struct{})
// added = make([]peer.ID, 0)
// removed = make([]peer.ID, 0)
// if peers1 == nil && peers2 == nil {
// return
// }
// if peers1 == nil {
// added = peers2
// return
// }
// if peers2 == nil {
// removed = peers1
// return
// }
// for _, p := range peers1 {
// m1[p] = struct{}{}
// }
// for _, p := range peers2 {
// m2[p] = struct{}{}
// }
// for k := range m1 {
// _, ok := m2[k]
// if !ok {
// removed = append(removed, k)
// }
// }
// for k := range m2 {
// _, ok := m1[k]
// if !ok {
// added = append(added, k)
// }
// }
// return
// }
// RepoGC performs garbage collection sweep on all peers' IPFS repo.
func (c *Cluster) RepoGC(ctx context.Context) (api.GlobalRepoGC, error) {
ctx, span := trace.StartSpan(ctx, "cluster/RepoGC")
defer span.End()
members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return api.GlobalRepoGC{}, err
}
// to club `RepoGCLocal` responses of all peers into one
globalRepoGC := api.GlobalRepoGC{PeerMap: make(map[string]api.RepoGC)}
for _, member := range members {
var repoGC api.RepoGC
err = c.rpcClient.CallContext(
ctx,
member,
"Cluster",
"RepoGCLocal",
struct{}{},
&repoGC,
)
if err == nil {
globalRepoGC.PeerMap[member.String()] = repoGC
continue
}
if rpc.IsAuthorizationError(err) {
logger.Debug("rpc auth error:", err)
continue
}
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
pv := pingValueFromMetric(c.monitor.LatestForPeer(ctx, pingMetricName, member))
globalRepoGC.PeerMap[member.String()] = api.RepoGC{
Peer: member,
Peername: pv.Peername,
Keys: []api.IPFSRepoGC{},
Error: err.Error(),
}
}
return globalRepoGC, nil
}
// RepoGCLocal performs garbage collection only on the local IPFS deamon.
func (c *Cluster) RepoGCLocal(ctx context.Context) (api.RepoGC, error) {
ctx, span := trace.StartSpan(ctx, "cluster/RepoGCLocal")
defer span.End()
resp, err := c.ipfs.RepoGC(ctx)
if err != nil {
return api.RepoGC{}, err
}
resp.Peer = c.id
resp.Peername = c.config.Peername
return resp, nil
}