Merge branch 'master' into feat/alerts

Kishan Mohanbhai Sagathiya 2019-12-23 12:45:22 +05:30
commit 68abae9287
40 changed files with 1602 additions and 333 deletions

.gitignore

@ -2,6 +2,7 @@ tag_annotation
coverage.out
cmd/ipfs-cluster-service/ipfs-cluster-service
cmd/ipfs-cluster-ctl/ipfs-cluster-ctl
cmd/ipfs-cluster-follow/ipfs-cluster-follow
sharness/lib/sharness
sharness/test-results
sharness/trash*


@ -30,12 +30,13 @@ jobs:
- name: "Main Tests with raft consensus"
script:
- travis_wait go test -v -timeout 15m -failfast -consensus raft .
- name: "Golint and go vet"
- name: "Golint, go vet, binary builds"
script:
- go get -u golang.org/x/lint/golint
- make check
- make service
- make ctl
- make follow
- name: "Docker and Compose build"
script:
- make docker


@ -1,5 +1,105 @@
# IPFS Cluster Changelog
### v0.12.0 - 2019-12-20
IPFS Cluster v0.12.0 brings many useful features and makes it very easy to
create and participate in collaborative clusters.
The new `ipfs-cluster-follow` command provides a very simple way of joining
one or several clusters as a follower (a peer without permissions to pin/unpin
anything). `ipfs-cluster-follow` peers are initialized using a configuration
"template" distributed over IPFS or HTTP, which is then optimized and secured.
`ipfs-cluster-follow` is limited in scope and attempts to be very
straightforward to use. `ipfs-cluster-service` continues to offer power users
the full set of options for running peers of all kinds (followers or not).
We have additionally added many new features: pin with an expiration date, the
ability to trigger garbage collection on IPFS daemons, improvements to
NAT traversal and connectivity, etc.
Users planning to set up public collaborative clusters should upgrade to this
release, which improves the user experience and comes with documentation on
how to set up and join these clusters
(https://cluster.ipfs.io/documentation/collaborative).
#### List of changes
##### Features
* cluster: `--local` flag for add: adds only to the local peer instead of multiple destinations | [ipfs/ipfs-cluster#848](https://github.com/ipfs/ipfs-cluster/issues/848) | [ipfs/ipfs-cluster#907](https://github.com/ipfs/ipfs-cluster/issues/907)
* cluster: the `RecoverAll` operation can trigger a recover operation on all peers.
* ipfsproxy: log HTTP requests | [ipfs/ipfs-cluster#574](https://github.com/ipfs/ipfs-cluster/issues/574) | [ipfs/ipfs-cluster#915](https://github.com/ipfs/ipfs-cluster/issues/915)
* api: `health/metrics` returns list of available metrics | [ipfs/ipfs-cluster#374](https://github.com/ipfs/ipfs-cluster/issues/374) | [ipfs/ipfs-cluster#924](https://github.com/ipfs/ipfs-cluster/issues/924)
* service: `init --randomports` sets random, unused ports on initialization | [ipfs/ipfs-cluster#794](https://github.com/ipfs/ipfs-cluster/issues/794) | [ipfs/ipfs-cluster#926](https://github.com/ipfs/ipfs-cluster/issues/926)
* cluster: support pin expiration | [ipfs/ipfs-cluster#481](https://github.com/ipfs/ipfs-cluster/issues/481) | [ipfs/ipfs-cluster#923](https://github.com/ipfs/ipfs-cluster/issues/923)
* cluster: quic, autorelay, autonat, TLS handshake support | [ipfs/ipfs-cluster#614](https://github.com/ipfs/ipfs-cluster/issues/614) | [ipfs/ipfs-cluster#932](https://github.com/ipfs/ipfs-cluster/issues/932) | [ipfs/ipfs-cluster#973](https://github.com/ipfs/ipfs-cluster/issues/973) | [ipfs/ipfs-cluster#975](https://github.com/ipfs/ipfs-cluster/issues/975)
* cluster: `health/graph` improvements | [ipfs/ipfs-cluster#800](https://github.com/ipfs/ipfs-cluster/issues/800) | [ipfs/ipfs-cluster#925](https://github.com/ipfs/ipfs-cluster/issues/925) | [ipfs/ipfs-cluster#954](https://github.com/ipfs/ipfs-cluster/issues/954)
* cluster: `ipfs-cluster-ctl ipfs gc` triggers GC on cluster peers | [ipfs/ipfs-cluster#628](https://github.com/ipfs/ipfs-cluster/issues/628) | [ipfs/ipfs-cluster#777](https://github.com/ipfs/ipfs-cluster/issues/777) | [ipfs/ipfs-cluster#739](https://github.com/ipfs/ipfs-cluster/issues/739) | [ipfs/ipfs-cluster#945](https://github.com/ipfs/ipfs-cluster/issues/945) | [ipfs/ipfs-cluster#961](https://github.com/ipfs/ipfs-cluster/issues/961)
* cluster: advertise external addresses as soon as known | [ipfs/ipfs-cluster#949](https://github.com/ipfs/ipfs-cluster/issues/949) | [ipfs/ipfs-cluster#950](https://github.com/ipfs/ipfs-cluster/issues/950)
* cluster: skip contacting remote-allocations (peers) for recover/status operations | [ipfs/ipfs-cluster#935](https://github.com/ipfs/ipfs-cluster/issues/935) | [ipfs/ipfs-cluster#947](https://github.com/ipfs/ipfs-cluster/issues/947)
* restapi: support listening on a unix socket | [ipfs/ipfs-cluster#969](https://github.com/ipfs/ipfs-cluster/issues/969)
* config: support `peer_addresses` | [ipfs/ipfs-cluster#791](https://github.com/ipfs/ipfs-cluster/issues/791)
* pintracker: remove `mappintracker`. Upgrade `stateless` for prime-time | [ipfs/ipfs-cluster#944](https://github.com/ipfs/ipfs-cluster/issues/944) | [ipfs/ipfs-cluster#929](https://github.com/ipfs/ipfs-cluster/issues/929)
* service: `--loglevel` supports specifying levels for multiple components | [ipfs/ipfs-cluster#938](https://github.com/ipfs/ipfs-cluster/issues/938) | [ipfs/ipfs-cluster#960](https://github.com/ipfs/ipfs-cluster/issues/960)
* ipfs-cluster-follow: a new CLI tool to run follower cluster peers | [ipfs/ipfs-cluster#976](https://github.com/ipfs/ipfs-cluster/issues/976)
##### Bug fixes
* restapi/client: Fix out of bounds error on load balanced client | [ipfs/ipfs-cluster#951](https://github.com/ipfs/ipfs-cluster/issues/951)
* service: disable libp2p restapi on CRDT clusters | [ipfs/ipfs-cluster#968](https://github.com/ipfs/ipfs-cluster/issues/968)
* observations: Fix pprof index links | [ipfs/ipfs-cluster#965](https://github.com/ipfs/ipfs-cluster/issues/965)
##### Other changes
* Spelling fix in changelog | [ipfs/ipfs-cluster#920](https://github.com/ipfs/ipfs-cluster/issues/920)
* Tests: multiple fixes | [ipfs/ipfs-cluster#919](https://github.com/ipfs/ipfs-cluster/issues/919) | [ipfs/ipfs-cluster#943](https://github.com/ipfs/ipfs-cluster/issues/943) | [ipfs/ipfs-cluster#953](https://github.com/ipfs/ipfs-cluster/issues/953) | [ipfs/ipfs-cluster#956](https://github.com/ipfs/ipfs-cluster/issues/956)
* Stateless tracker: increase default queue size | [ipfs/ipfs-cluster#377](https://github.com/ipfs/ipfs-cluster/issues/377) | [ipfs/ipfs-cluster#917](https://github.com/ipfs/ipfs-cluster/issues/917)
* Upgrade to Go1.13 | [ipfs/ipfs-cluster#934](https://github.com/ipfs/ipfs-cluster/issues/934)
* Dockerfiles: improvements | [ipfs/ipfs-cluster#946](https://github.com/ipfs/ipfs-cluster/issues/946)
* cluster: support multiple informers on initialization | [ipfs/ipfs-cluster#940](https://github.com/ipfs/ipfs-cluster/issues/940) | [ipfs/ipfs-cluster#962](https://github.com/ipfs/ipfs-cluster/issues/962)
* cmdutils: move some methods to cmdutils | [ipfs/ipfs-cluster#970](https://github.com/ipfs/ipfs-cluster/issues/970)
#### Upgrading notices
##### Configuration changes
* `cluster` section:
* A new `peer_addresses` key allows specifying additional peer addresses in the configuration (similar to the `peerstore` file). These are treated as libp2p bootstrap addresses (not to be confused with the Raft bootstrap process). This setting is mostly useful for CRDT collaborative clusters, as template configurations can be distributed including bootstrap peers (usually the same as the trusted peers). The values are the full multiaddresses of these peers: `/ip4/x.x.x.x/tcp/1234/p2p/Qmxxx...` (see the parsing sketch after this configuration list).
* `listen_multiaddress` can now be set to an array providing multiple listen multiaddresses; the new defaults are `/tcp/9096` and `/udp/9096/quic`.
* `enable_relay_hop` (`true` by default) lets the cluster peer act as a relay for other cluster peers behind NATs. This only applies to the Cluster network. As a reminder, while this setting is problematic on IPFS (due to the amount of traffic that HOP peers start relaying), cluster peer networks are smaller and do not move huge amounts of content around.
* The `ipfs_sync_interval` option disappears, as the stateless tracker does not keep a state that can lose synchronization with IPFS.
* `ipfshttp` section:
* A new `repogc_timeout` key specifies the timeout for garbage collection operations on IPFS. It is set to 24h by default.
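
As a rough illustration (a minimal sketch, not part of this release): the multiaddress values accepted by `peer_addresses` and the new listen defaults can be checked with `go-multiaddr`, which the code in this commit already imports. The peer address below reuses the example address from the `ipfs-cluster-service` help text later in this diff; it is illustrative only.

```go
package main

import (
	"fmt"

	multiaddr "github.com/multiformats/go-multiaddr"
)

func main() {
	// Illustrative values only: a full peer multiaddress (as used by
	// peer_addresses) and the two new default listen multiaddresses.
	addrs := []string{
		"/ip4/192.168.1.2/tcp/9096/p2p/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL",
		"/ip4/0.0.0.0/tcp/9096",
		"/ip4/0.0.0.0/udp/9096/quic",
	}
	for _, a := range addrs {
		m, err := multiaddr.NewMultiaddr(a)
		if err != nil {
			fmt.Printf("invalid multiaddress %q: %s\n", a, err)
			continue
		}
		fmt.Println("valid:", m)
	}
}
```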
##### REST API
The `pin/add` and `add` endpoints support two new query parameters to indicate pin expirations: `expire-at` (with an expected value in RFC3339 format) and `expire-in` (with an expected value in Go's duration format, e.g. `12h`). `expire-at` takes precedence.
A new `/ipfs/gc` endpoint has been added to trigger GC in the IPFS daemons attached to Cluster peers. It supports the `local` parameter to limit the operation to the local peer.
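
A minimal sketch of how a client could build request URLs for these new parameters, assuming the REST API listens on the default `/ip4/127.0.0.1/tcp/9094` address (the same default used by `ipfs-cluster-ctl` elsewhere in this diff). The HTTP method and any authentication depend on the peer's `restapi` configuration and are not shown here.

```go
package main

import (
	"fmt"
	"net/url"
	"time"
)

func main() {
	// Assumed base URL for the default REST API address /ip4/127.0.0.1/tcp/9094.
	base := "http://127.0.0.1:9094"

	// Pin expiration with an absolute timestamp (RFC3339)...
	q := url.Values{}
	q.Set("expire-at", time.Now().Add(48*time.Hour).Format(time.RFC3339))
	fmt.Println(base + "/add?" + q.Encode())

	// ...or with a relative duration in Go's duration format.
	q = url.Values{}
	q.Set("expire-in", "12h")
	fmt.Println(base + "/add?" + q.Encode())

	// The new /ipfs/gc endpoint, limited to the local peer via ?local=true.
	q = url.Values{}
	q.Set("local", "true")
	fmt.Println(base + "/ipfs/gc?" + q.Encode())
}
```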
##### Go APIs
There are a few changes to the Go APIs. The `RepoGC` and `RepoGCLocal` methods have been added, the `mappintracker` module has been removed and the `stateless` module has changed the signature of the constructor.
##### Other
The IPFS Proxy now intercepts the `/repo/gc` endpoint and triggers a cluster-wide GC operation.
The `ipfs-cluster-follow` application is an easy-to-use way to run one or several cluster peers in follower mode using remote configuration templates. It is fully independent from `ipfs-cluster-service` and `ipfs-cluster-ctl` and acts as both a peer (`run` subcommand) and a client (`list` subcommand). The purpose is to facilitate IPFS Cluster usage without having to deal with configuration, flags, etc.
That said, the configuration layout and folder is the same for both `ipfs-cluster-service` and `ipfs-cluster-follow` and they can be run one in place of the other. In the same way, remote-source configurations usually used for `ipfs-cluster-follow` can be replaced with local ones usually used by `ipfs-cluster-service`.
The removal of the `map pintracker` has resulted in a simplification of some operations. `StateSync` (regularly run every `state_sync_interval`) does not trigger repinnings now, but only checks for pin expirations. `RecoverAllLocal` (regularly run every `pin_recover_interval`) will now trigger repinnings when necessary (i.e. when things that were expected to be on IPFS are not). On very large pinsets, this operation can trigger a memory spike as the full recursive pinset from IPFS is requested and loaded into memory (previously this happened during `StateSync`).
---
### v0.11.0 - 2019-09-13
#### Summary


@ -46,6 +46,7 @@ EXPOSE 9096
COPY --from=builder $GOPATH/bin/ipfs-cluster-service /usr/local/bin/ipfs-cluster-service
COPY --from=builder $GOPATH/bin/ipfs-cluster-ctl /usr/local/bin/ipfs-cluster-ctl
COPY --from=builder $GOPATH/bin/ipfs-cluster-follow /usr/local/bin/ipfs-cluster-follow
COPY --from=builder $SRC_PATH/docker/entrypoint.sh /usr/local/bin/entrypoint.sh
COPY --from=builder /tmp/su-exec/su-exec /sbin/su-exec
COPY --from=builder /tmp/tini /sbin/tini


@ -35,6 +35,7 @@ EXPOSE 9096
COPY --from=builder $GOPATH/bin/ipfs-cluster-service /usr/local/bin/ipfs-cluster-service
COPY --from=builder $GOPATH/bin/ipfs-cluster-ctl /usr/local/bin/ipfs-cluster-ctl
COPY --from=builder $GOPATH/bin/ipfs-cluster-follow /usr/local/bin/ipfs-cluster-follow
COPY --from=builder $SRC_PATH/docker/start-daemons.sh /usr/local/bin/start-daemons.sh
RUN mkdir -p $IPFS_CLUSTER_PATH && \


@ -39,6 +39,7 @@ EXPOSE 9096
COPY --from=builder $GOPATH/bin/ipfs-cluster-service /usr/local/bin/ipfs-cluster-service
COPY --from=builder $GOPATH/bin/ipfs-cluster-ctl /usr/local/bin/ipfs-cluster-ctl
COPY --from=builder $GOPATH/bin/ipfs-cluster-follow /usr/local/bin/ipfs-cluster-follow
COPY --from=builder $SRC_PATH/docker/test-entrypoint.sh /usr/local/bin/test-entrypoint.sh
COPY --from=builder $SRC_PATH/docker/random-stopper.sh /usr/local/bin/random-stopper.sh
COPY --from=builder $SRC_PATH/docker/random-killer.sh /usr/local/bin/random-killer.sh


@ -6,21 +6,26 @@ all: build
clean: rwundo clean_sharness
$(MAKE) -C cmd/ipfs-cluster-service clean
$(MAKE) -C cmd/ipfs-cluster-ctl clean
$(MAKE) -C cmd/ipfs-cluster-follow clean
@rm -rf ./test/testingData
@rm -rf ./compose
install:
$(MAKE) -C cmd/ipfs-cluster-service install
$(MAKE) -C cmd/ipfs-cluster-ctl install
$(MAKE) -C cmd/ipfs-cluster-follow install
build:
$(MAKE) -C cmd/ipfs-cluster-service build
$(MAKE) -C cmd/ipfs-cluster-ctl build
$(MAKE) -C cmd/ipfs-cluster-follow build
service:
$(MAKE) -C cmd/ipfs-cluster-service ipfs-cluster-service
ctl:
$(MAKE) -C cmd/ipfs-cluster-ctl ipfs-cluster-ctl
follow:
$(MAKE) -C cmd/ipfs-cluster-follow ipfs-cluster-follow
check:
go vet ./...
@ -53,13 +58,13 @@ docker:
docker exec tmp-make-cluster sh -c "ipfs-cluster-ctl version"
docker exec tmp-make-cluster sh -c "ipfs-cluster-service -v"
docker kill tmp-make-cluster
docker build -t cluster-image-test -f Dockerfile-test .
docker run --name tmp-make-cluster-test -d --rm cluster-image && sleep 8
docker run --name tmp-make-cluster-test -d --rm cluster-image && sleep 4
docker exec tmp-make-cluster-test sh -c "ipfs-cluster-ctl version"
docker exec tmp-make-cluster-test sh -c "ipfs-cluster-service -v"
docker kill tmp-make-cluster-test
docker-compose:
mkdir -p compose/ipfs0 compose/ipfs1 compose/cluster0 compose/cluster1
chmod -R 0777 compose
@ -69,6 +74,6 @@ docker-compose:
docker exec cluster1 ipfs-cluster-ctl peers ls | grep -o "Sees 2 other peers" | uniq -c | grep 3
docker-compose down
prcheck: check service ctl test
prcheck: check service ctl follow test
.PHONY: all test test_sharness clean_sharness rw rwundo publish service ctl install clean docker


@ -4,24 +4,24 @@
[![Made by](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![Main project](https://img.shields.io/badge/project-ipfs-blue.svg?style=flat-square)](http://github.com/ipfs/ipfs)
[![IRC channel](https://img.shields.io/badge/freenode-%23ipfs--cluster-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs-cluster)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/ipfs-cluster?status.svg)](https://godoc.org/github.com/ipfs/ipfs-cluster)
[![Go Report Card](https://goreportcard.com/badge/github.com/ipfs/ipfs-cluster)](https://goreportcard.com/report/github.com/ipfs/ipfs-cluster)
[![Build Status](https://travis-ci.com/ipfs/ipfs-cluster.svg?branch=master)](https://travis-ci.com/ipfs/ipfs-cluster)
[![codecov](https://codecov.io/gh/ipfs/ipfs-cluster/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/ipfs-cluster)
> Pinset orchestration for IPFS.
> Automated data availability and redundancy on IPFS
<p align="center">
<img src="https://cluster.ipfs.io/cluster/png/IPFS_Cluster_color_no_text.png" alt="logo" width="300" height="300" />
</p>
IPFS Cluster is a stand-alone application and a CLI client that allocates, replicates, and tracks pins across a cluster of IPFS daemons.
IPFS Cluster provides data orchestration across a swarm of IPFS daemons by allocating, replicating and tracking a global pinset distributed among multiple peers.
It provides:
* A cluster peer application: `ipfs-cluster-service`, to be run along with `go-ipfs`.
* A client CLI application: `ipfs-cluster-ctl`, which allows easily interacting with the peer's HTTP API.
* An additional "follower" peer application: `ipfs-cluster-follow`, focused on simplifying the process of configuring and running follower peers.
---
@ -59,17 +59,16 @@ Instructions for different installation methods (including from source) are avai
Extensive usage information is provided at https://cluster.ipfs.io/documentation/ , including:
* [Docs for `ipfs-cluster-service`](https://cluster.ipfs.io/documentation/ipfs-cluster-service/)
* [Docs for `ipfs-cluster-ctl`](https://cluster.ipfs.io/documentation/ipfs-cluster-ctl/)
* [Docs for `ipfs-cluster-service`](https://cluster.ipfs.io/documentation/reference/service/)
* [Docs for `ipfs-cluster-ctl`](https://cluster.ipfs.io/documentation/reference/ctl/)
* [Docs for `ipfs-cluster-follow`](https://cluster.ipfs.io/documentation/reference/follow/)
## Contribute
PRs accepted. As part of the IPFS project, we have some [contribution guidelines](https://cluster.ipfs.io/developer/contribute).
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
PRs accepted. As part of the IPFS project, we have some [contribution guidelines](https://cluster.ipfs.io/support/#contribution-guidelines).
## License
This library is dual-licensed under Apache 2.0 and MIT terms.
© 2019. Protocol Labs, Inc.
© 2020. Protocol Labs, Inc.


@ -141,6 +141,22 @@ func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, erro
it := f.Entries()
var adderRoot ipld.Node
for it.Next() {
// In order to set the AddedOutput names right, we use
// OutputPrefix:
//
// When adding a folder, this is the root folder name which is
// prepended to the added paths. When adding a single file,
// this is the name of the file, which overrides the empty
// AddedOutput name.
//
// After coreunix/add.go was refactored in go-ipfs and we
// followed suit, it no longer receives the name of the
// file/folder being added and does not emit AddedOutput
// events with the right names. We addressed this by adding
// OutputPrefix to our version (go-ipfs modifies emitted
// events before sending them to the user).
ipfsAdder.OutputPrefix = it.Name()
select {
case <-a.ctx.Done():
return cid.Undef, a.ctx.Err()


@ -7,6 +7,7 @@ import (
"fmt"
"io"
gopath "path"
"path/filepath"
"github.com/ipfs/ipfs-cluster/api"
@ -59,6 +60,11 @@ type Adder struct {
CidBuilder cid.Builder
liveNodes uint64
lastFile mfs.FSNode
// Cluster: ipfs does a hack in commands/add.go to set the filenames
// in emitted events correctly. We carry a root folder name (or a
// filename in the case of single files) here and emit those events
// correctly from the beginning.
OutputPrefix string
}
func (adder *Adder) mfsRoot() (*mfs.Root, error) {
@ -193,7 +199,7 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
return err
}
return outputDagnode(adder.Out, path, nd)
return adder.outputDagnode(adder.Out, path, nd)
default:
return fmt.Errorf("unrecognized fsn type: %#v", fsn)
}
@ -201,8 +207,10 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
func (adder *Adder) addNode(node ipld.Node, path string) error {
// patch it into the root
outputName := path
if path == "" {
path = node.Cid().String()
outputName = ""
}
if pi, ok := node.(*posinfo.FilestoreNode); ok {
@ -239,7 +247,7 @@ func (adder *Adder) addNode(node ipld.Node, path string) error {
adder.lastFile = lastFile
if !adder.Silent {
return outputDagnode(adder.Out, path, node)
return adder.outputDagnode(adder.Out, outputName, node)
}
return nil
}
@ -413,8 +421,9 @@ func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) erro
}
// outputDagnode sends dagnode info over the output channel.
// Cluster: we use *api.AddedOutput instead of coreiface events.
func outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
// Cluster: we use *api.AddedOutput instead of coreiface events
// and make this an adder method to be able to prefix.
func (adder *Adder) outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
if out == nil {
return nil
}
@ -424,6 +433,14 @@ func outputDagnode(out chan *api.AddedOutput, name string, dn ipld.Node) error {
return err
}
// When adding things in a folder: "OutputPrefix/name"
// When adding a single file: "OutputPrefix" (name is unset)
// When adding a single thing with no name: ""
// Note: ipfs sets the name of files received on stdin to the CID,
// but cluster does not support stdin-adding so we do not
// account for this here.
name = filepath.Join(adder.OutputPrefix, name)
out <- &api.AddedOutput{
Cid: dn.Cid(),
Name: name,

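The `filepath.Join(adder.OutputPrefix, name)` call above is what produces the three naming cases listed in the comment. Below is a tiny stand-alone sketch (not part of the patch) of how those joins behave; separators shown are for Unix.

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Folder add: OutputPrefix is the root folder name, name is the entry path.
	fmt.Println(filepath.Join("myfolder", "dir/file.txt")) // "myfolder/dir/file.txt"

	// Single file add: OutputPrefix is the file name, name is unset.
	fmt.Println(filepath.Join("file.txt", "")) // "file.txt"

	// Nothing named at all: the output name stays empty.
	fmt.Println(filepath.Join("", "")) // ""
}
```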

@ -482,15 +482,15 @@ func statusReached(target api.TrackerStatus, gblPinInfo *api.GlobalPinInfo) (boo
}
// logic drawn from go-ipfs-cmds/cli/parse.go: appendFile
func makeSerialFile(fpath string, params *api.AddParams) (files.Node, error) {
func makeSerialFile(fpath string, params *api.AddParams) (string, files.Node, error) {
if fpath == "." {
cwd, err := os.Getwd()
if err != nil {
return nil, err
return "", nil, err
}
cwd, err = filepath.EvalSymlinks(cwd)
if err != nil {
return nil, err
return "", nil, err
}
fpath = cwd
}
@ -499,16 +499,17 @@ func makeSerialFile(fpath string, params *api.AddParams) (files.Node, error) {
stat, err := os.Lstat(fpath)
if err != nil {
return nil, err
return "", nil, err
}
if stat.IsDir() {
if !params.Recursive {
return nil, fmt.Errorf("%s is a directory, but Recursive option is not set", fpath)
return "", nil, fmt.Errorf("%s is a directory, but Recursive option is not set", fpath)
}
}
return files.NewSerialFile(fpath, params.Hidden, stat)
sf, err := files.NewSerialFile(fpath, params.Hidden, stat)
return path.Base(fpath), sf, err
}
// Add imports files to the cluster from the given paths. A path can
@ -534,7 +535,7 @@ func (c *defaultClient) Add(
close(out)
return fmt.Errorf("error parsing path: %s", err)
}
name := path.Base(p)
var name string
var addFile files.Node
if strings.HasPrefix(u.Scheme, "http") {
addFile = files.NewWebFile(u)
@ -544,7 +545,7 @@ func (c *defaultClient) Add(
close(out)
return fmt.Errorf("nocopy option is only valid for URLs")
}
addFile, err = makeSerialFile(p, params)
name, addFile, err = makeSerialFile(p, params)
if err != nil {
close(out)
return err


@ -416,6 +416,12 @@ func (c *Cluster) alertsHandler() {
case <-c.ctx.Done():
return
case alrt := <-c.monitor.Alerts():
// Follower peers do not care about alerts.
// They can do nothing about them.
if c.config.FollowerMode {
continue
}
logger.Warningf("metric alert for %s: Peer: %s.", alrt.MetricName, alrt.Peer)
c.alertsMux.Lock()
for pID, alert := range c.alerts {
@ -644,10 +650,10 @@ This might be due to one or several causes:
c.Shutdown(ctx)
return
case <-c.consensus.Ready(ctx):
// Consensus ready means the state is up to date so we can sync
// it to the tracker. We ignore errors (normal when state
// doesn't exist in new peers).
c.StateSync(ctx)
// Consensus ready means the state is up to date. Every item
// in the state that is not pinned will appear as PinError so
// we can proceed to recover all of those in the tracker.
c.RecoverAllLocal(ctx)
case <-c.ctx.Done():
return
}
@ -685,7 +691,15 @@ func (c *Cluster) Ready() <-chan struct{} {
return c.readyCh
}
// Shutdown stops the IPFS cluster components
// Shutdown performs all the necessary operations to shut down
// the IPFS Cluster peer:
// * Saves the peerstore with the current peers
// * Removes itself from consensus when LeaveOnShutdown is set
// * Shuts down all the components
// * Closes the datastore
// * Collects all goroutines
//
// Shutdown does not close the libp2p host or the DHT.
func (c *Cluster) Shutdown(ctx context.Context) error {
_, span := trace.StartSpan(ctx, "cluster/Shutdown")
defer span.End()
@ -769,6 +783,13 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
return err
}
for _, inf := range c.informers {
if err := inf.Shutdown(ctx); err != nil {
logger.Errorf("error stopping informer: %s", err)
return err
}
}
if err := c.tracer.Shutdown(ctx); err != nil {
logger.Errorf("error stopping Tracer: %s", err)
return err
@ -996,7 +1017,8 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
return err
}
c.StateSync(ctx)
// Start pinning items in the state that are not on IPFS yet.
c.RecoverAllLocal(ctx)
logger.Infof("%s: joined %s's cluster", c.id.Pretty(), pid.Pretty())
return nil
@ -1122,7 +1144,7 @@ func (c *Cluster) localPinInfoOp(
}
// RecoverAll triggers a RecoverAllLocal operation on all peer.
// RecoverAll triggers a RecoverAllLocal operation on all peers.
func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/RecoverAll")
defer span.End()
@ -1137,6 +1159,8 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error)
// Recover operations ask IPFS to pin or unpin items in error state. Recover
// is faster than calling Pin on the same CID as it avoids committing an
// identical pin to the consensus layer.
//
// RecoverAllLocal is called automatically every PinRecoverInterval.
func (c *Cluster) RecoverAllLocal(ctx context.Context) ([]*api.PinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/RecoverAllLocal")
defer span.End()
@ -1382,7 +1406,9 @@ func (c *Cluster) pin(
// Usually allocations are unset when pinning normally, however, the
// allocations may have been preset by the adder in which case they
// need to be respected. Whenever allocations are set, we don't
// re-allocate.
// re-allocate. repinFromPeer() unsets allocations for this reason.
// allocate() will check which peers are currently allocated
// and try to respect them.
if len(pin.Allocations) == 0 {
allocs, err := c.allocate(
ctx,


@ -28,8 +28,8 @@ var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/909
// Configuration defaults
const (
DefaultEnableRelayHop = true
DefaultStateSyncInterval = 600 * time.Second
DefaultPinRecoverInterval = 1 * time.Hour
DefaultStateSyncInterval = 5 * time.Minute
DefaultPinRecoverInterval = 12 * time.Minute
DefaultMonitorPingInterval = 15 * time.Second
DefaultPeerWatchInterval = 5 * time.Second
DefaultReplicationFactor = -1
@ -497,6 +497,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.MDNSInterval = cfg.MDNSInterval.String()
jcfg.DisableRepinning = cfg.DisableRepinning
jcfg.PeerstoreFile = cfg.PeerstoreFile
jcfg.PeerAddresses = []string{}
for _, addr := range cfg.PeerAddresses {
jcfg.PeerAddresses = append(jcfg.PeerAddresses, addr.String())
}


@ -0,0 +1,13 @@
Copyright 2020. Protocol Labs, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@ -1,6 +1,4 @@
The MIT License (MIT)
Copyright (c) 2017 Protocol Labs, Inc
Copyright 2020. Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal


@ -29,7 +29,7 @@ const programName = `ipfs-cluster-ctl`
// Version is the cluster-ctl tool version. It should match
// the IPFS cluster's version
const Version = "0.11.0"
const Version = "0.12.0"
var (
defaultHost = "/ip4/127.0.0.1/tcp/9094"


@ -0,0 +1,17 @@
# go source files
SRC := $(shell find .. -type f -name '*.go')
all: ipfs-cluster-follow
ipfs-cluster-follow: $(SRC)
go build -mod=readonly -ldflags "-X main.commit=$(shell git rev-parse HEAD)"
build: ipfs-cluster-follow
install:
go install -ldflags "-X main.commit=$(shell git rev-parse HEAD)"
clean:
rm -f ipfs-cluster-follow
.PHONY: clean install build


@ -0,0 +1,506 @@
package main
import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"strings"
"time"
"github.com/ipfs/go-cid"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/descendalloc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/api/rest"
"github.com/ipfs/ipfs-cluster/cmdutils"
"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/consensus/crdt"
"github.com/ipfs/ipfs-cluster/informer/disk"
"github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/observations"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/pkg/errors"
cli "github.com/urfave/cli/v2"
)
func printFirstStart() {
fmt.Printf(`
No clusters configured yet!
If this is the first time you are running %s,
be sure to check out the usage documentation. Here are some
examples to get you going:
$ %s --help - general description and usage help
$ %s <clusterName> --help - Help and subcommands for the <clusterName>'s follower peer
$ %s <clusterName> info --help - Help for the "info" subcommand (same for others).
`, programName, programName, programName, programName)
}
func printNotInitialized(clusterName string) {
fmt.Printf(`
This cluster peer has not been initialized.
Try running "%s %s init <config-url>" first.
`, programName, clusterName)
}
func setLogLevels(lvl string) {
for f := range ipfscluster.LoggingFacilities {
ipfscluster.SetFacilityLogLevel(f, lvl)
}
}
// returns whether the config folder exists
func isInitialized(absPath string) bool {
_, err := os.Stat(absPath)
return err == nil
}
func listClustersCmd(c *cli.Context) error {
absPath, _, _ := buildPaths(c, "")
f, err := os.Open(absPath)
if os.IsNotExist(err) {
printFirstStart()
return nil
}
if err != nil {
return cli.Exit(err, 1)
}
dirs, err := f.Readdir(-1)
if err != nil {
return cli.Exit(errors.Wrapf(err, "reading %s", absPath), 1)
}
var filteredDirs []string
for _, d := range dirs {
if d.IsDir() {
configPath := filepath.Join(absPath, d.Name(), DefaultConfigFile)
if _, err := os.Stat(configPath); err == nil {
filteredDirs = append(filteredDirs, d.Name())
}
}
}
if len(filteredDirs) == 0 {
printFirstStart()
return nil
}
fmt.Printf("Configurations found for %d follower peers. For info and help, try running:\n\n", len(filteredDirs))
for _, d := range filteredDirs {
fmt.Printf("%s \"%s\"\n", programName, d)
}
fmt.Printf("\nTip: \"%s --help\" for help and examples.\n", programName)
return nil
}
func infoCmd(c *cli.Context) error {
clusterName := c.String("clusterName")
// Avoid pollution of the screen
setLogLevels("critical")
absPath, configPath, identityPath := buildPaths(c, clusterName)
if !isInitialized(absPath) {
printNotInitialized(clusterName)
return cli.Exit("", 1)
}
cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
var url string
if err != nil {
if config.IsErrFetchingSource(err) {
url = fmt.Sprintf(
"failed retrieving configuration source: %s",
cfgHelper.Manager().Source,
)
} else {
return cli.Exit(errors.Wrapf(err, "reading the configurations in %s", absPath), 1)
}
} else {
url = fmt.Sprintf("Available (%s)", cfgHelper.Manager().Source)
}
cfgHelper.Manager().Shutdown()
fmt.Printf("Information about follower peer for Cluster \"%s\":\n\n", clusterName)
fmt.Printf("Config folder: %s\n", absPath)
fmt.Printf("Config source URL: %s\n", url)
ctx := context.Background()
client, err := getClient(absPath, clusterName)
if err != nil {
return cli.Exit(errors.Wrap(err, "error creating client"), 1)
}
_, err = client.Version(ctx)
fmt.Printf("Cluster Peer online: %t\n", err == nil)
// Either we loaded a valid config, or we are using a default. Worth
// applying env vars in the second case.
if err := cfgHelper.Configs().Ipfshttp.ApplyEnvVars(); err != nil {
return cli.Exit(errors.Wrap(err, "applying environment variables to ipfshttp config"), 1)
}
cfgHelper.Configs().Ipfshttp.ConnectSwarmsDelay = 0
connector, err := ipfshttp.NewConnector(cfgHelper.Configs().Ipfshttp)
if err == nil {
_, err = connector.ID(ctx)
}
fmt.Printf("IPFS peer online: %t\n", err == nil)
if c.Command.Name == "" {
fmt.Printf("Additional help:\n\n")
fmt.Printf("-------------------------------------------------\n\n")
return cli.ShowAppHelp(c)
}
return nil
}
func initCmd(c *cli.Context) error {
if !c.Args().Present() {
return cli.Exit("configuration URL not provided", 1)
}
cfgURL := c.Args().First()
return initCluster(c, false, cfgURL)
}
func initCluster(c *cli.Context, ignoreReinit bool, cfgURL string) error {
clusterName := c.String(clusterNameFlag)
absPath, configPath, identityPath := buildPaths(c, clusterName)
if isInitialized(absPath) {
if ignoreReinit {
fmt.Println("Configuration for this cluster already exists. Skipping initialization.")
fmt.Printf("If you wish to re-initialize, simply delete %s\n\n", absPath)
return nil
}
cmdutils.ErrorOut("Configuration for this cluster already exists.\n")
cmdutils.ErrorOut("Please delete %s if you wish to re-initialize.", absPath)
return cli.Exit("", 1)
}
gw := c.String("gateway")
if !strings.HasPrefix(cfgURL, "http://") && !strings.HasPrefix(cfgURL, "https://") {
fmt.Printf("%s will be assumed to be an DNSLink-powered address: /ipns/%s.\n", cfgURL, cfgURL)
fmt.Printf("It will be resolved using the local IPFS daemon's gateway (%s).\n", gw)
fmt.Println("If this is not the case, specify the full url starting with http:// or https://.")
fmt.Println("(You can override the gateway URL by setting IPFS_GATEWAY)")
fmt.Println()
cfgURL = fmt.Sprintf("http://%s/ipns/%s", gw, cfgURL)
}
cfgHelper := cmdutils.NewConfigHelper(configPath, identityPath, "crdt")
cfgHelper.Manager().Shutdown()
cfgHelper.Manager().Source = cfgURL
err := cfgHelper.Manager().Default()
if err != nil {
return cli.Exit(errors.Wrap(err, "error generating default config"), 1)
}
ident := cfgHelper.Identity()
err = ident.Default()
if err != nil {
return cli.Exit(errors.Wrap(err, "error generating identity"), 1)
}
err = ident.ApplyEnvVars()
if err != nil {
return cli.Exit(errors.Wrap(err, "error applying environment variables to the identity"), 1)
}
err = cfgHelper.SaveIdentityToDisk()
if err != nil {
return cli.Exit(errors.Wrapf(err, "error saving %s", identityPath), 1)
}
fmt.Printf("Identity written to %s.\n", identityPath)
err = cfgHelper.SaveConfigToDisk()
if err != nil {
return cli.Exit(errors.Wrapf(err, "saving %s", configPath), 1)
}
fmt.Printf("Configuration written to %s.\n", configPath)
fmt.Printf("Cluster \"%s\" follower peer initialized.\n\n", clusterName)
fmt.Printf(
"You can now use \"%s %s run\" to start a follower peer for this cluster.\n",
programName,
clusterName,
)
fmt.Println("(Remember to start your IPFS daemon before)")
return nil
}
func runCmd(c *cli.Context) error {
clusterName := c.String(clusterNameFlag)
if cfgURL := c.String("init"); cfgURL != "" {
err := initCluster(c, true, cfgURL)
if err != nil {
return err
}
}
absPath, configPath, identityPath := buildPaths(c, clusterName)
if !isInitialized(absPath) {
printNotInitialized(clusterName)
return cli.Exit("", 1)
}
fmt.Printf("Starting the IPFS Cluster follower peer for \"%s\".\nCTRL-C to stop it.\n", clusterName)
fmt.Println("Checking if IPFS is online (will wait for 2 minutes)...")
ctxIpfs, cancelIpfs := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancelIpfs()
err := cmdutils.WaitForIPFS(ctxIpfs)
if err != nil {
return cli.Exit("timed out waiting for IPFS to be available", 1)
}
setLogLevels(logLevel) // set to "info" by default.
// Avoid API logs polluting the screen every time we
// run some "list" command.
ipfscluster.SetFacilityLogLevel("restapilog", "error")
cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
if err != nil {
return cli.Exit(errors.Wrapf(err, "reading the configurations in %s", absPath), 1)
}
cfgHelper.Manager().Shutdown()
cfgs := cfgHelper.Configs()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
if err != nil {
return cli.Exit(errors.Wrap(err, "error creating libp2p components"), 1)
}
// Always run followers in follower mode.
cfgs.Cluster.FollowerMode = true
// Discard API configurations and create our own
apiCfg := rest.Config{}
cfgs.Restapi = &apiCfg
_ = apiCfg.Default()
listenSocket, err := socketAddress(absPath, clusterName)
if err != nil {
return cli.Exit(err, 1)
}
apiCfg.HTTPListenAddr = listenSocket
// Allow customization via env vars
err = apiCfg.ApplyEnvVars()
if err != nil {
return cli.Exit(errors.Wrap(err, "error applying enviromental variables to restapi configuration"), 1)
}
rest, err := rest.NewAPI(ctx, &apiCfg)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating REST API component"), 1)
}
connector, err := ipfshttp.NewConnector(cfgs.Ipfshttp)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating IPFS Connector component"), 1)
}
informer, err := disk.NewInformer(cfgs.Diskinf)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating disk informer"), 1)
}
alloc := descendalloc.NewAllocator()
stmgr, err := cmdutils.NewStateManager(cfgHelper.GetConsensus(), cfgHelper.Identity(), cfgs)
if err != nil {
return cli.Exit(errors.Wrap(err, "creating state manager"), 1)
}
store, err := stmgr.GetStore()
if err != nil {
return cli.Exit(errors.Wrap(err, "creating datastore"), 1)
}
crdtcons, err := crdt.New(
host,
dht,
pubsub,
cfgs.Crdt,
store,
)
if err != nil {
store.Close()
return cli.Exit(errors.Wrap(err, "creating CRDT component"), 1)
}
tracker := stateless.New(cfgs.Statelesstracker, host.ID(), cfgs.Cluster.Peername, crdtcons.State)
mon, err := pubsubmon.New(ctx, cfgs.Pubsubmon, pubsub, nil)
if err != nil {
store.Close()
return cli.Exit(errors.Wrap(err, "setting up PeerMonitor"), 1)
}
// Hardcode disabled tracing and metrics to avoid mistakenly
// exposing any user data.
tracerCfg := observations.TracingConfig{}
_ = tracerCfg.Default()
tracerCfg.EnableTracing = false
cfgs.Tracing = &tracerCfg
tracer, err := observations.SetupTracing(&tracerCfg)
if err != nil {
return cli.Exit(errors.Wrap(err, "error setting up tracer"), 1)
}
// This does nothing since we are not calling SetupMetrics anyway,
// but it stays here to be explicit.
metricsCfg := observations.MetricsConfig{}
_ = metricsCfg.Default()
metricsCfg.EnableStats = false
cfgs.Metrics = &metricsCfg
// We are going to run a cluster peer and should do an
// orderly shutdown if we are interrupted: cancel default
// signal handling and leave things to HandleSignals.
signal.Stop(signalChan)
close(signalChan)
cluster, err := ipfscluster.NewCluster(
ctx,
host,
dht,
cfgs.Cluster,
store,
crdtcons,
[]ipfscluster.API{rest},
connector,
tracker,
mon,
alloc,
[]ipfscluster.Informer{informer},
tracer,
)
if err != nil {
store.Close()
return cli.Exit(errors.Wrap(err, "error creating cluster peer"), 1)
}
return cmdutils.HandleSignals(ctx, cancel, cluster, host, dht)
}
// List
func listCmd(c *cli.Context) error {
clusterName := c.String("clusterName")
absPath, configPath, identityPath := buildPaths(c, clusterName)
cfgHelper, err := cmdutils.NewLoadedConfigHelper(configPath, identityPath)
if err != nil {
fmt.Println("error loading configurations.")
if config.IsErrFetchingSource(err) {
fmt.Println("Make sure the source URL is reachable:")
}
return cli.Exit(err, 1)
}
cfgHelper.Manager().Shutdown()
err = printStatusOnline(absPath, clusterName)
if err != nil {
apiErr, ok := err.(*api.Error)
if ok && apiErr.Code != 0 {
return cli.Exit(
errors.Wrapf(
err,
"The Peer API seems to be running but returned with code %d",
apiErr.Code,
), 1)
}
err := printStatusOffline(cfgHelper)
if err != nil {
return cli.Exit(errors.Wrap(err, "error obtaining the pinset"), 1)
}
}
return nil
}
func printStatusOnline(absPath, clusterName string) error {
ctx := context.Background()
client, err := getClient(absPath, clusterName)
if err != nil {
return cli.Exit(errors.Wrap(err, "error creating client"), 1)
}
gpis, err := client.StatusAll(ctx, 0, true)
if err != nil {
return err
}
// do not return errors after this.
var pid string
for _, gpi := range gpis {
if pid == "" { // do this once
// PeerMap will only have one key
for k := range gpi.PeerMap {
pid = k
break
}
}
pinInfo := gpi.PeerMap[pid]
// Get pin name
var name string
pin, err := client.Allocation(ctx, gpi.Cid)
if err != nil {
name = "(" + err.Error() + ")"
} else {
name = pin.Name
}
printPin(gpi.Cid, pinInfo.Status.String(), name, pinInfo.Error)
}
return nil
}
func printStatusOffline(cfgHelper *cmdutils.ConfigHelper) error {
// The blockstore module loaded from ipfs-lite tends to print
// an error when the datastore is closed before the bloom
// filter cache has finished building. Could not find a way
// to avoid it other than disabling bloom caching on offline
// ipfs-lite peers, which is overkill. So we just hide it.
ipfscluster.SetFacilityLogLevel("blockstore", "critical")
mgr, err := cmdutils.NewStateManagerWithHelper(cfgHelper)
if err != nil {
return err
}
store, err := mgr.GetStore()
if err != nil {
return err
}
defer store.Close()
st, err := mgr.GetOfflineState(store)
if err != nil {
return err
}
pins, err := st.List(context.Background())
if err != nil {
return err
}
for _, pin := range pins {
printPin(pin.Cid, "offline", pin.Name, "")
}
return nil
}
func printPin(c cid.Cid, status, name, err string) {
if err != "" {
name = name + " (" + err + ")"
}
fmt.Printf("%-20s %s %s\n", status, c, name)
}


@ -0,0 +1,13 @@
Copyright 2020. Protocol Labs, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@ -1,6 +1,4 @@
The MIT License (MIT)
Copyright (c) 2017 Protocol Labs, Inc
Copyright 2020. Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

cmd/ipfs-cluster-follow/dist/README.md

@ -0,0 +1,33 @@
# `ipfs-cluster-follow`
> A tool to run IPFS Cluster follower peers
`ipfs-cluster-follow` allows you to set up and run IPFS Cluster follower peers.
Follower peers can join collaborative clusters to track content in the
cluster. Follower peers do not have permissions to modify the cluster pinset
or access endpoints from other follower peers.
`ipfs-cluster-follow` allows running several peers at the same time (each
joining a different cluster) and is intended to be a very easy-to-use
application with a minimal feature set. In order to run a fully-featured peer
(follower or not), use `ipfs-cluster-service`.
### Usage
The `ipfs-cluster-follow` command is always followed by the cluster name
that we wish to work with. Full usage information can be obtained by running:
```
$ ipfs-cluster-follow --help
$ ipfs-cluster-follow <clusterName> --help
$ ipfs-cluster-follow <clusterName> info --help
$ ipfs-cluster-follow <clusterName> init --help
$ ipfs-cluster-follow <clusterName> run --help
$ ipfs-cluster-follow <clusterName> list --help
```
For more information, please check the [Documentation](https://cluster.ipfs.io/documentation), in particular the [`ipfs-cluster-follow` section](https://cluster.ipfs.io/documentation/ipfs-cluster-follow).


@ -0,0 +1,324 @@
// The ipfs-cluster-follow application.
package main
import (
"fmt"
"os"
"os/signal"
"os/user"
"path/filepath"
"syscall"
"github.com/ipfs/ipfs-cluster/api/rest/client"
"github.com/ipfs/ipfs-cluster/cmdutils"
"github.com/ipfs/ipfs-cluster/version"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
semver "github.com/blang/semver"
cli "github.com/urfave/cli/v2"
)
const (
// ProgramName of this application
programName = "ipfs-cluster-follow"
clusterNameFlag = "clusterName"
logLevel = "info"
)
// Default location for the configurations and data
var (
// DefaultFolder is the name of the cluster folder
DefaultFolder = ".ipfs-cluster-follow"
// DefaultPath is set on init() to $HOME/DefaultFolder
// and holds all the ipfs-cluster data
DefaultPath string
// The name of the configuration file inside DefaultPath
DefaultConfigFile = "service.json"
// The name of the identity file inside DefaultPath
DefaultIdentityFile = "identity.json"
DefaultGateway = "127.0.0.1:8080"
)
var (
commit string
configPath string
identityPath string
signalChan = make(chan os.Signal, 20)
)
// Description provides a short summary of the functionality of this tool
var Description = fmt.Sprintf(`
%s helps running IPFS Cluster follower peers.
Follower peers subscribe to a Cluster controlled by a set of "trusted
peers". They collaborate in pinning items as dictated by the trusted peers and
do not have the power to make Cluster-wide modifications to the pinset.
Follower peers cannot access information nor trigger actions in other peers.
%s can be used to follow different clusters by launching it
with different options. Each Cluster has an identity, a configuration
and a datastore associated to it, which are kept under
"~/%s/<cluster_name>".
For feedback, bug reports or any additional information, visit
https://github.com/ipfs/ipfs-cluster.
EXAMPLES:
List configured follower peers:
$ %s
Display information for a follower peer:
$ %s <clusterName> info
Initialize a follower peer:
$ %s <clusterName> init <example.url>
Launch a follower peer (will stay running):
$ %s <clusterName> run
List items in the pinset for a given cluster:
$ %s <clusterName> list
Getting help and usage info:
$ %s --help
$ %s <clusterName> --help
$ %s <clusterName> info --help
$ %s <clusterName> init --help
$ %s <clusterName> run --help
$ %s <clusterName> list --help
`,
programName,
programName,
DefaultFolder,
programName,
programName,
programName,
programName,
programName,
programName,
programName,
programName,
programName,
programName,
programName,
)
func init() {
// Set build information.
if build, err := semver.NewBuildVersion(commit); err == nil {
version.Version.Build = []string{"git" + build}
}
// We try to guess the user's home directory from the HOME variable. This
// allows HOME hacks for things like Snapcraft builds. HOME
// should be set by the OS on all UNIX systems. Alternatively, we fall back to
// usr.HomeDir (which should work on Windows etc.).
home := os.Getenv("HOME")
if home == "" {
usr, err := user.Current()
if err != nil {
panic(fmt.Sprintf("cannot get current user: %s", err))
}
home = usr.HomeDir
}
DefaultPath = filepath.Join(home, DefaultFolder)
// This will abort the program on signal. We close the signal channel
// when launching the peer so that we can do an orderly shutdown in
// that case though.
go func() {
signal.Notify(
signalChan,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGHUP,
)
_, ok := <-signalChan // channel closed.
if !ok {
return
}
os.Exit(1)
}()
}
func main() {
app := cli.NewApp()
app.Name = programName
app.Usage = "IPFS Cluster Follower"
app.UsageText = fmt.Sprintf("%s [global options] <clusterName> [subcommand]...", programName)
app.Description = Description
//app.Copyright = "© Protocol Labs, Inc."
app.Version = version.Version.String()
app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "config, c",
Value: DefaultPath,
Usage: "path to the followers configuration and data `FOLDER`",
EnvVars: []string{"IPFS_CLUSTER_PATH"},
},
}
app.Action = func(c *cli.Context) error {
if !c.Args().Present() {
return listClustersCmd(c)
}
clusterName := c.Args().Get(0)
clusterApp := cli.NewApp()
clusterApp.Name = fmt.Sprintf("%s %s", programName, clusterName)
clusterApp.HelpName = clusterApp.Name
clusterApp.Usage = fmt.Sprintf("Follower peer management for \"%s\"", clusterName)
clusterApp.UsageText = fmt.Sprintf("%s %s [subcommand]", programName, clusterName)
clusterApp.Action = infoCmd
clusterApp.HideVersion = true
clusterApp.Flags = []cli.Flag{
&cli.StringFlag{ // pass clusterName to subcommands
Name: clusterNameFlag,
Value: clusterName,
Hidden: true,
},
}
clusterApp.Commands = []*cli.Command{
{
Name: "info",
Usage: "displays information for this peer",
ArgsUsage: "",
Description: fmt.Sprintf(`
This command displays useful information for "%s"'s follower peer.
`, clusterName),
Action: infoCmd,
},
{
Name: "init",
Usage: "initializes the follower peer",
ArgsUsage: "<template_URL>",
Description: fmt.Sprintf(`
This command initializes a follower peer for the cluster named "%s". You
will need to pass the peer configuration URL. The command will generate a new
peer identity and leave things ready to run "%s %s run".
An error will be returned if a configuration folder for a cluster peer with
this name already exists. If you wish to re-initialize from scratch, delete
this folder first.
`, clusterName, programName, clusterName),
Action: initCmd,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "gateway",
Value: DefaultGateway,
Usage: "gateway URL",
EnvVars: []string{"IPFS_GATEWAY"},
Hidden: true,
},
},
},
{
Name: "run",
Usage: "runs the follower peer",
ArgsUsage: "",
Description: fmt.Sprintf(`
This command runs a "%s" cluster follower peer. The peer should have already
been initialized with "init"; alternatively, the --init flag needs to be
passed.
Before running, ensure that you have connectivity and that the IPFS daemon is
running.
You can obtain more information about this follower peer by running
"%s %s" (without any arguments).
The peer will stay running in the foreground until manually stopped.
`, clusterName, programName, clusterName),
Action: runCmd,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "init",
Usage: "initialize cluster peer with the given URL before running",
},
},
},
{
Name: "list",
Usage: "list items in the peers' pinset",
ArgsUsage: "",
Description: `
This command lists all the items pinned by this follower cluster peer on IPFS.
If the peer is currently running, it will display status information for each
pin (such as PINNING). If not, it will just display the current list of pins
as obtained from the internal state on disk.
`,
Action: listCmd,
},
}
return clusterApp.RunAsSubcommand(c)
}
app.Run(os.Args)
}
// buildPaths returns the path to the configuration folder and the paths to
// the identity.json and service.json files.
func buildPaths(c *cli.Context, clusterName string) (string, string, string) {
absPath, err := filepath.Abs(c.String("config"))
if err != nil {
cmdutils.ErrorOut("error getting absolute path for %s: %s", err, clusterName)
os.Exit(1)
}
// ~/.ipfs-cluster-follow/clusterName
absPath = filepath.Join(absPath, clusterName)
// ~/.ipfs-cluster-follow/clusterName/service.json
configPath = filepath.Join(absPath, DefaultConfigFile)
// ~/.ipfs-cluster-follow/clusterName/identity.json
identityPath = filepath.Join(absPath, DefaultIdentityFile)
return absPath, configPath, identityPath
}
func socketAddress(absPath, clusterName string) (multiaddr.Multiaddr, error) {
socket := fmt.Sprintf("/unix/%s", filepath.Join(absPath, "api-socket"))
ma, err := multiaddr.NewMultiaddr(socket)
if err != nil {
return nil, errors.Wrapf(err, "error parsing socket: %s", socket)
}
return ma, nil
}
// returns a REST API client. Points to the socket address unless
// CLUSTER_RESTAPI_HTTPLISTENMULTIADDRESS is set, in which case it uses it.
func getClient(absPath, clusterName string) (client.Client, error) {
var endp multiaddr.Multiaddr
var err error
if endpStr := os.Getenv("CLUSTER_RESTAPI_HTTPLISTENMULTIADDRESS"); endpStr != "" {
endp, err = multiaddr.NewMultiaddr(endpStr)
if err != nil {
return nil, errors.Wrapf(err, "error parsing the value of CLUSTER_RESTAPI_HTTPLISTENMULTIADDRESS: %s", endpStr)
}
} else {
endp, err = socketAddress(absPath, clusterName)
}
if err != nil {
return nil, err
}
cfg := client.Config{
APIAddr: endp,
}
return client.NewDefaultClient(&cfg)
}


@ -0,0 +1,13 @@
Copyright 2020. Protocol Labs, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@ -0,0 +1,19 @@
Copyright 2020. Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


@ -97,6 +97,13 @@ $ ipfs-cluster-service daemon
Launch a peer and join existing cluster:
$ ipfs-cluster-service daemon --bootstrap /ip4/192.168.1.2/tcp/9096/p2p/QmPSoSaPXpyunaBwHs1rZBKYSqRV4bLRk32VGYLuvdrypL
Customize logs using the --loglevel flag. To customize component-level
logging, pass a comma-separated list of component-identifier:log-level
pairs, or a bare level to set the overall log level. Valid log levels
are critical, error, warning, notice, info and debug.
$ ipfs-cluster-service --loglevel info,cluster:debug,pintracker:debug daemon
`,
programName,
programName,
@ -189,9 +196,9 @@ func main() {
Usage: "enable full debug logging (very verbose)",
},
cli.StringFlag{
Name: "loglevel, l",
Value: defaultLogLevel,
Usage: "set the loglevel for cluster components only [critical, error, warning, info, debug]",
Name: "loglevel, l",
EnvVar: "IPFS_CLUSTER_LOG_LEVEL",
Usage: "set overall and component-wise log levels",
},
}
@ -204,11 +211,10 @@ func main() {
configPath = filepath.Join(absPath, DefaultConfigFile)
identityPath = filepath.Join(absPath, DefaultIdentityFile)
setupLogLevel(c.String("loglevel"))
if c.Bool("debug") {
setupDebug()
err = setupLogLevel(c.Bool("debug"), c.String("loglevel"))
if err != nil {
return err
}
locker = &lock{path: absPath}
return nil
@ -573,15 +579,67 @@ func run(c *cli.Context) error {
return nil
}
func setupLogLevel(lvl string) {
for f := range ipfscluster.LoggingFacilities {
ipfscluster.SetFacilityLogLevel(f, lvl)
func setupLogLevel(debug bool, l string) error {
// if debug is set to true, log everything in debug level
if debug {
ipfscluster.SetFacilityLogLevel("*", "DEBUG")
return nil
}
ipfscluster.SetFacilityLogLevel("service", lvl)
}
func setupDebug() {
ipfscluster.SetFacilityLogLevel("*", "DEBUG")
compLogLevel := strings.Split(l, ",")
var logLevel string
compLogFacs := make(map[string]string)
// get overall log level and component-wise log levels from arguments
for _, cll := range compLogLevel {
if cll == "" {
continue
}
identifierToLevel := strings.Split(cll, ":")
var lvl string
var comp string
switch len(identifierToLevel) {
case 1:
lvl = identifierToLevel[0]
comp = "all"
case 2:
lvl = identifierToLevel[1]
comp = identifierToLevel[0]
default:
return errors.New("log level not in expected format \"identifier:loglevel\" or \"loglevel\"")
}
_, ok := compLogFacs[comp]
if ok {
fmt.Printf("overwriting existing %s log level\n", comp)
}
compLogFacs[comp] = lvl
}
logLevel, ok := compLogFacs["all"]
if !ok {
logLevel = defaultLogLevel
} else {
delete(compLogFacs, "all")
}
// log service with logLevel
ipfscluster.SetFacilityLogLevel("service", logLevel)
logfacs := make(map[string]string)
for key := range ipfscluster.LoggingFacilities {
logfacs[key] = logLevel
}
// fill component-wise log levels
for identifier, level := range compLogFacs {
logfacs[identifier] = level
}
for identifier, level := range logfacs {
ipfscluster.SetFacilityLogLevel(identifier, level)
}
return nil
}
func userProvidedSecret(enterSecret bool) ([]byte, bool) {


@ -9,11 +9,14 @@ import (
"os"
"os/signal"
"syscall"
"time"
ipfscluster "github.com/ipfs/ipfs-cluster"
ipfshttp "github.com/ipfs/ipfs-cluster/ipfsconn/ipfshttp"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
)
// RandomizePorts replaces TCP and UDP ports with random, but valid port
@ -127,3 +130,43 @@ Note that this may corrupt the local cluster state.
func ErrorOut(m string, a ...interface{}) {
fmt.Fprintf(os.Stderr, m, a...)
}
// WaitForIPFS hangs until the IPFS API becomes available or the given context
// is cancelled. The IPFS API location is determined by the default ipfshttp
// component configuration and can be overridden using environment variables
// that affect that configuration. Note that we have to do this blindly,
// since we want to wait for IPFS before we even fetch the IPFS component
// configuration (because the configuration might be hosted on IPFS itself).
func WaitForIPFS(ctx context.Context) error {
ipfshttpCfg := ipfshttp.Config{}
ipfshttpCfg.Default()
ipfshttpCfg.ApplyEnvVars()
ipfshttpCfg.ConnectSwarmsDelay = 0
ipfshttpCfg.Tracing = false
ipfscluster.SetFacilityLogLevel("ipfshttp", "critical")
defer ipfscluster.SetFacilityLogLevel("ipfshttp", "info")
ipfs, err := ipfshttp.NewConnector(&ipfshttpCfg)
if err != nil {
return errors.Wrap(err, "error creating an ipfshttp instance to wait for IPFS")
}
i := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if i%10 == 0 {
fmt.Printf("waiting for IPFS to become available on %s...\n", ipfshttpCfg.NodeAddr)
}
i++
time.Sleep(time.Second)
_, err := ipfs.ID(ctx)
if err == nil {
// sleep an extra second and quit
time.Sleep(time.Second)
return nil
}
}
}
}
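// Editor's sketch (illustrative, not part of this change): callers are
// expected to bound the wait themselves, as WaitForIPFS only returns once
// the IPFS API answers or the context ends.
func exampleWaitForIPFS() error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	return WaitForIPFS(ctx)
}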


@ -360,7 +360,7 @@ func (cfg *Manager) LoadJSONFromHTTPSource(url string) error {
cfg.Source = url
resp, err := http.Get(url)
if err != nil {
return errFetchingSource
return fmt.Errorf("%w: %s", errFetchingSource, url)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
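// Editor's note (sketch, not part of this change): wrapping errFetchingSource
// with %w lets code in this package tell a fetch failure apart from other
// load errors, for example:
//
//	err := cfg.LoadJSONFromHTTPSource(url)
//	if errors.Is(err, errFetchingSource) {
//		// the remote configuration source was unreachable
//	}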


@ -31,6 +31,7 @@ var testingClusterCfg = []byte(`{
"grace_period": "2m0s"
},
"state_sync_interval": "1m0s",
"pin_recover_interval": "1m0s",
"replication_factor": -1,
"monitor_ping_interval": "1s",
"peer_watch_interval": "1s",


@ -67,9 +67,10 @@ type Consensus struct {
dht *dht.IpfsDHT
pubsub *pubsub.PubSub
rpcClient *rpc.Client
rpcReady chan struct{}
readyCh chan struct{}
rpcClient *rpc.Client
rpcReady chan struct{}
stateReady chan struct{}
readyCh chan struct{}
shutdownLock sync.RWMutex
shutdown bool
@ -124,6 +125,7 @@ func New(
pubsub: pubsub,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
stateReady: make(chan struct{}, 1),
}
go css.setup()
@ -263,6 +265,8 @@ func (css *Consensus) setup() {
logger.Info("'trust all' mode enabled. Any peer in the cluster can modify the pinset.")
}
// notifies State() it is safe to return
close(css.stateReady)
css.readyCh <- struct{}{}
}
@ -427,8 +431,18 @@ func (css *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
return ErrRmPeer
}
// State returns the cluster shared state.
func (css *Consensus) State(ctx context.Context) (state.ReadOnly, error) { return css.state, nil }
// State returns the cluster shared state. It will block until the consensus
// component is ready or shut down, or until the given context is cancelled.
func (css *Consensus) State(ctx context.Context) (state.ReadOnly, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-css.ctx.Done():
return nil, css.ctx.Err()
case <-css.stateReady:
return css.state, nil
}
}
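// Editor's sketch (illustrative, not part of this change): since State now
// blocks until the consensus component has finished setting up, callers that
// cannot wait indefinitely should pass a context with a deadline:
//
//	ctx, cancel := context.WithTimeout(ctx, time.Minute)
//	defer cancel()
//	st, err := css.State(ctx)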
// Clean deletes all crdt-consensus datas from the datastore.
func (css *Consensus) Clean(ctx context.Context) error {
@ -468,10 +482,9 @@ func (css *Consensus) Leader(ctx context.Context) (peer.ID, error) {
return "", ErrNoLeader
}
// OfflineState returns an offline, read-only batching state using the given
// datastore. Any writes to this state are processed through the given
// ipfs connector (the state is offline as it does not require a
// running cluster peer).
// OfflineState returns an offline, batching state using the given
// datastore. This allows inspecting and modifying the shared state in
// offline mode.
func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error) {
batching, ok := store.(ds.Batching)
if !ok {

go.mod

@ -72,6 +72,7 @@ require (
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926
github.com/ugorji/go/codec v1.1.7
github.com/urfave/cli v1.22.1
github.com/urfave/cli/v2 v2.0.0
go.opencensus.io v0.22.1
golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd
gonum.org/v1/gonum v0.0.0-20190926113837-94b2bbd8ac13

go.sum

@ -844,6 +844,8 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.0.0 h1:+HU9SCbu8GnEUFtIBfuUNXN39ofWViIEJIp6SURMpCg=
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/vishvananda/netlink v1.0.0 h1:bqNY2lgheFIu1meHUFSH3d7vG93AFyqg3oGbJCOJgSM=
github.com/vishvananda/netlink v1.0.0/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netns v0.0.0-20190625233234-7109fa855b0f h1:nBX3nTcmxEtHSERBJaIo1Qa26VwRaopnZmfDQUXsF4I=


@ -1,6 +1,8 @@
package ipfscluster
import logging "github.com/ipfs/go-log"
import (
logging "github.com/ipfs/go-log"
)
var logger = logging.Logger("cluster")


@ -21,7 +21,7 @@ const tracingEnvConfigKey = "cluster_tracing"
// Default values for this Config.
const (
DefaultEnableStats = false
DefaultPrometheusEndpoint = "/ip4/0.0.0.0/tcp/8888"
DefaultPrometheusEndpoint = "/ip4/127.0.0.1/tcp/8888"
DefaultReportingInterval = 2 * time.Second
DefaultEnableTracing = false


@ -23,7 +23,7 @@ import (
// if enabled.
func SetupMetrics(cfg *MetricsConfig) error {
if cfg.EnableStats {
logger.Info("stats collection enabled...")
logger.Infof("stats collection enabled on %s", cfg.PrometheusEndpoint)
return setupMetrics(cfg)
}
return nil
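// Editor's sketch (assumes MetricsConfig provides the usual Default() method
// and the fields used above; not part of this change): enabling stats with
// the new loopback-only default endpoint looks roughly like:
//
//	cfg := &MetricsConfig{}
//	cfg.Default() // PrometheusEndpoint now defaults to /ip4/127.0.0.1/tcp/8888
//	cfg.EnableStats = true
//	if err := SetupMetrics(cfg); err != nil {
//		// handle error
//	}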


@ -1,10 +1,12 @@
// Package pintracker_test tests the PinTracker interface implementation.
//
// These tests are legacy from the time when there were several
// pintracker implementations.
package pintracker_test
import (
"context"
"errors"
"sort"
"testing"
"time"
@ -19,88 +21,33 @@ import (
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var (
pinCancelCid = test.Cid3
unpinCancelCid = test.Cid2
ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
pinOpts = api.PinOptions{
pinOpts = api.PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
}
)
type mockIPFS struct{}
func mockRPCClient(t testing.TB) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
err := s.RegisterName("IPFSConnector", &mockIPFS{})
if err != nil {
t.Fatal(err)
}
return c
}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
c := in.Cid
switch c.String() {
case test.SlowCid1.String():
time.Sleep(3 * time.Second)
case pinCancelCid.String():
return ErrPinCancelCid
}
return nil
}
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
switch in.String() {
case test.Cid1.String(), test.Cid2.String():
*out = api.IPFSPinStatusRecursive
case test.Cid4.String():
*out = api.IPFSPinStatusError
return errors.New("an ipfs error")
default:
*out = api.IPFSPinStatusUnpinned
}
return nil
}
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(3 * time.Second)
case unpinCancelCid.String():
return ErrUnpinCancelCid
}
return nil
}
func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
m := map[string]api.IPFSPinStatus{
test.Cid1.String(): api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
var sortPinInfoByCid = func(p []*api.PinInfo) {
sort.Slice(p, func(i, j int) bool {
return p[i].Cid.String() < p[j].Cid.String()
})
}
// prefilledState return a state instance with some pins.
// prefilledState returns a state instance with some pins:
// - Cid1 - pin everywhere
// - Cid2 - weird / remote // replication factor set to 0, no allocations
// - Cid3 - remote - this pin is on ipfs
// - Cid4 - pin everywhere - this pin is not on ipfs
func prefilledState(context.Context) (state.ReadOnly, error) {
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
return nil, err
}
remote := api.PinWithOpts(test.Cid4, api.PinOptions{
remote := api.PinWithOpts(test.Cid3, api.PinOptions{
ReplicationFactorMax: 1,
ReplicationFactorMin: 1,
})
@ -109,8 +56,8 @@ func prefilledState(context.Context) (state.ReadOnly, error) {
pins := []*api.Pin{
api.PinWithOpts(test.Cid1, pinOpts),
api.PinCid(test.Cid2),
api.PinWithOpts(test.Cid3, pinOpts),
remote,
api.PinWithOpts(test.Cid4, pinOpts),
}
ctx := context.Background()
@ -123,16 +70,6 @@ func prefilledState(context.Context) (state.ReadOnly, error) {
return st, nil
}
func testSlowStatelessPinTracker(t testing.TB) *stateless.Tracker {
t.Helper()
cfg := &stateless.Config{}
cfg.Default()
spt := stateless.New(cfg, test.PeerID1, test.PeerName1, prefilledState)
spt.SetClient(mockRPCClient(t))
return spt
}
func testStatelessPinTracker(t testing.TB) *stateless.Tracker {
t.Helper()
@ -255,32 +192,12 @@ func TestPinTracker_StatusAll(t *testing.T) {
},
{
Cid: test.Cid3,
Status: api.TrackerStatusPinned,
Status: api.TrackerStatusRemote,
},
{
// in state but not on IPFS
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
},
},
},
{
"slow stateless statusall",
args{
api.PinWithOpts(test.Cid1, pinOpts),
testSlowStatelessPinTracker(t),
},
[]*api.PinInfo{
{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
{
Cid: test.Cid2,
Status: api.TrackerStatusRemote,
},
{
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
Status: api.TrackerStatusPinError,
},
},
},
@ -290,7 +207,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
if err := tt.args.tracker.Track(context.Background(), tt.args.c); err != nil {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(1 * time.Second)
time.Sleep(200 * time.Millisecond)
got := tt.args.tracker.StatusAll(context.Background())
if len(got) != len(tt.want) {
for _, pi := range got {
@ -304,7 +221,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
sortPinInfoByCid(tt.want)
for i := range tt.want {
if got[i].Cid.String() != tt.want[i].Cid.String() {
if got[i].Cid != tt.want[i].Cid {
t.Errorf("got: %v\nwant: %v", got, tt.want)
}
if got[i].Status != tt.want[i].Status {
@ -372,23 +289,12 @@ func TestPinTracker_Status(t *testing.T) {
Status: api.TrackerStatusUnpinned,
},
},
{
"slow stateless status",
args{
test.Cid1,
testSlowStatelessPinTracker(t),
},
api.PinInfo{
Cid: test.Cid1,
Status: api.TrackerStatusPinned,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.args.tracker.Status(context.Background(), tt.args.c)
if got.Cid.String() != tt.want.Cid.String() {
if got.Cid != tt.want.Cid {
t.Errorf("PinTracker.Status() = %v, want %v", got.Cid, tt.want.Cid)
}
@ -425,11 +331,15 @@ func TestPinTracker_RecoverAll(t *testing.T) {
},
{
Cid: test.Cid3,
Status: api.TrackerStatusPinned,
Status: api.TrackerStatusRemote,
},
{
// This one will be recovered, so its status
// is ignored: it could come back as
// queued, pinning or error.
Cid: test.Cid4,
Status: api.TrackerStatusRemote,
Status: api.TrackerStatusPinError,
},
},
false,
@ -447,18 +357,22 @@ func TestPinTracker_RecoverAll(t *testing.T) {
for _, pi := range got {
t.Logf("pinfo: %v", pi)
}
t.Errorf("got len = %d, want = %d", len(got), len(tt.want))
t.FailNow()
t.Fatalf("got len = %d, want = %d", len(got), len(tt.want))
}
sortPinInfoByCid(got)
sortPinInfoByCid(tt.want)
for i := range tt.want {
if got[i].Cid.String() != tt.want[i].Cid.String() {
if got[i].Cid != tt.want[i].Cid {
t.Errorf("\ngot: %v,\nwant: %v", got[i].Cid, tt.want[i].Cid)
}
// Cid4 needs to be recovered; we do not care
// what status it ends up in.
if got[i].Cid == test.Cid4 {
continue
}
if got[i].Status != tt.want[i].Status {
t.Errorf("for cid: %v:\ngot: %v,\nwant: %v", tt.want[i].Cid, got[i].Status, tt.want[i].Status)
}
@ -499,7 +413,7 @@ func TestPinTracker_Recover(t *testing.T) {
return
}
if got.Cid.String() != tt.want.Cid.String() {
if got.Cid != tt.want.Cid {
t.Errorf("PinTracker.Recover() = %v, want %v", got, tt.want)
}
})
@ -537,7 +451,7 @@ func TestUntrackTrack(t *testing.T) {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
time.Sleep(200 * time.Millisecond)
err = tt.args.tracker.Untrack(context.Background(), tt.args.c)
if err != nil {
@ -559,10 +473,10 @@ func TestTrackUntrackWithCancel(t *testing.T) {
wantErr bool
}{
{
"slow stateless tracker untrack w/ cancel",
"stateless tracker untrack w/ cancel",
args{
test.SlowCid1,
testSlowStatelessPinTracker(t),
testStatelessPinTracker(t),
},
api.PinInfo{
Cid: test.SlowCid1,
@ -579,7 +493,7 @@ func TestTrackUntrackWithCancel(t *testing.T) {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // let pinning start
time.Sleep(200 * time.Millisecond) // let pinning start
pInfo := tt.args.tracker.Status(context.Background(), tt.args.c)
if pInfo.Status == api.TrackerStatusUnpinned {
@ -614,7 +528,7 @@ func TestTrackUntrackWithCancel(t *testing.T) {
func TestPinTracker_RemoteIgnoresError(t *testing.T) {
ctx := context.Background()
testF := func(t *testing.T, pt ipfscluster.PinTracker) {
remoteCid := test.Cid4
remoteCid := test.Cid3
remote := api.PinWithOpts(remoteCid, pinOpts)
remote.Allocations = []peer.ID{test.PeerID2}
@ -628,12 +542,12 @@ func TestPinTracker_RemoteIgnoresError(t *testing.T) {
pi := pt.Status(ctx, remoteCid)
if pi.Status != api.TrackerStatusRemote || pi.Error != "" {
t.Error("Remote pin should not be in error")
t.Error("Remote pin should not be in error", pi.Status, pi.Error)
}
}
t.Run("stateless pintracker", func(t *testing.T) {
pt := testSlowStatelessPinTracker(t)
pt := testStatelessPinTracker(t)
testF(t, pt)
})
}


@ -26,6 +26,9 @@ var logger = logging.Logger("pintracker")
var (
// ErrFullQueue is the error used when pin or unpin operation channel is full.
ErrFullQueue = errors.New("pin/unpin operation queue is full. Try increasing max_pin_queue_size")
// items with this error should be recovered
errUnexpectedlyUnpinned = errors.New("the item should be pinned but it is not")
)
// Tracker uses the optracker.OperationTracker to manage
@ -328,6 +331,7 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
addError(pinInfo, err)
return pinInfo
}
// The pin IS in the state.
// check if pin is a meta pin
if gpin.Type == api.MetaType {
@ -357,7 +361,16 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
return pinInfo
}
pinInfo.Status = ips.ToTrackerStatus()
ipfsStatus := ips.ToTrackerStatus()
switch ipfsStatus {
case api.TrackerStatusUnpinned:
// The item is in the state but not in IPFS:
// PinError. Should be pinned.
pinInfo.Status = api.TrackerStatusPinError
pinInfo.Error = errUnexpectedlyUnpinned.Error()
default:
pinInfo.Status = ipfsStatus
}
return pinInfo
}
@ -369,7 +382,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
statuses := spt.StatusAll(ctx)
resp := make([]*api.PinInfo, 0)
for _, st := range statuses {
r, err := spt.Recover(ctx, st.Cid)
r, err := spt.recoverWithPinInfo(ctx, st)
if err != nil {
return resp, err
}
@ -378,32 +391,36 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
return resp, nil
}
// Recover will re-track or re-untrack a Cid in error state,
// possibly retriggering an IPFS pinning operation and returning
// only when it is done.
// Recover will trigger pinning or unpinning for items in
// PinError or UnpinError states.
func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/Recover")
defer span.End()
pInfo, ok := spt.optracker.GetExists(ctx, c)
if !ok {
return spt.Status(ctx, c), nil
// Check if we have a status in the operation tracker
pi, ok := spt.optracker.GetExists(ctx, c)
if ok {
return spt.recoverWithPinInfo(ctx, pi)
}
// Get a status by checking against IPFS and use that.
return spt.recoverWithPinInfo(ctx, spt.Status(ctx, c))
}
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) {
var err error
switch pInfo.Status {
switch pi.Status {
case api.TrackerStatusPinError:
logger.Infof("Restarting pin operation for %s", c)
err = spt.enqueue(ctx, api.PinCid(c), optracker.OperationPin)
logger.Infof("Restarting pin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationPin)
case api.TrackerStatusUnpinError:
logger.Infof("Restarting unpin operation for %s", c)
err = spt.enqueue(ctx, api.PinCid(c), optracker.OperationUnpin)
logger.Infof("Restarting unpin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationUnpin)
}
if err != nil {
return spt.Status(ctx, c), err
return spt.Status(ctx, pi.Cid), err
}
return spt.Status(ctx, c), nil
return spt.Status(ctx, pi.Cid), nil
}
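// Editor's sketch (illustrative, not part of this change): Recover now also
// handles items with no in-flight operation by deriving a status first, so a
// caller simply does:
//
//	pinInfo, err := spt.Recover(ctx, c)
//	// items that the state says should be pinned but that IPFS does not
//	// know about typically come back with a re-queued pin (queued or
//	// pinning) rather than staying in pin_error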
func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo, error) {
@ -497,15 +514,14 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]
case pinnedInIpfs:
pininfos[pCid] = ipfsInfo
default:
// report as undefined for this peer. this will be
// report as PIN_ERROR for this peer. this will be
// overwritten if the operation tracker has more info
// for this. Otherwise, this is a problem: a pin in
// the state that should be pinned by this peer but
// which no operation is handling.
// TODO (hector): Consider a pinError so it can be
// recovered?
pinInfo.Status = api.TrackerStatusUndefined
// for this (an ongoing pinning operation). Otherwise,
// it means something should be pinned but is not
// known to IPFS; a "recover" operation should handle it.
pinInfo.Status = api.TrackerStatusPinError
pinInfo.Error = errUnexpectedlyUnpinned.Error()
pininfos[pCid] = pinInfo
}
}
return pininfos, nil


@ -20,17 +20,69 @@ import (
var (
pinCancelCid = test.Cid3
unpinCancelCid = test.Cid2
ErrPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
ErrUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
errPinCancelCid = errors.New("should not have received rpc.IPFSPin operation")
errUnpinCancelCid = errors.New("should not have received rpc.IPFSUnpin operation")
pinOpts = api.PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
}
)
type mockIPFS struct{}
// func TestMain(m *testing.M) {
// logging.SetLogLevel("pintracker", "debug")
// os.Exit(m.Run())
// }
// Override the Pin and Unpin methods of the normal mock so that they return
// special errors when unwanted operations are triggered.
type mockIPFS struct {
}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case pinCancelCid:
return errPinCancelCid
case test.SlowCid1:
time.Sleep(time.Second)
}
return nil
}
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case unpinCancelCid:
return errUnpinCancelCid
case test.SlowCid1:
time.Sleep(time.Second)
}
return nil
}
func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
// Must be consistent with PinLsCid
m := map[string]api.IPFSPinStatus{
test.Cid1.String(): api.IPFSPinStatusRecursive,
test.Cid2.String(): api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
switch in {
case test.Cid1, test.Cid2:
*out = api.IPFSPinStatusRecursive
default:
*out = api.IPFSPinStatusUnpinned
return nil
}
return nil
}
func mockRPCClient(t testing.TB) *rpc.Client {
t.Helper()
func mockRPCClient(t *testing.T) *rpc.Client {
s := rpc.NewServer(nil, "mock")
c := rpc.NewClientWithServer(nil, "mock", s)
@ -41,80 +93,38 @@ func mockRPCClient(t *testing.T) *rpc.Client {
return c
}
func (mock *mockIPFS) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(2 * time.Second)
case pinCancelCid.String():
return ErrPinCancelCid
func getStateFunc(t testing.TB, items ...*api.Pin) func(context.Context) (state.ReadOnly, error) {
t.Helper()
ctx := context.Background()
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
return nil
for _, item := range items {
err := st.Add(ctx, item)
if err != nil {
t.Fatal(err)
}
}
return func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
}
func (mock *mockIPFS) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid.String() {
case test.SlowCid1.String():
time.Sleep(2 * time.Second)
case unpinCancelCid.String():
return ErrUnpinCancelCid
}
return nil
}
func (mock *mockIPFS) PinLs(ctx context.Context, in string, out *map[string]api.IPFSPinStatus) error {
m := map[string]api.IPFSPinStatus{
test.Cid1.String(): api.IPFSPinStatusRecursive,
}
*out = m
return nil
}
func (mock *mockIPFS) PinLsCid(ctx context.Context, in cid.Cid, out *api.IPFSPinStatus) error {
switch in.String() {
case test.Cid1.String(), test.Cid2.String():
*out = api.IPFSPinStatusRecursive
default:
*out = api.IPFSPinStatusUnpinned
}
return nil
}
func testSlowStatelessPinTracker(t *testing.T) *Tracker {
func testStatelessPinTracker(t testing.TB, pins ...*api.Pin) *Tracker {
t.Helper()
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
getState := func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
spt := New(cfg, test.PeerID1, test.PeerName1, getState)
spt := New(cfg, test.PeerID1, test.PeerName1, getStateFunc(t, pins...))
spt.SetClient(mockRPCClient(t))
return spt
}
func testStatelessPinTracker(t testing.TB) *Tracker {
t.Helper()
cfg := &Config{}
cfg.Default()
cfg.ConcurrentPins = 1
st, err := dsstate.New(inmem.New(), "", dsstate.DefaultHandle())
if err != nil {
t.Fatal(err)
}
getState := func(ctx context.Context) (state.ReadOnly, error) {
return st, nil
}
spt := New(cfg, test.PeerID1, test.PeerName1, getState)
spt.SetClient(test.NewMockRPCClient(t))
return spt
}
func TestStatelessPinTracker_New(t *testing.T) {
ctx := context.Background()
spt := testStatelessPinTracker(t)
@ -159,7 +169,7 @@ func TestUntrackTrack(t *testing.T) {
func TestTrackUntrackWithCancel(t *testing.T) {
ctx := context.Background()
spt := testSlowStatelessPinTracker(t)
spt := testStatelessPinTracker(t)
defer spt.Shutdown(ctx)
slowPinCid := test.SlowCid1
@ -167,27 +177,27 @@ func TestTrackUntrackWithCancel(t *testing.T) {
// LocalPin
slowPin := api.PinWithOpts(slowPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond) // let pinning start
pInfo := spt.optracker.Get(context.Background(), slowPin.Cid)
pInfo := spt.optracker.Get(ctx, slowPin.Cid)
if pInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("slowPin should be tracked")
}
if pInfo.Status == api.TrackerStatusPinning {
go func() {
err = spt.Untrack(context.Background(), slowPinCid)
err = spt.Untrack(ctx, slowPinCid)
if err != nil {
t.Fatal(err)
}
}()
select {
case <-spt.optracker.OpContext(context.Background(), slowPinCid).Done():
case <-spt.optracker.OpContext(ctx, slowPinCid).Done():
return
case <-time.Tick(100 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now")
@ -204,7 +214,7 @@ func TestTrackUntrackWithCancel(t *testing.T) {
// cancelling of the pinning operation happens (unlike on WithCancel).
func TestTrackUntrackWithNoCancel(t *testing.T) {
ctx := context.Background()
spt := testSlowStatelessPinTracker(t)
spt := testStatelessPinTracker(t)
defer spt.Shutdown(ctx)
slowPinCid := test.SlowCid1
@ -216,7 +226,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
// LocalPin
fastPin := api.PinWithOpts(fastPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
@ -224,18 +234,18 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
// Otherwise fails when running with -race
time.Sleep(300 * time.Millisecond)
err = spt.Track(context.Background(), fastPin)
err = spt.Track(ctx, fastPin)
if err != nil {
t.Fatal(err)
}
// fastPin should be queued because slow pin is pinning
fastPInfo := spt.optracker.Get(context.Background(), fastPin.Cid)
fastPInfo := spt.optracker.Get(ctx, fastPin.Cid)
if fastPInfo.Status == api.TrackerStatusUnpinned {
t.Fatal("fastPin should be tracked")
}
if fastPInfo.Status == api.TrackerStatusPinQueued {
err = spt.Untrack(context.Background(), fastPinCid)
err = spt.Untrack(ctx, fastPinCid)
if err != nil {
t.Fatal(err)
}
@ -247,7 +257,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
t.Errorf("fastPin should be queued to pin but is %s", fastPInfo.Status)
}
pi := spt.optracker.Get(context.Background(), fastPin.Cid)
pi := spt.optracker.Get(ctx, fastPin.Cid)
if pi.Cid == cid.Undef {
t.Error("fastPin should have been removed from tracker")
}
@ -255,7 +265,7 @@ func TestTrackUntrackWithNoCancel(t *testing.T) {
func TestUntrackTrackWithCancel(t *testing.T) {
ctx := context.Background()
spt := testSlowStatelessPinTracker(t)
spt := testStatelessPinTracker(t)
defer spt.Shutdown(ctx)
slowPinCid := test.SlowCid1
@ -263,7 +273,7 @@ func TestUntrackTrackWithCancel(t *testing.T) {
// LocalPin
slowPin := api.PinWithOpts(slowPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
@ -272,27 +282,27 @@ func TestUntrackTrackWithCancel(t *testing.T) {
// Untrack should cancel the ongoing request
// and unpin right away
err = spt.Untrack(context.Background(), slowPinCid)
err = spt.Untrack(ctx, slowPinCid)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
pi := spt.optracker.Get(context.Background(), slowPin.Cid)
pi := spt.optracker.Get(ctx, slowPin.Cid)
if pi.Cid == cid.Undef {
t.Fatal("expected slowPin to be tracked")
}
if pi.Status == api.TrackerStatusUnpinning {
go func() {
err = spt.Track(context.Background(), slowPin)
err = spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
}()
select {
case <-spt.optracker.OpContext(context.Background(), slowPinCid).Done():
case <-spt.optracker.OpContext(ctx, slowPinCid).Done():
return
case <-time.Tick(100 * time.Millisecond):
t.Errorf("operation context should have been cancelled by now")
@ -317,35 +327,35 @@ func TestUntrackTrackWithNoCancel(t *testing.T) {
// LocalPin
fastPin := api.PinWithOpts(fastPinCid, pinOpts)
err := spt.Track(context.Background(), slowPin)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
err = spt.Track(context.Background(), fastPin)
err = spt.Track(ctx, fastPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
err = spt.Untrack(context.Background(), slowPin.Cid)
err = spt.Untrack(ctx, slowPin.Cid)
if err != nil {
t.Fatal(err)
}
err = spt.Untrack(context.Background(), fastPin.Cid)
err = spt.Untrack(ctx, fastPin.Cid)
if err != nil {
t.Fatal(err)
}
pi := spt.optracker.Get(context.Background(), fastPin.Cid)
pi := spt.optracker.Get(ctx, fastPin.Cid)
if pi.Cid == cid.Undef {
t.Fatal("c untrack operation should be tracked")
}
if pi.Status == api.TrackerStatusUnpinQueued {
err = spt.Track(context.Background(), fastPin)
err = spt.Track(ctx, fastPin)
if err != nil {
t.Fatal(err)
}
@ -359,6 +369,102 @@ func TestUntrackTrackWithNoCancel(t *testing.T) {
}
}
// TestStatusAll checks that StatusAll correctly reports tracked
// items and mismatches between what is on IPFS and what is in the state.
func TestStatusAll(t *testing.T) {
ctx := context.Background()
normalPin := api.PinWithOpts(test.Cid1, pinOpts)
normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)
// - Build a state with two pins (Cid1, Cid4)
// - The IPFS Mock reports Cid1 and Cid2
// - Track a SlowCid additionally
spt := testStatelessPinTracker(t, normalPin, normalPin2)
defer spt.Shutdown(ctx)
slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
// Needs to return:
// * A slow CID pinning
// * Cid1 is pinned
// * Cid4 should be in PinError (it's in the state but not on IPFS)
stAll := spt.StatusAll(ctx)
if len(stAll) != 3 {
t.Errorf("wrong status length. Expected 3, got: %d", len(stAll))
}
for _, pi := range stAll {
switch pi.Cid {
case test.Cid1:
if pi.Status != api.TrackerStatusPinned {
t.Error("cid1 should be pinned")
}
case test.Cid4:
if pi.Status != api.TrackerStatusPinError {
t.Error("cid2 should be in pin_error status")
}
case test.SlowCid1:
if pi.Status != api.TrackerStatusPinning {
t.Error("slowCid1 should be pinning")
}
default:
t.Error("Unexpected pin:", pi.Cid)
}
}
}
// TestStatus checks that the Status call correctly reports tracked
// items and mismatches between what is on IPFS and what is in the state.
func TestStatus(t *testing.T) {
ctx := context.Background()
normalPin := api.PinWithOpts(test.Cid1, pinOpts)
normalPin2 := api.PinWithOpts(test.Cid4, pinOpts)
// - Build a state with two pins (Cid1, Cid4)
// - The IPFS Mock reports Cid1 and Cid2
// - Track a SlowCid additionally
spt := testStatelessPinTracker(t, normalPin, normalPin2)
defer spt.Shutdown(ctx)
slowPin := api.PinWithOpts(test.SlowCid1, pinOpts)
err := spt.Track(ctx, slowPin)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second / 2)
// Status needs to return:
// * For slowCid1: A slow CID pinning
// * For Cid1: pinned
// * For Cid4: pin error
st := spt.Status(ctx, test.Cid1)
if st.Status != api.TrackerStatusPinned {
t.Error("cid1 should be pinned")
}
st = spt.Status(ctx, test.Cid4)
if st.Status != api.TrackerStatusPinError {
t.Error("cid2 should be in pin_error status")
}
st = spt.Status(ctx, test.SlowCid1)
if st.Status != api.TrackerStatusPinning {
t.Error("slowCid1 should be pinning")
}
}
var sortPinInfoByCid = func(p []*api.PinInfo) {
sort.Slice(p, func(i, j int) bool {
return p[i].Cid.String() < p[j].Cid.String()
@ -367,8 +473,9 @@ var sortPinInfoByCid = func(p []*api.PinInfo) {
func BenchmarkTracker_localStatus(b *testing.B) {
tracker := testStatelessPinTracker(b)
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
tracker.localStatus(context.Background(), true)
tracker.localStatus(ctx, true)
}
}


@ -19,41 +19,41 @@ test_expect_success IPFS,CLUSTER "add files locally and compare with ipfs" '
docker cp smallfile.bin ipfs:/tmp/smallfile.bin
docker cp testFolder ipfs:/tmp/testFolder
ipfs-cluster-ctl add --quiet smallfile.bin > cidscluster.txt
ipfs-cluster-ctl add --quiet -w smallfile.bin >> cidscluster.txt
ipfs-cluster-ctl add smallfile.bin > cidscluster.txt
ipfs-cluster-ctl add -w smallfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet --raw-leaves -w smallfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet --raw-leaves smallfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --raw-leaves -w smallfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --raw-leaves smallfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet --layout trickle bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet -w bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet --raw-leaves -w bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet --raw-leaves bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --layout trickle bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add -w bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --raw-leaves -w bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --raw-leaves bigfile.bin >> cidscluster.txt
ipfs-cluster-ctl add --quiet -r testFolder >> cidscluster.txt
ipfs-cluster-ctl add --quiet -r -w testFolder >> cidscluster.txt
ipfs-cluster-ctl add -r testFolder >> cidscluster.txt
ipfs-cluster-ctl add -r -w testFolder >> cidscluster.txt
ipfs-cluster-ctl add --quiet --cid-version 1 -r testFolder >> cidscluster.txt
ipfs-cluster-ctl add --quiet --hash sha3-512 -r testFolder >> cidscluster.txt
ipfs-cluster-ctl add --cid-version 1 -r testFolder >> cidscluster.txt
ipfs-cluster-ctl add --hash sha3-512 -r testFolder >> cidscluster.txt
ipfsCmd add --quiet /tmp/smallfile.bin > cidsipfs.txt
ipfsCmd add --quiet -w /tmp/smallfile.bin >> cidsipfs.txt
ipfsCmd add /tmp/smallfile.bin > cidsipfs.txt
ipfsCmd add -w /tmp/smallfile.bin >> cidsipfs.txt
ipfsCmd add --quiet --raw-leaves -w /tmp/smallfile.bin >> cidsipfs.txt
ipfsCmd add --quiet --raw-leaves /tmp/smallfile.bin >> cidsipfs.txt
ipfsCmd add --raw-leaves -w /tmp/smallfile.bin >> cidsipfs.txt
ipfsCmd add --raw-leaves /tmp/smallfile.bin >> cidsipfs.txt
ipfsCmd add --quiet /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --quiet --trickle /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --quiet -w /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --quiet --raw-leaves -w /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --quiet --raw-leaves /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --trickle /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add -w /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --raw-leaves -w /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --raw-leaves /tmp/bigfile.bin >> cidsipfs.txt
ipfsCmd add --quiet -r /tmp/testFolder >> cidsipfs.txt
ipfsCmd add --quiet -r -w /tmp/testFolder >> cidsipfs.txt
ipfsCmd add -r /tmp/testFolder >> cidsipfs.txt
ipfsCmd add -r -w /tmp/testFolder >> cidsipfs.txt
ipfsCmd add --quiet --cid-version 1 -r /tmp/testFolder >> cidsipfs.txt
ipfsCmd add --quiet --hash sha3-512 -r /tmp/testFolder >> cidsipfs.txt
ipfsCmd add --cid-version 1 -r /tmp/testFolder >> cidsipfs.txt
ipfsCmd add --hash sha3-512 -r /tmp/testFolder >> cidsipfs.txt
test_cmp cidscluster.txt cidsipfs.txt
'


@ -443,10 +443,18 @@ func (mock *mockPeerMonitor) MetricNames(ctx context.Context, in struct{}, out *
/* IPFSConnector methods */
func (mock *mockIPFSConnector) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case SlowCid1:
time.Sleep(2 * time.Second)
}
return nil
}
func (mock *mockIPFSConnector) Unpin(ctx context.Context, in *api.Pin, out *struct{}) error {
switch in.Cid {
case SlowCid1:
time.Sleep(2 * time.Second)
}
return nil
}


@ -9,7 +9,7 @@ import (
// Version is the current cluster version. Version alignment between
// components, apis and tools ensures compatibility among them.
var Version = semver.MustParse("0.11.0")
var Version = semver.MustParse("0.12.0")
// RPCProtocol is used to send libp2p messages between cluster peers
var RPCProtocol = protocol.ID(