Address issues from self-review

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
This commit is contained in:
Hector Sanjuan 2019-02-27 18:43:29 +00:00
parent 6447ea51d2
commit c4b18cd5f6
10 changed files with 133 additions and 68 deletions

View File

@ -94,7 +94,7 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid
// FromFiles adds content from a files.Directory. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) {
logger.Error("adding from files")
logger.Debug("adding from files")
a.setContext(ctx)
if a.ctx.Err() != nil { // don't allow running twice

View File

@ -300,15 +300,11 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
return
}
var rpcArg interface{} = api.PinCid(c)
if op == "Unpin" {
rpcArg = c
}
err = proxy.rpcClient.Call(
"",
"Cluster",
op,
rpcArg,
api.PinCid(c),
&struct{}{},
)
if err != nil {
@ -485,6 +481,7 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
repoStats := make([]*api.IPFSRepoStat, len(peers), len(peers))
repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats))
for i := range repoStats {
repoStats[i] = &api.IPFSRepoStat{}
repoStatsIfaces[i] = repoStats[i]
}

View File

@ -697,7 +697,7 @@ func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"Unpin",
pin.Cid,
pin,
&struct{}{},
)
api.sendResponse(w, http.StatusAccepted, err, nil)

View File

@ -781,12 +781,6 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
resp[0].Cid.String() != test.TestCid1 ||
resp[1].PeerMap[peer.IDB58Encode(test.TestPeerID1)].Status.String() != "pinning" {
t.Errorf("unexpected statusAll resp")
for _, gpi := range resp {
t.Errorf("%s:\n", gpi.Cid)
for k, v := range gpi.PeerMap {
t.Errorf("%s: %+v\n", k, v)
}
}
}
// Test local=true

View File

@ -241,7 +241,7 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
// indexed by cluster peer.
type GlobalPinInfo struct {
Cid cid.Cid `json:"cid" codec:"c,omitempty"`
Cid cid.Cid `json:"cid" codec:"c"`
// https://github.com/golang/go/issues/28827
// Peer IDs are of string Kind(). We can't use peer IDs here
// as Go ignores TextMarshaler.
@ -250,7 +250,7 @@ type GlobalPinInfo struct {
// PinInfo holds information about local pins.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c,omitempty"`
Cid cid.Cid `json:"cid" codec:"c"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
PeerName string `json:"peername" codec:"pn,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
@ -260,21 +260,21 @@ type PinInfo struct {
// Version holds version information
type Version struct {
Version string `json:"Version" codec:"v,omitempty"`
Version string `json:"Version" codec:"v"`
}
// ConnectGraph holds information about the connectivity of the cluster
// To read, traverse the keys of ClusterLinks. Each such id is one of
// the peers of the "ClusterID" peer running the query. ClusterLinks[id]
// in turn lists the ids that peer "id" sees itself connected to. It is
// possible that id is a peer of ClusterID, but ClusterID can not reach id
// over rpc, in which case ClusterLinks[id] == [], as id's view of its
// connectivity can not be retrieved.
// ConnectGraph holds information about the connectivity of the cluster To
// read, traverse the keys of ClusterLinks. Each such id is one of the peers
// of the "ClusterID" peer running the query. ClusterLinks[id] in turn lists
// the ids that peer "id" sees itself connected to. It is possible that id is
// a peer of ClusterID, but ClusterID can not reach id over rpc, in which case
// ClusterLinks[id] == [], as id's view of its connectivity can not be
// retrieved.
//
// Iff there was an error reading the IPFSID of the peer then id will not be a
// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS
// then id will be a key of IPFSLinks. In the event of a SwarmPeers error
// IPFSLinks[id] == [].
// Iff there was an error reading the IPFSID of the peer then id will not be a
// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS
// then id will be a key of IPFSLinks. In the event of a SwarmPeers error
// IPFSLinks[id] == [].
type ConnectGraph struct {
ClusterID peer.ID
// ipfs to ipfs links
@ -285,32 +285,43 @@ type ConnectGraph struct {
ClustertoIPFS map[string]peer.ID `json:"cluster_to_ipfs" codec:"ci,omitempty"`
}
// Multiaddr is a utility type wrapping a Multiaddress
// Multiaddr is a concrete type to wrap a Multiaddress so that it knows how to
// serialize and deserialize itself.
type Multiaddr struct {
multiaddr.Multiaddr
}
// NewMultiaddr returns a cluster Multiaddr wrapper creating the
// multiaddr.Multiaddr with the given string.
func NewMultiaddr(mstr string) (Multiaddr, error) {
m, err := multiaddr.NewMultiaddr(mstr)
return Multiaddr{Multiaddr: m}, err
}
// NewMultiaddrWithValue returns a new cluster Multiaddr wrapper using the
// given multiaddr.Multiaddr.
func NewMultiaddrWithValue(ma multiaddr.Multiaddr) Multiaddr {
return Multiaddr{Multiaddr: ma}
}
// MarshalJSON returns a JSON-formatted multiaddress.
func (maddr Multiaddr) MarshalJSON() ([]byte, error) {
return maddr.Multiaddr.MarshalJSON()
}
// UnmarshalJSON parses a cluster Multiaddr from the JSON representation.
func (maddr *Multiaddr) UnmarshalJSON(data []byte) error {
maddr.Multiaddr, _ = multiaddr.NewMultiaddr("")
return maddr.Multiaddr.UnmarshalJSON(data)
}
// MarshalBinary returs the bytes of the wrapped multiaddress.
func (maddr Multiaddr) MarshalBinary() ([]byte, error) {
return maddr.Multiaddr.MarshalBinary()
}
// UnmarshalBinary casts some bytes as a multiaddress wraps it with
// the given cluster Multiaddr.
func (maddr *Multiaddr) UnmarshalBinary(data []byte) error {
datacopy := make([]byte, len(data)) // This is super important
copy(datacopy, data)
@ -318,6 +329,7 @@ func (maddr *Multiaddr) UnmarshalBinary(data []byte) error {
return maddr.Multiaddr.UnmarshalBinary(datacopy)
}
// Value returns the wrapped multiaddr.Multiaddr.
func (maddr Multiaddr) Value() multiaddr.Multiaddr {
return maddr.Multiaddr
}
@ -701,7 +713,13 @@ func (pin *Pin) Equals(pin2 *Pin) bool {
return false
}
if pin.Reference != pin2.Reference {
if pin.Reference != nil && pin2.Reference == nil ||
pin.Reference == nil && pin2.Reference != nil {
return false
}
if pin.Reference != nil && pin2.Reference != nil &&
!pin.Reference.Equals(*pin2.Reference) {
return false
}

View File

@ -1378,7 +1378,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api
members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return []*api.GlobalPinInfo{}, err
return nil, err
}
lenMembers := len(members)
@ -1398,6 +1398,9 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api
mergePins := func(pins []*api.PinInfo) {
for _, p := range pins {
if p == nil {
continue
}
item, ok := fullMap[p.Cid]
if !ok {
fullMap[p.Cid] = &api.GlobalPinInfo{

View File

@ -95,7 +95,7 @@ func TestConsensusPin(t *testing.T) {
t.Error("the operation did not make it to the log:", err)
}
time.Sleep(5000 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
st, err := cc.State(ctx)
if err != nil {
t.Fatal("error getting state:", err)

View File

@ -26,10 +26,10 @@ type LogOpType int
// It implements the consensus.Op interface and it is used by the
// Consensus component.
type LogOp struct {
SpanCtx trace.SpanContext `codec:"sctx,omitempty"`
TagCtx []byte `codec:"tctx,omitempty"`
Cid *api.Pin `codec:"p,omitempty"`
Type LogOpType `codec:"t,omitempty"`
SpanCtx trace.SpanContext `codec:"s,omitempty"`
TagCtx []byte `codec:"t,omitempty"`
Cid *api.Pin `codec:"c,omitempty"`
Type LogOpType `codec:"p,omitempty"`
consensus *Consensus `codec:-`
tracing bool `codec:-`
}
@ -91,7 +91,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
"",
"Cluster",
"Untrack",
pin.Cid,
pin,
&struct{}{},
nil,
)

View File

@ -9,8 +9,6 @@ import (
cid "github.com/ipfs/go-cid"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/ipfs/ipfs-cluster/api"
)
@ -42,22 +40,28 @@ func (rpcapi *RPCAPI) Pin(ctx context.Context, in *api.Pin, out *struct{}) error
}
// Unpin runs Cluster.Unpin().
func (rpcapi *RPCAPI) Unpin(ctx context.Context, in cid.Cid, out *struct{}) error {
return rpcapi.c.Unpin(ctx, in)
func (rpcapi *RPCAPI) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
return rpcapi.c.Unpin(ctx, in.Cid)
}
// PinPath resolves path into a cid and runs Cluster.Pin().
func (rpcapi *RPCAPI) PinPath(ctx context.Context, in *api.PinPath, out *api.Pin) error {
pin, err := rpcapi.c.PinPath(ctx, in)
if err != nil {
return err
}
*out = *pin
return err
return nil
}
// UnpinPath resolves path into a cid and runs Cluster.Unpin().
func (rpcapi *RPCAPI) UnpinPath(ctx context.Context, in string, out *api.Pin) error {
pin, err := rpcapi.c.UnpinPath(ctx, in)
if err != nil {
return err
}
*out = *pin
return err
return nil
}
// Pins runs Cluster.Pins().
@ -70,8 +74,11 @@ func (rpcapi *RPCAPI) Pins(ctx context.Context, in struct{}, out *[]*api.Pin) er
// PinGet runs Cluster.PinGet().
func (rpcapi *RPCAPI) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error {
pin, err := rpcapi.c.PinGet(ctx, in)
if err != nil {
return err
}
*out = *pin
return err
return nil
}
// Version runs Cluster.Version().
@ -91,15 +98,21 @@ func (rpcapi *RPCAPI) Peers(ctx context.Context, in struct{}, out *[]*api.ID) er
// PeerAdd runs Cluster.PeerAdd().
func (rpcapi *RPCAPI) PeerAdd(ctx context.Context, in peer.ID, out *api.ID) error {
id, err := rpcapi.c.PeerAdd(ctx, in)
if err != nil {
return err
}
*out = *id
return err
return nil
}
// ConnectGraph runs Cluster.GetConnectGraph().
func (rpcapi *RPCAPI) ConnectGraph(ctx context.Context, in struct{}, out *api.ConnectGraph) error {
graph, err := rpcapi.c.ConnectGraph()
if err != nil {
return err
}
*out = graph
return err
return nil
}
// PeerRemove runs Cluster.PeerRm().
@ -108,16 +121,18 @@ func (rpcapi *RPCAPI) PeerRemove(ctx context.Context, in peer.ID, out *struct{})
}
// Join runs Cluster.Join().
func (rpcapi *RPCAPI) Join(ctx context.Context, in multiaddr.Multiaddr, out *struct{}) error {
err := rpcapi.c.Join(ctx, in)
return err
func (rpcapi *RPCAPI) Join(ctx context.Context, in api.Multiaddr, out *struct{}) error {
return rpcapi.c.Join(ctx, in.Value())
}
// StatusAll runs Cluster.StatusAll().
func (rpcapi *RPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
pinfos, err := rpcapi.c.StatusAll(ctx)
if err != nil {
return err
}
*out = pinfos
return err
return nil
}
// StatusAllLocal runs Cluster.StatusAllLocal().
@ -130,8 +145,11 @@ func (rpcapi *RPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]*a
// Status runs Cluster.Status().
func (rpcapi *RPCAPI) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
pinfo, err := rpcapi.c.Status(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return err
return nil
}
// StatusLocal runs Cluster.StatusLocal().
@ -144,50 +162,71 @@ func (rpcapi *RPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinI
// SyncAll runs Cluster.SyncAll().
func (rpcapi *RPCAPI) SyncAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error {
pinfos, err := rpcapi.c.SyncAll(ctx)
if err != nil {
return err
}
*out = pinfos
return err
return nil
}
// SyncAllLocal runs Cluster.SyncAllLocal().
func (rpcapi *RPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
pinfos, err := rpcapi.c.SyncAllLocal(ctx)
if err != nil {
return err
}
*out = pinfos
return err
return nil
}
// Sync runs Cluster.Sync().
func (rpcapi *RPCAPI) Sync(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
pinfo, err := rpcapi.c.Sync(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return err
return nil
}
// SyncLocal runs Cluster.SyncLocal().
func (rpcapi *RPCAPI) SyncLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
pinfo, err := rpcapi.c.SyncLocal(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return err
return nil
}
// RecoverAllLocal runs Cluster.RecoverAllLocal().
func (rpcapi *RPCAPI) RecoverAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error {
pinfos, err := rpcapi.c.RecoverAllLocal(ctx)
if err != nil {
return err
}
*out = pinfos
return err
return nil
}
// Recover runs Cluster.Recover().
func (rpcapi *RPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error {
pinfo, err := rpcapi.c.Recover(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return err
return nil
}
// RecoverLocal runs Cluster.RecoverLocal().
func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error {
pinfo, err := rpcapi.c.RecoverLocal(ctx, in)
if err != nil {
return err
}
*out = *pinfo
return err
return nil
}
// BlockAllocate returns allocations for blocks. This is used in the adders.
@ -232,8 +271,11 @@ func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out *[]pee
// SendInformerMetric runs Cluster.sendInformerMetric().
func (rpcapi *RPCAPI) SendInformerMetric(ctx context.Context, in struct{}, out *api.Metric) error {
m, err := rpcapi.c.sendInformerMetric(ctx)
if err != nil {
return err
}
*out = *m
return err
return nil
}
/*
@ -248,10 +290,10 @@ func (rpcapi *RPCAPI) Track(ctx context.Context, in *api.Pin, out *struct{}) err
}
// Untrack runs PinTracker.Untrack().
func (rpcapi *RPCAPI) Untrack(ctx context.Context, in cid.Cid, out *struct{}) error {
func (rpcapi *RPCAPI) Untrack(ctx context.Context, in *api.Pin, out *struct{}) error {
ctx, span := trace.StartSpan(ctx, "rpc/tracker/Untrack")
defer span.End()
return rpcapi.c.tracker.Untrack(ctx, in)
return rpcapi.c.tracker.Untrack(ctx, in.Cid)
}
// TrackerStatusAll runs PinTracker.StatusAll().
@ -276,8 +318,11 @@ func (rpcapi *RPCAPI) TrackerRecoverAll(ctx context.Context, in struct{}, out *[
ctx, span := trace.StartSpan(ctx, "rpc/tracker/RecoverAll")
defer span.End()
pinfos, err := rpcapi.c.tracker.RecoverAll(ctx)
if err != nil {
return err
}
*out = pinfos
return err
return nil
}
// TrackerRecover runs PinTracker.Recover().
@ -308,15 +353,21 @@ func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in cid.Cid, out *struct{})
// IPFSPinLsCid runs IPFSConnector.PinLsCid().
func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
b, err := rpcapi.c.ipfs.PinLsCid(ctx, in)
if err != nil {
return err
}
*out = b
return err
return nil
}
// IPFSPinLs runs IPFSConnector.PinLs().
func (rpcapi *RPCAPI) IPFSPinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
m, err := rpcapi.c.ipfs.PinLs(ctx, in)
if err != nil {
return err
}
*out = m
return err
return nil
}
// IPFSConnectSwarms runs IPFSConnector.ConnectSwarms().
@ -328,8 +379,11 @@ func (rpcapi *RPCAPI) IPFSConnectSwarms(ctx context.Context, in struct{}, out *s
// IPFSConfigKey runs IPFSConnector.ConfigKey().
func (rpcapi *RPCAPI) IPFSConfigKey(ctx context.Context, in string, out *interface{}) error {
res, err := rpcapi.c.ipfs.ConfigKey(in)
if err != nil {
return err
}
*out = res
return err
return nil
}
// IPFSRepoStat runs IPFSConnector.RepoStat().
@ -415,8 +469,7 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe
// PeerMonitorLogMetric runs PeerMonitor.LogMetric().
func (rpcapi *RPCAPI) PeerMonitorLogMetric(ctx context.Context, in *api.Metric, out *struct{}) error {
rpcapi.c.monitor.LogMetric(ctx, in)
return nil
return rpcapi.c.monitor.LogMetric(ctx, in)
}
// PeerMonitorLatestMetrics runs PeerMonitor.LatestMetrics().

View File

@ -287,7 +287,7 @@ func (mock *mockService) Track(ctx context.Context, in *api.Pin, out *struct{})
return nil
}
func (mock *mockService) Untrack(ctx context.Context, in cid.Cid, out *struct{}) error {
func (mock *mockService) Untrack(ctx context.Context, in *api.Pin, out *struct{}) error {
return nil
}
@ -368,7 +368,7 @@ func (mock *mockService) IPFSPin(ctx context.Context, in *api.Pin, out *struct{}
return nil
}
func (mock *mockService) IPFSUnpin(ctx context.Context, in cid.Cid, out *struct{}) error {
func (mock *mockService) IPFSUnpin(ctx context.Context, in *api.Pin, out *struct{}) error {
return nil
}