5fb2b6ae95
Squashed commit of the following:

commit 38cf569c6aed77c46ee4e0f8baa4d1a9daf8f03e
Merge: d125f69 aaada42
Author: Hector Sanjuan <hsanjuan@users.noreply.github.com>
Date: Wed Feb 20 11:02:00 2019 +0000

    Merge pull request #634 from ipfs/issue_450

    Support PinPath, UnpinPath (resolve before pinning)

commit aaada42054e1f1c7b2abb1270859d0de41a0e5d8
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Feb 19 22:16:25 2019 +0530

    formatResponse accepts api.Pin and not api.PinSerial

commit b5da4bea045865814cc422da71827b44ddd44b90
Merge: ba59036 cc8dd7e
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Feb 19 21:36:46 2019 +0530

    Merge branch 'master' into issue_450

commit ba5903649c1df1dba20f4d6f7e3573d6fe24921f
Merge: f002914 d59880c
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Mon Feb 18 08:41:11 2019 +0530

    Merge branch 'issue_450' of github.com:ipfs/ipfs-cluster into issue_450

commit f00291494c0c02621c2296cbb7ac71e4c23aa9ec
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Mon Feb 18 08:31:39 2019 +0530

    PinPath: more improvements

    Added tracing for new methods

commit d59880c338eaa8214fe06b4f930a540793d78407
Merge: 0ca4c7c b4f0eb3
Author: Hector Sanjuan <hsanjuan@users.noreply.github.com>
Date: Wed Feb 13 15:22:49 2019 +0000

    Merge branch 'master' into issue_450

commit 0ca4c7c3b0670ed9c8279f8274d36e3485c10030
Merge: d35017a ecef9ea
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Feb 12 13:10:13 2019 +0530

    Merge branch 'master' into issue_450

commit d35017a8de91ca9fc9a9a047c48c75134cee9f98
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Feb 12 13:07:25 2019 +0530

    PinPath: more improvements

    - Worth having `PinOptions` as a separate field in the struct and
      constructing the query in the test with ToQuery()
    - sharness: "intialization" line can be placed outside the tests at the top

commit 68e3b90417ffbad89d41a70ac81d85f9037f8848
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Sun Feb 10 21:43:50 2019 +0530

    Using if-continue pattern instead of if-else

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 3c29799f3b85be328b27508332ab92049d8b82f3
Merge: 956790b 4324889
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Thu Feb 7 10:25:52 2019 +0530

    Merge branch 'master' into issue_450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 956790b381db9858e4194f983e898b07dc51ba66
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Feb 6 21:11:20 2019 +0530

    Removing resolved path

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 7191cc46cedfbec116a9746937e28881b50ca044
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Feb 6 16:45:07 2019 +0530

    Fix go vet

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit f8b3d5b63b1b7569e2a3e0d82894fd4491c246c4
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Feb 6 16:07:03 2019 +0530

    Fixed linting error

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 23c57eb467755a1f21387a1615a7f34e97348053
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Feb 6 09:20:41 2019 +0530

    Fixed tests

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 0caedd94aefeb3b6649dedc214cb4b849ace2ea4
Merge: 17e555e 5a7ee1d
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Feb 6 00:07:10 2019 +0530

    Merge branch 'master' into issue_450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 17e555e4a7c574413df90aac70c5cc29cab98f54
Author: Hector Sanjuan <code@hector.link>
Date: Tue Feb 5 16:58:50 2019 +0000

    PinPath: address some feedback + improvements

    * Changed client's Pin() API and PinPath to be consistent
    * Added helper methods to turn PinPath to query and back
    * Make code and tests build
    * Use TestCidResolved everywhere
    * Fix cluster.PinPath arguments
    * Fix formatting of responses with --no-status
    * Make tests readable and call Fatal when needed
    * Use a pathTestCases variable

commit f0e7369c47c5ddadc8ed45df5fd2d4d9b2d42b38
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Feb 5 18:34:26 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    Addressed review comments as in
    https://github.com/ipfs/ipfs-cluster/pull/634#pullrequestreview-198751932

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit a8b4f181d2d7afed32ee41331dfaab19fd66a173
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Jan 29 22:41:27 2019 +0530

    Fixing tests

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit e39b95ca19e4d75506f4f492678245ef13936a44
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Jan 29 14:52:53 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    - PinPath and UnpinPath should return api.Pin
    - PinPath should accept pin options
    - Removing duplicate logic for Resolve from cluster
    - And many other review comments
      https://github.com/ipfs/ipfs-cluster/pull/634#pullrequestreview-195509504

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit d146075126320896665ba58d337a13789f68ea86
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Jan 23 17:08:41 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    PinPath (in both rest and rpc) should return a serializable struct in the
    form `{"\":"Q...cid..string..."}` (as used in "github.com/ipfs/go-cid" to
    marshal and unmarshal)

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 1f4869568a8adb450275257154ea3a26d03a30f3
Merge: 7acfd28 a244af9
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Jan 23 07:18:56 2019 +0530

    Merge branch 'master' into issue_450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 7acfd282732ddf2282a67d4f9d0170a494eb3ed4
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Jan 22 18:14:32 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    - RPC must always use serializable structs
    - In command, just use pin with path as cid is also a valid path
    - Addressing many other small review comments as in
      https://github.com/ipfs/ipfs-cluster/pull/634#pullrequestreview-192122534

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 36905041e1e3f0b204942030aab3ab7b5b9e4d62
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Jan 16 09:36:42 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    Extra logic for path checking should go into resolve so that it can be
    properly reused

    Added sharness tests

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 9116bda3534e77bb391d873051bb520a1b01a326
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Jan 16 08:08:07 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    error strings should not be capitalized

    Fixes #450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit ca7e61861374f456300a85ddc0374e594f74f963
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Jan 15 23:40:25 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    Tests

    Fixes #450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 522fbcd899f01c01680375561a32a87464157c0a
Merge: f1a56ab f7bc468
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Jan 15 10:40:54 2019 +0530

    Merge branch 'master' into issue_450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit f1a56ab925fb74c0c44273a4524afa4843cf757f
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Mon Jan 14 20:58:17 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    - IPFS Connector should act as a pure IPFS client, any extra logic should
      go to cluster.go
    - Use cid.Undef, instead of cid.Cid{}

    Fixes #450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit c83b91054f6774f1f9d4930cfc3f1fa28236f57c
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Thu Jan 10 08:57:17 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    - Separate handlers, methods and rpc apis for PinPath and UnpinPath from
      Pin and Unpin
    - Support ipld paths as well

    Fixes #450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 719dff88129366ce3ccb5e04cb6f8082a0915c5c
Merge: 91ceb47 21170c4
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Jan 9 19:38:35 2019 +0530

    Merge branch 'issue_450_old' into HEAD

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 91ceb4796259ca7ef2974ec43e6a278a12796b13
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Wed Jan 9 19:36:41 2019 +0530

    Revert "WIP: Figure out why test does not impleme"

    This reverts commit 28a3a3f25dce6f296c8cbef86221644c099a7e75.

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

commit 28a3a3f25dce6f296c8cbef86221644c099a7e75
Author: cd10012 <ced361@nyu.edu>
Date: Tue Jul 24 23:23:10 2018 -0400

    WIP: Figure out why test does not implement IPFSConnector interface...

    License: MIT
    Signed-off-by: cd10012 <ced361@nyu.edu>

commit 21170c48e77e69583db64544b08120a9baf40d8d
Author: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
Date: Tue Jan 8 10:37:59 2019 +0530

    Support PinPath, UnpinPath (resolve before pinning)

    This commit adds API support for pinning using a path.
    `POST /pins/<ipfs or ipns path>` and `DELETE /pins/<ipfs or ipns path>`
    will resolve the path into a cid and perform pinning or unpinning

    Fixes #450

    License: MIT
    Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>

Co-authored-by: Hector Sanjuan <hector@protocol.ai>
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
1570 lines
43 KiB
Go
package ipfscluster

import (
	"context"
	"errors"
	"fmt"
	"mime/multipart"
	"sync"
	"time"

	"github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/adder/local"
	"github.com/ipfs/ipfs-cluster/adder/sharding"
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/pstoremgr"
	"github.com/ipfs/ipfs-cluster/rpcutil"
	"github.com/ipfs/ipfs-cluster/state"
	"github.com/ipfs/ipfs-cluster/version"

	ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"

	"go.opencensus.io/trace"

	cid "github.com/ipfs/go-cid"
	rpc "github.com/libp2p/go-libp2p-gorpc"
	host "github.com/libp2p/go-libp2p-host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	peer "github.com/libp2p/go-libp2p-peer"
	routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
	ma "github.com/multiformats/go-multiaddr"
)

// ReadyTimeout specifies the time before giving up
// during startup (waiting for consensus to be ready).
// It may need adjustment according to timeouts in the
// consensus layer.
var ReadyTimeout = 30 * time.Second

var pingMetricName = "ping"

// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
type Cluster struct {
	ctx    context.Context
	cancel func()

	id          peer.ID
	config      *Config
	host        host.Host
	dht         *dht.IpfsDHT
	rpcServer   *rpc.Server
	rpcClient   *rpc.Client
	peerManager *pstoremgr.Manager

	consensus Consensus
	apis      []API
	ipfs      IPFSConnector
	state     state.State
	tracker   PinTracker
	monitor   PeerMonitor
	allocator PinAllocator
	informer  Informer
	tracer    Tracer

	doneCh  chan struct{}
	readyCh chan struct{}
	readyB  bool
	wg      sync.WaitGroup

	// peerAdd
	paMux sync.Mutex

	// shutdown function and related variables
	shutdownLock sync.Mutex
	shutdownB    bool
	removed      bool
}

// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
// creates an RPC server and client, and sets up all components.
//
// The new cluster peer may still be performing initialization tasks when
// this call returns (consensus may still be bootstrapping). Use Cluster.Ready()
// if you need to wait until the peer is fully up.
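//
// Illustrative usage sketch (an assumption about typical callers, not code
// taken from this repository):
//
//	c, err := NewCluster(host, cfg, consensus, apis, ipfs, st, tracker, monitor, allocator, informer, tracer)
//	if err != nil { /* handle */ }
//	select {
//	case <-c.Ready(): // fully initialized (consensus included)
//	case <-c.Done():  // shut down before becoming ready
//	}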
func NewCluster(
	host host.Host,
	cfg *Config,
	consensus Consensus,
	apis []API,
	ipfs IPFSConnector,
	st state.State,
	tracker PinTracker,
	monitor PeerMonitor,
	allocator PinAllocator,
	informer Informer,
	tracer Tracer,
) (*Cluster, error) {
	err := cfg.Validate()
	if err != nil {
		return nil, err
	}

	if host == nil {
		return nil, errors.New("cluster host is nil")
	}

	ctx, cancel := context.WithCancel(context.Background())

	listenAddrs := ""
	for _, addr := range host.Addrs() {
		listenAddrs += fmt.Sprintf(" %s/ipfs/%s\n", addr, host.ID().Pretty())
	}

	logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)

	// Note, we already loaded peers from peerstore into the host
	// in daemon.go.
	peerManager := pstoremgr.New(host, cfg.GetPeerstorePath())

	idht, err := dht.New(ctx, host)
	if err != nil {
		cancel()
		return nil, err
	}

	// Let the DHT be maintained regularly
	err = idht.Bootstrap(ctx)
	if err != nil {
		cancel()
		return nil, err
	}

	rHost := routedhost.Wrap(host, idht)

	c := &Cluster{
		ctx:         ctx,
		cancel:      cancel,
		id:          host.ID(),
		config:      cfg,
		host:        rHost,
		dht:         idht,
		consensus:   consensus,
		apis:        apis,
		ipfs:        ipfs,
		state:       st,
		tracker:     tracker,
		monitor:     monitor,
		allocator:   allocator,
		informer:    informer,
		tracer:      tracer,
		peerManager: peerManager,
		shutdownB:   false,
		removed:     false,
		doneCh:      make(chan struct{}),
		readyCh:     make(chan struct{}),
		readyB:      false,
	}

	err = c.setupRPC()
	if err != nil {
		c.Shutdown(ctx)
		return nil, err
	}
	c.setupRPCClients()
	go func() {
		c.ready(ReadyTimeout)
		c.run()
	}()

	return c, nil
}

func (c *Cluster) setupRPC() error {
	var rpcServer *rpc.Server
	if c.config.Tracing {
		sh := &ocgorpc.ServerHandler{}
		rpcServer = rpc.NewServer(c.host, version.RPCProtocol, rpc.WithServerStatsHandler(sh))
	} else {
		rpcServer = rpc.NewServer(c.host, version.RPCProtocol)
	}
	err := rpcServer.RegisterName("Cluster", &RPCAPI{c})
	if err != nil {
		return err
	}
	c.rpcServer = rpcServer

	var rpcClient *rpc.Client
	if c.config.Tracing {
		csh := &ocgorpc.ClientHandler{}
		rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer, rpc.WithClientStatsHandler(csh))
	} else {
		rpcClient = rpc.NewClientWithServer(c.host, version.RPCProtocol, rpcServer)
	}
	c.rpcClient = rpcClient
	return nil
}

func (c *Cluster) setupRPCClients() {
	c.tracker.SetClient(c.rpcClient)
	c.ipfs.SetClient(c.rpcClient)
	for _, api := range c.apis {
		api.SetClient(c.rpcClient)
	}
	c.consensus.SetClient(c.rpcClient)
	c.monitor.SetClient(c.rpcClient)
	c.allocator.SetClient(c.rpcClient)
	c.informer.SetClient(c.rpcClient)
}

// syncWatcher loops and triggers StateSync and SyncAllLocal from time to time
func (c *Cluster) syncWatcher() {
	ctx, span := trace.StartSpan(c.ctx, "cluster/syncWatcher")
	defer span.End()

	stateSyncTicker := time.NewTicker(c.config.StateSyncInterval)
	syncTicker := time.NewTicker(c.config.IPFSSyncInterval)

	for {
		select {
		case <-stateSyncTicker.C:
			logger.Debug("auto-triggering StateSync()")
			c.StateSync(ctx)
		case <-syncTicker.C:
			logger.Debug("auto-triggering SyncAllLocal()")
			c.SyncAllLocal(ctx)
		case <-c.ctx.Done():
			stateSyncTicker.Stop()
			return
		}
	}
}

func (c *Cluster) sendInformerMetric(ctx context.Context) (api.Metric, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/sendInformerMetric")
	defer span.End()

	metric := c.informer.GetMetric(ctx)
	metric.Peer = c.id
	return metric, c.monitor.PublishMetric(ctx, metric)
}

// pushInformerMetrics loops and publishes informer metrics using the
// cluster monitor. Metrics are pushed normally at a TTL/2 rate. If an error
// occurs, they are pushed at a TTL/4 rate.
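// For example (illustrative values only), a metric with a 30s TTL would be
// re-published every 15s, and retried every 7.5s after a failed broadcast.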
func (c *Cluster) pushInformerMetrics(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "cluster/pushInformerMetrics")
	defer span.End()

	timer := time.NewTimer(0) // fire immediately first

	// retries counts how many retries we have made
	retries := 0
	// retryWarnMod controls how often we log
	// "error broadcasting metric".
	// It will do it in the first error, and then on every
	// 10th.
	retryWarnMod := 10

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			// wait
		}

		metric, err := c.sendInformerMetric(ctx)

		if err != nil {
			if (retries % retryWarnMod) == 0 {
				logger.Errorf("error broadcasting metric: %s", err)
				retries++
			}
			// retry sooner
			timer.Reset(metric.GetTTL() / 4)
			continue
		}

		retries = 0
		// send metric again in TTL/2
		timer.Reset(metric.GetTTL() / 2)
	}
}

func (c *Cluster) pushPingMetrics(ctx context.Context) {
	ctx, span := trace.StartSpan(ctx, "cluster/pushPingMetrics")
	defer span.End()

	ticker := time.NewTicker(c.config.MonitorPingInterval)
	for {
		metric := api.Metric{
			Name:  pingMetricName,
			Peer:  c.id,
			Valid: true,
		}
		metric.SetTTL(c.config.MonitorPingInterval * 2)
		c.monitor.PublishMetric(ctx, metric)

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// reads the alerts channel from the monitor and triggers repins
func (c *Cluster) alertsHandler() {
	for {
		select {
		case <-c.ctx.Done():
			return
		case alrt := <-c.monitor.Alerts():
			// only the leader handles alerts
			leader, err := c.consensus.Leader(c.ctx)
			if err == nil && leader == c.id {
				logger.Warningf(
					"Peer %s received alert for %s in %s",
					c.id, alrt.MetricName, alrt.Peer,
				)
				switch alrt.MetricName {
				case pingMetricName:
					c.repinFromPeer(c.ctx, alrt.Peer)
				}
			}
		}
	}
}

// detects any changes in the peerset and saves the configuration. When it
// detects that we have been removed from the peerset, it shuts down this peer.
func (c *Cluster) watchPeers() {
	ticker := time.NewTicker(c.config.PeerWatchInterval)

	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			logger.Debugf("%s watching peers", c.id)
			hasMe := false
			peers, err := c.consensus.Peers(c.ctx)
			if err != nil {
				logger.Error(err)
				continue
			}
			for _, p := range peers {
				if p == c.id {
					hasMe = true
					break
				}
			}

			if !hasMe {
				c.shutdownLock.Lock()
				defer c.shutdownLock.Unlock()
				logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty())
				c.removed = true
				go c.Shutdown(c.ctx)
				return
			}
		}
	}
}

// finds all Cids pinned to a given peer and triggers re-pins on them.
func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID) {
	ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
	defer span.End()

	if c.config.DisableRepinning {
		logger.Warningf("repinning is disabled. Will not re-allocate cids from %s", p.Pretty())
		return
	}

	cState, err := c.consensus.State(ctx)
	if err != nil {
		logger.Warning(err)
		return
	}
	list := cState.List(ctx)
	for _, pin := range list {
		if containsPeer(pin.Allocations, p) {
			_, ok, err := c.pin(ctx, pin, []peer.ID{p}, []peer.ID{}) // pin blacklisting this peer
			if ok && err == nil {
				logger.Infof("repinned %s out of %s", pin.Cid, p.Pretty())
			}
		}
	}
}

// run launches some go-routines which live throughout the cluster's life
func (c *Cluster) run() {
	go c.syncWatcher()
	go c.pushPingMetrics(c.ctx)
	go c.pushInformerMetrics(c.ctx)
	go c.watchPeers()
	go c.alertsHandler()
}

func (c *Cluster) ready(timeout time.Duration) {
	ctx, span := trace.StartSpan(c.ctx, "cluster/ready")
	defer span.End()

	// We bootstrapped first because with dirty state consensus
	// may have a peerset and not find a leader so we cannot wait
	// for it.
	timer := time.NewTimer(timeout)
	select {
	case <-timer.C:
		logger.Error("***** ipfs-cluster consensus start timed out (tips below) *****")
		logger.Error(`
**************************************************
This peer was not able to become part of the cluster.
This might be due to one or several causes:
- Check the logs above this message for errors
- Check that there is connectivity to the "peers" multiaddresses
- Check that all cluster peers are using the same "secret"
- Check that this peer is reachable on its "listen_multiaddress" by all peers
- Check that the current cluster is healthy (has a leader). Otherwise make
sure to start enough peers so that a leader election can happen.
- Check that the peer(s) you are trying to connect to is running the
same version of IPFS-cluster.
**************************************************
`)
		c.Shutdown(ctx)
		return
	case <-c.consensus.Ready(ctx):
		// Consensus ready means the state is up to date so we can sync
		// it to the tracker. We ignore errors (normal when state
		// doesn't exist in new peers).
		c.StateSync(ctx)
	case <-c.ctx.Done():
		return
	}

	// Cluster is ready.
	peers, err := c.consensus.Peers(ctx)
	if err != nil {
		logger.Error(err)
		c.Shutdown(ctx)
		return
	}

	logger.Info("Cluster Peers (without including ourselves):")
	if len(peers) == 1 {
		logger.Info(" - No other peers")
	}

	for _, p := range peers {
		if p != c.id {
			logger.Infof(" - %s", p.Pretty())
		}
	}

	close(c.readyCh)
	c.shutdownLock.Lock()
	c.readyB = true
	c.shutdownLock.Unlock()
	logger.Info("** IPFS Cluster is READY **")
}

// Ready returns a channel which signals when this peer is
// fully initialized (including consensus).
func (c *Cluster) Ready() <-chan struct{} {
	return c.readyCh
}

// Shutdown stops the IPFS cluster components.
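// It is safe to call Shutdown more than once: if the peer was already shut
// down, the call simply returns nil.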
func (c *Cluster) Shutdown(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "cluster/Shutdown")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	c.shutdownLock.Lock()
	defer c.shutdownLock.Unlock()

	if c.shutdownB {
		logger.Debug("Cluster is already shutdown")
		return nil
	}

	logger.Info("shutting down Cluster")

	// Try to store peerset file for all known peers whatsoever
	// if we got ready (otherwise, don't overwrite anything)
	if c.readyB {
		c.peerManager.SavePeerstoreForPeers(c.host.Peerstore().Peers())
	}

	// Only attempt to leave if:
	// - consensus is initialized
	// - cluster was ready (no bootstrapping error)
	// - We are not removed already (means watchPeers() called us)
	if c.consensus != nil && c.config.LeaveOnShutdown && c.readyB && !c.removed {
		c.removed = true
		_, err := c.consensus.Peers(ctx)
		if err == nil {
			// best effort
			logger.Warning("attempting to leave the cluster. This may take some seconds")
			err := c.consensus.RmPeer(ctx, c.id)
			if err != nil {
				logger.Error("leaving cluster: " + err.Error())
			}
		}
	}

	if con := c.consensus; con != nil {
		if err := con.Shutdown(ctx); err != nil {
			logger.Errorf("error stopping consensus: %s", err)
			return err
		}
	}

	// We left the cluster or were removed. Destroy the Raft state.
	if c.removed && c.readyB {
		err := c.consensus.Clean(ctx)
		if err != nil {
			logger.Error("cleaning consensus: ", err)
		}
	}

	if err := c.monitor.Shutdown(ctx); err != nil {
		logger.Errorf("error stopping monitor: %s", err)
		return err
	}

	for _, api := range c.apis {
		if err := api.Shutdown(ctx); err != nil {
			logger.Errorf("error stopping API: %s", err)
			return err
		}
	}

	if err := c.ipfs.Shutdown(ctx); err != nil {
		logger.Errorf("error stopping IPFS Connector: %s", err)
		return err
	}

	if err := c.tracker.Shutdown(ctx); err != nil {
		logger.Errorf("error stopping PinTracker: %s", err)
		return err
	}

	if err := c.tracer.Shutdown(ctx); err != nil {
		logger.Errorf("error stopping Tracer: %s", err)
		return err
	}

	c.cancel()
	c.host.Close() // Shutdown all network services
	c.wg.Wait()
	c.shutdownB = true
	close(c.doneCh)
	return nil
}

// Done provides a way to learn if the Peer has been shutdown
// (for example, because it has been removed from the Cluster)
func (c *Cluster) Done() <-chan struct{} {
	return c.doneCh
}

// ID returns information about the Cluster peer
func (c *Cluster) ID(ctx context.Context) api.ID {
	_, span := trace.StartSpan(ctx, "cluster/ID")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	// ignore error since it is included in response object
	ipfsID, _ := c.ipfs.ID(ctx)
	var addrs []ma.Multiaddr

	addrsSet := make(map[string]struct{}) // to filter dups
	for _, addr := range c.host.Addrs() {
		addrsSet[addr.String()] = struct{}{}
	}
	for k := range addrsSet {
		addr, _ := ma.NewMultiaddr(k)
		addrs = append(addrs, api.MustLibp2pMultiaddrJoin(addr, c.id))
	}

	peers := []peer.ID{}
	// This method might get called very early by a remote peer
	// and might catch us when consensus is not set
	if c.consensus != nil {
		peers, _ = c.consensus.Peers(ctx)
	}

	return api.ID{
		ID: c.id,
		//PublicKey:          c.host.Peerstore().PubKey(c.id),
		Addresses:             addrs,
		ClusterPeers:          peers,
		ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
		Version:               version.Version.String(),
		RPCProtocolVersion:    version.RPCProtocol,
		IPFS:                  ipfsID,
		Peername:              c.config.Peername,
	}
}

// PeerAdd adds a new peer to this Cluster.
//
// For it to work well, the new peer should be discoverable
// (part of our peerstore or connected to one of the existing peers)
// and reachable. Since PeerAdd allows adding peers which are
// not running, or reachable, it is recommended to call Join() from the
// new peer instead.
//
// The new peer ID will be passed to the consensus
// component to be added to the peerset.
func (c *Cluster) PeerAdd(ctx context.Context, pid peer.ID) (api.ID, error) {
	_, span := trace.StartSpan(ctx, "cluster/PeerAdd")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	// starting 10 nodes on the same box for testing
	// causes deadlock and a global lock here
	// seems to help.
	c.paMux.Lock()
	defer c.paMux.Unlock()
	logger.Debugf("peerAdd called with %s", pid.Pretty())

	// Log the new peer in the log so everyone gets it.
	err := c.consensus.AddPeer(ctx, pid)
	if err != nil {
		logger.Error(err)
		id := api.ID{ID: pid, Error: err.Error()}
		return id, err
	}

	// Ask the new peer to connect its IPFS daemon to the rest
	err = c.rpcClient.CallContext(
		ctx,
		pid,
		"Cluster",
		"IPFSConnectSwarms",
		struct{}{},
		&struct{}{},
	)
	if err != nil {
		logger.Error(err)
	}

	id := api.ID{}

	// wait up to 2 seconds for new peer to catch up
	// and return an up to date api.ID object.
	// otherwise it might not contain the current cluster peers
	// as it should.
	for i := 0; i < 20; i++ {
		id, _ = c.getIDForPeer(ctx, pid)
		ownPeers, err := c.consensus.Peers(ctx)
		if err != nil {
			break
		}
		newNodePeers := id.ClusterPeers
		added, removed := diffPeers(ownPeers, newNodePeers)
		if len(added) == 0 && len(removed) == 0 && containsPeer(ownPeers, pid) {
			break // the new peer has fully joined
		}
		time.Sleep(200 * time.Millisecond)
		logger.Debugf("%s addPeer: retrying to get ID from %s",
			c.id.Pretty(), pid.Pretty())
	}
	logger.Info("Peer added ", pid.Pretty())
	return id, nil
}

// PeerRemove removes a peer from this Cluster.
//
// The peer will be removed from the consensus peerset.
// This may first trigger repinnings for all content if not disabled.
func (c *Cluster) PeerRemove(ctx context.Context, pid peer.ID) error {
	_, span := trace.StartSpan(ctx, "cluster/PeerRemove")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	// We need to repin before removing the peer, otherwise, it won't
	// be able to submit the pins.
	logger.Infof("re-allocating all CIDs directly associated to %s", pid)
	c.repinFromPeer(ctx, pid)

	err := c.consensus.RmPeer(ctx, pid)
	if err != nil {
		logger.Error(err)
		return err
	}
	logger.Info("Peer removed ", pid.Pretty())
	return nil
}

// Join adds this peer to an existing cluster by bootstrapping to a
// given multiaddress. It works by calling PeerAdd on the destination
// cluster and making sure that the new peer is ready to discover and contact
// the rest.
func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
	_, span := trace.StartSpan(ctx, "cluster/Join")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	logger.Debugf("Join(%s)", addr)

	pid, _, err := api.Libp2pMultiaddrSplit(addr)
	if err != nil {
		logger.Error(err)
		return err
	}

	// Bootstrap to myself
	if pid == c.id {
		return nil
	}

	// Add peer to peerstore so we can talk to it
	c.peerManager.ImportPeer(addr, true)

	// Note that PeerAdd() on the remote peer will
	// figure out what our real address is (obviously not
	// ListenAddr).
	var myID api.IDSerial
	err = c.rpcClient.CallContext(
		ctx,
		pid,
		"Cluster",
		"PeerAdd",
		peer.IDB58Encode(c.id),
		&myID,
	)
	if err != nil {
		logger.Error(err)
		return err
	}

	// We need to trigger a DHT bootstrap asap for this peer to not be
	// lost if the peer it bootstrapped to goes down. We do this manually
	// by triggering 1 round of bootstrap in the background.
	// Note that our regular bootstrap process is still running in the
	// background since we created the cluster.
	go func() {
		c.dht.BootstrapOnce(c.ctx, dht.DefaultBootstrapConfig)
	}()

	// wait for leader and for state to catch up
	// then sync
	err = c.consensus.WaitForSync(ctx)
	if err != nil {
		logger.Error(err)
		return err
	}

	c.StateSync(ctx)

	logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
	return nil
}

// StateSync syncs the consensus state to the Pin Tracker, ensuring
// that every Cid in the shared state is tracked and that the Pin Tracker
// is not tracking more Cids than it should.
func (c *Cluster) StateSync(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "cluster/StateSync")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	cState, err := c.consensus.State(ctx)
	if err != nil {
		return err
	}

	logger.Debug("syncing state to tracker")
	clusterPins := cState.List(ctx)

	trackedPins := c.tracker.StatusAll(ctx)
	trackedPinsMap := make(map[string]int)
	for i, tpin := range trackedPins {
		trackedPinsMap[tpin.Cid.String()] = i
	}

	// Track items which are not tracked
	for _, pin := range clusterPins {
		_, tracked := trackedPinsMap[pin.Cid.String()]
		if !tracked {
			logger.Debugf("StateSync: tracking %s, part of the shared state", pin.Cid)
			c.tracker.Track(ctx, pin)
		}
	}

	// a. Untrack items which should not be tracked
	// b. Track items which should not be remote as local
	// c. Track items which should not be local as remote
	for _, p := range trackedPins {
		pCid := p.Cid
		currentPin, has := cState.Get(ctx, pCid)
		allocatedHere := containsPeer(currentPin.Allocations, c.id) || currentPin.ReplicationFactorMin == -1

		switch {
		case !has:
			logger.Debugf("StateSync: Untracking %s, is not part of shared state", pCid)
			c.tracker.Untrack(ctx, pCid)
		case p.Status == api.TrackerStatusRemote && allocatedHere:
			logger.Debugf("StateSync: Tracking %s locally (currently remote)", pCid)
			c.tracker.Track(ctx, currentPin)
		case p.Status == api.TrackerStatusPinned && !allocatedHere:
			logger.Debugf("StateSync: Tracking %s as remote (currently local)", pCid)
			c.tracker.Track(ctx, currentPin)
		}
	}

	return nil
}

// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
func (c *Cluster) StatusAll(ctx context.Context) ([]api.GlobalPinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/StatusAll")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoSlice(ctx, "TrackerStatusAll")
}

// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer.
func (c *Cluster) StatusAllLocal(ctx context.Context) []api.PinInfo {
	_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.tracker.StatusAll(ctx)
}

// Status returns the GlobalPinInfo for a given Cid as fetched from all
// current peers. If an error happens, the GlobalPinInfo should contain
// as much information as could be fetched from the other peers.
func (c *Cluster) Status(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/Status")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoCid(ctx, "TrackerStatus", h)
}

// StatusLocal returns this peer's PinInfo for a given Cid.
func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) api.PinInfo {
	_, span := trace.StartSpan(ctx, "cluster/StatusLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.tracker.Status(ctx, h)
}

// SyncAll triggers SyncAllLocal() operations in all cluster peers, making sure
// that the state of tracked items matches the state reported by the IPFS daemon
// and returning the results as GlobalPinInfo. If an error happens, the slice
// will contain as much information as could be fetched from the peers.
func (c *Cluster) SyncAll(ctx context.Context) ([]api.GlobalPinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/SyncAll")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoSlice(ctx, "SyncAllLocal")
}

// SyncAllLocal makes sure that the current state for all tracked items
// in this peer matches the state reported by the IPFS daemon.
//
// SyncAllLocal returns the list of PinInfo that were updated because of
// the operation, along with those in error states.
func (c *Cluster) SyncAllLocal(ctx context.Context) ([]api.PinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/SyncAllLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	syncedItems, err := c.tracker.SyncAll(ctx)
	// Despite errors, tracker provides synced items that we can provide.
	// They encapsulate the error.
	if err != nil {
		logger.Error("tracker.Sync() returned with error: ", err)
		logger.Error("Is the ipfs daemon running?")
	}
	return syncedItems, err
}

// Sync triggers a SyncLocal() operation for a given Cid
// in all cluster peers.
func (c *Cluster) Sync(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/Sync")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoCid(ctx, "SyncLocal", h)
}

// used for RecoverLocal and SyncLocal.
func (c *Cluster) localPinInfoOp(
	ctx context.Context,
	h cid.Cid,
	f func(context.Context, cid.Cid) (api.PinInfo, error),
) (pInfo api.PinInfo, err error) {
	ctx, span := trace.StartSpan(ctx, "cluster/localPinInfoOp")
	defer span.End()

	cids, err := c.cidsFromMetaPin(ctx, h)
	if err != nil {
		return api.PinInfo{}, err
	}

	for _, ci := range cids {
		pInfo, err = f(ctx, ci)
		if err != nil {
			logger.Error("tracker.SyncCid() returned with error: ", err)
			logger.Error("Is the ipfs daemon running?")
			break
		}
	}
	// return the last pInfo/err, should be the root Cid if everything ok
	return pInfo, err

}

// SyncLocal performs a local sync operation for the given Cid. This will
// tell the tracker to verify the status of the Cid against the IPFS daemon.
// It returns the updated PinInfo for the Cid.
func (c *Cluster) SyncLocal(ctx context.Context, h cid.Cid) (pInfo api.PinInfo, err error) {
	_, span := trace.StartSpan(ctx, "cluster/SyncLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.localPinInfoOp(ctx, h, c.tracker.Sync)
}

// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
// by this peer.
func (c *Cluster) RecoverAllLocal(ctx context.Context) ([]api.PinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.tracker.RecoverAll(ctx)
}

// Recover triggers a recover operation for a given Cid in all
// cluster peers.
func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) {
	_, span := trace.StartSpan(ctx, "cluster/Recover")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.globalPinInfoCid(ctx, "TrackerRecover", h)
}

// RecoverLocal triggers a recover operation for a given Cid in this peer only.
// It returns the updated PinInfo, after recovery.
func (c *Cluster) RecoverLocal(ctx context.Context, h cid.Cid) (pInfo api.PinInfo, err error) {
	_, span := trace.StartSpan(ctx, "cluster/RecoverLocal")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	return c.localPinInfoOp(ctx, h, c.tracker.Recover)
}

// Pins returns the list of Cids managed by Cluster and which are part
// of the current global state. This is the source of truth as to which
// pins are managed and their allocation, but does not indicate if
// the item is successfully pinned. For that, use StatusAll().
func (c *Cluster) Pins(ctx context.Context) []api.Pin {
	_, span := trace.StartSpan(ctx, "cluster/Pins")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	cState, err := c.consensus.State(ctx)
	if err != nil {
		logger.Error(err)
		return []api.Pin{}
	}
	return cState.List(ctx)
}

// PinGet returns information for a single Cid managed by Cluster.
// The information is obtained from the current global state. The
// returned api.Pin provides information about the allocations
// assigned for the requested Cid, but does not indicate if
// the item is successfully pinned. For that, use Status(). PinGet
// returns an error if the given Cid is not part of the global state.
func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (api.Pin, error) {
	_, span := trace.StartSpan(ctx, "cluster/PinGet")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	st, err := c.consensus.State(ctx)
	if err != nil {
		return api.PinCid(h), err
	}
	pin, ok := st.Get(ctx, h)
	if !ok {
		return pin, errors.New("cid is not part of the global state")
	}
	return pin, nil
}

// Pin makes the cluster Pin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state. Depending on the cluster
// pinning strategy, the PinTracker may then request the IPFS daemon
// to pin the Cid.
//
// Pin returns an error if the operation could not be persisted
// to the global state. Pin does not reflect the success or failure
// of underlying IPFS daemon pinning operations.
//
// If the argument's allocations are non-empty then these peers are pinned with
// priority over other peers in the cluster. If the max repl factor is less
// than the size of the specified peerset then peers are chosen from this set
// in allocation order. If the min repl factor is greater than the size of
// this set then the remaining peers are allocated in order from the rest of
// the cluster. Priority allocations are best effort. If any priority peers
// are unavailable then Pin will simply allocate from the rest of the cluster.
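//
// For example (illustrative only): with ReplicationFactorMax=2 and
// Allocations=[A, B, C], peers A and B are selected; with
// ReplicationFactorMin=3 and Allocations=[A], peer A is selected and the
// remaining allocations are chosen from the rest of the cluster.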
func (c *Cluster) Pin(ctx context.Context, pin api.Pin) error {
	_, span := trace.StartSpan(ctx, "cluster/Pin")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)
	_, _, err := c.pin(ctx, pin, []peer.ID{}, pin.Allocations)
	return err
}

// sets the default replication factor in a pin when it's set to 0
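// (0 means "use the configured defaults"; note that elsewhere in this file,
// e.g. StateSync, a replication factor of -1 is treated as "everywhere".)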
func (c *Cluster) setupReplicationFactor(pin *api.Pin) error {
	rplMin := pin.ReplicationFactorMin
	rplMax := pin.ReplicationFactorMax
	if rplMin == 0 {
		rplMin = c.config.ReplicationFactorMin
		pin.ReplicationFactorMin = rplMin
	}
	if rplMax == 0 {
		rplMax = c.config.ReplicationFactorMax
		pin.ReplicationFactorMax = rplMax
	}

	return isReplicationFactorValid(rplMin, rplMax)
}

// basic checks on the pin type to check it's well-formed.
func checkPinType(pin *api.Pin) error {
	switch pin.Type {
	case api.DataType:
		if pin.Reference != cid.Undef {
			return errors.New("data pins should not reference other pins")
		}
	case api.ShardType:
		if pin.MaxDepth != 1 {
			return errors.New("must pin shards to depth 1")
		}
		// FIXME: indirect shard pins could have max-depth 2
		// FIXME: repinning a shard type will overwrite replication
		// factor from previous:
		// if existing.ReplicationFactorMin != rplMin ||
		// existing.ReplicationFactorMax != rplMax {
		// return errors.New("shard update with wrong repl factors")
		//}
	case api.ClusterDAGType:
		if pin.MaxDepth != 0 {
			return errors.New("must pin roots directly")
		}
		if pin.Reference == cid.Undef {
			return errors.New("clusterDAG pins should reference a Meta pin")
		}
	case api.MetaType:
		if pin.Allocations != nil && len(pin.Allocations) != 0 {
			return errors.New("meta pin should not specify allocations")
		}
		if pin.Reference == cid.Undef {
			return errors.New("metaPins should reference a ClusterDAG")
		}

	default:
		return errors.New("unrecognized pin type")
	}
	return nil
}

// setupPin ensures that the Pin object is fit for pinning. We check
// and set the replication factors and ensure that the pinType matches the
// metadata consistently.
func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error {
	ctx, span := trace.StartSpan(ctx, "cluster/setupPin")
	defer span.End()

	err := c.setupReplicationFactor(pin)
	if err != nil {
		return err
	}

	existing, err := c.PinGet(ctx, pin.Cid)
	if err == nil && existing.Type != pin.Type { // it exists
		return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed")
	}
	return checkPinType(pin)
}

// pin performs the actual pinning and supports a blacklist to be able to
// evacuate a node. It returns the pin object that it tried to pin, whether
// the pin was submitted to the consensus layer or skipped (due to an error
// or because it was already valid), and an error.
func (c *Cluster) pin(ctx context.Context, pin api.Pin, blacklist []peer.ID, prioritylist []peer.ID) (api.Pin, bool, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/pin")
	defer span.End()

	if pin.Cid == cid.Undef {
		return pin, false, errors.New("bad pin object")
	}

	// setup pin might produce some side-effects to our pin
	err := c.setupPin(ctx, &pin)
	if err != nil {
		return pin, false, err
	}
	if pin.Type == api.MetaType {
		return pin, true, c.consensus.LogPin(ctx, pin)
	}

	allocs, err := c.allocate(
		ctx,
		pin.Cid,
		pin.ReplicationFactorMin,
		pin.ReplicationFactorMax,
		blacklist,
		prioritylist,
	)
	if err != nil {
		return pin, false, err
	}
	pin.Allocations = allocs

	if curr, _ := c.PinGet(ctx, pin.Cid); curr.Equals(pin) {
		// skip pinning
		logger.Debugf("pinning %s skipped: already correctly allocated", pin.Cid)
		return pin, false, nil
	}

	if len(pin.Allocations) == 0 {
		logger.Infof("IPFS cluster pinning %s everywhere:", pin.Cid)
	} else {
		logger.Infof("IPFS cluster pinning %s on %s:", pin.Cid, pin.Allocations)
	}

	return pin, true, c.consensus.LogPin(ctx, pin)
}

func (c *Cluster) unpin(ctx context.Context, h cid.Cid) (api.Pin, error) {
	_, span := trace.StartSpan(ctx, "cluster/unpin")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	logger.Info("IPFS cluster unpinning:", h)
	pin, err := c.PinGet(ctx, h)
	if err != nil {
		return pin, fmt.Errorf("cannot unpin pin uncommitted to state: %s", err)
	}

	switch pin.Type {
	case api.DataType:
		return pin, c.consensus.LogUnpin(ctx, pin)
	case api.ShardType:
		err := "cannot unpin a shard directly. Unpin content root CID instead."
		return pin, errors.New(err)
	case api.MetaType:
		// Unpin cluster dag and referenced shards
		err := c.unpinClusterDag(pin)
		if err != nil {
			return pin, err
		}
		return pin, c.consensus.LogUnpin(ctx, pin)
	case api.ClusterDAGType:
		err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead."
		return pin, errors.New(err)
	default:
		return pin, errors.New("unrecognized pin type")
	}
}

// Unpin makes the cluster Unpin a Cid. This implies adding the Cid
// to the IPFS Cluster peers shared-state.
//
// Unpin returns an error if the operation could not be persisted
// to the global state. Unpin does not reflect the success or failure
// of underlying IPFS daemon unpinning operations.
func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) error {
	_, span := trace.StartSpan(ctx, "cluster/Unpin")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)
	_, err := c.unpin(ctx, h)
	return err
}

// unpinClusterDag unpins the clusterDAG metadata node and the shard metadata
// nodes that it references. It handles the case where multiple parents
// reference the same metadata node, only unpinning those nodes without
// existing references.
func (c *Cluster) unpinClusterDag(metaPin api.Pin) error {
	ctx, span := trace.StartSpan(c.ctx, "cluster/unpinClusterDag")
	defer span.End()

	cids, err := c.cidsFromMetaPin(ctx, metaPin.Cid)
	if err != nil {
		return err
	}

	// TODO: FIXME: potentially unpinning shards which are referenced
	// by other clusterDAGs.
	for _, ci := range cids {
		err = c.consensus.LogUnpin(ctx, api.PinCid(ci))
		if err != nil {
			return err
		}
	}
	return nil
}

// PinPath pins a CID resolved from its IPFS Path. It returns the resolved
// Pin object.
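//
// Illustrative usage sketch (the path value is hypothetical; the REST API
// exposes this as POST /pins/<ipfs or ipns path>, per the commit above):
//
//	pin, err := c.PinPath(ctx, api.PinPath{Path: "/ipns/example.com/some/file"})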
func (c *Cluster) PinPath(ctx context.Context, path api.PinPath) (api.Pin, error) {
	_, span := trace.StartSpan(ctx, "cluster/PinPath")
	defer span.End()

	ctx = trace.NewContext(c.ctx, span)
	ci, err := c.ipfs.Resolve(ctx, path.Path)
	if err != nil {
		return api.Pin{}, err
	}

	p := api.PinCid(ci)
	p.PinOptions = path.PinOptions
	p, _, err = c.pin(ctx, p, []peer.ID{}, p.Allocations)
	return p, err
}

// UnpinPath unpins a CID resolved from its IPFS Path. It returns the
// previously pinned Pin object.
func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) {
	_, span := trace.StartSpan(ctx, "cluster/UnpinPath")
	defer span.End()

	ctx = trace.NewContext(c.ctx, span)
	ci, err := c.ipfs.Resolve(ctx, path)
	if err != nil {
		return api.Pin{}, err
	}

	return c.unpin(ctx, ci)
}

// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.Cid, error) {
	// TODO: add context param and tracing
	var dags adder.ClusterDAGService
	if params.Shard {
		dags = sharding.New(c.rpcClient, params.PinOptions, nil)
	} else {
		dags = local.New(c.rpcClient, params.PinOptions)
	}
	add := adder.New(dags, params, nil)
	return add.FromMultipart(c.ctx, reader)
}

// Version returns the current IPFS Cluster version.
func (c *Cluster) Version() string {
	return version.Version.String()
}

// Peers returns the IDs of the members of this Cluster.
func (c *Cluster) Peers(ctx context.Context) []api.ID {
	_, span := trace.StartSpan(ctx, "cluster/Peers")
	defer span.End()
	ctx = trace.NewContext(c.ctx, span)

	members, err := c.consensus.Peers(ctx)
	if err != nil {
		logger.Error(err)
		logger.Error("an empty list of peers will be returned")
		return []api.ID{}
	}
	lenMembers := len(members)

	peersSerial := make([]api.IDSerial, lenMembers, lenMembers)
	peers := make([]api.ID, lenMembers, lenMembers)

	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers)
	defer rpcutil.MultiCancel(cancels)

	errs := c.rpcClient.MultiCall(
		ctxs,
		members,
		"Cluster",
		"ID",
		struct{}{},
		rpcutil.CopyIDSerialsToIfaces(peersSerial),
	)

	for i, err := range errs {
		if err != nil {
			peersSerial[i].ID = peer.IDB58Encode(members[i])
			peersSerial[i].Error = err.Error()
		}
	}

	for i, ps := range peersSerial {
		peers[i] = ps.ToID()
	}
	return peers
}

func (c *Cluster) globalPinInfoCid(ctx context.Context, method string, h cid.Cid) (api.GlobalPinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
	defer span.End()

	pin := api.GlobalPinInfo{
		Cid:     h,
		PeerMap: make(map[peer.ID]api.PinInfo),
	}

	members, err := c.consensus.Peers(ctx)
	if err != nil {
		logger.Error(err)
		return api.GlobalPinInfo{}, err
	}
	lenMembers := len(members)

	replies := make([]api.PinInfoSerial, lenMembers, lenMembers)
	arg := api.Pin{
		Cid: h,
	}

	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers)
	defer rpcutil.MultiCancel(cancels)

	errs := c.rpcClient.MultiCall(
		ctxs,
		members,
		"Cluster",
		method,
		arg.ToSerial(),
		rpcutil.CopyPinInfoSerialToIfaces(replies),
	)

	for i, rserial := range replies {
		e := errs[i]

		// Potentially rserial is empty. But ToPinInfo ignores all
		// errors from underlying libraries. In that case .Status
		// will be TrackerStatusUndefined (0)
		r := rserial.ToPinInfo()

		// No error. Parse and continue
		if e == nil {
			pin.PeerMap[members[i]] = r
			continue
		}

		// Deal with error cases (err != nil): wrap errors in PinInfo

		// In this case, we had no answer at all. The contacted peer
		// must be offline or unreachable.
		if r.Status == api.TrackerStatusUndefined {
			logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e)
			pin.PeerMap[members[i]] = api.PinInfo{
				Cid:      h,
				Peer:     members[i],
				PeerName: members[i].String(),
				Status:   api.TrackerStatusClusterError,
				TS:       time.Now(),
				Error:    e.Error(),
			}
		} else { // there was an rpc error, but got a valid response :S
			r.Error = e.Error()
			pin.PeerMap[members[i]] = r
			// unlikely to come down this path
		}
	}

	return pin, nil
}

func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]api.GlobalPinInfo, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoSlice")
	defer span.End()

	infos := make([]api.GlobalPinInfo, 0)
	fullMap := make(map[string]api.GlobalPinInfo)

	members, err := c.consensus.Peers(ctx)
	if err != nil {
		logger.Error(err)
		return []api.GlobalPinInfo{}, err
	}
	lenMembers := len(members)

	replies := make([][]api.PinInfoSerial, lenMembers, lenMembers)

	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers)
	defer rpcutil.MultiCancel(cancels)

	errs := c.rpcClient.MultiCall(
		ctxs,
		members,
		"Cluster",
		method,
		struct{}{},
		rpcutil.CopyPinInfoSerialSliceToIfaces(replies),
	)

	mergePins := func(pins []api.PinInfoSerial) {
		for _, pserial := range pins {
			p := pserial.ToPinInfo()
			item, ok := fullMap[pserial.Cid]
			if !ok {
				fullMap[pserial.Cid] = api.GlobalPinInfo{
					Cid: p.Cid,
					PeerMap: map[peer.ID]api.PinInfo{
						p.Peer: p,
					},
				}
			} else {
				item.PeerMap[p.Peer] = p
			}
		}
	}

	erroredPeers := make(map[peer.ID]string)
	for i, r := range replies {
		if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
			logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e)
			erroredPeers[members[i]] = e.Error()
		} else {
			mergePins(r)
		}
	}

	// Merge any errors
	for p, msg := range erroredPeers {
		for cidStr := range fullMap {
			c, _ := cid.Decode(cidStr)
			fullMap[cidStr].PeerMap[p] = api.PinInfo{
				Cid:    c,
				Peer:   p,
				Status: api.TrackerStatusClusterError,
				TS:     time.Now(),
				Error:  msg,
			}
		}
	}

	for _, v := range fullMap {
		infos = append(infos, v)
	}

	return infos, nil
}

func (c *Cluster) getIDForPeer(ctx context.Context, pid peer.ID) (api.ID, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/getIDForPeer")
	defer span.End()

	idSerial := api.ID{ID: pid}.ToSerial()
	err := c.rpcClient.CallContext(
		ctx,
		pid,
		"Cluster",
		"ID",
		struct{}{},
		&idSerial,
	)
	id := idSerial.ToID()
	if err != nil {
		logger.Error(err)
		id.Error = err.Error()
	}
	return id, err
}

// cidsFromMetaPin expands a meta-pin and returns a list of Cids that
// Cluster handles for it: the ShardPins, the ClusterDAG and the MetaPin, in
// that order (the MetaPin is the last element).
// It returns a slice with only the given Cid if it's not a known Cid or not a
// MetaPin.
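// For a sharded pin the result therefore looks like
// [shard..., clusterDAG, metaPin].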
func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/cidsFromMetaPin")
	defer span.End()

	cState, err := c.consensus.State(ctx)
	if err != nil {
		return nil, err
	}

	list := []cid.Cid{h}

	pin, ok := cState.Get(ctx, h)
	if !ok {
		return list, nil
	}

	if pin.Type != api.MetaType {
		return list, nil
	}

	list = append([]cid.Cid{pin.Reference}, list...)
	clusterDagPin, err := c.PinGet(ctx, pin.Reference)
	if err != nil {
		return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err)
	}

	clusterDagBlock, err := c.ipfs.BlockGet(ctx, clusterDagPin.Cid)
	if err != nil {
		return list, fmt.Errorf("error reading clusterDAG block from ipfs: %s", err)
	}

	clusterDagNode, err := sharding.CborDataToNode(clusterDagBlock, "cbor")
	if err != nil {
		return list, fmt.Errorf("error parsing clusterDAG block: %s", err)
	}
	for _, l := range clusterDagNode.Links() {
		list = append([]cid.Cid{l.Cid}, list...)
	}

	return list, nil
}

// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
	m1 := make(map[peer.ID]struct{})
	m2 := make(map[peer.ID]struct{})
	added = make([]peer.ID, 0)
	removed = make([]peer.ID, 0)
	if peers1 == nil && peers2 == nil {
		return
	}
	if peers1 == nil {
		added = peers2
		return
	}
	if peers2 == nil {
		removed = peers1
		return
	}

	for _, p := range peers1 {
		m1[p] = struct{}{}
	}
	for _, p := range peers2 {
		m2[p] = struct{}{}
	}
	for k := range m1 {
		_, ok := m2[k]
		if !ok {
			removed = append(removed, k)
		}
	}
	for k := range m2 {
		_, ok := m1[k]
		if !ok {
			added = append(added, k)
		}
	}
	return
}