diff --git a/TODO.md b/TODO.md index 970351ed..077812b6 100644 --- a/TODO.md +++ b/TODO.md @@ -5,7 +5,9 @@ Things that need to be done: * ~~Start up sync fix~~ * ~~Allow to shutdown multiple times (+ test)~~ * Efficient SyncAll (use single ipfs pin ls call) -* LeaderRPC implementation +* ~~GlobalStatus~~ +* GlobalSync +* ~~LeaderRPC implementation~~ * /pin/add /pin/rm hijack * End-to-end tests * ipfscluster-server tool diff --git a/cluster.go b/cluster.go index 9c7512c6..82efa42f 100644 --- a/cluster.go +++ b/cluster.go @@ -116,21 +116,21 @@ func (c *Cluster) Shutdown() error { // IPFS is pinning content that should be pinned locally, and not pinning // other content. It will also try to recover any failed pin or unpin // operations by retrigerring them. -func (c *Cluster) LocalSync() ([]Pin, error) { +func (c *Cluster) LocalSync() ([]PinInfo, error) { cState, err := c.consensus.State() if err != nil { return nil, err } changed := c.tracker.SyncState(cState) - for _, p := range changed { - logger.Debugf("recovering %s", p.Cid) - err = c.tracker.Recover(p.Cid) + for _, h := range changed { + logger.Debugf("recovering %s", h) + err = c.tracker.Recover(h) if err != nil { - logger.Errorf("Error recovering %s: %s", p.Cid, err) + logger.Errorf("Error recovering %s: %s", h, err) return nil, err } } - return c.tracker.ListPins(), nil + return c.tracker.LocalStatus(), nil } // LocalSyncCid makes sure that the current state of the cluster @@ -138,39 +138,37 @@ func (c *Cluster) LocalSync() ([]Pin, error) { // makes sure that IPFS is pinning content that should be pinned locally, // and not pinning other content. It will also try to recover any failed // pin or unpin operations by retriggering them. -func (c *Cluster) LocalSyncCid(h *cid.Cid) (Pin, error) { +func (c *Cluster) LocalSyncCid(h *cid.Cid) (PinInfo, error) { changed := c.tracker.Sync(h) if changed { err := c.tracker.Recover(h) if err != nil { logger.Errorf("Error recovering %s: %s", h, err) - return Pin{}, err + return PinInfo{}, err } } - return c.tracker.GetPin(h), nil + return c.tracker.LocalStatusCid(h), nil } // GlobalSync triggers Sync() operations in all members of the Cluster. -func (c *Cluster) GlobalSync() ([]Pin, error) { - return c.Status(), nil +func (c *Cluster) GlobalSync() ([]GlobalPinInfo, error) { + return c.Status() } // GlobalSunc triggers a Sync() operation for a given Cid in all members // of the Cluster. -func (c *Cluster) GlobalSyncCid(h *cid.Cid) (Pin, error) { - return c.StatusCid(h), nil +func (c *Cluster) GlobalSyncCid(h *cid.Cid) (GlobalPinInfo, error) { + return c.StatusCid(h) } // Status returns the last known status for all Pins tracked by Cluster. -func (c *Cluster) Status() []Pin { - // TODO: Global - return c.tracker.ListPins() +func (c *Cluster) Status() ([]GlobalPinInfo, error) { + return c.tracker.GlobalStatus() } // StatusCid returns the last known status for a given Cid -func (c *Cluster) StatusCid(h *cid.Cid) Pin { - // TODO: Global - return c.tracker.GetPin(h) +func (c *Cluster) StatusCid(h *cid.Cid) (GlobalPinInfo, error) { + return c.tracker.GlobalStatusCid(h) } // Pins returns the list of Cids managed by Cluster and which are part @@ -198,7 +196,7 @@ func (c *Cluster) Pin(h *cid.Cid) error { defer cancel() logger.Info("pinning:", h) - rpc := NewRPC(ConsensusAddPinRPC, h) + rpc := NewRPC(ConsensusLogPinRPC, h) wrpc := NewRPC(LeaderRPC, rpc) resp := MakeRPC(ctx, c.rpcCh, wrpc, true) if resp.Error != nil { @@ -220,7 +218,7 @@ func (c *Cluster) Unpin(h *cid.Cid) error { defer cancel() logger.Info("unpinning:", h) - rpc := NewRPC(ConsensusRmPinRPC, h) + rpc := NewRPC(ConsensusLogUnpinRPC, h) wrpc := NewRPC(LeaderRPC, rpc) resp := MakeRPC(ctx, c.rpcCh, wrpc, true) if resp.Error != nil { @@ -305,14 +303,16 @@ func (c *Cluster) handleGenericRPC(grpc *GenericRPC) { case GlobalSyncRPC: data, err = c.GlobalSync() case StatusRPC: - data = c.Status() + data, err = c.Status() + case TrackerLocalStatusRPC: + data = c.tracker.LocalStatus() case RollbackRPC: - state, ok := grpc.Argument.(State) - if !ok { - err = errors.New("bad RollbackRPC type") - break - } - err = c.consensus.Rollback(state) + // State, ok := grpc.Argument.(State) + // if !ok { + // err = errors.New("bad RollbackRPC type") + // break + // } + // err = c.consensus.Rollback(state) default: logger.Error("unknown operation for GenericRPC. Ignoring.") } @@ -336,30 +336,24 @@ func (c *Cluster) handleCidRPC(crpc *CidRPC) { err = c.Pin(h) case UnpinRPC: err = c.Unpin(h) - case ConsensusAddPinRPC: - err = c.consensus.AddPin(h) - case ConsensusRmPinRPC: - err = c.consensus.RmPin(h) + case ConsensusLogPinRPC: + err = c.consensus.LogPin(h) + case ConsensusLogUnpinRPC: + err = c.consensus.LogUnpin(h) + case TrackRPC: + err = c.tracker.Track(h) + case UntrackRPC: + err = c.tracker.Untrack(h) + case TrackerLocalStatusCidRPC: + data = c.tracker.LocalStatusCid(h) case IPFSPinRPC: - c.tracker.Pinning(h) err = c.ipfs.Pin(h) - if err != nil { - c.tracker.PinError(h) - } else { - c.tracker.Pinned(h) - } case IPFSUnpinRPC: - c.tracker.Unpinning(h) err = c.ipfs.Unpin(h) - if err != nil { - c.tracker.UnpinError(h) - } else { - c.tracker.Unpinned(h) - } case IPFSIsPinnedRPC: data, err = c.ipfs.IsPinned(h) case StatusCidRPC: - data = c.StatusCid(h) + data, err = c.StatusCid(h) case LocalSyncCidRPC: data, err = c.LocalSyncCid(h) case GlobalSyncCidRPC: @@ -379,8 +373,26 @@ func (c *Cluster) handleCidRPC(crpc *CidRPC) { func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) { innerRPC := wrpc.WRPC var resp RPCResponse + // resp initialization + switch innerRPC.Op() { + case TrackerLocalStatusRPC: + resp = RPCResponse{ + Data: []PinInfo{}, + Error: nil, + } + case TrackerLocalStatusCidRPC: + resp = RPCResponse{ + Data: PinInfo{}, + Error: nil, + } + default: + resp = RPCResponse{} + } + switch wrpc.Op() { case LeaderRPC: + // This is very generic for the moment. Only used for consensus + // LogPin/unpin. leader, err := c.consensus.Leader() if err != nil { resp = RPCResponse{ @@ -388,7 +400,7 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) { Error: err, } } - resp, err = c.remote.MakeRemoteRPC(innerRPC, leader) + err = c.remote.MakeRemoteRPC(innerRPC, leader, &resp) if err != nil { resp = RPCResponse{ Data: nil, @@ -396,9 +408,37 @@ func (c *Cluster) handleWrappedRPC(wrpc *WrappedRPC) { } } case BroadcastRPC: + var wg sync.WaitGroup + var responses []RPCResponse + members := c.Members() + rch := make(chan RPCResponse, len(members)) + + makeRemote := func(p peer.ID, r RPCResponse) { + defer wg.Done() + err := c.remote.MakeRemoteRPC(innerRPC, p, &r) + if err != nil { + logger.Error("Error making remote RPC: ", err) + rch <- RPCResponse{ + Error: err, + } + } else { + rch <- r + } + } + wg.Add(len(members)) + for _, m := range members { + go makeRemote(m, resp) + } + wg.Wait() + close(rch) + + for r := range rch { + responses = append(responses, r) + } + resp = RPCResponse{ - Data: nil, - Error: errors.New("not implemented"), + Data: responses, + Error: nil, } default: resp = RPCResponse{ diff --git a/cluster_test.go b/cluster_test.go index 4b9d8cfe..3760b650 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -57,7 +57,7 @@ func testingCluster(t *testing.T) (*Cluster, *mockApi, *mockConnector, *MapState ipfs.rpcCh = make(chan RPC, 2) cfg := testingConfig() st := NewMapState() - tracker := NewMapPinTracker() + tracker := NewMapPinTracker(cfg) remote := NewLibp2pRemote() cl, err := NewCluster( diff --git a/config.go b/config.go index 270c282b..249a45c5 100644 --- a/config.go +++ b/config.go @@ -2,6 +2,7 @@ package ipfscluster import ( "encoding/json" + "fmt" "io/ioutil" ) @@ -38,11 +39,13 @@ type Config struct { } func LoadConfig(path string) (*Config, error) { + fmt.Println(path) config := &Config{} file, err := ioutil.ReadFile(path) if err != nil { return nil, err } json.Unmarshal(file, config) + fmt.Printf("%+v", config) return config, nil } diff --git a/consensus.go b/consensus.go index 5916b06b..1c6202af 100644 --- a/consensus.go +++ b/consensus.go @@ -69,14 +69,14 @@ func (op *clusterLogOp) ApplyTo(cstate consensus.State) (consensus.State, error) goto ROLLBACK } // Async, we let the PinTracker take care of any problems - MakeRPC(ctx, op.rpcCh, NewRPC(IPFSPinRPC, c), false) + MakeRPC(ctx, op.rpcCh, NewRPC(TrackRPC, c), false) case LogOpUnpin: err := state.RmPin(c) if err != nil { goto ROLLBACK } // Async, we let the PinTracker take care of any problems - MakeRPC(ctx, op.rpcCh, NewRPC(IPFSUnpinRPC, c), false) + MakeRPC(ctx, op.rpcCh, NewRPC(UntrackRPC, c), false) default: logger.Error("unknown clusterLogOp type. Ignoring") } @@ -251,8 +251,8 @@ func (cc *Consensus) op(c *cid.Cid, t clusterLogOpType) *clusterLogOp { } } -// AddPin submits a Cid to the shared state of the cluster. -func (cc *Consensus) AddPin(c *cid.Cid) error { +// LogPin submits a Cid to the shared state of the cluster. +func (cc *Consensus) LogPin(c *cid.Cid) error { // Create pin operation for the log op := cc.op(c, LogOpPin) _, err := cc.consensus.CommitOp(op) @@ -264,8 +264,8 @@ func (cc *Consensus) AddPin(c *cid.Cid) error { return nil } -// RmPin removes a Cid from the shared state of the cluster. -func (cc *Consensus) RmPin(c *cid.Cid) error { +// LogUnpin removes a Cid from the shared state of the cluster. +func (cc *Consensus) LogUnpin(c *cid.Cid) error { // Create unpin operation for the log op := cc.op(c, LogOpUnpin) _, err := cc.consensus.CommitOp(op) diff --git a/consensus_test.go b/consensus_test.go index c1785e66..69bf48dc 100644 --- a/consensus_test.go +++ b/consensus_test.go @@ -124,7 +124,7 @@ func TestConsensusPin(t *testing.T) { defer cc.Shutdown() c, _ := cid.Decode(testCid) - err := cc.AddPin(c) + err := cc.LogPin(c) if err != nil { t.Error("the operation did not make it to the log:", err) } @@ -147,7 +147,7 @@ func TestConsensusUnpin(t *testing.T) { defer cc.Shutdown() c, _ := cid.Decode(testCid2) - err := cc.RmPin(c) + err := cc.LogUnpin(c) if err != nil { t.Error("the operation did not make it to the log:", err) } diff --git a/e2e_test.go b/e2e_test.go index 1d3ec8e5..32d81f79 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -15,7 +15,8 @@ import ( peer "github.com/libp2p/go-libp2p-peer" ) -// This runs tests using the standard components and the IPFS mock daemon. +// This runs end-to-end tests using the standard components and the IPFS mock +// daemon. // End-to-end means that all default implementations of components are tested // together. It is not fully end-to-end because the ipfs daemon is a mock which // never hangs. @@ -24,7 +25,7 @@ import ( var nClusters = 3 // number of pins to pin/unpin/check -var nPins = 1000 +var nPins = 500 // ports var clusterPort = 20000 @@ -50,9 +51,9 @@ func randomBytes() []byte { return bs } -func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) { +func createClusters(t *testing.T) ([]*Cluster, []*ipfsMock) { os.RemoveAll("./e2eTestRaft") - ipfsmock := newIpfsMock() + ipfsMocks := make([]*ipfsMock, 0, nClusters) clusters := make([]*Cluster, 0, nClusters) cfgs := make([]*Config, 0, nClusters) @@ -82,6 +83,8 @@ func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) { // Generate nClusters configs for i := 0; i < nClusters; i++ { + mock := newIpfsMock() + ipfsMocks = append(ipfsMocks, mock) cfgs = append(cfgs, &Config{ ID: peers[i].pid, PrivateKey: peers[i].priv, @@ -93,8 +96,8 @@ func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) { APIPort: apiPort + i, IPFSAPIAddr: "127.0.0.1", IPFSAPIPort: ipfsApiPort + i, - IPFSAddr: ipfsmock.addr, - IPFSPort: ipfsmock.port, + IPFSAddr: mock.addr, + IPFSPort: mock.port, }) } @@ -104,19 +107,19 @@ func createClusters(t *testing.T) ([]*Cluster, *ipfsMock) { ipfs, err := NewIPFSHTTPConnector(cfgs[i]) checkErr(t, err) state := NewMapState() - tracker := NewMapPinTracker() + tracker := NewMapPinTracker(cfgs[i]) remote := NewLibp2pRemote() cl, err := NewCluster(cfgs[i], api, ipfs, state, tracker, remote) checkErr(t, err) clusters = append(clusters, cl) } - return clusters, ipfsmock + return clusters, ipfsMocks } -func shutdownClusters(t *testing.T, clusters []*Cluster, m *ipfsMock) { - m.Close() - for _, c := range clusters { +func shutdownClusters(t *testing.T, clusters []*Cluster, m []*ipfsMock) { + for i, c := range clusters { + m[i].Close() err := c.Shutdown() if err != nil { t.Error(err) @@ -163,26 +166,101 @@ func TestE2EPin(t *testing.T) { for i := 0; i < nPins; i++ { j := rand.Intn(nClusters) // choose a random cluster member h, err := prefix.Sum(randomBytes()) // create random cid - fmt.Println(h) checkErr(t, err) err = clusters[j].Pin(h) if err != nil { t.Errorf("error pinning %s: %s", h, err) } + // Test re-pin + err = clusters[j].Pin(h) + if err != nil { + t.Errorf("error repinning %s: %s", h, err) + } } delay() - f := func(t *testing.T, c *Cluster) { - status := c.tracker.ListPins() + fpinned := func(t *testing.T, c *Cluster) { + status := c.tracker.LocalStatus() for _, v := range status { - if v.Status != Pinned { + if v.IPFS != Pinned { t.Errorf("%s should have been pinned but it is %s", - v.Cid.String, - v.Status.String()) + v.CidStr, + v.IPFS.String()) } } if l := len(status); l != nPins { t.Errorf("Pinned %d out of %d requests", l, nPins) } } + runF(t, clusters, fpinned) + + // Unpin everything + pinList := clusters[0].Pins() + + for i := 0; i < nPins; i++ { + j := rand.Intn(nClusters) // choose a random cluster member + err := clusters[j].Unpin(pinList[i]) + if err != nil { + t.Errorf("error unpinning %s: %s", pinList[i], err) + } + // test re-unpin + err = clusters[j].Unpin(pinList[i]) + if err != nil { + t.Errorf("error re-unpinning %s: %s", pinList[i], err) + } + + } + delay() + + funpinned := func(t *testing.T, c *Cluster) { + status := c.tracker.LocalStatus() + if l := len(status); l != 0 { + t.Errorf("Nothing should be pinned") + } + } + runF(t, clusters, funpinned) +} + +func TestE2EStatus(t *testing.T) { + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + delay() + h, _ := cid.Decode(testCid) + clusters[0].Pin(h) + delay() + // Global status + f := func(t *testing.T, c *Cluster) { + statuses, err := c.Status() + if err != nil { + t.Error(err) + } + if len(statuses) == 0 { + t.Fatal("bad status. Expected one item") + } + if statuses[0].Cid.String() != testCid { + t.Error("bad cid in status") + } + info := statuses[0].Status + if len(info) != nClusters { + t.Error("bad info in status") + } + + if info[c.host.ID()].IPFS != Pinned { + t.Error("the hash should have been pinned") + } + + status, err := c.StatusCid(h) + if err != nil { + t.Error(err) + } + + pinfo, ok := status.Status[c.host.ID()] + if !ok { + t.Fatal("Host not in status") + } + + if pinfo.IPFS != Pinned { + t.Error("the status should show the hash as pinned") + } + } runF(t, clusters, f) } diff --git a/ipfs-cluster/main.go b/ipfs-cluster/main.go index c076ef94..261684de 100644 --- a/ipfs-cluster/main.go +++ b/ipfs-cluster/main.go @@ -26,7 +26,7 @@ func main() { fmt.Println(err) return } - api, err := ipfscluster.NewHTTPAPI(clusterCfg) + api, err := ipfscluster.NewRESTAPI(clusterCfg) if err != nil { fmt.Println(err) return @@ -39,7 +39,7 @@ func main() { } state := ipfscluster.NewMapState() - tracker := ipfscluster.NewMapPinTracker() + tracker := ipfscluster.NewMapPinTracker(clusterCfg) remote := ipfscluster.NewLibp2pRemote() cluster, err := ipfscluster.NewCluster(clusterCfg, api, proxy, state, tracker, remote) diff --git a/ipfs_http_connector.go b/ipfs_http_connector.go index 39d7107d..662d30e0 100644 --- a/ipfs_http_connector.go +++ b/ipfs_http_connector.go @@ -166,7 +166,6 @@ func (ipfs *IPFSHTTPConnector) Shutdown() error { // Pin performs a pin request against the configured IPFS // daemon. func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { - logger.Infof("IPFS Pin request for: %s", hash) pinned, err := ipfs.IsPinned(hash) if err != nil { return err @@ -174,16 +173,18 @@ func (ipfs *IPFSHTTPConnector) Pin(hash *cid.Cid) error { if !pinned { path := fmt.Sprintf("pin/add?arg=%s", hash) _, err = ipfs.get(path) + if err == nil { + logger.Info("IPFS Pin request succeeded: ", hash) + } return err } - logger.Debug("object is already pinned. Doing nothing") + logger.Info("IPFS object is already pinned: ", hash) return nil } // UnPin performs an unpin request against the configured IPFS // daemon. func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error { - logger.Info("IPFS Unpin request for:", hash) pinned, err := ipfs.IsPinned(hash) if err != nil { return err @@ -191,10 +192,13 @@ func (ipfs *IPFSHTTPConnector) Unpin(hash *cid.Cid) error { if pinned { path := fmt.Sprintf("pin/rm?arg=%s", hash) _, err := ipfs.get(path) + if err == nil { + logger.Info("IPFS Unpin request succeeded:", hash) + } return err } - logger.Debug("object not [directly] pinned. Doing nothing") + logger.Info("IPFS object is already unpinned: ", hash) return nil } diff --git a/ipfscluster.go b/ipfscluster.go index 9e13a148..179e080a 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -99,33 +99,32 @@ type State interface { // IPFS daemon. This component should be thread safe. type PinTracker interface { ClusterComponent - // Pinning tells the pin tracker that a pin is being pinned by IPFS - Pinning(*cid.Cid) error - // Pinned tells the pin tracer is pinned by IPFS - Pinned(*cid.Cid) error - // Pinned tells the pin tracker is being unpinned by IPFS - Unpinning(*cid.Cid) error - // Unpinned tells the pin tracker that a pin has been unpinned by IFPS - Unpinned(*cid.Cid) error - // PinError tells the pin tracker that an IPFS pin operation has failed - PinError(*cid.Cid) error - // UnpinError tells the pin tracker that an IPFS unpin operation has failed - UnpinError(*cid.Cid) error - // ListPins returns the list of pins with their status - ListPins() []Pin - // GetPin returns a Pin. - GetPin(*cid.Cid) Pin + // Track tells the tracker that a Cid is now under its supervision + // The tracker may decide to perform an IPFS pin. + Track(*cid.Cid) error + // Untrack tells the tracker that a Cid is to be forgotten. The tracker + // may perform an IPFS unpin operation. + Untrack(*cid.Cid) error + // LocalStatus returns the list of pins with their local status. + LocalStatus() []PinInfo + // GlobalStatus returns the list of pins with their local and remote + // status, which has been fetched. + LocalStatusCid(*cid.Cid) PinInfo + // GlobalStatusCid returns the global status of a given Cid. + GlobalStatus() ([]GlobalPinInfo, error) + // LocalStatusCid returns the local status of a given Cid. + GlobalStatusCid(*cid.Cid) (GlobalPinInfo, error) // Sync makes sure that the Cid status reflect the real IPFS status. If not, // the status is marked as error. The return value indicates if the // Pin status was updated. Sync(*cid.Cid) bool - // Recover attempts to recover an error by re-[un]pinning the item. + // Recover attempts to recover an error by re-[un]pinning the item if needed. Recover(*cid.Cid) error // SyncAll runs Sync() on every known Pin. It returns a list of changed Pins - SyncAll() []Pin + SyncAll() []PinInfo // SyncState makes sure that the tracked Pins matches those in the // cluster state and runs SyncAll(). It returns a list of changed Pins. - SyncState(State) []Pin + SyncState(State) []*cid.Cid } // Remote represents a component which takes care of @@ -133,7 +132,10 @@ type PinTracker interface { // handling any incoming remote requests from other nodes. type Remote interface { ClusterComponent - MakeRemoteRPC(rpc RPC, node peer.ID) (RPCResponse, error) + // MakeRemoteRPC performs an RPC requests to a remote peer. + // The response is decoded onto the RPCResponse provided. + MakeRemoteRPC(RPC, peer.ID, *RPCResponse) error + // SetHost provides a libp2p host to use by this remote SetHost(host.Host) } diff --git a/libp2p_remote.go b/libp2p_remote.go index 91578b24..c8b844a8 100644 --- a/libp2p_remote.go +++ b/libp2p_remote.go @@ -33,7 +33,7 @@ func NewLibp2pRemote() *Libp2pRemote { r := &Libp2pRemote{ ctx: ctx, - rpcCh: make(chan RPC), + rpcCh: make(chan RPC, RPCMaxQueue), shutdownCh: make(chan struct{}), } @@ -91,14 +91,13 @@ func (r *Libp2pRemote) handleRemoteRPC(s *streamWrap) error { func (r *Libp2pRemote) decodeRPC(s *streamWrap, rpcType int) (RPC, error) { var err error - switch RPCOpToType[rpcType] { + switch rpcType { case CidRPCType: var rpc *CidRPC err = s.dec.Decode(&rpc) if err != nil { goto DECODE_ERROR } - logger.Debugf("%+v", rpc) return rpc, nil case GenericRPCType: var rpc *GenericRPC @@ -135,43 +134,43 @@ func (r *Libp2pRemote) sendStreamResponse(s *streamWrap, resp RPCResponse) error return nil } -func (r *Libp2pRemote) MakeRemoteRPC(rpc RPC, node peer.ID) (RPCResponse, error) { +func (r *Libp2pRemote) MakeRemoteRPC(rpc RPC, node peer.ID, resp *RPCResponse) error { ctx, cancel := context.WithCancel(r.ctx) defer cancel() - var resp RPCResponse if r.host == nil { - return resp, errors.New("no host set") + return errors.New("no host set") } if node == r.host.ID() { // libp2p cannot dial itself - return MakeRPC(ctx, r.rpcCh, rpc, true), nil + *resp = MakeRPC(ctx, r.rpcCh, rpc, true) + return nil } s, err := r.host.NewStream(ctx, node, ClusterP2PProtocol) if err != nil { - return resp, err + return err } defer s.Close() sWrap := wrapStream(s) logger.Debugf("sending remote RPC %d to %s", rpc.Op(), node) - if err := sWrap.w.WriteByte(byte(rpc.Op())); err != nil { - return resp, err + if err := sWrap.w.WriteByte(byte(rpc.RType())); err != nil { + return err } if err := sWrap.enc.Encode(rpc); err != nil { - return resp, err + return err } if err := sWrap.w.Flush(); err != nil { - return resp, err + return err } logger.Debug("Waiting for response from %s", node) - if err := sWrap.dec.Decode(&resp); err != nil { - return resp, err + if err := sWrap.dec.Decode(resp); err != nil { + return err } - return resp, nil + return nil } diff --git a/map_pin_tracker.go b/map_pin_tracker.go index 43d86891..4e2f92de 100644 --- a/map_pin_tracker.go +++ b/map_pin_tracker.go @@ -2,14 +2,14 @@ package ipfscluster import ( "context" + "errors" + "fmt" "sync" "time" - cid "github.com/ipfs/go-cid" -) + peer "github.com/libp2p/go-libp2p-peer" -const ( - pinEverywhere = -1 + cid "github.com/ipfs/go-cid" ) // A Pin or Unpin operation will be considered failed @@ -27,19 +27,27 @@ const ( Pinning Unpinning Unpinned + RemotePin ) -type Pin struct { - Cid *cid.Cid - PinMode PinMode - Status PinStatus - TS time.Time +type GlobalPinInfo struct { + Cid *cid.Cid + Status map[peer.ID]PinInfo } -type PinMode int -type PinStatus int +// PinInfo holds information about local pins. PinInfo is +// serialized when requesting the Global status, therefore +// we cannot use *cid.Cid. +type PinInfo struct { + CidStr string + Peer peer.ID + IPFS IPFSStatus + TS time.Time +} -func (st PinStatus) String() string { +type IPFSStatus int + +func (st IPFSStatus) String() string { switch st { case PinError: return "pin_error" @@ -59,10 +67,11 @@ func (st PinStatus) String() string { type MapPinTracker struct { mux sync.Mutex - status map[string]Pin + status map[string]PinInfo - ctx context.Context - rpcCh chan RPC + ctx context.Context + rpcCh chan RPC + peerID peer.ID shutdownLock sync.Mutex shutdown bool @@ -70,12 +79,19 @@ type MapPinTracker struct { wg sync.WaitGroup } -func NewMapPinTracker() *MapPinTracker { +func NewMapPinTracker(cfg *Config) *MapPinTracker { ctx := context.Background() + + pID, err := peer.IDB58Decode(cfg.ID) + if err != nil { + panic(err) + } + mpt := &MapPinTracker{ - status: make(map[string]Pin), - rpcCh: make(chan RPC, RPCMaxQueue), ctx: ctx, + status: make(map[string]PinInfo), + rpcCh: make(chan RPC, RPCMaxQueue), + peerID: pID, shutdownCh: make(chan struct{}), } logger.Info("starting MapPinTracker") @@ -110,74 +126,179 @@ func (mpt *MapPinTracker) Shutdown() error { return nil } -func (mpt *MapPinTracker) set(c *cid.Cid, s PinStatus) error { - mpt.mux.Lock() - defer mpt.mux.Unlock() +func (mpt *MapPinTracker) unsafeSet(c *cid.Cid, s IPFSStatus) { if s == Unpinned { delete(mpt.status, c.String()) - return nil } - mpt.status[c.String()] = Pin{ - Cid: c, - PinMode: pinEverywhere, - Status: s, - TS: time.Now(), + mpt.status[c.String()] = PinInfo{ + // cid: c, + CidStr: c.String(), + Peer: mpt.peerID, + IPFS: s, + TS: time.Now(), } - return nil } -func (mpt *MapPinTracker) get(c *cid.Cid) Pin { +func (mpt *MapPinTracker) set(c *cid.Cid, s IPFSStatus) { + mpt.mux.Lock() + defer mpt.mux.Unlock() + mpt.unsafeSet(c, s) +} + +func (mpt *MapPinTracker) get(c *cid.Cid) PinInfo { mpt.mux.Lock() defer mpt.mux.Unlock() p, ok := mpt.status[c.String()] if !ok { - return Pin{ - Cid: c, - Status: Unpinned, + return PinInfo{ + // cid: c, + CidStr: c.String(), + Peer: mpt.peerID, + IPFS: Unpinned, + TS: time.Now(), } } return p } -func (mpt *MapPinTracker) Pinning(c *cid.Cid) error { - return mpt.set(c, Pinning) +func (mpt *MapPinTracker) pin(c *cid.Cid) error { + ctx, cancel := context.WithCancel(mpt.ctx) + defer cancel() + + mpt.set(c, Pinning) + resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), true) + if resp.Error != nil { + mpt.set(c, PinError) + return resp.Error + } + mpt.set(c, Pinned) + return nil } -func (mpt *MapPinTracker) Unpinning(c *cid.Cid) error { - return mpt.set(c, Unpinning) +func (mpt *MapPinTracker) unpin(c *cid.Cid) error { + ctx, cancel := context.WithCancel(mpt.ctx) + defer cancel() + + mpt.set(c, Unpinning) + resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), true) + if resp.Error != nil { + mpt.set(c, PinError) + return resp.Error + } + mpt.set(c, Unpinned) + return nil } -func (mpt *MapPinTracker) Pinned(c *cid.Cid) error { - return mpt.set(c, Pinned) +func (mpt *MapPinTracker) Track(c *cid.Cid) error { + return mpt.pin(c) } -func (mpt *MapPinTracker) PinError(c *cid.Cid) error { - return mpt.set(c, PinError) +func (mpt *MapPinTracker) Untrack(c *cid.Cid) error { + return mpt.unpin(c) } -func (mpt *MapPinTracker) UnpinError(c *cid.Cid) error { - return mpt.set(c, UnpinError) -} - -func (mpt *MapPinTracker) Unpinned(c *cid.Cid) error { - return mpt.set(c, Unpinned) -} - -func (mpt *MapPinTracker) GetPin(c *cid.Cid) Pin { +func (mpt *MapPinTracker) LocalStatusCid(c *cid.Cid) PinInfo { return mpt.get(c) } -func (mpt *MapPinTracker) ListPins() []Pin { +func (mpt *MapPinTracker) LocalStatus() []PinInfo { mpt.mux.Lock() defer mpt.mux.Unlock() - pins := make([]Pin, 0, len(mpt.status)) + pins := make([]PinInfo, 0, len(mpt.status)) for _, v := range mpt.status { pins = append(pins, v) } return pins } +func (mpt *MapPinTracker) GlobalStatus() ([]GlobalPinInfo, error) { + ctx, cancel := context.WithCancel(mpt.ctx) + defer cancel() + + rpc := NewRPC(TrackerLocalStatusRPC, nil) + wrpc := NewRPC(BroadcastRPC, rpc) + resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true) + if resp.Error != nil { + return nil, resp.Error + } + + responses, ok := resp.Data.([]RPCResponse) + if !ok { + return nil, errors.New("unexpected responses format") + } + fullMap := make(map[string]GlobalPinInfo) + + mergePins := func(pins []PinInfo) { + for _, p := range pins { + item, ok := fullMap[p.CidStr] + c, _ := cid.Decode(p.CidStr) + if !ok { + fullMap[p.CidStr] = GlobalPinInfo{ + Cid: c, + Status: map[peer.ID]PinInfo{ + p.Peer: p, + }, + } + } else { + item.Status[p.Peer] = p + } + } + } + for _, r := range responses { + if r.Error != nil { + logger.Error("error in one of the broadcast responses: ", r.Error) + continue + } + pins, ok := r.Data.([]PinInfo) + if !ok { + return nil, fmt.Errorf("unexpected response format: %+v", r.Data) + } + mergePins(pins) + } + + status := make([]GlobalPinInfo, 0, len(fullMap)) + for _, v := range fullMap { + status = append(status, v) + } + return status, nil +} + +func (mpt *MapPinTracker) GlobalStatusCid(c *cid.Cid) (GlobalPinInfo, error) { + ctx, cancel := context.WithCancel(mpt.ctx) + defer cancel() + + pin := GlobalPinInfo{ + Cid: c, + Status: make(map[peer.ID]PinInfo), + } + + rpc := NewRPC(TrackerLocalStatusCidRPC, c) + wrpc := NewRPC(BroadcastRPC, rpc) + resp := MakeRPC(ctx, mpt.rpcCh, wrpc, true) + if resp.Error != nil { + return pin, resp.Error + } + + responses, ok := resp.Data.([]RPCResponse) + if !ok { + return pin, errors.New("unexpected responses format") + } + + for _, r := range responses { + if r.Error != nil { + return pin, r.Error + } + info, ok := r.Data.(PinInfo) + if !ok { + return pin, errors.New("unexpected response format") + } + pin.Status[info.Peer] = info + } + + return pin, nil +} + func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { ctx, cancel := context.WithCancel(mpt.ctx) defer cancel() @@ -185,17 +306,17 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { p := mpt.get(c) // We assume errors will need a Recover() so we return true - if p.Status == PinError || p.Status == UnpinError { + if p.IPFS == PinError || p.IPFS == UnpinError { return true } resp := MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSIsPinnedRPC, c), true) if resp.Error != nil { - if p.Status == Pinned || p.Status == Pinning { + if p.IPFS == Pinned || p.IPFS == Pinning { mpt.set(c, PinError) return true } - if p.Status == Unpinned || p.Status == Unpinning { + if p.IPFS == Unpinned || p.IPFS == Unpinning { mpt.set(c, UnpinError) return true } @@ -208,7 +329,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { } if ipfsPinned { - switch p.Status { + switch p.IPFS { case Pinned: return false case Pinning: @@ -225,7 +346,7 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { return true } } else { - switch p.Status { + switch p.IPFS { case Pinned: mpt.set(c, PinError) return true @@ -247,26 +368,25 @@ func (mpt *MapPinTracker) Sync(c *cid.Cid) bool { func (mpt *MapPinTracker) Recover(c *cid.Cid) error { p := mpt.get(c) - if p.Status != PinError && p.Status != UnpinError { + if p.IPFS != PinError && p.IPFS != UnpinError { return nil } - ctx, cancel := context.WithCancel(mpt.ctx) - defer cancel() - if p.Status == PinError { - MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSPinRPC, c), false) + if p.IPFS == PinError { + mpt.pin(c) } - if p.Status == UnpinError { - MakeRPC(ctx, mpt.rpcCh, NewRPC(IPFSUnpinRPC, c), false) + if p.IPFS == UnpinError { + mpt.unpin(c) } return nil } -func (mpt *MapPinTracker) SyncAll() []Pin { - var changedPins []Pin - pins := mpt.ListPins() +func (mpt *MapPinTracker) SyncAll() []PinInfo { + var changedPins []PinInfo + pins := mpt.LocalStatus() for _, p := range pins { - changed := mpt.Sync(p.Cid) + c, _ := cid.Decode(p.CidStr) + changed := mpt.Sync(c) if changed { changedPins = append(changedPins, p) } @@ -274,7 +394,7 @@ func (mpt *MapPinTracker) SyncAll() []Pin { return changedPins } -func (mpt *MapPinTracker) SyncState(cState State) []Pin { +func (mpt *MapPinTracker) SyncState(cState State) []*cid.Cid { clusterPins := cState.ListPins() clusterMap := make(map[string]struct{}) // Make a map for faster lookup @@ -284,7 +404,7 @@ func (mpt *MapPinTracker) SyncState(cState State) []Pin { } var toRemove []*cid.Cid var toAdd []*cid.Cid - var changed []Pin + var changed []*cid.Cid mpt.mux.Lock() // Collect items in the State not in the tracker @@ -297,32 +417,23 @@ func (mpt *MapPinTracker) SyncState(cState State) []Pin { // Collect items in the tracker not in the State for _, p := range mpt.status { - _, ok := clusterMap[p.Cid.String()] + c, _ := cid.Decode(p.CidStr) + _, ok := clusterMap[p.CidStr] if !ok { - toRemove = append(toRemove, p.Cid) + toRemove = append(toRemove, c) } } // Update new items and mark them as pinning error for _, c := range toAdd { - p := Pin{ - Cid: c, - PinMode: pinEverywhere, - Status: PinError, - } - mpt.status[c.String()] = p - changed = append(changed, p) + mpt.unsafeSet(c, PinError) + changed = append(changed, c) } // Mark items that need to be removed as unpin error for _, c := range toRemove { - p := Pin{ - Cid: c, - PinMode: pinEverywhere, - Status: UnpinError, - } - mpt.status[c.String()] = p - changed = append(changed, p) + mpt.unsafeSet(c, UnpinError) + changed = append(changed, c) } mpt.mux.Unlock() return changed diff --git a/rest_api.go b/rest_api.go index 5cf82470..780d78ab 100644 --- a/rest_api.go +++ b/rest_api.go @@ -61,9 +61,13 @@ type unpinResp struct { Unpinned string `json:"unpinned"` } +type statusInfo struct { + Status string +} + type statusCidResp struct { - Cid string `json:"cid"` - Status string `json:"status"` + Cid string `json:"cid"` + Status map[string]statusInfo `json:"status"` } type statusResp []statusCidResp @@ -296,7 +300,7 @@ func (api *RESTAPI) statusCidHandler(w http.ResponseWriter, r *http.Request) { func (api *RESTAPI) syncHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithCancel(api.ctx) defer cancel() - rRpc := NewRPC(LocalSyncRPC, nil) + rRpc := NewRPC(GlobalSyncRPC, nil) resp := MakeRPC(ctx, api.rpcCh, rRpc, true) if checkResponse(w, rRpc.Op(), resp) { sendStatusResponse(w, resp) @@ -308,7 +312,7 @@ func (api *RESTAPI) syncCidHandler(w http.ResponseWriter, r *http.Request) { defer cancel() if c := parseCidOrError(w, r); c != nil { - op := NewRPC(LocalSyncCidRPC, c) + op := NewRPC(GlobalSyncCidRPC, c) resp := MakeRPC(ctx, api.rpcCh, op, true) if checkResponse(w, op.Op(), resp) { sendStatusCidResponse(w, resp) @@ -344,9 +348,9 @@ func checkResponse(w http.ResponseWriter, op RPCOp, resp RPCResponse) bool { case PinRPC: // Pin/Unpin only return errors case UnpinRPC: case StatusRPC, LocalSyncRPC, GlobalSyncRPC: - _, ok = resp.Data.([]Pin) + _, ok = resp.Data.([]GlobalPinInfo) case StatusCidRPC, LocalSyncCidRPC, GlobalSyncCidRPC: - _, ok = resp.Data.(Pin) + _, ok = resp.Data.(GlobalPinInfo) case PinListRPC: _, ok = resp.Data.([]*cid.Cid) case IPFSPinRPC: @@ -389,23 +393,30 @@ func sendErrorResponse(w http.ResponseWriter, code int, msg string) { sendJSONResponse(w, code, errorResp) } +func transformPinToStatusCid(p GlobalPinInfo) statusCidResp { + s := statusCidResp{} + s.Cid = p.Cid.String() + s.Status = make(map[string]statusInfo) + for k, v := range p.Status { + s.Status[k.Pretty()] = statusInfo{ + Status: v.IPFS.String(), + } + } + return s +} + func sendStatusResponse(w http.ResponseWriter, resp RPCResponse) { - data := resp.Data.([]Pin) + data := resp.Data.([]GlobalPinInfo) pins := make(statusResp, 0, len(data)) + for _, d := range data { - pins = append(pins, statusCidResp{ - Cid: d.Cid.String(), - Status: d.Status.String(), - }) + pins = append(pins, transformPinToStatusCid(d)) } sendJSONResponse(w, 200, pins) } func sendStatusCidResponse(w http.ResponseWriter, resp RPCResponse) { - data := resp.Data.(Pin) - pin := statusCidResp{ - Cid: data.Cid.String(), - Status: data.Status.String(), - } - sendJSONResponse(w, 200, pin) + data := resp.Data.(GlobalPinInfo) + st := transformPinToStatusCid(data) + sendJSONResponse(w, 200, st) } diff --git a/rest_api_test.go b/rest_api_test.go index 12e69d13..f8ee5862 100644 --- a/rest_api_test.go +++ b/rest_api_test.go @@ -189,33 +189,74 @@ func TestStatusEndpoint(t *testing.T) { c3, _ := cid.Decode(testCid3) api := testClusterApi(t) defer api.Shutdown() - pList := []Pin{ - Pin{ - Status: PinError, - Cid: c, + pList := []GlobalPinInfo{ + GlobalPinInfo{ + Status: map[peer.ID]PinInfo{ + testPeerID: PinInfo{ + CidStr: testCid, + Peer: testPeerID, + IPFS: Pinned, + }, + }, + Cid: c, }, - Pin{ - Status: UnpinError, - Cid: c, + GlobalPinInfo{ + Status: map[peer.ID]PinInfo{ + testPeerID: PinInfo{ + CidStr: testCid, + Peer: testPeerID, + IPFS: Unpinned, + }, + }, + Cid: c2, }, - Pin{ - Status: Pinned, - Cid: c3, + GlobalPinInfo{ + Status: map[peer.ID]PinInfo{ + testPeerID: PinInfo{ + CidStr: testCid, + Peer: testPeerID, + IPFS: PinError, + }, + }, + + Cid: c3, }, - Pin{ - Status: Pinning, - Cid: c, + GlobalPinInfo{ + Status: map[peer.ID]PinInfo{ + testPeerID: PinInfo{ + CidStr: testCid, + Peer: testPeerID, + IPFS: UnpinError, + }, + }, + Cid: c, }, - Pin{ - Status: Unpinning, - Cid: c2, + GlobalPinInfo{ + Status: map[peer.ID]PinInfo{ + testPeerID: PinInfo{ + CidStr: testCid, + Peer: testPeerID, + IPFS: Unpinning, + }, + }, + Cid: c2, + }, + GlobalPinInfo{ + Status: map[peer.ID]PinInfo{ + testPeerID: PinInfo{ + CidStr: testCid, + Peer: testPeerID, + IPFS: Pinning, + }, + }, + Cid: c3, }, } simulateAnswer(api.RpcChan(), pList, nil) var resp statusResp makeGet(t, "/status", &resp) - if len(resp) != 5 { - t.Error("unexpected statusResp: ", resp) + if len(resp) != 6 { + t.Errorf("unexpected statusResp:\n %+v", resp) } } diff --git a/rpc.go b/rpc.go index 9ac08f5b..4064953e 100644 --- a/rpc.go +++ b/rpc.go @@ -10,8 +10,8 @@ const ( IPFSPinRPC IPFSUnpinRPC IPFSIsPinnedRPC - ConsensusAddPinRPC - ConsensusRmPinRPC + ConsensusLogPinRPC + ConsensusLogUnpinRPC VersionRPC MemberListRPC RollbackRPC @@ -21,6 +21,10 @@ const ( LocalSyncCidRPC GlobalSyncRPC GlobalSyncCidRPC + TrackRPC + UntrackRPC + TrackerLocalStatusRPC + TrackerLocalStatusCidRPC StatusRPC StatusCidRPC @@ -35,42 +39,27 @@ const ( WrappedRPCType ) -var RPCOpToType = map[int]int{ - PinRPC: CidRPCType, - UnpinRPC: CidRPCType, - PinListRPC: GenericRPCType, - IPFSPinRPC: CidRPCType, - IPFSUnpinRPC: CidRPCType, - IPFSIsPinnedRPC: CidRPCType, - ConsensusAddPinRPC: CidRPCType, - ConsensusRmPinRPC: CidRPCType, - VersionRPC: GenericRPCType, - MemberListRPC: GenericRPCType, - RollbackRPC: GenericRPCType, - LeaderRPC: WrappedRPCType, - BroadcastRPC: WrappedRPCType, - LocalSyncRPC: GenericRPCType, - LocalSyncCidRPC: CidRPCType, - GlobalSyncRPC: GenericRPCType, - GlobalSyncCidRPC: CidRPCType, - StatusRPC: GenericRPCType, - StatusCidRPC: CidRPCType, -} - -// RPCMethod identifies which RPC supported operation we are trying to make +// RPCOp identifies which RPC supported operation we are trying to make type RPCOp int +// RPCType identified which implementation of RPC we are using +type RPCType int + // RPC represents an internal RPC operation. It should be implemented // by all RPC types. type RPC interface { + // Op indicates which operation should be performed Op() RPCOp + // RType indicates which RPC implementation is used + RType() RPCType + // ResponseCh returns a channel to place the response for this RPC ResponseCh() chan RPCResponse } // baseRPC implements RPC and can be included as anonymous // field in other types. type baseRPC struct { - Type int + Type RPCType Method RPCOp RespChan chan RPCResponse } @@ -86,6 +75,10 @@ func (brpc *baseRPC) ResponseCh() chan RPCResponse { return brpc.RespChan } +func (brpc *baseRPC) RType() RPCType { + return brpc.Type +} + // GenericRPC is a ClusterRPC with generic arguments. type GenericRPC struct { baseRPC