From f83ff9b6551def3aaef42776f8ca09797b0d6f3e Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 14 Apr 2020 19:58:00 +0200 Subject: [PATCH] staticcheck: fix all staticcheck warnings in the project --- add_test.go | 4 +- adder/ipfsadd/add.go | 41 +++++----- adder/sharding/dag.go | 1 - adder/sharding/dag_service.go | 1 - adder/sharding/shard.go | 2 +- adder/single/dag_service.go | 4 +- allocator/ascendalloc/ascendalloc.go | 3 - allocator/descendalloc/descendalloc.go | 3 - api/ipfsproxy/config.go | 2 +- api/ipfsproxy/ipfsproxy.go | 10 +-- api/ipfsproxy/ipfsproxy_test.go | 11 +-- api/rest/client/client.go | 7 +- api/rest/client/client_test.go | 4 +- api/rest/client/lbclient.go | 2 +- api/rest/client/methods.go | 4 +- api/rest/client/methods_test.go | 48 +++++------ api/rest/client/request.go | 6 +- api/rest/client/transports.go | 2 +- api/rest/config.go | 4 +- api/rest/config_test.go | 2 +- api/rest/restapi.go | 19 ++--- api/rest/restapi_test.go | 14 ++-- api/types.go | 6 +- api/types_test.go | 22 +----- api/util.go | 6 +- cluster.go | 105 +++++++++++++------------ cluster_config.go | 5 +- cluster_test.go | 3 +- clusterhost.go | 4 +- cmd/ipfs-cluster-ctl/formatters.go | 53 +++++-------- cmd/ipfs-cluster-ctl/graph.go | 16 ++-- cmd/ipfs-cluster-ctl/graph_test.go | 54 ++++++------- cmd/ipfs-cluster-ctl/main.go | 23 ++---- cmdutils/cmdutils.go | 2 +- config/identity.go | 4 +- connect_graph.go | 12 +-- consensus/crdt/config.go | 2 +- consensus/crdt/config_test.go | 4 + consensus/crdt/consensus.go | 9 +-- consensus/raft/consensus.go | 25 ++---- consensus/raft/raft.go | 38 ++++----- informer/disk/disk.go | 2 +- informer/numpin/numpin.go | 2 +- ipfscluster_test.go | 68 ++++++++-------- ipfsconn/ipfshttp/ipfshttp.go | 42 ++-------- ipfsconn/ipfshttp/ipfshttp_test.go | 5 +- monitor/metrics/checker.go | 16 ---- monitor/metrics/prob_test.go | 2 +- monitor/metrics/window.go | 3 - monitor/metrics/window_test.go | 17 +--- monitor/pubsubmon/pubsubmon.go | 17 ++-- observations/metrics.go | 4 +- peer_manager_test.go | 13 +-- pintracker/pintracker_test.go | 3 +- pintracker/stateless/stateless.go | 13 ++- pintracker/stateless/stateless_test.go | 13 +-- pstoremgr/pstoremgr.go | 2 +- pstoremgr/pstoremgr_test.go | 2 +- rpc_api.go | 2 +- rpcutil/rpcutil.go | 24 +++--- state/dsstate/datastore.go | 6 +- state/dsstate/datastore_test.go | 2 +- test/cids.go | 12 +-- test/ipfs_mock.go | 6 -- test/rpc_api_mock.go | 26 +++--- util.go | 9 --- version/version.go | 2 +- 67 files changed, 370 insertions(+), 530 deletions(-) diff --git a/add_test.go b/add_test.go index 905a2692..eb3bcd92 100644 --- a/add_test.go +++ b/add_test.go @@ -178,7 +178,7 @@ func TestAddOnePeerFails(t *testing.T) { defer wg.Done() _, err := clusters[0].AddFile(r, params) if err != nil { - t.Fatal(err) + t.Error(err) } }() @@ -236,7 +236,7 @@ func TestAddAllPeersFail(t *testing.T) { defer wg.Done() _, err := clusters[0].AddFile(r, params) if err != adder.ErrBlockAdder { - t.Fatal("expected ErrBlockAdder. Got: ", err) + t.Error("expected ErrBlockAdder. Got: ", err) } }() diff --git a/adder/ipfsadd/add.go b/adder/ipfsadd/add.go index e69e30e3..07e05888 100644 --- a/adder/ipfsadd/add.go +++ b/adder/ipfsadd/add.go @@ -121,29 +121,30 @@ func (adder *Adder) add(reader io.Reader) (ipld.Node, error) { return nd, nil } -// RootNode returns the mfs root node -func (adder *Adder) curRootNode() (ipld.Node, error) { - mr, err := adder.mfsRoot() - if err != nil { - return nil, err - } - root, err := mr.GetDirectory().GetNode() - if err != nil { - return nil, err - } +// Cluster: commented as it is unused +// // RootNode returns the mfs root node +// func (adder *Adder) curRootNode() (ipld.Node, error) { +// mr, err := adder.mfsRoot() +// if err != nil { +// return nil, err +// } +// root, err := mr.GetDirectory().GetNode() +// if err != nil { +// return nil, err +// } - // if one root file, use that hash as root. - if len(root.Links()) == 1 { - nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService) - if err != nil { - return nil, err - } +// // if one root file, use that hash as root. +// if len(root.Links()) == 1 { +// nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService) +// if err != nil { +// return nil, err +// } - root = nd - } +// root = nd +// } - return root, err -} +// return root, err +// } // PinRoot recursively pins the root node of Adder and // writes the pin state to the backing datastore. diff --git a/adder/sharding/dag.go b/adder/sharding/dag.go index 1a6886a4..f265d1ff 100644 --- a/adder/sharding/dag.go +++ b/adder/sharding/dag.go @@ -36,7 +36,6 @@ func init() { // MaxLinks is the max number of links that, when serialized fit into a block const MaxLinks = 5984 -const fixedPerLink = 40 const hashFn = mh.SHA2_256 // CborDataToNode parses cbor data into a clusterDAG node while making a few diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index a5cfca91..4a0102b9 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -21,7 +21,6 @@ import ( rpc "github.com/libp2p/go-libp2p-gorpc" ) -var errNotFound = errors.New("dagservice: block not found") var logger = logging.Logger("shardingdags") // DAGService is an implementation of a ClusterDAGService which diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go index 6e6b2cdc..6246260f 100644 --- a/adder/sharding/shard.go +++ b/adder/sharding/shard.go @@ -35,7 +35,7 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard return nil, err } - if opts.ReplicationFactorMin > 0 && (allocs == nil || len(allocs) == 0) { + if opts.ReplicationFactorMin > 0 && len(allocs) == 0 { // This would mean that the empty cid is part of the shared state somehow. panic("allocations for new shard cannot be empty without error") } diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index a06d3256..590afe57 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -4,7 +4,6 @@ package single import ( "context" - "errors" adder "github.com/ipfs/ipfs-cluster/adder" "github.com/ipfs/ipfs-cluster/api" @@ -16,9 +15,8 @@ import ( rpc "github.com/libp2p/go-libp2p-gorpc" ) -var errNotFound = errors.New("dagservice: block not found") - var logger = logging.Logger("singledags") +var _ = logger // otherwise unused // DAGService is an implementation of an adder.ClusterDAGService which // puts the added blocks directly in the peers allocated to them (without diff --git a/allocator/ascendalloc/ascendalloc.go b/allocator/ascendalloc/ascendalloc.go index 79019724..96f2b8ab 100644 --- a/allocator/ascendalloc/ascendalloc.go +++ b/allocator/ascendalloc/ascendalloc.go @@ -11,13 +11,10 @@ import ( "github.com/ipfs/ipfs-cluster/api" cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) -var logger = logging.Logger("ascendalloc") - // AscendAllocator extends the SimpleAllocator type AscendAllocator struct{} diff --git a/allocator/descendalloc/descendalloc.go b/allocator/descendalloc/descendalloc.go index 4526fe11..82507f99 100644 --- a/allocator/descendalloc/descendalloc.go +++ b/allocator/descendalloc/descendalloc.go @@ -11,13 +11,10 @@ import ( "github.com/ipfs/ipfs-cluster/api" cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) -var logger = logging.Logger("descendalloc") - // DescendAllocator extends the SimpleAllocator type DescendAllocator struct{} diff --git a/api/ipfsproxy/config.go b/api/ipfsproxy/config.go index 6c97011f..3c002eac 100644 --- a/api/ipfsproxy/config.go +++ b/api/ipfsproxy/config.go @@ -278,7 +278,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { cfg.MaxHeaderBytes = jcfg.MaxHeaderBytes } - if extra := jcfg.ExtractHeadersExtra; extra != nil && len(extra) > 0 { + if extra := jcfg.ExtractHeadersExtra; len(extra) > 0 { cfg.ExtractHeadersExtra = extra } config.SetIfNotDefault(jcfg.ExtractHeadersPath, &cfg.ExtractHeadersPath) diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index 73758101..79ec9be3 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -340,7 +340,6 @@ func ipfsErrorResponder(w http.ResponseWriter, errMsg string, code int) { w.WriteHeader(http.StatusInternalServerError) } w.Write(resBytes) - return } func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Request) { @@ -373,7 +372,6 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ resBytes, _ := json.Marshal(res) w.WriteHeader(http.StatusOK) w.Write(resBytes) - return } func (proxy *Server) pinHandler(w http.ResponseWriter, r *http.Request) { @@ -529,7 +527,6 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { resBytes, _ := json.Marshal(res) w.WriteHeader(http.StatusOK) w.Write(resBytes) - return } func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) { @@ -628,8 +625,8 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) { ctxs, cancels := rpcutil.CtxsWithCancel(proxy.ctx, len(peers)) defer rpcutil.MultiCancel(cancels) - repoStats := make([]*api.IPFSRepoStat, len(peers), len(peers)) - repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats)) + repoStats := make([]*api.IPFSRepoStat, len(peers)) + repoStatsIfaces := make([]interface{}, len(repoStats)) for i := range repoStats { repoStats[i] = &api.IPFSRepoStat{} repoStatsIfaces[i] = repoStats[i] @@ -662,7 +659,6 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) { resBytes, _ := json.Marshal(totalStats) w.WriteHeader(http.StatusOK) w.Write(resBytes) - return } type ipfsRepoGCResp struct { @@ -718,8 +714,6 @@ func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) { if !streamErrors && mErrStr != "" { w.Header().Set("X-Stream-Error", mErrStr) } - - return } // slashHandler returns a handler which converts a /a/b/c/ request diff --git a/api/ipfsproxy/ipfsproxy_test.go b/api/ipfsproxy/ipfsproxy_test.go index 0b1ffac2..a67874e0 100644 --- a/api/ipfsproxy/ipfsproxy_test.go +++ b/api/ipfsproxy/ipfsproxy_test.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "strings" @@ -641,7 +640,7 @@ func TestProxyAdd(t *testing.T) { }, } - reqs := make([]*http.Request, len(testcases), len(testcases)) + reqs := make([]*http.Request, len(testcases)) sth := test.NewShardingTestHelper() defer sth.Clean(t) @@ -732,14 +731,6 @@ func TestIPFSProxy(t *testing.T) { } } -func mustParseURL(rawurl string) *url.URL { - u, err := url.Parse(rawurl) - if err != nil { - panic(err) - } - return u -} - func TestHeaderExtraction(t *testing.T) { ctx := context.Background() proxy, mock := testIPFSProxy(t) diff --git a/api/rest/client/client.go b/api/rest/client/client.go index f3903952..031dc7a0 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -169,7 +169,7 @@ type Config struct { // NewDefaultClient() to create one. type defaultClient struct { ctx context.Context - cancel func() + cancel context.CancelFunc config *Config transport *http.Transport net string @@ -180,9 +180,10 @@ type defaultClient struct { // NewDefaultClient initializes a client given a Config. func NewDefaultClient(cfg *Config) (Client, error) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) client := &defaultClient{ ctx: ctx, + cancel: cancel, config: cfg, } @@ -352,7 +353,7 @@ func IsPeerAddress(addr ma.Multiaddr) bool { return false } pid, err := addr.ValueForProtocol(ma.P_P2P) - dnsaddr, err2 := addr.ValueForProtocol(madns.DnsaddrProtocol.Code) + dnsaddr, err2 := addr.ValueForProtocol(ma.P_DNSADDR) return (pid != "" && err == nil) || (dnsaddr != "" && err2 == nil) } diff --git a/api/rest/client/client_test.go b/api/rest/client/client_test.go index 3fa18f79..64c25dd5 100644 --- a/api/rest/client/client_test.go +++ b/api/rest/client/client_test.go @@ -23,7 +23,7 @@ func testAPI(t *testing.T) *rest.API { cfg := &rest.Config{} cfg.Default() cfg.HTTPListenAddr = []ma.Multiaddr{apiMAddr} - secret := make(pnet.PSK, 32, 32) + secret := make(pnet.PSK, 32) h, err := libp2p.New( context.Background(), @@ -58,7 +58,7 @@ func apiMAddr(a *rest.API) ma.Multiaddr { } func peerMAddr(a *rest.API) ma.Multiaddr { - ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peer.IDB58Encode(a.Host().ID()))) + ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", peer.Encode(a.Host().ID()))) for _, a := range a.Host().Addrs() { if _, err := a.ValueForProtocol(ma.P_IP4); err == nil { return a.Encapsulate(ipfsAddr) diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index 031264a9..6a1e038f 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -8,7 +8,7 @@ import ( shell "github.com/ipfs/go-ipfs-api" files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/ipfs-cluster/api" - peer "github.com/libp2p/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-core/peer" ) // loadBalancingClient is a client to interact with IPFS Cluster APIs diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index b6b692dd..07788a8c 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -52,7 +52,7 @@ func (c *defaultClient) PeerAdd(ctx context.Context, pid peer.ID) (*api.ID, erro ctx, span := trace.StartSpan(ctx, "client/PeerAdd") defer span.End() - pidStr := peer.IDB58Encode(pid) + pidStr := peer.Encode(pid) body := peerAddBody{pidStr} var buf bytes.Buffer @@ -518,7 +518,7 @@ func (c *defaultClient) Add( ctx, span := trace.StartSpan(ctx, "client/Add") defer span.End() - addFiles := make([]files.DirEntry, len(paths), len(paths)) + addFiles := make([]files.DirEntry, len(paths)) for i, p := range paths { u, err := url.Parse(p) if err != nil { diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index d2b39e87..e5f4a2fc 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -6,10 +6,9 @@ import ( "testing" "time" - "github.com/ipfs/ipfs-cluster/api" types "github.com/ipfs/ipfs-cluster/api" - "github.com/ipfs/ipfs-cluster/api/rest" - "github.com/ipfs/ipfs-cluster/test" + rest "github.com/ipfs/ipfs-cluster/api/rest" + test "github.com/ipfs/ipfs-cluster/test" cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -90,6 +89,7 @@ func TestPeersWithError(t *testing.T) { testF := func(t *testing.T, c Client) { addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/44444") + var _ = c c, _ = NewDefaultClient(&Config{APIAddr: addr, DisableKeepAlives: true}) ids, err := c.Peers(ctx) if err == nil { @@ -367,7 +367,7 @@ func TestStatusAll(t *testing.T) { t.Error("there should be two pins") } - pins, err = c.StatusAll(ctx, 1<<25, false) + _, err = c.StatusAll(ctx, 1<<25, false) if err == nil { t.Error("expected an error") } @@ -476,7 +476,7 @@ type waitService struct { pinStart time.Time } -func (wait *waitService) Pin(ctx context.Context, in *api.Pin, out *api.Pin) error { +func (wait *waitService) Pin(ctx context.Context, in *types.Pin, out *types.Pin) error { wait.l.Lock() defer wait.l.Unlock() wait.pinStart = time.Now() @@ -484,41 +484,41 @@ func (wait *waitService) Pin(ctx context.Context, in *api.Pin, out *api.Pin) err return nil } -func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.GlobalPinInfo) error { wait.l.Lock() defer wait.l.Unlock() if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned - *out = api.GlobalPinInfo{ + *out = types.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*api.PinInfo{ - peer.IDB58Encode(test.PeerID1): { + PeerMap: map[string]*types.PinInfo{ + peer.Encode(test.PeerID1): { Cid: in, Peer: test.PeerID1, - Status: api.TrackerStatusPinned, + Status: types.TrackerStatusPinned, TS: wait.pinStart, }, - peer.IDB58Encode(test.PeerID2): { + peer.Encode(test.PeerID2): { Cid: in, Peer: test.PeerID2, - Status: api.TrackerStatusPinned, + Status: types.TrackerStatusPinned, TS: wait.pinStart, }, }, } } else { // pinning - *out = api.GlobalPinInfo{ + *out = types.GlobalPinInfo{ Cid: in, - PeerMap: map[string]*api.PinInfo{ - peer.IDB58Encode(test.PeerID1): { + PeerMap: map[string]*types.PinInfo{ + peer.Encode(test.PeerID1): { Cid: in, Peer: test.PeerID1, - Status: api.TrackerStatusPinning, + Status: types.TrackerStatusPinning, TS: wait.pinStart, }, - peer.IDB58Encode(test.PeerID2): { + peer.Encode(test.PeerID2): { Cid: in, Peer: test.PeerID2, - Status: api.TrackerStatusPinned, + Status: types.TrackerStatusPinned, TS: wait.pinStart, }, }, @@ -554,21 +554,23 @@ func TestWaitFor(t *testing.T) { fp := StatusFilterParams{ Cid: test.Cid1, Local: false, - Target: api.TrackerStatusPinned, + Target: types.TrackerStatusPinned, CheckFreq: time.Second, } start := time.Now() st, err := WaitFor(ctx, c, fp) if err != nil { - t.Fatal(err) + t.Error(err) + return } - if time.Now().Sub(start) <= 5*time.Second { - t.Fatal("slow pin should have taken at least 5 seconds") + if time.Since(start) <= 5*time.Second { + t.Error("slow pin should have taken at least 5 seconds") + return } for _, pi := range st.PeerMap { - if pi.Status != api.TrackerStatusPinned { + if pi.Status != types.TrackerStatusPinned { t.Error("pin info should show the item is pinned") } } diff --git a/api/rest/client/request.go b/api/rest/client/request.go index 44f52970..d8fd1abf 100644 --- a/api/rest/client/request.go +++ b/api/rest/client/request.go @@ -73,10 +73,8 @@ func (c *defaultClient) doRequest( r.SetBasicAuth(c.config.Username, c.config.Password) } - if headers != nil { - for k, v := range headers { - r.Header.Set(k, v) - } + for k, v := range headers { + r.Header.Set(k, v) } if body != nil { diff --git a/api/rest/client/transports.go b/api/rest/client/transports.go index 2b939e13..65b58092 100644 --- a/api/rest/client/transports.go +++ b/api/rest/client/transports.go @@ -80,7 +80,7 @@ func (c *defaultClient) enableLibp2p() error { c.transport.RegisterProtocol("libp2p", p2phttp.NewTransport(h)) c.net = "libp2p" c.p2p = h - c.hostname = peer.IDB58Encode(pinfo.ID) + c.hostname = peer.Encode(pinfo.ID) return nil } diff --git a/api/rest/config.go b/api/rest/config.go index 4dbba795..c506b5d7 100644 --- a/api/rest/config.go +++ b/api/rest/config.go @@ -413,7 +413,7 @@ func (cfg *Config) loadLibp2pOptions(jcfg *jsonConfig) error { } if jcfg.ID != "" { - id, err := peer.IDB58Decode(jcfg.ID) + id, err := peer.Decode(jcfg.ID) if err != nil { return fmt.Errorf("error parsing restapi.ID: %s", err) } @@ -473,7 +473,7 @@ func (cfg *Config) toJSONConfig() (jcfg *jsonConfig, err error) { } if cfg.ID != "" { - jcfg.ID = peer.IDB58Encode(cfg.ID) + jcfg.ID = peer.Encode(cfg.ID) } if cfg.PrivateKey != nil { pkeyBytes, err := cfg.PrivateKey.Bytes() diff --git a/api/rest/config_test.go b/api/rest/config_test.go index 6505c95b..48a80b8a 100644 --- a/api/rest/config_test.go +++ b/api/rest/config_test.go @@ -203,7 +203,7 @@ func TestLibp2pConfig(t *testing.T) { } defer rest.Shutdown(ctx) - badPid, _ := peer.IDB58Decode("QmTQ6oKHDwFjzr4ihirVCLJe8CxanxD3ZjGRYzubFuNDjE") + badPid, _ := peer.Decode("QmTQ6oKHDwFjzr4ihirVCLJe8CxanxD3ZjGRYzubFuNDjE") cfg.ID = badPid err = cfg.Validate() if err == nil { diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 9b81c939..07dbab5b 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -72,9 +72,6 @@ var ( // Used by sendResponse to set the right status const autoStatus = -1 -// For making a random sharding ID -var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - // API implements an API and aims to provides // a RESTful HTTP API for Cluster. type API struct { @@ -324,7 +321,7 @@ func basicAuthHandler(credentials map[string]string, h http.Handler) http.Handle logger.Error(err) return } - http.Error(w, resp, 401) + http.Error(w, resp, http.StatusUnauthorized) return } @@ -340,7 +337,7 @@ func basicAuthHandler(credentials map[string]string, h http.Handler) http.Handle logger.Error(err) return } - http.Error(w, resp, 401) + http.Error(w, resp, http.StatusUnauthorized) return } h.ServeHTTP(w, r) @@ -686,8 +683,6 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) { w, nil, ) - - return } func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) { @@ -715,7 +710,7 @@ func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) { return } - pid, err := peer.IDB58Decode(addInfo.PeerID) + pid, err := peer.Decode(addInfo.PeerID) if err != nil { api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding peer_id"), nil) return @@ -1074,7 +1069,7 @@ func (api *API) repoGCHandler(w http.ResponseWriter, r *http.Request) { func repoGCToGlobal(r *types.RepoGC) types.GlobalRepoGC { return types.GlobalRepoGC{ PeerMap: map[string]*types.RepoGC{ - peer.IDB58Encode(r.Peer): r, + peer.Encode(r.Peer): r, }, } } @@ -1124,7 +1119,7 @@ func (api *API) parseCidOrError(w http.ResponseWriter, r *http.Request) *types.P func (api *API) parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID { vars := mux.Vars(r) idStr := vars["peer"] - pid, err := peer.IDB58Decode(idStr) + pid, err := peer.Decode(idStr) if err != nil { api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding Peer ID: "+err.Error()), nil) return "" @@ -1136,13 +1131,13 @@ func pinInfoToGlobal(pInfo *types.PinInfo) *types.GlobalPinInfo { return &types.GlobalPinInfo{ Cid: pInfo.Cid, PeerMap: map[string]*types.PinInfo{ - peer.IDB58Encode(pInfo.Peer): pInfo, + peer.Encode(pInfo.Peer): pInfo, }, } } func pinInfosToGlobal(pInfos []*types.PinInfo) []*types.GlobalPinInfo { - gPInfos := make([]*types.GlobalPinInfo, len(pInfos), len(pInfos)) + gPInfos := make([]*types.GlobalPinInfo, len(pInfos)) for i, p := range pInfos { gPInfos[i] = pinInfoToGlobal(p) } diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 0a0b3247..c9a6e302 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -179,7 +179,7 @@ func httpURL(a *API) string { } func p2pURL(a *API) string { - return fmt.Sprintf("libp2p://%s", peer.IDB58Encode(a.Host().ID())) + return fmt.Sprintf("libp2p://%s", peer.Encode(a.Host().ID())) } func httpsURL(a *API) string { @@ -558,10 +558,10 @@ func TestConnectGraphEndpoint(t *testing.T) { // test a few link values pid1 := test.PeerID1 pid4 := test.PeerID4 - if _, ok := cg.ClustertoIPFS[peer.IDB58Encode(pid1)]; !ok { + if _, ok := cg.ClustertoIPFS[peer.Encode(pid1)]; !ok { t.Fatal("missing cluster peer 1 from cluster to peer links map") } - if cg.ClustertoIPFS[peer.IDB58Encode(pid1)] != pid4 { + if cg.ClustertoIPFS[peer.Encode(pid1)] != pid4 { t.Error("unexpected ipfs peer mapped to cluster peer 1 in graph") } } @@ -860,7 +860,7 @@ func TestAPIStatusAllEndpoint(t *testing.T) { if len(resp) != 3 || !resp[0].Cid.Equals(test.Cid1) || - resp[1].PeerMap[peer.IDB58Encode(test.PeerID1)].Status.String() != "pinning" { + resp[1].PeerMap[peer.Encode(test.PeerID1)].Status.String() != "pinning" { t.Errorf("unexpected statusAll resp") } @@ -924,7 +924,7 @@ func TestAPIStatusEndpoint(t *testing.T) { if !resp.Cid.Equals(test.Cid1) { t.Error("expected the same cid") } - info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)] + info, ok := resp.PeerMap[peer.Encode(test.PeerID1)] if !ok { t.Fatal("expected info for test.PeerID1") } @@ -939,7 +939,7 @@ func TestAPIStatusEndpoint(t *testing.T) { if !resp2.Cid.Equals(test.Cid1) { t.Error("expected the same cid") } - info, ok = resp2.PeerMap[peer.IDB58Encode(test.PeerID2)] + info, ok = resp2.PeerMap[peer.Encode(test.PeerID2)] if !ok { t.Fatal("expected info for test.PeerID2") } @@ -963,7 +963,7 @@ func TestAPIRecoverEndpoint(t *testing.T) { if !resp.Cid.Equals(test.Cid1) { t.Error("expected the same cid") } - info, ok := resp.PeerMap[peer.IDB58Encode(test.PeerID1)] + info, ok := resp.PeerMap[peer.Encode(test.PeerID1)] if !ok { t.Fatal("expected info for test.PeerID1") } diff --git a/api/types.go b/api/types.go index 6bb40033..946ef875 100644 --- a/api/types.go +++ b/api/types.go @@ -709,7 +709,7 @@ func convertPinType(t PinType) pb.Pin_PinType { // ProtoMarshal marshals this Pin using probobuf. func (pin *Pin) ProtoMarshal() ([]byte, error) { - allocs := make([][]byte, len(pin.Allocations), len(pin.Allocations)) + allocs := make([][]byte, len(pin.Allocations)) for i, pid := range pin.Allocations { bs, err := pid.Marshal() if err != nil { @@ -766,7 +766,7 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pbAllocs := pbPin.GetAllocations() lenAllocs := len(pbAllocs) - allocs := make([]peer.ID, lenAllocs, lenAllocs) + allocs := make([]peer.ID, lenAllocs) for i, pidb := range pbAllocs { pid, err := peer.IDFromBytes(pidb) if err != nil { @@ -914,7 +914,7 @@ func (m *Metric) SetTTL(d time.Duration) { // GetTTL returns the time left before the Metric expires func (m *Metric) GetTTL() time.Duration { expDate := time.Unix(0, m.Expire) - return expDate.Sub(time.Now()) + return time.Until(expDate) } // Expired returns if the Metric has expired diff --git a/api/types_test.go b/api/types_test.go index fad8c8c9..d541d11a 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -10,26 +10,10 @@ import ( cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" - ma "github.com/multiformats/go-multiaddr" "github.com/ugorji/go/codec" ) -var testTime = time.Date(2017, 12, 31, 15, 45, 50, 0, time.UTC) -var testMAddr, _ = ma.NewMultiaddr("/ip4/1.2.3.4") -var testMAddr2, _ = ma.NewMultiaddr("/dns4/a.b.c.d") -var testMAddr3, _ = ma.NewMultiaddr("/ip4/127.0.0.1/tcp/8081/ws/p2p/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd") -var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") -var testCid2, _ = cid.Decode("QmYCLpFCj9Av8NFjkQogvtXspnTDFWaizLpVFEijHTH4eV") -var testCid3, _ = cid.Decode("QmZmdA3UZKuHuy9FrWsxJ82q21nbEh97NUnxTzF5EHxZia") -var testCid4, _ = cid.Decode("QmZbNfi13Sb2WUDMjiW1ZNhnds5KDk6mJB5hP9B5h9m5CJ") -var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") -var testPeerID2, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabd") -var testPeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") -var testPeerID4, _ = peer.IDB58Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc") -var testPeerID5, _ = peer.IDB58Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg") -var testPeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx") - func TestTrackerFromString(t *testing.T) { testcases := []string{"cluster_error", "pin_error", "unpin_error", "pinned", "pinning", "unpinning", "unpinned", "remote"} for i, tc := range testcases { @@ -231,9 +215,9 @@ func TestPinOptionsQuery(t *testing.T) { } func TestIDCodec(t *testing.T) { - TestPeerID1, _ := peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") - TestPeerID2, _ := peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") - TestPeerID3, _ := peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") + TestPeerID1, _ := peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") + TestPeerID2, _ := peer.Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") + TestPeerID3, _ := peer.Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") addr, _ := NewMultiaddr("/ip4/1.2.3.4") id := &ID{ ID: TestPeerID1, diff --git a/api/util.go b/api/util.go index 0cff0acd..0cb9b4d2 100644 --- a/api/util.go +++ b/api/util.go @@ -4,12 +4,12 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) -// PeersToStrings IDB58Encodes a list of peers. +// PeersToStrings Encodes a list of peers. func PeersToStrings(peers []peer.ID) []string { strs := make([]string, len(peers)) for i, p := range peers { if p != "" { - strs[i] = peer.IDB58Encode(p) + strs[i] = peer.Encode(p) } } return strs @@ -19,7 +19,7 @@ func PeersToStrings(peers []peer.ID) []string { func StringsToPeers(strs []string) []peer.ID { peers := []peer.ID{} for _, p := range strs { - pid, err := peer.IDB58Decode(p) + pid, err := peer.Decode(p) if err != nil { logger.Debugf("'%s': %s", p, err) continue diff --git a/cluster.go b/cluster.go index 0967ec16..becccc3f 100644 --- a/cluster.go +++ b/cluster.go @@ -1431,7 +1431,7 @@ func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) { case api.DataType: return pin, c.consensus.LogUnpin(ctx, pin) case api.ShardType: - err := "cannot unpin a shard directly. Unpin content root CID instead." + err := "cannot unpin a shard directly. Unpin content root CID instead" return pin, errors.New(err) case api.MetaType: // Unpin cluster dag and referenced shards @@ -1441,7 +1441,7 @@ func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) { } return pin, c.consensus.LogUnpin(ctx, pin) case api.ClusterDAGType: - err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead." + err := "cannot unpin a Cluster DAG directly. Unpin content root CID instead" return pin, errors.New(err) default: return pin, errors.New("unrecognized pin type") @@ -1569,7 +1569,7 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID { } lenMembers := len(members) - peers := make([]*api.ID, lenMembers, lenMembers) + peers := make([]*api.ID, lenMembers) ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers) defer rpcutil.MultiCancel(cancels) @@ -1588,6 +1588,7 @@ func (c *Cluster) Peers(ctx context.Context) []*api.ID { for i, err := range errs { if err == nil { finalPeers = append(finalPeers, peers[i]) + _ = finalPeers // staticcheck continue } @@ -1624,7 +1625,7 @@ func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) { func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, t time.Time) { for _, p := range peers { - gpin.PeerMap[peer.IDB58Encode(p)] = &api.PinInfo{ + gpin.PeerMap[peer.Encode(p)] = &api.PinInfo{ Cid: h, Peer: p, PeerName: p.String(), @@ -1684,7 +1685,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, timeNow) lenDests := len(dests) - replies := make([]*api.PinInfo, lenDests, lenDests) + replies := make([]*api.PinInfo, lenDests) ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenDests) defer rpcutil.MultiCancel(cancels) @@ -1702,7 +1703,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c // No error. Parse and continue if e == nil { - gpin.PeerMap[peer.IDB58Encode(dests[i])] = r + gpin.PeerMap[peer.Encode(dests[i])] = r continue } @@ -1713,7 +1714,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c // Deal with error cases (err != nil): wrap errors in PinInfo logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e) - gpin.PeerMap[peer.IDB58Encode(dests[i])] = &api.PinInfo{ + gpin.PeerMap[peer.Encode(dests[i])] = &api.PinInfo{ Cid: h, Peer: dests[i], PeerName: dests[i].String(), @@ -1746,7 +1747,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ( } lenMembers := len(members) - replies := make([][]*api.PinInfo, lenMembers, lenMembers) + replies := make([][]*api.PinInfo, lenMembers) ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers) defer rpcutil.MultiCancel(cancels) @@ -1770,11 +1771,11 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ( fullMap[p.Cid] = &api.GlobalPinInfo{ Cid: p.Cid, PeerMap: map[string]*api.PinInfo{ - peer.IDB58Encode(p.Peer): p, + peer.Encode(p.Peer): p, }, } } else { - item.PeerMap[peer.IDB58Encode(p.Peer)] = p + item.PeerMap[peer.Encode(p.Peer)] = p } } } @@ -1796,7 +1797,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ( // Merge any errors for p, msg := range erroredPeers { for c := range fullMap { - fullMap[c].PeerMap[peer.IDB58Encode(p)] = &api.PinInfo{ + fullMap[c].PeerMap[peer.Encode(p)] = &api.PinInfo{ Cid: c, Peer: p, Status: api.TrackerStatusClusterError, @@ -1887,45 +1888,45 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er return list, nil } -// diffPeers returns the peerIDs added and removed from peers2 in relation to -// peers1 -func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) { - m1 := make(map[peer.ID]struct{}) - m2 := make(map[peer.ID]struct{}) - added = make([]peer.ID, 0) - removed = make([]peer.ID, 0) - if peers1 == nil && peers2 == nil { - return - } - if peers1 == nil { - added = peers2 - return - } - if peers2 == nil { - removed = peers1 - return - } +// // diffPeers returns the peerIDs added and removed from peers2 in relation to +// // peers1 +// func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) { +// m1 := make(map[peer.ID]struct{}) +// m2 := make(map[peer.ID]struct{}) +// added = make([]peer.ID, 0) +// removed = make([]peer.ID, 0) +// if peers1 == nil && peers2 == nil { +// return +// } +// if peers1 == nil { +// added = peers2 +// return +// } +// if peers2 == nil { +// removed = peers1 +// return +// } - for _, p := range peers1 { - m1[p] = struct{}{} - } - for _, p := range peers2 { - m2[p] = struct{}{} - } - for k := range m1 { - _, ok := m2[k] - if !ok { - removed = append(removed, k) - } - } - for k := range m2 { - _, ok := m1[k] - if !ok { - added = append(added, k) - } - } - return -} +// for _, p := range peers1 { +// m1[p] = struct{}{} +// } +// for _, p := range peers2 { +// m2[p] = struct{}{} +// } +// for k := range m1 { +// _, ok := m2[k] +// if !ok { +// removed = append(removed, k) +// } +// } +// for k := range m2 { +// _, ok := m1[k] +// if !ok { +// added = append(added, k) +// } +// } +// return +// } // RepoGC performs garbage collection sweep on all peers' IPFS repo. func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) { @@ -1952,7 +1953,7 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) { &repoGC, ) if err == nil { - globalRepoGC.PeerMap[peer.IDB58Encode(member)] = &repoGC + globalRepoGC.PeerMap[peer.Encode(member)] = &repoGC continue } @@ -1963,9 +1964,9 @@ func (c *Cluster) RepoGC(ctx context.Context) (*api.GlobalRepoGC, error) { logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, member, err) - globalRepoGC.PeerMap[peer.IDB58Encode(member)] = &api.RepoGC{ + globalRepoGC.PeerMap[peer.Encode(member)] = &api.RepoGC{ Peer: member, - Peername: peer.IDB58Encode(member), + Peername: peer.Encode(member), Keys: []api.IPFSRepoGC{}, Error: err.Error(), } diff --git a/cluster_config.go b/cluster_config.go index e743c0b6..5d02add3 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -9,7 +9,6 @@ import ( "os" "path/filepath" "reflect" - "sync" "time" "github.com/ipfs/ipfs-cluster/config" @@ -59,8 +58,6 @@ type ConnMgrConfig struct { // config.ComponentConfig interface. type Config struct { config.Saver - lock sync.Mutex - peerstoreLock sync.Mutex // User-defined peername for use as human-readable identifier. Peername string @@ -203,7 +200,7 @@ func (cfg *Config) ConfigKey() string { func (cfg *Config) Default() error { cfg.setDefaults() - clusterSecret := make([]byte, 32, 32) + clusterSecret := make([]byte, 32) n, err := rand.Read(clusterSecret) if err != nil { return err diff --git a/cluster_test.go b/cluster_test.go index 16c6ee91..b0e9ace7 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -38,7 +38,6 @@ func (c *mockComponent) Shutdown(ctx context.Context) error { func (c *mockComponent) SetClient(client *rpc.Client) { c.rpcClient = client - return } type mockAPI struct { @@ -213,7 +212,7 @@ func cleanState() { os.RemoveAll(testsFolder) } -func testClusterShutdown(t *testing.T) { +func TestClusterShutdown(t *testing.T) { ctx := context.Background() cl, _, _, _ := testingCluster(t) err := cl.Shutdown(ctx) diff --git a/clusterhost.go b/clusterhost.go index 552709fc..eef36a4b 100644 --- a/clusterhost.go +++ b/clusterhost.go @@ -11,10 +11,10 @@ import ( libp2p "github.com/libp2p/go-libp2p" relay "github.com/libp2p/go-libp2p-circuit" connmgr "github.com/libp2p/go-libp2p-connmgr" + crypto "github.com/libp2p/go-libp2p-core/crypto" + host "github.com/libp2p/go-libp2p-core/host" corepnet "github.com/libp2p/go-libp2p-core/pnet" routing "github.com/libp2p/go-libp2p-core/routing" - crypto "github.com/libp2p/go-libp2p-crypto" - host "github.com/libp2p/go-libp2p-host" dht "github.com/libp2p/go-libp2p-kad-dht" pubsub "github.com/libp2p/go-libp2p-pubsub" libp2pquic "github.com/libp2p/go-libp2p-quic-transport" diff --git a/cmd/ipfs-cluster-ctl/formatters.go b/cmd/ipfs-cluster-ctl/formatters.go index 4622f6e3..dfbf7e99 100644 --- a/cmd/ipfs-cluster-ctl/formatters.go +++ b/cmd/ipfs-cluster-ctl/formatters.go @@ -22,15 +22,14 @@ type addedOutputQuiet struct { } func jsonFormatObject(resp interface{}) { - switch resp.(type) { + switch r := resp.(type) { case nil: return case []*addedOutputQuiet: // print original objects as in JSON it makes // no sense to have a human "quiet" output - serials := resp.([]*addedOutputQuiet) var actual []*api.AddedOutput - for _, s := range serials { + for _, s := range r { actual = append(actual, s.AddedOutput) } jsonFormatPrint(actual) @@ -46,55 +45,55 @@ func jsonFormatPrint(obj interface{}) { } func textFormatObject(resp interface{}) { - switch resp.(type) { + switch r := resp.(type) { case nil: return case string: fmt.Println(resp) case *api.ID: - textFormatPrintID(resp.(*api.ID)) + textFormatPrintID(r) case *api.GlobalPinInfo: - textFormatPrintGPInfo(resp.(*api.GlobalPinInfo)) + textFormatPrintGPInfo(r) case *api.Pin: - textFormatPrintPin(resp.(*api.Pin)) + textFormatPrintPin(r) case *api.AddedOutput: - textFormatPrintAddedOutput(resp.(*api.AddedOutput)) + textFormatPrintAddedOutput(r) case *addedOutputQuiet: - textFormatPrintAddedOutputQuiet(resp.(*addedOutputQuiet)) + textFormatPrintAddedOutputQuiet(r) case *api.Version: - textFormatPrintVersion(resp.(*api.Version)) + textFormatPrintVersion(r) case *api.Error: - textFormatPrintError(resp.(*api.Error)) + textFormatPrintError(r) case *api.Metric: - textFormatPrintMetric(resp.(*api.Metric)) + textFormatPrintMetric(r) case []*api.ID: - for _, item := range resp.([]*api.ID) { + for _, item := range r { textFormatObject(item) } case []*api.GlobalPinInfo: - for _, item := range resp.([]*api.GlobalPinInfo) { + for _, item := range r { textFormatObject(item) } case []*api.Pin: - for _, item := range resp.([]*api.Pin) { + for _, item := range r { textFormatObject(item) } case []*api.AddedOutput: - for _, item := range resp.([]*api.AddedOutput) { + for _, item := range r { textFormatObject(item) } case []*addedOutputQuiet: - for _, item := range resp.([]*addedOutputQuiet) { + for _, item := range r { textFormatObject(item) } case []*api.Metric: - for _, item := range resp.([]*api.Metric) { + for _, item := range r { textFormatObject(item) } case *api.GlobalRepoGC: - textFormatPrintGlobalRepoGC(resp.(*api.GlobalRepoGC)) + textFormatPrintGlobalRepoGC(r) case []string: - for _, item := range resp.([]string) { + for _, item := range r { textFormatObject(item) } default: @@ -163,16 +162,6 @@ func textFormatPrintGPInfo(obj *api.GlobalPinInfo) { } } -func textFormatPrintPInfo(obj *api.PinInfo) { - gpinfo := api.GlobalPinInfo{ - Cid: obj.Cid, - PeerMap: map[string]*api.PinInfo{ - peer.IDB58Encode(obj.Peer): obj, - }, - } - textFormatPrintGPInfo(&gpinfo) -} - func textFormatPrintVersion(obj *api.Version) { fmt.Println(obj.Version) } @@ -230,11 +219,11 @@ func textFormatPrintMetric(obj *api.Metric) { if obj.Name == "freespace" { u, err := strconv.ParseUint(obj.Value, 10, 64) checkErr("parsing to uint64", err) - fmt.Printf("%s | freespace: %s | Expires in: %s\n", peer.IDB58Encode(obj.Peer), humanize.Bytes(u), humanize.Time(time.Unix(0, obj.Expire))) + fmt.Printf("%s | freespace: %s | Expires in: %s\n", peer.Encode(obj.Peer), humanize.Bytes(u), humanize.Time(time.Unix(0, obj.Expire))) return } - fmt.Printf("%s | %s | Expires in: %s\n", peer.IDB58Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire))) + fmt.Printf("%s | %s | Expires in: %s\n", peer.Encode(obj.Peer), obj.Name, humanize.Time(time.Unix(0, obj.Expire))) } func textFormatPrintGlobalRepoGC(obj *api.GlobalRepoGC) { diff --git a/cmd/ipfs-cluster-ctl/graph.go b/cmd/ipfs-cluster-ctl/graph.go index 5f030b31..27196345 100644 --- a/cmd/ipfs-cluster-ctl/graph.go +++ b/cmd/ipfs-cluster-ctl/graph.go @@ -7,7 +7,7 @@ import ( "sort" dot "github.com/kishansagathiya/go-dot" - peer "github.com/libp2p/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/ipfs-cluster/api" ) @@ -38,16 +38,14 @@ const ( tIPFSMissing // Missing IPFS node ) -var errUnfinishedWrite = errors.New("could not complete write of line to output") var errUnknownNodeType = errors.New("unsupported node type. Expected cluster or ipfs") -var errCorruptOrdering = errors.New("expected pid to have an ordering within dot writer") func makeDot(cg *api.ConnectGraph, w io.Writer, allIpfs bool) error { ipfsEdges := make(map[string][]peer.ID) for k, v := range cg.IPFSLinks { ipfsEdges[k] = make([]peer.ID, 0) for _, id := range v { - strPid := peer.IDB58Encode(id) + strPid := peer.Encode(id) if _, ok := cg.IPFSLinks[strPid]; ok || allIpfs { ipfsEdges[k] = append(ipfsEdges[k], id) @@ -65,7 +63,7 @@ func makeDot(cg *api.ConnectGraph, w io.Writer, allIpfs bool) error { dW := dotWriter{ w: w, dotGraph: dot.NewGraph("cluster"), - self: peer.IDB58Encode(cg.ClusterID), + self: peer.Encode(cg.ClusterID), trustMap: cg.ClusterTrustLinks, idToPeername: cg.IDtoPeername, ipfsEdges: ipfsEdges, @@ -207,7 +205,7 @@ func (dW *dotWriter) print() error { v := dW.clusterEdges[k] for _, id := range v { toNode := dW.clusterNodes[k] - fromNode := dW.clusterNodes[peer.IDB58Encode(id)] + fromNode := dW.clusterNodes[peer.Encode(id)] dW.dotGraph.AddEdge(toNode, fromNode, true, "") } } @@ -229,7 +227,7 @@ func (dW *dotWriter) print() error { continue } - fromNode, ok = dW.ipfsNodes[peer.IDB58Encode(ipfsID)] + fromNode, ok = dW.ipfsNodes[peer.Encode(ipfsID)] if !ok { logger.Error("expected a node at this id") continue @@ -244,7 +242,7 @@ func (dW *dotWriter) print() error { v := dW.ipfsEdges[k] toNode := dW.ipfsNodes[k] for _, id := range v { - idStr := peer.IDB58Encode(id) + idStr := peer.Encode(id) fromNode, ok := dW.ipfsNodes[idStr] if !ok { logger.Error("expected a node here") @@ -257,7 +255,7 @@ func (dW *dotWriter) print() error { } func sortedKeys(dict map[string][]peer.ID) []string { - keys := make([]string, len(dict), len(dict)) + keys := make([]string, len(dict)) i := 0 for k := range dict { keys[i] = k diff --git a/cmd/ipfs-cluster-ctl/graph_test.go b/cmd/ipfs-cluster-ctl/graph_test.go index 5597086f..2978efa3 100644 --- a/cmd/ipfs-cluster-ctl/graph_test.go +++ b/cmd/ipfs-cluster-ctl/graph_test.go @@ -67,53 +67,53 @@ I2 -> I0 }` var ( - pid1, _ = peer.IDB58Decode("QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD") - pid2, _ = peer.IDB58Decode("QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ") - pid3, _ = peer.IDB58Decode("QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu") - pid4, _ = peer.IDB58Decode("QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV") - pid5, _ = peer.IDB58Decode("QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq") - pid6, _ = peer.IDB58Decode("QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL") + pid1, _ = peer.Decode("QmUBuxVHoNNjfmNpTad36UeaFQv3gXAtCv9r6KhmeqhEhD") + pid2, _ = peer.Decode("QmV35LjbEGPfN7KfMAJp43VV2enwXqqQf5esx4vUcgHDQJ") + pid3, _ = peer.Decode("QmZ2ckU7G35MYyJgMTwMUnicsGqSy3YUxGBX7qny6MQmJu") + pid4, _ = peer.Decode("QmXbiVZd93SLiu9TAm21F2y9JwGiFLydbEVkPBaMR3DZDV") + pid5, _ = peer.Decode("QmPFKAGZbUjdzt8BBx8VTWCe9UeUQVcoqHFehSPzN5LSsq") + pid6, _ = peer.Decode("QmbU7273zH6jxwDe2nqRmEm2rp5PpqP2xeQr2xCmwbBsuL") - pid7, _ = peer.IDB58Decode("QmQsdAdCHs4PRLi5tcoLfasYppryqQENxgAy4b2aS8xccb") - pid8, _ = peer.IDB58Decode("QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8") - pid9, _ = peer.IDB58Decode("QmfCHNQ2vbUmAuJZhE2hEpgiJq4sL1XScWEKnUrVtWZdeD") + pid7, _ = peer.Decode("QmQsdAdCHs4PRLi5tcoLfasYppryqQENxgAy4b2aS8xccb") + pid8, _ = peer.Decode("QmVV2enwXqqQf5esx4v36UeaFQvFehSPzNfi8aaaaaanM8") + pid9, _ = peer.Decode("QmfCHNQ2vbUmAuJZhE2hEpgiJq4sL1XScWEKnUrVtWZdeD") ) func TestSimpleIpfsGraphs(t *testing.T) { cg := api.ConnectGraph{ ClusterID: pid1, ClusterLinks: map[string][]peer.ID{ - peer.IDB58Encode(pid1): []peer.ID{ + peer.Encode(pid1): []peer.ID{ pid2, pid3, }, - peer.IDB58Encode(pid2): []peer.ID{ + peer.Encode(pid2): []peer.ID{ pid1, pid3, }, - peer.IDB58Encode(pid3): []peer.ID{ + peer.Encode(pid3): []peer.ID{ pid1, pid2, }, }, IPFSLinks: map[string][]peer.ID{ - peer.IDB58Encode(pid4): []peer.ID{ + peer.Encode(pid4): []peer.ID{ pid5, pid6, }, - peer.IDB58Encode(pid5): []peer.ID{ + peer.Encode(pid5): []peer.ID{ pid4, pid6, }, - peer.IDB58Encode(pid6): []peer.ID{ + peer.Encode(pid6): []peer.ID{ pid4, pid5, }, }, ClustertoIPFS: map[string]peer.ID{ - peer.IDB58Encode(pid1): pid4, - peer.IDB58Encode(pid2): pid5, - peer.IDB58Encode(pid3): pid6, + peer.Encode(pid1): pid4, + peer.Encode(pid2): pid5, + peer.Encode(pid3): pid6, }, } buf := new(bytes.Buffer) @@ -181,35 +181,35 @@ func TestIpfsAllGraphs(t *testing.T) { cg := api.ConnectGraph{ ClusterID: pid1, ClusterLinks: map[string][]peer.ID{ - peer.IDB58Encode(pid1): []peer.ID{ + peer.Encode(pid1): []peer.ID{ pid2, pid3, }, - peer.IDB58Encode(pid2): []peer.ID{ + peer.Encode(pid2): []peer.ID{ pid1, pid3, }, - peer.IDB58Encode(pid3): []peer.ID{ + peer.Encode(pid3): []peer.ID{ pid1, pid2, }, }, IPFSLinks: map[string][]peer.ID{ - peer.IDB58Encode(pid4): []peer.ID{ + peer.Encode(pid4): []peer.ID{ pid5, pid6, pid7, pid8, pid9, }, - peer.IDB58Encode(pid5): []peer.ID{ + peer.Encode(pid5): []peer.ID{ pid4, pid6, pid7, pid8, pid9, }, - peer.IDB58Encode(pid6): []peer.ID{ + peer.Encode(pid6): []peer.ID{ pid4, pid5, pid7, @@ -218,9 +218,9 @@ func TestIpfsAllGraphs(t *testing.T) { }, }, ClustertoIPFS: map[string]peer.ID{ - peer.IDB58Encode(pid1): pid4, - peer.IDB58Encode(pid2): pid5, - peer.IDB58Encode(pid3): pid6, + peer.Encode(pid1): pid4, + peer.Encode(pid2): pid5, + peer.Encode(pid3): pid6, }, } diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index 9e6aab29..4edda51d 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -20,7 +20,6 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" - "contrib.go.opencensus.io/exporter/jaeger" uuid "github.com/google/uuid" cli "github.com/urfave/cli" ) @@ -34,16 +33,12 @@ const Version = "0.12.1" var ( defaultHost = "/ip4/127.0.0.1/tcp/9094" defaultTimeout = 0 - defaultUsername = "" - defaultPassword = "" defaultWaitCheckFreq = time.Second defaultAddParams = api.DefaultAddParams() ) var logger = logging.Logger("cluster-ctl") -var tracer *jaeger.Exporter - var globalClient client.Client // Description provides a short summary of the functionality of this tool @@ -70,9 +65,9 @@ https://github.com/ipfs/ipfs-cluster. programName, defaultHost) -type peerAddBody struct { - Addr string `json:"peer_multiaddress"` -} +// type peerAddBody struct { +// Addr string `json:"peer_multiaddress"` +// } func out(m string, a ...interface{}) { fmt.Fprintf(os.Stderr, m, a...) @@ -245,7 +240,7 @@ cluster peers. Flags: []cli.Flag{}, Action: func(c *cli.Context) error { pid := c.Args().First() - p, err := peer.IDB58Decode(pid) + p, err := peer.Decode(pid) checkErr("parsing peer ID", err) cerr := globalClient.PeerRm(ctx, p) formatResponse(c, nil, cerr) @@ -405,7 +400,7 @@ content. } // Read arguments (paths) - paths := make([]string, c.NArg(), c.NArg()) + paths := make([]string, c.NArg()) for i, path := range c.Args() { paths[i] = path } @@ -979,14 +974,6 @@ deamon, otherwise on all IPFS daemons. app.Run(os.Args) } -func parseFlag(t int) cli.IntFlag { - return cli.IntFlag{ - Name: "parseAs", - Value: t, - Hidden: true, - } -} - func localFlag() cli.BoolFlag { return cli.BoolFlag{ Name: "local", diff --git a/cmdutils/cmdutils.go b/cmdutils/cmdutils.go index df8327b6..e4bda146 100644 --- a/cmdutils/cmdutils.go +++ b/cmdutils/cmdutils.go @@ -16,7 +16,7 @@ import ( "github.com/ipfs/go-datastore" ipfscluster "github.com/ipfs/ipfs-cluster" ipfshttp "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp" - host "github.com/libp2p/go-libp2p-host" + host "github.com/libp2p/go-libp2p-core/host" dht "github.com/libp2p/go-libp2p-kad-dht" ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" diff --git a/config/identity.go b/config/identity.go index e6f39d1e..1f479a43 100644 --- a/config/identity.go +++ b/config/identity.go @@ -7,8 +7,8 @@ import ( "fmt" "io/ioutil" + crypto "github.com/libp2p/go-libp2p-core/crypto" peer "github.com/libp2p/go-libp2p-core/peer" - crypto "github.com/libp2p/go-libp2p-crypto" "github.com/kelseyhightower/envconfig" ) @@ -122,7 +122,7 @@ func (ident *Identity) LoadJSON(raw []byte) error { } func (ident *Identity) applyIdentityJSON(jID *identityJSON) error { - pid, err := peer.IDB58Decode(jID.ID) + pid, err := peer.Decode(jID.ID) if err != nil { err = fmt.Errorf("error decoding cluster ID: %s", err) return err diff --git a/connect_graph.go b/connect_graph.go index 88b7f2f6..0c9342c2 100644 --- a/connect_graph.go +++ b/connect_graph.go @@ -30,10 +30,10 @@ func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) { for _, member := range members { // one of the entries is for itself, but that shouldn't hurt - cg.ClusterTrustLinks[peer.IDB58Encode(member)] = c.consensus.IsTrustedPeer(ctx, member) + cg.ClusterTrustLinks[peer.Encode(member)] = c.consensus.IsTrustedPeer(ctx, member) } - peers := make([][]*api.ID, len(members), len(members)) + peers := make([][]*api.ID, len(members)) ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(members)) defer rpcutil.MultiCancel(cancels) @@ -48,7 +48,7 @@ func (c *Cluster) ConnectGraph() (api.ConnectGraph, error) { ) for i, err := range errs { - p := peer.IDB58Encode(members[i]) + p := peer.Encode(members[i]) cg.ClusterLinks[p] = make([]peer.ID, 0) if err != nil { // Only setting cluster connections when no error occurs logger.Debugf("RPC error reaching cluster peer %s: %s", p, err.Error()) @@ -76,7 +76,7 @@ func (c *Cluster) recordClusterLinks(cg *api.ConnectGraph, p string, peers []*ap logger.Debugf("Peer %s errored connecting to its peer %s", p, id.ID.Pretty()) continue } - if peer.IDB58Encode(id.ID) == p { + if peer.Encode(id.ID) == p { selfConnection = true pID = id } else { @@ -93,8 +93,8 @@ func (c *Cluster) recordIPFSLinks(cg *api.ConnectGraph, pID *api.ID) { return } - pid := peer.IDB58Encode(pID.ID) - ipfsPid := peer.IDB58Encode(ipfsID) + pid := peer.Encode(pID.ID) + ipfsPid := peer.Encode(ipfsID) if _, ok := cg.IPFSLinks[pid]; ok { logger.Warnf("ipfs id: %s already recorded, one ipfs daemon in use by multiple cluster peers", ipfsID.Pretty()) diff --git a/consensus/crdt/config.go b/consensus/crdt/config.go index e2c2ba0a..3f540a56 100644 --- a/consensus/crdt/config.go +++ b/consensus/crdt/config.go @@ -115,7 +115,7 @@ func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error { cfg.TrustedPeers = []peer.ID{} break } - pid, err := peer.IDB58Decode(p) + pid, err := peer.Decode(p) if err != nil { return fmt.Errorf("error parsing trusted peers: %s", err) } diff --git a/consensus/crdt/config_test.go b/consensus/crdt/config_test.go index f9f83f22..684452bf 100644 --- a/consensus/crdt/config_test.go +++ b/consensus/crdt/config_test.go @@ -39,6 +39,10 @@ func TestLoadJSON(t *testing.T) { "cluster_name": "test", "trusted_peers": [] }`)) + if err != nil { + t.Fatal(err) + } + if cfg.TrustAll { t.Error("TrustAll is only enabled with '*'") } diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index 620f3d57..89d8dcac 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -323,7 +323,7 @@ func (css *Consensus) Ready(ctx context.Context) <-chan struct{} { // IsTrustedPeer returns whether the given peer is taken into account // when submitting updates to the consensus state. func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool { - ctx, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer") + _, span := trace.StartSpan(ctx, "consensus/IsTrustedPeer") defer span.End() if css.config.TrustAll { @@ -343,7 +343,7 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool { // has the highest priority when the peerstore is saved, and it's addresses // are always remembered. func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error { - ctx, span := trace.StartSpan(ctx, "consensus/Trust") + _, span := trace.StartSpan(ctx, "consensus/Trust") defer span.End() css.trustedPeers.Store(pid, struct{}{}) @@ -358,7 +358,7 @@ func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error { // Distrust removes a peer from the "trusted" set. func (css *Consensus) Distrust(ctx context.Context, pid peer.ID) error { - ctx, span := trace.StartSpan(ctx, "consensus/Distrust") + _, span := trace.StartSpan(ctx, "consensus/Distrust") defer span.End() css.trustedPeers.Delete(pid) @@ -500,8 +500,7 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) opts := crdt.DefaultOptions() opts.Logger = logger - var blocksDatastore ds.Batching - blocksDatastore = namespace.Wrap( + var blocksDatastore ds.Batching = namespace.Wrap( batching, ds.NewKey(cfg.DatastoreNamespace).ChildString(blocksNs), ) diff --git a/consensus/raft/consensus.go b/consensus/raft/consensus.go index 69b39013..21c1d5c0 100644 --- a/consensus/raft/consensus.go +++ b/consensus/raft/consensus.go @@ -21,7 +21,6 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" libp2praft "github.com/libp2p/go-libp2p-raft" - ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/tag" "go.opencensus.io/trace" @@ -223,7 +222,7 @@ func (cc *Consensus) SetClient(c *rpc.Client) { // Ready returns a channel which is signaled when the Consensus // algorithm has finished bootstrapping and is ready to use func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} { - ctx, span := trace.StartSpan(ctx, "consensus/Ready") + _, span := trace.StartSpan(ctx, "consensus/Ready") defer span.End() return cc.readyCh @@ -276,7 +275,7 @@ func (cc *Consensus) redirectToLeader(method string, arg interface{}) (bool, err if err != nil { return false, fmt.Errorf("timed out waiting for leader: %s", err) } - leader, err = peer.IDB58Decode(pidstr) + leader, err = peer.Decode(pidstr) if err != nil { return false, err } @@ -409,7 +408,7 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error { } // Being here means we are the leader and can commit cc.shutdownLock.RLock() // do not shutdown while committing - finalErr = cc.raft.AddPeer(ctx, peer.IDB58Encode(pid)) + finalErr = cc.raft.AddPeer(ctx, peer.Encode(pid)) cc.shutdownLock.RUnlock() if finalErr != nil { @@ -440,7 +439,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error { } // Being here means we are the leader and can commit cc.shutdownLock.RLock() // do not shutdown while committing - finalErr = cc.raft.RemovePeer(ctx, peer.IDB58Encode(pid)) + finalErr = cc.raft.RemovePeer(ctx, peer.Encode(pid)) cc.shutdownLock.RUnlock() if finalErr != nil { time.Sleep(cc.config.CommitRetryDelay) @@ -458,7 +457,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error { // writes to the shared state should happen through the Consensus component // methods. func (cc *Consensus) State(ctx context.Context) (state.ReadOnly, error) { - ctx, span := trace.StartSpan(ctx, "consensus/State") + _, span := trace.StartSpan(ctx, "consensus/State") defer span.End() st, err := cc.consensus.GetLogHead() @@ -479,7 +478,7 @@ func (cc *Consensus) State(ctx context.Context) (state.ReadOnly, error) { // Leader returns the peerID of the Leader of the // cluster. It returns an error when there is no leader. func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) { - ctx, span := trace.StartSpan(ctx, "consensus/Leader") + _, span := trace.StartSpan(ctx, "consensus/Leader") defer span.End() // Note the hard-dependency on raft here... @@ -489,7 +488,7 @@ func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) { // Clean removes the Raft persisted state. func (cc *Consensus) Clean(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "consensus/Clean") + _, span := trace.StartSpan(ctx, "consensus/Clean") defer span.End() cc.shutdownLock.RLock() @@ -532,7 +531,7 @@ func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) { sort.Strings(raftPeers) for _, p := range raftPeers { - id, err := peer.IDB58Decode(p) + id, err := peer.Decode(p) if err != nil { panic("could not decode peer") } @@ -541,14 +540,6 @@ func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) { return peers, nil } -func parsePIDFromMultiaddr(addr ma.Multiaddr) string { - pidstr, err := addr.ValueForProtocol(ma.P_P2P) - if err != nil { - panic("peer badly encoded") - } - return pidstr -} - // OfflineState state returns a cluster state by reading the Raft data and // writing it to the given datastore which is then wrapped as a state.State. // Usually an in-memory datastore suffices. The given datastore should be diff --git a/consensus/raft/raft.go b/consensus/raft/raft.go index 400a9d83..3c320b03 100644 --- a/consensus/raft/raft.go +++ b/consensus/raft/raft.go @@ -20,10 +20,6 @@ import ( "go.opencensus.io/trace" ) -// errBadRaftState is returned when the consensus component cannot start -// because the cluster peers do not match the raft peers. -var errBadRaftState = errors.New("cluster peers do not match raft peers") - // ErrWaitingForSelf is returned when we are waiting for ourselves to depart // the peer set, which won't happen var errWaitingForSelf = errors.New("waiting for ourselves to depart") @@ -78,7 +74,7 @@ func newRaftWrapper( raftW.host = host raftW.staging = staging // Set correct LocalID - cfg.RaftConfig.LocalID = hraft.ServerID(peer.IDB58Encode(host.ID())) + cfg.RaftConfig.LocalID = hraft.ServerID(peer.Encode(host.ID())) df := cfg.GetDataFolder() err := makeDataFolder(df) @@ -235,7 +231,7 @@ func makeServerConf(peers []peer.ID) hraft.Configuration { // Servers are peers + self. We avoid duplicate entries below for _, pid := range peers { - p := peer.IDB58Encode(pid) + p := peer.Encode(pid) _, ok := sm[p] if !ok { // avoid dups sm[p] = struct{}{} @@ -277,7 +273,7 @@ func (rw *raftWrapper) WaitForVoter(ctx context.Context) error { logger.Debug("waiting until we are promoted to a voter") - pid := hraft.ServerID(peer.IDB58Encode(rw.host.ID())) + pid := hraft.ServerID(peer.Encode(rw.host.ID())) for { select { case <-ctx.Done(): @@ -388,26 +384,21 @@ func (rw *raftWrapper) Snapshot() error { func (rw *raftWrapper) snapshotOnShutdown() error { var err error for i := 0; i < maxShutdownSnapshotRetries; i++ { - done := false ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout) - err := rw.WaitForUpdates(ctx) + err = rw.WaitForUpdates(ctx) cancel() if err != nil { logger.Warn("timed out waiting for state updates before shutdown. Snapshotting may fail") - done = true // let's not wait for updates again + return rw.Snapshot() } err = rw.Snapshot() - if err != nil { - err = errors.New("could not snapshot raft: " + err.Error()) - } else { - err = nil - done = true + if err == nil { + return nil // things worked } - if done { - break - } + // There was an error + err = errors.New("could not snapshot raft: " + err.Error()) logger.Warnf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries) } return err @@ -415,7 +406,7 @@ func (rw *raftWrapper) snapshotOnShutdown() error { // Shutdown shutdown Raft and closes the BoltDB. func (rw *raftWrapper) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "consensus/raft/Shutdown") + _, span := trace.StartSpan(ctx, "consensus/raft/Shutdown") defer span.End() errMsgs := "" @@ -511,14 +502,14 @@ func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error { // Leader returns Raft's leader. It may be an empty string if // there is no leader or it is unknown. func (rw *raftWrapper) Leader(ctx context.Context) string { - ctx, span := trace.StartSpan(ctx, "consensus/raft/Leader") + _, span := trace.StartSpan(ctx, "consensus/raft/Leader") defer span.End() return string(rw.raft.Leader()) } func (rw *raftWrapper) Peers(ctx context.Context) ([]string, error) { - ctx, span := trace.StartSpan(ctx, "consensus/raft/Peers") + _, span := trace.StartSpan(ctx, "consensus/raft/Peers") defer span.End() ids := make([]string, 0) @@ -594,8 +585,7 @@ func SnapshotSave(cfg *Config, newState state.State, pids []peer.ID) error { } // make a new raft snapshot - var raftSnapVersion hraft.SnapshotVersion - raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1 + var raftSnapVersion hraft.SnapshotVersion = 1 // As of hraft v1.0.0 this is always 1 configIndex := uint64(1) var raftIndex uint64 var raftTerm uint64 @@ -692,7 +682,7 @@ func (rw *raftWrapper) observePeers() { case obs := <-obsCh: pObs := obs.Data.(hraft.PeerObservation) logger.Info("raft peer departed. Removing from peerstore: ", pObs.Peer.ID) - pID, err := peer.IDB58Decode(string(pObs.Peer.ID)) + pID, err := peer.Decode(string(pObs.Peer.ID)) if err != nil { logger.Error(err) continue diff --git a/informer/disk/disk.go b/informer/disk/disk.go index b8a32640..b9b9f31d 100644 --- a/informer/disk/disk.go +++ b/informer/disk/disk.go @@ -59,7 +59,7 @@ func (disk *Informer) SetClient(c *rpc.Client) { // Shutdown is called on cluster shutdown. We just invalidate // any metrics from this point. func (disk *Informer) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "informer/disk/Shutdown") + _, span := trace.StartSpan(ctx, "informer/disk/Shutdown") defer span.End() disk.rpcClient = nil diff --git a/informer/numpin/numpin.go b/informer/numpin/numpin.go index 4a400be2..da4c4aff 100644 --- a/informer/numpin/numpin.go +++ b/informer/numpin/numpin.go @@ -44,7 +44,7 @@ func (npi *Informer) SetClient(c *rpc.Client) { // Shutdown is called on cluster shutdown. We just invalidate // any metrics from this point. func (npi *Informer) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "informer/numpin/Shutdown") + _, span := trace.StartSpan(ctx, "informer/numpin/Shutdown") defer span.End() npi.rpcClient = nil diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 05cbed25..48685fa1 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -132,7 +132,7 @@ func TestMain(m *testing.M) { } func randomBytes() []byte { - bs := make([]byte, 64, 64) + bs := make([]byte, 64) for i := 0; i < len(bs); i++ { b := byte(rand.Int()) bs[i] = b @@ -283,9 +283,9 @@ func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster } func createHosts(t *testing.T, clusterSecret []byte, nClusters int) ([]host.Host, []*pubsub.PubSub, []*dht.IpfsDHT) { - hosts := make([]host.Host, nClusters, nClusters) - pubsubs := make([]*pubsub.PubSub, nClusters, nClusters) - dhts := make([]*dht.IpfsDHT, nClusters, nClusters) + hosts := make([]host.Host, nClusters) + pubsubs := make([]*pubsub.PubSub, nClusters) + dhts := make([]*dht.IpfsDHT, nClusters) tcpaddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") quicAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic") @@ -337,19 +337,19 @@ func newTestDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) { func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) { ctx := context.Background() os.RemoveAll(testsFolder) - cfgs := make([]*Config, nClusters, nClusters) - stores := make([]ds.Datastore, nClusters, nClusters) - cons := make([]Consensus, nClusters, nClusters) - apis := make([][]API, nClusters, nClusters) - ipfss := make([]IPFSConnector, nClusters, nClusters) - trackers := make([]PinTracker, nClusters, nClusters) - mons := make([]PeerMonitor, nClusters, nClusters) - allocs := make([]PinAllocator, nClusters, nClusters) - infs := make([]Informer, nClusters, nClusters) - tracers := make([]Tracer, nClusters, nClusters) - ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters) + cfgs := make([]*Config, nClusters) + stores := make([]ds.Datastore, nClusters) + cons := make([]Consensus, nClusters) + apis := make([][]API, nClusters) + ipfss := make([]IPFSConnector, nClusters) + trackers := make([]PinTracker, nClusters) + mons := make([]PeerMonitor, nClusters) + allocs := make([]PinAllocator, nClusters) + infs := make([]Informer, nClusters) + tracers := make([]Tracer, nClusters) + ipfsMocks := make([]*test.IpfsMock, nClusters) - clusters := make([]*Cluster, nClusters, nClusters) + clusters := make([]*Cluster, nClusters) // Uncomment when testing with fixed ports // clusterPeers := make([]ma.Multiaddr, nClusters, nClusters) @@ -602,7 +602,7 @@ func TestClustersPeersRetainOrder(t *testing.T) { t.Fatal(err) } - if bytes.Compare(peers1, peers2) != 0 { + if !bytes.Equal(peers1, peers2) { t.Error("expected both results to be same") } } @@ -704,10 +704,10 @@ func TestClustersPinUpdate(t *testing.T) { ttlDelay() - h, err := prefix.Sum(randomBytes()) // create random cid - h2, err := prefix.Sum(randomBytes()) // create random cid + h, _ := prefix.Sum(randomBytes()) // create random cid + h2, _ := prefix.Sum(randomBytes()) // create random cid - _, err = clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{}) + _, err := clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{}) if err == nil || err != state.ErrNotFound { t.Fatal("pin update should fail when from is not pinned") } @@ -778,7 +778,7 @@ func TestClustersStatusAll(t *testing.T) { t.Error("bad info in status") } - pid := peer.IDB58Encode(c.host.ID()) + pid := peer.Encode(c.host.ID()) if info[pid].Status != api.TrackerStatusPinned { t.Error("the hash should have been pinned") } @@ -839,7 +839,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) { t.Error("bad number of peers in status") } - pid := peer.IDB58Encode(clusters[1].id) + pid := peer.Encode(clusters[1].id) errst := stts.PeerMap[pid] if !errst.Cid.Equals(h) { @@ -896,20 +896,20 @@ func TestClustersRecoverLocal(t *testing.T) { pinDelay() f := func(t *testing.T, c *Cluster) { - info, err := c.RecoverLocal(ctx, h) + _, err := c.RecoverLocal(ctx, h) if err != nil { t.Fatal(err) } // Wait for queue to be processed delay() - info = c.StatusLocal(ctx, h) + info := c.StatusLocal(ctx, h) if info.Status != api.TrackerStatusPinError { t.Errorf("element is %s and not PinError", info.Status) } // Recover good ID - info, err = c.RecoverLocal(ctx, h2) + info, _ = c.RecoverLocal(ctx, h2) if info.Status != api.TrackerStatusPinned { t.Error("element should be in Pinned state") } @@ -951,7 +951,7 @@ func TestClustersRecover(t *testing.T) { t.Fatal(err) } - pinfo, ok := ginfo.PeerMap[peer.IDB58Encode(clusters[j].host.ID())] + pinfo, ok := ginfo.PeerMap[peer.Encode(clusters[j].host.ID())] if !ok { t.Fatal("should have info for this host") } @@ -960,7 +960,7 @@ func TestClustersRecover(t *testing.T) { } for _, c := range clusters { - inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())] + inf, ok := ginfo.PeerMap[peer.Encode(c.host.ID())] if !ok { t.Fatal("GlobalPinInfo should not be empty for this host") } @@ -985,7 +985,7 @@ func TestClustersRecover(t *testing.T) { } for _, c := range clusters { - inf, ok := ginfo.PeerMap[peer.IDB58Encode(c.host.ID())] + inf, ok := ginfo.PeerMap[peer.Encode(c.host.ID())] if !ok { t.Fatal("GlobalPinInfo should have this cluster") } @@ -1334,7 +1334,7 @@ func TestClustersReplicationFactorMin(t *testing.T) { t.Error("Pin should have failed as rplMin cannot be satisfied") } t.Log(err) - if !strings.Contains(err.Error(), fmt.Sprintf("not enough peers to allocate CID")) { + if !strings.Contains(err.Error(), "not enough peers to allocate CID") { t.Fatal(err) } } @@ -1672,7 +1672,7 @@ func TestClustersRebalanceOnPeerDown(t *testing.T) { // kill the local pinner for _, c := range clusters { - clid := peer.IDB58Encode(c.id) + clid := peer.Encode(c.id) if clid == localPinner { c.Shutdown(ctx) } else if clid == remotePinner { @@ -1709,7 +1709,7 @@ func validateClusterGraph(t *testing.T, graph api.ConnectGraph, clusterIDs map[s // Make lookup index for peers connected to id1 peerIndex := make(map[string]struct{}) for _, p := range peers { - peerIndex[peer.IDB58Encode(p)] = struct{}{} + peerIndex[peer.Encode(p)] = struct{}{} } for id2 := range clusterIDs { if _, ok := peerIndex[id2]; id1 != id2 && !ok { @@ -1736,7 +1736,7 @@ func validateClusterGraph(t *testing.T, graph api.ConnectGraph, clusterIDs map[s if len(graph.IPFSLinks) != 1 { t.Error("Expected exactly one ipfs peer for all cluster nodes, the mocked peer") } - links, ok := graph.IPFSLinks[peer.IDB58Encode(test.PeerID1)] + links, ok := graph.IPFSLinks[peer.Encode(test.PeerID1)] if !ok { t.Error("Expected the mocked ipfs peer to be a node in the graph") } else { @@ -1778,7 +1778,7 @@ func TestClustersGraphConnected(t *testing.T) { clusterIDs := make(map[string]struct{}) for _, c := range clusters { - id := peer.IDB58Encode(c.ID(ctx).ID) + id := peer.Encode(c.ID(ctx).ID) clusterIDs[id] = struct{}{} } validateClusterGraph(t, graph, clusterIDs, nClusters) @@ -1827,7 +1827,7 @@ func TestClustersGraphUnhealthy(t *testing.T) { if i == discon1 || i == discon2 { continue } - id := peer.IDB58Encode(c.ID(ctx).ID) + id := peer.Encode(c.ID(ctx).ID) clusterIDs[id] = struct{}{} } peerNum := nClusters diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 951c3849..f9b4e5a4 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -45,10 +45,6 @@ var logger = logging.Logger("ipfshttp") // only the 10th will trigger a SendInformerMetrics call. var updateMetricMod = 10 -// progressTick sets how often we check progress when doing refs and pins -// requests. -var progressTick = 5 * time.Second - // Connector implements the IPFSConnector interface // and provides a component which is used to perform // on-demand requests against the configured IPFS daemom @@ -99,11 +95,6 @@ type ipfsRepoGCResp struct { Error string } -type ipfsRefsResp struct { - Ref string - Err string -} - type ipfsPinsResp struct { Pins []string Progress int @@ -122,10 +113,6 @@ type ipfsPeer struct { Peer string } -type ipfsStream struct { - Protocol string -} - // NewConnector creates the component and leaves it ready to be started func NewConnector(cfg *Config) (*Connector, error) { err := cfg.Validate() @@ -222,7 +209,7 @@ func (ipfs *Connector) SetClient(c *rpc.Client) { // Shutdown stops any listeners and stops the component from taking // any requests. func (ipfs *Connector) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Shutdown") + _, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Shutdown") defer span.End() ipfs.shutdownLock.Lock() @@ -266,7 +253,7 @@ func (ipfs *Connector) ID(ctx context.Context) (*api.IPFSID, error) { return nil, err } - pID, err := peer.IDB58Decode(res.ID) + pID, err := peer.Decode(res.ID) if err != nil { return nil, err } @@ -275,7 +262,7 @@ func (ipfs *Connector) ID(ctx context.Context) (*api.IPFSID, error) { ID: pID, } - mAddrs := make([]api.Multiaddr, len(res.Addresses), len(res.Addresses)) + mAddrs := make([]api.Multiaddr, len(res.Addresses)) for i, strAddr := range res.Addresses { mAddr, err := api.NewMultiaddr(strAddr) if err != nil { @@ -628,24 +615,6 @@ func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType str return body, nil } -// postDiscardBodyCtx makes a POST requests but discards the body -// of the response directly after reading it. -func (ipfs *Connector) postDiscardBodyCtx(ctx context.Context, path string) error { - res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, "", nil) - if err != nil { - return err - } - defer res.Body.Close() - - _, err = checkResponse(path, res) - if err != nil { - return err - } - - _, err = io.Copy(ioutil.Discard, res.Body) - return err -} - // apiURL is a short-hand for building the url of the IPFS // daemon API. func (ipfs *Connector) apiURL() string { @@ -736,9 +705,8 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err return value, nil } - switch value.(type) { + switch v := value.(type) { case map[string]interface{}: - v := value.(map[string]interface{}) return getConfigValue(path[1:], v) default: return nil, errors.New("invalid path") @@ -865,7 +833,7 @@ func (ipfs *Connector) SwarmPeers(ctx context.Context) ([]peer.ID, error) { swarm := make([]peer.ID, len(peersRaw.Peers)) for i, p := range peersRaw.Peers { - pID, err := peer.IDB58Decode(p.Peer) + pID, err := peer.Decode(p.Peer) if err != nil { logger.Error(err) return swarm, err diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 13d2e9de..979e3407 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -63,7 +63,7 @@ func TestIPFSID(t *testing.T) { t.Error("expected no error") } mock.Close() - id, err = ipfs.ID(ctx) + _, err = ipfs.ID(ctx) if err == nil { t.Error("expected an error") } @@ -392,6 +392,9 @@ func TestConfigKey(t *testing.T) { } v, err = ipfs.ConfigKey("Datastore") + if err != nil { + t.Fatal(err) + } _, ok = v.(map[string]interface{}) if !ok { t.Error("should have returned the whole Datastore config object") diff --git a/monitor/metrics/checker.go b/monitor/metrics/checker.go index 2d0f086c..cee19c51 100644 --- a/monitor/metrics/checker.go +++ b/monitor/metrics/checker.go @@ -38,8 +38,6 @@ type Checker struct { metrics *Store threshold float64 - alertThreshold int - failedPeersMu sync.Mutex failedPeers map[peer.ID]map[string]int } @@ -93,20 +91,6 @@ func (mc *Checker) CheckAll() error { return nil } -func (mc *Checker) alertIfExpired(metric *api.Metric) error { - if !metric.Expired() { - return nil - } - - err := mc.alert(metric.Peer, metric.Name) - if err != nil { - return err - } - metric.Valid = false - mc.metrics.Add(metric) // invalidate so we don't alert again - return nil -} - func (mc *Checker) alert(pid peer.ID, metricName string) error { mc.failedPeersMu.Lock() defer mc.failedPeersMu.Unlock() diff --git a/monitor/metrics/prob_test.go b/monitor/metrics/prob_test.go index 38c45838..ee848bc3 100644 --- a/monitor/metrics/prob_test.go +++ b/monitor/metrics/prob_test.go @@ -308,7 +308,7 @@ func Benchmark_prob_meanStdDev(b *testing.B) { func makeRandSlice(size int) []float64 { r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := make([]float64, size, size) + s := make([]float64, size) for i := 0; i < size-1; i++ { s[i] = float64(r.Int63n(25)) + r.Float64() diff --git a/monitor/metrics/window.go b/monitor/metrics/window.go index 19fb0ac3..f4f65210 100644 --- a/monitor/metrics/window.go +++ b/monitor/metrics/window.go @@ -9,12 +9,9 @@ import ( "sync" "time" - logging "github.com/ipfs/go-log/v2" "github.com/ipfs/ipfs-cluster/api" ) -var logger = logging.Logger("metricwin") - // DefaultWindowCap sets the amount of metrics to store per peer. var DefaultWindowCap = 25 diff --git a/monitor/metrics/window_test.go b/monitor/metrics/window_test.go index 0d9bdaca..088b13e2 100644 --- a/monitor/metrics/window_test.go +++ b/monitor/metrics/window_test.go @@ -47,7 +47,6 @@ func TestWindow_Race(t *testing.T) { i++ case <-done: return - default: } } }() @@ -64,7 +63,6 @@ func TestWindow_Race(t *testing.T) { // log <- fmt.Sprintf("latest: %v", l) case <-done: return - default: } } }() @@ -80,7 +78,6 @@ func TestWindow_Race(t *testing.T) { // log <- fmt.Sprintf("all: %v", w.All()) case <-done: return - default: } } }() @@ -95,23 +92,17 @@ func TestWindow_Race(t *testing.T) { log <- fmt.Sprintf("dist: %v", w.Distribution()) case <-done: return - default: } } }() go func() { <-start - for { - select { - case <-done: - for s := range log { - fmt.Println(s) - } - close(done) - return - } + <-done + for s := range log { + fmt.Println(s) } + close(done) }() close(start) diff --git a/monitor/pubsubmon/pubsubmon.go b/monitor/pubsubmon/pubsubmon.go index 0e5c3ad8..1e59bc2d 100644 --- a/monitor/pubsubmon/pubsubmon.go +++ b/monitor/pubsubmon/pubsubmon.go @@ -36,6 +36,7 @@ type Monitor struct { rpcReady chan struct{} pubsub *pubsub.PubSub + topic *pubsub.Topic subscription *pubsub.Subscription peers PeersFunc @@ -72,7 +73,12 @@ func New( mtrs := metrics.NewStore() checker := metrics.NewChecker(ctx, mtrs, cfg.FailureThreshold) - subscription, err := psub.Subscribe(PubsubTopic) + topic, err := psub.Join(PubsubTopic) + if err != nil { + cancel() + return nil, err + } + subscription, err := topic.Subscribe() if err != nil { cancel() return nil, err @@ -84,6 +90,7 @@ func New( rpcReady: make(chan struct{}, 1), pubsub: psub, + topic: topic, subscription: subscription, peers: peers, @@ -153,7 +160,7 @@ func (mon *Monitor) SetClient(c *rpc.Client) { // Shutdown stops the peer monitor. It particular, it will // not deliver any alerts. func (mon *Monitor) Shutdown(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "monitor/pubsub/Shutdown") + _, span := trace.StartSpan(ctx, "monitor/pubsub/Shutdown") defer span.End() mon.shutdownLock.Lock() @@ -176,7 +183,7 @@ func (mon *Monitor) Shutdown(ctx context.Context) error { // LogMetric stores a metric so it can later be retrieved. func (mon *Monitor) LogMetric(ctx context.Context, m *api.Metric) error { - ctx, span := trace.StartSpan(ctx, "monitor/pubsub/LogMetric") + _, span := trace.StartSpan(ctx, "monitor/pubsub/LogMetric") defer span.End() mon.metrics.Add(m) @@ -209,7 +216,7 @@ func (mon *Monitor) PublishMetric(ctx context.Context, m *api.Metric) error { m.Expire, ) - err = mon.pubsub.Publish(PubsubTopic, b.Bytes()) + err = mon.topic.Publish(ctx, b.Bytes()) if err != nil { logger.Error(err) return err @@ -248,7 +255,7 @@ func (mon *Monitor) Alerts() <-chan *api.Alert { // MetricNames lists all metric names. func (mon *Monitor) MetricNames(ctx context.Context) []string { - ctx, span := trace.StartSpan(ctx, "monitor/pubsub/MetricNames") + _, span := trace.StartSpan(ctx, "monitor/pubsub/MetricNames") defer span.End() return mon.metrics.MetricNames() diff --git a/observations/metrics.go b/observations/metrics.go index e777fa9d..28ef241f 100644 --- a/observations/metrics.go +++ b/observations/metrics.go @@ -12,8 +12,8 @@ var logger = logging.Logger("observations") var ( // taken from ocgrpc (https://github.com/census-instrumentation/opencensus-go/blob/master/plugin/ocgrpc/stats_common.go) - latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) - bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) + // latencyDistribution = view.Distribution(0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) + // bytesDistribution = view.Distribution(0, 24, 32, 64, 128, 256, 512, 1024, 2048, 4096, 16384, 65536, 262144, 1048576) messageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536) ) diff --git a/peer_manager_test.go b/peer_manager_test.go index 5db56102..521e754c 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -19,8 +19,8 @@ import ( ) func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host) { - cls := make([]*Cluster, nClusters, nClusters) - mocks := make([]*test.IpfsMock, nClusters, nClusters) + cls := make([]*Cluster, nClusters) + mocks := make([]*test.IpfsMock, nClusters) var wg sync.WaitGroup for i := 0; i < nClusters; i++ { wg.Add(1) @@ -67,7 +67,7 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host) func clusterAddr(c *Cluster) ma.Multiaddr { for _, a := range c.host.Addrs() { if _, err := a.ValueForProtocol(ma.P_IP4); err == nil { - p := peer.IDB58Encode(c.id) + p := peer.Encode(c.id) cAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", a, p)) return cAddr } @@ -196,7 +196,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { t.Skip("need at least 3 nodes for this test") } - _, err := clusters[0].PeerAdd(ctx, clusters[1].id) + clusters[0].PeerAdd(ctx, clusters[1].id) ttlDelay() ids := clusters[1].Peers(ctx) // raft will have only 2 peers @@ -207,7 +207,7 @@ func TestClustersPeerAddInUnhealthyCluster(t *testing.T) { // Now we shutdown the one member of the running cluster // and try to add someone else. - err = clusters[1].Shutdown(ctx) + err := clusters[1].Shutdown(ctx) if err != nil { t.Error("Shutdown should be clean: ", err) } @@ -398,7 +398,6 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { ctx := context.Background() clusters, mocks := createClusters(t) - defer shutdownClusters(t, clusters, mocks) if len(clusters) < 3 { t.Skip("test needs at least 3 clusters") @@ -428,6 +427,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { } } if chosen == nil { + shutdownClusters(t, clusters, mocks) t.Fatal("did not get to choose a peer?") } @@ -438,6 +438,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { mocks = append(mocks[:chosenIndex], mocks[chosenIndex+1:]...) defer chosen.Shutdown(ctx) defer chosenMock.Close() + defer shutdownClusters(t, clusters, mocks) prefix := test.Cid1.Prefix() diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index 8391da83..c919f907 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -504,7 +504,8 @@ func TestTrackUntrackWithCancel(t *testing.T) { go func() { err = tt.args.tracker.Untrack(context.Background(), tt.args.c) if err != nil { - t.Fatal(err) + t.Error() + return } }() var ctx context.Context diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index fffd4062..4c84d400 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -313,7 +313,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo { // check global state to see if cluster should even be caring about // the provided cid - gpin := &api.Pin{} + var gpin *api.Pin st, err := spt.getState(ctx) if err != nil { logger.Error(err) @@ -440,7 +440,7 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, logger.Error(err) return nil, err } - pins := make(map[string]*api.PinInfo, 0) + pins := make(map[string]*api.PinInfo, len(ipsMap)) for cidstr, ips := range ipsMap { c, err := cid.Decode(cidstr) if err != nil { @@ -468,14 +468,13 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string] defer span.End() // get shared state - statePins := []*api.Pin{} st, err := spt.getState(ctx) if err != nil { logger.Error(err) return nil, err } - statePins, err = st.List(ctx) + statePins, err := st.List(ctx) if err != nil { logger.Error(err) return nil, err @@ -527,9 +526,9 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string] return pininfos, nil } -func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo { - return spt.optracker.Filter(ctx, optracker.PhaseError) -} +// func (spt *Tracker) getErrorsAll(ctx context.Context) []*api.PinInfo { +// return spt.optracker.Filter(ctx, optracker.PhaseError) +// } // OpContext exports the internal optracker's OpContext method. // For testing purposes only. diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index 4972f396..5b2d75c5 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -3,7 +3,6 @@ package stateless import ( "context" "errors" - "sort" "testing" "time" @@ -193,7 +192,8 @@ func TestTrackUntrackWithCancel(t *testing.T) { go func() { err = spt.Untrack(ctx, slowPinCid) if err != nil { - t.Fatal(err) + t.Error(err) + return } }() select { @@ -298,7 +298,8 @@ func TestUntrackTrackWithCancel(t *testing.T) { go func() { err = spt.Track(ctx, slowPin) if err != nil { - t.Fatal(err) + t.Error(err) + return } }() select { @@ -465,12 +466,6 @@ func TestStatus(t *testing.T) { } } -var sortPinInfoByCid = func(p []*api.PinInfo) { - sort.Slice(p, func(i, j int) bool { - return p[i].Cid.String() < p[j].Cid.String() - }) -} - func BenchmarkTracker_localStatus(b *testing.B) { tracker := testStatelessPinTracker(b) ctx := context.Background() diff --git a/pstoremgr/pstoremgr.go b/pstoremgr/pstoremgr.go index 9b54cdc7..39ed3a9c 100644 --- a/pstoremgr/pstoremgr.go +++ b/pstoremgr/pstoremgr.go @@ -66,7 +66,7 @@ func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool, ttl time.Duration } protos := addr.Protocols() - if len(protos) > 0 && protos[0].Code == madns.DnsaddrProtocol.Code { + if len(protos) > 0 && protos[0].Code == ma.P_DNSADDR { // We need to pre-resolve this logger.Debugf("resolving %s", addr) ctx, cancel := context.WithTimeout(pm.ctx, DNSTimeout) diff --git a/pstoremgr/pstoremgr_test.go b/pstoremgr/pstoremgr_test.go index a6517e87..7c8a4bf9 100644 --- a/pstoremgr/pstoremgr_test.go +++ b/pstoremgr/pstoremgr_test.go @@ -28,7 +28,7 @@ func clean(pm *Manager) { } func testAddr(loc string, pid peer.ID) ma.Multiaddr { - m, _ := ma.NewMultiaddr(loc + "/p2p/" + peer.IDB58Encode(pid)) + m, _ := ma.NewMultiaddr(loc + "/p2p/" + peer.Encode(pid)) return m } diff --git a/rpc_api.go b/rpc_api.go index dfaeb5f0..9223f393 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -352,7 +352,7 @@ func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out // Returned metrics are Valid and belong to current // Cluster peers. metrics := rpcapi.c.monitor.LatestMetrics(ctx, pingMetricName) - peers := make([]peer.ID, len(metrics), len(metrics)) + peers := make([]peer.ID, len(metrics)) for i, m := range metrics { peers[i] = m.Peer } diff --git a/rpcutil/rpcutil.go b/rpcutil/rpcutil.go index 361fc575..66d25551 100644 --- a/rpcutil/rpcutil.go +++ b/rpcutil/rpcutil.go @@ -21,8 +21,8 @@ func CtxsWithTimeout( timeout time.Duration, ) ([]context.Context, []context.CancelFunc) { - ctxs := make([]context.Context, n, n) - cancels := make([]context.CancelFunc, n, n) + ctxs := make([]context.Context, n) + cancels := make([]context.CancelFunc, n) for i := 0; i < n; i++ { ctx, cancel := context.WithTimeout(parent, timeout) ctxs[i] = ctx @@ -37,8 +37,8 @@ func CtxsWithCancel( n int, ) ([]context.Context, []context.CancelFunc) { - ctxs := make([]context.Context, n, n) - cancels := make([]context.CancelFunc, n, n) + ctxs := make([]context.Context, n) + cancels := make([]context.CancelFunc, n) for i := 0; i < n; i++ { ctx, cancel := context.WithCancel(parent) ctxs[i] = ctx @@ -61,7 +61,7 @@ func MultiCancel(cancels []context.CancelFunc) { // slice using pointers to each elements of the original slice. // Useful to handle gorpc.MultiCall() replies. func CopyPIDsToIfaces(in []peer.ID) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { ifaces[i] = &in[i] } @@ -72,7 +72,7 @@ func CopyPIDsToIfaces(in []peer.ID) []interface{} { // slice using pointers to each elements of the original slice. // Useful to handle gorpc.MultiCall() replies. func CopyIDsToIfaces(in []*api.ID) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { in[i] = &api.ID{} ifaces[i] = in[i] @@ -84,7 +84,7 @@ func CopyIDsToIfaces(in []*api.ID) []interface{} { // to an empty interface slice using pointers to each elements of the // original slice. Useful to handle gorpc.MultiCall() replies. func CopyIDSliceToIfaces(in [][]*api.ID) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { ifaces[i] = &in[i] } @@ -95,7 +95,7 @@ func CopyIDSliceToIfaces(in [][]*api.ID) []interface{} { // an empty interface slice using pointers to each elements of // the original slice. Useful to handle gorpc.MultiCall() replies. func CopyPinInfoToIfaces(in []*api.PinInfo) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { in[i] = &api.PinInfo{} ifaces[i] = in[i] @@ -107,7 +107,7 @@ func CopyPinInfoToIfaces(in []*api.PinInfo) []interface{} { // to an empty interface slice using pointers to each elements of the original // slice. Useful to handle gorpc.MultiCall() replies. func CopyPinInfoSliceToIfaces(in [][]*api.PinInfo) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { ifaces[i] = &in[i] } @@ -118,7 +118,7 @@ func CopyPinInfoSliceToIfaces(in [][]*api.PinInfo) []interface{} { // an empty interface slice using pointers to each elements of // the original slice. Useful to handle gorpc.MultiCall() replies. func CopyRepoGCSliceToIfaces(in []*api.RepoGC) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { in[i] = &api.RepoGC{} ifaces[i] = in[i] @@ -130,7 +130,7 @@ func CopyRepoGCSliceToIfaces(in []*api.RepoGC) []interface{} { // slice using pointers to each elements of the original slice. // Useful to handle gorpc.MultiCall() replies. func CopyEmptyStructToIfaces(in []struct{}) []interface{} { - ifaces := make([]interface{}, len(in), len(in)) + ifaces := make([]interface{}, len(in)) for i := range in { ifaces[i] = &in[i] } @@ -141,7 +141,7 @@ func CopyEmptyStructToIfaces(in []struct{}) []interface{} { // slice of then given length. Useful for RPC methods which have no response // types (so they use empty structs). func RPCDiscardReplies(n int) []interface{} { - replies := make([]struct{}, n, n) + replies := make([]struct{}, n) return CopyEmptyStructToIfaces(replies) } diff --git a/state/dsstate/datastore.go b/state/dsstate/datastore.go index 88fe6ff1..7c25e2b1 100644 --- a/state/dsstate/datastore.go +++ b/state/dsstate/datastore.go @@ -33,7 +33,7 @@ type State struct { dsWrite ds.Write codecHandle codec.Handle namespace ds.Key - version int + // version int } // DefaultHandle returns the codec handler of choice (Msgpack). @@ -166,8 +166,6 @@ func (st *State) List(ctx context.Context) ([]*api.Pin, error) { // Migrate migrates an older state version to the current one. // This is a no-op for now. func (st *State) Migrate(ctx context.Context, r io.Reader) error { - ctx, span := trace.StartSpan(ctx, "state/map/Migrate") - defer span.End() return nil } @@ -316,7 +314,7 @@ func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*Ba // Commit persists the batched write operations. func (bst *BatchingState) Commit(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, "state/dsstate/Commit") + _, span := trace.StartSpan(ctx, "state/dsstate/Commit") defer span.End() return bst.batch.Commit() } diff --git a/state/dsstate/datastore_test.go b/state/dsstate/datastore_test.go index 9f226c74..3127e115 100644 --- a/state/dsstate/datastore_test.go +++ b/state/dsstate/datastore_test.go @@ -13,7 +13,7 @@ import ( ) var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") -var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") +var testPeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") var c = &api.Pin{ Cid: testCid1, diff --git a/test/cids.go b/test/cids.go index 3c577b85..4d74c2a2 100644 --- a/test/cids.go +++ b/test/cids.go @@ -23,12 +23,12 @@ var ( // NotFoundCid is meant to be used as a CID that doesn't exist in the // pinset. NotFoundCid, _ = cid.Decode("bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4") - PeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") - PeerID2, _ = peer.IDB58Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") - PeerID3, _ = peer.IDB58Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") - PeerID4, _ = peer.IDB58Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc") - PeerID5, _ = peer.IDB58Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg") - PeerID6, _ = peer.IDB58Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx") + PeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") + PeerID2, _ = peer.Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") + PeerID3, _ = peer.Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") + PeerID4, _ = peer.Decode("QmZ8naDy5mEz4GLuQwjWt9MPYqHTBbsm8tQBrNSjiq6zBc") + PeerID5, _ = peer.Decode("QmZVAo3wd8s5eTTy2kPYs34J9PvfxpKPuYsePPYGjgRRjg") + PeerID6, _ = peer.Decode("QmR8Vu6kZk7JvAN2rWVWgiduHatgBq2bb15Yyq8RRhYSbx") PeerName1 = "TestPeer1" PeerName2 = "TestPeer2" diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index e6646a5c..5428b1f6 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -82,12 +82,6 @@ type mockConfigResp struct { } } -type mockAddResp struct { - Name string - Hash string - Bytes uint64 -} - type mockRefsResp struct { Ref string Err string diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index dcbb2dc4..51581e5e 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -201,26 +201,26 @@ func (mock *mockCluster) ConnectGraph(ctx context.Context, in struct{}, out *api *out = api.ConnectGraph{ ClusterID: PeerID1, IPFSLinks: map[string][]peer.ID{ - peer.IDB58Encode(PeerID4): []peer.ID{PeerID5, PeerID6}, - peer.IDB58Encode(PeerID5): []peer.ID{PeerID4, PeerID6}, - peer.IDB58Encode(PeerID6): []peer.ID{PeerID4, PeerID5}, + peer.Encode(PeerID4): []peer.ID{PeerID5, PeerID6}, + peer.Encode(PeerID5): []peer.ID{PeerID4, PeerID6}, + peer.Encode(PeerID6): []peer.ID{PeerID4, PeerID5}, }, ClusterLinks: map[string][]peer.ID{ - peer.IDB58Encode(PeerID1): []peer.ID{PeerID2, PeerID3}, - peer.IDB58Encode(PeerID2): []peer.ID{PeerID1, PeerID3}, - peer.IDB58Encode(PeerID3): []peer.ID{PeerID1, PeerID2}, + peer.Encode(PeerID1): []peer.ID{PeerID2, PeerID3}, + peer.Encode(PeerID2): []peer.ID{PeerID1, PeerID3}, + peer.Encode(PeerID3): []peer.ID{PeerID1, PeerID2}, }, ClustertoIPFS: map[string]peer.ID{ - peer.IDB58Encode(PeerID1): PeerID4, - peer.IDB58Encode(PeerID2): PeerID5, - peer.IDB58Encode(PeerID3): PeerID6, + peer.Encode(PeerID1): PeerID4, + peer.Encode(PeerID2): PeerID5, + peer.Encode(PeerID3): PeerID6, }, } return nil } func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - pid := peer.IDB58Encode(PeerID1) + pid := peer.Encode(PeerID1) *out = []*api.GlobalPinInfo{ { Cid: Cid1, @@ -270,7 +270,7 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global *out = api.GlobalPinInfo{ Cid: in, PeerMap: map[string]*api.PinInfo{ - peer.IDB58Encode(PeerID1): { + peer.Encode(PeerID1): { Cid: in, Peer: PeerID1, Status: api.TrackerStatusPinned, @@ -314,7 +314,7 @@ func (mock *mockCluster) RepoGC(ctx context.Context, in struct{}, out *api.Globa _ = mock.RepoGCLocal(ctx, struct{}{}, localrepoGC) *out = api.GlobalRepoGC{ PeerMap: map[string]*api.RepoGC{ - peer.IDB58Encode(PeerID1): localrepoGC, + peer.Encode(PeerID1): localrepoGC, }, } return nil @@ -392,7 +392,7 @@ func (mock *mockPinTracker) Status(ctx context.Context, in cid.Cid, out *api.Pin } func (mock *mockPinTracker) RecoverAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - *out = make([]*api.PinInfo, 0, 0) + *out = make([]*api.PinInfo, 0) return nil } diff --git a/util.go b/util.go index 92d3c384..5b23779e 100644 --- a/util.go +++ b/util.go @@ -67,15 +67,6 @@ func containsPeer(list []peer.ID, peer peer.ID) bool { return false } -func containsCid(list []cid.Cid, ci cid.Cid) bool { - for _, c := range list { - if c.String() == ci.String() { - return true - } - } - return false -} - func minInt(x, y int) int { if x < y { return x diff --git a/version/version.go b/version/version.go index a0242305..f1cd9d93 100644 --- a/version/version.go +++ b/version/version.go @@ -4,7 +4,7 @@ import ( "fmt" semver "github.com/blang/semver" - protocol "github.com/libp2p/go-libp2p-protocol" + protocol "github.com/libp2p/go-libp2p-core/protocol" ) // Version is the current cluster version. Version alignment between