diff --git a/Makefile b/Makefile index 225eeae7..1d997e89 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ follow: check: go vet ./... - staticcheck ./... + staticcheck --checks all ./... test: go test -v ./... diff --git a/adder/adder.go b/adder/adder.go index 1d5cc637..6b93656e 100644 --- a/adder/adder.go +++ b/adder/adder.go @@ -42,7 +42,7 @@ type ClusterDAGService interface { ipld.DAGService // Finalize receives the IPFS content root CID as // returned by the ipfs adder. - Finalize(ctx context.Context, ipfsRoot cid.Cid) (cid.Cid, error) + Finalize(ctx context.Context, ipfsRoot api.Cid) (api.Cid, error) // Allocations returns the allocations made by the cluster DAG service // for the added content. Allocations() []peer.ID @@ -51,7 +51,7 @@ type ClusterDAGService interface { // A dagFormatter can create dags from files.Node. It can keep state // to add several files to the same dag. type dagFormatter interface { - Add(name string, f files.Node) (cid.Cid, error) + Add(name string, f files.Node) (api.Cid, error) } // Adder is used to add content to IPFS Cluster using an implementation of @@ -103,12 +103,12 @@ func (a *Adder) setContext(ctx context.Context) { // FromMultipart adds content from a multipart.Reader. The adder will // no longer be usable after calling this method. -func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid, error) { +func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (api.Cid, error) { logger.Debugf("adding from multipart with params: %+v", a.params) f, err := files.NewFileFromPartReader(r, "multipart/form-data") if err != nil { - return cid.Undef, err + return api.CidUndef, err } defer f.Close() return a.FromFiles(ctx, f) @@ -116,12 +116,12 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid // FromFiles adds content from a files.Directory. The adder will no longer // be usable after calling this method. -func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) { +func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (api.Cid, error) { logger.Debug("adding from files") a.setContext(ctx) if a.ctx.Err() != nil { // don't allow running twice - return cid.Undef, a.ctx.Err() + return api.CidUndef, a.ctx.Err() } defer a.cancel() @@ -139,7 +139,7 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro err = errors.New("bad dag formatter option") } if err != nil { - return cid.Undef, err + return api.CidUndef, err } // setup wrapping @@ -150,18 +150,18 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro } it := f.Entries() - var adderRoot cid.Cid + var adderRoot api.Cid for it.Next() { select { case <-a.ctx.Done(): - return cid.Undef, a.ctx.Err() + return api.CidUndef, a.ctx.Err() default: logger.Debugf("ipfsAdder AddFile(%s)", it.Name()) adderRoot, err = dagFmtr.Add(it.Name(), it.Node()) if err != nil { logger.Error("error adding to cluster: ", err) - return cid.Undef, err + return api.CidUndef, err } } // TODO (hector): We can only add a single CAR file for the @@ -171,13 +171,13 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro } } if it.Err() != nil { - return cid.Undef, it.Err() + return api.CidUndef, it.Err() } clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot) if err != nil { logger.Error("error finalizing adder:", err) - return cid.Undef, err + return api.CidUndef, err } logger.Infof("%s successfully added to cluster", clusterRoot) return clusterRoot, nil @@ -220,7 +220,7 @@ func newIpfsAdder(ctx context.Context, dgs ClusterDAGService, params api.AddPara }, nil } -func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { +func (ia *ipfsAdder) Add(name string, f files.Node) (api.Cid, error) { // In order to set the AddedOutput names right, we use // OutputPrefix: // @@ -239,9 +239,9 @@ func (ia *ipfsAdder) Add(name string, f files.Node) (cid.Cid, error) { nd, err := ia.AddAllAndPin(f) if err != nil { - return cid.Undef, err + return api.CidUndef, err } - return nd.Cid(), nil + return api.NewCid(nd.Cid()), nil } // An adder to add CAR files. It is at the moment very basic, and can @@ -267,22 +267,22 @@ func newCarAdder(ctx context.Context, dgs ClusterDAGService, params api.AddParam // Add takes a node which should be a CAR file and nothing else and // adds its blocks using the ClusterDAGService. -func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { +func (ca *carAdder) Add(name string, fn files.Node) (api.Cid, error) { if ca.params.Wrap { - return cid.Undef, errors.New("cannot wrap a CAR file upload") + return api.CidUndef, errors.New("cannot wrap a CAR file upload") } f, ok := fn.(files.File) if !ok { - return cid.Undef, errors.New("expected CAR file is not of type file") + return api.CidUndef, errors.New("expected CAR file is not of type file") } carReader, err := car.NewCarReader(f) if err != nil { - return cid.Undef, err + return api.CidUndef, err } if len(carReader.Header.Roots) != 1 { - return cid.Undef, errors.New("only CAR files with a single root are supported") + return api.CidUndef, errors.New("only CAR files with a single root are supported") } root := carReader.Header.Roots[0] @@ -292,7 +292,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { for { block, err := carReader.Next() if err != nil && err != io.EOF { - return cid.Undef, err + return api.CidUndef, err } else if block == nil { break } @@ -301,7 +301,7 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { nd, err := ipld.Decode(block) if err != nil { - return cid.Undef, err + return api.CidUndef, err } // If the root is in the CAR and the root is a UnixFS @@ -315,17 +315,17 @@ func (ca *carAdder) Add(name string, fn files.Node) (cid.Cid, error) { err = ca.dgs.Add(ca.ctx, nd) if err != nil { - return cid.Undef, err + return api.CidUndef, err } } ca.output <- api.AddedOutput{ Name: name, - Cid: root, + Cid: api.NewCid(root), Bytes: bytes, Size: size, Allocations: ca.dgs.Allocations(), } - return root, nil + return api.NewCid(root), nil } diff --git a/adder/adder_test.go b/adder/adder_test.go index 7d9383ae..61d39984 100644 --- a/adder/adder_test.go +++ b/adder/adder_test.go @@ -28,7 +28,7 @@ func newMockCDAGServ() *mockCDAGServ { } // noop -func (dag *mockCDAGServ) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { +func (dag *mockCDAGServ) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { return root, nil } @@ -152,7 +152,7 @@ func TestAdder_CAR(t *testing.T) { t.Fatal(err) } var carBuf bytes.Buffer - err = car.WriteCar(ctx, dags, []cid.Cid{root}, &carBuf) + err = car.WriteCar(ctx, dags, []cid.Cid{root.Cid}, &carBuf) if err != nil { t.Fatal(err) } diff --git a/adder/adderutils/adderutils.go b/adder/adderutils/adderutils.go index 94c98d06..0a181d91 100644 --- a/adder/adderutils/adderutils.go +++ b/adder/adderutils/adderutils.go @@ -13,7 +13,6 @@ import ( "github.com/ipfs/ipfs-cluster/adder/single" "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -31,7 +30,7 @@ func AddMultipartHTTPHandler( reader *multipart.Reader, w http.ResponseWriter, outputTransform func(api.AddedOutput) interface{}, -) (cid.Cid, error) { +) (api.Cid, error) { var dags adder.ClusterDAGService output := make(chan api.AddedOutput, 200) diff --git a/adder/ipfsadd/add.go b/adder/ipfsadd/add.go index 31e617de..f25c3f77 100644 --- a/adder/ipfsadd/add.go +++ b/adder/ipfsadd/add.go @@ -446,7 +446,7 @@ func (adder *Adder) outputDagnode(out chan api.AddedOutput, name string, dn ipld name = filepath.Join(adder.OutputPrefix, name) out <- api.AddedOutput{ - Cid: dn.Cid(), + Cid: api.NewCid(dn.Cid()), Name: name, Size: s, Allocations: adder.allocsFun(), diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go index 0ffe971d..da343478 100644 --- a/adder/sharding/dag_service.go +++ b/adder/sharding/dag_service.go @@ -79,13 +79,13 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { // Finalize finishes sharding, creates the cluster DAG and pins it along // with the meta pin for the root node of the content. -func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, error) { +func (dgs *DAGService) Finalize(ctx context.Context, dataRoot api.Cid) (api.Cid, error) { lastCid, err := dgs.flushCurrentShard(ctx) if err != nil { - return lastCid, err + return api.NewCid(lastCid), err } - if !lastCid.Equals(dataRoot) { + if !lastCid.Equals(dataRoot.Cid) { logger.Warnf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot) } @@ -124,13 +124,13 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("%s-clusterDAG", dgs.addParams.Name), - Cid: clusterDAG, + Cid: api.NewCid(clusterDAG), Size: dgs.totalSize, Allocations: nil, }) // Pin the ClusterDAG - clusterDAGPin := api.PinWithOpts(clusterDAG, dgs.addParams.PinOptions) + clusterDAGPin := api.PinWithOpts(api.NewCid(clusterDAG), dgs.addParams.PinOptions) clusterDAGPin.ReplicationFactorMin = -1 clusterDAGPin.ReplicationFactorMax = -1 clusterDAGPin.MaxDepth = 0 // pin direct @@ -146,7 +146,8 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, // Pin the META pin metaPin := api.PinWithOpts(dataRoot, dgs.addParams.PinOptions) metaPin.Type = api.MetaType - metaPin.Reference = &clusterDAG + ref := api.NewCid(clusterDAG) + metaPin.Reference = &ref metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned err = adder.Pin(ctx, dgs.rpcClient, metaPin) if err != nil { @@ -238,7 +239,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error { return dgs.ingestBlock(ctx, n) // <-- retry ingest } -func (dgs *DAGService) logStats(metaPin, clusterDAGPin cid.Cid) { +func (dgs *DAGService) logStats(metaPin, clusterDAGPin api.Cid) { duration := time.Since(dgs.startTime) seconds := uint64(duration) / uint64(time.Second) var rate string @@ -294,7 +295,7 @@ func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) { dgs.currentShard = nil dgs.sendOutput(api.AddedOutput{ Name: fmt.Sprintf("shard-%d", lens), - Cid: shardCid, + Cid: api.NewCid(shardCid), Size: shard.Size(), Allocations: shard.Allocations(), }) diff --git a/adder/sharding/dag_service_test.go b/adder/sharding/dag_service_test.go index fcb52293..2128e456 100644 --- a/adder/sharding/dag_service_test.go +++ b/adder/sharding/dag_service_test.go @@ -11,7 +11,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -51,7 +50,7 @@ func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.Pin, out *[]peer. return nil } -func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) { +func (rpcs *testRPC) PinGet(ctx context.Context, c api.Cid) (api.Pin, error) { pI, ok := rpcs.pins.Load(c.String()) if !ok { return api.Pin{}, errors.New("not found") @@ -59,7 +58,7 @@ func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) { return pI.(api.Pin), nil } -func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { +func (rpcs *testRPC) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) { bI, ok := rpcs.blocks.Load(c.String()) if !ok { return nil, errors.New("not found") diff --git a/adder/sharding/shard.go b/adder/sharding/shard.go index cd3d5549..cb1ae37c 100644 --- a/adder/sharding/shard.go +++ b/adder/sharding/shard.go @@ -122,12 +122,13 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, } rootCid := nodes[0].Cid() - pin := api.PinWithOpts(rootCid, sh.pinOptions) + pin := api.PinWithOpts(api.NewCid(rootCid), sh.pinOptions) pin.Name = fmt.Sprintf("%s-shard-%d", sh.pinOptions.Name, shardN) // this sets allocations as priority allocation pin.Allocations = sh.allocations pin.Type = api.ShardType - pin.Reference = &prev + ref := api.NewCid(prev) + pin.Reference = &ref pin.MaxDepth = 1 pin.ShardSize = sh.Size() // use current size, not the limit if len(nodes) > len(sh.dagNode)+1 { // using an indirect graph diff --git a/adder/sharding/verify.go b/adder/sharding/verify.go index 24a506cd..a588669c 100644 --- a/adder/sharding/verify.go +++ b/adder/sharding/verify.go @@ -7,26 +7,24 @@ import ( "testing" "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" ) // MockPinStore is used in VerifyShards type MockPinStore interface { // Gets a pin - PinGet(context.Context, cid.Cid) (api.Pin, error) + PinGet(context.Context, api.Cid) (api.Pin, error) } // MockBlockStore is used in VerifyShards type MockBlockStore interface { // Gets a block - BlockGet(context.Context, cid.Cid) ([]byte, error) + BlockGet(context.Context, api.Cid) ([]byte, error) } // VerifyShards checks that a sharded CID has been correctly formed and stored. // This is a helper function for testing. It returns a map with all the blocks // from all shards. -func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) { +func VerifyShards(t *testing.T, rootCid api.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) { ctx := context.Background() metaPin, err := pins.PinGet(ctx, rootCid) if err != nil { @@ -69,7 +67,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo } shardBlocks := make(map[string]struct{}) - var ref cid.Cid + var ref api.Cid // traverse shards in order for i := 0; i < len(shards); i++ { sh, _, err := clusterDAGNode.ResolveLink([]string{fmt.Sprintf("%d", i)}) @@ -77,12 +75,12 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo return nil, err } - shardPin, err := pins.PinGet(ctx, sh.Cid) + shardPin, err := pins.PinGet(ctx, api.NewCid(sh.Cid)) if err != nil { return nil, fmt.Errorf("shard was not pinned: %s %s", sh.Cid, err) } - if ref != cid.Undef && !shardPin.Reference.Equals(ref) { + if ref != api.CidUndef && !shardPin.Reference.Equals(ref) { t.Errorf("Ref (%s) should point to previous shard (%s)", ref, shardPin.Reference) } ref = shardPin.Cid diff --git a/adder/single/dag_service.go b/adder/single/dag_service.go index 13b5b9cb..005fd186 100644 --- a/adder/single/dag_service.go +++ b/adder/single/dag_service.go @@ -110,7 +110,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error { } // Finalize pins the last Cid added to this DAGService. -func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) { +func (dgs *DAGService) Finalize(ctx context.Context, root api.Cid) (api.Cid, error) { close(dgs.blocks) select { diff --git a/adder/util.go b/adder/util.go index 26ad48ca..f647468a 100644 --- a/adder/util.go +++ b/adder/util.go @@ -110,7 +110,7 @@ func IpldNodeToNodeWithMeta(n ipld.Node) api.NodeWithMeta { } return api.NodeWithMeta{ - Cid: n.Cid(), + Cid: api.NewCid(n.Cid()), Data: n.RawData(), CumSize: size, } @@ -125,7 +125,7 @@ func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) "", "Cluster", "BlockAllocate", - api.PinWithOpts(cid.Undef, pinOpts), + api.PinWithOpts(api.CidUndef, pinOpts), &allocsStr, ) return allocsStr, err diff --git a/allocate.go b/allocate.go index 2a632271..8efeccab 100644 --- a/allocate.go +++ b/allocate.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" "go.opencensus.io/trace" @@ -57,7 +56,7 @@ type classifiedMetrics struct { // into account if the given CID was previously in a "pin everywhere" mode, // and will consider such Pins as currently unallocated ones, providing // new allocations as available. -func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, currentPin api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) { +func (c *Cluster) allocate(ctx context.Context, hash api.Cid, currentPin api.Pin, rplMin, rplMax int, blacklist []peer.ID, priorityList []peer.ID) ([]peer.ID, error) { ctx, span := trace.StartSpan(ctx, "cluster/allocate") defer span.End() @@ -180,7 +179,7 @@ func (c *Cluster) filterMetrics(ctx context.Context, mSet api.MetricsSet, numMet } // allocationError logs an allocation error -func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID) error { +func allocationError(hash api.Cid, needed, wanted int, candidatesValid []peer.ID) error { logger.Errorf("Not enough candidates to allocate %s:", hash) logger.Errorf(" Needed: %d", needed) logger.Errorf(" Wanted: %d", wanted) @@ -198,7 +197,7 @@ func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID func (c *Cluster) obtainAllocations( ctx context.Context, - hash cid.Cid, + hash api.Cid, rplMin, rplMax int, metrics classifiedMetrics, ) ([]peer.ID, error) { diff --git a/allocator/balanced/balanced.go b/allocator/balanced/balanced.go index 4a573e1b..83c408a5 100644 --- a/allocator/balanced/balanced.go +++ b/allocator/balanced/balanced.go @@ -12,7 +12,6 @@ import ( "fmt" "sort" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" api "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" @@ -254,7 +253,7 @@ func (pnedm *partitionedMetric) chooseNext() peer.ID { // - Third, based on the AllocateBy order, it select the first metric func (a *Allocator) Allocate( ctx context.Context, - c cid.Cid, + c api.Cid, current, candidates, priority api.MetricsSet, ) ([]peer.ID, error) { diff --git a/api/add.go b/api/add.go index fceea629..b0dcd782 100644 --- a/api/add.go +++ b/api/add.go @@ -17,7 +17,7 @@ var DefaultShardSize = uint64(100 * 1024 * 1024) // 100 MB // indicating a node of a file has been added. type AddedOutput struct { Name string `json:"name" codec:"n,omitempty"` - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` Bytes uint64 `json:"bytes,omitempty" codec:"b,omitempty"` Size uint64 `json:"size,omitempty" codec:"s,omitempty"` Allocations []peer.ID `json:"allocations,omitempty" codec:"a,omitempty"` @@ -120,7 +120,7 @@ func AddParamsFromQuery(query url.Values) (AddParams, error) { return params, err } params.PinOptions = *opts - params.PinUpdate = cid.Undef // hardcode as does not make sense for adding + params.PinUpdate.Cid = cid.Undef // hardcode as does not make sense for adding layout := query.Get("layout") switch layout { diff --git a/api/common/api.go b/api/common/api.go index 094c439c..79cddbfa 100644 --- a/api/common/api.go +++ b/api/common/api.go @@ -27,7 +27,6 @@ import ( "sync" "time" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" gopath "github.com/ipfs/go-path" types "github.com/ipfs/ipfs-cluster/api" @@ -69,8 +68,8 @@ var ( ErrHTTPEndpointNotEnabled = errors.New("the HTTP endpoint is not enabled") ) -// When passed to SendResponse(), it will figure out which http status -// to set by itself. +// SetStatusAutomatically can be passed to SendResponse(), so that it will +// figure out which http status to set by itself. const SetStatusAutomatically = -1 // API implements an API and aims to provides @@ -496,12 +495,12 @@ func (api *API) ParsePinPathOrFail(w http.ResponseWriter, r *http.Request) types return pinPath } -// ParsePidOrFail parses a Cid and returns it or makes the request fail. +// ParseCidOrFail parses a Cid and returns it or makes the request fail. func (api *API) ParseCidOrFail(w http.ResponseWriter, r *http.Request) types.Pin { vars := mux.Vars(r) hash := vars["hash"] - c, err := cid.Decode(hash) + c, err := types.DecodeCid(hash) if err != nil { api.SendResponse(w, http.StatusBadRequest, errors.New("error decoding Cid: "+err.Error()), nil) return types.Pin{} @@ -697,8 +696,8 @@ func (api *API) Headers() map[string][]string { return api.config.Headers } -// Controls the HTTP server Keep Alive settings. -// Useful for testing. +// SetKeepAlivesEnabled controls the HTTP server Keep Alive settings. Useful +// for testing. func (api *API) SetKeepAlivesEnabled(b bool) { api.server.SetKeepAlivesEnabled(b) } diff --git a/api/common/test/helpers.go b/api/common/test/helpers.go index 833fa09c..ac0e1cd0 100644 --- a/api/common/test/helpers.go +++ b/api/common/test/helpers.go @@ -122,7 +122,7 @@ func CheckHeaders(t *testing.T, expected map[string][]string, url string, header } } -// This represents what an API is to us. +// API represents what an API is to us. type API interface { HTTPAddresses() ([]string, error) Host() host.Host @@ -286,7 +286,7 @@ func BothEndpoints(t *testing.T, test Func) { }) } -// HTTPSEndpoint runs the given test.Func against an HTTPs endpoint. +// HTTPSEndPoint runs the given test.Func against an HTTPs endpoint. func HTTPSEndPoint(t *testing.T, test Func) { t.Run("in-parallel", func(t *testing.T) { t.Run("https", func(t *testing.T) { diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index c5de02ae..592a1b77 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -1,3 +1,9 @@ +// Package ipfsproxy implements the Cluster API interface by providing an +// IPFS HTTP interface as exposed by the go-ipfs daemon. +// +// In this API, select endpoints like pin*, add*, and repo* endpoints are used +// to instead perform cluster operations. Requests for any other endpoints are +// passed to the underlying IPFS daemon. package ipfsproxy import ( @@ -396,7 +402,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) { } if arg != "" { - c, err := cid.Decode(arg) + c, err := api.DecodeCid(arg) if err != nil { ipfsErrorResponder(w, err.Error(), -1) return @@ -526,7 +532,7 @@ func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) { } // Resolve the FROM argument - var fromCid cid.Cid + var fromCid api.Cid err = proxy.rpcClient.CallContext( ctx, "", @@ -733,9 +739,9 @@ func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) { for _, gc := range repoGC.PeerMap { for _, key := range gc.Keys { if streamErrors { - ipfsRepoGC = ipfsRepoGCResp{Key: key.Key, Error: key.Error} + ipfsRepoGC = ipfsRepoGCResp{Key: key.Key.Cid, Error: key.Error} } else { - ipfsRepoGC = ipfsRepoGCResp{Key: key.Key} + ipfsRepoGC = ipfsRepoGCResp{Key: key.Key.Cid} if key.Error != "" { mError.add(key.Error) } diff --git a/api/ipfsproxy/ipfsproxy_test.go b/api/ipfsproxy/ipfsproxy_test.go index 674a3696..d112cf07 100644 --- a/api/ipfsproxy/ipfsproxy_test.go +++ b/api/ipfsproxy/ipfsproxy_test.go @@ -13,8 +13,6 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" - "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" @@ -100,7 +98,7 @@ func TestIPFSProxyPin(t *testing.T) { tests := []struct { name string args args - want cid.Cid + want api.Cid wantErr bool }{ { @@ -140,7 +138,7 @@ func TestIPFSProxyPin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, { @@ -150,7 +148,7 @@ func TestIPFSProxyPin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, } @@ -218,7 +216,7 @@ func TestIPFSProxyUnpin(t *testing.T) { tests := []struct { name string args args - want cid.Cid + want api.Cid wantErr bool }{ { @@ -258,7 +256,7 @@ func TestIPFSProxyUnpin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, { @@ -268,7 +266,7 @@ func TestIPFSProxyUnpin(t *testing.T) { test.ErrorCid.String(), http.StatusInternalServerError, }, - cid.Undef, + api.CidUndef, true, }, } @@ -583,7 +581,7 @@ func TestProxyRepoGC(t *testing.T) { repoGC = append(repoGC, resp) } - if !repoGC[0].Key.Equals(test.Cid1) { + if !repoGC[0].Key.Equals(test.Cid1.Cid) { t.Errorf("expected a different cid, expected: %s, found: %s", test.Cid1, repoGC[0].Key) } diff --git a/api/pinsvcapi/pinsvc/pinsvc.go b/api/pinsvcapi/pinsvc/pinsvc.go index b14f0497..3baa5043 100644 --- a/api/pinsvcapi/pinsvc/pinsvc.go +++ b/api/pinsvcapi/pinsvc/pinsvc.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" types "github.com/ipfs/ipfs-cluster/api" ) @@ -54,7 +53,7 @@ func (pname *PinName) UnmarshalJSON(data []byte) error { // Pin contains basic information about a Pin and pinning options. type Pin struct { - Cid string `json:"cid"` // a cid.Cid does not json properly + Cid types.Cid `json:"cid"` Name PinName `json:"name"` Origins []types.Multiaddr `json:"origins"` Meta map[string]string `json:"meta"` @@ -62,7 +61,7 @@ type Pin struct { // Defined returns if the pinis empty (Cid not set). func (p Pin) Defined() bool { - return p.Cid != "" + return p.Cid.Defined() } // MatchesName returns in a pin status matches a name option with a given @@ -233,7 +232,7 @@ type PinList struct { // ListOptions represents possible options given to the List endpoint. type ListOptions struct { - Cids []cid.Cid + Cids []types.Cid Name string MatchingStrategy MatchingStrategy Status Status @@ -243,11 +242,12 @@ type ListOptions struct { Meta map[string]string } +// FromQuery parses ListOptions from url.Values. func (lo *ListOptions) FromQuery(q url.Values) error { cidq := q.Get("cid") if len(cidq) > 0 { for _, cstr := range strings.Split(cidq, ",") { - c, err := cid.Decode(cstr) + c, err := types.DecodeCid(cstr) if err != nil { return fmt.Errorf("error decoding cid %s: %w", cstr, err) } diff --git a/api/pinsvcapi/pinsvcapi.go b/api/pinsvcapi/pinsvcapi.go index 81431c67..1490cee2 100644 --- a/api/pinsvcapi/pinsvcapi.go +++ b/api/pinsvcapi/pinsvcapi.go @@ -15,7 +15,6 @@ import ( "sync" "github.com/gorilla/mux" - "github.com/ipfs/go-cid" types "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/common" "github.com/ipfs/ipfs-cluster/api/pinsvcapi/pinsvc" @@ -79,11 +78,7 @@ func svcPinToClusterPin(p pinsvc.Pin) (types.Pin, error) { Metadata: p.Meta, Mode: types.PinModeRecursive, } - c, err := cid.Decode(p.Cid) - if err != nil { - return types.Pin{}, err - } - return types.PinWithOpts(c, opts), nil + return types.PinWithOpts(p.Cid, opts), nil } func globalPinInfoToSvcPinStatus( @@ -103,7 +98,7 @@ func globalPinInfoToSvcPinStatus( status.Status = trackerStatusToSvcStatus(statusMask) status.Created = gpi.Created status.Pin = pinsvc.Pin{ - Cid: gpi.Cid.String(), + Cid: gpi.Cid, Name: pinsvc.PinName(gpi.Name), Origins: gpi.Origins, Meta: gpi.Metadata, @@ -131,7 +126,7 @@ func NewAPI(ctx context.Context, cfg *Config) (*API, error) { return NewAPIWithHost(ctx, cfg, nil) } -// NewAPI creates a new REST API component using the given libp2p Host. +// NewAPIWithHost creates a new REST API component using the given libp2p Host. func NewAPIWithHost(ctx context.Context, cfg *Config, h host.Host) (*API, error) { api := API{ config: cfg, @@ -191,13 +186,13 @@ func (api *API) parseBodyOrFail(w http.ResponseWriter, r *http.Request) pinsvc.P return pin } -func (api *API) parseRequestIDOrFail(w http.ResponseWriter, r *http.Request) (cid.Cid, bool) { +func (api *API) parseRequestIDOrFail(w http.ResponseWriter, r *http.Request) (types.Cid, bool) { vars := mux.Vars(r) cStr, ok := vars["requestID"] if !ok { - return cid.Undef, true + return types.CidUndef, true } - c, err := cid.Decode(cStr) + c, err := types.DecodeCid(cStr) if err != nil { api.SendResponse(w, http.StatusBadRequest, errors.New("error decoding requestID: "+err.Error()), nil) return c, false @@ -233,12 +228,12 @@ func (api *API) addPin(w http.ResponseWriter, r *http.Request) { return } - status := api.pinToSvcPinStatus(r.Context(), pin.Cid, pinObj) + status := api.pinToSvcPinStatus(r.Context(), pin.Cid.String(), pinObj) api.SendResponse(w, common.SetStatusAutomatically, nil, status) } } -func (api *API) getPinSvcStatus(ctx context.Context, c cid.Cid) (pinsvc.PinStatus, error) { +func (api *API) getPinSvcStatus(ctx context.Context, c types.Cid) (pinsvc.PinStatus, error) { var pinInfo types.GlobalPinInfo err := api.rpcClient.CallContext( @@ -318,7 +313,7 @@ func (api *API) listPins(w http.ResponseWriter, r *http.Request) { }() for _, ci := range opts.Cids { - go func(c cid.Cid) { + go func(c types.Cid) { defer wg.Done() st, err := api.getPinSvcStatus(r.Context(), c) stCh <- statusResult{st: st, err: err} @@ -410,7 +405,7 @@ func (api *API) pinToSvcPinStatus(ctx context.Context, rID string, pin types.Pin Status: pinsvc.StatusQueued, Created: pin.Timestamp, Pin: pinsvc.Pin{ - Cid: pin.Cid.String(), + Cid: pin.Cid, Name: pinsvc.PinName(pin.Name), Origins: pin.Origins, Meta: pin.Metadata, diff --git a/api/pinsvcapi/pinsvcapi_test.go b/api/pinsvcapi/pinsvcapi_test.go index fb33761e..452e4a0b 100644 --- a/api/pinsvcapi/pinsvcapi_test.go +++ b/api/pinsvcapi/pinsvcapi_test.go @@ -68,7 +68,7 @@ func TestAPIListEndpoint(t *testing.T) { } results := resp.Results - if results[0].Pin.Cid != clustertest.Cid1.String() || + if !results[0].Pin.Cid.Equals(clustertest.Cid1) || results[1].Status != pinsvc.StatusPinning { t.Errorf("unexpected statusAll resp: %+v", results) } @@ -161,7 +161,7 @@ func TestAPIPinEndpoint(t *testing.T) { tf := func(t *testing.T, url test.URLFunc) { // test normal pin pin := pinsvc.Pin{ - Cid: clustertest.Cid3.String(), + Cid: clustertest.Cid3, Name: "testname", Origins: []api.Multiaddr{ ma, @@ -192,7 +192,7 @@ func TestAPIPinEndpoint(t *testing.T) { var errName pinsvc.APIError pin2 := pinsvc.Pin{ - Cid: clustertest.Cid1.String(), + Cid: clustertest.Cid1, Name: pinsvc.PinName(make([]byte, 256)), } pinJSON, err = json.Marshal(pin2) @@ -218,7 +218,7 @@ func TestAPIGetPinEndpoint(t *testing.T) { var status pinsvc.PinStatus test.MakeGet(t, svcapi, url(svcapi)+"/pins/"+clustertest.Cid1.String(), &status) - if status.Pin.Cid != clustertest.Cid1.String() { + if !status.Pin.Cid.Equals(clustertest.Cid1) { t.Error("Cid should be set") } diff --git a/api/rest/client/client.go b/api/rest/client/client.go index a49f704c..f43e01ed 100644 --- a/api/rest/client/client.go +++ b/api/rest/client/client.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" shell "github.com/ipfs/go-ipfs-api" files "github.com/ipfs/go-ipfs-files" logging "github.com/ipfs/go-log/v2" @@ -64,9 +63,9 @@ type Client interface { // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. - Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (api.Pin, error) + Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) // Unpin untracks a Cid from cluster. - Unpin(ctx context.Context, ci cid.Cid) (api.Pin, error) + Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) // PinPath resolves given path into a cid and performs the pin operation. PinPath(ctx context.Context, path string, opts api.PinOptions) (api.Pin, error) @@ -78,21 +77,21 @@ type Client interface { // and the peers that should be pinning them. Allocations(ctx context.Context, filter api.PinType, out chan<- api.Pin) error // Allocation returns the current allocations for a given Cid. - Allocation(ctx context.Context, ci cid.Cid) (api.Pin, error) + Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) // Status returns the current ipfs state for a given Cid. If local is true, // the information affects only the current peer, otherwise the information // is fetched from all cluster peers. - Status(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) + Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) // StatusCids status information for the requested CIDs. - StatusCids(ctx context.Context, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error + StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error // StatusAll gathers Status() for all tracked items. StatusAll(ctx context.Context, filter api.TrackerStatus, local bool, out chan<- api.GlobalPinInfo) error // Recover retriggers pin or unpin ipfs operations for a Cid in error // state. If local is true, the operation is limited to the current // peer, otherwise it happens on every cluster peer. - Recover(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) + Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) // RecoverAll triggers Recover() operations on all tracked items. If // local is true, the operation is limited to the current peer. // Otherwise, it happens everywhere. diff --git a/api/rest/client/lbclient.go b/api/rest/client/lbclient.go index 8ae6e2e2..2daee558 100644 --- a/api/rest/client/lbclient.go +++ b/api/rest/client/lbclient.go @@ -4,7 +4,6 @@ import ( "context" "sync/atomic" - cid "github.com/ipfs/go-cid" shell "github.com/ipfs/go-ipfs-api" files "github.com/ipfs/go-ipfs-files" "github.com/ipfs/ipfs-cluster/api" @@ -156,7 +155,7 @@ func (lc *loadBalancingClient) PeerRm(ctx context.Context, id peer.ID) error { // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. -func (lc *loadBalancingClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (lc *loadBalancingClient) Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) { var pin api.Pin call := func(c Client) error { var err error @@ -169,7 +168,7 @@ func (lc *loadBalancingClient) Pin(ctx context.Context, ci cid.Cid, opts api.Pin } // Unpin untracks a Cid from cluster. -func (lc *loadBalancingClient) Unpin(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (lc *loadBalancingClient) Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) { var pin api.Pin call := func(c Client) error { var err error @@ -220,7 +219,7 @@ func (lc *loadBalancingClient) Allocations(ctx context.Context, filter api.PinTy } // Allocation returns the current allocations for a given Cid. -func (lc *loadBalancingClient) Allocation(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (lc *loadBalancingClient) Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) { var pin api.Pin call := func(c Client) error { var err error @@ -235,7 +234,7 @@ func (lc *loadBalancingClient) Allocation(ctx context.Context, ci cid.Cid) (api. // Status returns the current ipfs state for a given Cid. If local is true, // the information affects only the current peer, otherwise the information // is fetched from all cluster peers. -func (lc *loadBalancingClient) Status(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (lc *loadBalancingClient) Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { var pinInfo api.GlobalPinInfo call := func(c Client) error { var err error @@ -250,7 +249,7 @@ func (lc *loadBalancingClient) Status(ctx context.Context, ci cid.Cid, local boo // StatusCids returns Status() information for the given Cids. If local is // true, the information affects only the current peer, otherwise the // information is fetched from all cluster peers. -func (lc *loadBalancingClient) StatusCids(ctx context.Context, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error { +func (lc *loadBalancingClient) StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error { call := func(c Client) error { return c.StatusCids(ctx, cids, local, out) } @@ -276,7 +275,7 @@ func (lc *loadBalancingClient) StatusAll(ctx context.Context, filter api.Tracker // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. -func (lc *loadBalancingClient) Recover(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (lc *loadBalancingClient) Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { var pinInfo api.GlobalPinInfo call := func(c Client) error { var err error diff --git a/api/rest/client/methods.go b/api/rest/client/methods.go index 9aa07812..9bfba5a4 100644 --- a/api/rest/client/methods.go +++ b/api/rest/client/methods.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" gopath "github.com/ipfs/go-path" peer "github.com/libp2p/go-libp2p-core/peer" @@ -85,7 +84,7 @@ func (c *defaultClient) PeerRm(ctx context.Context, id peer.ID) error { // Pin tracks a Cid with the given replication factor and a name for // human-friendliness. -func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (c *defaultClient) Pin(ctx context.Context, ci api.Cid, opts api.PinOptions) (api.Pin, error) { ctx, span := trace.StartSpan(ctx, "client/Pin") defer span.End() @@ -110,7 +109,7 @@ func (c *defaultClient) Pin(ctx context.Context, ci cid.Cid, opts api.PinOptions } // Unpin untracks a Cid from cluster. -func (c *defaultClient) Unpin(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (c *defaultClient) Unpin(ctx context.Context, ci api.Cid) (api.Pin, error) { ctx, span := trace.StartSpan(ctx, "client/Unpin") defer span.End() var pin api.Pin @@ -212,7 +211,7 @@ func (c *defaultClient) Allocations(ctx context.Context, filter api.PinType, out } // Allocation returns the current allocations for a given Cid. -func (c *defaultClient) Allocation(ctx context.Context, ci cid.Cid) (api.Pin, error) { +func (c *defaultClient) Allocation(ctx context.Context, ci api.Cid) (api.Pin, error) { ctx, span := trace.StartSpan(ctx, "client/Allocation") defer span.End() @@ -224,7 +223,7 @@ func (c *defaultClient) Allocation(ctx context.Context, ci cid.Cid) (api.Pin, er // Status returns the current ipfs state for a given Cid. If local is true, // the information affects only the current peer, otherwise the information // is fetched from all cluster peers. -func (c *defaultClient) Status(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (c *defaultClient) Status(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "client/Status") defer span.End() @@ -243,7 +242,7 @@ func (c *defaultClient) Status(ctx context.Context, ci cid.Cid, local bool) (api // StatusCids returns Status() information for the given Cids. If local is // true, the information affects only the current peer, otherwise the // information is fetched from all cluster peers. -func (c *defaultClient) StatusCids(ctx context.Context, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error { +func (c *defaultClient) StatusCids(ctx context.Context, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error { return c.statusAllWithCids(ctx, api.TrackerStatusUndefined, cids, local, out) } @@ -256,7 +255,7 @@ func (c *defaultClient) StatusAll(ctx context.Context, filter api.TrackerStatus, return c.statusAllWithCids(ctx, filter, nil, local, out) } -func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.TrackerStatus, cids []cid.Cid, local bool, out chan<- api.GlobalPinInfo) error { +func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.TrackerStatus, cids []api.Cid, local bool, out chan<- api.GlobalPinInfo) error { defer close(out) ctx, span := trace.StartSpan(ctx, "client/StatusAll") defer span.End() @@ -298,7 +297,7 @@ func (c *defaultClient) statusAllWithCids(ctx context.Context, filter api.Tracke // Recover retriggers pin or unpin ipfs operations for a Cid in error state. // If local is true, the operation is limited to the current peer, otherwise // it happens on every cluster peer. -func (c *defaultClient) Recover(ctx context.Context, ci cid.Cid, local bool) (api.GlobalPinInfo, error) { +func (c *defaultClient) Recover(ctx context.Context, ci api.Cid, local bool) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "client/Recover") defer span.End() @@ -458,7 +457,7 @@ func WaitFor(ctx context.Context, c Client, fp StatusFilterParams) (api.GlobalPi // StatusFilterParams contains the parameters required // to filter a stream of status results. type StatusFilterParams struct { - Cid cid.Cid + Cid api.Cid Local bool // query status from the local peer only Target api.TrackerStatus Limit int // wait for N peers reaching status. 0 == all diff --git a/api/rest/client/methods_test.go b/api/rest/client/methods_test.go index ee68cb33..a4de9518 100644 --- a/api/rest/client/methods_test.go +++ b/api/rest/client/methods_test.go @@ -11,7 +11,6 @@ import ( rest "github.com/ipfs/ipfs-cluster/api/rest" test "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ma "github.com/multiformats/go-multiaddr" @@ -227,7 +226,7 @@ func TestPinPath(t *testing.T) { testF := func(t *testing.T, c Client) { for _, testCase := range pathTestCases { - ec, _ := cid.Decode(testCase.expectedCid) + ec, _ := types.DecodeCid(testCase.expectedCid) resultantPin := types.PinWithOpts(ec, opts) p := testCase.path pin, err := c.PinPath(ctx, p, opts) @@ -351,7 +350,7 @@ func TestStatusCids(t *testing.T) { out := make(chan types.GlobalPinInfo) go func() { - err := c.StatusCids(ctx, []cid.Cid{test.Cid1}, false, out) + err := c.StatusCids(ctx, []types.Cid{test.Cid1}, false, out) if err != nil { t.Error(err) } @@ -590,7 +589,7 @@ func (wait *waitService) Pin(ctx context.Context, in types.Pin, out *types.Pin) return nil } -func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.GlobalPinInfo) error { +func (wait *waitService) Status(ctx context.Context, in types.Cid, out *types.GlobalPinInfo) error { wait.l.Lock() defer wait.l.Unlock() if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned @@ -642,7 +641,7 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob return nil } -func (wait *waitService) PinGet(ctx context.Context, in cid.Cid, out *types.Pin) error { +func (wait *waitService) PinGet(ctx context.Context, in types.Cid, out *types.Pin) error { p := types.PinCid(in) p.ReplicationFactorMin = 2 p.ReplicationFactorMax = 3 @@ -662,7 +661,7 @@ func (wait *waitServiceUnpin) Unpin(ctx context.Context, in types.Pin, out *type return nil } -func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types.GlobalPinInfo) error { +func (wait *waitServiceUnpin) Status(ctx context.Context, in types.Cid, out *types.GlobalPinInfo) error { wait.l.Lock() defer wait.l.Unlock() if time.Now().After(wait.unpinStart.Add(5 * time.Second)) { //unpinned @@ -698,7 +697,7 @@ func (wait *waitServiceUnpin) Status(ctx context.Context, in cid.Cid, out *types return nil } -func (wait *waitServiceUnpin) PinGet(ctx context.Context, in cid.Cid, out *types.Pin) error { +func (wait *waitServiceUnpin) PinGet(ctx context.Context, in types.Cid, out *types.Pin) error { return errors.New("not found") } diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 4caee0d2..e8591c81 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -17,7 +17,6 @@ import ( "sync" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/adder/adderutils" types "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/common" @@ -611,10 +610,10 @@ func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() filterCidsStr := strings.Split(queryValues.Get("cids"), ",") - var cids []cid.Cid + var cids []types.Cid for _, cidStr := range filterCidsStr { - c, err := cid.Decode(cidStr) + c, err := types.DecodeCid(cidStr) if err != nil { api.SendResponse(w, http.StatusBadRequest, fmt.Errorf("error decoding Cid: %w", err), nil) return @@ -638,7 +637,7 @@ func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { if local == "true" { for _, ci := range cids { - go func(c cid.Cid) { + go func(c types.Cid) { defer wg.Done() var pinInfo types.PinInfo err := api.rpcClient.CallContext( @@ -658,7 +657,7 @@ func (api *API) statusCidsHandler(w http.ResponseWriter, r *http.Request) { } } else { for _, ci := range cids { - go func(c cid.Cid) { + go func(c types.Cid) { defer wg.Done() var pinInfo types.GlobalPinInfo err := api.rpcClient.CallContext( diff --git a/api/rest/restapi_test.go b/api/rest/restapi_test.go index 5d826d8e..328512be 100644 --- a/api/rest/restapi_test.go +++ b/api/rest/restapi_test.go @@ -13,7 +13,6 @@ import ( test "github.com/ipfs/ipfs-cluster/api/common/test" clustertest "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" libp2p "github.com/libp2p/go-libp2p" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -401,7 +400,7 @@ func TestAPIPinEndpointWithPath(t *testing.T) { tf := func(t *testing.T, url test.URLFunc) { for _, testCase := range pathTestCases[:3] { - c, _ := cid.Decode(testCase.expectedCid) + c, _ := api.DecodeCid(testCase.expectedCid) resultantPin := api.PinWithOpts( c, testPinOpts, diff --git a/api/types.go b/api/types.go index 4766b93a..ca0c4599 100644 --- a/api/types.go +++ b/api/types.go @@ -277,33 +277,66 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{ IPFSPinStatusError: TrackerStatusClusterError, //TODO(ajl): check suitability } -// Cid is a CID with the MarshalJSON/UnmarshalJSON methods overwritten. -type Cid cid.Cid +// Cid embeds a cid.Cid with the MarshalJSON/UnmarshalJSON methods overwritten. +type Cid struct { + cid.Cid +} -func (c Cid) String() string { - return cid.Cid(c).String() +// CidUndef is an Undefined CID. +var CidUndef = Cid{cid.Undef} + +// NewCid wraps a cid.Cid in a Cid. +func NewCid(c cid.Cid) Cid { + return Cid{ + Cid: c, + } +} + +// DecodeCid parses a CID from its string form. +func DecodeCid(str string) (Cid, error) { + c, err := cid.Decode(str) + return Cid{c}, err +} + +// CastCid returns a CID from its bytes. +func CastCid(bs []byte) (Cid, error) { + c, err := cid.Cast(bs) + return Cid{c}, err } // MarshalJSON marshals a CID as JSON as a normal CID string. func (c Cid) MarshalJSON() ([]byte, error) { - return json.Marshal(c.String()) + if !c.Defined() { + return []byte("null"), nil + } + return []byte(`"` + c.String() + `"`), nil } // UnmarshalJSON reads a CID from its representation as JSON string. func (c *Cid) UnmarshalJSON(b []byte) error { + if string(b) == "null" { + *c = CidUndef + return nil + } + var cidStr string err := json.Unmarshal(b, &cidStr) if err != nil { return err } - cc, err := cid.Decode(cidStr) + cc, err := DecodeCid(cidStr) if err != nil { return err } - *c = Cid(cc) + *c = cc return nil } +// Equals returns true if two Cids are equal. +func (c Cid) Equals(c2 Cid) bool { + return c.Cid.Equals(c2.Cid) +} + // IPFSPinInfo represents an IPFS Pin, which only has a CID and type. // Its JSON form is what IPFS returns when querying a pinset. type IPFSPinInfo struct { @@ -314,7 +347,7 @@ type IPFSPinInfo struct { // GlobalPinInfo contains cluster-wide status information about a tracked Cid, // indexed by cluster peer. type GlobalPinInfo struct { - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"n"` Allocations []peer.ID `json:"allocations" codec:"a,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` @@ -360,7 +393,7 @@ func (gpi *GlobalPinInfo) Defined() bool { return gpi.Cid.Defined() } -// Matches returns true if one of the statuses in GlobalPinInfo matches +// Match returns true if one of the statuses in GlobalPinInfo matches // the given filter. func (gpi GlobalPinInfo) Match(filter TrackerStatus) bool { for _, pi := range gpi.PeerMap { @@ -400,7 +433,7 @@ func (pis PinInfoShort) String() string { // PinInfo holds information about local pins. This is used by the Pin // Trackers. type PinInfo struct { - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` Name string `json:"name" codec:"m,omitempty"` Peer peer.ID `json:"peer" codec:"p,omitempty"` Allocations []peer.ID `json:"allocations" codec:"o,omitempty"` @@ -709,7 +742,7 @@ type PinOptions struct { UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"` ExpireAt time.Time `json:"expire_at" codec:"e,omitempty"` Metadata map[string]string `json:"metadata" codec:"m,omitempty"` - PinUpdate cid.Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` + PinUpdate Cid `json:"pin_update,omitempty" codec:"pu,omitempty"` Origins []Multiaddr `json:"origins" codec:"g,omitempty"` } @@ -807,7 +840,7 @@ func (po PinOptions) ToQuery() (string, error) { } q.Set(fmt.Sprintf("%s%s", pinOptionsMetaPrefix, k), v) } - if po.PinUpdate != cid.Undef { + if po.PinUpdate.Defined() { q.Set("pin-update", po.PinUpdate.String()) } @@ -888,7 +921,7 @@ func (po *PinOptions) FromQuery(q url.Values) error { updateStr := q.Get("pin-update") if updateStr != "" { - updateCid, err := cid.Decode(updateStr) + updateCid, err := DecodeCid(updateStr) if err != nil { return fmt.Errorf("error decoding update option parameter: %s", err) } @@ -940,7 +973,7 @@ func (pd PinDepth) ToPinMode() PinMode { type Pin struct { PinOptions - Cid cid.Cid `json:"cid" codec:"c"` + Cid Cid `json:"cid" codec:"c"` // See PinType comments Type PinType `json:"type" codec:"t,omitempty"` @@ -957,7 +990,7 @@ type Pin struct { // MetaPin it is the ClusterDAG CID. For Shards, // it is the previous shard CID. // When not needed the pointer is nil - Reference *cid.Cid `json:"reference" codec:"r,omitempty"` + Reference *Cid `json:"reference" codec:"r,omitempty"` // The time that the pin was submitted to the consensus layer. Timestamp time.Time `json:"timestamp" codec:"i,omitempty"` @@ -994,7 +1027,7 @@ func (pp PinPath) Defined() bool { // PinCid is a shortcut to create a Pin only with a Cid. Default is for pin to // be recursive and the pin to be of DataType. -func PinCid(c cid.Cid) Pin { +func PinCid(c Cid) Pin { return Pin{ Cid: c, Type: DataType, @@ -1007,7 +1040,7 @@ func PinCid(c cid.Cid) Pin { // PinWithOpts creates a new Pin calling PinCid(c) and then sets its // PinOptions fields with the given options. Pin fields that are linked to // options are set accordingly (MaxDepth from Mode). -func PinWithOpts(c cid.Cid, opts PinOptions) Pin { +func PinWithOpts(c Cid, opts PinOptions) Pin { p := PinCid(c) p.PinOptions = opts p.MaxDepth = p.Mode.ToPinDepth() @@ -1090,9 +1123,9 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { if err != nil { return err } - ci, err := cid.Cast(pbPin.GetCid()) + ci, err := CastCid(pbPin.GetCid()) if err != nil { - pin.Cid = cid.Undef + pin.Cid = CidUndef } else { pin.Cid = ci } @@ -1112,7 +1145,7 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pin.Allocations = allocs pin.MaxDepth = PinDepth(pbPin.GetMaxDepth()) - ref, err := cid.Cast(pbPin.GetReference()) + ref, err := CastCid(pbPin.GetReference()) if err != nil { pin.Reference = nil @@ -1137,7 +1170,7 @@ func (pin *Pin) ProtoUnmarshal(data []byte) error { pin.ExpireAt = time.Unix(int64(exp), 0) } pin.Metadata = opts.GetMetadata() - pinUpdate, err := cid.Cast(opts.GetPinUpdate()) + pinUpdate, err := CastCid(opts.GetPinUpdate()) if err == nil { pin.PinUpdate = pinUpdate } @@ -1230,9 +1263,9 @@ func (pin Pin) Defined() bool { // NodeWithMeta specifies a block of data and a set of optional metadata fields // carrying information about the encoded ipld node type NodeWithMeta struct { - Data []byte `codec:"d,omitempty"` - Cid cid.Cid `codec:"c,omitempty"` - CumSize uint64 `codec:"s,omitempty"` // Cumulative size + Data []byte `codec:"d,omitempty"` + Cid Cid `codec:"c,omitempty"` + CumSize uint64 `codec:"s,omitempty"` // Cumulative size } // Size returns how big is the block. It is different from CumSize, which @@ -1357,8 +1390,8 @@ type IPFSRepoStat struct { // IPFSRepoGC represents the streaming response sent from repo gc API of IPFS. type IPFSRepoGC struct { - Key cid.Cid `json:"key,omitempty" codec:"k,omitempty"` - Error string `json:"error,omitempty" codec:"e,omitempty"` + Key Cid `json:"key,omitempty" codec:"k,omitempty"` + Error string `json:"error,omitempty" codec:"e,omitempty"` } // RepoGC contains garbage collected CIDs from a cluster peer's IPFS daemon. diff --git a/api/types_test.go b/api/types_test.go index 17c9b502..5ee3a1f3 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" multiaddr "github.com/multiformats/go-multiaddr" @@ -263,7 +262,7 @@ func TestIDCodec(t *testing.T) { } func TestPinCodec(t *testing.T) { - ci, _ := cid.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") + ci, _ := DecodeCid("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") pin := PinCid(ci) var buf bytes.Buffer enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) diff --git a/cluster.go b/cluster.go index 39e40311..e663ae6b 100644 --- a/cluster.go +++ b/cluster.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/version" "go.uber.org/multierr" - cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" @@ -1205,7 +1204,7 @@ func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus, // Status returns the GlobalPinInfo for a given Cid as fetched from all // current peers. If an error happens, the GlobalPinInfo should contain // as much information as could be fetched from the other peers. -func (c *Cluster) Status(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) { +func (c *Cluster) Status(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/Status") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1214,7 +1213,7 @@ func (c *Cluster) Status(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, err } // StatusLocal returns this peer's PinInfo for a given Cid. -func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) api.PinInfo { +func (c *Cluster) StatusLocal(ctx context.Context, h api.Cid) api.PinInfo { _, span := trace.StartSpan(ctx, "cluster/StatusLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1225,8 +1224,8 @@ func (c *Cluster) StatusLocal(ctx context.Context, h cid.Cid) api.PinInfo { // used for RecoverLocal and SyncLocal. func (c *Cluster) localPinInfoOp( ctx context.Context, - h cid.Cid, - f func(context.Context, cid.Cid) (api.PinInfo, error), + h api.Cid, + f func(context.Context, api.Cid) (api.PinInfo, error), ) (pInfo api.PinInfo, err error) { ctx, span := trace.StartSpan(ctx, "cluster/localPinInfoOp") defer span.End() @@ -1284,7 +1283,7 @@ func (c *Cluster) RecoverAllLocal(ctx context.Context, out chan<- api.PinInfo) e // Recover operations ask IPFS to pin or unpin items in error state. Recover // is faster than calling Pin on the same CID as it avoids committing an // identical pin to the consensus layer. -func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, error) { +func (c *Cluster) Recover(ctx context.Context, h api.Cid) (api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/Recover") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1298,7 +1297,7 @@ func (c *Cluster) Recover(ctx context.Context, h cid.Cid) (api.GlobalPinInfo, er // Recover operations ask IPFS to pin or unpin items in error state. Recover // is faster than calling Pin on the same CID as it avoids committing an // identical pin to the consensus layer. -func (c *Cluster) RecoverLocal(ctx context.Context, h cid.Cid) (api.PinInfo, error) { +func (c *Cluster) RecoverLocal(ctx context.Context, h api.Cid) (api.PinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/RecoverLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1353,7 +1352,7 @@ func (c *Cluster) pinsSlice(ctx context.Context) ([]api.Pin, error) { // assigned for the requested Cid, but does not indicate if // the item is successfully pinned. For that, use Status(). PinGet // returns an error if the given Cid is not part of the global state. -func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (api.Pin, error) { +func (c *Cluster) PinGet(ctx context.Context, h api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/PinGet") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1390,7 +1389,7 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (api.Pin, error) { // // If the Update option is set, the pin options (including allocations) will // be copied from an existing one. This is equivalent to running PinUpdate. -func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (c *Cluster) Pin(ctx context.Context, h api.Cid, opts api.PinOptions) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Pin") defer span.End() @@ -1521,12 +1520,12 @@ func (c *Cluster) pin( return api.Pin{}, false, errFollowerMode } - if pin.Cid == cid.Undef { + if !pin.Cid.Defined() { return pin, false, errors.New("bad pin object") } // Handle pin updates when the option is set - if update := pin.PinUpdate; update != cid.Undef && !update.Equals(pin.Cid) { + if update := pin.PinUpdate; update.Defined() && !update.Equals(pin.Cid) { pin, err := c.PinUpdate(ctx, update, pin.Cid, pin.PinOptions) return pin, true, err } @@ -1600,7 +1599,7 @@ func (c *Cluster) pin( // // Unpin does not reflect the success or failure of underlying IPFS daemon // unpinning operations, which happen in async fashion. -func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (api.Pin, error) { +func (c *Cluster) Unpin(ctx context.Context, h api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "cluster/Unpin") defer span.End() ctx = trace.NewContext(c.ctx, span) @@ -1670,7 +1669,7 @@ func (c *Cluster) unpinClusterDag(metaPin api.Pin) error { // IPFSConnector supports it - the default one does). This may offer // significant speed when pinning items which are similar to previously pinned // content. -func (c *Cluster) PinUpdate(ctx context.Context, from cid.Cid, to cid.Cid, opts api.PinOptions) (api.Pin, error) { +func (c *Cluster) PinUpdate(ctx context.Context, from api.Cid, to api.Cid, opts api.PinOptions) (api.Pin, error) { existing, err := c.PinGet(ctx, from) if err != nil { // including when the existing pin is not found return api.Pin{}, err @@ -1728,7 +1727,7 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (api.Pin, error) { // pipeline is used to DAGify the file. Depending on input parameters this // DAG can be added locally to the calling cluster peer's ipfs repo, or // sharded across the entire cluster. -func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (cid.Cid, error) { +func (c *Cluster) AddFile(ctx context.Context, reader *multipart.Reader, params api.AddParams) (api.Cid, error) { // TODO: add context param and tracing var dags adder.ClusterDAGService @@ -1842,7 +1841,7 @@ func (c *Cluster) getTrustedPeers(ctx context.Context, exclude peer.ID) ([]peer. return trustedPeers, nil } -func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) { +func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h api.Cid, peers []peer.ID, status api.TrackerStatus, pin api.Pin, t time.Time) { for _, p := range peers { pv := pingValueFromMetric(c.monitor.LatestForPeer(c.ctx, pingMetricName, p)) gpin.Add(api.PinInfo{ @@ -1864,7 +1863,7 @@ func (c *Cluster) setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []p } } -func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (api.GlobalPinInfo, error) { +func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h api.Cid) (api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid") defer span.End() @@ -2008,7 +2007,7 @@ func (c *Cluster) globalPinInfoStream(ctx context.Context, comp, method string, inChan = emptyChan } - fullMap := make(map[cid.Cid]api.GlobalPinInfo) + fullMap := make(map[api.Cid]api.GlobalPinInfo) var members []peer.ID var err error @@ -2141,7 +2140,7 @@ func (c *Cluster) getIDForPeer(ctx context.Context, pid peer.ID) (*api.ID, error // that order (the MetaPin is the last element). // It returns a slice with only the given Cid if it's not a known Cid or not a // MetaPin. -func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, error) { +func (c *Cluster) cidsFromMetaPin(ctx context.Context, h api.Cid) ([]api.Cid, error) { ctx, span := trace.StartSpan(ctx, "cluster/cidsFromMetaPin") defer span.End() @@ -2150,7 +2149,7 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er return nil, err } - list := []cid.Cid{h} + list := []api.Cid{h} pin, err := cState.Get(ctx, h) if err != nil { @@ -2164,7 +2163,7 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er if pin.Reference == nil { return nil, errors.New("metaPin.Reference is unset") } - list = append([]cid.Cid{*pin.Reference}, list...) + list = append([]api.Cid{*pin.Reference}, list...) clusterDagPin, err := c.PinGet(ctx, *pin.Reference) if err != nil { return list, fmt.Errorf("could not get clusterDAG pin from state. Malformed pin?: %s", err) @@ -2180,7 +2179,7 @@ func (c *Cluster) cidsFromMetaPin(ctx context.Context, h cid.Cid) ([]cid.Cid, er return list, fmt.Errorf("error parsing clusterDAG block: %s", err) } for _, l := range clusterDagNode.Links() { - list = append([]cid.Cid{l.Cid}, list...) + list = append([]api.Cid{api.NewCid(l.Cid)}, list...) } return list, nil diff --git a/cluster_test.go b/cluster_test.go index 51e23438..b0d64438 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -21,7 +21,6 @@ import ( "github.com/ipfs/ipfs-cluster/test" "github.com/ipfs/ipfs-cluster/version" - cid "github.com/ipfs/go-cid" gopath "github.com/ipfs/go-path" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -68,7 +67,7 @@ func (ipfs *mockConnector) Pin(ctx context.Context, pin api.Pin) error { return nil } -func (ipfs *mockConnector) Unpin(ctx context.Context, c cid.Cid) error { +func (ipfs *mockConnector) Unpin(ctx context.Context, c api.Cid) error { ipfs.pins.Delete(c) return nil } @@ -96,7 +95,7 @@ func (ipfs *mockConnector) PinLs(ctx context.Context, in []string, out chan<- ap default: st = api.IPFSPinStatusRecursive } - c := k.(cid.Cid) + c := k.(api.Cid) out <- api.IPFSPinInfo{Cid: api.Cid(c), Type: st} return true @@ -123,10 +122,10 @@ func (ipfs *mockConnector) RepoGC(ctx context.Context) (api.RepoGC, error) { }, nil } -func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (cid.Cid, error) { +func (ipfs *mockConnector) Resolve(ctx context.Context, path string) (api.Cid, error) { _, err := gopath.ParsePath(path) if err != nil { - return cid.Undef, err + return api.CidUndef, err } return test.CidResolved, nil @@ -141,7 +140,7 @@ func (ipfs *mockConnector) BlockStream(ctx context.Context, in <-chan api.NodeWi return nil } -func (ipfs *mockConnector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { +func (ipfs *mockConnector) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) { d, ok := ipfs.blocks.Load(c.String()) if !ok { return nil, errors.New("block not found") @@ -444,7 +443,7 @@ func TestUnpinShard(t *testing.T) { sharding.VerifyShards(t, root, cl, cl.ipfs, 14) // skipping errors, VerifyShards has checked - pinnedCids := []cid.Cid{} + pinnedCids := []api.Cid{} pinnedCids = append(pinnedCids, root) metaPin, _ := cl.PinGet(ctx, root) cDag, _ := cl.PinGet(ctx, *metaPin.Reference) @@ -452,7 +451,7 @@ func TestUnpinShard(t *testing.T) { cDagBlock, _ := cl.ipfs.BlockGet(ctx, cDag.Cid) cDagNode, _ := sharding.CborDataToNode(cDagBlock, "cbor") for _, l := range cDagNode.Links() { - pinnedCids = append(pinnedCids, l.Cid) + pinnedCids = append(pinnedCids, api.NewCid(l.Cid)) } t.Run("unpin clusterdag should fail", func(t *testing.T) { @@ -464,7 +463,7 @@ func TestUnpinShard(t *testing.T) { }) t.Run("unpin shard should fail", func(t *testing.T) { - _, err := cl.Unpin(ctx, cDagNode.Links()[0].Cid) + _, err := cl.Unpin(ctx, api.NewCid(cDagNode.Links()[0].Cid)) if err == nil { t.Fatal("should not allow unpinning shards directly") } @@ -504,10 +503,10 @@ func TestUnpinShard(t *testing.T) { // cShard, _ := cid.Decode(test.ShardCid) // cCdag, _ := cid.Decode(test.CdagCid) // cMeta, _ := cid.Decode(test.MetaRootCid) -// pinMeta(t, cl, []cid.Cid{cShard}, cCdag, cMeta) +// pinMeta(t, cl, []api.NewCid(cShard), cCdag, cMeta) // } -// func pinMeta(t *testing.T, cl *Cluster, shardCids []cid.Cid, cCdag, cMeta cid.Cid) { +// func pinMeta(t *testing.T, cl *Cluster, shardCids []api.Cid, cCdag, cMeta api.Cid) { // for _, cShard := range shardCids { // shardPin := api.Pin{ // Cid: cShard, @@ -609,7 +608,7 @@ func TestUnpinShard(t *testing.T) { // cShard2, _ := cid.Decode(test.ShardCid2) // cCdag2, _ := cid.Decode(test.CdagCid2) // cMeta2, _ := cid.Decode(test.MetaRootCid2) -// pinMeta(t, cl, []cid.Cid{cShard, cShard2}, cCdag2, cMeta2) +// pinMeta(t, cl, []api.Cid{cShard, cShard2}, cCdag2, cMeta2) // shardPin, err := cl.PinGet(cShard) // if err != nil { diff --git a/cmd/ipfs-cluster-ctl/main.go b/cmd/ipfs-cluster-ctl/main.go index c87582db..9d1aca1f 100644 --- a/cmd/ipfs-cluster-ctl/main.go +++ b/cmd/ipfs-cluster-ctl/main.go @@ -15,7 +15,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/api/rest/client" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -784,7 +783,7 @@ existing item from the cluster. Please run "pin rm" for that. from := c.Args().Get(0) to := c.Args().Get(1) - fromCid, err := cid.Decode(from) + fromCid, err := api.DecodeCid(from) checkErr("parsing from Cid", err) var expireAt time.Time @@ -842,7 +841,7 @@ The filter only takes effect when listing all pins. The possible values are: Action: func(c *cli.Context) error { cidStr := c.Args().First() if cidStr != "" { - ci, err := cid.Decode(cidStr) + ci, err := api.DecodeCid(cidStr) checkErr("parsing cid", err) resp, cerr := globalClient.Allocation(ctx, ci) formatResponse(c, resp, cerr) @@ -889,9 +888,9 @@ separated list). The following are valid status values: }, Action: func(c *cli.Context) error { cidsStr := c.Args() - cids := make([]cid.Cid, len(cidsStr)) + cids := make([]api.Cid, len(cidsStr)) for i, cStr := range cidsStr { - ci, err := cid.Decode(cStr) + ci, err := api.DecodeCid(cStr) checkErr("parsing cid", err) cids[i] = ci } @@ -944,7 +943,7 @@ operations on the contacted peer (as opposed to on every peer). Action: func(c *cli.Context) error { cidStr := c.Args().First() if cidStr != "" { - ci, err := cid.Decode(cidStr) + ci, err := api.DecodeCid(cidStr) checkErr("parsing cid", err) resp, cerr := globalClient.Recover(ctx, ci, c.Bool("local")) formatResponse(c, resp, cerr) @@ -1215,7 +1214,7 @@ func handlePinResponseFormatFlags( } func waitFor( - ci cid.Cid, + ci api.Cid, target api.TrackerStatus, timeout time.Duration, limit int, diff --git a/cmd/ipfs-cluster-follow/commands.go b/cmd/ipfs-cluster-follow/commands.go index 6d1cf13c..b749b3e7 100644 --- a/cmd/ipfs-cluster-follow/commands.go +++ b/cmd/ipfs-cluster-follow/commands.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/allocator/balanced" "github.com/ipfs/ipfs-cluster/api" @@ -548,7 +547,7 @@ func printStatusOffline(cfgHelper *cmdutils.ConfigHelper) error { return err } -func printPin(c cid.Cid, status, name, err string) { +func printPin(c api.Cid, status, name, err string) { if err != "" { name = name + " (" + err + ")" } diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go index 2e9b9103..bdcf8e4c 100644 --- a/consensus/crdt/consensus.go +++ b/consensus/crdt/consensus.go @@ -1,3 +1,5 @@ +// Package crdt implements the IPFS Cluster consensus interface using +// CRDT-datastore to replicate the cluster global state to every peer. package crdt import ( @@ -8,7 +10,6 @@ import ( "sync" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/pstoremgr" "github.com/ipfs/ipfs-cluster/state" @@ -238,7 +239,7 @@ func (css *Consensus) setup() { logger.Error(err, k) return } - c, err := cid.Cast(kb) + c, err := api.CastCid(kb) if err != nil { logger.Error(err, k) return diff --git a/consensus/crdt/consensus_test.go b/consensus/crdt/consensus_test.go index 083f85c6..670229e5 100644 --- a/consensus/crdt/consensus_test.go +++ b/consensus/crdt/consensus_test.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/ipfs-cluster/datastore/inmem" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" ipns "github.com/ipfs/go-ipns" libp2p "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-core/host" @@ -87,7 +86,7 @@ func clean(t *testing.T, cc *Consensus) { } } -func testPin(c cid.Cid) api.Pin { +func testPin(c api.Cid) api.Pin { p := api.PinCid(c) p.ReplicationFactorMin = -1 p.ReplicationFactorMax = -1 @@ -453,7 +452,7 @@ func TestBatching(t *testing.T) { } // Pin 4 things, and check that 3 are commited - for _, c := range []cid.Cid{test.Cid2, test.Cid3, test.Cid4, test.Cid5} { + for _, c := range []api.Cid{test.Cid2, test.Cid3, test.Cid4, test.Cid5} { err = cc.LogPin(ctx, testPin(c)) if err != nil { t.Error(err) diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index d158901d..bf49d0a2 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" libp2p "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-core/host" peerstore "github.com/libp2p/go-libp2p-core/peerstore" @@ -22,7 +21,7 @@ func cleanRaft(idn int) { os.RemoveAll(fmt.Sprintf("raftFolderFromTests-%d", idn)) } -func testPin(c cid.Cid) api.Pin { +func testPin(c api.Cid) api.Pin { p := api.PinCid(c) p.ReplicationFactorMin = -1 p.ReplicationFactorMax = -1 diff --git a/ipfscluster.go b/ipfscluster.go index d21e0a63..ad820222 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -76,7 +75,7 @@ type IPFSConnector interface { Component ID(context.Context) (api.IPFSID, error) Pin(context.Context, api.Pin) error - Unpin(context.Context, cid.Cid) error + Unpin(context.Context, api.Cid) error PinLsCid(context.Context, api.Pin) (api.IPFSPinStatus, error) // PinLs returns pins in the pinset of the given types (recursive, direct...) PinLs(ctx context.Context, typeFilters []string, out chan<- api.IPFSPinInfo) error @@ -94,11 +93,11 @@ type IPFSConnector interface { // RepoGC performs garbage collection sweep on the IPFS repo. RepoGC(context.Context) (api.RepoGC, error) // Resolve returns a cid given a path. - Resolve(context.Context, string) (cid.Cid, error) + Resolve(context.Context, string) (api.Cid, error) // BlockStream adds a stream of blocks to IPFS. BlockStream(context.Context, <-chan api.NodeWithMeta) error // BlockGet retrieves the raw data of an IPFS block. - BlockGet(context.Context, cid.Cid) ([]byte, error) + BlockGet(context.Context, api.Cid) ([]byte, error) } // Peered represents a component which needs to be aware of the peers @@ -119,16 +118,16 @@ type PinTracker interface { Track(context.Context, api.Pin) error // Untrack tells the tracker that a Cid is to be forgotten. The tracker // may perform an IPFS unpin operation. - Untrack(context.Context, cid.Cid) error + Untrack(context.Context, api.Cid) error // StatusAll returns the list of pins with their local status. Takes a // filter to specify which statuses to report. StatusAll(context.Context, api.TrackerStatus, chan<- api.PinInfo) error // Status returns the local status of a given Cid. - Status(context.Context, cid.Cid) api.PinInfo + Status(context.Context, api.Cid) api.PinInfo // RecoverAll calls Recover() for all pins tracked. RecoverAll(context.Context, chan<- api.PinInfo) error // Recover retriggers a Pin/Unpin operation in a Cids with error status. - Recover(context.Context, cid.Cid) (api.PinInfo, error) + Recover(context.Context, api.Cid) (api.PinInfo, error) } // Informer provides Metric information from a peer. The metrics produced by @@ -155,7 +154,7 @@ type PinAllocator interface { // which are currently pinning the content. The candidates map // contains the metrics for all peers which are eligible for pinning // the content. - Allocate(ctx context.Context, c cid.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error) + Allocate(ctx context.Context, c api.Cid, current, candidates, priority api.MetricsSet) ([]peer.ID, error) // Metrics returns the list of metrics that the allocator needs. Metrics() []string } diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 1807fe97..3e4d904e 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -656,7 +656,7 @@ func TestClustersPin(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = clusters[j].Pin(ctx, h, api.PinOptions{}) + _, err = clusters[j].Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Errorf("error pinning %s: %s", h, err) } @@ -759,12 +759,12 @@ func TestClustersPinUpdate(t *testing.T) { h, _ := prefix.Sum(randomBytes()) // create random cid h2, _ := prefix.Sum(randomBytes()) // create random cid - _, err := clusters[0].PinUpdate(ctx, h, h2, api.PinOptions{}) + _, err := clusters[0].PinUpdate(ctx, api.NewCid(h), api.NewCid(h2), api.PinOptions{}) if err == nil || err != state.ErrNotFound { t.Fatal("pin update should fail when from is not pinned") } - _, err = clusters[0].Pin(ctx, h, api.PinOptions{}) + _, err = clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Errorf("error pinning %s: %s", h, err) } @@ -773,12 +773,12 @@ func TestClustersPinUpdate(t *testing.T) { expiry := time.Now().AddDate(1, 0, 0) opts2 := api.PinOptions{ UserAllocations: []peer.ID{clusters[0].host.ID()}, // should not be used - PinUpdate: h, + PinUpdate: api.NewCid(h), Name: "new name", ExpireAt: expiry, } - _, err = clusters[0].Pin(ctx, h2, opts2) // should call PinUpdate + _, err = clusters[0].Pin(ctx, api.NewCid(h2), opts2) // should call PinUpdate if err != nil { t.Errorf("error pin-updating %s: %s", h2, err) } @@ -786,7 +786,7 @@ func TestClustersPinUpdate(t *testing.T) { pinDelay() f := func(t *testing.T, c *Cluster) { - pinget, err := c.PinGet(ctx, h2) + pinget, err := c.PinGet(ctx, api.NewCid(h2)) if err != nil { t.Fatal(err) } @@ -821,7 +821,7 @@ func TestClustersPinDirect(t *testing.T) { h, _ := prefix.Sum(randomBytes()) // create random cid - _, err := clusters[0].Pin(ctx, h, api.PinOptions{Mode: api.PinModeDirect}) + _, err := clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{Mode: api.PinModeDirect}) if err != nil { t.Fatal(err) } @@ -829,7 +829,7 @@ func TestClustersPinDirect(t *testing.T) { pinDelay() f := func(t *testing.T, c *Cluster, mode api.PinMode) { - pinget, err := c.PinGet(ctx, h) + pinget, err := c.PinGet(ctx, api.NewCid(h)) if err != nil { t.Fatal(err) } @@ -842,7 +842,7 @@ func TestClustersPinDirect(t *testing.T) { t.Errorf("pin should have max-depth %d but has %d", mode.ToPinDepth(), pinget.MaxDepth) } - pInfo := c.StatusLocal(ctx, h) + pInfo := c.StatusLocal(ctx, api.NewCid(h)) if pInfo.Error != "" { t.Error(pInfo.Error) } @@ -857,7 +857,7 @@ func TestClustersPinDirect(t *testing.T) { }) // Convert into a recursive mode - _, err = clusters[0].Pin(ctx, h, api.PinOptions{Mode: api.PinModeRecursive}) + _, err = clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{Mode: api.PinModeRecursive}) if err != nil { t.Fatal(err) } @@ -869,7 +869,7 @@ func TestClustersPinDirect(t *testing.T) { }) // This should fail as we cannot convert back to direct - _, err = clusters[0].Pin(ctx, h, api.PinOptions{Mode: api.PinModeDirect}) + _, err = clusters[0].Pin(ctx, api.NewCid(h), api.PinOptions{Mode: api.PinModeDirect}) if err == nil { t.Error("a recursive pin cannot be converted back to direct pin") } @@ -1229,14 +1229,14 @@ func TestClustersReplicationOverall(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = clusters[j].Pin(ctx, h, api.PinOptions{}) + _, err = clusters[j].Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Error(err) } pinDelay() // check that it is held by exactly nClusters - 1 peers - gpi, err := clusters[j].Status(ctx, h) + gpi, err := clusters[j].Status(ctx, api.NewCid(h)) if err != nil { t.Fatal(err) } diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index c1129834..45e68305 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -353,7 +353,7 @@ func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error { // If we have a pin-update, and the old object // is pinned recursively, then do pin/update. // Otherwise do a normal pin. - if from := pin.PinUpdate; from != cid.Undef { + if from := pin.PinUpdate; from.Defined() { fromPin := api.PinWithOpts(from, pin.PinOptions) pinStatus, _ := ipfs.PinLsCid(ctx, fromPin) if pinStatus.IsPinned(-1) { // pinned recursively. @@ -408,7 +408,7 @@ func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error { // pinProgress pins an item and sends fetched node's progress on a // channel. Blocks until done or error. pinProgress will always close the out // channel. pinProgress will not block on sending to the channel if it is full. -func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth api.PinDepth, out chan<- int) error { +func (ipfs *Connector) pinProgress(ctx context.Context, hash api.Cid, maxDepth api.PinDepth, out chan<- int) error { defer close(out) ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinsProgress") @@ -451,7 +451,7 @@ func (ipfs *Connector) pinProgress(ctx context.Context, hash cid.Cid, maxDepth a } } -func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error { +func (ipfs *Connector) pinUpdate(ctx context.Context, from, to api.Cid) error { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/pinUpdate") defer span.End() @@ -467,7 +467,7 @@ func (ipfs *Connector) pinUpdate(ctx context.Context, from, to cid.Cid) error { // Unpin performs an unpin request against the configured IPFS // daemon. -func (ipfs *Connector) Unpin(ctx context.Context, hash cid.Cid) error { +func (ipfs *Connector) Unpin(ctx context.Context, hash api.Cid) error { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Unpin") defer span.End() @@ -834,23 +834,23 @@ func (ipfs *Connector) RepoGC(ctx context.Context) (api.RepoGC, error) { } } - repoGC.Keys = append(repoGC.Keys, api.IPFSRepoGC{Key: resp.Key, Error: resp.Error}) + repoGC.Keys = append(repoGC.Keys, api.IPFSRepoGC{Key: api.NewCid(resp.Key), Error: resp.Error}) } } // Resolve accepts ipfs or ipns path and resolves it into a cid -func (ipfs *Connector) Resolve(ctx context.Context, path string) (cid.Cid, error) { +func (ipfs *Connector) Resolve(ctx context.Context, path string) (api.Cid, error) { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/Resolve") defer span.End() validPath, err := gopath.ParsePath(path) if err != nil { logger.Error("could not parse path: " + err.Error()) - return cid.Undef, err + return api.CidUndef, err } if !strings.HasPrefix(path, "/ipns") && validPath.IsJustAKey() { ci, _, err := gopath.SplitAbsPath(validPath) - return ci, err + return api.NewCid(ci), err } ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) @@ -858,18 +858,18 @@ func (ipfs *Connector) Resolve(ctx context.Context, path string) (cid.Cid, error res, err := ipfs.postCtx(ctx, "resolve?arg="+url.QueryEscape(path), "", nil) if err != nil { logger.Error(err) - return cid.Undef, err + return api.CidUndef, err } var resp ipfsResolveResp err = json.Unmarshal(res, &resp) if err != nil { logger.Error("could not unmarshal response: " + err.Error()) - return cid.Undef, err + return api.CidUndef, err } ci, _, err := gopath.SplitAbsPath(gopath.FromString(resp.Path)) - return ci, err + return api.NewCid(ci), err } // SwarmPeers returns the peers currently connected to this ipfs daemon. @@ -950,15 +950,15 @@ func (ci *chanIterator) Node() files.Node { return nil } ci.seenMu.Lock() - ci.seen.Add(ci.current.Cid) + ci.seen.Add(ci.current.Cid.Cid) ci.seenMu.Unlock() return files.NewBytesFile(ci.current.Data) } func (ci *chanIterator) Seen(c api.Cid) bool { ci.seenMu.Lock() - has := ci.seen.Has(cid.Cid(c)) - ci.seen.Remove(cid.Cid(c)) + has := ci.seen.Has(c.Cid) + ci.seen.Remove(c.Cid) ci.seenMu.Unlock() return has } @@ -1117,7 +1117,7 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi } // BlockGet retrieves an ipfs block with the given cid -func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { +func (ipfs *Connector) BlockGet(ctx context.Context, c api.Cid) ([]byte, error) { ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockGet") defer span.End() @@ -1129,7 +1129,7 @@ func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) // // FetchRefs asks IPFS to download blocks recursively to the given depth. // // It discards the response, but waits until it completes. -// func (ipfs *Connector) FetchRefs(ctx context.Context, c cid.Cid, maxDepth int) error { +// func (ipfs *Connector) FetchRefs(ctx context.Context, c api.Cid, maxDepth int) error { // ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.PinTimeout) // defer cancel() diff --git a/observations/metrics.go b/observations/metrics.go index 28ef241f..9a9dc5e3 100644 --- a/observations/metrics.go +++ b/observations/metrics.go @@ -1,3 +1,4 @@ +// Package observations sets up metric and trace exporting for IPFS cluster. package observations import ( diff --git a/peer_manager_test.go b/peer_manager_test.go index b000f206..c12a1da8 100644 --- a/peer_manager_test.go +++ b/peer_manager_test.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/config" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -462,7 +461,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = chosen.Pin(ctx, h, api.PinOptions{}) + _, err = chosen.Pin(ctx, api.NewCid(h), api.PinOptions{}) if err != nil { t.Fatal(err) } @@ -474,7 +473,7 @@ func TestClustersPeerRemoveReallocsPins(t *testing.T) { // At this point, all peers must have nClusters -1 pins // associated to them. // Find out which pins are associated to the chosen peer. - interestingCids := []cid.Cid{} + interestingCids := []api.Cid{} pins, err := chosen.pinsSlice(ctx) if err != nil { diff --git a/pintracker/optracker/operation.go b/pintracker/optracker/operation.go index e4464352..c9f9e08e 100644 --- a/pintracker/optracker/operation.go +++ b/pintracker/optracker/operation.go @@ -7,8 +7,6 @@ import ( "sync" "time" - cid "github.com/ipfs/go-cid" - "github.com/ipfs/ipfs-cluster/api" "go.opencensus.io/trace" ) @@ -109,7 +107,7 @@ func (op *Operation) String() string { } // Cid returns the Cid associated to this operation. -func (op *Operation) Cid() cid.Cid { +func (op *Operation) Cid() api.Cid { return op.pin.Cid } diff --git a/pintracker/optracker/operationtracker.go b/pintracker/optracker/operationtracker.go index c0a853cf..767540b7 100644 --- a/pintracker/optracker/operationtracker.go +++ b/pintracker/optracker/operationtracker.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" @@ -30,7 +29,7 @@ type OperationTracker struct { peerName string mu sync.RWMutex - operations map[cid.Cid]*Operation + operations map[api.Cid]*Operation } func (opt *OperationTracker) String() string { @@ -57,7 +56,7 @@ func NewOperationTracker(ctx context.Context, pid peer.ID, peerName string) *Ope ctx: ctx, pid: pid, peerName: peerName, - operations: make(map[cid.Cid]*Operation), + operations: make(map[api.Cid]*Operation), } } @@ -107,7 +106,7 @@ func (opt *OperationTracker) Clean(ctx context.Context, op *Operation) { // Status returns the TrackerStatus associated to the last operation known // with the given Cid. It returns false if we are not tracking any operation // for the given Cid. -func (opt *OperationTracker) Status(ctx context.Context, c cid.Cid) (api.TrackerStatus, bool) { +func (opt *OperationTracker) Status(ctx context.Context, c api.Cid) (api.TrackerStatus, bool) { opt.mu.RLock() defer opt.mu.RUnlock() op, ok := opt.operations[c] @@ -122,7 +121,7 @@ func (opt *OperationTracker) Status(ctx context.Context, c cid.Cid) (api.Tracker // is PhaseDone. Any other phases are considered in-flight and not touched. // For things already in error, the error message is updated. // Remote pins are ignored too. -func (opt *OperationTracker) SetError(ctx context.Context, c cid.Cid, err error) { +func (opt *OperationTracker) SetError(ctx context.Context, c api.Cid, err error) { opt.mu.Lock() defer opt.mu.Unlock() op, ok := opt.operations[c] @@ -143,7 +142,7 @@ func (opt *OperationTracker) SetError(ctx context.Context, c cid.Cid, err error) func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation, ipfs api.IPFSID) api.PinInfo { if op == nil { return api.PinInfo{ - Cid: cid.Undef, + Cid: api.CidUndef, Peer: opt.pid, Name: "", PinInfoShort: api.PinInfoShort{ @@ -175,7 +174,7 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation, i } // Get returns a PinInfo object for Cid. -func (opt *OperationTracker) Get(ctx context.Context, c cid.Cid, ipfs api.IPFSID) api.PinInfo { +func (opt *OperationTracker) Get(ctx context.Context, c api.Cid, ipfs api.IPFSID) api.PinInfo { ctx, span := trace.StartSpan(ctx, "optracker/GetAll") defer span.End() @@ -183,7 +182,7 @@ func (opt *OperationTracker) Get(ctx context.Context, c cid.Cid, ipfs api.IPFSID defer opt.mu.RUnlock() op := opt.operations[c] pInfo := opt.unsafePinInfo(ctx, op, ipfs) - if pInfo.Cid == cid.Undef { + if !pInfo.Cid.Defined() { pInfo.Cid = c } return pInfo @@ -191,7 +190,7 @@ func (opt *OperationTracker) Get(ctx context.Context, c cid.Cid, ipfs api.IPFSID // GetExists returns a PinInfo object for a Cid only if there exists // an associated Operation. -func (opt *OperationTracker) GetExists(ctx context.Context, c cid.Cid, ipfs api.IPFSID) (api.PinInfo, bool) { +func (opt *OperationTracker) GetExists(ctx context.Context, c api.Cid, ipfs api.IPFSID) (api.PinInfo, bool) { ctx, span := trace.StartSpan(ctx, "optracker/GetExists") defer span.End() @@ -258,7 +257,7 @@ func (opt *OperationTracker) CleanAllDone(ctx context.Context) { } // OpContext gets the context of an operation, if any. -func (opt *OperationTracker) OpContext(ctx context.Context, c cid.Cid) context.Context { +func (opt *OperationTracker) OpContext(ctx context.Context, c api.Cid) context.Context { opt.mu.RLock() defer opt.mu.RUnlock() op, ok := opt.operations[c] @@ -298,8 +297,8 @@ func (opt *OperationTracker) filterOps(ctx context.Context, filters ...interface return fltops } -func filterOpsMap(ctx context.Context, ops map[cid.Cid]*Operation, filters []interface{}) map[cid.Cid]*Operation { - fltops := make(map[cid.Cid]*Operation) +func filterOpsMap(ctx context.Context, ops map[api.Cid]*Operation, filters []interface{}) map[api.Cid]*Operation { + fltops := make(map[api.Cid]*Operation) if len(filters) < 1 { return nil } @@ -315,7 +314,7 @@ func filterOpsMap(ctx context.Context, ops map[cid.Cid]*Operation, filters []int return filterOpsMap(ctx, fltops, filters) } -func filter(ctx context.Context, in, out map[cid.Cid]*Operation, filter interface{}) { +func filter(ctx context.Context, in, out map[api.Cid]*Operation, filter interface{}) { for _, op := range in { switch filter.(type) { case OperationType: diff --git a/pintracker/optracker/operationtracker_test.go b/pintracker/optracker/operationtracker_test.go index 493c396e..9f23841e 100644 --- a/pintracker/optracker/operationtracker_test.go +++ b/pintracker/optracker/operationtracker_test.go @@ -5,7 +5,6 @@ import ( "errors" "testing" - cid "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/test" ) @@ -203,7 +202,7 @@ func TestOperationTracker_OpContext(t *testing.T) { func TestOperationTracker_filterOps(t *testing.T) { ctx := context.Background() - testOpsMap := map[cid.Cid]*Operation{ + testOpsMap := map[api.Cid]*Operation{ test.Cid1: {pin: api.PinCid(test.Cid1), opType: OperationPin, phase: PhaseQueued}, test.Cid2: {pin: api.PinCid(test.Cid2), opType: OperationPin, phase: PhaseInProgress}, test.Cid3: {pin: api.PinCid(test.Cid3), opType: OperationUnpin, phase: PhaseInProgress}, diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index 9a0305b6..78ef5750 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -19,7 +19,6 @@ import ( "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) @@ -139,7 +138,7 @@ func BenchmarkPinTracker_Track(b *testing.B) { func TestPinTracker_Untrack(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -273,7 +272,7 @@ func TestPinTracker_StatusAll(t *testing.T) { func TestPinTracker_Status(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -397,7 +396,7 @@ func TestPinTracker_RecoverAll(t *testing.T) { func TestPinTracker_Recover(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -438,7 +437,7 @@ func TestPinTracker_Recover(t *testing.T) { func TestUntrackTrack(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { @@ -481,7 +480,7 @@ func TestUntrackTrack(t *testing.T) { func TestTrackUntrackWithCancel(t *testing.T) { type args struct { - c cid.Cid + c api.Cid tracker ipfscluster.PinTracker } tests := []struct { diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index fd43823a..a6f2caa0 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/ipfs-cluster/pintracker/optracker" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -315,7 +314,7 @@ func (spt *Tracker) Track(ctx context.Context, c api.Pin) error { // Untrack tells the StatelessPinTracker to stop managing a Cid. // If the Cid is pinned locally, it will be unpinned. -func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error { +func (spt *Tracker) Untrack(ctx context.Context, c api.Cid) error { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Untrack") defer span.End() @@ -455,7 +454,7 @@ func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus, out } // Status returns information for a Cid pinned to the local IPFS node. -func (spt *Tracker) Status(ctx context.Context, c cid.Cid) api.PinInfo { +func (spt *Tracker) Status(ctx context.Context, c api.Cid) api.PinInfo { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Status") defer span.End() @@ -594,7 +593,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context, out chan<- api.PinInfo) erro // Recover will trigger pinning or unpinning for items in // PinError or UnpinError states. -func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (api.PinInfo, error) { +func (spt *Tracker) Recover(ctx context.Context, c api.Cid) (api.PinInfo, error) { ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover") defer span.End() @@ -662,7 +661,7 @@ func (spt *Tracker) ipfsPins(ctx context.Context) (<-chan api.IPFSPinInfo, error // OpContext exports the internal optracker's OpContext method. // For testing purposes only. -func (spt *Tracker) OpContext(ctx context.Context, c cid.Cid) context.Context { +func (spt *Tracker) OpContext(ctx context.Context, c api.Cid) context.Context { return spt.optracker.OpContext(ctx, c) } diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index d6f18980..7dffc07e 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -12,7 +12,6 @@ import ( "github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/test" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" ) @@ -286,7 +285,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) { } pi := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{}) - if pi.Cid == cid.Undef { + if !pi.Cid.Defined() { t.Error("fastPin should have been removed from tracker") } } @@ -318,7 +317,7 @@ func TestUntrackTrackWithCancel(t *testing.T) { time.Sleep(100 * time.Millisecond) pi := spt.optracker.Get(ctx, slowPin.Cid, api.IPFSID{}) - if pi.Cid == cid.Undef { + if !pi.Cid.Defined() { t.Fatal("expected slowPin to be tracked") } @@ -379,7 +378,7 @@ func TestUntrackTrackWithNoCancel(t *testing.T) { } pi := spt.optracker.Get(ctx, fastPin.Cid, api.IPFSID{}) - if pi.Cid == cid.Undef { + if !pi.Cid.Defined() { t.Fatal("c untrack operation should be tracked") } diff --git a/rpc_api.go b/rpc_api.go index 5efbc4c1..581dfc09 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -8,7 +8,6 @@ import ( "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/version" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" rpc "github.com/libp2p/go-libp2p-gorpc" @@ -221,7 +220,7 @@ func (rpcapi *ClusterRPCAPI) Pins(ctx context.Context, in <-chan struct{}, out c } // PinGet runs Cluster.PinGet(). -func (rpcapi *ClusterRPCAPI) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { +func (rpcapi *ClusterRPCAPI) PinGet(ctx context.Context, in api.Cid, out *api.Pin) error { pin, err := rpcapi.c.PinGet(ctx, in) if err != nil { return err @@ -294,7 +293,7 @@ func (rpcapi *ClusterRPCAPI) StatusAllLocal(ctx context.Context, in <-chan api.T } // Status runs Cluster.Status(). -func (rpcapi *ClusterRPCAPI) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (rpcapi *ClusterRPCAPI) Status(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Status(ctx, in) if err != nil { return err @@ -304,7 +303,7 @@ func (rpcapi *ClusterRPCAPI) Status(ctx context.Context, in cid.Cid, out *api.Gl } // StatusLocal runs Cluster.StatusLocal(). -func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *ClusterRPCAPI) StatusLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { pinfo := rpcapi.c.StatusLocal(ctx, in) *out = pinfo return nil @@ -321,7 +320,7 @@ func (rpcapi *ClusterRPCAPI) RecoverAllLocal(ctx context.Context, in <-chan stru } // Recover runs Cluster.Recover(). -func (rpcapi *ClusterRPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (rpcapi *ClusterRPCAPI) Recover(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { pinfo, err := rpcapi.c.Recover(ctx, in) if err != nil { return err @@ -331,7 +330,7 @@ func (rpcapi *ClusterRPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.G } // RecoverLocal runs Cluster.RecoverLocal(). -func (rpcapi *ClusterRPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *ClusterRPCAPI) RecoverLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { pinfo, err := rpcapi.c.RecoverLocal(ctx, in) if err != nil { return err @@ -411,7 +410,7 @@ func (rpcapi *ClusterRPCAPI) RepoGCLocal(ctx context.Context, in struct{}, out * return nil } -// SendInformerMetric runs Cluster.sendInformerMetric(). +// SendInformerMetrics runs Cluster.sendInformerMetric(). func (rpcapi *ClusterRPCAPI) SendInformerMetrics(ctx context.Context, in struct{}, out *struct{}) error { return rpcapi.c.sendInformersMetrics(ctx) } @@ -475,7 +474,7 @@ func (rpcapi *PinTrackerRPCAPI) StatusAll(ctx context.Context, in <-chan api.Tra } // Status runs PinTracker.Status(). -func (rpcapi *PinTrackerRPCAPI) Status(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *PinTrackerRPCAPI) Status(ctx context.Context, in api.Cid, out *api.PinInfo) error { ctx, span := trace.StartSpan(ctx, "rpc/tracker/Status") defer span.End() pinfo := rpcapi.tracker.Status(ctx, in) @@ -491,7 +490,7 @@ func (rpcapi *PinTrackerRPCAPI) RecoverAll(ctx context.Context, in <-chan struct } // Recover runs PinTracker.Recover(). -func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (rpcapi *PinTrackerRPCAPI) Recover(ctx context.Context, in api.Cid, out *api.PinInfo) error { ctx, span := trace.StartSpan(ctx, "rpc/tracker/Recover") defer span.End() pinfo, err := rpcapi.tracker.Recover(ctx, in) @@ -577,7 +576,7 @@ func (rpcapi *IPFSConnectorRPCAPI) BlockStream(ctx context.Context, in <-chan ap } // BlockGet runs IPFSConnector.BlockGet(). -func (rpcapi *IPFSConnectorRPCAPI) BlockGet(ctx context.Context, in cid.Cid, out *[]byte) error { +func (rpcapi *IPFSConnectorRPCAPI) BlockGet(ctx context.Context, in api.Cid, out *[]byte) error { res, err := rpcapi.ipfs.BlockGet(ctx, in) if err != nil { return err @@ -587,7 +586,7 @@ func (rpcapi *IPFSConnectorRPCAPI) BlockGet(ctx context.Context, in cid.Cid, out } // Resolve runs IPFSConnector.Resolve(). -func (rpcapi *IPFSConnectorRPCAPI) Resolve(ctx context.Context, in string, out *cid.Cid) error { +func (rpcapi *IPFSConnectorRPCAPI) Resolve(ctx context.Context, in string, out *api.Cid) error { c, err := rpcapi.ipfs.Resolve(ctx, in) if err != nil { return err diff --git a/sharness/t0030-ctl-pin.sh b/sharness/t0030-ctl-pin.sh index e6da9901..fafb4456 100755 --- a/sharness/t0030-ctl-pin.sh +++ b/sharness/t0030-ctl-pin.sh @@ -15,7 +15,7 @@ test_expect_success IPFS,CLUSTER "pin data to cluster with ctl" ' ' test_expect_success IPFS,CLUSTER "unpin data from cluster with ctl" ' - cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid | .[\"/\"]" | head -1` + cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid" | head -1` ipfs-cluster-ctl pin rm "$cid" && !(ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid") && ipfs-cluster-ctl status "$cid" | grep -q -i "UNPINNED" @@ -29,7 +29,7 @@ test_expect_success IPFS,CLUSTER "wait for data to pin to cluster with ctl" ' ' test_expect_success IPFS,CLUSTER "wait for data to unpin from cluster with ctl" ' - cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid | .[\"/\"]" | head -1` + cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid" | head -1` ipfs-cluster-ctl pin rm --wait "$cid" | grep -q -i "UNPINNED" && !(ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid") && ipfs-cluster-ctl status "$cid" | grep -q -i "UNPINNED" @@ -43,7 +43,7 @@ test_expect_success IPFS,CLUSTER "wait for data to pin to cluster with ctl with ' test_expect_success IPFS,CLUSTER "wait for data to unpin from cluster with ctl with timeout" ' - cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid | .[\"/\"]" | head -1` + cid=`ipfs-cluster-ctl --enc=json pin ls | jq -r ".cid" | head -1` ipfs-cluster-ctl pin rm --wait --wait-timeout 2s "$cid" | grep -q -i "UNPINNED" && !(ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid") && ipfs-cluster-ctl status "$cid" | grep -q -i "UNPINNED" diff --git a/sharness/t0052-service-state-export.sh b/sharness/t0052-service-state-export.sh index d0dcdccc..60700376 100755 --- a/sharness/t0052-service-state-export.sh +++ b/sharness/t0052-service-state-export.sh @@ -14,7 +14,7 @@ test_expect_success IPFS,CLUSTER,JQ "state export saves the correct state to exp cluster_kill && sleep 5 && ipfs-cluster-service --debug --config "test-config" state export -f export.json && [ -f export.json ] && - jq -r ".cid | .[\"/\"]" export.json | grep -q "$cid" + jq -r ".cid" export.json | grep -q "$cid" ' cluster_kill @@ -28,7 +28,7 @@ test_expect_success IPFS,CLUSTER,JQ "state export saves the correct state to exp cluster_kill && sleep 5 && ipfs-cluster-service --debug --config "test-config" state export -f export.json && [ -f export.json ] && - jq -r ".cid | .[\"/\"]" export.json | grep -q "$cid" + jq -r ".cid" export.json | grep -q "$cid" ' test_clean_ipfs diff --git a/state/dsstate/datastore.go b/state/dsstate/datastore.go index 82e340f8..8cda7e1a 100644 --- a/state/dsstate/datastore.go +++ b/state/dsstate/datastore.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" query "github.com/ipfs/go-datastore/query" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -79,7 +78,7 @@ func (st *State) Add(ctx context.Context, c api.Pin) error { // Rm removes an existing Pin. It is a no-op when the // item does not exist. -func (st *State) Rm(ctx context.Context, c cid.Cid) error { +func (st *State) Rm(ctx context.Context, c api.Cid) error { _, span := trace.StartSpan(ctx, "state/dsstate/Rm") defer span.End() @@ -93,7 +92,7 @@ func (st *State) Rm(ctx context.Context, c cid.Cid) error { // Get returns a Pin from the store and whether it // was present. When not present, a default pin // is returned. -func (st *State) Get(ctx context.Context, c cid.Cid) (api.Pin, error) { +func (st *State) Get(ctx context.Context, c api.Cid) (api.Pin, error) { _, span := trace.StartSpan(ctx, "state/dsstate/Get") defer span.End() @@ -112,7 +111,7 @@ func (st *State) Get(ctx context.Context, c cid.Cid) (api.Pin, error) { } // Has returns whether a Cid is stored. -func (st *State) Has(ctx context.Context, c cid.Cid) (bool, error) { +func (st *State) Has(ctx context.Context, c api.Cid) (bool, error) { _, span := trace.StartSpan(ctx, "state/dsstate/Has") defer span.End() @@ -254,27 +253,28 @@ func (st *State) Unmarshal(r io.Reader) error { } // used to be on go-ipfs-ds-help -func cidToDsKey(c cid.Cid) ds.Key { +func cidToDsKey(c api.Cid) ds.Key { return dshelp.NewKeyFromBinary(c.Bytes()) } // used to be on go-ipfs-ds-help -func dsKeyToCid(k ds.Key) (cid.Cid, error) { +func dsKeyToCid(k ds.Key) (api.Cid, error) { kb, err := dshelp.BinaryFromDsKey(k) if err != nil { - return cid.Undef, err + return api.CidUndef, err } - return cid.Cast(kb) + c, err := api.CastCid(kb) + return c, err } // convert Cid to /namespace/cid1Key -func (st *State) key(c cid.Cid) ds.Key { +func (st *State) key(c api.Cid) ds.Key { k := cidToDsKey(c) return st.namespace.Child(k) } // convert /namespace/cidKey to Cid -func (st *State) unkey(k ds.Key) (cid.Cid, error) { +func (st *State) unkey(k ds.Key) (api.Cid, error) { return dsKeyToCid(ds.NewKey(k.BaseNamespace())) } @@ -286,7 +286,7 @@ func (st *State) serializePin(c api.Pin) ([]byte, error) { // this deserializes a Pin object from the datastore. It should be // the exact opposite from serializePin. -func (st *State) deserializePin(c cid.Cid, buf []byte) (api.Pin, error) { +func (st *State) deserializePin(c api.Cid, buf []byte) (api.Pin, error) { p := api.Pin{} err := p.ProtoUnmarshal(buf) p.Cid = c diff --git a/state/dsstate/datastore_test.go b/state/dsstate/datastore_test.go index 6ece8c54..324e3699 100644 --- a/state/dsstate/datastore_test.go +++ b/state/dsstate/datastore_test.go @@ -9,11 +9,10 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/datastore/inmem" - cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-core/peer" ) -var testCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") +var testCid1, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") var testPeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") var c = api.Pin{ diff --git a/state/empty.go b/state/empty.go index 64523a5b..f4b3ee7a 100644 --- a/state/empty.go +++ b/state/empty.go @@ -4,8 +4,6 @@ import ( "context" "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" ) type empty struct{} @@ -15,11 +13,11 @@ func (e *empty) List(ctx context.Context, out chan<- api.Pin) error { return nil } -func (e *empty) Has(ctx context.Context, c cid.Cid) (bool, error) { +func (e *empty) Has(ctx context.Context, c api.Cid) (bool, error) { return false, nil } -func (e *empty) Get(ctx context.Context, c cid.Cid) (api.Pin, error) { +func (e *empty) Get(ctx context.Context, c api.Cid) (api.Pin, error) { return api.Pin{}, ErrNotFound } diff --git a/state/interface.go b/state/interface.go index 4de202eb..6fc6b489 100644 --- a/state/interface.go +++ b/state/interface.go @@ -9,8 +9,6 @@ import ( "io" "github.com/ipfs/ipfs-cluster/api" - - cid "github.com/ipfs/go-cid" ) // ErrNotFound should be returned when a pin is not part of the state. @@ -36,10 +34,10 @@ type ReadOnly interface { // List lists all the pins in the state. List(context.Context, chan<- api.Pin) error // Has returns true if the state is holding information for a Cid. - Has(context.Context, cid.Cid) (bool, error) + Has(context.Context, api.Cid) (bool, error) // Get returns the information attacthed to this pin, if any. If the // pin is not part of the state, it should return ErrNotFound. - Get(context.Context, cid.Cid) (api.Pin, error) + Get(context.Context, api.Cid) (api.Pin, error) } // WriteOnly represents the write side of a State. @@ -47,7 +45,7 @@ type WriteOnly interface { // Add adds a pin to the State Add(context.Context, api.Pin) error // Rm removes a pin from the State. - Rm(context.Context, cid.Cid) error + Rm(context.Context, api.Cid) error } // BatchingState represents a state which batches write operations. diff --git a/test/cids.go b/test/cids.go index 8a2f1e30..95e96215 100644 --- a/test/cids.go +++ b/test/cids.go @@ -1,30 +1,30 @@ package test import ( - cid "github.com/ipfs/go-cid" + "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" ) // Common variables used all around tests. var ( - Cid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") - Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") - Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") + Cid1, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") + Cid2, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") + Cid3, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") Cid4Data = "Cid4Data" // Cid resulting from block put using blake2b-256 and raw format - Cid4, _ = cid.Decode("bafk2bzaceawsyhsnrwwy5mtit2emnjfalkxsyq2p2ptd6fuliolzwwjbs42fq") + Cid4, _ = api.DecodeCid("bafk2bzaceawsyhsnrwwy5mtit2emnjfalkxsyq2p2ptd6fuliolzwwjbs42fq") // Cid resulting from block put using format "v0" defaults - Cid5, _ = cid.Decode("QmbgmXgsFjxAJ7cEaziL2NDSptHAkPwkEGMmKMpfyYeFXL") + Cid5, _ = api.DecodeCid("QmbgmXgsFjxAJ7cEaziL2NDSptHAkPwkEGMmKMpfyYeFXL") Cid5Data = "Cid5Data" - SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") - CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") + SlowCid1, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") + CidResolved, _ = api.DecodeCid("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") // ErrorCid is meant to be used as a Cid which causes errors. i.e. the // ipfs mock fails when pinning this CID. - ErrorCid, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc") + ErrorCid, _ = api.DecodeCid("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmc") // NotFoundCid is meant to be used as a CID that doesn't exist in the // pinset. - NotFoundCid, _ = cid.Decode("bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4") + NotFoundCid, _ = api.DecodeCid("bafyreiay3jpjk74dkckv2r74eyvf3lfnxujefay2rtuluintasq2zlapv4") PeerID1, _ = peer.Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc") PeerID2, _ = peer.Decode("QmUZ13osndQ5uL4tPWHXe3iBgBgq9gfewcBMSCAuMBsDJ6") PeerID3, _ = peer.Decode("QmPGDFvBkgWhvzEK9qaTWrWurSwqXNmhnK3hgELPdZZNPa") diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index f4fa00fc..d02ba3c8 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -192,7 +192,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { if arg == ErrorCid.String() { goto ERROR } - c, err := cid.Decode(arg) + c, err := api.DecodeCid(arg) if err != nil { goto ERROR } @@ -222,7 +222,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { if !ok { goto ERROR } - c, err := cid.Decode(arg) + c, err := api.DecodeCid(arg) if err != nil { goto ERROR } @@ -239,11 +239,11 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { } fromStr := args[0] toStr := args[1] - from, err := cid.Decode(fromStr) + from, err := api.DecodeCid(fromStr) if err != nil { goto ERROR } - to, err := cid.Decode(toStr) + to, err := api.DecodeCid(toStr) if err != nil { goto ERROR } @@ -301,7 +301,7 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { } cidStr := arg - c, err := cid.Decode(cidStr) + c, err := api.DecodeCid(cidStr) if err != nil { goto ERROR } @@ -443,16 +443,16 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { enc := json.NewEncoder(w) resp := []mockRepoGCResp{ { - Key: Cid1, + Key: Cid1.Cid, }, { - Key: Cid2, + Key: Cid2.Cid, }, { - Key: Cid3, + Key: Cid3.Cid, }, { - Key: Cid4, + Key: Cid4.Cid, }, { Error: "no link by that name", diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 894be139..d24fed91 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/state" - cid "github.com/ipfs/go-cid" gopath "github.com/ipfs/go-path" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" @@ -105,10 +104,11 @@ func (mock *mockCluster) PinPath(ctx context.Context, in api.PinPath, out *api.P if err != nil { return err } - if c.Equals(ErrorCid) { + cc := api.NewCid(c) + if cc.Equals(ErrorCid) { return ErrBadCid } - pin = api.PinWithOpts(c, in.PinOptions) + pin = api.PinWithOpts(cc, in.PinOptions) } else { pin = api.PinWithOpts(CidResolved, in.PinOptions) } @@ -139,7 +139,7 @@ func (mock *mockCluster) Pins(ctx context.Context, in <-chan struct{}, out chan< return nil } -func (mock *mockCluster) PinGet(ctx context.Context, in cid.Cid, out *api.Pin) error { +func (mock *mockCluster) PinGet(ctx context.Context, in api.Cid, out *api.Pin) error { switch in.String() { case ErrorCid.String(): return errors.New("this is an expected error when using ErrorCid") @@ -307,7 +307,7 @@ func (mock *mockCluster) StatusAllLocal(ctx context.Context, in <-chan api.Track return (&mockPinTracker{}).StatusAll(ctx, in, out) } -func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (mock *mockCluster) Status(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { if in.Equals(ErrorCid) { return ErrBadCid } @@ -335,7 +335,7 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global return nil } -func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockCluster) StatusLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { return (&mockPinTracker{}).Status(ctx, in, out) } @@ -350,11 +350,11 @@ func (mock *mockCluster) RecoverAllLocal(ctx context.Context, in <-chan struct{} return (&mockPinTracker{}).RecoverAll(ctx, in, out) } -func (mock *mockCluster) Recover(ctx context.Context, in cid.Cid, out *api.GlobalPinInfo) error { +func (mock *mockCluster) Recover(ctx context.Context, in api.Cid, out *api.GlobalPinInfo) error { return mock.Status(ctx, in, out) } -func (mock *mockCluster) RecoverLocal(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockCluster) RecoverLocal(ctx context.Context, in api.Cid, out *api.PinInfo) error { return (&mockPinTracker{}).Recover(ctx, in, out) } @@ -469,7 +469,7 @@ func (mock *mockPinTracker) StatusAll(ctx context.Context, in <-chan api.Tracker return nil } -func (mock *mockPinTracker) Status(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockPinTracker) Status(ctx context.Context, in api.Cid, out *api.PinInfo) error { if in.Equals(ErrorCid) { return ErrBadCid } @@ -490,7 +490,7 @@ func (mock *mockPinTracker) RecoverAll(ctx context.Context, in <-chan struct{}, return nil } -func (mock *mockPinTracker) Recover(ctx context.Context, in cid.Cid, out *api.PinInfo) error { +func (mock *mockPinTracker) Recover(ctx context.Context, in api.Cid, out *api.PinInfo) error { *out = api.PinInfo{ Cid: in, Peer: PeerID1, @@ -589,7 +589,7 @@ func (mock *mockIPFSConnector) BlockStream(ctx context.Context, in <-chan api.No return nil } -func (mock *mockIPFSConnector) Resolve(ctx context.Context, in string, out *cid.Cid) error { +func (mock *mockIPFSConnector) Resolve(ctx context.Context, in string, out *api.Cid) error { switch in { case ErrorCid.String(), "/ipfs/" + ErrorCid.String(): *out = ErrorCid diff --git a/test/sharding.go b/test/sharding.go index fbf8ec3b..9082ef62 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -12,6 +12,7 @@ import ( files "github.com/ipfs/go-ipfs-files" format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/ipfs-cluster/api" cid "github.com/ipfs/go-cid" ) @@ -62,7 +63,7 @@ var ( } // Used for testing blockput/blockget - ShardCid, _ = cid.Decode("zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh") + ShardCid, _ = api.DecodeCid("zdpuAoiNm1ntWx6jpgcReTiCWFHJSTpvTw4bAAn9p6yDnznqh") ShardData, _ = hex.DecodeString("a16130d82a58230012209273fd63ec94bed5abb219b2d9cb010cabe4af7b0177292d4335eff50464060a") ) diff --git a/util.go b/util.go index ba45ea52..fad96375 100644 --- a/util.go +++ b/util.go @@ -9,7 +9,6 @@ import ( blake2b "golang.org/x/crypto/blake2b" - cid "github.com/ipfs/go-cid" "github.com/ipfs/ipfs-cluster/api" peer "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" @@ -100,7 +99,7 @@ type distanceChecker struct { cache map[peer.ID]distance } -func (dc distanceChecker) isClosest(ci cid.Cid) bool { +func (dc distanceChecker) isClosest(ci api.Cid) bool { ciHash := convertKey(ci.KeyString()) localPeerHash := dc.convertPeerID(dc.local) myDistance := xor(ciHash, localPeerHash) diff --git a/version/version.go b/version/version.go index b5a50dfa..5cc7015a 100644 --- a/version/version.go +++ b/version/version.go @@ -1,3 +1,4 @@ +// Package version stores version information for IPFS Cluster. package version import (