staticcheck: fix all staticcheck warnings in the project
This commit is contained in:
parent 260930305a
commit f83ff9b655
@@ -178,7 +178,7 @@ func TestAddOnePeerFails(t *testing.T) {
 			defer wg.Done()
 			_, err := clusters[0].AddFile(r, params)
 			if err != nil {
-				t.Fatal(err)
+				t.Error(err)
 			}
 		}()
 
@@ -236,7 +236,7 @@ func TestAddAllPeersFail(t *testing.T) {
 			defer wg.Done()
 			_, err := clusters[0].AddFile(r, params)
 			if err != adder.ErrBlockAdder {
-				t.Fatal("expected ErrBlockAdder. Got: ", err)
+				t.Error("expected ErrBlockAdder. Got: ", err)
 			}
 		}()
 
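The two hunks above swap t.Fatal for t.Error inside spawned goroutines. t.Fatal calls FailNow, which in turn calls runtime.Goexit and only terminates the calling goroutine, so outside the main test goroutine it neither stops the test nor reliably reports the failure; staticcheck flags this as SA2002. A minimal sketch of the safe pattern, assuming the usual testing and sync imports (doWork is a placeholder):

    func TestExample(t *testing.T) {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := doWork(); err != nil {
                // t.Error is safe from any goroutine; t.Fatal here would
                // only kill this goroutine without failing the test fast.
                t.Error(err)
            }
        }()
        wg.Wait()
    }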
@@ -121,29 +121,30 @@ func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
 	return nd, nil
 }
 
-// RootNode returns the mfs root node
-func (adder *Adder) curRootNode() (ipld.Node, error) {
-	mr, err := adder.mfsRoot()
-	if err != nil {
-		return nil, err
-	}
-	root, err := mr.GetDirectory().GetNode()
-	if err != nil {
-		return nil, err
-	}
+// Cluster: commented as it is unused
+// // RootNode returns the mfs root node
+// func (adder *Adder) curRootNode() (ipld.Node, error) {
+// 	mr, err := adder.mfsRoot()
+// 	if err != nil {
+// 		return nil, err
+// 	}
+// 	root, err := mr.GetDirectory().GetNode()
+// 	if err != nil {
+// 		return nil, err
+// 	}
 
-	// if one root file, use that hash as root.
-	if len(root.Links()) == 1 {
-		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
-		if err != nil {
-			return nil, err
-		}
+// 	// if one root file, use that hash as root.
+// 	if len(root.Links()) == 1 {
+// 		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
+// 		if err != nil {
+// 			return nil, err
+// 		}
 
-		root = nd
-	}
+// 		root = nd
+// 	}
 
-	return root, err
-}
+// 	return root, err
+// }
 
 // PinRoot recursively pins the root node of Adder and
 // writes the pin state to the backing datastore.
@@ -36,7 +36,6 @@ func init() {
 
 // MaxLinks is the max number of links that, when serialized fit into a block
 const MaxLinks = 5984
 const fixedPerLink = 40
-const hashFn = mh.SHA2_256
 
 // CborDataToNode parses cbor data into a clusterDAG node while making a few
@@ -21,7 +21,6 @@ import (
 	rpc "github.com/libp2p/go-libp2p-gorpc"
 )
 
-var errNotFound = errors.New("dagservice: block not found")
 var logger = logging.Logger("shardingdags")
 
 // DAGService is an implementation of a ClusterDAGService which
@@ -35,7 +35,7 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard
 		return nil, err
 	}
 
-	if opts.ReplicationFactorMin > 0 && (allocs == nil || len(allocs) == 0) {
+	if opts.ReplicationFactorMin > 0 && len(allocs) == 0 {
 		// This would mean that the empty cid is part of the shared state somehow.
 		panic("allocations for new shard cannot be empty without error")
 	}
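The dropped nil check above is staticcheck's S1009: len is defined as zero on a nil slice, so `allocs == nil || len(allocs) == 0` collapses to `len(allocs) == 0`. A minimal sketch:

    func emptyAllocs(allocs []peer.ID) bool {
        // len works on nil slices, so no separate nil check is needed (S1009).
        return len(allocs) == 0
    }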
@@ -4,7 +4,6 @@ package single
 
 import (
 	"context"
-	"errors"
 
 	adder "github.com/ipfs/ipfs-cluster/adder"
 	"github.com/ipfs/ipfs-cluster/api"
@@ -16,9 +15,8 @@ import (
 	rpc "github.com/libp2p/go-libp2p-gorpc"
 )
 
-var errNotFound = errors.New("dagservice: block not found")
-
 var logger = logging.Logger("singledags")
+var _ = logger // otherwise unused
 
 // DAGService is an implementation of an adder.ClusterDAGService which
 // puts the added blocks directly in the peers allocated to them (without
@@ -11,13 +11,10 @@ import (
 	"github.com/ipfs/ipfs-cluster/api"
 
 	cid "github.com/ipfs/go-cid"
-	logging "github.com/ipfs/go-log/v2"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 	rpc "github.com/libp2p/go-libp2p-gorpc"
 )
 
-var logger = logging.Logger("ascendalloc")
-
 // AscendAllocator extends the SimpleAllocator
 type AscendAllocator struct{}
 
@@ -11,13 +11,10 @@ import (
 	"github.com/ipfs/ipfs-cluster/api"
 
 	cid "github.com/ipfs/go-cid"
-	logging "github.com/ipfs/go-log/v2"
 	peer "github.com/libp2p/go-libp2p-core/peer"
 	rpc "github.com/libp2p/go-libp2p-gorpc"
 )
 
-var logger = logging.Logger("descendalloc")
-
 // DescendAllocator extends the SimpleAllocator
 type DescendAllocator struct{}
 
@@ -278,7 +278,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
 		cfg.MaxHeaderBytes = jcfg.MaxHeaderBytes
 	}
 
-	if extra := jcfg.ExtractHeadersExtra; extra != nil && len(extra) > 0 {
+	if extra := jcfg.ExtractHeadersExtra; len(extra) > 0 {
 		cfg.ExtractHeadersExtra = extra
 	}
 	config.SetIfNotDefault(jcfg.ExtractHeadersPath, &cfg.ExtractHeadersPath)
@@ -340,7 +340,6 @@ func ipfsErrorResponder(w http.ResponseWriter, errMsg string, code int) {
 		w.WriteHeader(http.StatusInternalServerError)
 	}
 	w.Write(resBytes)
-	return
 }
 
 func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) {
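This and several following hunks drop a bare `return` at the end of a function with no result values — staticcheck's S1023 (omit redundant control flow). The shape of the fix:

    func respond(w http.ResponseWriter, b []byte) {
        w.Write(b)
        // S1023: a trailing bare "return" adds nothing; control already
        // falls off the end of a function without results.
    }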
@@ -373,7 +372,6 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
 	resBytes, _ := json.Marshal(res)
 	w.WriteHeader(http.StatusOK)
 	w.Write(resBytes)
-	return
 }
 
 func (proxy *Server) pinHandler(w http.ResponseWriter, r *http.Request) {
@@ -529,7 +527,6 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
 	resBytes, _ := json.Marshal(res)
 	w.WriteHeader(http.StatusOK)
 	w.Write(resBytes)
-	return
 }
 
 func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
@@ -628,8 +625,8 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
 	ctxs, cancels := rpcutil.CtxsWithCancel(proxy.ctx, len(peers))
 	defer rpcutil.MultiCancel(cancels)
 
-	repoStats := make([]*api.IPFSRepoStat, len(peers), len(peers))
-	repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats))
+	repoStats := make([]*api.IPFSRepoStat, len(peers))
+	repoStatsIfaces := make([]interface{}, len(repoStats))
 	for i := range repoStats {
 		repoStats[i] = &api.IPFSRepoStat{}
 		repoStatsIfaces[i] = repoStats[i]
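`make([]T, n, n)` is staticcheck's S1019: when the capacity argument equals the length it is already the default and can be omitted. The same fix recurs in many hunks below. Illustration:

    n := 4
    a := make([]int, n)    // len 4, cap 4 — preferred
    b := make([]int, n, n) // identical slice; the extra cap is flagged (S1019)
    _, _ = a, b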
@@ -662,7 +659,6 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
 	resBytes, _ := json.Marshal(totalStats)
 	w.WriteHeader(http.StatusOK)
 	w.Write(resBytes)
-	return
 }
 
 type ipfsRepoGCResp struct {
@@ -718,8 +714,6 @@ func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) {
 	if !streamErrors && mErrStr != "" {
 		w.Header().Set("X-Stream-Error", mErrStr)
 	}
-
-	return
 }
 
 // slashHandler returns a handler which converts a /a/b/c/<argument> request
@@ -7,7 +7,6 @@ import (
 	"io"
 	"io/ioutil"
 	"net/http"
-	"net/url"
 	"os"
 	"path/filepath"
 	"strings"
@@ -641,7 +640,7 @@ func TestProxyAdd(t *testing.T) {
 		},
 	}
 
-	reqs := make([]*http.Request, len(testcases), len(testcases))
+	reqs := make([]*http.Request, len(testcases))
 
 	sth := test.NewShardingTestHelper()
 	defer sth.Clean(t)
@@ -732,14 +731,6 @@ func TestIPFSProxy(t *testing.T) {
 	}
 }
 
-func mustParseURL(rawurl string) *url.URL {
-	u, err := url.Parse(rawurl)
-	if err != nil {
-		panic(err)
-	}
-	return u
-}
-
 func TestHeaderExtraction(t *testing.T) {
 	ctx := context.Background()
 	proxy, mock := testIPFSProxy(t)
@@ -169,7 +169,7 @@ type Config struct {
 // NewDefaultClient() to create one.
 type defaultClient struct {
 	ctx       context.Context
-	cancel    func()
+	cancel    context.CancelFunc
 	config    *Config
 	transport *http.Transport
 	net       string
@@ -180,9 +180,10 @@ type defaultClient struct {
 
 // NewDefaultClient initializes a client given a Config.
 func NewDefaultClient(cfg *Config) (Client, error) {
-	ctx := context.Background()
+	ctx, cancel := context.WithCancel(context.Background())
 	client := &defaultClient{
 		ctx:    ctx,
+		cancel: cancel,
 		config: cfg,
 	}
 
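Storing the CancelFunc gives the client a context it can actually cancel later; the previous bare context.Background() was never cancellable. A minimal sketch of the pattern under that reading (the Close method is illustrative, not part of this diff):

    type client struct {
        ctx    context.Context
        cancel context.CancelFunc
    }

    func newClient() *client {
        ctx, cancel := context.WithCancel(context.Background())
        return &client{ctx: ctx, cancel: cancel}
    }

    // Close aborts anything derived from the client's context.
    func (c *client) Close() { c.cancel() }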
@@ -352,7 +353,7 @@ func IsPeerAddress(addr ma.Multiaddr) bool {
 		return false
 	}
 	pid, err := addr.ValueForProtocol(ma.P_P2P)
-	dnsaddr, err2 := addr.ValueForProtocol(madns.DnsaddrProtocol.Code)
+	dnsaddr, err2 := addr.ValueForProtocol(ma.P_DNSADDR)
 	return (pid != "" && err == nil) || (dnsaddr != "" && err2 == nil)
 }
 
@@ -23,7 +23,7 @@ func testAPI(t *testing.T) *rest.API {
 	cfg := &rest.Config{}
 	cfg.Default()
 	cfg.HTTPListenAddr = []ma.Multiaddr{apiMAddr}
-	secret := make(pnet.PSK, 32, 32)
+	secret := make(pnet.PSK, 32)
 
 	h, err := libp2p.New(
 		context.Background(),
@@ -58,7 +58,7 @@ func apiMAddr(a *rest.API) ma.Multiaddr {
 }
 
 func peerMAddr(a *rest.API) ma.Multiaddr {
-	ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peer.IDB58Encode(a.Host().ID())))
+	ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peer.Encode(a.Host().ID())))
 	for _, a := range a.Host().Addrs() {
 		if _, err := a.ValueForProtocol(ma.P_IP4); err == nil {
 			return a.Encapsulate(ipfsAddr)
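From here on, many hunks are the same mechanical rename: go-libp2p-core deprecated peer.IDB58Decode/peer.IDB58Encode in favor of peer.Decode/peer.Encode, and staticcheck's SA1019 reports uses of deprecated identifiers. The mapping, assuming the go-libp2p-core/peer package:

    pid, err := peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") // was peer.IDB58Decode
    if err != nil {
        // handle malformed peer ID
    }
    s := peer.Encode(pid) // was peer.IDB58Encode
    _ = s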
@@ -8,7 +8,7 @@ import (
 	shell "github.com/ipfs/go-ipfs-api"
 	files "github.com/ipfs/go-ipfs-files"
 	"github.com/ipfs/ipfs-cluster/api"
-	peer "github.com/libp2p/go-libp2p-peer"
+	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
 // loadBalancingClient is a client to interact with IPFS Cluster APIs
@@ -52,7 +52,7 @@ func (c *defaultClient) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, erro
 	ctx, span := trace.StartSpan(ctx, "client/PeerAdd")
 	defer span.End()
 
-	pidStr := peer.IDB58Encode(pid)
+	pidStr := peer.Encode(pid)
 	body := peerAddBody{pidStr}
 
 	var buf bytes.Buffer
@@ -518,7 +518,7 @@ func (c *defaultClient) Add(
 	ctx, span := trace.StartSpan(ctx, "client/Add")
 	defer span.End()
 
-	addFiles := make([]files.DirEntry, len(paths), len(paths))
+	addFiles := make([]files.DirEntry, len(paths))
 	for i, p := range paths {
 		u, err := url.Parse(p)
 		if err != nil {
@@ -6,10 +6,9 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ipfs/ipfs-cluster/api"
-	"github.com/ipfs/ipfs-cluster/api/rest"
-	"github.com/ipfs/ipfs-cluster/test"
-
+	types "github.com/ipfs/ipfs-cluster/api"
+	rest "github.com/ipfs/ipfs-cluster/api/rest"
+	test "github.com/ipfs/ipfs-cluster/test"
 
 	cid "github.com/ipfs/go-cid"
 	peer "github.com/libp2p/go-libp2p-core/peer"
@@ -90,6 +89,7 @@ func TestPeersWithError(t *testing.T) {
 
 	testF := func(t *testing.T, c Client) {
 		addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/44444")
+		var _ = c
 		c, _ = NewDefaultClient(&Config{APIAddr: addr, DisableKeepAlives: true})
 		ids, err := c.Peers(ctx)
 		if err == nil {
@@ -367,7 +367,7 @@ func TestStatusAll(t *testing.T) {
 		t.Error("there should be two pins")
 	}
 
-	pins, err = c.StatusAll(ctx, 1<<25, false)
+	_, err = c.StatusAll(ctx, 1<<25, false)
 	if err == nil {
 		t.Error("expected an error")
 	}
@@ -476,7 +476,7 @@ type waitService struct {
 	pinStart time.Time
 }
 
-func (wait *waitService) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error {
+func (wait *waitService) Pin(ctx context.Context, in *types.Pin, out *types.Pin) error {
 	wait.l.Lock()
 	defer wait.l.Unlock()
 	wait.pinStart = time.Now()
@@ -484,41 +484,41 @@ func (wait *waitService) Pin(ctx context.Context, in *api.Pin, out *api.Pin) err
 	return nil
 }
 
-func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
+func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.GlobalPinInfo) error {
 	wait.l.Lock()
 	defer wait.l.Unlock()
 	if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned
-		*out = api.GlobalPinInfo{
+		*out = types.GlobalPinInfo{
 			Cid: in,
-			PeerMap: map[string]*api.PinInfo{
-				peer.IDB58Encode(test.PeerID1): {
+			PeerMap: map[string]*types.PinInfo{
+				peer.Encode(test.PeerID1): {
 					Cid:    in,
 					Peer:   test.PeerID1,
-					Status: api.TrackerStatusPinned,
+					Status: types.TrackerStatusPinned,
 					TS:     wait.pinStart,
 				},
-				peer.IDB58Encode(test.PeerID2): {
+				peer.Encode(test.PeerID2): {
 					Cid:    in,
 					Peer:   test.PeerID2,
-					Status: api.TrackerStatusPinned,
+					Status: types.TrackerStatusPinned,
 					TS:     wait.pinStart,
 				},
 			},
 		}
 	} else { // pinning
-		*out = api.GlobalPinInfo{
+		*out = types.GlobalPinInfo{
 			Cid: in,
-			PeerMap: map[string]*api.PinInfo{
-				peer.IDB58Encode(test.PeerID1): {
+			PeerMap: map[string]*types.PinInfo{
+				peer.Encode(test.PeerID1): {
 					Cid:    in,
 					Peer:   test.PeerID1,
-					Status: api.TrackerStatusPinning,
+					Status: types.TrackerStatusPinning,
 					TS:     wait.pinStart,
 				},
-				peer.IDB58Encode(test.PeerID2): {
+				peer.Encode(test.PeerID2): {
 					Cid:    in,
 					Peer:   test.PeerID2,
-					Status: api.TrackerStatusPinned,
+					Status: types.TrackerStatusPinned,
 					TS:     wait.pinStart,
 				},
 			},
@@ -554,21 +554,23 @@ func TestWaitFor(t *testing.T) {
 		fp := StatusFilterParams{
 			Cid:       test.Cid1,
 			Local:     false,
-			Target:    api.TrackerStatusPinned,
+			Target:    types.TrackerStatusPinned,
 			CheckFreq: time.Second,
 		}
 		start := time.Now()
 
 		st, err := WaitFor(ctx, c, fp)
 		if err != nil {
-			t.Fatal(err)
+			t.Error(err)
+			return
 		}
-		if time.Now().Sub(start) <= 5*time.Second {
-			t.Fatal("slow pin should have taken at least 5 seconds")
+		if time.Since(start) <= 5*time.Second {
+			t.Error("slow pin should have taken at least 5 seconds")
+			return
 		}
 
 		for _, pi := range st.PeerMap {
-			if pi.Status != api.TrackerStatusPinned {
+			if pi.Status != types.TrackerStatusPinned {
 				t.Error("pin info should show the item is pinned")
 			}
 		}
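The hunk above also applies staticcheck's S1012: time.Now().Sub(x) is exactly time.Since(x), which reads better:

    start := time.Now()
    // ... work ...
    elapsed := time.Since(start) // equivalent to time.Now().Sub(start)
    _ = elapsed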
@@ -73,10 +73,8 @@ func (c *defaultClient) doRequest(
 		r.SetBasicAuth(c.config.Username, c.config.Password)
 	}
 
-	if headers != nil {
-		for k, v := range headers {
-			r.Header.Set(k, v)
-		}
-	}
+	for k, v := range headers {
+		r.Header.Set(k, v)
+	}
 
 	if body != nil {
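The dropped guard is staticcheck's S1031: ranging over a nil map (or slice) simply executes zero iterations, so the nil check around the loop was dead weight:

    var headers map[string]string // nil map
    for k, v := range headers {   // body never runs; ranging a nil map is safe
        _, _ = k, v
    }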
@@ -80,7 +80,7 @@ func (c *defaultClient) enableLibp2p() error {
 	c.transport.RegisterProtocol("libp2p", p2phttp.NewTransport(h))
 	c.net = "libp2p"
 	c.p2p = h
-	c.hostname = peer.IDB58Encode(pinfo.ID)
+	c.hostname = peer.Encode(pinfo.ID)
 	return nil
 }
 
@@ -413,7 +413,7 @@ func (cfg *Config) loadLibp2pOptions(jcfg *jsonConfig) error {
 	}
 
 	if jcfg.ID != "" {
-		id, err := peer.IDB58Decode(jcfg.ID)
+		id, err := peer.Decode(jcfg.ID)
 		if err != nil {
 			return fmt.Errorf("error parsing restapi.ID: %s", err)
 		}
@@ -473,7 +473,7 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) {
 	}
 
 	if cfg.ID != "" {
-		jcfg.ID = peer.IDB58Encode(cfg.ID)
+		jcfg.ID = peer.Encode(cfg.ID)
 	}
 	if cfg.PrivateKey != nil {
 		pkeyBytes, err := cfg.PrivateKey.Bytes()
@@ -203,7 +203,7 @@ func TestLibp2pConfig(t *testing.T) {
 	}
 	defer rest.Shutdown(ctx)
 
-	badPid, _ := peer.IDB58Decode("QmTQ6oKHDwFjzr4ihirVCLJe8CxanxD3ZjGRYzubFuNDjE")
+	badPid, _ := peer.Decode("QmTQ6oKHDwFjzr4ihirVCLJe8CxanxD3ZjGRYzubFuNDjE")
 	cfg.ID = badPid
 	err = cfg.Validate()
 	if err == nil {
@@ -72,9 +72,6 @@ var (
 // Used by sendResponse to set the right status
 const autoStatus = -1
 
-// For making a random sharding ID
-var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
-
 // API implements an API and aims to provides
 // a RESTful HTTP API for Cluster.
 type API struct {
@@ -324,7 +321,7 @@ func basicAuthHandler(credentials map[string]string, h http.Handler) http.Handle
 			logger.Error(err)
 			return
 		}
-		http.Error(w, resp, 401)
+		http.Error(w, resp, http.StatusUnauthorized)
 		return
 	}
 
@@ -340,7 +337,7 @@ func basicAuthHandler(credentials map[string]string, h http.Handler) http.Handle
 			logger.Error(err)
 			return
 		}
-		http.Error(w, resp, 401)
+		http.Error(w, resp, http.StatusUnauthorized)
 		return
 	}
 	h.ServeHTTP(w, r)
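Replacing the literal 401 is stylecheck's ST1013: prefer net/http's named status constants over magic numbers:

    http.Error(w, "unauthorized", http.StatusUnauthorized) // not: 401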
@@ -686,8 +683,6 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
 		w,
 		nil,
 	)
-
-	return
 }
 
 func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) {
@@ -715,7 +710,7 @@ func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	pid, err := peer.IDB58Decode(addInfo.PeerID)
+	pid, err := peer.Decode(addInfo.PeerID)
 	if err != nil {
 		api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding peer_id"), nil)
 		return
@@ -1074,7 +1069,7 @@ func (api *API) repoGCHandler(w http.ResponseWriter, r *http.Request) {
 func repoGCToGlobal(r *types.RepoGC) types.GlobalRepoGC {
 	return types.GlobalRepoGC{
 		PeerMap: map[string]*types.RepoGC{
-			peer.IDB58Encode(r.Peer): r,
+			peer.Encode(r.Peer): r,
 		},
 	}
 }
@@ -1124,7 +1119,7 @@ func (api *API) parseCidOrError(w http.ResponseWriter, r *http.Request) *types.P
 func (api *API) parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID {
 	vars := mux.Vars(r)
 	idStr := vars["peer"]
-	pid, err := peer.IDB58Decode(idStr)
+	pid, err := peer.Decode(idStr)
 	if err != nil {
 		api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding Peer ID: "+err.Error()), nil)
 		return ""
@@ -1136,13 +1131,13 @@ func pinInfoToGlobal(pInfo *types.PinInfo) *types.GlobalPinInfo {
 	return &types.GlobalPinInfo{
 		Cid: pInfo.Cid,
 		PeerMap: map[string]*types.PinInfo{
-			peer.IDB58Encode(pInfo.Peer): pInfo,
+			peer.Encode(pInfo.Peer): pInfo,
 		},
 	}
 }
 
 func pinInfosToGlobal(pInfos []*types.PinInfo) []*types.GlobalPinInfo {
-	gPInfos := make([]*types.GlobalPinInfo, len(pInfos), len(pInfos))
+	gPInfos := make([]*types.GlobalPinInfo, len(pInfos))
 	for i, p := range pInfos {
 		gPInfos[i] = pinInfoToGlobal(p)
 	}
@@ -179,7 +179,7 @@ func httpURL(a *API) string {
 }
 
 func p2pURL(a *API) string {
-	return fmt.Sprintf("libp2p://%s", peer.IDB58Encode(a.Host().ID()))
+	return fmt.Sprintf("libp2p://%s", peer.Encode(a.Host().ID()))
 }
 
 func httpsURL(a *API) string {
@@ -558,10 +558,10 @@ func TestConnectGraphEndpoint(t *testing.T) {
 		// test a few link values
 		pid1 := test.PeerID1
 		pid4 := test.PeerID4
-		if _, ok := cg.ClustertoIPFS[peer.IDB58Encode(pid1)]; !ok {
+		if _, ok := cg.ClustertoIPFS[peer.Encode(pid1)]; !ok {
 			t.Fatal("missing cluster peer 1 from cluster to peer links map")
 		}
-		if cg.ClustertoIPFS[peer.IDB58Encode(pid1)] != pid4 {
+		if cg.ClustertoIPFS[peer.Encode(pid1)] != pid4 {
 			t.Error("unexpected ipfs peer mapped to cluster peer 1 in graph")
 		}
 	}
@@ -860,7 +860,7 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
 
 	if len(resp) != 3 ||
 		!resp[0].Cid.Equals(test.Cid1) ||
-		resp[1].PeerMap[peer.IDB58Encode(test.PeerID1)].Status.String() != "pinning" {
+		resp[1].PeerMap[peer.Encode(test.PeerID1)].Status.String() != "pinning" {
 		t.Errorf("unexpected statusAll resp")
 	}
 
@@ -924,7 +924,7 @@ func TestAPIStatusEndpoint(t *testing.T) {
 	if !resp.Cid.Equals(test.Cid1) {
 		t.Error("expected the same cid")
 	}
-	info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)]
+	info, ok := resp.PeerMap[peer.Encode(test.PeerID1)]
 	if !ok {
 		t.Fatal("expected info for test.PeerID1")
 	}
@@ -939,7 +939,7 @@ func TestAPIStatusEndpoint(t *testing.T) {
 	if !resp2.Cid.Equals(test.Cid1) {
 		t.Error("expected the same cid")
 	}
-	info, ok = resp2.PeerMap[peer.IDB58Encode(test.PeerID2)]
+	info, ok = resp2.PeerMap[peer.Encode(test.PeerID2)]
 	if !ok {
 		t.Fatal("expected info for test.PeerID2")
 	}
@@ -963,7 +963,7 @@ func TestAPIRecoverEndpoint(t *testing.T) {
 	if !resp.Cid.Equals(test.Cid1) {
 		t.Error("expected the same cid")
 	}
-	info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)]
+	info, ok := resp.PeerMap[peer.Encode(test.PeerID1)]
 	if !ok {
 		t.Fatal("expected info for test.PeerID1")
 	}
@@ -709,7 +709,7 @@ func convertPinType(t PinType) pb.Pin_PinType {
 
 // ProtoMarshal marshals this Pin using probobuf.
 func (pin *Pin) ProtoMarshal() ([]byte, error) {
-	allocs := make([][]byte, len(pin.Allocations), len(pin.Allocations))
+	allocs := make([][]byte, len(pin.Allocations))
 	for i, pid := range pin.Allocations {
 		bs, err := pid.Marshal()
 		if err != nil {
@@ -766,7 +766,7 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error {
 
 	pbAllocs := pbPin.GetAllocations()
 	lenAllocs := len(pbAllocs)
-	allocs := make([]peer.ID, lenAllocs, lenAllocs)
+	allocs := make([]peer.ID, lenAllocs)
 	for i, pidb := range pbAllocs {
 		pid, err := peer.IDFromBytes(pidb)
 		if err != nil {
@@ -914,7 +914,7 @@ func (m *Metric) SetTTL(d time.Duration) {
 // GetTTL returns the time left before the Metric expires
 func (m *Metric) GetTTL() time.Duration {
 	expDate := time.Unix(0, m.Expire)
-	return expDate.Sub(time.Now())
+	return time.Until(expDate)
 }
 
 // Expired returns if the Metric has expired
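This is the mirror image of time.Since: x.Sub(time.Now()) is staticcheck's S1024, spelled time.Until. Sketch (expireNanos stands in for Metric.Expire):

    expDate := time.Unix(0, expireNanos)
    ttl := time.Until(expDate) // equivalent to expDate.Sub(time.Now())
    _ = ttl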
@@ -10,26 +10,10 @@ import (
 
 	cid "github.com/ipfs/go-cid"
 	peer "github.com/libp2p/go-libp2p-core/peer"
-	ma "github.com/multiformats/go-multiaddr"
 
 	"github.com/ugorji/go/codec"
 )
 
-var testTime = time.Date(2017, 12, 31, 15, 45, 50, 0, time.UTC)
-var testMAddr, _ = ma.NewMultiaddr("/ip4/1.2.3.4")
-var testMAddr2, _ = ma.NewMultiaddr("/dns4/a.b.c.d")
-var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/p2p/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd")
-var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
-var testCid2, _ = cid.Decode("QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV")
-var testCid3, _ = cid.Decode("QmZmdA3UZKuHuy9FrWsxJ82q21nbEh97NUnxTzF5EHxZia")
-var testCid4, _ = cid.Decode("QmZbNfi13Sb2WUDMjiW1ZNhnds5KDk6mJB5hP9B5h9m5CJ")
-var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
-var testPeerID2, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabd")
-var testPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
-var testPeerID4, _ = peer.IDB58Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
-var testPeerID5, _ = peer.IDB58Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
-var testPeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
-
 func TestTrackerFromString(t *testing.T) {
 	testcases := []string{"cluster_error", "pin_error", "unpin_error", "pinned", "pinning", "unpinning", "unpinned", "remote"}
 	for i, tc := range testcases {
@@ -231,9 +215,9 @@ func TestPinOptionsQuery(t *testing.T) {
 }
 
 func TestIDCodec(t *testing.T) {
-	TestPeerID1, _ := peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
-	TestPeerID2, _ := peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
-	TestPeerID3, _ := peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
+	TestPeerID1, _ := peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
+	TestPeerID2, _ := peer.Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
+	TestPeerID3, _ := peer.Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
 	addr, _ := NewMultiaddr("/ip4/1.2.3.4")
 	id := &ID{
 		ID: TestPeerID1,
@@ -4,12 +4,12 @@ import (
 	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
-// PeersToStrings IDB58Encodes a list of peers.
+// PeersToStrings Encodes a list of peers.
 func PeersToStrings(peers []peer.ID) []string {
 	strs := make([]string, len(peers))
 	for i, p := range peers {
 		if p != "" {
-			strs[i] = peer.IDB58Encode(p)
+			strs[i] = peer.Encode(p)
 		}
 	}
 	return strs
@@ -19,7 +19,7 @@ func PeersToStrings(peers []peer.ID) []string {
 func StringsToPeers(strs []string) []peer.ID {
 	peers := []peer.ID{}
 	for _, p := range strs {
-		pid, err := peer.IDB58Decode(p)
+		pid, err := peer.Decode(p)
 		if err != nil {
 			logger.Debugf("'%s': %s", p, err)
 			continue
105 cluster.go
@@ -1431,7 +1431,7 @@ func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
 	case api.DataType:
 		return pin, c.consensus.LogUnpin(ctx, pin)
 	case api.ShardType:
-		err := "cannot unpin a shard directly. Unpin content root CID instead."
+		err := "cannot unpin a shard directly. Unpin content root CID instead"
 		return pin, errors.New(err)
 	case api.MetaType:
 		// Unpin cluster dag and referenced shards
@@ -1441,7 +1441,7 @@ func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
 		}
 		return pin, c.consensus.LogUnpin(ctx, pin)
 	case api.ClusterDAGType:
-		err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead."
+		err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead"
 		return pin, errors.New(err)
 	default:
 		return pin, errors.New("unrecognized pin type")
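Dropping the trailing periods satisfies stylecheck's ST1005: error strings get composed into larger messages, so they should not end with punctuation:

    err1 := errors.New("cannot unpin a shard directly. Unpin content root CID instead") // ok
    // errors.New("...content root CID instead.") // trailing period triggers ST1005
    _ = err1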
@@ -1569,7 +1569,7 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID {
 	}
 	lenMembers := len(members)
 
-	peers := make([]*api.ID, lenMembers, lenMembers)
+	peers := make([]*api.ID, lenMembers)
 
 	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers)
 	defer rpcutil.MultiCancel(cancels)
@@ -1588,6 +1588,7 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID {
 	for i, err := range errs {
 		if err == nil {
 			finalPeers = append(finalPeers, peers[i])
+			_ = finalPeers // staticcheck
 			continue
 		}
 
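The blank assignment added above reads as a suppression: staticcheck reports an append whose result is never observed on a given path (SA4010), and assigning the slice to _ documents that this is deliberate rather than restructuring the loop. A sketch of the idiom:

    finalPeers = append(finalPeers, peers[i])
    _ = finalPeers // result otherwise unobserved on this path; silences SA4010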
@@ -1624,7 +1625,7 @@ func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
 
 func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, t time.Time) {
 	for _, p := range peers {
-		gpin.PeerMap[peer.IDB58Encode(p)] = &api.PinInfo{
+		gpin.PeerMap[peer.Encode(p)] = &api.PinInfo{
 			Cid:      h,
 			Peer:     p,
 			PeerName: p.String(),
@@ -1684,7 +1685,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
 	setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, timeNow)
 
 	lenDests := len(dests)
-	replies := make([]*api.PinInfo, lenDests, lenDests)
+	replies := make([]*api.PinInfo, lenDests)
 	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenDests)
 	defer rpcutil.MultiCancel(cancels)
 
@@ -1702,7 +1703,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
 
 		// No error. Parse and continue
 		if e == nil {
-			gpin.PeerMap[peer.IDB58Encode(dests[i])] = r
+			gpin.PeerMap[peer.Encode(dests[i])] = r
 			continue
 		}
 
@@ -1713,7 +1714,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
 
 		// Deal with error cases (err != nil): wrap errors in PinInfo
 		logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
-		gpin.PeerMap[peer.IDB58Encode(dests[i])] = &api.PinInfo{
+		gpin.PeerMap[peer.Encode(dests[i])] = &api.PinInfo{
 			Cid:      h,
 			Peer:     dests[i],
 			PeerName: dests[i].String(),
@@ -1746,7 +1747,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
 	}
 	lenMembers := len(members)
 
-	replies := make([][]*api.PinInfo, lenMembers, lenMembers)
+	replies := make([][]*api.PinInfo, lenMembers)
 
 	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers)
 	defer rpcutil.MultiCancel(cancels)
@@ -1770,11 +1771,11 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
 				fullMap[p.Cid] = &api.GlobalPinInfo{
 					Cid: p.Cid,
 					PeerMap: map[string]*api.PinInfo{
-						peer.IDB58Encode(p.Peer): p,
+						peer.Encode(p.Peer): p,
 					},
 				}
 			} else {
-				item.PeerMap[peer.IDB58Encode(p.Peer)] = p
+				item.PeerMap[peer.Encode(p.Peer)] = p
 			}
 		}
 	}
@@ -1796,7 +1797,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
 	// Merge any errors
 	for p, msg := range erroredPeers {
 		for c := range fullMap {
-			fullMap[c].PeerMap[peer.IDB58Encode(p)] = &api.PinInfo{
+			fullMap[c].PeerMap[peer.Encode(p)] = &api.PinInfo{
 				Cid:    c,
 				Peer:   p,
 				Status: api.TrackerStatusClusterError,
@@ -1887,45 +1888,45 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er
 	return list, nil
 }
 
-// diffPeers returns the peerIDs added and removed from peers2 in relation to
-// peers1
-func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
-	m1 := make(map[peer.ID]struct{})
-	m2 := make(map[peer.ID]struct{})
-	added = make([]peer.ID, 0)
-	removed = make([]peer.ID, 0)
-	if peers1 == nil && peers2 == nil {
-		return
-	}
-	if peers1 == nil {
-		added = peers2
-		return
-	}
-	if peers2 == nil {
-		removed = peers1
-		return
-	}
+// // diffPeers returns the peerIDs added and removed from peers2 in relation to
+// // peers1
+// func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
+// 	m1 := make(map[peer.ID]struct{})
+// 	m2 := make(map[peer.ID]struct{})
+// 	added = make([]peer.ID, 0)
+// 	removed = make([]peer.ID, 0)
+// 	if peers1 == nil && peers2 == nil {
+// 		return
+// 	}
+// 	if peers1 == nil {
+// 		added = peers2
+// 		return
+// 	}
+// 	if peers2 == nil {
+// 		removed = peers1
+// 		return
+// 	}
 
-	for _, p := range peers1 {
-		m1[p] = struct{}{}
-	}
-	for _, p := range peers2 {
-		m2[p] = struct{}{}
-	}
-	for k := range m1 {
-		_, ok := m2[k]
-		if !ok {
-			removed = append(removed, k)
-		}
-	}
-	for k := range m2 {
-		_, ok := m1[k]
-		if !ok {
-			added = append(added, k)
-		}
-	}
-	return
-}
+// 	for _, p := range peers1 {
+// 		m1[p] = struct{}{}
+// 	}
+// 	for _, p := range peers2 {
+// 		m2[p] = struct{}{}
+// 	}
+// 	for k := range m1 {
+// 		_, ok := m2[k]
+// 		if !ok {
+// 			removed = append(removed, k)
+// 		}
+// 	}
+// 	for k := range m2 {
+// 		_, ok := m1[k]
+// 		if !ok {
+// 			added = append(added, k)
+// 		}
+// 	}
+// 	return
+// }
 
 // RepoGC performs garbage collection sweep on all peers' IPFS repo.
 func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) {
@@ -1952,7 +1953,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) {
 			&repoGC,
 		)
 		if err == nil {
-			globalRepoGC.PeerMap[peer.IDB58Encode(member)] = &repoGC
+			globalRepoGC.PeerMap[peer.Encode(member)] = &repoGC
 			continue
 		}
 
@@ -1963,9 +1964,9 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) {
 
 		logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err)
 
-		globalRepoGC.PeerMap[peer.IDB58Encode(member)] = &api.RepoGC{
+		globalRepoGC.PeerMap[peer.Encode(member)] = &api.RepoGC{
 			Peer:     member,
-			Peername: peer.IDB58Encode(member),
+			Peername: peer.Encode(member),
 			Keys:     []api.IPFSRepoGC{},
 			Error:    err.Error(),
 		}
@@ -9,7 +9,6 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
-	"sync"
 	"time"
 
 	"github.com/ipfs/ipfs-cluster/config"
@@ -59,8 +58,6 @@ type ConnMgrConfig struct {
 // config.ComponentConfig interface.
 type Config struct {
 	config.Saver
-	lock          sync.Mutex
-	peerstoreLock sync.Mutex
 
 	// User-defined peername for use as human-readable identifier.
 	Peername string
@@ -203,7 +200,7 @@ func (cfg *Config) ConfigKey() string {
 func (cfg *Config) Default() error {
 	cfg.setDefaults()
 
-	clusterSecret := make([]byte, 32, 32)
+	clusterSecret := make([]byte, 32)
 	n, err := rand.Read(clusterSecret)
 	if err != nil {
 		return err
@@ -38,7 +38,6 @@ func (c *mockComponent) Shutdown(ctx context.Context) error {
 
 func (c *mockComponent) SetClient(client *rpc.Client) {
 	c.rpcClient = client
-	return
 }
 
 type mockAPI struct {
@@ -213,7 +212,7 @@ func cleanState() {
 	os.RemoveAll(testsFolder)
 }
 
-func testClusterShutdown(t *testing.T) {
+func TestClusterShutdown(t *testing.T) {
 	ctx := context.Background()
 	cl, _, _, _ := testingCluster(t)
 	err := cl.Shutdown(ctx)
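This rename matters beyond style: `go test` only runs functions whose names match the exported TestXxx pattern, so the lowercase version was never executed at all, and staticcheck reported it as unused code (U1000):

    func TestClusterShutdown(t *testing.T) { /* discovered and run by go test */ }
    func testClusterShutdown(t *testing.T) { /* never run; flagged by U1000 */ }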
@@ -11,10 +11,10 @@ import (
 	libp2p "github.com/libp2p/go-libp2p"
 	relay "github.com/libp2p/go-libp2p-circuit"
 	connmgr "github.com/libp2p/go-libp2p-connmgr"
+	crypto "github.com/libp2p/go-libp2p-core/crypto"
+	host "github.com/libp2p/go-libp2p-core/host"
 	corepnet "github.com/libp2p/go-libp2p-core/pnet"
 	routing "github.com/libp2p/go-libp2p-core/routing"
-	crypto "github.com/libp2p/go-libp2p-crypto"
-	host "github.com/libp2p/go-libp2p-host"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
@@ -22,15 +22,14 @@ type addedOutputQuiet struct {
 }
 
 func jsonFormatObject(resp interface{}) {
-	switch resp.(type) {
+	switch r := resp.(type) {
 	case nil:
 		return
 	case []*addedOutputQuiet:
 		// print original objects as in JSON it makes
 		// no sense to have a human "quiet" output
-		serials := resp.([]*addedOutputQuiet)
 		var actual []*api.AddedOutput
-		for _, s := range serials {
+		for _, s := range r {
 			actual = append(actual, s.AddedOutput)
 		}
 		jsonFormatPrint(actual)
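Binding the switched value (`switch r := resp.(type)`) is staticcheck's S1034: inside each case, r already has that case's concrete type, so the repeated type assertions can go. The same rewrite drives the long textFormatObject hunk below:

    switch r := resp.(type) {
    case *api.ID:
        textFormatPrintID(r) // r is already *api.ID; no resp.(*api.ID) needed
    case []*api.ID:
        for _, item := range r {
            textFormatObject(item)
        }
    }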
@@ -46,55 +45,55 @@ func jsonFormatPrint(obj interface{}) {
 }
 
 func textFormatObject(resp interface{}) {
-	switch resp.(type) {
+	switch r := resp.(type) {
 	case nil:
 		return
 	case string:
 		fmt.Println(resp)
 	case *api.ID:
-		textFormatPrintID(resp.(*api.ID))
+		textFormatPrintID(r)
 	case *api.GlobalPinInfo:
-		textFormatPrintGPInfo(resp.(*api.GlobalPinInfo))
+		textFormatPrintGPInfo(r)
 	case *api.Pin:
-		textFormatPrintPin(resp.(*api.Pin))
+		textFormatPrintPin(r)
 	case *api.AddedOutput:
-		textFormatPrintAddedOutput(resp.(*api.AddedOutput))
+		textFormatPrintAddedOutput(r)
 	case *addedOutputQuiet:
-		textFormatPrintAddedOutputQuiet(resp.(*addedOutputQuiet))
+		textFormatPrintAddedOutputQuiet(r)
 	case *api.Version:
-		textFormatPrintVersion(resp.(*api.Version))
+		textFormatPrintVersion(r)
 	case *api.Error:
-		textFormatPrintError(resp.(*api.Error))
+		textFormatPrintError(r)
 	case *api.Metric:
-		textFormatPrintMetric(resp.(*api.Metric))
+		textFormatPrintMetric(r)
 	case []*api.ID:
-		for _, item := range resp.([]*api.ID) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	case []*api.GlobalPinInfo:
-		for _, item := range resp.([]*api.GlobalPinInfo) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	case []*api.Pin:
-		for _, item := range resp.([]*api.Pin) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	case []*api.AddedOutput:
-		for _, item := range resp.([]*api.AddedOutput) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	case []*addedOutputQuiet:
-		for _, item := range resp.([]*addedOutputQuiet) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	case []*api.Metric:
-		for _, item := range resp.([]*api.Metric) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	case *api.GlobalRepoGC:
-		textFormatPrintGlobalRepoGC(resp.(*api.GlobalRepoGC))
+		textFormatPrintGlobalRepoGC(r)
 	case []string:
-		for _, item := range resp.([]string) {
+		for _, item := range r {
 			textFormatObject(item)
 		}
 	default:
@@ -163,16 +162,6 @@ func textFormatPrintGPInfo(obj *api.GlobalPinInfo) {
 	}
 }
 
-func textFormatPrintPInfo(obj *api.PinInfo) {
-	gpinfo := api.GlobalPinInfo{
-		Cid: obj.Cid,
-		PeerMap: map[string]*api.PinInfo{
-			peer.IDB58Encode(obj.Peer): obj,
-		},
-	}
-	textFormatPrintGPInfo(&gpinfo)
-}
-
 func textFormatPrintVersion(obj *api.Version) {
 	fmt.Println(obj.Version)
 }
@@ -230,11 +219,11 @@ func textFormatPrintMetric(obj *api.Metric) {
 	if obj.Name == "freespace" {
 		u, err := strconv.ParseUint(obj.Value, 10, 64)
 		checkErr("parsing to uint64", err)
-		fmt.Printf("%s | freespace: %s | Expires in: %s\n", peer.IDB58Encode(obj.Peer), humanize.Bytes(u), humanize.Time(time.Unix(0, obj.Expire)))
+		fmt.Printf("%s | freespace: %s | Expires in: %s\n", peer.Encode(obj.Peer), humanize.Bytes(u), humanize.Time(time.Unix(0, obj.Expire)))
 		return
 	}
 
-	fmt.Printf("%s | %s | Expires in: %s\n", peer.IDB58Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire)))
+	fmt.Printf("%s | %s | Expires in: %s\n", peer.Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire)))
 }
 
 func textFormatPrintGlobalRepoGC(obj *api.GlobalRepoGC) {
@@ -7,7 +7,7 @@ import (
 	"sort"
 
 	dot "github.com/kishansagathiya/go-dot"
-	peer "github.com/libp2p/go-libp2p-peer"
+	peer "github.com/libp2p/go-libp2p-core/peer"
 
 	"github.com/ipfs/ipfs-cluster/api"
 )
@@ -38,16 +38,14 @@ const (
 	tIPFSMissing // Missing IPFS node
 )
 
-var errUnfinishedWrite = errors.New("could not complete write of line to output")
-var errUnknownNodeType = errors.New("unsupported node type. Expected cluster or ipfs")
 var errCorruptOrdering = errors.New("expected pid to have an ordering within dot writer")
 
 func makeDot(cg *api.ConnectGraph, w io.Writer, allIpfs bool) error {
 	ipfsEdges := make(map[string][]peer.ID)
 	for k, v := range cg.IPFSLinks {
 		ipfsEdges[k] = make([]peer.ID, 0)
 		for _, id := range v {
-			strPid := peer.IDB58Encode(id)
+			strPid := peer.Encode(id)
 
 			if _, ok := cg.IPFSLinks[strPid]; ok || allIpfs {
 				ipfsEdges[k] = append(ipfsEdges[k], id)
@@ -65,7 +63,7 @@ func makeDot(cg *api.ConnectGraph, w io.Writer, allIpfs bool) error {
 	dW := dotWriter{
 		w:            w,
 		dotGraph:     dot.NewGraph("cluster"),
-		self:         peer.IDB58Encode(cg.ClusterID),
+		self:         peer.Encode(cg.ClusterID),
 		trustMap:     cg.ClusterTrustLinks,
 		idToPeername: cg.IDtoPeername,
 		ipfsEdges:    ipfsEdges,
@@ -207,7 +205,7 @@ func (dW *dotWriter) print() error {
 		v := dW.clusterEdges[k]
 		for _, id := range v {
 			toNode := dW.clusterNodes[k]
-			fromNode := dW.clusterNodes[peer.IDB58Encode(id)]
+			fromNode := dW.clusterNodes[peer.Encode(id)]
 			dW.dotGraph.AddEdge(toNode, fromNode, true, "")
 		}
 	}
@@ -229,7 +227,7 @@ func (dW *dotWriter) print() error {
 			continue
 		}
 
-		fromNode, ok = dW.ipfsNodes[peer.IDB58Encode(ipfsID)]
+		fromNode, ok = dW.ipfsNodes[peer.Encode(ipfsID)]
 		if !ok {
 			logger.Error("expected a node at this id")
 			continue
@@ -244,7 +242,7 @@ func (dW *dotWriter) print() error {
 		v := dW.ipfsEdges[k]
 		toNode := dW.ipfsNodes[k]
 		for _, id := range v {
-			idStr := peer.IDB58Encode(id)
+			idStr := peer.Encode(id)
 			fromNode, ok := dW.ipfsNodes[idStr]
 			if !ok {
 				logger.Error("expected a node here")
@@ -257,7 +255,7 @@ func (dW *dotWriter) print() error {
 }
 
 func sortedKeys(dict map[string][]peer.ID) []string {
-	keys := make([]string, len(dict), len(dict))
+	keys := make([]string, len(dict))
 	i := 0
 	for k := range dict {
 		keys[i] = k
@@ -67,53 +67,53 @@ I2 -> I0
 }`
 
 var (
-	pid1, _ = peer.IDB58Decode("QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD")
-	pid2, _ = peer.IDB58Decode("QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ")
-	pid3, _ = peer.IDB58Decode("QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu")
-	pid4, _ = peer.IDB58Decode("QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV")
-	pid5, _ = peer.IDB58Decode("QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq")
-	pid6, _ = peer.IDB58Decode("QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL")
+	pid1, _ = peer.Decode("QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD")
+	pid2, _ = peer.Decode("QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ")
+	pid3, _ = peer.Decode("QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu")
+	pid4, _ = peer.Decode("QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV")
+	pid5, _ = peer.Decode("QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq")
+	pid6, _ = peer.Decode("QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL")
 
-	pid7, _ = peer.IDB58Decode("QmQsdAdCHs4PRLi5tcoLfasYppryqQENxgAy4b2aS8xccb")
-	pid8, _ = peer.IDB58Decode("QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8")
-	pid9, _ = peer.IDB58Decode("QmfCHNQ2vbUmAuJZhE2hEpgiJq4sL1XScWEKnUrVtWZdeD")
+	pid7, _ = peer.Decode("QmQsdAdCHs4PRLi5tcoLfasYppryqQENxgAy4b2aS8xccb")
+	pid8, _ = peer.Decode("QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8")
+	pid9, _ = peer.Decode("QmfCHNQ2vbUmAuJZhE2hEpgiJq4sL1XScWEKnUrVtWZdeD")
 )
 
 func TestSimpleIpfsGraphs(t *testing.T) {
 	cg := api.ConnectGraph{
 		ClusterID: pid1,
 		ClusterLinks: map[string][]peer.ID{
-			peer.IDB58Encode(pid1): []peer.ID{
+			peer.Encode(pid1): []peer.ID{
 				pid2,
 				pid3,
 			},
-			peer.IDB58Encode(pid2): []peer.ID{
+			peer.Encode(pid2): []peer.ID{
 				pid1,
 				pid3,
 			},
-			peer.IDB58Encode(pid3): []peer.ID{
+			peer.Encode(pid3): []peer.ID{
 				pid1,
 				pid2,
 			},
 		},
 		IPFSLinks: map[string][]peer.ID{
-			peer.IDB58Encode(pid4): []peer.ID{
+			peer.Encode(pid4): []peer.ID{
 				pid5,
 				pid6,
 			},
-			peer.IDB58Encode(pid5): []peer.ID{
+			peer.Encode(pid5): []peer.ID{
 				pid4,
 				pid6,
 			},
-			peer.IDB58Encode(pid6): []peer.ID{
+			peer.Encode(pid6): []peer.ID{
 				pid4,
 				pid5,
 			},
 		},
 		ClustertoIPFS: map[string]peer.ID{
-			peer.IDB58Encode(pid1): pid4,
-			peer.IDB58Encode(pid2): pid5,
-			peer.IDB58Encode(pid3): pid6,
+			peer.Encode(pid1): pid4,
+			peer.Encode(pid2): pid5,
+			peer.Encode(pid3): pid6,
 		},
 	}
 	buf := new(bytes.Buffer)
@@ -181,35 +181,35 @@ func TestIpfsAllGraphs(t *testing.T) {
 	cg := api.ConnectGraph{
 		ClusterID: pid1,
 		ClusterLinks: map[string][]peer.ID{
-			peer.IDB58Encode(pid1): []peer.ID{
+			peer.Encode(pid1): []peer.ID{
 				pid2,
 				pid3,
 			},
-			peer.IDB58Encode(pid2): []peer.ID{
+			peer.Encode(pid2): []peer.ID{
 				pid1,
 				pid3,
 			},
-			peer.IDB58Encode(pid3): []peer.ID{
+			peer.Encode(pid3): []peer.ID{
 				pid1,
 				pid2,
 			},
 		},
 		IPFSLinks: map[string][]peer.ID{
-			peer.IDB58Encode(pid4): []peer.ID{
+			peer.Encode(pid4): []peer.ID{
 				pid5,
 				pid6,
 				pid7,
 				pid8,
 				pid9,
 			},
-			peer.IDB58Encode(pid5): []peer.ID{
+			peer.Encode(pid5): []peer.ID{
 				pid4,
 				pid6,
 				pid7,
 				pid8,
 				pid9,
 			},
-			peer.IDB58Encode(pid6): []peer.ID{
+			peer.Encode(pid6): []peer.ID{
 				pid4,
 				pid5,
 				pid7,
@@ -218,9 +218,9 @@ func TestIpfsAllGraphs(t *testing.T) {
 			},
 		},
 		ClustertoIPFS: map[string]peer.ID{
-			peer.IDB58Encode(pid1): pid4,
-			peer.IDB58Encode(pid2): pid5,
-			peer.IDB58Encode(pid3): pid6,
+			peer.Encode(pid1): pid4,
+			peer.Encode(pid2): pid5,
+			peer.Encode(pid3): pid6,
 		},
 	}
 
@@ -20,7 +20,6 @@ import (
 	peer "github.com/libp2p/go-libp2p-core/peer"
-	ma "github.com/multiformats/go-multiaddr"
 
 	"contrib.go.opencensus.io/exporter/jaeger"
 	uuid "github.com/google/uuid"
 	cli "github.com/urfave/cli"
 )
@@ -34,16 +33,12 @@ const Version = "0.12.1"
 var (
 	defaultHost          = "/ip4/127.0.0.1/tcp/9094"
 	defaultTimeout       = 0
-	defaultUsername      = ""
-	defaultPassword      = ""
-	defaultWaitCheckFreq = time.Second
-	defaultAddParams     = api.DefaultAddParams()
 )
 
 var logger = logging.Logger("cluster-ctl")
 
 var tracer *jaeger.Exporter
 
 var globalClient client.Client
 
 // Description provides a short summary of the functionality of this tool
@@ -70,9 +65,9 @@ https://github.com/ipfs/ipfs-cluster.
 	programName,
 	defaultHost)
 
-type peerAddBody struct {
-	Addr string `json:"peer_multiaddress"`
-}
+// type peerAddBody struct {
+// 	Addr string `json:"peer_multiaddress"`
+// }
 
 func out(m string, a ...interface{}) {
 	fmt.Fprintf(os.Stderr, m, a...)
|
|||
Flags: []cli.Flag{},
|
||||
Action: func(c *cli.Context) error {
|
||||
pid := c.Args().First()
|
||||
p, err := peer.IDB58Decode(pid)
|
||||
p, err := peer.Decode(pid)
|
||||
checkErr("parsing peer ID", err)
|
||||
cerr := globalClient.PeerRm(ctx, p)
|
||||
formatResponse(c, nil, cerr)
|
||||
|
@@ -405,7 +400,7 @@ content.
 				}
 
 				// Read arguments (paths)
-				paths := make([]string, c.NArg(), c.NArg())
+				paths := make([]string, c.NArg())
 				for i, path := range c.Args() {
 					paths[i] = path
 				}
@@ -979,14 +974,6 @@ deamon, otherwise on all IPFS daemons.
 	app.Run(os.Args)
 }
 
-func parseFlag(t int) cli.IntFlag {
-	return cli.IntFlag{
-		Name:   "parseAs",
-		Value:  t,
-		Hidden: true,
-	}
-}
-
 func localFlag() cli.BoolFlag {
 	return cli.BoolFlag{
 		Name: "local",
@@ -16,7 +16,7 @@ import (
 	"github.com/ipfs/go-datastore"
 	ipfscluster "github.com/ipfs/ipfs-cluster"
 	ipfshttp "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
-	host "github.com/libp2p/go-libp2p-host"
+	host "github.com/libp2p/go-libp2p-core/host"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/pkg/errors"
@@ -7,8 +7,8 @@ import (
 	"fmt"
 	"io/ioutil"
 
+	crypto "github.com/libp2p/go-libp2p-core/crypto"
 	peer "github.com/libp2p/go-libp2p-core/peer"
-	crypto "github.com/libp2p/go-libp2p-crypto"
 
 	"github.com/kelseyhightower/envconfig"
 )
@@ -122,7 +122,7 @@ func (ident *Identity) LoadJSON(raw []byte) error {
 }
 
 func (ident *Identity) applyIdentityJSON(jID *identityJSON) error {
-	pid, err := peer.IDB58Decode(jID.ID)
+	pid, err := peer.Decode(jID.ID)
 	if err != nil {
 		err = fmt.Errorf("error decoding cluster ID: %s", err)
 		return err
@@ -30,10 +30,10 @@ func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) {
 
 	for _, member := range members {
 		// one of the entries is for itself, but that shouldn't hurt
-		cg.ClusterTrustLinks[peer.IDB58Encode(member)] = c.consensus.IsTrustedPeer(ctx, member)
+		cg.ClusterTrustLinks[peer.Encode(member)] = c.consensus.IsTrustedPeer(ctx, member)
 	}
 
-	peers := make([][]*api.ID, len(members), len(members))
+	peers := make([][]*api.ID, len(members))
 
 	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(members))
 	defer rpcutil.MultiCancel(cancels)
@@ -48,7 +48,7 @@ func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) {
 	)
 
 	for i, err := range errs {
-		p := peer.IDB58Encode(members[i])
+		p := peer.Encode(members[i])
 		cg.ClusterLinks[p] = make([]peer.ID, 0)
 		if err != nil { // Only setting cluster connections when no error occurs
 			logger.Debugf("RPC error reaching cluster peer %s: %s", p, err.Error())
@@ -76,7 +76,7 @@ func (c *Cluster) recordClusterLinks(cg *api.ConnectGraph, p string, peers []*ap
 			logger.Debugf("Peer %s errored connecting to its peer %s", p, id.ID.Pretty())
 			continue
 		}
-		if peer.IDB58Encode(id.ID) == p {
+		if peer.Encode(id.ID) == p {
 			selfConnection = true
 			pID = id
 		} else {
@@ -93,8 +93,8 @@ func (c *Cluster) recordIPFSLinks(cg *api.ConnectGraph, pID *api.ID) {
 		return
 	}
 
-	pid := peer.IDB58Encode(pID.ID)
-	ipfsPid := peer.IDB58Encode(ipfsID)
+	pid := peer.Encode(pID.ID)
+	ipfsPid := peer.Encode(ipfsID)
 
 	if _, ok := cg.IPFSLinks[pid]; ok {
 		logger.Warnf("ipfs id: %s already recorded, one ipfs daemon in use by multiple cluster peers", ipfsID.Pretty())
@@ -115,7 +115,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
 			cfg.TrustedPeers = []peer.ID{}
 			break
 		}
-		pid, err := peer.IDB58Decode(p)
+		pid, err := peer.Decode(p)
 		if err != nil {
 			return fmt.Errorf("error parsing trusted peers: %s", err)
 		}
@@ -39,6 +39,10 @@ func TestLoadJSON(t *testing.T) {
 		"cluster_name": "test",
 		"trusted_peers": []
 	}`))
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	if cfg.TrustAll {
 		t.Error("TrustAll is only enabled with '*'")
 	}
@@ -323,7 +323,7 @@ func (css *Consensus) Ready(ctx context.Context) <-chan struct{} {
 // IsTrustedPeer returns whether the given peer is taken into account
 // when submitting updates to the consensus state.
 func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
-	ctx, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer")
+	_, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer")
 	defer span.End()
 
 	if css.config.TrustAll {
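Here trace.StartSpan's returned context was assigned to ctx but never read again before the function returned — an ineffectual assignment flagged by staticcheck (SA4006). Receiving it as the blank identifier keeps only the span, which is what the body uses. The same fix repeats in the Trust, Distrust, Ready, State, Leader, and Clean hunks below:

    _, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer")
    defer span.End()
    // keep the returned context only if a callee actually needs the span context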
@@ -343,7 +343,7 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
 // has the highest priority when the peerstore is saved, and it's addresses
 // are always remembered.
 func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
-	ctx, span := trace.StartSpan(ctx, "consensus/Trust")
+	_, span := trace.StartSpan(ctx, "consensus/Trust")
 	defer span.End()
 
 	css.trustedPeers.Store(pid, struct{}{})
@@ -358,7 +358,7 @@ func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
 
 // Distrust removes a peer from the "trusted" set.
 func (css *Consensus) Distrust(ctx context.Context, pid peer.ID) error {
-	ctx, span := trace.StartSpan(ctx, "consensus/Distrust")
+	_, span := trace.StartSpan(ctx, "consensus/Distrust")
 	defer span.End()
 
 	css.trustedPeers.Delete(pid)
@@ -500,8 +500,7 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error)
 	opts := crdt.DefaultOptions()
 	opts.Logger = logger
 
-	var blocksDatastore ds.Batching
-	blocksDatastore = namespace.Wrap(
+	var blocksDatastore ds.Batching = namespace.Wrap(
 		batching,
 		ds.NewKey(cfg.DatastoreNamespace).ChildString(blocksNs),
 	)
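Merging the declaration with the assignment is staticcheck's S1021, while the explicit ds.Batching type is kept so the value is still stored as the interface. Sketch, with b and k as placeholders for the real arguments:

    // before (S1021):
    //   var blocksDatastore ds.Batching
    //   blocksDatastore = namespace.Wrap(b, k)
    var blocksDatastore ds.Batching = namespace.Wrap(b, k)
    _ = blocksDatastore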
@@ -21,7 +21,6 @@ import (
 	peer "github.com/libp2p/go-libp2p-core/peer"
 	rpc "github.com/libp2p/go-libp2p-gorpc"
 	libp2praft "github.com/libp2p/go-libp2p-raft"
-	ma "github.com/multiformats/go-multiaddr"
 
 	"go.opencensus.io/tag"
 	"go.opencensus.io/trace"
@@ -223,7 +222,7 @@ func (cc *Consensus) SetClient(c *rpc.Client) {
 // Ready returns a channel which is signaled when the Consensus
 // algorithm has finished bootstrapping and is ready to use
 func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} {
-	ctx, span := trace.StartSpan(ctx, "consensus/Ready")
+	_, span := trace.StartSpan(ctx, "consensus/Ready")
 	defer span.End()
 
 	return cc.readyCh
@@ -276,7 +275,7 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err
 		if err != nil {
 			return false, fmt.Errorf("timed out waiting for leader: %s", err)
 		}
-		leader, err = peer.IDB58Decode(pidstr)
+		leader, err = peer.Decode(pidstr)
 		if err != nil {
 			return false, err
 		}
@@ -409,7 +408,7 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
 		}
 		// Being here means we are the leader and can commit
 		cc.shutdownLock.RLock() // do not shutdown while committing
-		finalErr = cc.raft.AddPeer(ctx, peer.IDB58Encode(pid))
+		finalErr = cc.raft.AddPeer(ctx, peer.Encode(pid))
 
 		cc.shutdownLock.RUnlock()
 		if finalErr != nil {
@@ -440,7 +439,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
 		}
 		// Being here means we are the leader and can commit
 		cc.shutdownLock.RLock() // do not shutdown while committing
-		finalErr = cc.raft.RemovePeer(ctx, peer.IDB58Encode(pid))
+		finalErr = cc.raft.RemovePeer(ctx, peer.Encode(pid))
 		cc.shutdownLock.RUnlock()
 		if finalErr != nil {
 			time.Sleep(cc.config.CommitRetryDelay)
@@ -458,7 +457,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
 // writes to the shared state should happen through the Consensus component
 // methods.
 func (cc *Consensus) State(ctx context.Context) (state.ReadOnly, error) {
-	ctx, span := trace.StartSpan(ctx, "consensus/State")
+	_, span := trace.StartSpan(ctx, "consensus/State")
 	defer span.End()
 
 	st, err := cc.consensus.GetLogHead()
@ -479,7 +478,7 @@ func (cc *Consensus) State(ctx context.Context) (state.ReadOnly, error) {
|
|||
// Leader returns the peerID of the Leader of the
|
||||
// cluster. It returns an error when there is no leader.
|
||||
func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "consensus/Leader")
|
||||
_, span := trace.StartSpan(ctx, "consensus/Leader")
|
||||
defer span.End()
|
||||
|
||||
// Note the hard-dependency on raft here...
|
||||
|
@ -489,7 +488,7 @@ func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) {
|
|||
|
||||
// Clean removes the Raft persisted state.
|
||||
func (cc *Consensus) Clean(ctx context.Context) error {
|
||||
ctx, span := trace.StartSpan(ctx, "consensus/Clean")
|
||||
_, span := trace.StartSpan(ctx, "consensus/Clean")
|
||||
defer span.End()
|
||||
|
||||
cc.shutdownLock.RLock()
|
||||
|
@ -532,7 +531,7 @@ func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
|
|||
sort.Strings(raftPeers)
|
||||
|
||||
for _, p := range raftPeers {
|
||||
id, err := peer.IDB58Decode(p)
|
||||
id, err := peer.Decode(p)
|
||||
if err != nil {
|
||||
panic("could not decode peer")
|
||||
}
|
||||
|
@ -541,14 +540,6 @@ func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
|
|||
return peers, nil
|
||||
}
|
||||
|
||||
func parsePIDFromMultiaddr(addr ma.Multiaddr) string {
|
||||
pidstr, err := addr.ValueForProtocol(ma.P_P2P)
|
||||
if err != nil {
|
||||
panic("peer badly encoded")
|
||||
}
|
||||
return pidstr
|
||||
}
|
||||
|
||||
// OfflineState state returns a cluster state by reading the Raft data and
|
||||
// writing it to the given datastore which is then wrapped as a state.State.
|
||||
// Usually an in-memory datastore suffices. The given datastore should be
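
The bulk of the remaining hunks swap `peer.IDB58Encode`/`peer.IDB58Decode` for `peer.Encode`/`peer.Decode`. This is staticcheck's deprecation check (SA1019): go-libp2p-core deprecated the base58-specific helpers in favor of the generic pair. A hedged sketch of the equivalent round-trip:

package example

import peer "github.com/libp2p/go-libp2p-core/peer"

func roundTrip(pid peer.ID) (peer.ID, error) {
	// Before (deprecated, SA1019):
	//   s := peer.IDB58Encode(pid)
	//   return peer.IDB58Decode(s)
	s := peer.Encode(pid) // still produces the familiar base58 form
	return peer.Decode(s) // additionally accepts multibase-encoded IDs
}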

@@ -20,10 +20,6 @@ import (
 	"go.opencensus.io/trace"
 )
 
-// errBadRaftState is returned when the consensus component cannot start
-// because the cluster peers do not match the raft peers.
-var errBadRaftState = errors.New("cluster peers do not match raft peers")
-
 // ErrWaitingForSelf is returned when we are waiting for ourselves to depart
 // the peer set, which won't happen
 var errWaitingForSelf = errors.New("waiting for ourselves to depart")

@@ -78,7 +74,7 @@ func newRaftWrapper(
 	raftW.host = host
 	raftW.staging = staging
 	// Set correct LocalID
-	cfg.RaftConfig.LocalID = hraft.ServerID(peer.IDB58Encode(host.ID()))
+	cfg.RaftConfig.LocalID = hraft.ServerID(peer.Encode(host.ID()))
 
 	df := cfg.GetDataFolder()
 	err := makeDataFolder(df)

@@ -235,7 +231,7 @@ func makeServerConf(peers []peer.ID) hraft.Configuration {
 
 	// Servers are peers + self. We avoid duplicate entries below
 	for _, pid := range peers {
-		p := peer.IDB58Encode(pid)
+		p := peer.Encode(pid)
 		_, ok := sm[p]
 		if !ok { // avoid dups
 			sm[p] = struct{}{}

@@ -277,7 +273,7 @@ func (rw *raftWrapper) WaitForVoter(ctx context.Context) error {
 
 	logger.Debug("waiting until we are promoted to a voter")
 
-	pid := hraft.ServerID(peer.IDB58Encode(rw.host.ID()))
+	pid := hraft.ServerID(peer.Encode(rw.host.ID()))
 	for {
 		select {
 		case <-ctx.Done():
@@ -388,26 +384,21 @@ func (rw *raftWrapper) Snapshot() error {
 func (rw *raftWrapper) snapshotOnShutdown() error {
 	var err error
 	for i := 0; i < maxShutdownSnapshotRetries; i++ {
-		done := false
 		ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
-		err := rw.WaitForUpdates(ctx)
+		err = rw.WaitForUpdates(ctx)
 		cancel()
 		if err != nil {
 			logger.Warn("timed out waiting for state updates before shutdown. Snapshotting may fail")
-			done = true // let's not wait for updates again
+			return rw.Snapshot()
 		}
 
 		err = rw.Snapshot()
-		if err != nil {
-			err = errors.New("could not snapshot raft: " + err.Error())
-		} else {
-			err = nil
-			done = true
+		if err == nil {
+			return nil // things worked
 		}
 
-		if done {
-			break
-		}
+		// There was an error
+		err = errors.New("could not snapshot raft: " + err.Error())
 		logger.Warnf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
 	}
 	return err
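
The `snapshotOnShutdown` hunk is the one non-mechanical change in this file. In the old code the inner `err := rw.WaitForUpdates(ctx)` shadowed the outer `err`, which is presumably what staticcheck flagged; the rewrite drops the `done` flag entirely and returns as soon as an attempt succeeds. A sketch of the same flag-to-early-return refactor on a generic retry loop, with hypothetical names:

package example

import "errors"

func retryFlagged(attempt func() error, tries int) error {
	var err error
	for i := 0; i < tries; i++ {
		done := false
		if err := attempt(); err == nil { // shadows the outer err
			done = true
		}
		if done {
			break
		}
	}
	return err // always nil here: the outer err was never written
}

func retryFixed(attempt func() error, tries int) error {
	var err error
	for i := 0; i < tries; i++ {
		if err = attempt(); err == nil {
			return nil // things worked
		}
		err = errors.New("attempt failed: " + err.Error())
	}
	return err
}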

@@ -415,7 +406,7 @@ func (rw *raftWrapper) snapshotOnShutdown() error {
 
 // Shutdown shutdown Raft and closes the BoltDB.
 func (rw *raftWrapper) Shutdown(ctx context.Context) error {
-	ctx, span := trace.StartSpan(ctx, "consensus/raft/Shutdown")
+	_, span := trace.StartSpan(ctx, "consensus/raft/Shutdown")
 	defer span.End()
 
 	errMsgs := ""

@@ -511,14 +502,14 @@ func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
 // Leader returns Raft's leader. It may be an empty string if
 // there is no leader or it is unknown.
 func (rw *raftWrapper) Leader(ctx context.Context) string {
-	ctx, span := trace.StartSpan(ctx, "consensus/raft/Leader")
+	_, span := trace.StartSpan(ctx, "consensus/raft/Leader")
 	defer span.End()
 
 	return string(rw.raft.Leader())
 }
 
 func (rw *raftWrapper) Peers(ctx context.Context) ([]string, error) {
-	ctx, span := trace.StartSpan(ctx, "consensus/raft/Peers")
+	_, span := trace.StartSpan(ctx, "consensus/raft/Peers")
 	defer span.End()
 
 	ids := make([]string, 0)

@@ -594,8 +585,7 @@ func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error {
 	}
 
 	// make a new raft snapshot
-	var raftSnapVersion hraft.SnapshotVersion
-	raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1
+	var raftSnapVersion hraft.SnapshotVersion = 1 // As of hraft v1.0.0 this is always 1
 	configIndex := uint64(1)
 	var raftIndex uint64
 	var raftTerm uint64
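
This hunk and the `blocksDatastore` one in the CRDT consensus are staticcheck's S1021 ("should merge variable declaration with assignment"): a `var` immediately followed by an assignment collapses into one statement. Sketch:

package example

type snapshotVersion int

func flagged() snapshotVersion {
	// S1021: should merge variable declaration with assignment
	var v snapshotVersion
	v = 1
	return v
}

func fixed() snapshotVersion {
	var v snapshotVersion = 1 // explicit type kept, one statement
	return v
}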

@@ -692,7 +682,7 @@ func (rw *raftWrapper) observePeers() {
 		case obs := <-obsCh:
 			pObs := obs.Data.(hraft.PeerObservation)
 			logger.Info("raft peer departed. Removing from peerstore: ", pObs.Peer.ID)
-			pID, err := peer.IDB58Decode(string(pObs.Peer.ID))
+			pID, err := peer.Decode(string(pObs.Peer.ID))
 			if err != nil {
 				logger.Error(err)
 				continue

@@ -59,7 +59,7 @@ func (disk *Informer) SetClient(c *rpc.Client) {
 // Shutdown is called on cluster shutdown. We just invalidate
 // any metrics from this point.
 func (disk *Informer) Shutdown(ctx context.Context) error {
-	ctx, span := trace.StartSpan(ctx, "informer/disk/Shutdown")
+	_, span := trace.StartSpan(ctx, "informer/disk/Shutdown")
 	defer span.End()
 
 	disk.rpcClient = nil

@@ -44,7 +44,7 @@ func (npi *Informer) SetClient(c *rpc.Client) {
 // Shutdown is called on cluster shutdown. We just invalidate
 // any metrics from this point.
 func (npi *Informer) Shutdown(ctx context.Context) error {
-	ctx, span := trace.StartSpan(ctx, "informer/numpin/Shutdown")
+	_, span := trace.StartSpan(ctx, "informer/numpin/Shutdown")
 	defer span.End()
 
 	npi.rpcClient = nil

@@ -132,7 +132,7 @@ func TestMain(m *testing.M) {
 }
 
 func randomBytes() []byte {
-	bs := make([]byte, 64, 64)
+	bs := make([]byte, 64)
 	for i := 0; i < len(bs); i++ {
 		b := byte(rand.Int())
 		bs[i] = b
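
This is the first of many S1019 fixes ("should use make(T, n) instead of make(T, n, n)"): when length and capacity are equal, the capacity argument is redundant. Sketch:

package example

func makeBuf(n int) []byte {
	// Flagged: bs := make([]byte, n, n)
	bs := make([]byte, n) // same slice: len == cap == n
	return bs
}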

@@ -283,9 +283,9 @@ func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster
 }
 
 func createHosts(t *testing.T, clusterSecret []byte, nClusters int) ([]host.Host, []*pubsub.PubSub, []*dht.IpfsDHT) {
-	hosts := make([]host.Host, nClusters, nClusters)
-	pubsubs := make([]*pubsub.PubSub, nClusters, nClusters)
-	dhts := make([]*dht.IpfsDHT, nClusters, nClusters)
+	hosts := make([]host.Host, nClusters)
+	pubsubs := make([]*pubsub.PubSub, nClusters)
+	dhts := make([]*dht.IpfsDHT, nClusters)
 
 	tcpaddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
 	quicAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")

@@ -337,19 +337,19 @@ func newTestDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
 func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
 	ctx := context.Background()
 	os.RemoveAll(testsFolder)
-	cfgs := make([]*Config, nClusters, nClusters)
-	stores := make([]ds.Datastore, nClusters, nClusters)
-	cons := make([]Consensus, nClusters, nClusters)
-	apis := make([][]API, nClusters, nClusters)
-	ipfss := make([]IPFSConnector, nClusters, nClusters)
-	trackers := make([]PinTracker, nClusters, nClusters)
-	mons := make([]PeerMonitor, nClusters, nClusters)
-	allocs := make([]PinAllocator, nClusters, nClusters)
-	infs := make([]Informer, nClusters, nClusters)
-	tracers := make([]Tracer, nClusters, nClusters)
-	ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters)
+	cfgs := make([]*Config, nClusters)
+	stores := make([]ds.Datastore, nClusters)
+	cons := make([]Consensus, nClusters)
+	apis := make([][]API, nClusters)
+	ipfss := make([]IPFSConnector, nClusters)
+	trackers := make([]PinTracker, nClusters)
+	mons := make([]PeerMonitor, nClusters)
+	allocs := make([]PinAllocator, nClusters)
+	infs := make([]Informer, nClusters)
+	tracers := make([]Tracer, nClusters)
+	ipfsMocks := make([]*test.IpfsMock, nClusters)
 
-	clusters := make([]*Cluster, nClusters, nClusters)
+	clusters := make([]*Cluster, nClusters)
 
 	// Uncomment when testing with fixed ports
 	// clusterPeers := make([]ma.Multiaddr, nClusters, nClusters)

@@ -602,7 +602,7 @@ func TestClustersPeersRetainOrder(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if bytes.Compare(peers1, peers2) != 0 {
+	if !bytes.Equal(peers1, peers2) {
 		t.Error("expected both results to be same")
 	}
 }
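
`bytes.Compare(...) != 0` → `!bytes.Equal(...)` is staticcheck's S1004: when only equality matters, `bytes.Equal` states the intent directly instead of going through a three-way comparison. Sketch:

package example

import "bytes"

func same(a, b []byte) bool {
	// Flagged: bytes.Compare(a, b) == 0
	return bytes.Equal(a, b)
}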

@@ -704,10 +704,10 @@ func TestClustersPinUpdate(t *testing.T) {
 
 	ttlDelay()
 
-	h, err := prefix.Sum(randomBytes())  // create random cid
-	h2, err := prefix.Sum(randomBytes()) // create random cid
+	h, _ := prefix.Sum(randomBytes())  // create random cid
+	h2, _ := prefix.Sum(randomBytes()) // create random cid
 
-	_, err = clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{})
+	_, err := clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{})
 	if err == nil || err != state.ErrNotFound {
 		t.Fatal("pin update should fail when from is not pinned")
 	}

@@ -778,7 +778,7 @@ func TestClustersStatusAll(t *testing.T) {
 		t.Error("bad info in status")
 	}
 
-	pid := peer.IDB58Encode(c.host.ID())
+	pid := peer.Encode(c.host.ID())
 	if info[pid].Status != api.TrackerStatusPinned {
 		t.Error("the hash should have been pinned")
 	}

@@ -839,7 +839,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
 		t.Error("bad number of peers in status")
 	}
 
-	pid := peer.IDB58Encode(clusters[1].id)
+	pid := peer.Encode(clusters[1].id)
 	errst := stts.PeerMap[pid]
 
 	if !errst.Cid.Equals(h) {

@@ -896,20 +896,20 @@ func TestClustersRecoverLocal(t *testing.T) {
 	pinDelay()
 
 	f := func(t *testing.T, c *Cluster) {
-		info, err := c.RecoverLocal(ctx, h)
+		_, err := c.RecoverLocal(ctx, h)
 		if err != nil {
 			t.Fatal(err)
 		}
 		// Wait for queue to be processed
 		delay()
 
-		info = c.StatusLocal(ctx, h)
+		info := c.StatusLocal(ctx, h)
 		if info.Status != api.TrackerStatusPinError {
 			t.Errorf("element is %s and not PinError", info.Status)
 		}
 
 		// Recover good ID
-		info, err = c.RecoverLocal(ctx, h2)
+		info, _ = c.RecoverLocal(ctx, h2)
 		if info.Status != api.TrackerStatusPinned {
 			t.Error("element should be in Pinned state")
 		}

@@ -951,7 +951,7 @@ func TestClustersRecover(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	pinfo, ok := ginfo.PeerMap[peer.IDB58Encode(clusters[j].host.ID())]
+	pinfo, ok := ginfo.PeerMap[peer.Encode(clusters[j].host.ID())]
 	if !ok {
 		t.Fatal("should have info for this host")
 	}

@@ -960,7 +960,7 @@ func TestClustersRecover(t *testing.T) {
 	}
 
 	for _, c := range clusters {
-		inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())]
+		inf, ok := ginfo.PeerMap[peer.Encode(c.host.ID())]
 		if !ok {
 			t.Fatal("GlobalPinInfo should not be empty for this host")
 		}

@@ -985,7 +985,7 @@ func TestClustersRecover(t *testing.T) {
 	}
 
 	for _, c := range clusters {
-		inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())]
+		inf, ok := ginfo.PeerMap[peer.Encode(c.host.ID())]
 		if !ok {
 			t.Fatal("GlobalPinInfo should have this cluster")
 		}

@@ -1334,7 +1334,7 @@ func TestClustersReplicationFactorMin(t *testing.T) {
 		t.Error("Pin should have failed as rplMin cannot be satisfied")
 	}
 	t.Log(err)
-	if !strings.Contains(err.Error(), fmt.Sprintf("not enough peers to allocate CID")) {
+	if !strings.Contains(err.Error(), "not enough peers to allocate CID") {
 		t.Fatal(err)
 	}
 }
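
`fmt.Sprintf` with a constant format string and no arguments is a no-op wrapper, reported by staticcheck as S1039; the literal works as-is. Sketch:

package example

import "strings"

func isAllocError(err error) bool {
	// Flagged: strings.Contains(err.Error(), fmt.Sprintf("not enough peers to allocate CID"))
	return strings.Contains(err.Error(), "not enough peers to allocate CID")
}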

@@ -1672,7 +1672,7 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) {
 
 	// kill the local pinner
 	for _, c := range clusters {
-		clid := peer.IDB58Encode(c.id)
+		clid := peer.Encode(c.id)
 		if clid == localPinner {
 			c.Shutdown(ctx)
 		} else if clid == remotePinner {

@@ -1709,7 +1709,7 @@ func validateClusterGraph(t *testing.T, graph api.ConnectGraph, clusterIDs map[s
 	// Make lookup index for peers connected to id1
 	peerIndex := make(map[string]struct{})
 	for _, p := range peers {
-		peerIndex[peer.IDB58Encode(p)] = struct{}{}
+		peerIndex[peer.Encode(p)] = struct{}{}
 	}
 	for id2 := range clusterIDs {
 		if _, ok := peerIndex[id2]; id1 != id2 && !ok {

@@ -1736,7 +1736,7 @@ func validateClusterGraph(t *testing.T, graph api.ConnectGraph, clusterIDs map[s
 	if len(graph.IPFSLinks) != 1 {
 		t.Error("Expected exactly one ipfs peer for all cluster nodes, the mocked peer")
 	}
-	links, ok := graph.IPFSLinks[peer.IDB58Encode(test.PeerID1)]
+	links, ok := graph.IPFSLinks[peer.Encode(test.PeerID1)]
 	if !ok {
 		t.Error("Expected the mocked ipfs peer to be a node in the graph")
 	} else {

@@ -1778,7 +1778,7 @@ func TestClustersGraphConnected(t *testing.T) {
 
 	clusterIDs := make(map[string]struct{})
 	for _, c := range clusters {
-		id := peer.IDB58Encode(c.ID(ctx).ID)
+		id := peer.Encode(c.ID(ctx).ID)
 		clusterIDs[id] = struct{}{}
 	}
 	validateClusterGraph(t, graph, clusterIDs, nClusters)

@@ -1827,7 +1827,7 @@ func TestClustersGraphUnhealthy(t *testing.T) {
 		if i == discon1 || i == discon2 {
 			continue
 		}
-		id := peer.IDB58Encode(c.ID(ctx).ID)
+		id := peer.Encode(c.ID(ctx).ID)
 		clusterIDs[id] = struct{}{}
 	}
 	peerNum := nClusters

@@ -45,10 +45,6 @@ var logger = logging.Logger("ipfshttp")
 // only the 10th will trigger a SendInformerMetrics call.
 var updateMetricMod = 10
 
-// progressTick sets how often we check progress when doing refs and pins
-// requests.
-var progressTick = 5 * time.Second
-
 // Connector implements the IPFSConnector interface
 // and provides a component which is used to perform
 // on-demand requests against the configured IPFS daemom

@@ -99,11 +95,6 @@ type ipfsRepoGCResp struct {
 	Error string
 }
 
-type ipfsRefsResp struct {
-	Ref string
-	Err string
-}
-
 type ipfsPinsResp struct {
 	Pins     []string
 	Progress int

@@ -122,10 +113,6 @@ type ipfsPeer struct {
 	Peer string
 }
 
-type ipfsStream struct {
-	Protocol string
-}
-
 // NewConnector creates the component and leaves it ready to be started
 func NewConnector(cfg *Config) (*Connector, error) {
 	err := cfg.Validate()

@@ -222,7 +209,7 @@ func (ipfs *Connector) SetClient(c *rpc.Client) {
 // Shutdown stops any listeners and stops the component from taking
 // any requests.
 func (ipfs *Connector) Shutdown(ctx context.Context) error {
-	ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Shutdown")
+	_, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Shutdown")
 	defer span.End()
 
 	ipfs.shutdownLock.Lock()

@@ -266,7 +253,7 @@ func (ipfs *Connector) ID(ctx context.Context) (*api.IPFSID, error) {
 		return nil, err
 	}
 
-	pID, err := peer.IDB58Decode(res.ID)
+	pID, err := peer.Decode(res.ID)
 	if err != nil {
 		return nil, err
 	}

@@ -275,7 +262,7 @@ func (ipfs *Connector) ID(ctx context.Context) (*api.IPFSID, error) {
 		ID: pID,
 	}
 
-	mAddrs := make([]api.Multiaddr, len(res.Addresses), len(res.Addresses))
+	mAddrs := make([]api.Multiaddr, len(res.Addresses))
 	for i, strAddr := range res.Addresses {
 		mAddr, err := api.NewMultiaddr(strAddr)
 		if err != nil {

@@ -628,24 +615,6 @@ func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType str
 	return body, nil
 }
 
-// postDiscardBodyCtx makes a POST requests but discards the body
-// of the response directly after reading it.
-func (ipfs *Connector) postDiscardBodyCtx(ctx context.Context, path string) error {
-	res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil)
-	if err != nil {
-		return err
-	}
-	defer res.Body.Close()
-
-	_, err = checkResponse(path, res)
-	if err != nil {
-		return err
-	}
-
-	_, err = io.Copy(ioutil.Discard, res.Body)
-	return err
-}
-
 // apiURL is a short-hand for building the url of the IPFS
 // daemon API.
 func (ipfs *Connector) apiURL() string {

@@ -736,9 +705,8 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err
 		return value, nil
 	}
 
-	switch value.(type) {
+	switch v := value.(type) {
 	case map[string]interface{}:
-		v := value.(map[string]interface{})
 		return getConfigValue(path[1:], v)
 	default:
 		return nil, errors.New("invalid path")
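
The `getConfigValue` hunk is staticcheck's S1034: bind the result of a type switch in the switch header instead of re-asserting inside a case. A simplified, self-contained sketch of the pattern (not the full lookup logic):

package example

import "errors"

func descend(path []string, value interface{}) (interface{}, error) {
	if len(path) == 0 {
		return value, nil
	}
	// S1034: `switch value.(type)` plus a separate
	// `v := value.(map[string]interface{})` inside the case
	// collapses into a single binding:
	switch v := value.(type) {
	case map[string]interface{}:
		return descend(path[1:], v) // v already has the asserted type
	default:
		return nil, errors.New("invalid path")
	}
}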

@@ -865,7 +833,7 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) {
 
 	swarm := make([]peer.ID, len(peersRaw.Peers))
 	for i, p := range peersRaw.Peers {
-		pID, err := peer.IDB58Decode(p.Peer)
+		pID, err := peer.Decode(p.Peer)
 		if err != nil {
 			logger.Error(err)
 			return swarm, err

@@ -63,7 +63,7 @@ func TestIPFSID(t *testing.T) {
 		t.Error("expected no error")
 	}
 	mock.Close()
-	id, err = ipfs.ID(ctx)
+	_, err = ipfs.ID(ctx)
 	if err == nil {
 		t.Error("expected an error")
 	}

@@ -392,6 +392,9 @@ func TestConfigKey(t *testing.T) {
 	}
 
 	v, err = ipfs.ConfigKey("Datastore")
+	if err != nil {
+		t.Fatal(err)
+	}
 	_, ok = v.(map[string]interface{})
 	if !ok {
 		t.Error("should have returned the whole Datastore config object")

@@ -38,8 +38,6 @@ type Checker struct {
 	metrics   *Store
 	threshold float64
 
-	alertThreshold int
-
 	failedPeersMu sync.Mutex
 	failedPeers   map[peer.ID]map[string]int
 }

@@ -93,20 +91,6 @@ func (mc *Checker) CheckAll() error {
 	return nil
 }
 
-func (mc *Checker) alertIfExpired(metric *api.Metric) error {
-	if !metric.Expired() {
-		return nil
-	}
-
-	err := mc.alert(metric.Peer, metric.Name)
-	if err != nil {
-		return err
-	}
-	metric.Valid = false
-	mc.metrics.Add(metric) // invalidate so we don't alert again
-	return nil
-}
-
 func (mc *Checker) alert(pid peer.ID, metricName string) error {
 	mc.failedPeersMu.Lock()
 	defer mc.failedPeersMu.Unlock()

@@ -308,7 +308,7 @@ func Benchmark_prob_meanStdDev(b *testing.B) {
 
 func makeRandSlice(size int) []float64 {
 	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	s := make([]float64, size, size)
+	s := make([]float64, size)
 
 	for i := 0; i < size-1; i++ {
 		s[i] = float64(r.Int63n(25)) + r.Float64()

@@ -9,12 +9,9 @@ import (
 	"sync"
 	"time"
 
-	logging "github.com/ipfs/go-log/v2"
 	"github.com/ipfs/ipfs-cluster/api"
 )
 
-var logger = logging.Logger("metricwin")
-
 // DefaultWindowCap sets the amount of metrics to store per peer.
 var DefaultWindowCap = 25
 

@@ -47,7 +47,6 @@ func TestWindow_Race(t *testing.T) {
 				i++
 			case <-done:
 				return
-			default:
 			}
 		}
 	}()

@@ -64,7 +63,6 @@ func TestWindow_Race(t *testing.T) {
 				// log <- fmt.Sprintf("latest: %v", l)
 			case <-done:
 				return
-			default:
 			}
 		}
 	}()

@@ -80,7 +78,6 @@ func TestWindow_Race(t *testing.T) {
 				// log <- fmt.Sprintf("all: %v", w.All())
 			case <-done:
 				return
-			default:
 			}
 		}
 	}()

@@ -95,23 +92,17 @@ func TestWindow_Race(t *testing.T) {
 				log <- fmt.Sprintf("dist: %v", w.Distribution())
 			case <-done:
 				return
-			default:
 			}
 		}
 	}()
 
 	go func() {
 		<-start
-		for {
-			select {
-			case <-done:
-				for s := range log {
-					fmt.Println(s)
-				}
-				close(done)
-				return
-			}
-		}
+		<-done
+		for s := range log {
+			fmt.Println(s)
+		}
+		close(done)
 	}()
 
 	close(start)
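
The last `TestWindow_Race` hunk is staticcheck's S1000: a `for { select { ... } }` with a single case is just a plain channel receive. Sketch:

package example

func waitFlagged(done chan struct{}) {
	// S1000: single-case select in a loop
	for {
		select {
		case <-done:
			return
		}
	}
}

func waitFixed(done chan struct{}) {
	<-done // same semantics, stated directly
}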

@@ -36,6 +36,7 @@ type Monitor struct {
 	rpcReady chan struct{}
 
 	pubsub       *pubsub.PubSub
+	topic        *pubsub.Topic
 	subscription *pubsub.Subscription
 	peers        PeersFunc
 

@@ -72,7 +73,12 @@ func New(
 	mtrs := metrics.NewStore()
 	checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold)
 
-	subscription, err := psub.Subscribe(PubsubTopic)
+	topic, err := psub.Join(PubsubTopic)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	subscription, err := topic.Subscribe()
 	if err != nil {
 		cancel()
 		return nil, err

@@ -84,6 +90,7 @@ func New(
 		rpcReady: make(chan struct{}, 1),
 
 		pubsub:       psub,
+		topic:        topic,
 		subscription: subscription,
 		peers:        peers,

@@ -153,7 +160,7 @@ func (mon *Monitor) SetClient(c *rpc.Client) {
 // Shutdown stops the peer monitor. It particular, it will
 // not deliver any alerts.
 func (mon *Monitor) Shutdown(ctx context.Context) error {
-	ctx, span := trace.StartSpan(ctx, "monitor/pubsub/Shutdown")
+	_, span := trace.StartSpan(ctx, "monitor/pubsub/Shutdown")
 	defer span.End()
 
 	mon.shutdownLock.Lock()

@@ -176,7 +183,7 @@ func (mon *Monitor) Shutdown(ctx context.Context) error {
 
 // LogMetric stores a metric so it can later be retrieved.
 func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error {
-	ctx, span := trace.StartSpan(ctx, "monitor/pubsub/LogMetric")
+	_, span := trace.StartSpan(ctx, "monitor/pubsub/LogMetric")
 	defer span.End()
 
 	mon.metrics.Add(m)

@@ -209,7 +216,7 @@ func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error {
 		m.Expire,
 	)
 
-	err = mon.pubsub.Publish(PubsubTopic, b.Bytes())
+	err = mon.topic.Publish(ctx, b.Bytes())
 	if err != nil {
 		logger.Error(err)
 		return err
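
These monitor hunks follow go-libp2p-pubsub's move to topic handles: the string-keyed `PubSub.Subscribe`/`PubSub.Publish` were deprecated (another SA1019 source) in favor of `Join`, which returns a `*pubsub.Topic` used for both subscribing and publishing. A hedged sketch of the new flow, assuming an already-constructed `*pubsub.PubSub`:

package example

import (
	"context"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func joinAndPublish(ctx context.Context, psub *pubsub.PubSub, name string, payload []byte) error {
	topic, err := psub.Join(name) // one handle per topic name
	if err != nil {
		return err
	}
	sub, err := topic.Subscribe() // replaces psub.Subscribe(name)
	if err != nil {
		return err
	}
	defer sub.Cancel()

	return topic.Publish(ctx, payload) // replaces psub.Publish(name, payload)
}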

@@ -248,7 +255,7 @@ func (mon *Monitor) Alerts() <-chan *api.Alert {
 
 // MetricNames lists all metric names.
 func (mon *Monitor) MetricNames(ctx context.Context) []string {
-	ctx, span := trace.StartSpan(ctx, "monitor/pubsub/MetricNames")
+	_, span := trace.StartSpan(ctx, "monitor/pubsub/MetricNames")
 	defer span.End()
 
 	return mon.metrics.MetricNames()

@@ -12,8 +12,8 @@ var logger = logging.Logger("observations")
 
 var (
 	// taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go)
-	latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
-	bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
+	// latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
+	// bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576)
 	messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
 )

@@ -19,8 +19,8 @@ import (
 )
 
 func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host) {
-	cls := make([]*Cluster, nClusters, nClusters)
-	mocks := make([]*test.IpfsMock, nClusters, nClusters)
+	cls := make([]*Cluster, nClusters)
+	mocks := make([]*test.IpfsMock, nClusters)
 	var wg sync.WaitGroup
 	for i := 0; i < nClusters; i++ {
 		wg.Add(1)

@@ -67,7 +67,7 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host)
 func clusterAddr(c *Cluster) ma.Multiaddr {
 	for _, a := range c.host.Addrs() {
 		if _, err := a.ValueForProtocol(ma.P_IP4); err == nil {
-			p := peer.IDB58Encode(c.id)
+			p := peer.Encode(c.id)
 			cAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", a, p))
 			return cAddr
 		}

@@ -196,7 +196,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
 		t.Skip("need at least 3 nodes for this test")
 	}
 
-	_, err := clusters[0].PeerAdd(ctx, clusters[1].id)
+	clusters[0].PeerAdd(ctx, clusters[1].id)
 	ttlDelay()
 	ids := clusters[1].Peers(ctx)
 	// raft will have only 2 peers

@@ -207,7 +207,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) {
 
 	// Now we shutdown the one member of the running cluster
 	// and try to add someone else.
-	err = clusters[1].Shutdown(ctx)
+	err := clusters[1].Shutdown(ctx)
 	if err != nil {
 		t.Error("Shutdown should be clean: ", err)
 	}

@@ -398,7 +398,6 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
 
 	ctx := context.Background()
 	clusters, mocks := createClusters(t)
-	defer shutdownClusters(t, clusters, mocks)
 
 	if len(clusters) < 3 {
 		t.Skip("test needs at least 3 clusters")

@@ -428,6 +427,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
 		}
 	}
 	if chosen == nil {
+		shutdownClusters(t, clusters, mocks)
 		t.Fatal("did not get to choose a peer?")
 	}
 

@@ -438,6 +438,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) {
 	mocks = append(mocks[:chosenIndex], mocks[chosenIndex+1:]...)
 	defer chosen.Shutdown(ctx)
 	defer chosenMock.Close()
+	defer shutdownClusters(t, clusters, mocks)
 
 	prefix := test.Cid1.Prefix()
 

@@ -504,7 +504,8 @@ func TestTrackUntrackWithCancel(t *testing.T) {
 	go func() {
 		err = tt.args.tracker.Untrack(context.Background(), tt.args.c)
 		if err != nil {
-			t.Fatal(err)
+			t.Error()
+			return
 		}
 	}()
 	var ctx context.Context
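
`t.Fatal` calls `t.FailNow`, which must run on the goroutine executing the test function; from any other goroutine it only kills that goroutine (staticcheck SA2002). Hence the `t.Error` plus explicit `return` here and in the stateless tracker tests below. Minimal sketch:

package example

import "testing"

func work() error { return nil } // hypothetical helper

func TestWorker(t *testing.T) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := work(); err != nil {
			// t.Fatal(err) // wrong: FailNow from a non-test goroutine
			t.Error(err) // record the failure...
			return       // ...and exit the goroutine ourselves
		}
	}()
	<-done
}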

@@ -313,7 +313,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
 
 	// check global state to see if cluster should even be caring about
 	// the provided cid
-	gpin := &api.Pin{}
+	var gpin *api.Pin
 	st, err := spt.getState(ctx)
 	if err != nil {
 		logger.Error(err)

@@ -440,7 +440,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo,
 		logger.Error(err)
 		return nil, err
 	}
-	pins := make(map[string]*api.PinInfo, 0)
+	pins := make(map[string]*api.PinInfo, len(ipsMap))
 	for cidstr, ips := range ipsMap {
 		c, err := cid.Decode(cidstr)
 		if err != nil {

@@ -468,14 +468,13 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]
 	defer span.End()
 
 	// get shared state
-	statePins := []*api.Pin{}
 	st, err := spt.getState(ctx)
 	if err != nil {
 		logger.Error(err)
 		return nil, err
 	}
 
-	statePins, err = st.List(ctx)
+	statePins, err := st.List(ctx)
 	if err != nil {
 		logger.Error(err)
 		return nil, err

@@ -527,9 +526,9 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]
 	return pininfos, nil
 }
 
-func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo {
-	return spt.optracker.Filter(ctx, optracker.PhaseError)
-}
+// func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo {
+// 	return spt.optracker.Filter(ctx, optracker.PhaseError)
+// }
 
 // OpContext exports the internal optracker's OpContext method.
 // For testing purposes only.

@@ -3,7 +3,6 @@ package stateless
 import (
 	"context"
 	"errors"
-	"sort"
 	"testing"
 	"time"
 

@@ -193,7 +192,8 @@ func TestTrackUntrackWithCancel(t *testing.T) {
 	go func() {
 		err = spt.Untrack(ctx, slowPinCid)
 		if err != nil {
-			t.Fatal(err)
+			t.Error(err)
+			return
 		}
 	}()
 	select {

@@ -298,7 +298,8 @@ func TestUntrackTrackWithCancel(t *testing.T) {
 	go func() {
 		err = spt.Track(ctx, slowPin)
 		if err != nil {
-			t.Fatal(err)
+			t.Error(err)
+			return
 		}
 	}()
 	select {

@@ -465,12 +466,6 @@ func TestStatus(t *testing.T) {
 	}
 }
 
-var sortPinInfoByCid = func(p []*api.PinInfo) {
-	sort.Slice(p, func(i, j int) bool {
-		return p[i].Cid.String() < p[j].Cid.String()
-	})
-}
-
 func BenchmarkTracker_localStatus(b *testing.B) {
 	tracker := testStatelessPinTracker(b)
 	ctx := context.Background()

@@ -66,7 +66,7 @@ func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool, ttl time.Duration
 	}
 
 	protos := addr.Protocols()
-	if len(protos) > 0 && protos[0].Code == madns.DnsaddrProtocol.Code {
+	if len(protos) > 0 && protos[0].Code == ma.P_DNSADDR {
 		// We need to pre-resolve this
 		logger.Debugf("resolving %s", addr)
 		ctx, cancel := context.WithTimeout(pm.ctx, DNSTimeout)

@@ -28,7 +28,7 @@ func clean(pm *Manager) {
 }
 
 func testAddr(loc string, pid peer.ID) ma.Multiaddr {
-	m, _ := ma.NewMultiaddr(loc + "/p2p/" + peer.IDB58Encode(pid))
+	m, _ := ma.NewMultiaddr(loc + "/p2p/" + peer.Encode(pid))
 	return m
 }

@@ -352,7 +352,7 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out
 	// Returned metrics are Valid and belong to current
 	// Cluster peers.
 	metrics := rpcapi.c.monitor.LatestMetrics(ctx, pingMetricName)
-	peers := make([]peer.ID, len(metrics), len(metrics))
+	peers := make([]peer.ID, len(metrics))
 	for i, m := range metrics {
 		peers[i] = m.Peer
 	}

@@ -21,8 +21,8 @@ func CtxsWithTimeout(
 	timeout time.Duration,
 ) ([]context.Context, []context.CancelFunc) {
 
-	ctxs := make([]context.Context, n, n)
-	cancels := make([]context.CancelFunc, n, n)
+	ctxs := make([]context.Context, n)
+	cancels := make([]context.CancelFunc, n)
 	for i := 0; i < n; i++ {
 		ctx, cancel := context.WithTimeout(parent, timeout)
 		ctxs[i] = ctx

@@ -37,8 +37,8 @@ func CtxsWithCancel(
 	n int,
 ) ([]context.Context, []context.CancelFunc) {
 
-	ctxs := make([]context.Context, n, n)
-	cancels := make([]context.CancelFunc, n, n)
+	ctxs := make([]context.Context, n)
+	cancels := make([]context.CancelFunc, n)
 	for i := 0; i < n; i++ {
 		ctx, cancel := context.WithCancel(parent)
 		ctxs[i] = ctx

@@ -61,7 +61,7 @@ func MultiCancel(cancels []context.CancelFunc) {
 // slice using pointers to each elements of the original slice.
 // Useful to handle gorpc.MultiCall() replies.
 func CopyPIDsToIfaces(in []peer.ID) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		ifaces[i] = &in[i]
 	}

@@ -72,7 +72,7 @@ func CopyPIDsToIfaces(in []peer.ID) []interface{} {
 // slice using pointers to each elements of the original slice.
 // Useful to handle gorpc.MultiCall() replies.
 func CopyIDsToIfaces(in []*api.ID) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		in[i] = &api.ID{}
 		ifaces[i] = in[i]

@@ -84,7 +84,7 @@ func CopyIDsToIfaces(in []*api.ID) []interface{} {
 // to an empty interface slice using pointers to each elements of the
 // original slice. Useful to handle gorpc.MultiCall() replies.
 func CopyIDSliceToIfaces(in [][]*api.ID) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		ifaces[i] = &in[i]
 	}

@@ -95,7 +95,7 @@ func CopyIDSliceToIfaces(in [][]*api.ID) []interface{} {
 // an empty interface slice using pointers to each elements of
 // the original slice. Useful to handle gorpc.MultiCall() replies.
 func CopyPinInfoToIfaces(in []*api.PinInfo) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		in[i] = &api.PinInfo{}
 		ifaces[i] = in[i]

@@ -107,7 +107,7 @@ func CopyPinInfoToIfaces(in []*api.PinInfo) []interface{} {
 // to an empty interface slice using pointers to each elements of the original
 // slice. Useful to handle gorpc.MultiCall() replies.
 func CopyPinInfoSliceToIfaces(in [][]*api.PinInfo) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		ifaces[i] = &in[i]
 	}

@@ -118,7 +118,7 @@ func CopyPinInfoSliceToIfaces(in [][]*api.PinInfo) []interface{} {
 // an empty interface slice using pointers to each elements of
 // the original slice. Useful to handle gorpc.MultiCall() replies.
 func CopyRepoGCSliceToIfaces(in []*api.RepoGC) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		in[i] = &api.RepoGC{}
 		ifaces[i] = in[i]

@@ -130,7 +130,7 @@ func CopyRepoGCSliceToIfaces(in []*api.RepoGC) []interface{} {
 // slice using pointers to each elements of the original slice.
 // Useful to handle gorpc.MultiCall() replies.
 func CopyEmptyStructToIfaces(in []struct{}) []interface{} {
-	ifaces := make([]interface{}, len(in), len(in))
+	ifaces := make([]interface{}, len(in))
 	for i := range in {
 		ifaces[i] = &in[i]
 	}

@@ -141,7 +141,7 @@ func CopyEmptyStructToIfaces(in []struct{}) []interface{} {
 // slice of then given length. Useful for RPC methods which have no response
 // types (so they use empty structs).
 func RPCDiscardReplies(n int) []interface{} {
-	replies := make([]struct{}, n, n)
+	replies := make([]struct{}, n)
 	return CopyEmptyStructToIfaces(replies)
 }

@@ -33,7 +33,7 @@ type State struct {
 	dsWrite     ds.Write
 	codecHandle codec.Handle
 	namespace   ds.Key
-	version     int
+	// version     int
 }
 
 // DefaultHandle returns the codec handler of choice (Msgpack).

@@ -166,8 +166,6 @@ func (st *State) List(ctx context.Context) ([]*api.Pin, error) {
 // Migrate migrates an older state version to the current one.
 // This is a no-op for now.
 func (st *State) Migrate(ctx context.Context, r io.Reader) error {
-	ctx, span := trace.StartSpan(ctx, "state/map/Migrate")
-	defer span.End()
 	return nil
 }

@@ -316,7 +314,7 @@ func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*Ba
 
 // Commit persists the batched write operations.
 func (bst *BatchingState) Commit(ctx context.Context) error {
-	ctx, span := trace.StartSpan(ctx, "state/dsstate/Commit")
+	_, span := trace.StartSpan(ctx, "state/dsstate/Commit")
 	defer span.End()
 	return bst.batch.Commit()
 }

@@ -13,7 +13,7 @@ import (
 )
 
 var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
-var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
+var testPeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
 
 var c = &api.Pin{
 	Cid: testCid1,

test/cids.go

@@ -23,12 +23,12 @@ var (
 	// NotFoundCid is meant to be used as a CID that doesn't exist in the
 	// pinset.
 	NotFoundCid, _ = cid.Decode("bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4")
-	PeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
-	PeerID2, _ = peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
-	PeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
-	PeerID4, _ = peer.IDB58Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
-	PeerID5, _ = peer.IDB58Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
-	PeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
+	PeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc")
+	PeerID2, _ = peer.Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6")
+	PeerID3, _ = peer.Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa")
+	PeerID4, _ = peer.Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc")
+	PeerID5, _ = peer.Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg")
+	PeerID6, _ = peer.Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx")
 
 	PeerName1 = "TestPeer1"
 	PeerName2 = "TestPeer2"

@@ -82,12 +82,6 @@ type mockConfigResp struct {
 	}
 }
 
-type mockAddResp struct {
-	Name  string
-	Hash  string
-	Bytes uint64
-}
-
 type mockRefsResp struct {
 	Ref string
 	Err string

@@ -201,26 +201,26 @@ func (mock *mockCluster) ConnectGraph(ctx context.Context, in struct{}, out *api
 	*out = api.ConnectGraph{
 		ClusterID: PeerID1,
 		IPFSLinks: map[string][]peer.ID{
-			peer.IDB58Encode(PeerID4): []peer.ID{PeerID5, PeerID6},
-			peer.IDB58Encode(PeerID5): []peer.ID{PeerID4, PeerID6},
-			peer.IDB58Encode(PeerID6): []peer.ID{PeerID4, PeerID5},
+			peer.Encode(PeerID4): []peer.ID{PeerID5, PeerID6},
+			peer.Encode(PeerID5): []peer.ID{PeerID4, PeerID6},
+			peer.Encode(PeerID6): []peer.ID{PeerID4, PeerID5},
 		},
 		ClusterLinks: map[string][]peer.ID{
-			peer.IDB58Encode(PeerID1): []peer.ID{PeerID2, PeerID3},
-			peer.IDB58Encode(PeerID2): []peer.ID{PeerID1, PeerID3},
-			peer.IDB58Encode(PeerID3): []peer.ID{PeerID1, PeerID2},
+			peer.Encode(PeerID1): []peer.ID{PeerID2, PeerID3},
+			peer.Encode(PeerID2): []peer.ID{PeerID1, PeerID3},
+			peer.Encode(PeerID3): []peer.ID{PeerID1, PeerID2},
 		},
 		ClustertoIPFS: map[string]peer.ID{
-			peer.IDB58Encode(PeerID1): PeerID4,
-			peer.IDB58Encode(PeerID2): PeerID5,
-			peer.IDB58Encode(PeerID3): PeerID6,
+			peer.Encode(PeerID1): PeerID4,
+			peer.Encode(PeerID2): PeerID5,
+			peer.Encode(PeerID3): PeerID6,
 		},
 	}
 	return nil
 }
 
 func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
-	pid := peer.IDB58Encode(PeerID1)
+	pid := peer.Encode(PeerID1)
 	*out = []*api.GlobalPinInfo{
 		{
 			Cid: Cid1,

@@ -270,7 +270,7 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global
 	*out = api.GlobalPinInfo{
 		Cid: in,
 		PeerMap: map[string]*api.PinInfo{
-			peer.IDB58Encode(PeerID1): {
+			peer.Encode(PeerID1): {
 				Cid:    in,
 				Peer:   PeerID1,
 				Status: api.TrackerStatusPinned,

@@ -314,7 +314,7 @@ func (mock *mockCluster) RepoGC(ctx context.Context, in struct{}, out *api.Globa
 	_ = mock.RepoGCLocal(ctx, struct{}{}, localrepoGC)
 	*out = api.GlobalRepoGC{
 		PeerMap: map[string]*api.RepoGC{
-			peer.IDB58Encode(PeerID1): localrepoGC,
+			peer.Encode(PeerID1): localrepoGC,
 		},
 	}
 	return nil

@@ -392,7 +392,7 @@ func (mock *mockPinTracker) Status(ctx context.Context, in cid.Cid, out *api.Pin
 }
 
 func (mock *mockPinTracker) RecoverAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
-	*out = make([]*api.PinInfo, 0, 0)
+	*out = make([]*api.PinInfo, 0)
 	return nil
 }

util.go

@@ -67,15 +67,6 @@ func containsPeer(list []peer.ID, peer peer.ID) bool {
 	return false
 }
 
-func containsCid(list []cid.Cid, ci cid.Cid) bool {
-	for _, c := range list {
-		if c.String() == ci.String() {
-			return true
-		}
-	}
-	return false
-}
-
 func minInt(x, y int) int {
 	if x < y {
 		return x

@@ -4,7 +4,7 @@ import (
 	"fmt"
 
 	semver "github.com/blang/semver"
-	protocol "github.com/libp2p/go-libp2p-protocol"
+	protocol "github.com/libp2p/go-libp2p-core/protocol"
 )
 
 // Version is the current cluster version. Version alignment between