2016-12-02 18:33:39 +00:00
|
|
|
package ipfscluster
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2017-01-23 19:29:05 +00:00
|
|
|
"math/rand"
|
2016-12-15 13:07:19 +00:00
|
|
|
"sync"
|
2017-01-23 19:29:05 +00:00
|
|
|
"time"
|
2016-12-02 18:33:39 +00:00
|
|
|
|
2017-01-25 11:14:39 +00:00
|
|
|
rpc "github.com/hsanjuan/go-libp2p-gorpc"
|
2016-12-16 21:00:08 +00:00
|
|
|
cid "github.com/ipfs/go-cid"
|
2016-12-16 11:40:28 +00:00
|
|
|
host "github.com/libp2p/go-libp2p-host"
|
|
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
|
|
peerstore "github.com/libp2p/go-libp2p-peerstore"
|
|
|
|
swarm "github.com/libp2p/go-libp2p-swarm"
|
|
|
|
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
|
2017-01-24 15:19:23 +00:00
|
|
|
ma "github.com/multiformats/go-multiaddr"
|
2016-12-02 18:33:39 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Cluster is the main IPFS cluster component. It provides
|
2017-01-24 11:39:08 +00:00
|
|
|
// the go-API for it and orchestrates the components that make up the system.
|
2016-12-02 18:33:39 +00:00
|
|
|
type Cluster struct {
|
2016-12-09 19:54:46 +00:00
|
|
|
ctx context.Context
|
2016-12-02 18:33:39 +00:00
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
config *Config
|
|
|
|
host host.Host
|
|
|
|
rpcServer *rpc.Server
|
|
|
|
rpcClient *rpc.Client
|
2016-12-02 18:33:39 +00:00
|
|
|
|
2016-12-15 18:08:46 +00:00
|
|
|
consensus *Consensus
|
|
|
|
api API
|
2016-12-02 18:33:39 +00:00
|
|
|
ipfs IPFSConnector
|
2016-12-15 18:08:46 +00:00
|
|
|
state State
|
2016-12-06 21:29:59 +00:00
|
|
|
tracker PinTracker
|
2016-12-15 13:07:19 +00:00
|
|
|
|
|
|
|
shutdownLock sync.Mutex
|
|
|
|
shutdown bool
|
|
|
|
shutdownCh chan struct{}
|
|
|
|
wg sync.WaitGroup
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
// NewCluster builds a new IPFS Cluster. It initializes a LibP2P host, creates
|
|
|
|
// and RPC Server and client and sets up all components.
|
|
|
|
func NewCluster(cfg *Config, api API, ipfs IPFSConnector, state State, tracker PinTracker) (*Cluster, error) {
|
2016-12-09 19:54:46 +00:00
|
|
|
ctx := context.Background()
|
2016-12-02 18:33:39 +00:00
|
|
|
host, err := makeHost(ctx, cfg)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
rpcServer := rpc.NewServer(host, RPCProtocol)
|
|
|
|
rpcClient := rpc.NewClientWithServer(host, RPCProtocol, rpcServer)
|
|
|
|
|
2017-01-23 22:58:04 +00:00
|
|
|
logger.Infof("IPFS Cluster v%s - %s/ipfs/%s", Version, cfg.ClusterAddr, host.ID().Pretty())
|
|
|
|
|
2016-12-15 18:08:46 +00:00
|
|
|
consensus, err := NewConsensus(cfg, host, state)
|
2016-12-02 18:33:39 +00:00
|
|
|
if err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Errorf("error creating consensus: %s", err)
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
cluster := &Cluster{
|
2016-12-15 13:07:19 +00:00
|
|
|
ctx: ctx,
|
|
|
|
config: cfg,
|
|
|
|
host: host,
|
2016-12-23 18:35:37 +00:00
|
|
|
rpcServer: rpcServer,
|
|
|
|
rpcClient: rpcClient,
|
2016-12-15 13:07:19 +00:00
|
|
|
consensus: consensus,
|
|
|
|
api: api,
|
|
|
|
ipfs: ipfs,
|
|
|
|
state: state,
|
|
|
|
tracker: tracker,
|
|
|
|
shutdownCh: make(chan struct{}),
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
err = rpcServer.RegisterName(
|
|
|
|
"Cluster",
|
|
|
|
&RPCAPI{cluster: cluster})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-01-23 19:29:05 +00:00
|
|
|
// Workaround for https://github.com/libp2p/go-libp2p-swarm/issues/15
|
|
|
|
cluster.openConns()
|
|
|
|
|
2017-01-23 17:38:59 +00:00
|
|
|
defer func() {
|
|
|
|
tracker.SetClient(rpcClient)
|
|
|
|
ipfs.SetClient(rpcClient)
|
|
|
|
api.SetClient(rpcClient)
|
|
|
|
consensus.SetClient(rpcClient)
|
|
|
|
}()
|
2016-12-09 19:54:46 +00:00
|
|
|
|
2016-12-15 13:07:19 +00:00
|
|
|
cluster.run()
|
2016-12-02 18:33:39 +00:00
|
|
|
return cluster, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown stops the IPFS cluster components
|
|
|
|
func (c *Cluster) Shutdown() error {
|
2016-12-15 13:07:19 +00:00
|
|
|
c.shutdownLock.Lock()
|
|
|
|
defer c.shutdownLock.Unlock()
|
|
|
|
if c.shutdown {
|
|
|
|
logger.Warning("Cluster is already shutdown")
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Info("shutting down IPFS Cluster")
|
2016-12-02 18:33:39 +00:00
|
|
|
if err := c.consensus.Shutdown(); err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Errorf("error stopping consensus: %s", err)
|
2016-12-02 18:33:39 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := c.api.Shutdown(); err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Errorf("error stopping API: %s", err)
|
2016-12-02 18:33:39 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := c.ipfs.Shutdown(); err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Errorf("error stopping IPFS Connector: %s", err)
|
2016-12-02 18:33:39 +00:00
|
|
|
return err
|
|
|
|
}
|
2016-12-07 16:21:29 +00:00
|
|
|
|
|
|
|
if err := c.tracker.Shutdown(); err != nil {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Errorf("error stopping PinTracker: %s", err)
|
2016-12-07 16:21:29 +00:00
|
|
|
return err
|
|
|
|
}
|
2016-12-15 13:07:19 +00:00
|
|
|
c.shutdownCh <- struct{}{}
|
|
|
|
c.wg.Wait()
|
2016-12-23 18:35:37 +00:00
|
|
|
c.host.Close() // Shutdown all network services
|
2016-12-09 19:54:46 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-01-25 17:07:19 +00:00
|
|
|
// ID returns information about the Cluster peer
|
2017-01-24 15:19:23 +00:00
|
|
|
func (c *Cluster) ID() ID {
|
2017-01-26 18:59:31 +00:00
|
|
|
// ignore error since it is included in response object
|
|
|
|
ipfsID, _ := c.ipfs.ID()
|
|
|
|
var addrs []ma.Multiaddr
|
|
|
|
for _, addr := range c.host.Addrs() {
|
|
|
|
ipfsAddr, _ := ma.NewMultiaddr("/ipfs/" + c.host.ID().Pretty())
|
|
|
|
addrs = append(addrs, addr.Encapsulate(ipfsAddr))
|
|
|
|
}
|
|
|
|
|
2017-01-24 15:19:23 +00:00
|
|
|
return ID{
|
|
|
|
ID: c.host.ID(),
|
|
|
|
PublicKey: c.host.Peerstore().PubKey(c.host.ID()),
|
2017-01-26 18:59:31 +00:00
|
|
|
Addresses: addrs,
|
2017-01-24 15:19:23 +00:00
|
|
|
Version: Version,
|
2017-01-24 15:55:37 +00:00
|
|
|
Commit: Commit,
|
2017-01-24 15:19:23 +00:00
|
|
|
RPCProtocolVersion: RPCProtocol,
|
2017-01-26 18:59:31 +00:00
|
|
|
IPFS: ipfsID,
|
2017-01-24 15:19:23 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-20 18:51:13 +00:00
|
|
|
// StateSync syncs the consensus state to the Pin Tracker, ensuring
|
|
|
|
// that every Cid that should be tracked is tracked. It returns
|
|
|
|
// PinInfo for Cids which were added or deleted.
|
|
|
|
func (c *Cluster) StateSync() ([]PinInfo, error) {
|
2016-12-09 19:54:46 +00:00
|
|
|
cState, err := c.consensus.State()
|
|
|
|
if err != nil {
|
2016-12-15 18:08:46 +00:00
|
|
|
return nil, err
|
2016-12-09 19:54:46 +00:00
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
|
2017-01-23 22:58:04 +00:00
|
|
|
logger.Debug("syncing state to tracker")
|
2016-12-20 18:51:13 +00:00
|
|
|
clusterPins := cState.ListPins()
|
|
|
|
var changed []*cid.Cid
|
|
|
|
|
|
|
|
// Track items which are not tracked
|
|
|
|
for _, h := range clusterPins {
|
2017-01-25 18:38:23 +00:00
|
|
|
if c.tracker.Status(h).Status == TrackerStatusUnpinned {
|
2016-12-20 18:51:13 +00:00
|
|
|
changed = append(changed, h)
|
2016-12-23 18:35:37 +00:00
|
|
|
err := c.rpcClient.Go("",
|
|
|
|
"Cluster",
|
|
|
|
"Track",
|
|
|
|
NewCidArg(h),
|
|
|
|
&struct{}{},
|
|
|
|
nil)
|
|
|
|
if err != nil {
|
|
|
|
return []PinInfo{}, err
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Untrack items which should not be tracked
|
2017-01-25 18:38:23 +00:00
|
|
|
for _, p := range c.tracker.StatusAll() {
|
2016-12-20 18:51:13 +00:00
|
|
|
h, _ := cid.Decode(p.CidStr)
|
|
|
|
if !cState.HasPin(h) {
|
|
|
|
changed = append(changed, h)
|
2016-12-23 18:35:37 +00:00
|
|
|
err := c.rpcClient.Go("",
|
|
|
|
"Cluster",
|
|
|
|
"Track",
|
|
|
|
&CidArg{p.CidStr},
|
|
|
|
&struct{}{},
|
|
|
|
nil)
|
|
|
|
if err != nil {
|
|
|
|
return []PinInfo{}, err
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var infos []PinInfo
|
2016-12-19 17:35:24 +00:00
|
|
|
for _, h := range changed {
|
2017-01-25 18:38:23 +00:00
|
|
|
infos = append(infos, c.tracker.Status(h))
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
return infos, nil
|
|
|
|
}
|
|
|
|
|
2017-01-25 18:38:23 +00:00
|
|
|
// StatusAll returns the GlobalPinInfo for all tracked Cids. If an error
|
|
|
|
// happens, the slice will contain as much information as could be fetched.
|
|
|
|
func (c *Cluster) StatusAll() ([]GlobalPinInfo, error) {
|
|
|
|
return c.globalPinInfoSlice("TrackerStatusAll")
|
2016-12-23 18:35:37 +00:00
|
|
|
}
|
|
|
|
|
2017-01-25 18:38:23 +00:00
|
|
|
// Status returns the GlobalPinInfo for a given Cid. If an error happens,
|
2016-12-23 18:35:37 +00:00
|
|
|
// the GlobalPinInfo should contain as much information as could be fetched.
|
2017-01-25 18:38:23 +00:00
|
|
|
func (c *Cluster) Status(h *cid.Cid) (GlobalPinInfo, error) {
|
|
|
|
return c.globalPinInfoCid("TrackerStatus", h)
|
2016-12-23 18:35:37 +00:00
|
|
|
}
|
|
|
|
|
2017-01-25 18:38:23 +00:00
|
|
|
// SyncAllLocal makes sure that the current state for all tracked items
|
|
|
|
// matches the state reported by the IPFS daemon.
|
2016-12-20 18:51:13 +00:00
|
|
|
//
|
2017-01-25 18:38:23 +00:00
|
|
|
// SyncAllLocal returns the list of PinInfo that where updated because of
|
|
|
|
// the operation, along with those in error states.
|
|
|
|
func (c *Cluster) SyncAllLocal() ([]PinInfo, error) {
|
|
|
|
syncedItems, err := c.tracker.SyncAll()
|
|
|
|
// Despite errors, tracker provides synced items that we can provide.
|
|
|
|
// They encapsulate the error.
|
2017-01-25 17:07:19 +00:00
|
|
|
if err != nil {
|
|
|
|
logger.Error("tracker.Sync() returned with error: ", err)
|
|
|
|
logger.Error("Is the ipfs daemon running?")
|
|
|
|
logger.Error("LocalSync returning without attempting recovers")
|
2016-12-09 19:54:46 +00:00
|
|
|
}
|
2017-01-25 18:38:23 +00:00
|
|
|
return syncedItems, err
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
|
|
|
|
2017-01-25 18:38:23 +00:00
|
|
|
// SyncLocal performs a local sync operation for the given Cid. This will
|
|
|
|
// tell the tracker to verify the status of the Cid against the IPFS daemon.
|
|
|
|
// It returns the updated PinInfo for the Cid.
|
|
|
|
func (c *Cluster) SyncLocal(h *cid.Cid) (PinInfo, error) {
|
2016-12-20 18:51:13 +00:00
|
|
|
var err error
|
2017-01-25 18:38:23 +00:00
|
|
|
pInfo, err := c.tracker.Sync(h)
|
2017-01-25 17:07:19 +00:00
|
|
|
// Despite errors, trackers provides an updated PinInfo so
|
|
|
|
// we just log it.
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("tracker.SyncCid() returned with error: ", err)
|
|
|
|
logger.Error("Is the ipfs daemon running?")
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
2017-01-25 18:38:23 +00:00
|
|
|
return pInfo, err
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
|
|
|
|
2017-01-26 18:59:31 +00:00
|
|
|
// SyncAll triggers LocalSync() operations in all cluster peers.
|
2017-01-25 18:38:23 +00:00
|
|
|
func (c *Cluster) SyncAll() ([]GlobalPinInfo, error) {
|
|
|
|
return c.globalPinInfoSlice("SyncAllLocal")
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
|
|
|
|
2017-01-25 18:38:23 +00:00
|
|
|
// Sync triggers a LocalSyncCid() operation for a given Cid
|
2017-01-26 18:59:31 +00:00
|
|
|
// in all cluster peers.
|
2017-01-25 18:38:23 +00:00
|
|
|
func (c *Cluster) Sync(h *cid.Cid) (GlobalPinInfo, error) {
|
|
|
|
return c.globalPinInfoCid("SyncLocal", h)
|
|
|
|
}
|
|
|
|
|
|
|
|
// RecoverLocal triggers a recover operation for a given Cid
|
|
|
|
func (c *Cluster) RecoverLocal(h *cid.Cid) (PinInfo, error) {
|
|
|
|
return c.tracker.Recover(h)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Recover triggers a recover operation for a given Cid in all
|
2017-01-26 18:59:31 +00:00
|
|
|
// cluster peers.
|
2017-01-25 18:38:23 +00:00
|
|
|
func (c *Cluster) Recover(h *cid.Cid) (GlobalPinInfo, error) {
|
|
|
|
return c.globalPinInfoCid("TrackerRecover", h)
|
2016-12-15 18:08:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Pins returns the list of Cids managed by Cluster and which are part
|
|
|
|
// of the current global state. This is the source of truth as to which
|
|
|
|
// pins are managed, but does not indicate if the item is successfully pinned.
|
|
|
|
func (c *Cluster) Pins() []*cid.Cid {
|
|
|
|
cState, err := c.consensus.State()
|
|
|
|
if err != nil {
|
|
|
|
return []*cid.Cid{}
|
|
|
|
}
|
|
|
|
return cState.ListPins()
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Pin makes the cluster Pin a Cid. This implies adding the Cid
|
|
|
|
// to the IPFS Cluster peers shared-state. Depending on the cluster
|
2016-12-16 11:40:28 +00:00
|
|
|
// pinning strategy, the PinTracker may then request the IPFS daemon
|
2017-01-23 13:01:49 +00:00
|
|
|
// to pin the Cid.
|
2016-12-02 18:33:39 +00:00
|
|
|
//
|
|
|
|
// Pin returns an error if the operation could not be persisted
|
|
|
|
// to the global state. Pin does not reflect the success or failure
|
|
|
|
// of underlying IPFS daemon pinning operations.
|
|
|
|
func (c *Cluster) Pin(h *cid.Cid) error {
|
2016-12-15 13:19:41 +00:00
|
|
|
logger.Info("pinning:", h)
|
2017-01-23 13:01:49 +00:00
|
|
|
err := c.consensus.LogPin(h)
|
2016-12-23 18:35:37 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Unpin makes the cluster Unpin a Cid. This implies adding the Cid
|
2017-01-23 13:01:49 +00:00
|
|
|
// to the IPFS Cluster peers shared-state.
|
2016-12-02 18:33:39 +00:00
|
|
|
//
|
|
|
|
// Unpin returns an error if the operation could not be persisted
|
|
|
|
// to the global state. Unpin does not reflect the success or failure
|
|
|
|
// of underlying IPFS daemon unpinning operations.
|
|
|
|
func (c *Cluster) Unpin(h *cid.Cid) error {
|
2017-01-25 18:38:23 +00:00
|
|
|
logger.Info("unpinning:", h)
|
2017-01-23 13:01:49 +00:00
|
|
|
err := c.consensus.LogUnpin(h)
|
2016-12-23 18:35:37 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2016-12-02 18:33:39 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Version returns the current IPFS Cluster version
|
|
|
|
func (c *Cluster) Version() string {
|
|
|
|
return Version
|
|
|
|
}
|
|
|
|
|
2017-01-26 18:59:31 +00:00
|
|
|
// Peers returns the IDs of the members of this Cluster
|
|
|
|
func (c *Cluster) Peers() []ID {
|
|
|
|
members := c.peerList()
|
|
|
|
peersSerial := make([]IDSerial, len(members), len(members))
|
|
|
|
peers := make([]ID, len(members), len(members))
|
|
|
|
|
|
|
|
ifaceReplies := make([]interface{}, len(members), len(members))
|
|
|
|
for i := range peersSerial {
|
|
|
|
ifaceReplies[i] = &peersSerial[i]
|
|
|
|
}
|
|
|
|
|
|
|
|
errs := c.multiRPC(members, "Cluster", "ID", struct{}{}, ifaceReplies)
|
|
|
|
for i, err := range errs {
|
|
|
|
if err != nil {
|
|
|
|
peersSerial[i].ID = peer.IDB58Encode(members[i])
|
|
|
|
peersSerial[i].Error = err.Error()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, ps := range peersSerial {
|
|
|
|
peers[i] = ps.ToID()
|
|
|
|
}
|
|
|
|
return peers
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Cluster) peerList() []peer.ID {
|
2016-12-02 18:33:39 +00:00
|
|
|
return c.host.Peerstore().Peers()
|
|
|
|
}
|
|
|
|
|
|
|
|
// run reads from the RPC channels of the different components and launches
|
|
|
|
// short-lived go-routines to handle any requests.
|
|
|
|
func (c *Cluster) run() {
|
2016-12-15 13:07:19 +00:00
|
|
|
c.wg.Add(1)
|
2016-12-23 18:35:37 +00:00
|
|
|
|
|
|
|
// Currently we do nothing other than waiting to
|
|
|
|
// cancel our context.
|
2016-12-15 13:07:19 +00:00
|
|
|
go func() {
|
|
|
|
defer c.wg.Done()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
c.ctx = ctx
|
2016-12-23 18:35:37 +00:00
|
|
|
<-c.shutdownCh
|
2016-12-15 13:07:19 +00:00
|
|
|
}()
|
2016-12-14 16:25:21 +00:00
|
|
|
}
|
2016-12-06 21:29:59 +00:00
|
|
|
|
2016-12-02 18:33:39 +00:00
|
|
|
// makeHost makes a libp2p-host
|
2016-12-15 18:08:46 +00:00
|
|
|
func makeHost(ctx context.Context, cfg *Config) (host.Host, error) {
|
2016-12-02 18:33:39 +00:00
|
|
|
ps := peerstore.NewPeerstore()
|
2017-01-23 17:38:59 +00:00
|
|
|
privateKey := cfg.PrivateKey
|
2016-12-02 18:33:39 +00:00
|
|
|
publicKey := privateKey.GetPublic()
|
|
|
|
|
2017-01-23 17:38:59 +00:00
|
|
|
if err := ps.AddPubKey(cfg.ID, publicKey); err != nil {
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-01-23 17:38:59 +00:00
|
|
|
if err := ps.AddPrivKey(cfg.ID, privateKey); err != nil {
|
2016-12-02 18:33:39 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-01-23 17:38:59 +00:00
|
|
|
for _, addr := range cfg.ClusterPeers {
|
2017-01-24 15:19:23 +00:00
|
|
|
pid, err := addr.ValueForProtocol(ma.P_IPFS)
|
2016-12-02 18:33:39 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-01-24 15:19:23 +00:00
|
|
|
ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid)
|
2017-01-23 17:38:59 +00:00
|
|
|
maddr := addr.Decapsulate(ipfs)
|
2016-12-02 18:33:39 +00:00
|
|
|
|
|
|
|
peerID, err := peer.IDB58Decode(pid)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ps.AddAddrs(
|
|
|
|
peerID,
|
2017-01-24 15:19:23 +00:00
|
|
|
[]ma.Multiaddr{maddr},
|
2016-12-02 18:33:39 +00:00
|
|
|
peerstore.PermanentAddrTTL)
|
|
|
|
}
|
|
|
|
|
|
|
|
network, err := swarm.NewNetwork(
|
|
|
|
ctx,
|
2017-01-24 15:19:23 +00:00
|
|
|
[]ma.Multiaddr{cfg.ClusterAddr},
|
2017-01-23 17:38:59 +00:00
|
|
|
cfg.ID,
|
2016-12-02 18:33:39 +00:00
|
|
|
ps,
|
|
|
|
nil,
|
|
|
|
)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
bhost := basichost.New(network)
|
|
|
|
return bhost, nil
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
// Perform a sync rpc request to multiple destinations
|
|
|
|
func (c *Cluster) multiRPC(dests []peer.ID, svcName, svcMethod string, args interface{}, reply []interface{}) []error {
|
|
|
|
if len(dests) != len(reply) {
|
2017-01-23 13:21:26 +00:00
|
|
|
panic("must have matching dests and replies")
|
2016-12-23 18:35:37 +00:00
|
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
errs := make([]error, len(dests), len(dests))
|
|
|
|
|
2016-12-28 15:25:24 +00:00
|
|
|
for i := range dests {
|
2016-12-23 18:35:37 +00:00
|
|
|
wg.Add(1)
|
|
|
|
go func(i int) {
|
|
|
|
defer wg.Done()
|
|
|
|
err := c.rpcClient.Call(
|
|
|
|
dests[i],
|
|
|
|
svcName,
|
|
|
|
svcMethod,
|
|
|
|
args,
|
|
|
|
reply[i])
|
|
|
|
errs[i] = err
|
|
|
|
}(i)
|
|
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return errs
|
2016-12-20 18:51:13 +00:00
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Cluster) globalPinInfoCid(method string, h *cid.Cid) (GlobalPinInfo, error) {
|
2016-12-20 18:51:13 +00:00
|
|
|
pin := GlobalPinInfo{
|
2017-01-25 17:07:19 +00:00
|
|
|
Cid: h,
|
|
|
|
PeerMap: make(map[peer.ID]PinInfo),
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
|
2017-01-26 18:59:31 +00:00
|
|
|
members := c.peerList()
|
2016-12-23 18:35:37 +00:00
|
|
|
replies := make([]PinInfo, len(members), len(members))
|
|
|
|
ifaceReplies := make([]interface{}, len(members), len(members))
|
2016-12-28 15:25:24 +00:00
|
|
|
for i := range replies {
|
2016-12-23 18:35:37 +00:00
|
|
|
ifaceReplies[i] = &replies[i]
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
2016-12-23 18:35:37 +00:00
|
|
|
args := NewCidArg(h)
|
|
|
|
errs := c.multiRPC(members, "Cluster", method, args, ifaceReplies)
|
2016-12-20 18:51:13 +00:00
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
for i, r := range replies {
|
2017-01-25 17:07:19 +00:00
|
|
|
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
|
2017-01-23 19:29:05 +00:00
|
|
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
|
2017-01-25 17:07:19 +00:00
|
|
|
if r.Status == TrackerStatusBug {
|
2017-01-23 23:52:42 +00:00
|
|
|
r = PinInfo{
|
|
|
|
CidStr: h.String(),
|
|
|
|
Peer: members[i],
|
2017-01-25 17:07:19 +00:00
|
|
|
Status: TrackerStatusClusterError,
|
2017-01-23 23:52:42 +00:00
|
|
|
TS: time.Now(),
|
|
|
|
Error: e.Error(),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
r.Error = e.Error()
|
|
|
|
}
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
2017-01-25 17:07:19 +00:00
|
|
|
pin.PeerMap[members[i]] = r
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
2016-12-28 15:25:24 +00:00
|
|
|
|
2017-01-23 23:52:42 +00:00
|
|
|
return pin, nil
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
|
2016-12-23 18:35:37 +00:00
|
|
|
func (c *Cluster) globalPinInfoSlice(method string) ([]GlobalPinInfo, error) {
|
2016-12-20 18:51:13 +00:00
|
|
|
var infos []GlobalPinInfo
|
|
|
|
fullMap := make(map[string]GlobalPinInfo)
|
|
|
|
|
2017-01-26 18:59:31 +00:00
|
|
|
members := c.peerList()
|
2016-12-23 18:35:37 +00:00
|
|
|
replies := make([][]PinInfo, len(members), len(members))
|
|
|
|
ifaceReplies := make([]interface{}, len(members), len(members))
|
2016-12-28 15:25:24 +00:00
|
|
|
for i := range replies {
|
2016-12-23 18:35:37 +00:00
|
|
|
ifaceReplies[i] = &replies[i]
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
2016-12-23 18:35:37 +00:00
|
|
|
errs := c.multiRPC(members, "Cluster", method, struct{}{}, ifaceReplies)
|
2016-12-20 18:51:13 +00:00
|
|
|
|
|
|
|
mergePins := func(pins []PinInfo) {
|
|
|
|
for _, p := range pins {
|
|
|
|
item, ok := fullMap[p.CidStr]
|
|
|
|
c, _ := cid.Decode(p.CidStr)
|
|
|
|
if !ok {
|
|
|
|
fullMap[p.CidStr] = GlobalPinInfo{
|
|
|
|
Cid: c,
|
2017-01-25 17:07:19 +00:00
|
|
|
PeerMap: map[peer.ID]PinInfo{
|
2016-12-20 18:51:13 +00:00
|
|
|
p.Peer: p,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
} else {
|
2017-01-25 17:07:19 +00:00
|
|
|
item.PeerMap[p.Peer] = p
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-24 00:09:27 +00:00
|
|
|
erroredPeers := make(map[peer.ID]string)
|
2016-12-23 18:35:37 +00:00
|
|
|
for i, r := range replies {
|
2017-01-25 17:07:19 +00:00
|
|
|
if e := errs[i]; e != nil { // This error must come from not being able to contact that cluster member
|
2017-01-23 19:29:05 +00:00
|
|
|
logger.Errorf("%s: error in broadcast response from %s: %s ", c.host.ID(), members[i], e)
|
2017-01-24 00:09:27 +00:00
|
|
|
erroredPeers[members[i]] = e.Error()
|
2017-01-23 23:52:42 +00:00
|
|
|
} else {
|
|
|
|
mergePins(r)
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-24 00:09:27 +00:00
|
|
|
// Merge any errors
|
|
|
|
for p, msg := range erroredPeers {
|
2017-01-24 11:39:08 +00:00
|
|
|
for c := range fullMap {
|
2017-01-25 17:07:19 +00:00
|
|
|
fullMap[c].PeerMap[p] = PinInfo{
|
2017-01-24 00:09:27 +00:00
|
|
|
CidStr: c,
|
|
|
|
Peer: p,
|
2017-01-25 17:07:19 +00:00
|
|
|
Status: TrackerStatusClusterError,
|
2017-01-24 00:09:27 +00:00
|
|
|
TS: time.Now(),
|
|
|
|
Error: msg,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-20 18:51:13 +00:00
|
|
|
for _, v := range fullMap {
|
|
|
|
infos = append(infos, v)
|
|
|
|
}
|
|
|
|
|
2017-01-23 23:52:42 +00:00
|
|
|
return infos, nil
|
2016-12-20 18:51:13 +00:00
|
|
|
}
|
2017-01-23 19:29:05 +00:00
|
|
|
|
|
|
|
// openConns is a workaround for
|
|
|
|
// https://github.com/libp2p/go-libp2p-swarm/issues/15
|
2017-01-26 18:59:31 +00:00
|
|
|
// which break our tests.
|
2017-01-23 19:29:05 +00:00
|
|
|
// It runs when consensus is initialized so we can assume
|
|
|
|
// that the cluster is more or less up.
|
|
|
|
// It should open connections for peers where they haven't
|
|
|
|
// yet been opened. By randomly sleeping we reduce the
|
2017-01-26 18:59:31 +00:00
|
|
|
// chance that peers will open 2 connections simultaneously.
|
2017-01-23 19:29:05 +00:00
|
|
|
func (c *Cluster) openConns() {
|
|
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
|
|
|
peers := c.host.Peerstore().Peers()
|
|
|
|
for _, p := range peers {
|
|
|
|
peerInfo := c.host.Peerstore().PeerInfo(p)
|
|
|
|
if p == c.host.ID() {
|
|
|
|
continue // do not connect to ourselves
|
|
|
|
}
|
|
|
|
// ignore any errors here
|
|
|
|
c.host.Connect(c.ctx, peerInfo)
|
|
|
|
}
|
|
|
|
}
|