diff --git a/adder/adder.go b/adder/adder.go index 873d5917..b1949d4e 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -94,7 +94,7 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid // FromFiles adds content from a files.Directory. The adder will no longer // be usable after calling this method. func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) { - logger.Error("adding from files") + logger.Debug("adding from files") a.setContext(ctx) if a.ctx.Err() != nil { // don't allow running twice diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index 46aa3047..14e34a0b 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -300,15 +300,11 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ return } - var rpcArg interface{} = api.PinCid(c) - if op == "Unpin" { - rpcArg = c - } err = proxy.rpcClient.Call( "", "Cluster", op, - rpcArg, + api.PinCid(c), &struct{}{}, ) if err != nil { @@ -485,6 +481,7 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) { repoStats := make([]*api.IPFSRepoStat, len(peers), len(peers)) repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats)) for i := range repoStats { + repoStats[i] = &api.IPFSRepoStat{} repoStatsIfaces[i] = repoStats[i] } diff --git a/api/rest/restapi.go b/api/rest/restapi.go index eac3964e..882eabf2 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -697,7 +697,7 @@ func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) { "", "Cluster", "Unpin", - pin.Cid, + pin, &struct{}{}, ) api.sendResponse(w, http.StatusAccepted, err, nil) diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index dd576af9..772ad2d1 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -781,12 +781,6 @@ func TestAPIStatusAllEndpoint(t *testing.T) { resp[0].Cid.String() != test.TestCid1 || resp[1].PeerMap[peer.IDB58Encode(test.TestPeerID1)].Status.String() != "pinning" { t.Errorf("unexpected statusAll resp") - for _, gpi := range resp { - t.Errorf("%s:\n", gpi.Cid) - for k, v := range gpi.PeerMap { - t.Errorf("%s: %+v\n", k, v) - } - } } // Test local=true diff --git a/api/types.go b/api/types.go index 43333441..97c96510 100644 --- a/api/types.go +++ b/api/types.go @@ -241,7 +241,7 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{ // GlobalPinInfo contains cluster-wide status information about a tracked Cid, // indexed by cluster peer. type GlobalPinInfo struct { - Cid cid.Cid `json:"cid" codec:"c,omitempty"` + Cid cid.Cid `json:"cid" codec:"c"` // https://github.com/golang/go/issues/28827 // Peer IDs are of string Kind(). We can't use peer IDs here // as Go ignores TextMarshaler. @@ -250,7 +250,7 @@ type GlobalPinInfo struct { // PinInfo holds information about local pins. type PinInfo struct { - Cid cid.Cid `json:"cid" codec:"c,omitempty"` + Cid cid.Cid `json:"cid" codec:"c"` Peer peer.ID `json:"peer" codec:"p,omitempty"` PeerName string `json:"peername" codec:"pn,omitempty"` Status TrackerStatus `json:"status" codec:"st,omitempty"` @@ -260,21 +260,21 @@ type PinInfo struct { // Version holds version information type Version struct { - Version string `json:"Version" codec:"v,omitempty"` + Version string `json:"Version" codec:"v"` } -// ConnectGraph holds information about the connectivity of the cluster -// To read, traverse the keys of ClusterLinks. Each such id is one of -// the peers of the "ClusterID" peer running the query. ClusterLinks[id] -// in turn lists the ids that peer "id" sees itself connected to. It is -// possible that id is a peer of ClusterID, but ClusterID can not reach id -// over rpc, in which case ClusterLinks[id] == [], as id's view of its -// connectivity can not be retrieved. +// ConnectGraph holds information about the connectivity of the cluster To +// read, traverse the keys of ClusterLinks. Each such id is one of the peers +// of the "ClusterID" peer running the query. ClusterLinks[id] in turn lists +// the ids that peer "id" sees itself connected to. It is possible that id is +// a peer of ClusterID, but ClusterID can not reach id over rpc, in which case +// ClusterLinks[id] == [], as id's view of its connectivity can not be +// retrieved. // -// Iff there was an error reading the IPFSID of the peer then id will not be a -// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS -// then id will be a key of IPFSLinks. In the event of a SwarmPeers error -// IPFSLinks[id] == []. +// Iff there was an error reading the IPFSID of the peer then id will not be a +// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS +// then id will be a key of IPFSLinks. In the event of a SwarmPeers error +// IPFSLinks[id] == []. type ConnectGraph struct { ClusterID peer.ID // ipfs to ipfs links @@ -285,32 +285,43 @@ type ConnectGraph struct { ClustertoIPFS map[string]peer.ID `json:"cluster_to_ipfs" codec:"ci,omitempty"` } -// Multiaddr is a utility type wrapping a Multiaddress +// Multiaddr is a concrete type to wrap a Multiaddress so that it knows how to +// serialize and deserialize itself. type Multiaddr struct { multiaddr.Multiaddr } +// NewMultiaddr returns a cluster Multiaddr wrapper creating the +// multiaddr.Multiaddr with the given string. func NewMultiaddr(mstr string) (Multiaddr, error) { m, err := multiaddr.NewMultiaddr(mstr) return Multiaddr{Multiaddr: m}, err } +// NewMultiaddrWithValue returns a new cluster Multiaddr wrapper using the +// given multiaddr.Multiaddr. func NewMultiaddrWithValue(ma multiaddr.Multiaddr) Multiaddr { return Multiaddr{Multiaddr: ma} } +// MarshalJSON returns a JSON-formatted multiaddress. func (maddr Multiaddr) MarshalJSON() ([]byte, error) { return maddr.Multiaddr.MarshalJSON() } +// UnmarshalJSON parses a cluster Multiaddr from the JSON representation. func (maddr *Multiaddr) UnmarshalJSON(data []byte) error { maddr.Multiaddr, _ = multiaddr.NewMultiaddr("") return maddr.Multiaddr.UnmarshalJSON(data) } +// MarshalBinary returs the bytes of the wrapped multiaddress. func (maddr Multiaddr) MarshalBinary() ([]byte, error) { return maddr.Multiaddr.MarshalBinary() } + +// UnmarshalBinary casts some bytes as a multiaddress wraps it with +// the given cluster Multiaddr. func (maddr *Multiaddr) UnmarshalBinary(data []byte) error { datacopy := make([]byte, len(data)) // This is super important copy(datacopy, data) @@ -318,6 +329,7 @@ func (maddr *Multiaddr) UnmarshalBinary(data []byte) error { return maddr.Multiaddr.UnmarshalBinary(datacopy) } +// Value returns the wrapped multiaddr.Multiaddr. func (maddr Multiaddr) Value() multiaddr.Multiaddr { return maddr.Multiaddr } @@ -701,7 +713,13 @@ func (pin *Pin) Equals(pin2 *Pin) bool { return false } - if pin.Reference != pin2.Reference { + if pin.Reference != nil && pin2.Reference == nil || + pin.Reference == nil && pin2.Reference != nil { + return false + } + + if pin.Reference != nil && pin2.Reference != nil && + !pin.Reference.Equals(*pin2.Reference) { return false } diff --git a/cluster.go b/cluster.go index 4c31ba86..50dced01 100644 --- a/cluster.go +++ b/cluster.go @@ -1378,7 +1378,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api members, err := c.consensus.Peers(ctx) if err != nil { logger.Error(err) - return []*api.GlobalPinInfo{}, err + return nil, err } lenMembers := len(members) @@ -1398,6 +1398,9 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api mergePins := func(pins []*api.PinInfo) { for _, p := range pins { + if p == nil { + continue + } item, ok := fullMap[p.Cid] if !ok { fullMap[p.Cid] = &api.GlobalPinInfo{ diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index ec6c770e..4b7c2464 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -95,7 +95,7 @@ func TestConsensusPin(t *testing.T) { t.Error("the operation did not make it to the log:", err) } - time.Sleep(5000 * time.Millisecond) + time.Sleep(250 * time.Millisecond) st, err := cc.State(ctx) if err != nil { t.Fatal("error getting state:", err) diff --git a/consensus/raft/log_op.go b/consensus/raft/log_op.go index 4129237f..bc31edf2 100644 --- a/consensus/raft/log_op.go +++ b/consensus/raft/log_op.go @@ -26,10 +26,10 @@ type LogOpType int // It implements the consensus.Op interface and it is used by the // Consensus component. type LogOp struct { - SpanCtx trace.SpanContext `codec:"sctx,omitempty"` - TagCtx []byte `codec:"tctx,omitempty"` - Cid *api.Pin `codec:"p,omitempty"` - Type LogOpType `codec:"t,omitempty"` + SpanCtx trace.SpanContext `codec:"s,omitempty"` + TagCtx []byte `codec:"t,omitempty"` + Cid *api.Pin `codec:"c,omitempty"` + Type LogOpType `codec:"p,omitempty"` consensus *Consensus `codec:-` tracing bool `codec:-` } @@ -91,7 +91,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { "", "Cluster", "Untrack", - pin.Cid, + pin, &struct{}{}, nil, ) diff --git a/rpc_api.go b/rpc_api.go index 605d7099..20f596d4 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -9,8 +9,6 @@ import ( cid "github.com/ipfs/go-cid" - multiaddr "github.com/multiformats/go-multiaddr" - "github.com/ipfs/ipfs-cluster/api" ) @@ -42,22 +40,28 @@ func (rpcapi *RPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error } // Unpin runs Cluster.Unpin(). -func (rpcapi *RPCAPI) Unpin(ctx context.Context, in cid.Cid, out *struct{}) error { - return rpcapi.c.Unpin(ctx, in) +func (rpcapi *RPCAPI) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error { + return rpcapi.c.Unpin(ctx, in.Cid) } // PinPath resolves path into a cid and runs Cluster.Pin(). func (rpcapi *RPCAPI) PinPath(ctx context.Context, in *api.PinPath, out *api.Pin) error { pin, err := rpcapi.c.PinPath(ctx, in) + if err != nil { + return err + } *out = *pin - return err + return nil } // UnpinPath resolves path into a cid and runs Cluster.Unpin(). func (rpcapi *RPCAPI) UnpinPath(ctx context.Context, in string, out *api.Pin) error { pin, err := rpcapi.c.UnpinPath(ctx, in) + if err != nil { + return err + } *out = *pin - return err + return nil } // Pins runs Cluster.Pins(). @@ -70,8 +74,11 @@ func (rpcapi *RPCAPI) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) er // PinGet runs Cluster.PinGet(). func (rpcapi *RPCAPI) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { pin, err := rpcapi.c.PinGet(ctx, in) + if err != nil { + return err + } *out = *pin - return err + return nil } // Version runs Cluster.Version(). @@ -91,15 +98,21 @@ func (rpcapi *RPCAPI) Peers(ctx context.Context, in struct{}, out *[]*api.ID) er // PeerAdd runs Cluster.PeerAdd(). func (rpcapi *RPCAPI) PeerAdd(ctx context.Context, in peer.ID, out *api.ID) error { id, err := rpcapi.c.PeerAdd(ctx, in) + if err != nil { + return err + } *out = *id - return err + return nil } // ConnectGraph runs Cluster.GetConnectGraph(). func (rpcapi *RPCAPI) ConnectGraph(ctx context.Context, in struct{}, out *api.ConnectGraph) error { graph, err := rpcapi.c.ConnectGraph() + if err != nil { + return err + } *out = graph - return err + return nil } // PeerRemove runs Cluster.PeerRm(). @@ -108,16 +121,18 @@ func (rpcapi *RPCAPI) PeerRemove(ctx context.Context, in peer.ID, out *struct{}) } // Join runs Cluster.Join(). -func (rpcapi *RPCAPI) Join(ctx context.Context, in multiaddr.Multiaddr, out *struct{}) error { - err := rpcapi.c.Join(ctx, in) - return err +func (rpcapi *RPCAPI) Join(ctx context.Context, in api.Multiaddr, out *struct{}) error { + return rpcapi.c.Join(ctx, in.Value()) } // StatusAll runs Cluster.StatusAll(). func (rpcapi *RPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { pinfos, err := rpcapi.c.StatusAll(ctx) + if err != nil { + return err + } *out = pinfos - return err + return nil } // StatusAllLocal runs Cluster.StatusAllLocal(). @@ -130,8 +145,11 @@ func (rpcapi *RPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]*a // Status runs Cluster.Status(). func (rpcapi *RPCAPI) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Status(ctx, in) + if err != nil { + return err + } *out = *pinfo - return err + return nil } // StatusLocal runs Cluster.StatusLocal(). @@ -144,50 +162,71 @@ func (rpcapi *RPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinI // SyncAll runs Cluster.SyncAll(). func (rpcapi *RPCAPI) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { pinfos, err := rpcapi.c.SyncAll(ctx) + if err != nil { + return err + } *out = pinfos - return err + return nil } // SyncAllLocal runs Cluster.SyncAllLocal(). func (rpcapi *RPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { pinfos, err := rpcapi.c.SyncAllLocal(ctx) + if err != nil { + return err + } *out = pinfos - return err + return nil } // Sync runs Cluster.Sync(). func (rpcapi *RPCAPI) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Sync(ctx, in) + if err != nil { + return err + } *out = *pinfo - return err + return nil } // SyncLocal runs Cluster.SyncLocal(). func (rpcapi *RPCAPI) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { pinfo, err := rpcapi.c.SyncLocal(ctx, in) + if err != nil { + return err + } *out = *pinfo - return err + return nil } // RecoverAllLocal runs Cluster.RecoverAllLocal(). func (rpcapi *RPCAPI) RecoverAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { pinfos, err := rpcapi.c.RecoverAllLocal(ctx) + if err != nil { + return err + } *out = pinfos - return err + return nil } // Recover runs Cluster.Recover(). func (rpcapi *RPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Recover(ctx, in) + if err != nil { + return err + } *out = *pinfo - return err + return nil } // RecoverLocal runs Cluster.RecoverLocal(). func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { pinfo, err := rpcapi.c.RecoverLocal(ctx, in) + if err != nil { + return err + } *out = *pinfo - return err + return nil } // BlockAllocate returns allocations for blocks. This is used in the adders. @@ -232,8 +271,11 @@ func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out *[]pee // SendInformerMetric runs Cluster.sendInformerMetric(). func (rpcapi *RPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error { m, err := rpcapi.c.sendInformerMetric(ctx) + if err != nil { + return err + } *out = *m - return err + return nil } /* @@ -248,10 +290,10 @@ func (rpcapi *RPCAPI) Track(ctx context.Context, in *api.Pin, out *struct{}) err } // Untrack runs PinTracker.Untrack(). -func (rpcapi *RPCAPI) Untrack(ctx context.Context, in cid.Cid, out *struct{}) error { +func (rpcapi *RPCAPI) Untrack(ctx context.Context, in *api.Pin, out *struct{}) error { ctx, span := trace.StartSpan(ctx, "rpc/tracker/Untrack") defer span.End() - return rpcapi.c.tracker.Untrack(ctx, in) + return rpcapi.c.tracker.Untrack(ctx, in.Cid) } // TrackerStatusAll runs PinTracker.StatusAll(). @@ -276,8 +318,11 @@ func (rpcapi *RPCAPI) TrackerRecoverAll(ctx context.Context, in struct{}, out *[ ctx, span := trace.StartSpan(ctx, "rpc/tracker/RecoverAll") defer span.End() pinfos, err := rpcapi.c.tracker.RecoverAll(ctx) + if err != nil { + return err + } *out = pinfos - return err + return nil } // TrackerRecover runs PinTracker.Recover(). @@ -308,15 +353,21 @@ func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in cid.Cid, out *struct{}) // IPFSPinLsCid runs IPFSConnector.PinLsCid(). func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error { b, err := rpcapi.c.ipfs.PinLsCid(ctx, in) + if err != nil { + return err + } *out = b - return err + return nil } // IPFSPinLs runs IPFSConnector.PinLs(). func (rpcapi *RPCAPI) IPFSPinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error { m, err := rpcapi.c.ipfs.PinLs(ctx, in) + if err != nil { + return err + } *out = m - return err + return nil } // IPFSConnectSwarms runs IPFSConnector.ConnectSwarms(). @@ -328,8 +379,11 @@ func (rpcapi *RPCAPI) IPFSConnectSwarms(ctx context.Context, in struct{}, out *s // IPFSConfigKey runs IPFSConnector.ConfigKey(). func (rpcapi *RPCAPI) IPFSConfigKey(ctx context.Context, in string, out *interface{}) error { res, err := rpcapi.c.ipfs.ConfigKey(in) + if err != nil { + return err + } *out = res - return err + return nil } // IPFSRepoStat runs IPFSConnector.RepoStat(). @@ -415,8 +469,7 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe // PeerMonitorLogMetric runs PeerMonitor.LogMetric(). func (rpcapi *RPCAPI) PeerMonitorLogMetric(ctx context.Context, in *api.Metric, out *struct{}) error { - rpcapi.c.monitor.LogMetric(ctx, in) - return nil + return rpcapi.c.monitor.LogMetric(ctx, in) } // PeerMonitorLatestMetrics runs PeerMonitor.LatestMetrics(). diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index c02db207..8366cf4a 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -287,7 +287,7 @@ func (mock *mockService) Track(ctx context.Context, in *api.Pin, out *struct{}) return nil } -func (mock *mockService) Untrack(ctx context.Context, in cid.Cid, out *struct{}) error { +func (mock *mockService) Untrack(ctx context.Context, in *api.Pin, out *struct{}) error { return nil } @@ -368,7 +368,7 @@ func (mock *mockService) IPFSPin(ctx context.Context, in *api.Pin, out *struct{} return nil } -func (mock *mockService) IPFSUnpin(ctx context.Context, in cid.Cid, out *struct{}) error { +func (mock *mockService) IPFSUnpin(ctx context.Context, in *api.Pin, out *struct{}) error { return nil }