From a97ed10d0b774f15e78f5e4cb78273af7305b245 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 7 Apr 2022 13:53:30 +0200 Subject: [PATCH] Adopt api.Cid type - replaces cid.Cid everwhere. This commit introduces an api.Cid type and replaces the usage of cid.Cid everywhere. The main motivation here is to override MarshalJSON so that Cids are JSON-ified as '"Qm...."' instead of '{ "/": "Qm....." }', as this "ipld" representation of IDs is horrible to work with, and our APIs are not issuing IPLD objects to start with. Unfortunately, there is no way to do this cleanly, and the best way is to just switch everything to our own type. --- Makefile | 2 +- adder/adder.go | 50 +++++------ adder/adder_test.go | 4 +- adder/adderutils/adderutils.go | 3 +- adder/ipfsadd/add.go | 2 +- adder/sharding/dag_service.go | 17 ++-- adder/sharding/dag_service_test.go | 5 +- adder/sharding/shard.go | 5 +- adder/sharding/verify.go | 14 ++- adder/single/dag_service.go | 2 +- adder/util.go | 4 +- allocate.go | 7 +- allocator/balanced/balanced.go | 3 +- api/add.go | 4 +- api/common/api.go | 13 ++- api/common/test/helpers.go | 4 +- api/ipfsproxy/ipfsproxy.go | 14 ++- api/ipfsproxy/ipfsproxy_test.go | 16 ++-- api/pinsvcapi/pinsvc/pinsvc.go | 10 +-- api/pinsvcapi/pinsvcapi.go | 25 +++--- api/pinsvcapi/pinsvcapi_test.go | 8 +- api/rest/client/client.go | 13 ++- api/rest/client/lbclient.go | 13 ++- api/rest/client/methods.go | 17 ++-- api/rest/client/methods_test.go | 13 ++- api/rest/restapi.go | 9 +- api/rest/restapi_test.go | 3 +- api/types.go | 85 +++++++++++++------ api/types_test.go | 3 +- cluster.go | 41 +++++---- cluster_test.go | 23 +++-- cmd/ipfs-cluster-ctl/main.go | 13 ++- cmd/ipfs-cluster-follow/commands.go | 3 +- consensus/crdt/consensus.go | 5 +- consensus/crdt/consensus_test.go | 5 +- consensus/raft/consensus_test.go | 3 +- ipfscluster.go | 15 ++-- ipfscluster_test.go | 26 +++--- ipfsconn/ipfshttp/ipfshttp.go | 32 +++---- observations/metrics.go | 1 + peer_manager_test.go | 5 +- pintracker/optracker/operation.go | 4 +- pintracker/optracker/operationtracker.go | 25 +++--- pintracker/optracker/operationtracker_test.go | 3 +- pintracker/pintracker_test.go | 11 ++- pintracker/stateless/stateless.go | 9 +- pintracker/stateless/stateless_test.go | 7 +- rpc_api.go | 21 +++-- sharness/t0030-ctl-pin.sh | 6 +- sharness/t0052-service-state-export.sh | 4 +- state/dsstate/datastore.go | 22 ++--- state/dsstate/datastore_test.go | 3 +- state/empty.go | 6 +- state/interface.go | 8 +- test/cids.go | 20 ++--- test/ipfs_mock.go | 18 ++-- test/rpc_api_mock.go | 22 ++--- test/sharding.go | 3 +- util.go | 3 +- version/version.go | 1 + 60 files changed, 369 insertions(+), 367 deletions(-) diff --git a/Makefile b/Makefile index 225eeae7..1d997e89 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ follow: check: go vet ./... - staticcheck ./... + staticcheck --checks all ./... test: go test -v ./... diff --git a/adder/adder.go b/adder/adder.go index 1d5cc637..6b93656e 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -42,7 +42,7 @@ type ClusterDAGService interface { ipld.DAGService // Finalize receives the IPFS content root CID as // returned by the ipfs adder. - Finalize(ctx context.Context, ipfsRoot cid.Cid) (cid.Cid, error) + Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error) // Allocations returns the allocations made by the cluster DAG service // for the added content. Allocations() []peer.ID @@ -51,7 +51,7 @@ type ClusterDAGService interface { // A dagFormatter can create dags from files.Node. It can keep state // to add several files to the same dag. type dagFormatter interface { - Add(name string, f files.Node) (cid.Cid, error) + Add(name string, f files.Node) (api.Cid, error) } // Adder is used to add content to IPFS Cluster using an implementation of @@ -103,12 +103,12 @@ func (a *Adder) setContext(ctx context.Context) { // FromMultipart adds content from a multipart.Reader. The adder will // no longer be usable after calling this method. -func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid, error) { +func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (api.Cid, error) { logger.Debugf("adding from multipart with params: %+v", a.params) f, err := files.NewFileFromPartReader(r, "multipart/form-data") if err != nil { - return cid.Undef, err + return api.CidUndef, err } defer f.Close() return a.FromFiles(ctx, f) @@ -116,12 +116,12 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid // FromFiles adds content from a files.Directory. The adder will no longer // be usable after calling this method. -func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) { +func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (api.Cid, error) { logger.Debug("adding from files") a.setContext(ctx) if a.ctx.Err() != nil { // don't allow running twice - return cid.Undef, a.ctx.Err() + return api.CidUndef, a.ctx.Err() } defer a.cancel() @@ -139,7 +139,7 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro err = errors.New("bad dag formatter option") } if err != nil { - return cid.Undef, err + return api.CidUndef, err } // setup wrapping @@ -150,18 +150,18 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro } it := f.Entries() - var adderRoot cid.Cid + var adderRoot api.Cid for it.Next() { select { case <-a.ctx.Done(): - return cid.Undef, a.ctx.Err() + return api.CidUndef, a.ctx.Err() default: logger.Debugf("ipfsAdder AddFile(%s)", it.Name()) adderRoot, err = dagFmtr.Add(it.Name(), it.Node()) if err != nil { logger.Error("error adding to cluster: ", err) - return cid.Undef, err + return api.CidUndef, err } } // TODO (hector): We can only add a single CAR file for the @@ -171,13 +171,13 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro } } if it.Err() != nil { - return cid.Undef, it.Err() + return api.CidUndef, it.Err() } clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot) if err != nil { logger.Error("error finalizing adder:", err) - return cid.Undef, err + return api.CidUndef, err } logger.Infof("%s successfully added to cluster", clusterRoot) return clusterRoot, nil @@ -220,7 +220,7 @@ func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddPara }, nil } -func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { +func (ia *ipfsAdder) Add(name string, f files.Node) (api.Cid, error) { // In order to set the AddedOutput names right, we use // OutputPrefix: // @@ -239,9 +239,9 @@ func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { nd, err := ia.AddAllAndPin(f) if err != nil { - return cid.Undef, err + return api.CidUndef, err } - return nd.Cid(), nil + return api.NewCid(nd.Cid()), nil } // An adder to add CAR files. It is at the moment very basic, and can @@ -267,22 +267,22 @@ func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParam // Add takes a node which should be a CAR file and nothing else and // adds its blocks using the ClusterDAGService. -func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { +func (ca *carAdder) Add(name string, fn files.Node) (api.Cid, error) { if ca.params.Wrap { - return cid.Undef, errors.New("cannot wrap a CAR file upload") + return api.CidUndef, errors.New("cannot wrap a CAR file upload") } f, ok := fn.(files.File) if !ok { - return cid.Undef, errors.New("expected CAR file is not of type file") + return api.CidUndef, errors.New("expected CAR file is not of type file") } carReader, err := car.NewCarReader(f) if err != nil { - return cid.Undef, err + return api.CidUndef, err } if len(carReader.Header.Roots) != 1 { - return cid.Undef, errors.New("only CAR files with a single root are supported") + return api.CidUndef, errors.New("only CAR files with a single root are supported") } root := carReader.Header.Roots[0] @@ -292,7 +292,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { for { block, err := carReader.Next() if err != nil && err != io.EOF { - return cid.Undef, err + return api.CidUndef, err } else if block == nil { break } @@ -301,7 +301,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { nd, err := ipld.Decode(block) if err != nil { - return cid.Undef, err + return api.CidUndef, err } // If the root is in the CAR and the root is a UnixFS @@ -315,17 +315,17 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { err = ca.dgs.Add(ca.ctx, nd) if err != nil { - return cid.Undef, err + return api.CidUndef, err } } ca.output <- api.AddedOutput{ Name: name, - Cid: root, + Cid: api.NewCid(root), Bytes: bytes, Size: size, Allocations: ca.dgs.Allocations(), } - return root, nil + return api.NewCid(root), nil } diff --git a/adder/adder_test.go b/adder/adder_test.go index 7d9383ae..61d39984 100644 --- a/adder/adder_test.go +++ b/adder/adder_test.go @@ -28,7 +28,7 @@ func newMockCDAGServ() *mockCDAGServ { } // noop -func (dag *mockCDAGServ) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { +func (dag *mockCDAGServ) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { return root, nil } @@ -152,7 +152,7 @@ func TestAdder_CAR(t *testing.T) { t.Fatal(err) } var carBuf bytes.Buffer - err = car.WriteCar(ctx, dags, []cid.Cid{root}, &carBuf) + err = car.WriteCar(ctx, dags, []cid.Cid{root.Cid}, &carBuf) if err != nil { t.Fatal(err) } diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index 94c98d06..0a181d91 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -13,7 +13,6 @@ import ( "github.com/ipfs/ipfs-cluster/adder/single" "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -31,7 +30,7 @@ func AddMultipartHTTPHandler( reader *multipart.Reader, w http.ResponseWriter, outputTransform func(api.AddedOutput) interface{}, -) (cid.Cid, error) { +) (api.Cid, error) { var dags adder.ClusterDAGService output := make(chan api.AddedOutput, 200) diff --git a/adder/ipfsadd/add.go b/adder/ipfsadd/add.go index 31e617de..f25c3f77 100644 --- a/adder/ipfsadd/add.go +++ b/adder/ipfsadd/add.go @@ -446,7 +446,7 @@ func (adder *Adder) outputDagnode(out chan api.AddedOutput, name string, dn ipld name = filepath.Join(adder.OutputPrefix, name) out <- api.AddedOutput{ - Cid: dn.Cid(), + Cid: api.NewCid(dn.Cid()), Name: name, Size: s, Allocations: adder.allocsFun(), diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index 0ffe971d..da343478 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -79,13 +79,13 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { // Finalize finishes sharding, creates the cluster DAG and pins it along // with the meta pin for the root node of the content. -func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, error) { +func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) { lastCid, err := dgs.flushCurrentShard(ctx) if err != nil { - return lastCid, err + return api.NewCid(lastCid), err } - if !lastCid.Equals(dataRoot) { + if !lastCid.Equals(dataRoot.Cid) { logger.Warnf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot) } @@ -124,13 +124,13 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name), - Cid: clusterDAG, + Cid: api.NewCid(clusterDAG), Size: dgs.totalSize, Allocations: nil, }) // Pin the ClusterDAG - clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.addParams.PinOptions) + clusterDAGPin := api.PinWithOpts(api.NewCid(clusterDAG), dgs.addParams.PinOptions) clusterDAGPin.ReplicationFactorMin = -1 clusterDAGPin.ReplicationFactorMax = -1 clusterDAGPin.MaxDepth = 0 // pin direct @@ -146,7 +146,8 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, // Pin the META pin metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions) metaPin.Type = api.MetaType - metaPin.Reference = &clusterDAG + ref := api.NewCid(clusterDAG) + metaPin.Reference = &ref metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned err = adder.Pin(ctx, dgs.rpcClient, metaPin) if err != nil { @@ -238,7 +239,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error { return dgs.ingestBlock(ctx, n) // <-- retry ingest } -func (dgs *DAGService) logStats(metaPin, clusterDAGPin cid.Cid) { +func (dgs *DAGService) logStats(metaPin, clusterDAGPin api.Cid) { duration := time.Since(dgs.startTime) seconds := uint64(duration) / uint64(time.Second) var rate string @@ -294,7 +295,7 @@ func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) { dgs.currentShard = nil dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("shard-%d", lens), - Cid: shardCid, + Cid: api.NewCid(shardCid), Size: shard.Size(), Allocations: shard.Allocations(), }) diff --git a/adder/sharding/dag_service_test.go b/adder/sharding/dag_service_test.go index fcb52293..2128e456 100644 --- a/adder/sharding/dag_service_test.go +++ b/adder/sharding/dag_service_test.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -51,7 +50,7 @@ func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.Pin, out *[]peer. return nil } -func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) { +func (rpcs *testRPC) PinGet(ctx context.Context, c api.Cid) (api.Pin, error) { pI, ok := rpcs.pins.Load(c.String()) if !ok { return api.Pin{}, errors.New("not found") @@ -59,7 +58,7 @@ func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) { return pI.(api.Pin), nil } -func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { +func (rpcs *testRPC) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) { bI, ok := rpcs.blocks.Load(c.String()) if !ok { return nil, errors.New("not found") diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go index cd3d5549..cb1ae37c 100644 --- a/adder/sharding/shard.go +++ b/adder/sharding/shard.go @@ -122,12 +122,13 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, } rootCid := nodes[0].Cid() - pin := api.PinWithOpts(rootCid, sh.pinOptions) + pin := api.PinWithOpts(api.NewCid(rootCid), sh.pinOptions) pin.Name = fmt.Sprintf("%s-shard-%d", sh.pinOptions.Name, shardN) // this sets allocations as priority allocation pin.Allocations = sh.allocations pin.Type = api.ShardType - pin.Reference = &prev + ref := api.NewCid(prev) + pin.Reference = &ref pin.MaxDepth = 1 pin.ShardSize = sh.Size() // use current size, not the limit if len(nodes) > len(sh.dagNode)+1 { // using an indirect graph diff --git a/adder/sharding/verify.go b/adder/sharding/verify.go index 24a506cd..a588669c 100644 --- a/adder/sharding/verify.go +++ b/adder/sharding/verify.go @@ -7,26 +7,24 @@ import ( "testing" "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" ) // MockPinStore is used in VerifyShards type MockPinStore interface { // Gets a pin - PinGet(context.Context, cid.Cid) (api.Pin, error) + PinGet(context.Context, api.Cid) (api.Pin, error) } // MockBlockStore is used in VerifyShards type MockBlockStore interface { // Gets a block - BlockGet(context.Context, cid.Cid) ([]byte, error) + BlockGet(context.Context, api.Cid) ([]byte, error) } // VerifyShards checks that a sharded CID has been correctly formed and stored. // This is a helper function for testing. It returns a map with all the blocks // from all shards. -func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) { +func VerifyShards(t *testing.T, rootCid api.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) { ctx := context.Background() metaPin, err := pins.PinGet(ctx, rootCid) if err != nil { @@ -69,7 +67,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo } shardBlocks := make(map[string]struct{}) - var ref cid.Cid + var ref api.Cid // traverse shards in order for i := 0; i < len(shards); i++ { sh, _, err := clusterDAGNode.ResolveLink([]string{fmt.Sprintf("%d", i)}) @@ -77,12 +75,12 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo return nil, err } - shardPin, err := pins.PinGet(ctx, sh.Cid) + shardPin, err := pins.PinGet(ctx, api.NewCid(sh.Cid)) if err != nil { return nil, fmt.Errorf("shard was not pinned: %s %s", sh.Cid, err) } - if ref != cid.Undef && !shardPin.Reference.Equals(ref) { + if ref != api.CidUndef && !shardPin.Reference.Equals(ref) { t.Errorf("Ref (%s) should point to previous shard (%s)", ref, shardPin.Reference) } ref = shardPin.Cid diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 13b5b9cb..005fd186 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -110,7 +110,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { } // Finalize pins the last Cid added to this DAGService. -func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { +func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { close(dgs.blocks) select { diff --git a/adder/util.go b/adder/util.go index 26ad48ca..f647468a 100644 --- a/adder/util.go +++ b/adder/util.go @@ -110,7 +110,7 @@ func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta { } return api.NodeWithMeta{ - Cid: n.Cid(), + Cid: api.NewCid(n.Cid()), Data: n.RawData(), CumSize: size, } @@ -125,7 +125,7 @@ func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) "", "Cluster", "BlockAllocate", - api.PinWithOpts(cid.Undef, pinOpts), + api.PinWithOpts(api.CidUndef, pinOpts), &allocsStr, ) return allocsStr, err diff --git a/allocate.go b/allocate.go index 2a632271..8efeccab 100644 --- a/allocate.go +++ b/allocate.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" "go.opencensus.io/trace" @@ -57,7 +56,7 @@ type classifiedMetrics struct { // into account if the given CID was previously in a "pin everywhere" mode, // and will consider such Pins as currently unallocated ones, providing // new allocations as available. -func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) { +func (c *Cluster) allocate(ctx context.Context, hash api.Cid, currentPin api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) { ctx, span := trace.StartSpan(ctx, "cluster/allocate") defer span.End() @@ -180,7 +179,7 @@ func (c *Cluster) filterMetrics(ctx context.Context, mSet api.MetricsSet, numMet } // allocationError logs an allocation error -func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error { +func allocationError(hash api.Cid, needed, wanted int, candidatesValid []peer.ID) error { logger.Errorf("Not enough candidates to allocate %s:", hash) logger.Errorf(" Needed: %d", needed) logger.Errorf(" Wanted: %d", wanted) @@ -198,7 +197,7 @@ func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID func (c *Cluster) obtainAllocations( ctx context.Context, - hash cid.Cid, + hash api.Cid, rplMin, rplMax int, metrics classifiedMetrics, ) ([]peer.ID, error) { diff --git a/allocator/balanced/balanced.go b/allocator/balanced/balanced.go index 4a573e1b..83c408a5 100644 --- a/allocator/balanced/balanced.go +++ b/allocator/balanced/balanced.go @@ -12,7 +12,6 @@ import ( "fmt" "sort" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" api "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" @@ -254,7 +253,7 @@ func (pnedm *partitionedMetric) chooseNext() peer.ID { // - Third, based on the AllocateBy order, it select the first metric func (a *Allocator) Allocate( ctx context.Context, - c cid.Cid, + c api.Cid, current, candidates, priority api.MetricsSet, ) ([]peer.ID, error) { diff --git a/api/add.go b/api/add.go index fceea629..b0dcd782 100644 --- a/api/add.go +++ b/api/add.go @@ -17,7 +17,7 @@ var DefaultShardSize = uint64(100 * 1024 * 1024) // 100 MB // indicating a node of a file has been added. type AddedOutput struct { Name string `json:"name" codec:"n,omitempty"` - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` Bytes uint64 `json:"bytes,omitempty" codec:"b,omitempty"` Size uint64 `json:"size,omitempty" codec:"s,omitempty"` Allocations []peer.ID `json:"allocations,omitempty" codec:"a,omitempty"` @@ -120,7 +120,7 @@ func AddParamsFromQuery(query url.Values) (AddParams, error) { return params, err } params.PinOptions = *opts - params.PinUpdate = cid.Undef // hardcode as does not make sense for adding + params.PinUpdate.Cid = cid.Undef // hardcode as does not make sense for adding layout := query.Get("layout") switch layout { diff --git a/api/common/api.go b/api/common/api.go index 094c439c..79cddbfa 100644 --- a/api/common/api.go +++ b/api/common/api.go @@ -27,7 +27,6 @@ import ( "sync" "time" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" gopath "github.com/ipfs/go-path" types "github.com/ipfs/ipfs-cluster/api" @@ -69,8 +68,8 @@ var ( ErrHTTPEndpointNotEnabled = errors.New("the HTTP endpoint is not enabled") ) -// When passed to SendResponse(), it will figure out which http status -// to set by itself. +// SetStatusAutomatically can be passed to SendResponse(), so that it will +// figure out which http status to set by itself. const SetStatusAutomatically = -1 // API implements an API and aims to provides @@ -496,12 +495,12 @@ func (api *API) ParsePinPathOrFail(w http.ResponseWriter, r *http.Request) types return pinPath } -// ParsePidOrFail parses a Cid and returns it or makes the request fail. +// ParseCidOrFail parses a Cid and returns it or makes the request fail. func (api *API) ParseCidOrFail(w http.ResponseWriter, r *http.Request) types.Pin { vars := mux.Vars(r) hash := vars["hash"] - c, err := cid.Decode(hash) + c, err := types.DecodeCid(hash) if err != nil { api.SendResponse(w, http.StatusBadRequest, errors.New("error decoding Cid: "+err.Error()), nil) return types.Pin{} @@ -697,8 +696,8 @@ func (api *API) Headers() map[string][]string { return api.config.Headers } -// Controls the HTTP server Keep Alive settings. -// Useful for testing. +// SetKeepAlivesEnabled controls the HTTP server Keep Alive settings. Useful +// for testing. func (api *API) SetKeepAlivesEnabled(b bool) { api.server.SetKeepAlivesEnabled(b) } diff --git a/api/common/test/helpers.go b/api/common/test/helpers.go index 833fa09c..ac0e1cd0 100644 --- a/api/common/test/helpers.go +++ b/api/common/test/helpers.go @@ -122,7 +122,7 @@ func CheckHeaders(t *testing.T, expected map[string][]string, url string, header } } -// This represents what an API is to us. +// API represents what an API is to us. type API interface { HTTPAddresses() ([]string, error) Host() host.Host @@ -286,7 +286,7 @@ func BothEndpoints(t *testing.T, test Func) { }) } -// HTTPSEndpoint runs the given test.Func against an HTTPs endpoint. +// HTTPSEndPoint runs the given test.Func against an HTTPs endpoint. func HTTPSEndPoint(t *testing.T, test Func) { t.Run("in-parallel", func(t *testing.T) { t.Run("https", func(t *testing.T) { diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index c5de02ae..592a1b77 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -1,3 +1,9 @@ +// Package ipfsproxy implements the Cluster API interface by providing an +// IPFS HTTP interface as exposed by the go-ipfs daemon. +// +// In this API, select endpoints like pin*, add*, and repo* endpoints are used +// to instead perform cluster operations. Requests for any other endpoints are +// passed to the underlying IPFS daemon. package ipfsproxy import ( @@ -396,7 +402,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) { } if arg != "" { - c, err := cid.Decode(arg) + c, err := api.DecodeCid(arg) if err != nil { ipfsErrorResponder(w, err.Error(), -1) return @@ -526,7 +532,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { } // Resolve the FROM argument - var fromCid cid.Cid + var fromCid api.Cid err = proxy.rpcClient.CallContext( ctx, "", @@ -733,9 +739,9 @@ func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) { for _, gc := range repoGC.PeerMap { for _, key := range gc.Keys { if streamErrors { - ipfsRepoGC = ipfsRepoGCResp{Key: key.Key, Error: key.Error} + ipfsRepoGC = ipfsRepoGCResp{Key: key.Key.Cid, Error: key.Error} } else { - ipfsRepoGC = ipfsRepoGCResp{Key: key.Key} + ipfsRepoGC = ipfsRepoGCResp{Key: key.Key.Cid} if key.Error != "" { mError.add(key.Error) } diff --git a/api/ipfsproxy/ipfsproxy_test.go b/api/ipfsproxy/ipfsproxy_test.go index 674a3696..d112cf07 100644 --- a/api/ipfsproxy/ipfsproxy_test.go +++ b/api/ipfsproxy/ipfsproxy_test.go @@ -13,8 +13,6 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" - "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" @@ -100,7 +98,7 @@ func TestIPFSProxyPin(t *testing.T) { tests := []struct { name string args args - want cid.Cid + want api.Cid wantErr bool }{ { @@ -140,7 +138,7 @@ func TestIPFSProxyPin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, { @@ -150,7 +148,7 @@ func TestIPFSProxyPin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, } @@ -218,7 +216,7 @@ func TestIPFSProxyUnpin(t *testing.T) { tests := []struct { name string args args - want cid.Cid + want api.Cid wantErr bool }{ { @@ -258,7 +256,7 @@ func TestIPFSProxyUnpin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, { @@ -268,7 +266,7 @@ func TestIPFSProxyUnpin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, } @@ -583,7 +581,7 @@ func TestProxyRepoGC(t *testing.T) { repoGC = append(repoGC, resp) } - if !repoGC[0].Key.Equals(test.Cid1) { + if !repoGC[0].Key.Equals(test.Cid1.Cid) { t.Errorf("expected a different cid, expected: %s, found: %s", test.Cid1, repoGC[0].Key) } diff --git a/api/pinsvcapi/pinsvc/pinsvc.go b/api/pinsvcapi/pinsvc/pinsvc.go index b14f0497..3baa5043 100644 --- a/api/pinsvcapi/pinsvc/pinsvc.go +++ b/api/pinsvcapi/pinsvc/pinsvc.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" types "github.com/ipfs/ipfs-cluster/api" ) @@ -54,7 +53,7 @@ func (pname *PinName) UnmarshalJSON(data []byte) error { // Pin contains basic information about a Pin and pinning options. type Pin struct { - Cid string `json:"cid"` // a cid.Cid does not json properly + Cid types.Cid `json:"cid"` Name PinName `json:"name"` Origins []types.Multiaddr `json:"origins"` Meta map[string]string `json:"meta"` @@ -62,7 +61,7 @@ type Pin struct { // Defined returns if the pinis empty (Cid not set). func (p Pin) Defined() bool { - return p.Cid != "" + return p.Cid.Defined() } // MatchesName returns in a pin status matches a name option with a given @@ -233,7 +232,7 @@ type PinList struct { // ListOptions represents possible options given to the List endpoint. type ListOptions struct { - Cids []cid.Cid + Cids []types.Cid Name string MatchingStrategy MatchingStrategy Status Status @@ -243,11 +242,12 @@ type ListOptions struct { Meta map[string]string } +// FromQuery parses ListOptions from url.Values. func (lo *ListOptions) FromQuery(q url.Values) error { cidq := q.Get("cid") if len(cidq) > 0 { for _, cstr := range strings.Split(cidq, ",") { - c, err := cid.Decode(cstr) + c, err := types.DecodeCid(cstr) if err != nil { return fmt.Errorf("error decoding cid %s: %w", cstr, err) } diff --git a/api/pinsvcapi/pinsvcapi.go b/api/pinsvcapi/pinsvcapi.go index 81431c67..1490cee2 100644 --- a/api/pinsvcapi/pinsvcapi.go +++ b/api/pinsvcapi/pinsvcapi.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/gorilla/mux" - "github.com/ipfs/go-cid" types "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/common" "github.com/ipfs/ipfs-cluster/api/pinsvcapi/pinsvc" @@ -79,11 +78,7 @@ func svcPinToClusterPin(p pinsvc.Pin) (types.Pin, error) { Metadata: p.Meta, Mode: types.PinModeRecursive, } - c, err := cid.Decode(p.Cid) - if err != nil { - return types.Pin{}, err - } - return types.PinWithOpts(c, opts), nil + return types.PinWithOpts(p.Cid, opts), nil } func globalPinInfoToSvcPinStatus( @@ -103,7 +98,7 @@ func globalPinInfoToSvcPinStatus( status.Status = trackerStatusToSvcStatus(statusMask) status.Created = gpi.Created status.Pin = pinsvc.Pin{ - Cid: gpi.Cid.String(), + Cid: gpi.Cid, Name: pinsvc.PinName(gpi.Name), Origins: gpi.Origins, Meta: gpi.Metadata, @@ -131,7 +126,7 @@ func NewAPI(ctx context.Context, cfg *Config) (*API, error) { return NewAPIWithHost(ctx, cfg, nil) } -// NewAPI creates a new REST API component using the given libp2p Host. +// NewAPIWithHost creates a new REST API component using the given libp2p Host. func NewAPIWithHost(ctx context.Context, cfg *Config, h host.Host) (*API, error) { api := API{ config: cfg, @@ -191,13 +186,13 @@ func (api *API) parseBodyOrFail(w http.ResponseWriter, r *http.Request) pinsvc.P return pin } -func (api *API) parseRequestIDOrFail(w http.ResponseWriter, r *http.Request) (cid.Cid, bool) { +func (api *API) parseRequestIDOrFail(w http.ResponseWriter, r *http.Request) (types.Cid, bool) { vars := mux.Vars(r) cStr, ok := vars["requestID"] if !ok { - return cid.Undef, true + return types.CidUndef, true } - c, err := cid.Decode(cStr) + c, err := types.DecodeCid(cStr) if err != nil { api.SendResponse(w, http.StatusBadRequest, errors.New("error decoding requestID: "+err.Error()), nil) return c, false @@ -233,12 +228,12 @@ func (api *API) addPin(w http.ResponseWriter, r *http.Request) { return } - status := api.pinToSvcPinStatus(r.Context(), pin.Cid, pinObj) + status := api.pinToSvcPinStatus(r.Context(), pin.Cid.String(), pinObj) api.SendResponse(w, common.SetStatusAutomatically, nil, status) } } -func (api *API) getPinSvcStatus(ctx context.Context, c cid.Cid) (pinsvc.PinStatus, error) { +func (api *API) getPinSvcStatus(ctx context.Context, c types.Cid) (pinsvc.PinStatus, error) { var pinInfo types.GlobalPinInfo err := api.rpcClient.CallContext( @@ -318,7 +313,7 @@ func (api *API) listPins(w http.ResponseWriter, r *http.Request) { }() for _, ci := range opts.Cids { - go func(c cid.Cid) { + go func(c types.Cid) { defer wg.Done() st, err := api.getPinSvcStatus(r.Context(), c) stCh <- statusResult{st: st, err: err} @@ -410,7 +405,7 @@ func (api *API) pinToSvcPinStatus(ctx context.Context, rID string, pin types.Pin Status: pinsvc.StatusQueued, Created: pin.Timestamp, Pin: pinsvc.Pin{ - Cid: pin.Cid.String(), + Cid: pin.Cid, Name: pinsvc.PinName(pin.Name), Origins: pin.Origins, Meta: pin.Metadata, diff --git a/api/pinsvcapi/pinsvcapi_test.go b/api/pinsvcapi/pinsvcapi_test.go index fb33761e..452e4a0b 100644 --- a/api/pinsvcapi/pinsvcapi_test.go +++ b/api/pinsvcapi/pinsvcapi_test.go @@ -68,7 +68,7 @@ func TestAPIListEndpoint(t *testing.T) { } results := resp.Results - if results[0].Pin.Cid != clustertest.Cid1.String() || + if !results[0].Pin.Cid.Equals(clustertest.Cid1) || results[1].Status != pinsvc.StatusPinning { t.Errorf("unexpected statusAll resp: %+v", results) } @@ -161,7 +161,7 @@ func TestAPIPinEndpoint(t *testing.T) { tf := func(t *testing.T, url test.URLFunc) { // test normal pin pin := pinsvc.Pin{ - Cid: clustertest.Cid3.String(), + Cid: clustertest.Cid3, Name: "testname", Origins: []api.Multiaddr{ ma, @@ -192,7 +192,7 @@ func TestAPIPinEndpoint(t *testing.T) { var errName pinsvc.APIError pin2 := pinsvc.Pin{ - Cid: clustertest.Cid1.String(), + Cid: clustertest.Cid1, Name: pinsvc.PinName(make([]byte, 256)), } pinJSON, err = json.Marshal(pin2) @@ -218,7 +218,7 @@ func TestAPIGetPinEndpoint(t *testing.T) { var status pinsvc.PinStatus test.MakeGet(t, svcapi, url(svcapi)+"/pins/"+clustertest.Cid1.String(), &status) - if status.Pin.Cid != clustertest.Cid1.String() { + if !status.Pin.Cid.Equals(clustertest.Cid1) { t.Error("Cid should be set") } diff --git a/api/rest/client/client.go b/api/rest/client/client.go index a49f704c..f43e01ed 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" shell "github.com/ipfs/go-ipfs-api" files "github.com/ipfs/go-ipfs-files" logging "github.com/ipfs/go-log/v2" @@ -64,9 +63,9 @@ type Client interface { // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. - Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (api.Pin, error) + Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) // Unpin untracks a Cid from cluster. - Unpin(ctx context.Context, ci cid.Cid) (api.Pin, error) + Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) // PinPath resolves given path into a cid and performs the pin operation. PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) @@ -78,21 +77,21 @@ type Client interface { // and the peers that should be pinning them. Allocations(ctx context.Context, filter api.PinType, out chan<- api.Pin) error // Allocation returns the current allocations for a given Cid. - Allocation(ctx context.Context, ci cid.Cid) (api.Pin, error) + Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) // Status returns the current ipfs state for a given Cid. If local is true, // the information affects only the current peer, otherwise the information // is fetched from all cluster peers. - Status(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) + Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) // StatusCids status information for the requested CIDs. - StatusCids(ctx context.Context, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error + StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error // StatusAll gathers Status() for all tracked items. StatusAll(ctx context.Context, filter api.TrackerStatus, local bool, out chan<- api.GlobalPinInfo) error // Recover retriggers pin or unpin ipfs operations for a Cid in error // state. If local is true, the operation is limited to the current // peer, otherwise it happens on every cluster peer. - Recover(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) + Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) // RecoverAll triggers Recover() operations on all tracked items. If // local is true, the operation is limited to the current peer. // Otherwise, it happens everywhere. diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index 8ae6e2e2..2daee558 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -4,7 +4,6 @@ import ( "context" "sync/atomic" - cid "github.com/ipfs/go-cid" shell "github.com/ipfs/go-ipfs-api" files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/ipfs-cluster/api" @@ -156,7 +155,7 @@ func (lc *loadBalancingClient) PeerRm(ctx context.Context, id peer.ID) error { // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. -func (lc *loadBalancingClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (lc *loadBalancingClient) Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) { var pin api.Pin call := func(c Client) error { var err error @@ -169,7 +168,7 @@ func (lc *loadBalancingClient) Pin(ctx context.Context, ci cid.Cid, opts api.Pin } // Unpin untracks a Cid from cluster. -func (lc *loadBalancingClient) Unpin(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (lc *loadBalancingClient) Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) { var pin api.Pin call := func(c Client) error { var err error @@ -220,7 +219,7 @@ func (lc *loadBalancingClient) Allocations(ctx context.Context, filter api.PinTy } // Allocation returns the current allocations for a given Cid. -func (lc *loadBalancingClient) Allocation(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (lc *loadBalancingClient) Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) { var pin api.Pin call := func(c Client) error { var err error @@ -235,7 +234,7 @@ func (lc *loadBalancingClient) Allocation(ctx context.Context, ci cid.Cid) (api. // Status returns the current ipfs state for a given Cid. If local is true, // the information affects only the current peer, otherwise the information // is fetched from all cluster peers. -func (lc *loadBalancingClient) Status(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (lc *loadBalancingClient) Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { var pinInfo api.GlobalPinInfo call := func(c Client) error { var err error @@ -250,7 +249,7 @@ func (lc *loadBalancingClient) Status(ctx context.Context, ci cid.Cid, local boo // StatusCids returns Status() information for the given Cids. If local is // true, the information affects only the current peer, otherwise the // information is fetched from all cluster peers. -func (lc *loadBalancingClient) StatusCids(ctx context.Context, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error { +func (lc *loadBalancingClient) StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error { call := func(c Client) error { return c.StatusCids(ctx, cids, local, out) } @@ -276,7 +275,7 @@ func (lc *loadBalancingClient) StatusAll(ctx context.Context, filter api.Tracker // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. -func (lc *loadBalancingClient) Recover(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (lc *loadBalancingClient) Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { var pinInfo api.GlobalPinInfo call := func(c Client) error { var err error diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index 9aa07812..9bfba5a4 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" gopath "github.com/ipfs/go-path" peer "github.com/libp2p/go-libp2p-core/peer" @@ -85,7 +84,7 @@ func (c *defaultClient) PeerRm(ctx context.Context, id peer.ID) error { // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. -func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (c *defaultClient) Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) { ctx, span := trace.StartSpan(ctx, "client/Pin") defer span.End() @@ -110,7 +109,7 @@ func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions } // Unpin untracks a Cid from cluster. -func (c *defaultClient) Unpin(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (c *defaultClient) Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) { ctx, span := trace.StartSpan(ctx, "client/Unpin") defer span.End() var pin api.Pin @@ -212,7 +211,7 @@ func (c *defaultClient) Allocations(ctx context.Context, filter api.PinType, out } // Allocation returns the current allocations for a given Cid. -func (c *defaultClient) Allocation(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (c *defaultClient) Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) { ctx, span := trace.StartSpan(ctx, "client/Allocation") defer span.End() @@ -224,7 +223,7 @@ func (c *defaultClient) Allocation(ctx context.Context, ci cid.Cid) (api.Pin, er // Status returns the current ipfs state for a given Cid. If local is true, // the information affects only the current peer, otherwise the information // is fetched from all cluster peers. -func (c *defaultClient) Status(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (c *defaultClient) Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "client/Status") defer span.End() @@ -243,7 +242,7 @@ func (c *defaultClient) Status(ctx context.Context, ci cid.Cid, local bool) (api // StatusCids returns Status() information for the given Cids. If local is // true, the information affects only the current peer, otherwise the // information is fetched from all cluster peers. -func (c *defaultClient) StatusCids(ctx context.Context, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error { +func (c *defaultClient) StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error { return c.statusAllWithCids(ctx, api.TrackerStatusUndefined, cids, local, out) } @@ -256,7 +255,7 @@ func (c *defaultClient) StatusAll(ctx context.Context, filter api.TrackerStatus, return c.statusAllWithCids(ctx, filter, nil, local, out) } -func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.TrackerStatus, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error { +func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.TrackerStatus, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error { defer close(out) ctx, span := trace.StartSpan(ctx, "client/StatusAll") defer span.End() @@ -298,7 +297,7 @@ func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.Tracke // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. -func (c *defaultClient) Recover(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (c *defaultClient) Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "client/Recover") defer span.End() @@ -458,7 +457,7 @@ func WaitFor(ctx context.Context, c Client, fp StatusFilterParams) (api.GlobalPi // StatusFilterParams contains the parameters required // to filter a stream of status results. type StatusFilterParams struct { - Cid cid.Cid + Cid api.Cid Local bool // query status from the local peer only Target api.TrackerStatus Limit int // wait for N peers reaching status. 0 == all diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index ee68cb33..a4de9518 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -11,7 +11,6 @@ import ( rest "github.com/ipfs/ipfs-cluster/api/rest" test "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ma "github.com/multiformats/go-multiaddr" @@ -227,7 +226,7 @@ func TestPinPath(t *testing.T) { testF := func(t *testing.T, c Client) { for _, testCase := range pathTestCases { - ec, _ := cid.Decode(testCase.expectedCid) + ec, _ := types.DecodeCid(testCase.expectedCid) resultantPin := types.PinWithOpts(ec, opts) p := testCase.path pin, err := c.PinPath(ctx, p, opts) @@ -351,7 +350,7 @@ func TestStatusCids(t *testing.T) { out := make(chan types.GlobalPinInfo) go func() { - err := c.StatusCids(ctx, []cid.Cid{test.Cid1}, false, out) + err := c.StatusCids(ctx, []types.Cid{test.Cid1}, false, out) if err != nil { t.Error(err) } @@ -590,7 +589,7 @@ func (wait *waitService) Pin(ctx context.Context, in types.Pin, out *types.Pin) return nil } -func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.GlobalPinInfo) error { +func (wait *waitService) Status(ctx context.Context, in types.Cid, out *types.GlobalPinInfo) error { wait.l.Lock() defer wait.l.Unlock() if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned @@ -642,7 +641,7 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob return nil } -func (wait *waitService) PinGet(ctx context.Context, in cid.Cid, out *types.Pin) error { +func (wait *waitService) PinGet(ctx context.Context, in types.Cid, out *types.Pin) error { p := types.PinCid(in) p.ReplicationFactorMin = 2 p.ReplicationFactorMax = 3 @@ -662,7 +661,7 @@ func (wait *waitServiceUnpin) Unpin(ctx context.Context, in types.Pin, out *type return nil } -func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types.GlobalPinInfo) error { +func (wait *waitServiceUnpin) Status(ctx context.Context, in types.Cid, out *types.GlobalPinInfo) error { wait.l.Lock() defer wait.l.Unlock() if time.Now().After(wait.unpinStart.Add(5 * time.Second)) { //unpinned @@ -698,7 +697,7 @@ func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types return nil } -func (wait *waitServiceUnpin) PinGet(ctx context.Context, in cid.Cid, out *types.Pin) error { +func (wait *waitServiceUnpin) PinGet(ctx context.Context, in types.Cid, out *types.Pin) error { return errors.New("not found") } diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 4caee0d2..e8591c81 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -17,7 +17,6 @@ import ( "sync" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/adder/adderutils" types "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/common" @@ -611,10 +610,10 @@ func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() filterCidsStr := strings.Split(queryValues.Get("cids"), ",") - var cids []cid.Cid + var cids []types.Cid for _, cidStr := range filterCidsStr { - c, err := cid.Decode(cidStr) + c, err := types.DecodeCid(cidStr) if err != nil { api.SendResponse(w, http.StatusBadRequest, fmt.Errorf("error decoding Cid: %w", err), nil) return @@ -638,7 +637,7 @@ func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { if local == "true" { for _, ci := range cids { - go func(c cid.Cid) { + go func(c types.Cid) { defer wg.Done() var pinInfo types.PinInfo err := api.rpcClient.CallContext( @@ -658,7 +657,7 @@ func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { } } else { for _, ci := range cids { - go func(c cid.Cid) { + go func(c types.Cid) { defer wg.Done() var pinInfo types.GlobalPinInfo err := api.rpcClient.CallContext( diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 5d826d8e..328512be 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -13,7 +13,6 @@ import ( test "github.com/ipfs/ipfs-cluster/api/common/test" clustertest "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" libp2p "github.com/libp2p/go-libp2p" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -401,7 +400,7 @@ func TestAPIPinEndpointWithPath(t *testing.T) { tf := func(t *testing.T, url test.URLFunc) { for _, testCase := range pathTestCases[:3] { - c, _ := cid.Decode(testCase.expectedCid) + c, _ := api.DecodeCid(testCase.expectedCid) resultantPin := api.PinWithOpts( c, testPinOpts, diff --git a/api/types.go b/api/types.go index 4766b93a..ca0c4599 100644 --- a/api/types.go +++ b/api/types.go @@ -277,33 +277,66 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{ IPFSPinStatusError: TrackerStatusClusterError, //TODO(ajl): check suitability } -// Cid is a CID with the MarshalJSON/UnmarshalJSON methods overwritten. -type Cid cid.Cid +// Cid embeds a cid.Cid with the MarshalJSON/UnmarshalJSON methods overwritten. +type Cid struct { + cid.Cid +} -func (c Cid) String() string { - return cid.Cid(c).String() +// CidUndef is an Undefined CID. +var CidUndef = Cid{cid.Undef} + +// NewCid wraps a cid.Cid in a Cid. +func NewCid(c cid.Cid) Cid { + return Cid{ + Cid: c, + } +} + +// DecodeCid parses a CID from its string form. +func DecodeCid(str string) (Cid, error) { + c, err := cid.Decode(str) + return Cid{c}, err +} + +// CastCid returns a CID from its bytes. +func CastCid(bs []byte) (Cid, error) { + c, err := cid.Cast(bs) + return Cid{c}, err } // MarshalJSON marshals a CID as JSON as a normal CID string. func (c Cid) MarshalJSON() ([]byte, error) { - return json.Marshal(c.String()) + if !c.Defined() { + return []byte("null"), nil + } + return []byte(`"` + c.String() + `"`), nil } // UnmarshalJSON reads a CID from its representation as JSON string. func (c *Cid) UnmarshalJSON(b []byte) error { + if string(b) == "null" { + *c = CidUndef + return nil + } + var cidStr string err := json.Unmarshal(b, &cidStr) if err != nil { return err } - cc, err := cid.Decode(cidStr) + cc, err := DecodeCid(cidStr) if err != nil { return err } - *c = Cid(cc) + *c = cc return nil } +// Equals returns true if two Cids are equal. +func (c Cid) Equals(c2 Cid) bool { + return c.Cid.Equals(c2.Cid) +} + // IPFSPinInfo represents an IPFS Pin, which only has a CID and type. // Its JSON form is what IPFS returns when querying a pinset. type IPFSPinInfo struct { @@ -314,7 +347,7 @@ type IPFSPinInfo struct { // GlobalPinInfo contains cluster-wide status information about a tracked Cid, // indexed by cluster peer. type GlobalPinInfo struct { - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"n"` Allocations []peer.ID `json:"allocations" codec:"a,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` @@ -360,7 +393,7 @@ func (gpi *GlobalPinInfo) Defined() bool { return gpi.Cid.Defined() } -// Matches returns true if one of the statuses in GlobalPinInfo matches +// Match returns true if one of the statuses in GlobalPinInfo matches // the given filter. func (gpi GlobalPinInfo) Match(filter TrackerStatus) bool { for _, pi := range gpi.PeerMap { @@ -400,7 +433,7 @@ func (pis PinInfoShort) String() string { // PinInfo holds information about local pins. This is used by the Pin // Trackers. type PinInfo struct { - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"m,omitempty"` Peer peer.ID `json:"peer" codec:"p,omitempty"` Allocations []peer.ID `json:"allocations" codec:"o,omitempty"` @@ -709,7 +742,7 @@ type PinOptions struct { UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"` ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"` - PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` + PinUpdate Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` } @@ -807,7 +840,7 @@ func (po PinOptions) ToQuery() (string, error) { } q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v) } - if po.PinUpdate != cid.Undef { + if po.PinUpdate.Defined() { q.Set("pin-update", po.PinUpdate.String()) } @@ -888,7 +921,7 @@ func (po *PinOptions) FromQuery(q url.Values) error { updateStr := q.Get("pin-update") if updateStr != "" { - updateCid, err := cid.Decode(updateStr) + updateCid, err := DecodeCid(updateStr) if err != nil { return fmt.Errorf("error decoding update option parameter: %s", err) } @@ -940,7 +973,7 @@ func (pd PinDepth) ToPinMode() PinMode { type Pin struct { PinOptions - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` // See PinType comments Type PinType `json:"type" codec:"t,omitempty"` @@ -957,7 +990,7 @@ type Pin struct { // MetaPin it is the ClusterDAG CID. For Shards, // it is the previous shard CID. // When not needed the pointer is nil - Reference *cid.Cid `json:"reference" codec:"r,omitempty"` + Reference *Cid `json:"reference" codec:"r,omitempty"` // The time that the pin was submitted to the consensus layer. Timestamp time.Time `json:"timestamp" codec:"i,omitempty"` @@ -994,7 +1027,7 @@ func (pp PinPath) Defined() bool { // PinCid is a shortcut to create a Pin only with a Cid. Default is for pin to // be recursive and the pin to be of DataType. -func PinCid(c cid.Cid) Pin { +func PinCid(c Cid) Pin { return Pin{ Cid: c, Type: DataType, @@ -1007,7 +1040,7 @@ func PinCid(c cid.Cid) Pin { // PinWithOpts creates a new Pin calling PinCid(c) and then sets its // PinOptions fields with the given options. Pin fields that are linked to // options are set accordingly (MaxDepth from Mode). -func PinWithOpts(c cid.Cid, opts PinOptions) Pin { +func PinWithOpts(c Cid, opts PinOptions) Pin { p := PinCid(c) p.PinOptions = opts p.MaxDepth = p.Mode.ToPinDepth() @@ -1090,9 +1123,9 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { if err != nil { return err } - ci, err := cid.Cast(pbPin.GetCid()) + ci, err := CastCid(pbPin.GetCid()) if err != nil { - pin.Cid = cid.Undef + pin.Cid = CidUndef } else { pin.Cid = ci } @@ -1112,7 +1145,7 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pin.Allocations = allocs pin.MaxDepth = PinDepth(pbPin.GetMaxDepth()) - ref, err := cid.Cast(pbPin.GetReference()) + ref, err := CastCid(pbPin.GetReference()) if err != nil { pin.Reference = nil @@ -1137,7 +1170,7 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pin.ExpireAt = time.Unix(int64(exp), 0) } pin.Metadata = opts.GetMetadata() - pinUpdate, err := cid.Cast(opts.GetPinUpdate()) + pinUpdate, err := CastCid(opts.GetPinUpdate()) if err == nil { pin.PinUpdate = pinUpdate } @@ -1230,9 +1263,9 @@ func (pin Pin) Defined() bool { // NodeWithMeta specifies a block of data and a set of optional metadata fields // carrying information about the encoded ipld node type NodeWithMeta struct { - Data []byte `codec:"d,omitempty"` - Cid cid.Cid `codec:"c,omitempty"` - CumSize uint64 `codec:"s,omitempty"` // Cumulative size + Data []byte `codec:"d,omitempty"` + Cid Cid `codec:"c,omitempty"` + CumSize uint64 `codec:"s,omitempty"` // Cumulative size } // Size returns how big is the block. It is different from CumSize, which @@ -1357,8 +1390,8 @@ type IPFSRepoStat struct { // IPFSRepoGC represents the streaming response sent from repo gc API of IPFS. type IPFSRepoGC struct { - Key cid.Cid `json:"key,omitempty" codec:"k,omitempty"` - Error string `json:"error,omitempty" codec:"e,omitempty"` + Key Cid `json:"key,omitempty" codec:"k,omitempty"` + Error string `json:"error,omitempty" codec:"e,omitempty"` } // RepoGC contains garbage collected CIDs from a cluster peer's IPFS daemon. diff --git a/api/types_test.go b/api/types_test.go index 17c9b502..5ee3a1f3 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" multiaddr "github.com/multiformats/go-multiaddr" @@ -263,7 +262,7 @@ func TestIDCodec(t *testing.T) { } func TestPinCodec(t *testing.T) { - ci, _ := cid.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") + ci, _ := DecodeCid("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") pin := PinCid(ci) var buf bytes.Buffer enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) diff --git a/cluster.go b/cluster.go index 39e40311..e663ae6b 100644 --- a/cluster.go +++ b/cluster.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/version" "go.uber.org/multierr" - cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" @@ -1205,7 +1204,7 @@ func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, // Status returns the GlobalPinInfo for a given Cid as fetched from all // current peers. If an error happens, the GlobalPinInfo should contain // as much information as could be fetched from the other peers. -func (c *Cluster) Status(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) { +func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/Status") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1214,7 +1213,7 @@ func (c *Cluster) Status(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, err } // StatusLocal returns this peer's PinInfo for a given Cid. -func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) api.PinInfo { +func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo { _, span := trace.StartSpan(ctx, "cluster/StatusLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1225,8 +1224,8 @@ func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) api.PinInfo { // used for RecoverLocal and SyncLocal. func (c *Cluster) localPinInfoOp( ctx context.Context, - h cid.Cid, - f func(context.Context, cid.Cid) (api.PinInfo, error), + h api.Cid, + f func(context.Context, api.Cid) (api.PinInfo, error), ) (pInfo api.PinInfo, err error) { ctx, span := trace.StartSpan(ctx, "cluster/localPinInfoOp") defer span.End() @@ -1284,7 +1283,7 @@ func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) e // Recover operations ask IPFS to pin or unpin items in error state. Recover // is faster than calling Pin on the same CID as it avoids committing an // identical pin to the consensus layer. -func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) { +func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/Recover") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1298,7 +1297,7 @@ func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, er // Recover operations ask IPFS to pin or unpin items in error state. Recover // is faster than calling Pin on the same CID as it avoids committing an // identical pin to the consensus layer. -func (c *Cluster) RecoverLocal(ctx context.Context, h cid.Cid) (api.PinInfo, error) { +func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/RecoverLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1353,7 +1352,7 @@ func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) { // assigned for the requested Cid, but does not indicate if // the item is successfully pinned. For that, use Status(). PinGet // returns an error if the given Cid is not part of the global state. -func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (api.Pin, error) { +func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/PinGet") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1390,7 +1389,7 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (api.Pin, error) { // // If the Update option is set, the pin options (including allocations) will // be copied from an existing one. This is equivalent to running PinUpdate. -func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Pin") defer span.End() @@ -1521,12 +1520,12 @@ func (c *Cluster) pin( return api.Pin{}, false, errFollowerMode } - if pin.Cid == cid.Undef { + if !pin.Cid.Defined() { return pin, false, errors.New("bad pin object") } // Handle pin updates when the option is set - if update := pin.PinUpdate; update != cid.Undef && !update.Equals(pin.Cid) { + if update := pin.PinUpdate; update.Defined() && !update.Equals(pin.Cid) { pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions) return pin, true, err } @@ -1600,7 +1599,7 @@ func (c *Cluster) pin( // // Unpin does not reflect the success or failure of underlying IPFS daemon // unpinning operations, which happen in async fashion. -func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (api.Pin, error) { +func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Unpin") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1670,7 +1669,7 @@ func (c *Cluster) unpinClusterDag(metaPin api.Pin) error { // IPFSConnector supports it - the default one does). This may offer // significant speed when pinning items which are similar to previously pinned // content. -func (c *Cluster) PinUpdate(ctx context.Context, from cid.Cid, to cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts api.PinOptions) (api.Pin, error) { existing, err := c.PinGet(ctx, from) if err != nil { // including when the existing pin is not found return api.Pin{}, err @@ -1728,7 +1727,7 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) { // pipeline is used to DAGify the file. Depending on input parameters this // DAG can be added locally to the calling cluster peer's ipfs repo, or // sharded across the entire cluster. -func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (cid.Cid, error) { +func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (api.Cid, error) { // TODO: add context param and tracing var dags adder.ClusterDAGService @@ -1842,7 +1841,7 @@ func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer. return trustedPeers, nil } -func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) { +func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h api.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) { for _, p := range peers { pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p)) gpin.Add(api.PinInfo{ @@ -1864,7 +1863,7 @@ func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []p } } -func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (api.GlobalPinInfo, error) { +func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h api.Cid) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid") defer span.End() @@ -2008,7 +2007,7 @@ func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, inChan = emptyChan } - fullMap := make(map[cid.Cid]api.GlobalPinInfo) + fullMap := make(map[api.Cid]api.GlobalPinInfo) var members []peer.ID var err error @@ -2141,7 +2140,7 @@ func (c *Cluster) getIDForPeer(ctx context.Context, pid peer.ID) (*api.ID, error // that order (the MetaPin is the last element). // It returns a slice with only the given Cid if it's not a known Cid or not a // MetaPin. -func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, error) { +func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, error) { ctx, span := trace.StartSpan(ctx, "cluster/cidsFromMetaPin") defer span.End() @@ -2150,7 +2149,7 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er return nil, err } - list := []cid.Cid{h} + list := []api.Cid{h} pin, err := cState.Get(ctx, h) if err != nil { @@ -2164,7 +2163,7 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er if pin.Reference == nil { return nil, errors.New("metaPin.Reference is unset") } - list = append([]cid.Cid{*pin.Reference}, list...) + list = append([]api.Cid{*pin.Reference}, list...) clusterDagPin, err := c.PinGet(ctx, *pin.Reference) if err != nil { return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err) @@ -2180,7 +2179,7 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er return list, fmt.Errorf("error parsing clusterDAG block: %s", err) } for _, l := range clusterDagNode.Links() { - list = append([]cid.Cid{l.Cid}, list...) + list = append([]api.Cid{api.NewCid(l.Cid)}, list...) } return list, nil diff --git a/cluster_test.go b/cluster_test.go index 51e23438..b0d64438 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -21,7 +21,6 @@ import ( "github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/version" - cid "github.com/ipfs/go-cid" gopath "github.com/ipfs/go-path" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -68,7 +67,7 @@ func (ipfs *mockConnector) Pin(ctx context.Context, pin api.Pin) error { return nil } -func (ipfs *mockConnector) Unpin(ctx context.Context, c cid.Cid) error { +func (ipfs *mockConnector) Unpin(ctx context.Context, c api.Cid) error { ipfs.pins.Delete(c) return nil } @@ -96,7 +95,7 @@ func (ipfs *mockConnector) PinLs(ctx context.Context, in []string, out chan<- ap default: st = api.IPFSPinStatusRecursive } - c := k.(cid.Cid) + c := k.(api.Cid) out <- api.IPFSPinInfo{Cid: api.Cid(c), Type: st} return true @@ -123,10 +122,10 @@ func (ipfs *mockConnector) RepoGC(ctx context.Context) (api.RepoGC, error) { }, nil } -func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (cid.Cid, error) { +func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (api.Cid, error) { _, err := gopath.ParsePath(path) if err != nil { - return cid.Undef, err + return api.CidUndef, err } return test.CidResolved, nil @@ -141,7 +140,7 @@ func (ipfs *mockConnector) BlockStream(ctx context.Context, in <-chan api.NodeWi return nil } -func (ipfs *mockConnector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { +func (ipfs *mockConnector) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) { d, ok := ipfs.blocks.Load(c.String()) if !ok { return nil, errors.New("block not found") @@ -444,7 +443,7 @@ func TestUnpinShard(t *testing.T) { sharding.VerifyShards(t, root, cl, cl.ipfs, 14) // skipping errors, VerifyShards has checked - pinnedCids := []cid.Cid{} + pinnedCids := []api.Cid{} pinnedCids = append(pinnedCids, root) metaPin, _ := cl.PinGet(ctx, root) cDag, _ := cl.PinGet(ctx, *metaPin.Reference) @@ -452,7 +451,7 @@ func TestUnpinShard(t *testing.T) { cDagBlock, _ := cl.ipfs.BlockGet(ctx, cDag.Cid) cDagNode, _ := sharding.CborDataToNode(cDagBlock, "cbor") for _, l := range cDagNode.Links() { - pinnedCids = append(pinnedCids, l.Cid) + pinnedCids = append(pinnedCids, api.NewCid(l.Cid)) } t.Run("unpin clusterdag should fail", func(t *testing.T) { @@ -464,7 +463,7 @@ func TestUnpinShard(t *testing.T) { }) t.Run("unpin shard should fail", func(t *testing.T) { - _, err := cl.Unpin(ctx, cDagNode.Links()[0].Cid) + _, err := cl.Unpin(ctx, api.NewCid(cDagNode.Links()[0].Cid)) if err == nil { t.Fatal("should not allow unpinning shards directly") } @@ -504,10 +503,10 @@ func TestUnpinShard(t *testing.T) { // cShard, _ := cid.Decode(test.ShardCid) // cCdag, _ := cid.Decode(test.CdagCid) // cMeta, _ := cid.Decode(test.MetaRootCid) -// pinMeta(t, cl, []cid.Cid{cShard}, cCdag, cMeta) +// pinMeta(t, cl, []api.NewCid(cShard), cCdag, cMeta) // } -// func pinMeta(t *testing.T, cl *Cluster, shardCids []cid.Cid, cCdag, cMeta cid.Cid) { +// func pinMeta(t *testing.T, cl *Cluster, shardCids []api.Cid, cCdag, cMeta api.Cid) { // for _, cShard := range shardCids { // shardPin := api.Pin{ // Cid: cShard, @@ -609,7 +608,7 @@ func TestUnpinShard(t *testing.T) { // cShard2, _ := cid.Decode(test.ShardCid2) // cCdag2, _ := cid.Decode(test.CdagCid2) // cMeta2, _ := cid.Decode(test.MetaRootCid2) -// pinMeta(t, cl, []cid.Cid{cShard, cShard2}, cCdag2, cMeta2) +// pinMeta(t, cl, []api.Cid{cShard, cShard2}, cCdag2, cMeta2) // shardPin, err := cl.PinGet(cShard) // if err != nil { diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index c87582db..9d1aca1f 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/rest/client" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -784,7 +783,7 @@ existing item from the cluster. Please run "pin rm" for that. from := c.Args().Get(0) to := c.Args().Get(1) - fromCid, err := cid.Decode(from) + fromCid, err := api.DecodeCid(from) checkErr("parsing from Cid", err) var expireAt time.Time @@ -842,7 +841,7 @@ The filter only takes effect when listing all pins. The possible values are: Action: func(c *cli.Context) error { cidStr := c.Args().First() if cidStr != "" { - ci, err := cid.Decode(cidStr) + ci, err := api.DecodeCid(cidStr) checkErr("parsing cid", err) resp, cerr := globalClient.Allocation(ctx, ci) formatResponse(c, resp, cerr) @@ -889,9 +888,9 @@ separated list). The following are valid status values: }, Action: func(c *cli.Context) error { cidsStr := c.Args() - cids := make([]cid.Cid, len(cidsStr)) + cids := make([]api.Cid, len(cidsStr)) for i, cStr := range cidsStr { - ci, err := cid.Decode(cStr) + ci, err := api.DecodeCid(cStr) checkErr("parsing cid", err) cids[i] = ci } @@ -944,7 +943,7 @@ operations on the contacted peer (as opposed to on every peer). Action: func(c *cli.Context) error { cidStr := c.Args().First() if cidStr != "" { - ci, err := cid.Decode(cidStr) + ci, err := api.DecodeCid(cidStr) checkErr("parsing cid", err) resp, cerr := globalClient.Recover(ctx, ci, c.Bool("local")) formatResponse(c, resp, cerr) @@ -1215,7 +1214,7 @@ func handlePinResponseFormatFlags( } func waitFor( - ci cid.Cid, + ci api.Cid, target api.TrackerStatus, timeout time.Duration, limit int, diff --git a/cmd/ipfs-cluster-follow/commands.go b/cmd/ipfs-cluster-follow/commands.go index 6d1cf13c..b749b3e7 100644 --- a/cmd/ipfs-cluster-follow/commands.go +++ b/cmd/ipfs-cluster-follow/commands.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/allocator/balanced" "github.com/ipfs/ipfs-cluster/api" @@ -548,7 +547,7 @@ func printStatusOffline(cfgHelper *cmdutils.ConfigHelper) error { return err } -func printPin(c cid.Cid, status, name, err string) { +func printPin(c api.Cid, status, name, err string) { if err != "" { name = name + " (" + err + ")" } diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index 2e9b9103..bdcf8e4c 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -1,3 +1,5 @@ +// Package crdt implements the IPFS Cluster consensus interface using +// CRDT-datastore to replicate the cluster global state to every peer. package crdt import ( @@ -8,7 +10,6 @@ import ( "sync" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/state" @@ -238,7 +239,7 @@ func (css *Consensus) setup() { logger.Error(err, k) return } - c, err := cid.Cast(kb) + c, err := api.CastCid(kb) if err != nil { logger.Error(err, k) return diff --git a/consensus/crdt/consensus_test.go b/consensus/crdt/consensus_test.go index 083f85c6..670229e5 100644 --- a/consensus/crdt/consensus_test.go +++ b/consensus/crdt/consensus_test.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/ipfs-cluster/datastore/inmem" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" ipns "github.com/ipfs/go-ipns" libp2p "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-core/host" @@ -87,7 +86,7 @@ func clean(t *testing.T, cc *Consensus) { } } -func testPin(c cid.Cid) api.Pin { +func testPin(c api.Cid) api.Pin { p := api.PinCid(c) p.ReplicationFactorMin = -1 p.ReplicationFactorMax = -1 @@ -453,7 +452,7 @@ func TestBatching(t *testing.T) { } // Pin 4 things, and check that 3 are commited - for _, c := range []cid.Cid{test.Cid2, test.Cid3, test.Cid4, test.Cid5} { + for _, c := range []api.Cid{test.Cid2, test.Cid3, test.Cid4, test.Cid5} { err = cc.LogPin(ctx, testPin(c)) if err != nil { t.Error(err) diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index d158901d..bf49d0a2 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" libp2p "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-core/host" peerstore "github.com/libp2p/go-libp2p-core/peerstore" @@ -22,7 +21,7 @@ func cleanRaft(idn int) { os.RemoveAll(fmt.Sprintf("raftFolderFromTests-%d", idn)) } -func testPin(c cid.Cid) api.Pin { +func testPin(c api.Cid) api.Pin { p := api.PinCid(c) p.ReplicationFactorMin = -1 p.ReplicationFactorMax = -1 diff --git a/ipfscluster.go b/ipfscluster.go index d21e0a63..ad820222 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -76,7 +75,7 @@ type IPFSConnector interface { Component ID(context.Context) (api.IPFSID, error) Pin(context.Context, api.Pin) error - Unpin(context.Context, cid.Cid) error + Unpin(context.Context, api.Cid) error PinLsCid(context.Context, api.Pin) (api.IPFSPinStatus, error) // PinLs returns pins in the pinset of the given types (recursive, direct...) PinLs(ctx context.Context, typeFilters []string, out chan<- api.IPFSPinInfo) error @@ -94,11 +93,11 @@ type IPFSConnector interface { // RepoGC performs garbage collection sweep on the IPFS repo. RepoGC(context.Context) (api.RepoGC, error) // Resolve returns a cid given a path. - Resolve(context.Context, string) (cid.Cid, error) + Resolve(context.Context, string) (api.Cid, error) // BlockStream adds a stream of blocks to IPFS. BlockStream(context.Context, <-chan api.NodeWithMeta) error // BlockGet retrieves the raw data of an IPFS block. - BlockGet(context.Context, cid.Cid) ([]byte, error) + BlockGet(context.Context, api.Cid) ([]byte, error) } // Peered represents a component which needs to be aware of the peers @@ -119,16 +118,16 @@ type PinTracker interface { Track(context.Context, api.Pin) error // Untrack tells the tracker that a Cid is to be forgotten. The tracker // may perform an IPFS unpin operation. - Untrack(context.Context, cid.Cid) error + Untrack(context.Context, api.Cid) error // StatusAll returns the list of pins with their local status. Takes a // filter to specify which statuses to report. StatusAll(context.Context, api.TrackerStatus, chan<- api.PinInfo) error // Status returns the local status of a given Cid. - Status(context.Context, cid.Cid) api.PinInfo + Status(context.Context, api.Cid) api.PinInfo // RecoverAll calls Recover() for all pins tracked. RecoverAll(context.Context, chan<- api.PinInfo) error // Recover retriggers a Pin/Unpin operation in a Cids with error status. - Recover(context.Context, cid.Cid) (api.PinInfo, error) + Recover(context.Context, api.Cid) (api.PinInfo, error) } // Informer provides Metric information from a peer. The metrics produced by @@ -155,7 +154,7 @@ type PinAllocator interface { // which are currently pinning the content. The candidates map // contains the metrics for all peers which are eligible for pinning // the content. - Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error) + Allocate(ctx context.Context, c api.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error) // Metrics returns the list of metrics that the allocator needs. Metrics() []string } diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 1807fe97..3e4d904e 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -656,7 +656,7 @@ func TestClustersPin(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = clusters[j].Pin(ctx, h, api.PinOptions{}) + _, err = clusters[j].Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Errorf("error pinning %s: %s", h, err) } @@ -759,12 +759,12 @@ func TestClustersPinUpdate(t *testing.T) { h, _ := prefix.Sum(randomBytes()) // create random cid h2, _ := prefix.Sum(randomBytes()) // create random cid - _, err := clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{}) + _, err := clusters[0].PinUpdate(ctx, api.NewCid(h), api.NewCid(h2), api.PinOptions{}) if err == nil || err != state.ErrNotFound { t.Fatal("pin update should fail when from is not pinned") } - _, err = clusters[0].Pin(ctx, h, api.PinOptions{}) + _, err = clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Errorf("error pinning %s: %s", h, err) } @@ -773,12 +773,12 @@ func TestClustersPinUpdate(t *testing.T) { expiry := time.Now().AddDate(1, 0, 0) opts2 := api.PinOptions{ UserAllocations: []peer.ID{clusters[0].host.ID()}, // should not be used - PinUpdate: h, + PinUpdate: api.NewCid(h), Name: "new name", ExpireAt: expiry, } - _, err = clusters[0].Pin(ctx, h2, opts2) // should call PinUpdate + _, err = clusters[0].Pin(ctx, api.NewCid(h2), opts2) // should call PinUpdate if err != nil { t.Errorf("error pin-updating %s: %s", h2, err) } @@ -786,7 +786,7 @@ func TestClustersPinUpdate(t *testing.T) { pinDelay() f := func(t *testing.T, c *Cluster) { - pinget, err := c.PinGet(ctx, h2) + pinget, err := c.PinGet(ctx, api.NewCid(h2)) if err != nil { t.Fatal(err) } @@ -821,7 +821,7 @@ func TestClustersPinDirect(t *testing.T) { h, _ := prefix.Sum(randomBytes()) // create random cid - _, err := clusters[0].Pin(ctx, h, api.PinOptions{Mode: api.PinModeDirect}) + _, err := clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{Mode: api.PinModeDirect}) if err != nil { t.Fatal(err) } @@ -829,7 +829,7 @@ func TestClustersPinDirect(t *testing.T) { pinDelay() f := func(t *testing.T, c *Cluster, mode api.PinMode) { - pinget, err := c.PinGet(ctx, h) + pinget, err := c.PinGet(ctx, api.NewCid(h)) if err != nil { t.Fatal(err) } @@ -842,7 +842,7 @@ func TestClustersPinDirect(t *testing.T) { t.Errorf("pin should have max-depth %d but has %d", mode.ToPinDepth(), pinget.MaxDepth) } - pInfo := c.StatusLocal(ctx, h) + pInfo := c.StatusLocal(ctx, api.NewCid(h)) if pInfo.Error != "" { t.Error(pInfo.Error) } @@ -857,7 +857,7 @@ func TestClustersPinDirect(t *testing.T) { }) // Convert into a recursive mode - _, err = clusters[0].Pin(ctx, h, api.PinOptions{Mode: api.PinModeRecursive}) + _, err = clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{Mode: api.PinModeRecursive}) if err != nil { t.Fatal(err) } @@ -869,7 +869,7 @@ func TestClustersPinDirect(t *testing.T) { }) // This should fail as we cannot convert back to direct - _, err = clusters[0].Pin(ctx, h, api.PinOptions{Mode: api.PinModeDirect}) + _, err = clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{Mode: api.PinModeDirect}) if err == nil { t.Error("a recursive pin cannot be converted back to direct pin") } @@ -1229,14 +1229,14 @@ func TestClustersReplicationOverall(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = clusters[j].Pin(ctx, h, api.PinOptions{}) + _, err = clusters[j].Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Error(err) } pinDelay() // check that it is held by exactly nClusters - 1 peers - gpi, err := clusters[j].Status(ctx, h) + gpi, err := clusters[j].Status(ctx, api.NewCid(h)) if err != nil { t.Fatal(err) } diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index c1129834..45e68305 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -353,7 +353,7 @@ func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error { // If we have a pin-update, and the old object // is pinned recursively, then do pin/update. // Otherwise do a normal pin. - if from := pin.PinUpdate; from != cid.Undef { + if from := pin.PinUpdate; from.Defined() { fromPin := api.PinWithOpts(from, pin.PinOptions) pinStatus, _ := ipfs.PinLsCid(ctx, fromPin) if pinStatus.IsPinned(-1) { // pinned recursively. @@ -408,7 +408,7 @@ func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error { // pinProgress pins an item and sends fetched node's progress on a // channel. Blocks until done or error. pinProgress will always close the out // channel. pinProgress will not block on sending to the channel if it is full. -func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth api.PinDepth, out chan<- int) error { +func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth api.PinDepth, out chan<- int) error { defer close(out) ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinsProgress") @@ -451,7 +451,7 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth a } } -func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error { +func (ipfs *Connector) pinUpdate(ctx context.Context, from, to api.Cid) error { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate") defer span.End() @@ -467,7 +467,7 @@ func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error { // Unpin performs an unpin request against the configured IPFS // daemon. -func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error { +func (ipfs *Connector) Unpin(ctx context.Context, hash api.Cid) error { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Unpin") defer span.End() @@ -834,23 +834,23 @@ func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) { } } - repoGC.Keys = append(repoGC.Keys, api.IPFSRepoGC{Key: resp.Key, Error: resp.Error}) + repoGC.Keys = append(repoGC.Keys, api.IPFSRepoGC{Key: api.NewCid(resp.Key), Error: resp.Error}) } } // Resolve accepts ipfs or ipns path and resolves it into a cid -func (ipfs *Connector) Resolve(ctx context.Context, path string) (cid.Cid, error) { +func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error) { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Resolve") defer span.End() validPath, err := gopath.ParsePath(path) if err != nil { logger.Error("could not parse path: " + err.Error()) - return cid.Undef, err + return api.CidUndef, err } if !strings.HasPrefix(path, "/ipns") && validPath.IsJustAKey() { ci, _, err := gopath.SplitAbsPath(validPath) - return ci, err + return api.NewCid(ci), err } ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) @@ -858,18 +858,18 @@ func (ipfs *Connector) Resolve(ctx context.Context, path string) (cid.Cid, error res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(path), "", nil) if err != nil { logger.Error(err) - return cid.Undef, err + return api.CidUndef, err } var resp ipfsResolveResp err = json.Unmarshal(res, &resp) if err != nil { logger.Error("could not unmarshal response: " + err.Error()) - return cid.Undef, err + return api.CidUndef, err } ci, _, err := gopath.SplitAbsPath(gopath.FromString(resp.Path)) - return ci, err + return api.NewCid(ci), err } // SwarmPeers returns the peers currently connected to this ipfs daemon. @@ -950,15 +950,15 @@ func (ci *chanIterator) Node() files.Node { return nil } ci.seenMu.Lock() - ci.seen.Add(ci.current.Cid) + ci.seen.Add(ci.current.Cid.Cid) ci.seenMu.Unlock() return files.NewBytesFile(ci.current.Data) } func (ci *chanIterator) Seen(c api.Cid) bool { ci.seenMu.Lock() - has := ci.seen.Has(cid.Cid(c)) - ci.seen.Remove(cid.Cid(c)) + has := ci.seen.Has(c.Cid) + ci.seen.Remove(c.Cid) ci.seenMu.Unlock() return has } @@ -1117,7 +1117,7 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi } // BlockGet retrieves an ipfs block with the given cid -func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { +func (ipfs *Connector) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockGet") defer span.End() @@ -1129,7 +1129,7 @@ func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) // // FetchRefs asks IPFS to download blocks recursively to the given depth. // // It discards the response, but waits until it completes. -// func (ipfs *Connector) FetchRefs(ctx context.Context, c cid.Cid, maxDepth int) error { +// func (ipfs *Connector) FetchRefs(ctx context.Context, c api.Cid, maxDepth int) error { // ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.PinTimeout) // defer cancel() diff --git a/observations/metrics.go b/observations/metrics.go index 28ef241f..9a9dc5e3 100644 --- a/observations/metrics.go +++ b/observations/metrics.go @@ -1,3 +1,4 @@ +// Package observations sets up metric and trace exporting for IPFS cluster. package observations import ( diff --git a/peer_manager_test.go b/peer_manager_test.go index b000f206..c12a1da8 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/config" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -462,7 +461,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = chosen.Pin(ctx, h, api.PinOptions{}) + _, err = chosen.Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Fatal(err) } @@ -474,7 +473,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { // At this point, all peers must have nClusters -1 pins // associated to them. // Find out which pins are associated to the chosen peer. - interestingCids := []cid.Cid{} + interestingCids := []api.Cid{} pins, err := chosen.pinsSlice(ctx) if err != nil { diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index e4464352..c9f9e08e 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -7,8 +7,6 @@ import ( "sync" "time" - cid "github.com/ipfs/go-cid" - "github.com/ipfs/ipfs-cluster/api" "go.opencensus.io/trace" ) @@ -109,7 +107,7 @@ func (op *Operation) String() string { } // Cid returns the Cid associated to this operation. -func (op *Operation) Cid() cid.Cid { +func (op *Operation) Cid() api.Cid { return op.pin.Cid } diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index c0a853cf..767540b7 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" @@ -30,7 +29,7 @@ type OperationTracker struct { peerName string mu sync.RWMutex - operations map[cid.Cid]*Operation + operations map[api.Cid]*Operation } func (opt *OperationTracker) String() string { @@ -57,7 +56,7 @@ func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *Ope ctx: ctx, pid: pid, peerName: peerName, - operations: make(map[cid.Cid]*Operation), + operations: make(map[api.Cid]*Operation), } } @@ -107,7 +106,7 @@ func (opt *OperationTracker) Clean(ctx context.Context, op *Operation) { // Status returns the TrackerStatus associated to the last operation known // with the given Cid. It returns false if we are not tracking any operation // for the given Cid. -func (opt *OperationTracker) Status(ctx context.Context, c cid.Cid) (api.TrackerStatus, bool) { +func (opt *OperationTracker) Status(ctx context.Context, c api.Cid) (api.TrackerStatus, bool) { opt.mu.RLock() defer opt.mu.RUnlock() op, ok := opt.operations[c] @@ -122,7 +121,7 @@ func (opt *OperationTracker) Status(ctx context.Context, c cid.Cid) (api.Tracker // is PhaseDone. Any other phases are considered in-flight and not touched. // For things already in error, the error message is updated. // Remote pins are ignored too. -func (opt *OperationTracker) SetError(ctx context.Context, c cid.Cid, err error) { +func (opt *OperationTracker) SetError(ctx context.Context, c api.Cid, err error) { opt.mu.Lock() defer opt.mu.Unlock() op, ok := opt.operations[c] @@ -143,7 +142,7 @@ func (opt *OperationTracker) SetError(ctx context.Context, c cid.Cid, err error) func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation, ipfs api.IPFSID) api.PinInfo { if op == nil { return api.PinInfo{ - Cid: cid.Undef, + Cid: api.CidUndef, Peer: opt.pid, Name: "", PinInfoShort: api.PinInfoShort{ @@ -175,7 +174,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation, i } // Get returns a PinInfo object for Cid. -func (opt *OperationTracker) Get(ctx context.Context, c cid.Cid, ipfs api.IPFSID) api.PinInfo { +func (opt *OperationTracker) Get(ctx context.Context, c api.Cid, ipfs api.IPFSID) api.PinInfo { ctx, span := trace.StartSpan(ctx, "optracker/GetAll") defer span.End() @@ -183,7 +182,7 @@ func (opt *OperationTracker) Get(ctx context.Context, c cid.Cid, ipfs api.IPFSID defer opt.mu.RUnlock() op := opt.operations[c] pInfo := opt.unsafePinInfo(ctx, op, ipfs) - if pInfo.Cid == cid.Undef { + if !pInfo.Cid.Defined() { pInfo.Cid = c } return pInfo @@ -191,7 +190,7 @@ func (opt *OperationTracker) Get(ctx context.Context, c cid.Cid, ipfs api.IPFSID // GetExists returns a PinInfo object for a Cid only if there exists // an associated Operation. -func (opt *OperationTracker) GetExists(ctx context.Context, c cid.Cid, ipfs api.IPFSID) (api.PinInfo, bool) { +func (opt *OperationTracker) GetExists(ctx context.Context, c api.Cid, ipfs api.IPFSID) (api.PinInfo, bool) { ctx, span := trace.StartSpan(ctx, "optracker/GetExists") defer span.End() @@ -258,7 +257,7 @@ func (opt *OperationTracker) CleanAllDone(ctx context.Context) { } // OpContext gets the context of an operation, if any. -func (opt *OperationTracker) OpContext(ctx context.Context, c cid.Cid) context.Context { +func (opt *OperationTracker) OpContext(ctx context.Context, c api.Cid) context.Context { opt.mu.RLock() defer opt.mu.RUnlock() op, ok := opt.operations[c] @@ -298,8 +297,8 @@ func (opt *OperationTracker) filterOps(ctx context.Context, filters ...interface return fltops } -func filterOpsMap(ctx context.Context, ops map[cid.Cid]*Operation, filters []interface{}) map[cid.Cid]*Operation { - fltops := make(map[cid.Cid]*Operation) +func filterOpsMap(ctx context.Context, ops map[api.Cid]*Operation, filters []interface{}) map[api.Cid]*Operation { + fltops := make(map[api.Cid]*Operation) if len(filters) < 1 { return nil } @@ -315,7 +314,7 @@ func filterOpsMap(ctx context.Context, ops map[cid.Cid]*Operation, filters []int return filterOpsMap(ctx, fltops, filters) } -func filter(ctx context.Context, in, out map[cid.Cid]*Operation, filter interface{}) { +func filter(ctx context.Context, in, out map[api.Cid]*Operation, filter interface{}) { for _, op := range in { switch filter.(type) { case OperationType: diff --git a/pintracker/optracker/operationtracker_test.go b/pintracker/optracker/operationtracker_test.go index 493c396e..9f23841e 100644 --- a/pintracker/optracker/operationtracker_test.go +++ b/pintracker/optracker/operationtracker_test.go @@ -5,7 +5,6 @@ import ( "errors" "testing" - cid "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" ) @@ -203,7 +202,7 @@ func TestOperationTracker_OpContext(t *testing.T) { func TestOperationTracker_filterOps(t *testing.T) { ctx := context.Background() - testOpsMap := map[cid.Cid]*Operation{ + testOpsMap := map[api.Cid]*Operation{ test.Cid1: {pin: api.PinCid(test.Cid1), opType: OperationPin, phase: PhaseQueued}, test.Cid2: {pin: api.PinCid(test.Cid2), opType: OperationPin, phase: PhaseInProgress}, test.Cid3: {pin: api.PinCid(test.Cid3), opType: OperationUnpin, phase: PhaseInProgress}, diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index 9a0305b6..78ef5750 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) @@ -139,7 +138,7 @@ func BenchmarkPinTracker_Track(b *testing.B) { func TestPinTracker_Untrack(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -273,7 +272,7 @@ func TestPinTracker_StatusAll(t *testing.T) { func TestPinTracker_Status(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -397,7 +396,7 @@ func TestPinTracker_RecoverAll(t *testing.T) { func TestPinTracker_Recover(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -438,7 +437,7 @@ func TestPinTracker_Recover(t *testing.T) { func TestUntrackTrack(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -481,7 +480,7 @@ func TestUntrackTrack(t *testing.T) { func TestTrackUntrackWithCancel(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index fd43823a..a6f2caa0 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/ipfs-cluster/pintracker/optracker" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -315,7 +314,7 @@ func (spt *Tracker) Track(ctx context.Context, c api.Pin) error { // Untrack tells the StatelessPinTracker to stop managing a Cid. // If the Cid is pinned locally, it will be unpinned. -func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error { +func (spt *Tracker) Untrack(ctx context.Context, c api.Cid) error { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Untrack") defer span.End() @@ -455,7 +454,7 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out } // Status returns information for a Cid pinned to the local IPFS node. -func (spt *Tracker) Status(ctx context.Context, c cid.Cid) api.PinInfo { +func (spt *Tracker) Status(ctx context.Context, c api.Cid) api.PinInfo { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status") defer span.End() @@ -594,7 +593,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context, out chan<- api.PinInfo) erro // Recover will trigger pinning or unpinning for items in // PinError or UnpinError states. -func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (api.PinInfo, error) { +func (spt *Tracker) Recover(ctx context.Context, c api.Cid) (api.PinInfo, error) { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover") defer span.End() @@ -662,7 +661,7 @@ func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error // OpContext exports the internal optracker's OpContext method. // For testing purposes only. -func (spt *Tracker) OpContext(ctx context.Context, c cid.Cid) context.Context { +func (spt *Tracker) OpContext(ctx context.Context, c api.Cid) context.Context { return spt.optracker.OpContext(ctx, c) } diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index d6f18980..7dffc07e 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -286,7 +285,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) { } pi := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{}) - if pi.Cid == cid.Undef { + if !pi.Cid.Defined() { t.Error("fastPin should have been removed from tracker") } } @@ -318,7 +317,7 @@ func TestUntrackTrackWithCancel(t *testing.T) { time.Sleep(100 * time.Millisecond) pi := spt.optracker.Get(ctx, slowPin.Cid, api.IPFSID{}) - if pi.Cid == cid.Undef { + if !pi.Cid.Defined() { t.Fatal("expected slowPin to be tracked") } @@ -379,7 +378,7 @@ func TestUntrackTrackWithNoCancel(t *testing.T) { } pi := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{}) - if pi.Cid == cid.Undef { + if !pi.Cid.Defined() { t.Fatal("c untrack operation should be tracked") } diff --git a/rpc_api.go b/rpc_api.go index 5efbc4c1..581dfc09 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/version" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -221,7 +220,7 @@ func (rpcapi *ClusterRPCAPI) Pins(ctx context.Context, in <-chan struct{}, out c } // PinGet runs Cluster.PinGet(). -func (rpcapi *ClusterRPCAPI) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { +func (rpcapi *ClusterRPCAPI) PinGet(ctx context.Context, in api.Cid, out *api.Pin) error { pin, err := rpcapi.c.PinGet(ctx, in) if err != nil { return err @@ -294,7 +293,7 @@ func (rpcapi *ClusterRPCAPI) StatusAllLocal(ctx context.Context, in <-chan api.T } // Status runs Cluster.Status(). -func (rpcapi *ClusterRPCAPI) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (rpcapi *ClusterRPCAPI) Status(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Status(ctx, in) if err != nil { return err @@ -304,7 +303,7 @@ func (rpcapi *ClusterRPCAPI) Status(ctx context.Context, in cid.Cid, out *api.Gl } // StatusLocal runs Cluster.StatusLocal(). -func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { pinfo := rpcapi.c.StatusLocal(ctx, in) *out = pinfo return nil @@ -321,7 +320,7 @@ func (rpcapi *ClusterRPCAPI) RecoverAllLocal(ctx context.Context, in <-chan stru } // Recover runs Cluster.Recover(). -func (rpcapi *ClusterRPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (rpcapi *ClusterRPCAPI) Recover(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Recover(ctx, in) if err != nil { return err @@ -331,7 +330,7 @@ func (rpcapi *ClusterRPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.G } // RecoverLocal runs Cluster.RecoverLocal(). -func (rpcapi *ClusterRPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *ClusterRPCAPI) RecoverLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { pinfo, err := rpcapi.c.RecoverLocal(ctx, in) if err != nil { return err @@ -411,7 +410,7 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out * return nil } -// SendInformerMetric runs Cluster.sendInformerMetric(). +// SendInformerMetrics runs Cluster.sendInformerMetric(). func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error { return rpcapi.c.sendInformersMetrics(ctx) } @@ -475,7 +474,7 @@ func (rpcapi *PinTrackerRPCAPI) StatusAll(ctx context.Context, in <-chan api.Tra } // Status runs PinTracker.Status(). -func (rpcapi *PinTrackerRPCAPI) Status(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *PinTrackerRPCAPI) Status(ctx context.Context, in api.Cid, out *api.PinInfo) error { ctx, span := trace.StartSpan(ctx, "rpc/tracker/Status") defer span.End() pinfo := rpcapi.tracker.Status(ctx, in) @@ -491,7 +490,7 @@ func (rpcapi *PinTrackerRPCAPI) RecoverAll(ctx context.Context, in <-chan struct } // Recover runs PinTracker.Recover(). -func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in api.Cid, out *api.PinInfo) error { ctx, span := trace.StartSpan(ctx, "rpc/tracker/Recover") defer span.End() pinfo, err := rpcapi.tracker.Recover(ctx, in) @@ -577,7 +576,7 @@ func (rpcapi *IPFSConnectorRPCAPI) BlockStream(ctx context.Context, in <-chan ap } // BlockGet runs IPFSConnector.BlockGet(). -func (rpcapi *IPFSConnectorRPCAPI) BlockGet(ctx context.Context, in cid.Cid, out *[]byte) error { +func (rpcapi *IPFSConnectorRPCAPI) BlockGet(ctx context.Context, in api.Cid, out *[]byte) error { res, err := rpcapi.ipfs.BlockGet(ctx, in) if err != nil { return err @@ -587,7 +586,7 @@ func (rpcapi *IPFSConnectorRPCAPI) BlockGet(ctx context.Context, in cid.Cid, out } // Resolve runs IPFSConnector.Resolve(). -func (rpcapi *IPFSConnectorRPCAPI) Resolve(ctx context.Context, in string, out *cid.Cid) error { +func (rpcapi *IPFSConnectorRPCAPI) Resolve(ctx context.Context, in string, out *api.Cid) error { c, err := rpcapi.ipfs.Resolve(ctx, in) if err != nil { return err diff --git a/sharness/t0030-ctl-pin.sh b/sharness/t0030-ctl-pin.sh index e6da9901..fafb4456 100755 --- a/sharness/t0030-ctl-pin.sh +++ b/sharness/t0030-ctl-pin.sh @@ -15,7 +15,7 @@ test_expect_success IPFS,CLUSTER "pin data to cluster with ctl" ' ' test_expect_success IPFS,CLUSTER "unpin data from cluster with ctl" ' - cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid | .[\"/\"]" | head -1` + cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid" | head -1` ipfs-cluster-ctl pin rm "$cid" && !(ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid") && ipfs-cluster-ctl status "$cid" | grep -q -i "UNPINNED" @@ -29,7 +29,7 @@ test_expect_success IPFS,CLUSTER "wait for data to pin to cluster with ctl" ' ' test_expect_success IPFS,CLUSTER "wait for data to unpin from cluster with ctl" ' - cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid | .[\"/\"]" | head -1` + cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid" | head -1` ipfs-cluster-ctl pin rm --wait "$cid" | grep -q -i "UNPINNED" && !(ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid") && ipfs-cluster-ctl status "$cid" | grep -q -i "UNPINNED" @@ -43,7 +43,7 @@ test_expect_success IPFS,CLUSTER "wait for data to pin to cluster with ctl with ' test_expect_success IPFS,CLUSTER "wait for data to unpin from cluster with ctl with timeout" ' - cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid | .[\"/\"]" | head -1` + cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid" | head -1` ipfs-cluster-ctl pin rm --wait --wait-timeout 2s "$cid" | grep -q -i "UNPINNED" && !(ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid") && ipfs-cluster-ctl status "$cid" | grep -q -i "UNPINNED" diff --git a/sharness/t0052-service-state-export.sh b/sharness/t0052-service-state-export.sh index d0dcdccc..60700376 100755 --- a/sharness/t0052-service-state-export.sh +++ b/sharness/t0052-service-state-export.sh @@ -14,7 +14,7 @@ test_expect_success IPFS,CLUSTER,JQ "state export saves the correct state to exp cluster_kill && sleep 5 && ipfs-cluster-service --debug --config "test-config" state export -f export.json && [ -f export.json ] && - jq -r ".cid | .[\"/\"]" export.json | grep -q "$cid" + jq -r ".cid" export.json | grep -q "$cid" ' cluster_kill @@ -28,7 +28,7 @@ test_expect_success IPFS,CLUSTER,JQ "state export saves the correct state to exp cluster_kill && sleep 5 && ipfs-cluster-service --debug --config "test-config" state export -f export.json && [ -f export.json ] && - jq -r ".cid | .[\"/\"]" export.json | grep -q "$cid" + jq -r ".cid" export.json | grep -q "$cid" ' test_clean_ipfs diff --git a/state/dsstate/datastore.go b/state/dsstate/datastore.go index 82e340f8..8cda7e1a 100644 --- a/state/dsstate/datastore.go +++ b/state/dsstate/datastore.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" query "github.com/ipfs/go-datastore/query" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -79,7 +78,7 @@ func (st *State) Add(ctx context.Context, c api.Pin) error { // Rm removes an existing Pin. It is a no-op when the // item does not exist. -func (st *State) Rm(ctx context.Context, c cid.Cid) error { +func (st *State) Rm(ctx context.Context, c api.Cid) error { _, span := trace.StartSpan(ctx, "state/dsstate/Rm") defer span.End() @@ -93,7 +92,7 @@ func (st *State) Rm(ctx context.Context, c cid.Cid) error { // Get returns a Pin from the store and whether it // was present. When not present, a default pin // is returned. -func (st *State) Get(ctx context.Context, c cid.Cid) (api.Pin, error) { +func (st *State) Get(ctx context.Context, c api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "state/dsstate/Get") defer span.End() @@ -112,7 +111,7 @@ func (st *State) Get(ctx context.Context, c cid.Cid) (api.Pin, error) { } // Has returns whether a Cid is stored. -func (st *State) Has(ctx context.Context, c cid.Cid) (bool, error) { +func (st *State) Has(ctx context.Context, c api.Cid) (bool, error) { _, span := trace.StartSpan(ctx, "state/dsstate/Has") defer span.End() @@ -254,27 +253,28 @@ func (st *State) Unmarshal(r io.Reader) error { } // used to be on go-ipfs-ds-help -func cidToDsKey(c cid.Cid) ds.Key { +func cidToDsKey(c api.Cid) ds.Key { return dshelp.NewKeyFromBinary(c.Bytes()) } // used to be on go-ipfs-ds-help -func dsKeyToCid(k ds.Key) (cid.Cid, error) { +func dsKeyToCid(k ds.Key) (api.Cid, error) { kb, err := dshelp.BinaryFromDsKey(k) if err != nil { - return cid.Undef, err + return api.CidUndef, err } - return cid.Cast(kb) + c, err := api.CastCid(kb) + return c, err } // convert Cid to /namespace/cid1Key -func (st *State) key(c cid.Cid) ds.Key { +func (st *State) key(c api.Cid) ds.Key { k := cidToDsKey(c) return st.namespace.Child(k) } // convert /namespace/cidKey to Cid -func (st *State) unkey(k ds.Key) (cid.Cid, error) { +func (st *State) unkey(k ds.Key) (api.Cid, error) { return dsKeyToCid(ds.NewKey(k.BaseNamespace())) } @@ -286,7 +286,7 @@ func (st *State) serializePin(c api.Pin) ([]byte, error) { // this deserializes a Pin object from the datastore. It should be // the exact opposite from serializePin. -func (st *State) deserializePin(c cid.Cid, buf []byte) (api.Pin, error) { +func (st *State) deserializePin(c api.Cid, buf []byte) (api.Pin, error) { p := api.Pin{} err := p.ProtoUnmarshal(buf) p.Cid = c diff --git a/state/dsstate/datastore_test.go b/state/dsstate/datastore_test.go index 6ece8c54..324e3699 100644 --- a/state/dsstate/datastore_test.go +++ b/state/dsstate/datastore_test.go @@ -9,11 +9,10 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/datastore/inmem" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) -var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") +var testCid1, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") var testPeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") var c = api.Pin{ diff --git a/state/empty.go b/state/empty.go index 64523a5b..f4b3ee7a 100644 --- a/state/empty.go +++ b/state/empty.go @@ -4,8 +4,6 @@ import ( "context" "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" ) type empty struct{} @@ -15,11 +13,11 @@ func (e *empty) List(ctx context.Context, out chan<- api.Pin) error { return nil } -func (e *empty) Has(ctx context.Context, c cid.Cid) (bool, error) { +func (e *empty) Has(ctx context.Context, c api.Cid) (bool, error) { return false, nil } -func (e *empty) Get(ctx context.Context, c cid.Cid) (api.Pin, error) { +func (e *empty) Get(ctx context.Context, c api.Cid) (api.Pin, error) { return api.Pin{}, ErrNotFound } diff --git a/state/interface.go b/state/interface.go index 4de202eb..6fc6b489 100644 --- a/state/interface.go +++ b/state/interface.go @@ -9,8 +9,6 @@ import ( "io" "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" ) // ErrNotFound should be returned when a pin is not part of the state. @@ -36,10 +34,10 @@ type ReadOnly interface { // List lists all the pins in the state. List(context.Context, chan<- api.Pin) error // Has returns true if the state is holding information for a Cid. - Has(context.Context, cid.Cid) (bool, error) + Has(context.Context, api.Cid) (bool, error) // Get returns the information attacthed to this pin, if any. If the // pin is not part of the state, it should return ErrNotFound. - Get(context.Context, cid.Cid) (api.Pin, error) + Get(context.Context, api.Cid) (api.Pin, error) } // WriteOnly represents the write side of a State. @@ -47,7 +45,7 @@ type WriteOnly interface { // Add adds a pin to the State Add(context.Context, api.Pin) error // Rm removes a pin from the State. - Rm(context.Context, cid.Cid) error + Rm(context.Context, api.Cid) error } // BatchingState represents a state which batches write operations. diff --git a/test/cids.go b/test/cids.go index 8a2f1e30..95e96215 100644 --- a/test/cids.go +++ b/test/cids.go @@ -1,30 +1,30 @@ package test import ( - cid "github.com/ipfs/go-cid" + "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" ) // Common variables used all around tests. var ( - Cid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") - Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") - Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") + Cid1, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") + Cid2, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") + Cid3, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") Cid4Data = "Cid4Data" // Cid resulting from block put using blake2b-256 and raw format - Cid4, _ = cid.Decode("bafk2bzaceawsyhsnrwwy5mtit2emnjfalkxsyq2p2ptd6fuliolzwwjbs42fq") + Cid4, _ = api.DecodeCid("bafk2bzaceawsyhsnrwwy5mtit2emnjfalkxsyq2p2ptd6fuliolzwwjbs42fq") // Cid resulting from block put using format "v0" defaults - Cid5, _ = cid.Decode("QmbgmXgsFjxAJ7cEaziL2NDSptHAkPwkEGMmKMpfyYeFXL") + Cid5, _ = api.DecodeCid("QmbgmXgsFjxAJ7cEaziL2NDSptHAkPwkEGMmKMpfyYeFXL") Cid5Data = "Cid5Data" - SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") - CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") + SlowCid1, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") + CidResolved, _ = api.DecodeCid("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") // ErrorCid is meant to be used as a Cid which causes errors. i.e. the // ipfs mock fails when pinning this CID. - ErrorCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc") + ErrorCid, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc") // NotFoundCid is meant to be used as a CID that doesn't exist in the // pinset. - NotFoundCid, _ = cid.Decode("bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4") + NotFoundCid, _ = api.DecodeCid("bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4") PeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") PeerID2, _ = peer.Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") PeerID3, _ = peer.Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index f4fa00fc..d02ba3c8 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -192,7 +192,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { if arg == ErrorCid.String() { goto ERROR } - c, err := cid.Decode(arg) + c, err := api.DecodeCid(arg) if err != nil { goto ERROR } @@ -222,7 +222,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { if !ok { goto ERROR } - c, err := cid.Decode(arg) + c, err := api.DecodeCid(arg) if err != nil { goto ERROR } @@ -239,11 +239,11 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { } fromStr := args[0] toStr := args[1] - from, err := cid.Decode(fromStr) + from, err := api.DecodeCid(fromStr) if err != nil { goto ERROR } - to, err := cid.Decode(toStr) + to, err := api.DecodeCid(toStr) if err != nil { goto ERROR } @@ -301,7 +301,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { } cidStr := arg - c, err := cid.Decode(cidStr) + c, err := api.DecodeCid(cidStr) if err != nil { goto ERROR } @@ -443,16 +443,16 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { enc := json.NewEncoder(w) resp := []mockRepoGCResp{ { - Key: Cid1, + Key: Cid1.Cid, }, { - Key: Cid2, + Key: Cid2.Cid, }, { - Key: Cid3, + Key: Cid3.Cid, }, { - Key: Cid4, + Key: Cid4.Cid, }, { Error: "no link by that name", diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 894be139..d24fed91 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" gopath "github.com/ipfs/go-path" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" @@ -105,10 +104,11 @@ func (mock *mockCluster) PinPath(ctx context.Context, in api.PinPath, out *api.P if err != nil { return err } - if c.Equals(ErrorCid) { + cc := api.NewCid(c) + if cc.Equals(ErrorCid) { return ErrBadCid } - pin = api.PinWithOpts(c, in.PinOptions) + pin = api.PinWithOpts(cc, in.PinOptions) } else { pin = api.PinWithOpts(CidResolved, in.PinOptions) } @@ -139,7 +139,7 @@ func (mock *mockCluster) Pins(ctx context.Context, in <-chan struct{}, out chan< return nil } -func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { +func (mock *mockCluster) PinGet(ctx context.Context, in api.Cid, out *api.Pin) error { switch in.String() { case ErrorCid.String(): return errors.New("this is an expected error when using ErrorCid") @@ -307,7 +307,7 @@ func (mock *mockCluster) StatusAllLocal(ctx context.Context, in <-chan api.Track return (&mockPinTracker{}).StatusAll(ctx, in, out) } -func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (mock *mockCluster) Status(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { if in.Equals(ErrorCid) { return ErrBadCid } @@ -335,7 +335,7 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global return nil } -func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockCluster) StatusLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { return (&mockPinTracker{}).Status(ctx, in, out) } @@ -350,11 +350,11 @@ func (mock *mockCluster) RecoverAllLocal(ctx context.Context, in <-chan struct{} return (&mockPinTracker{}).RecoverAll(ctx, in, out) } -func (mock *mockCluster) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (mock *mockCluster) Recover(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { return mock.Status(ctx, in, out) } -func (mock *mockCluster) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockCluster) RecoverLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { return (&mockPinTracker{}).Recover(ctx, in, out) } @@ -469,7 +469,7 @@ func (mock *mockPinTracker) StatusAll(ctx context.Context, in <-chan api.Tracker return nil } -func (mock *mockPinTracker) Status(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockPinTracker) Status(ctx context.Context, in api.Cid, out *api.PinInfo) error { if in.Equals(ErrorCid) { return ErrBadCid } @@ -490,7 +490,7 @@ func (mock *mockPinTracker) RecoverAll(ctx context.Context, in <-chan struct{}, return nil } -func (mock *mockPinTracker) Recover(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockPinTracker) Recover(ctx context.Context, in api.Cid, out *api.PinInfo) error { *out = api.PinInfo{ Cid: in, Peer: PeerID1, @@ -589,7 +589,7 @@ func (mock *mockIPFSConnector) BlockStream(ctx context.Context, in <-chan api.No return nil } -func (mock *mockIPFSConnector) Resolve(ctx context.Context, in string, out *cid.Cid) error { +func (mock *mockIPFSConnector) Resolve(ctx context.Context, in string, out *api.Cid) error { switch in { case ErrorCid.String(), "/ipfs/" + ErrorCid.String(): *out = ErrorCid diff --git a/test/sharding.go b/test/sharding.go index fbf8ec3b..9082ef62 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -12,6 +12,7 @@ import ( files "github.com/ipfs/go-ipfs-files" format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/ipfs-cluster/api" cid "github.com/ipfs/go-cid" ) @@ -62,7 +63,7 @@ var ( } // Used for testing blockput/blockget - ShardCid, _ = cid.Decode("zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh") + ShardCid, _ = api.DecodeCid("zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh") ShardData, _ = hex.DecodeString("a16130d82a58230012209273fd63ec94bed5abb219b2d9cb010cabe4af7b0177292d4335eff50464060a") ) diff --git a/util.go b/util.go index ba45ea52..fad96375 100644 --- a/util.go +++ b/util.go @@ -9,7 +9,6 @@ import ( blake2b "golang.org/x/crypto/blake2b" - cid "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -100,7 +99,7 @@ type distanceChecker struct { cache map[peer.ID]distance } -func (dc distanceChecker) isClosest(ci cid.Cid) bool { +func (dc distanceChecker) isClosest(ci api.Cid) bool { ciHash := convertKey(ci.KeyString()) localPeerHash := dc.convertPeerID(dc.local) myDistance := xor(ciHash, localPeerHash) diff --git a/version/version.go b/version/version.go index b5a50dfa..5cc7015a 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,4 @@ +// Package version stores version information for IPFS Cluster. package version import (