Merge branch 'master' of github.com:ipfs/ipfs-cluster into issue_453

Kishan Sagathiya 2018-11-03 20:24:15 +05:30
commit 411eea664c
71 changed files with 1126 additions and 551 deletions

.gitignore vendored (4 changes)
View File

@ -2,15 +2,17 @@ tag_annotation
coverage.out
cmd/ipfs-cluster-service/ipfs-cluster-service
cmd/ipfs-cluster-ctl/ipfs-cluster-ctl
deptools
deptools/gx*
sharness/lib/sharness
sharness/test-results
sharness/trash*
vendor/
raftFolderFromTest*
peerstore
shardTesting
compose
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o

View File

@ -1 +1 @@
0.6.0: QmUY1LMFsjmVn1prwerDFJyGpbatfbABj7xGmSCQTL56bM
0.7.0: QmbGeAqG8wWUqBggxHRCP5ErJGMJHm8SihCfurMmxRU4za

View File

@ -39,14 +39,15 @@ jobs:
- go test -v . -tracker stateless
- name: "Golint and go vet"
script:
- go get -u github.com/golang/lint/golint
- go get -u golang.org/x/lint/golint
- make deps
- make check
- make service
- make ctl
- name: "Docker build"
- name: "Docker and Compose build"
script:
- make docker
- make docker-compose
- name: "Sharness"
script:
- sudo apt-get update
@ -60,17 +61,16 @@ jobs:
if: (NOT type IN (pull_request)) AND (fork = false) AND (tag =~ ^v\d+\.\d+\.\d+$)
script:
- openssl aes-256-cbc -K $encrypted_5a1cb914c6c9_key -iv $encrypted_5a1cb914c6c9_iv -in .snapcraft/travis_snapcraft.cfg -out .snapcraft/snapcraft.cfg -d
- docker run -v $(pwd):$(pwd) -t snapcore/snapcraft sh -c "apt update -qq && cd $(pwd) && ./snap/snap-multiarch.sh stable"
- docker run -v $(pwd):$(pwd) -t snapcore/snapcraft sh -c "apt update -qq && cd $(pwd) && ./snap/snap-multiarch.sh edge" # should be stable
- stage: "Snapcraft deployment stage (Candidate)"
name: "Deploy Snapcraft"
if: (NOT type IN (pull_request)) AND (fork = false) AND (tag =~ ^v\d+\.\d+\.\d+-rc\d+$)
script:
- openssl aes-256-cbc -K $encrypted_5a1cb914c6c9_key -iv $encrypted_5a1cb914c6c9_iv -in .snapcraft/travis_snapcraft.cfg -out .snapcraft/snapcraft.cfg -d
- docker run -v $(pwd):$(pwd) -t snapcore/snapcraft sh -c "apt update -qq && cd $(pwd) && ./snap/snap-multiarch.sh candidate"
- docker run -v $(pwd):$(pwd) -t snapcore/snapcraft sh -c "apt update -qq && cd $(pwd) && ./snap/snap-multiarch.sh edge" # should be candidate
- stage: "Snapcraft deployment stage (Edge)"
name: "Deploy Snapcraft"
if: (NOT type IN (pull_request)) AND (branch = master) AND (fork = false) AND (tag IS NOT present)
script:
- openssl aes-256-cbc -K $encrypted_5a1cb914c6c9_key -iv $encrypted_5a1cb914c6c9_iv -in .snapcraft/travis_snapcraft.cfg -out .snapcraft/snapcraft.cfg -d
- docker run -v $(pwd):$(pwd) -t snapcore/snapcraft sh -c "apt update -qq && cd $(pwd) && ./snap/snap-multiarch.sh edge"

View File

@ -1,5 +1,86 @@
# IPFS Cluster Changelog
### v0.7.0 - 2018-11-01
#### Summary
IPFS Cluster version 0.7.0 is a maintenance release that includes a few bugfixes and some small features.
Note that the REST API response format for the `/add` endpoint has changed. Thus all clients need to be upgraded to deal with the new format. The `api/rest/client` has been updated accordingly.
#### List of changes
##### Features
* Clean (rotate) the state when running `init` | [ipfs/ipfs-cluster#532](https://github.com/ipfs/ipfs-cluster/issues/532) | [ipfs/ipfs-cluster#553](https://github.com/ipfs/ipfs-cluster/issues/553)
* Configurable REST API headers and CORS defaults | [ipfs/ipfs-cluster#578](https://github.com/ipfs/ipfs-cluster/issues/578)
* Upgrade libp2p and other deps | [ipfs/ipfs-cluster#580](https://github.com/ipfs/ipfs-cluster/issues/580) | [ipfs/ipfs-cluster#590](https://github.com/ipfs/ipfs-cluster/issues/590) | [ipfs/ipfs-cluster#592](https://github.com/ipfs/ipfs-cluster/issues/592) | [ipfs/ipfs-cluster#598](https://github.com/ipfs/ipfs-cluster/issues/598) | [ipfs/ipfs-cluster#599](https://github.com/ipfs/ipfs-cluster/issues/599)
* Use `gossipsub` to broadcast metrics | [ipfs/ipfs-cluster#573](https://github.com/ipfs/ipfs-cluster/issues/573)
* Download gx and gx-go from IPFS preferentially | [ipfs/ipfs-cluster#577](https://github.com/ipfs/ipfs-cluster/issues/577) | [ipfs/ipfs-cluster#581](https://github.com/ipfs/ipfs-cluster/issues/581)
* Expose peer metrics in the API + ctl commands | [ipfs/ipfs-cluster#449](https://github.com/ipfs/ipfs-cluster/issues/449) | [ipfs/ipfs-cluster#572](https://github.com/ipfs/ipfs-cluster/issues/572) | [ipfs/ipfs-cluster#589](https://github.com/ipfs/ipfs-cluster/issues/589) | [ipfs/ipfs-cluster#587](https://github.com/ipfs/ipfs-cluster/issues/587)
* Add a `docker-compose.yml` template, which creates a two-peer cluster | [ipfs/ipfs-cluster#585](https://github.com/ipfs/ipfs-cluster/issues/585) | [ipfs/ipfs-cluster#588](https://github.com/ipfs/ipfs-cluster/issues/588)
* Support overwriting configuration values in the `cluster` section with environment variables | [ipfs/ipfs-cluster#575](https://github.com/ipfs/ipfs-cluster/issues/575) | [ipfs/ipfs-cluster#596](https://github.com/ipfs/ipfs-cluster/issues/596)
* Set snaps to `classic` confinement mode and revert it since approval never arrived | [ipfs/ipfs-cluster#579](https://github.com/ipfs/ipfs-cluster/issues/579) | [ipfs/ipfs-cluster#594](https://github.com/ipfs/ipfs-cluster/issues/594)
* Use Go's reverse proxy library in the proxy endpoint | [ipfs/ipfs-cluster#570](https://github.com/ipfs/ipfs-cluster/issues/570) | [ipfs/ipfs-cluster#605](https://github.com/ipfs/ipfs-cluster/issues/605)
##### Bug fixes
* `/add` endpoints improvements and IPFS Companion compatibility | [ipfs/ipfs-cluster#582](https://github.com/ipfs/ipfs-cluster/issues/582) | [ipfs/ipfs-cluster#569](https://github.com/ipfs/ipfs-cluster/issues/569)
* Fix adding with spaces in the name parameter | [ipfs/ipfs-cluster#583](https://github.com/ipfs/ipfs-cluster/issues/583)
* Escape filter query parameter | [ipfs/ipfs-cluster#586](https://github.com/ipfs/ipfs-cluster/issues/586)
* Fix some race conditions | [ipfs/ipfs-cluster#597](https://github.com/ipfs/ipfs-cluster/issues/597)
* Improve pin deserialization efficiency | [ipfs/ipfs-cluster#601](https://github.com/ipfs/ipfs-cluster/issues/601)
* Do not error remote pins | [ipfs/ipfs-cluster#600](https://github.com/ipfs/ipfs-cluster/issues/600) | [ipfs/ipfs-cluster#603](https://github.com/ipfs/ipfs-cluster/issues/603)
* Clean up testing folders in `rest` and `rest/client` after tests | [ipfs/ipfs-cluster#607](https://github.com/ipfs/ipfs-cluster/issues/607)
#### Upgrading notices
##### Configuration changes
The configurations from previous versions are compatible, but a new `headers` key has been added to the `restapi` section. By default, it is set to CORS headers which allow read-only interaction from any origin.
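For illustration, the defaults would render in the configuration roughly as below (a sketch showing only the new key; other `restapi` fields are omitted, and the values mirror the `DefaultHeaders` added in this changeset):
```
"restapi": {
    "headers": {
        "Access-Control-Allow-Headers": ["X-Requested-With", "Range"],
        "Access-Control-Allow-Methods": ["GET"],
        "Access-Control-Allow-Origin": ["*"]
    }
}
```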
Additionally, all fields from the main `cluster` configuration section can now be overwritten with environment variables, e.g. `CLUSTER_SECRET` or `CLUSTER_DISABLEREPINNING`.
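The override is wired through `envconfig.Process` in the cluster config's `LoadJSON` (see the `cluster_config.go` hunks further down). A minimal sketch of the mechanism, using a hypothetical stand-in struct rather than the real cluster `Config`:
```
package main

import (
	"fmt"
	"os"

	"github.com/kelseyhightower/envconfig"
)

// miniConfig is a hypothetical stand-in for the cluster configuration
// section; the real Config has many more fields.
type miniConfig struct {
	Peername string
	Secret   string
}

func main() {
	// Values as they would have been loaded from the JSON config file.
	cfg := miniConfig{Peername: "jsonpeer", Secret: "aabbcc"}

	// CLUSTER_PEERNAME overrides the JSON value, as in the new LoadJSON.
	os.Setenv("CLUSTER_PEERNAME", "envpeer")
	if err := envconfig.Process("cluster", &cfg); err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println(cfg.Peername) // envpeer
	fmt.Println(cfg.Secret)   // aabbcc (no env var set, JSON value kept)
}
```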
##### REST API
The `/add` endpoint stream now returns different objects, in line with the rest of the API types.
Before:
```
type AddedOutput struct {
	Error
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
	Size  string `json:",omitempty"`
}
```
Now:
```
type AddedOutput struct {
	Name  string `json:"name"`
	Cid   string `json:"cid,omitempty"`
	Bytes uint64 `json:"bytes,omitempty"`
	Size  uint64 `json:"size,omitempty"`
}
```
The `/add` endpoint no longer reports errors as part of an `AddedOutput` object; instead it uses trailer headers (same as `go-ipfs`). They are handled transparently in the `client`.
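A rough sketch of how a client might consume the new stream end-to-end (the `X-Stream-Error` trailer name is an assumption based on `go-ipfs`' streaming API; the `rest/client` package already does this for you):
```
package addclient

import (
	"encoding/json"
	"errors"
	"io"
	"net/http"
)

// AddedOutput mirrors the new response type shown above.
type AddedOutput struct {
	Name  string `json:"name"`
	Cid   string `json:"cid,omitempty"`
	Bytes uint64 `json:"bytes,omitempty"`
	Size  uint64 `json:"size,omitempty"`
}

// readAddStream decodes the NDJSON objects streamed by /add and then
// checks the error trailer, which is only readable after the body has
// been fully consumed.
func readAddStream(resp *http.Response) ([]AddedOutput, error) {
	defer resp.Body.Close()

	var outs []AddedOutput
	dec := json.NewDecoder(resp.Body)
	for {
		var out AddedOutput
		err := dec.Decode(&out)
		if err == io.EOF {
			break
		}
		if err != nil {
			return outs, err
		}
		outs = append(outs, out)
	}

	// "X-Stream-Error" is assumed here; adjust to the trailer actually set.
	if msg := resp.Trailer.Get("X-Stream-Error"); msg != "" {
		return outs, errors.New(msg)
	}
	return outs, nil
}
```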
##### Go APIs
The `AddedOutput` object has changed, thus the `api/rest/client` from older versions will not work with this one.
##### Other
Nothing else to note.
---
### v0.6.0 - 2018-10-03
#### Summary

View File

@ -1,14 +1,7 @@
gx_version=v0.13.0
gx-go_version=v1.8.0
deptools=deptools
gx=gx_$(gx_version)
gx-go=gx-go_$(gx-go_version)
gx_bin=$(deptools)/$(gx)
gx-go_bin=$(deptools)/$(gx-go)
bin_env=$(shell go env GOHOSTOS)-$(shell go env GOHOSTARCH)
sharness = sharness/lib/sharness
gx=$(deptools)/gx
gx-go=$(deptools)/gx-go
# For debugging
problematic_test = TestClustersReplicationRealloc
@ -21,9 +14,6 @@ clean: rwundo clean_sharness
$(MAKE) -C cmd/ipfs-cluster-ctl clean
@rm -rf ./test/testingData
gx-clean: clean
@rm -f $(deptools)/*
install: deps
$(MAKE) -C cmd/ipfs-cluster-service install
$(MAKE) -C cmd/ipfs-cluster-ctl install
@ -42,38 +32,22 @@ service: deps
ctl: deps
$(MAKE) -C cmd/ipfs-cluster-ctl ipfs-cluster-ctl
$(gx_bin):
@echo "Downloading gx"
mkdir -p ./$(deptools)
rm -f $(deptools)/gx
wget -nc -O $(gx_bin).tgz https://dist.ipfs.io/gx/$(gx_version)/$(gx)_$(bin_env).tar.gz
tar -zxf $(gx_bin).tgz -C $(deptools) --strip-components=1 gx/gx
mv $(deptools)/gx $(gx_bin)
ln -s $(gx) $(deptools)/gx
rm $(gx_bin).tgz
gx-clean: clean
$(MAKE) -C $(deptools) gx-clean
$(gx-go_bin):
@echo "Downloading gx-go"
mkdir -p ./$(deptools)
rm -f $(deptools)/gx-go
wget -nc -O $(gx-go_bin).tgz https://dist.ipfs.io/gx-go/$(gx-go_version)/$(gx-go)_$(bin_env).tar.gz
tar -zxf $(gx-go_bin).tgz -C $(deptools) --strip-components=1 gx-go/gx-go
mv $(deptools)/gx-go $(gx-go_bin)
ln -s $(gx-go) $(deptools)/gx-go
rm $(gx-go_bin).tgz
gx: $(gx_bin) $(gx-go_bin)
gx:
$(MAKE) -C $(deptools) gx
deps: gx
$(gx_bin) install --global
$(gx-go_bin) rewrite
$(gx) install --global
$(gx-go) rewrite
# Run this target before building the docker image
# so that gx won't attempt to pull all deps
# from the network each time
docker_deps: gx
$(gx_bin) install --local
$(gx-go_bin) rewrite
$(gx) install --local
$(gx-go) rewrite
check:
go vet ./...
@ -101,11 +75,11 @@ clean_sharness:
@rm -rf sharness/trash\ directory*
rw: gx
$(gx-go_bin) rewrite
$(gx-go) rewrite
rwundo: gx
$(gx-go_bin) rewrite --undo
$(gx-go) rewrite --undo
publish: rwundo
$(gx_bin) publish
$(gx) publish
docker:
docker build -t cluster-image -f Dockerfile .
@ -119,6 +93,14 @@ docker:
docker exec tmp-make-cluster-test sh -c "ipfs-cluster-service -v"
docker kill tmp-make-cluster-test
docker-compose:
CLUSTER_SECRET=$(shell od -vN 32 -An -tx1 /dev/urandom | tr -d ' \n') docker-compose up -d
sleep 20
docker exec cluster0 ipfs-cluster-ctl peers ls | grep -o "Sees 1 other peers" | uniq -c | grep 2
docker exec cluster1 ipfs-cluster-ctl peers ls | grep -o "Sees 1 other peers" | uniq -c | grep 2
docker-compose down
prcheck: deps check service ctl test
.PHONY: all gx deps test test_sharness clean_sharness rw rwundo publish service ctl install clean gx-clean docker

View File

@ -13,7 +13,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-cmdkit/files"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
merkledag "github.com/ipfs/go-merkledag"

View File

@ -13,9 +13,9 @@ import (
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var logger = logging.Logger("adder")

View File

@ -11,7 +11,7 @@ import (
cid "github.com/ipfs/go-cid"
chunker "github.com/ipfs/go-ipfs-chunker"
files "github.com/ipfs/go-ipfs-cmdkit/files"
files "github.com/ipfs/go-ipfs-files"
posinfo "github.com/ipfs/go-ipfs-posinfo"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
@ -91,6 +91,8 @@ func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
return nil, err
}
// Cluster: we don't do batching.
params := ihelper.DagBuilderParams{
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,

View File

@ -9,10 +9,10 @@ import (
adder "github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -11,7 +11,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
type testRPC struct {

View File

@ -23,12 +23,12 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
mh "github.com/multiformats/go-multihash"
)

View File

@ -14,10 +14,10 @@ import (
"github.com/ipfs/ipfs-cluster/api"
humanize "github.com/dustin/go-humanize"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -11,9 +11,9 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
func init() {

View File

@ -8,8 +8,8 @@ import (
"github.com/ipfs/ipfs-cluster/api"
humanize "github.com/dustin/go-humanize"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -8,9 +8,9 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -8,9 +8,9 @@ import (
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -8,9 +8,9 @@ import (
"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -158,29 +158,22 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
// ToQueryString returns a url query string (key=value&key2=value2&...)
func (p *AddParams) ToQueryString() string {
fmtStr := "replication-min=%d&replication-max=%d&name=%s&"
fmtStr += "shard=%t&shard-size=%d&recursive=%t&"
fmtStr += "layout=%s&chunker=%s&raw-leaves=%t&hidden=%t&"
fmtStr += "wrap-with-directory=%t&progress=%t&"
fmtStr += "cid-version=%d&hash=%s"
query := fmt.Sprintf(
fmtStr,
p.ReplicationFactorMin,
p.ReplicationFactorMax,
p.Name,
p.Shard,
p.ShardSize,
p.Recursive,
p.Layout,
p.Chunker,
p.RawLeaves,
p.Hidden,
p.Wrap,
p.Progress,
p.CidVersion,
p.HashFun,
)
return query
query := url.Values{}
query.Set("replication-min", fmt.Sprintf("%d", p.ReplicationFactorMin))
query.Set("replication-max", fmt.Sprintf("%d", p.ReplicationFactorMax))
query.Set("name", p.Name)
query.Set("shard", fmt.Sprintf("%t", p.Shard))
query.Set("shard-size", fmt.Sprintf("%d", p.ShardSize))
query.Set("recursive", fmt.Sprintf("%t", p.Recursive))
query.Set("layout", p.Layout)
query.Set("chunker", p.Chunker)
query.Set("raw-leaves", fmt.Sprintf("%t", p.RawLeaves))
query.Set("hidden", fmt.Sprintf("%t", p.Hidden))
query.Set("wrap-with-directory", fmt.Sprintf("%t", p.Wrap))
query.Set("progress", fmt.Sprintf("%t", p.Progress))
query.Set("cid-version", fmt.Sprintf("%d", p.CidVersion))
query.Set("hash", p.HashFun)
return query.Encode()
}
// Equals checks if p equals p2.

View File

@ -146,14 +146,14 @@ func New(cfg *Config) (*Server, error) {
server: s,
}
smux.Handle("/", proxyHandler)
smux.HandleFunc("/api/v0/pin/add/", proxy.pinHandler)
smux.HandleFunc("/api/v0/pin/rm/", proxy.unpinHandler)
smux.HandleFunc("/api/v0/pin/ls", proxy.pinLsHandler) // required to handle /pin/ls for all pins
smux.HandleFunc("/api/v0/pin/ls/", proxy.pinLsHandler)
smux.HandleFunc("/api/v0/pin/add", proxy.pinHandler) // add?arg=xxx
smux.HandleFunc("/api/v0/pin/add/", proxy.pinHandler) // add/xxx
smux.HandleFunc("/api/v0/pin/rm", proxy.unpinHandler) // rm?arg=xxx
smux.HandleFunc("/api/v0/pin/rm/", proxy.unpinHandler) // rm/xxx
smux.HandleFunc("/api/v0/pin/ls", proxy.pinLsHandler) // required to handle /pin/ls for all pins
smux.HandleFunc("/api/v0/pin/ls/", proxy.pinLsHandler) // ls/xxx
smux.HandleFunc("/api/v0/add", proxy.addHandler)
smux.HandleFunc("/api/v0/add/", proxy.addHandler)
smux.HandleFunc("/api/v0/repo/stat", proxy.repoStatHandler)
smux.HandleFunc("/api/v0/repo/stat/", proxy.repoStatHandler)
go proxy.run()
return proxy, nil
@ -293,7 +293,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
Type: "recursive",
}
} else {
var pins []api.PinSerial
pins := make([]api.PinSerial, 0)
err := proxy.rpcClient.Call(
"",
"Cluster",
@ -394,7 +394,7 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
}
func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
var peers []peer.ID
peers := make([]peer.ID, 0)
err := proxy.rpcClient.Call(
"",
"Cluster",

View File

@ -9,12 +9,11 @@ import (
"net/http"
"time"
"github.com/ipfs/go-ipfs-cmdkit/files"
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
shell "github.com/ipfs/go-ipfs-api"
files "github.com/ipfs/go-ipfs-files"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
@ -103,6 +102,10 @@ type Client interface {
// GetConnectGraph returns an ipfs-cluster connection graph. The
// serialized version, strings instead of pids, is returned
GetConnectGraph() (api.ConnectGraphSerial, error)
// Metrics returns the latest metrics of matching name
// for the current cluster peers.
Metrics(name string) ([]api.Metric, error)
}
// Config allows to configure the parameters to connect

View File

@ -10,6 +10,7 @@ import (
"github.com/ipfs/ipfs-cluster/test"
libp2p "github.com/libp2p/go-libp2p"
peer "github.com/libp2p/go-libp2p-peer"
pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"
)
@ -59,9 +60,13 @@ func apiMAddr(a *rest.API) ma.Multiaddr {
}
func peerMAddr(a *rest.API) ma.Multiaddr {
listenAddr := a.Host().Addrs()[0]
ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", a.Host().ID().Pretty()))
return listenAddr.Encapsulate(ipfsAddr)
ipfsAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(a.Host().ID())))
for _, a := range a.Host().Addrs() {
if _, err := a.ValueForProtocol(ma.P_IP4); err == nil {
return a.Encapsulate(ipfsAddr)
}
}
return nil
}
func testClientHTTP(t *testing.T, api *rest.API) *defaultClient {
@ -87,7 +92,6 @@ func testClientLibp2p(t *testing.T, api *rest.API) *defaultClient {
if err != nil {
t.Fatal(err)
}
return c.(*defaultClient)
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
@ -15,7 +16,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmdkit/files"
files "github.com/ipfs/go-ipfs-files"
peer "github.com/libp2p/go-libp2p-peer"
)
@ -109,7 +110,8 @@ func (c *defaultClient) Allocations(filter api.PinType) ([]api.Pin, error) {
}
}
err := c.do("GET", fmt.Sprintf("/allocations?filter=%s", strings.Join(strFilter, ",")), nil, nil, &pins)
f := url.QueryEscape(strings.Join(strFilter, ","))
err := c.do("GET", fmt.Sprintf("/allocations?filter=%s", f), nil, nil, &pins)
result := make([]api.Pin, len(pins))
for i, p := range pins {
result[i] = p.ToPin()
@ -204,6 +206,17 @@ func (c *defaultClient) GetConnectGraph() (api.ConnectGraphSerial, error) {
return graphS, err
}
// Metrics returns the latest valid metrics of the given name
// for the current cluster peers.
func (c *defaultClient) Metrics(name string) ([]api.Metric, error) {
if name == "" {
return nil, errors.New("bad metric name")
}
var metrics []api.Metric
err := c.do("GET", fmt.Sprintf("/monitor/metrics/%s", name), nil, nil, &metrics)
return metrics, err
}
// WaitFor is a utility function that allows for a caller to wait for a
// particular status for a CID (as defined by StatusFilterParams).
// It returns the final status for that CID and an error, if any.

View File

@ -6,8 +6,8 @@ import (
"testing"
"time"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
@ -137,7 +137,7 @@ func TestPin(t *testing.T) {
testF := func(t *testing.T, c Client) {
ci, _ := cid.Decode(test.TestCid1)
err := c.Pin(ci, 6, 7, "hello")
err := c.Pin(ci, 6, 7, "hello there")
if err != nil {
t.Fatal(err)
}
@ -436,6 +436,7 @@ func TestAddMultiFile(t *testing.T) {
testF := func(t *testing.T, c Client) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
@ -443,7 +444,7 @@ func TestAddMultiFile(t *testing.T) {
PinOptions: types.PinOptions{
ReplicationFactorMin: -1,
ReplicationFactorMax: -1,
Name: "test",
Name: "test something",
ShardSize: 1024,
},
Shard: false,

View File

@ -92,9 +92,10 @@ func (c *defaultClient) handleResponse(resp *http.Response, obj interface{}) err
var apiErr api.Error
err = json.Unmarshal(body, &apiErr)
if err != nil {
// not json. 404s etc.
return &api.Error{
Code: resp.StatusCode,
Message: err.Error(),
Message: string(body),
}
}
return &apiErr

View File

@ -27,6 +27,15 @@ const (
DefaultIdleTimeout = 120 * time.Second
)
// These are the default values for Config.
var (
DefaultHeaders = map[string][]string{
"Access-Control-Allow-Headers": []string{"X-Requested-With", "Range"},
"Access-Control-Allow-Methods": []string{"GET"},
"Access-Control-Allow-Origin": []string{"*"},
}
)
// Config is used to initialize the API object and allows customizing
// its behaviour. It implements the config.ComponentConfig
// interface.
@ -71,6 +80,10 @@ type Config struct {
// BasicAuthCreds is a map of username-password pairs
// which are authorized to use Basic Authentication
BasicAuthCreds map[string]string
// Headers provides customization for the headers returned
// by the API. By default it sets a CORS policy.
Headers map[string][]string
}
type jsonConfig struct {
@ -87,7 +100,8 @@ type jsonConfig struct {
ID string `json:"id,omitempty"`
PrivateKey string `json:"private_key,omitempty"`
BasicAuthCreds map[string]string `json:"basic_auth_credentials"`
BasicAuthCreds map[string]string `json:"basic_auth_credentials"`
Headers map[string][]string `json:"headers"`
}
// ConfigKey returns a human-friendly identifier for this type of
@ -116,6 +130,9 @@ func (cfg *Config) Default() error {
// Auth
cfg.BasicAuthCreds = nil
// Headers
cfg.Headers = DefaultHeaders
return nil
}
@ -177,6 +194,7 @@ func (cfg *Config) LoadJSON(raw []byte) error {
// Other options
cfg.BasicAuthCreds = jcfg.BasicAuthCreds
cfg.Headers = jcfg.Headers
return cfg.Validate()
}
@ -295,6 +313,7 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
WriteTimeout: cfg.WriteTimeout.String(),
IdleTimeout: cfg.IdleTimeout.String(),
BasicAuthCreds: cfg.BasicAuthCreds,
Headers: cfg.Headers,
}
if cfg.ID != "" {

View File

@ -25,12 +25,12 @@ import (
types "github.com/ipfs/ipfs-cluster/api"
mux "github.com/gorilla/mux"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
gostream "github.com/hsanjuan/go-libp2p-gostream"
p2phttp "github.com/hsanjuan/go-libp2p-http"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
rpc "github.com/libp2p/go-libp2p-gorpc"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
@ -55,6 +55,9 @@ var (
ErrHTTPEndpointNotEnabled = errors.New("the HTTP endpoint is not enabled")
)
// Used by sendResponse to set the right status
const autoStatus = -1
// For making a random sharding ID
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
@ -193,7 +196,7 @@ func (api *API) setupLibp2p(ctx context.Context) error {
return nil
}
l, err := gostream.Listen(api.host, p2phttp.P2PProtocol)
l, err := gostream.Listen(api.host, p2phttp.DefaultP2PProtocol)
if err != nil {
return err
}
@ -382,6 +385,12 @@ func (api *API) routes() []route {
"/health/graph",
api.graphHandler,
},
{
"Metrics",
"GET",
"/monitor/metrics/{name}",
api.metricsHandler,
},
}
}
@ -479,7 +488,7 @@ func (api *API) idHandler(w http.ResponseWriter, r *http.Request) {
struct{}{},
&idSerial)
sendResponse(w, err, idSerial)
api.sendResponse(w, autoStatus, err, idSerial)
}
func (api *API) versionHandler(w http.ResponseWriter, r *http.Request) {
@ -490,7 +499,7 @@ func (api *API) versionHandler(w http.ResponseWriter, r *http.Request) {
struct{}{},
&v)
sendResponse(w, err, v)
api.sendResponse(w, autoStatus, err, v)
}
func (api *API) graphHandler(w http.ResponseWriter, r *http.Request) {
@ -500,22 +509,37 @@ func (api *API) graphHandler(w http.ResponseWriter, r *http.Request) {
"ConnectGraph",
struct{}{},
&graph)
sendResponse(w, err, graph)
api.sendResponse(w, autoStatus, err, graph)
}
func (api *API) metricsHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
name := vars["name"]
var metrics []types.Metric
err := api.rpcClient.Call("",
"Cluster",
"PeerMonitorLatestMetrics",
name,
&metrics)
api.sendResponse(w, autoStatus, err, metrics)
}
func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
reader, err := r.MultipartReader()
if err != nil {
sendErrorResponse(w, http.StatusBadRequest, err.Error())
api.sendResponse(w, http.StatusBadRequest, err, nil)
return
}
params, err := types.AddParamsFromQuery(r.URL.Query())
if err != nil {
sendErrorResponse(w, http.StatusBadRequest, err.Error())
api.sendResponse(w, http.StatusBadRequest, err, nil)
return
}
api.setHeaders(w)
// any errors sent as trailer
adderutils.AddMultipartHTTPHandler(
api.ctx,
@ -537,7 +561,7 @@ func (api *API) peerListHandler(w http.ResponseWriter, r *http.Request) {
struct{}{},
&peersSerial)
sendResponse(w, err, peersSerial)
api.sendResponse(w, autoStatus, err, peersSerial)
}
func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
@ -547,13 +571,13 @@ func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
var addInfo peerAddBody
err := dec.Decode(&addInfo)
if err != nil {
sendErrorResponse(w, 400, "error decoding request body")
api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding request body"), nil)
return
}
_, err = peer.IDB58Decode(addInfo.PeerID)
if err != nil {
sendErrorResponse(w, 400, "error decoding peer_id")
api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding peer_id"), nil)
return
}
@ -563,22 +587,22 @@ func (api *API) peerAddHandler(w http.ResponseWriter, r *http.Request) {
"PeerAdd",
addInfo.PeerID,
&ids)
sendResponse(w, err, ids)
api.sendResponse(w, autoStatus, err, ids)
}
func (api *API) peerRemoveHandler(w http.ResponseWriter, r *http.Request) {
if p := parsePidOrError(w, r); p != "" {
if p := api.parsePidOrError(w, r); p != "" {
err := api.rpcClient.Call("",
"Cluster",
"PeerRemove",
p,
&struct{}{})
sendEmptyResponse(w, err)
api.sendResponse(w, autoStatus, err, nil)
}
}
func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
if ps := parseCidOrError(w, r); ps.Cid != "" {
if ps := api.parseCidOrError(w, r); ps.Cid != "" {
logger.Debugf("rest api pinHandler: %s", ps.Cid)
err := api.rpcClient.Call("",
@ -586,20 +610,20 @@ func (api *API) pinHandler(w http.ResponseWriter, r *http.Request) {
"Pin",
ps,
&struct{}{})
sendAcceptedResponse(w, err)
api.sendResponse(w, http.StatusAccepted, err, nil)
logger.Debug("rest api pinHandler done")
}
}
func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
if ps := parseCidOrError(w, r); ps.Cid != "" {
if ps := api.parseCidOrError(w, r); ps.Cid != "" {
logger.Debugf("rest api unpinHandler: %s", ps.Cid)
err := api.rpcClient.Call("",
"Cluster",
"Unpin",
ps,
&struct{}{})
sendAcceptedResponse(w, err)
api.sendResponse(w, http.StatusAccepted, err, nil)
logger.Debug("rest api unpinHandler done")
}
}
@ -626,11 +650,11 @@ func (api *API) allocationsHandler(w http.ResponseWriter, r *http.Request) {
outPins = append(outPins, pinS)
}
}
sendResponse(w, err, outPins)
api.sendResponse(w, autoStatus, err, outPins)
}
func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
if ps := parseCidOrError(w, r); ps.Cid != "" {
if ps := api.parseCidOrError(w, r); ps.Cid != "" {
var pin types.PinSerial
err := api.rpcClient.Call("",
"Cluster",
@ -638,10 +662,10 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
ps,
&pin)
if err != nil { // errors here are 404s
sendErrorResponse(w, 404, err.Error())
api.sendResponse(w, http.StatusNotFound, err, nil)
return
}
sendJSONResponse(w, 200, pin)
api.sendResponse(w, autoStatus, nil, pin)
}
}
@ -656,7 +680,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"StatusAllLocal",
struct{}{},
&pinInfos)
sendResponse(w, err, pinInfosToGlobal(pinInfos))
api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos))
} else {
var pinInfos []types.GlobalPinInfoSerial
err := api.rpcClient.Call("",
@ -664,7 +688,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"StatusAll",
struct{}{},
&pinInfos)
sendResponse(w, err, pinInfos)
api.sendResponse(w, autoStatus, err, pinInfos)
}
}
@ -672,7 +696,7 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
if ps := parseCidOrError(w, r); ps.Cid != "" {
if ps := api.parseCidOrError(w, r); ps.Cid != "" {
if local == "true" {
var pinInfo types.PinInfoSerial
err := api.rpcClient.Call("",
@ -680,7 +704,7 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
"StatusLocal",
ps,
&pinInfo)
sendResponse(w, err, pinInfoToGlobal(pinInfo))
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(pinInfo))
} else {
var pinInfo types.GlobalPinInfoSerial
err := api.rpcClient.Call("",
@ -688,7 +712,7 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
"Status",
ps,
&pinInfo)
sendResponse(w, err, pinInfo)
api.sendResponse(w, autoStatus, err, pinInfo)
}
}
}
@ -704,7 +728,7 @@ func (api *API) syncAllHandler(w http.ResponseWriter, r *http.Request) {
"SyncAllLocal",
struct{}{},
&pinInfos)
sendResponse(w, err, pinInfosToGlobal(pinInfos))
api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos))
} else {
var pinInfos []types.GlobalPinInfoSerial
err := api.rpcClient.Call("",
@ -712,7 +736,7 @@ func (api *API) syncAllHandler(w http.ResponseWriter, r *http.Request) {
"SyncAll",
struct{}{},
&pinInfos)
sendResponse(w, err, pinInfos)
api.sendResponse(w, autoStatus, err, pinInfos)
}
}
@ -720,7 +744,7 @@ func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
if ps := parseCidOrError(w, r); ps.Cid != "" {
if ps := api.parseCidOrError(w, r); ps.Cid != "" {
if local == "true" {
var pinInfo types.PinInfoSerial
err := api.rpcClient.Call("",
@ -728,7 +752,7 @@ func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) {
"SyncLocal",
ps,
&pinInfo)
sendResponse(w, err, pinInfoToGlobal(pinInfo))
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(pinInfo))
} else {
var pinInfo types.GlobalPinInfoSerial
err := api.rpcClient.Call("",
@ -736,7 +760,7 @@ func (api *API) syncHandler(w http.ResponseWriter, r *http.Request) {
"Sync",
ps,
&pinInfo)
sendResponse(w, err, pinInfo)
api.sendResponse(w, autoStatus, err, pinInfo)
}
}
}
@ -751,9 +775,9 @@ func (api *API) recoverAllHandler(w http.ResponseWriter, r *http.Request) {
"RecoverAllLocal",
struct{}{},
&pinInfos)
sendResponse(w, err, pinInfosToGlobal(pinInfos))
api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos))
} else {
sendErrorResponse(w, 400, "only requests with parameter local=true are supported")
api.sendResponse(w, http.StatusBadRequest, errors.New("only requests with parameter local=true are supported"), nil)
}
}
@ -761,7 +785,7 @@ func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
if ps := parseCidOrError(w, r); ps.Cid != "" {
if ps := api.parseCidOrError(w, r); ps.Cid != "" {
if local == "true" {
var pinInfo types.PinInfoSerial
err := api.rpcClient.Call("",
@ -769,7 +793,7 @@ func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
"RecoverLocal",
ps,
&pinInfo)
sendResponse(w, err, pinInfoToGlobal(pinInfo))
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(pinInfo))
} else {
var pinInfo types.GlobalPinInfoSerial
err := api.rpcClient.Call("",
@ -777,18 +801,18 @@ func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
"Recover",
ps,
&pinInfo)
sendResponse(w, err, pinInfo)
api.sendResponse(w, autoStatus, err, pinInfo)
}
}
}
func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
func (api *API) parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
vars := mux.Vars(r)
hash := vars["hash"]
_, err := cid.Decode(hash)
if err != nil {
sendErrorResponse(w, 400, "error decoding Cid: "+err.Error())
api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding Cid: "+err.Error()), nil)
return types.PinSerial{Cid: ""}
}
@ -827,12 +851,12 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
return pin
}
func parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID {
func (api *API) parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID {
vars := mux.Vars(r)
idStr := vars["peer"]
pid, err := peer.IDB58Decode(idStr)
if err != nil {
sendErrorResponse(w, 400, "error decoding Peer ID: "+err.Error())
api.sendResponse(w, http.StatusBadRequest, errors.New("error decoding Peer ID: "+err.Error()), nil)
return ""
}
return pid
@ -855,64 +879,70 @@ func pinInfosToGlobal(pInfos []types.PinInfoSerial) []types.GlobalPinInfoSerial
return gPInfos
}
func sendResponse(w http.ResponseWriter, err error, resp interface{}) {
if checkErr(w, err) {
sendJSONResponse(w, 200, resp)
}
}
// sendResponse wraps all the logic for writing the response to a request:
// * Write configured headers
// * Write application/json content type
// * Write status: determined automatically if given "autoStatus"
// * Write an error if there is one, or write the response otherwise
func (api *API) sendResponse(
w http.ResponseWriter,
status int,
err error,
resp interface{},
) {
// checkErr takes care of returning standard error responses if we
// pass an error to it. It returns true when everything is OK (no error
// was handled), or false otherwise.
func checkErr(w http.ResponseWriter, err error) bool {
api.setHeaders(w)
enc := json.NewEncoder(w)
// Send an error
if err != nil {
sendErrorResponse(w, http.StatusInternalServerError, err.Error())
return false
}
return true
}
if status == autoStatus || status < 400 { // set a default error status
status = http.StatusInternalServerError
}
w.WriteHeader(status)
func sendEmptyResponse(w http.ResponseWriter, err error) {
if checkErr(w, err) {
w.WriteHeader(http.StatusNoContent)
}
}
errorResp := types.Error{
Code: status,
Message: err.Error(),
}
logger.Errorf("sending error response: %d: %s", status, err.Error())
func sendAcceptedResponse(w http.ResponseWriter, err error) {
if checkErr(w, err) {
w.WriteHeader(http.StatusAccepted)
}
}
func sendJSONResponse(w http.ResponseWriter, code int, resp interface{}) {
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(code)
if err := json.NewEncoder(w).Encode(resp); err != nil {
logger.Error(err)
}
}
func sendErrorResponse(w http.ResponseWriter, code int, msg string) {
errorResp := types.Error{
Code: code,
Message: msg,
}
logger.Errorf("sending error response: %d: %s", code, msg)
sendJSONResponse(w, code, errorResp)
}
func sendStreamResponse(w http.ResponseWriter, err error, resp <-chan interface{}) {
if !checkErr(w, err) {
if err := enc.Encode(errorResp); err != nil {
logger.Error(err)
}
return
}
enc := json.NewEncoder(w)
w.Header().Add("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
for v := range resp {
err := enc.Encode(v)
if err != nil {
// Send a body
if resp != nil {
if status == autoStatus {
status = http.StatusOK
}
w.WriteHeader(status)
if err = enc.Encode(resp); err != nil {
logger.Error(err)
}
return
}
// Empty response
if status == autoStatus {
status = http.StatusNoContent
}
w.WriteHeader(status)
}
// this sets all the headers that are common to all responses
// from this API. Called from sendResponse() and /add.
func (api *API) setHeaders(w http.ResponseWriter) {
for header, values := range api.config.Headers {
for _, val := range values {
w.Header().Add(header, val)
}
}
w.Header().Add("Content-Type", "application/json")
}

View File

@ -124,6 +124,17 @@ func processStreamingResp(t *testing.T, httpResp *http.Response, err error, resp
}
}
func checkHeaders(t *testing.T, rest *API, url string, headers http.Header) {
for k, v := range rest.config.Headers {
if strings.Join(v, ",") != strings.Join(headers[k], ",") {
t.Errorf("%s does not show configured headers: %s", url, k)
}
}
if headers.Get("Content-Type") != "application/json" {
t.Errorf("%s is not application/json", url)
}
}
// makes a libp2p host that knows how to talk to the rest API host.
func makeHost(t *testing.T, rest *API) host.Host {
h, err := libp2p.New(context.Background())
@ -185,6 +196,7 @@ func makeGet(t *testing.T, rest *API, url string, resp interface{}) {
c := httpClient(t, h, isHTTPS(url))
httpResp, err := c.Get(url)
processResp(t, httpResp, err, resp)
checkHeaders(t, rest, url, httpResp.Header)
}
func makePost(t *testing.T, rest *API, url string, body []byte, resp interface{}) {
@ -193,6 +205,7 @@ func makePost(t *testing.T, rest *API, url string, body []byte, resp interface{}
c := httpClient(t, h, isHTTPS(url))
httpResp, err := c.Post(url, "application/json", bytes.NewReader(body))
processResp(t, httpResp, err, resp)
checkHeaders(t, rest, url, httpResp.Header)
}
func makeDelete(t *testing.T, rest *API, url string, resp interface{}) {
@ -202,6 +215,7 @@ func makeDelete(t *testing.T, rest *API, url string, resp interface{}) {
req, _ := http.NewRequest("DELETE", url, bytes.NewReader([]byte{}))
httpResp, err := c.Do(req)
processResp(t, httpResp, err, resp)
checkHeaders(t, rest, url, httpResp.Header)
}
func makeStreamingPost(t *testing.T, rest *API, url string, body io.Reader, contentType string, resp interface{}) {
@ -210,6 +224,7 @@ func makeStreamingPost(t *testing.T, rest *API, url string, body io.Reader, cont
c := httpClient(t, h, isHTTPS(url))
httpResp, err := c.Post(url, contentType, body)
processStreamingResp(t, httpResp, err, resp)
checkHeaders(t, rest, url, httpResp.Header)
}
type testF func(t *testing.T, url urlF)
@ -251,6 +266,7 @@ func TestRestAPIIDEndpoint(t *testing.T) {
rest := testAPI(t)
httpsrest := testHTTPSAPI(t)
defer rest.Shutdown()
defer httpsrest.Shutdown()
tf := func(t *testing.T, url urlF) {
id := api.IDSerial{}
@ -361,10 +377,20 @@ func TestAPIAddFileEndpointBadContentType(t *testing.T) {
func TestAPIAddFileEndpointLocal(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown()
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()
tf := func(t *testing.T, url urlF) {
fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1"
localURL := url(rest) + fmtStr1
sth := test.NewShardingTestHelper()
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
resp := api.AddedOutput{}
@ -378,8 +404,18 @@ func TestAPIAddFileEndpointLocal(t *testing.T) {
func TestAPIAddFileEndpointShard(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown()
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()
tf := func(t *testing.T, url urlF) {
sth := test.NewShardingTestHelper()
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
mpContentType := "multipart/form-data; boundary=" + body.Boundary()

View File

@ -9,6 +9,8 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"regexp"
"sort"
@ -827,6 +829,25 @@ func (pins PinSerial) ToPin() Pin {
}
}
// Clone returns a deep copy of the PinSerial.
func (pins PinSerial) Clone() PinSerial {
new := pins // this copies all the simple fields.
// slices are pointers. We need to explicitly copy them.
new.Allocations = make([]string, len(pins.Allocations))
copy(new.Allocations, pins.Allocations)
return new
}
// DecodeCid retrieves just the cid from a PinSerial without
// allocating a Pin.
func (pins PinSerial) DecodeCid() cid.Cid {
c, err := cid.Decode(pins.Cid)
if err != nil {
logger.Debug(pins.Cid, err)
}
return c
}
// NodeWithMeta specifies a block of data and a set of optional metadata fields
// carrying information about the encoded ipld node
type NodeWithMeta struct {
@ -847,10 +868,10 @@ func (n *NodeWithMeta) Size() uint64 {
// the Value, which should be interpreted by the PinAllocator.
type Metric struct {
Name string
Peer peer.ID // filled-in by Cluster.
Peer peer.ID
Value string
Expire int64 // UnixNano
Valid bool // if the metric is not valid it will be discarded
Expire int64
Valid bool
}
// SetTTL sets Metric to expire after the given time.Duration
@ -876,6 +897,51 @@ func (m *Metric) Discard() bool {
return !m.Valid || m.Expired()
}
// helper for JSON marshaling. The Metric type is already
// serializable, but not human-friendly when exposed through the API.
type metricSerial struct {
Name string `json:"name"`
Peer string `json:"peer"`
Value string `json:"value"`
Expire int64 `json:"expire"`
Valid bool `json:"valid"`
}
// MarshalJSON allows a Metric to produce a JSON representation
// of itself.
func (m *Metric) MarshalJSON() ([]byte, error) {
return json.Marshal(&metricSerial{
Name: m.Name,
Peer: peer.IDB58Encode(m.Peer),
Value: m.Value,
Expire: m.Expire,
})
}
// UnmarshalJSON decodes JSON on top of the Metric.
func (m *Metric) UnmarshalJSON(j []byte) error {
if bytes.Equal(j, []byte("null")) {
return nil
}
ms := &metricSerial{}
err := json.Unmarshal(j, ms)
if err != nil {
return err
}
p, err := peer.IDB58Decode(ms.Peer)
if err != nil {
return err
}
m.Name = ms.Name
m.Peer = p
m.Value = ms.Value
m.Expire = ms.Expire
return nil
}
// Alert carries alerting information about a peer. WIP.
type Alert struct {
Peer peer.ID

View File

@ -257,3 +257,45 @@ func TestMetric(t *testing.T) {
t.Error("looks like a bad ttl")
}
}
func BenchmarkPinSerial_ToPin(b *testing.B) {
pin := Pin{
Cid: testCid1,
Type: ClusterDAGType,
Allocations: []peer.ID{testPeerID1},
Reference: testCid2,
MaxDepth: -1,
PinOptions: PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
Name: "A test pin",
},
}
pinS := pin.ToSerial()
b.ResetTimer()
for i := 0; i < b.N; i++ {
pinS.ToPin()
}
}
func BenchmarkPinSerial_DecodeCid(b *testing.B) {
pin := Pin{
Cid: testCid1,
Type: ClusterDAGType,
Allocations: []peer.ID{testPeerID1},
Reference: testCid2,
MaxDepth: -1,
PinOptions: PinOptions{
ReplicationFactorMax: -1,
ReplicationFactorMin: -1,
Name: "A test pin",
},
}
pinS := pin.ToSerial()
b.ResetTimer()
for i := 0; i < b.N; i++ {
pinS.DecodeCid()
}
}

View File

@ -16,8 +16,8 @@ import (
"github.com/ipfs/ipfs-cluster/rpcutil"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
peer "github.com/libp2p/go-libp2p-peer"
@ -56,15 +56,18 @@ type Cluster struct {
allocator PinAllocator
informer Informer
doneCh chan struct{}
readyCh chan struct{}
readyB bool
wg sync.WaitGroup
// peerAdd
paMux sync.Mutex
// shutdown function and related variables
shutdownLock sync.Mutex
shutdownB bool
removed bool
doneCh chan struct{}
readyCh chan struct{}
readyB bool
wg sync.WaitGroup
paMux sync.Mutex
}
// NewCluster builds a new IPFS Cluster peer. It initializes a LibP2P host,
@ -277,7 +280,10 @@ func (c *Cluster) alertsHandler() {
// only the leader handles alerts
leader, err := c.consensus.Leader()
if err == nil && leader == c.id {
logger.Warningf("Peer %s received alert for %s in %s", c.id, alrt.MetricName, alrt.Peer.Pretty())
logger.Warningf(
"Peer %s received alert for %s in %s",
c.id, alrt.MetricName, alrt.Peer,
)
switch alrt.MetricName {
case pingMetricName:
c.repinFromPeer(alrt.Peer)
@ -312,6 +318,8 @@ func (c *Cluster) watchPeers() {
}
if !hasMe {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
logger.Infof("%s: removed from raft. Initiating shutdown", c.id.Pretty())
c.removed = true
go c.Shutdown()
@ -1095,11 +1103,12 @@ func (c *Cluster) Peers() []api.ID {
logger.Error("an empty list of peers will be returned")
return []api.ID{}
}
lenMembers := len(members)
peersSerial := make([]api.IDSerial, len(members), len(members))
peers := make([]api.ID, len(members), len(members))
peersSerial := make([]api.IDSerial, lenMembers, lenMembers)
peers := make([]api.ID, lenMembers, lenMembers)
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, lenMembers)
defer rpcutil.MultiCancel(cancels)
errs := c.rpcClient.MultiCall(
@ -1135,13 +1144,14 @@ func (c *Cluster) globalPinInfoCid(method string, h cid.Cid) (api.GlobalPinInfo,
logger.Error(err)
return api.GlobalPinInfo{}, err
}
lenMembers := len(members)
replies := make([]api.PinInfoSerial, len(members), len(members))
replies := make([]api.PinInfoSerial, lenMembers, lenMembers)
arg := api.Pin{
Cid: h,
}
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, lenMembers)
defer rpcutil.MultiCancel(cancels)
errs := c.rpcClient.MultiCall(
@ -1192,7 +1202,7 @@ func (c *Cluster) globalPinInfoCid(method string, h cid.Cid) (api.GlobalPinInfo,
}
func (c *Cluster) globalPinInfoSlice(method string) ([]api.GlobalPinInfo, error) {
var infos []api.GlobalPinInfo
infos := make([]api.GlobalPinInfo, 0)
fullMap := make(map[string]api.GlobalPinInfo)
members, err := c.consensus.Peers()
@ -1200,10 +1210,11 @@ func (c *Cluster) globalPinInfoSlice(method string) ([]api.GlobalPinInfo, error)
logger.Error(err)
return []api.GlobalPinInfo{}, err
}
lenMembers := len(members)
replies := make([][]api.PinInfoSerial, len(members), len(members))
replies := make([][]api.PinInfoSerial, lenMembers, lenMembers)
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, len(members))
ctxs, cancels := rpcutil.CtxsWithCancel(c.ctx, lenMembers)
defer rpcutil.MultiCancel(cancels)
errs := c.rpcClient.MultiCall(

View File

@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/kelseyhightower/envconfig"
"github.com/ipfs/ipfs-cluster/config"
crypto "github.com/libp2p/go-libp2p-crypto"
@ -240,8 +242,7 @@ func isReplicationFactorValid(rplMin, rplMax int) error {
return errors.New("cluster.replication_factor_max is wrong")
}
if (rplMin == -1 && rplMax != -1) ||
(rplMin != -1 && rplMax == -1) {
if (rplMin == -1 && rplMax != -1) || (rplMin != -1 && rplMax == -1) {
return errors.New("cluster.replication_factor_min and max must be -1 when one of them is")
}
return nil
@ -279,9 +280,7 @@ func (cfg *Config) LoadJSON(raw []byte) error {
return err
}
// Make sure all non-defined keys have good values.
cfg.setDefaults()
config.SetIfNotDefault(jcfg.PeerstoreFile, &cfg.PeerstoreFile)
if jcfg.Peers != nil || jcfg.Bootstrap != nil {
logger.Error(`
@ -303,6 +302,12 @@ for more information.`)
return errors.New("cluster.Peers and cluster.Bootstrap keys have been deprecated")
}
// override json config with env var
err = envconfig.Process(cfg.ConfigKey(), jcfg)
if err != nil {
return err
}
parseDuration := func(txt string) time.Duration {
d, _ := time.ParseDuration(txt)
if txt != "" && d == 0 {
@ -311,6 +316,8 @@ for more information.`)
return d
}
config.SetIfNotDefault(jcfg.PeerstoreFile, &cfg.PeerstoreFile)
id, err := peer.IDB58Decode(jcfg.ID)
if err != nil {
err = fmt.Errorf("error decoding cluster ID: %s", err)

View File

@ -2,6 +2,7 @@ package ipfscluster
import (
"encoding/json"
"os"
"testing"
)
@ -23,118 +24,178 @@ var ccfgTestJSON = []byte(`
`)
func TestLoadJSON(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(ccfgTestJSON)
if err != nil {
t.Fatal(err)
loadJSON := func(t *testing.T) (*Config, error) {
cfg := &Config{}
err := cfg.LoadJSON(ccfgTestJSON)
if err != nil {
return cfg, err
}
return cfg, nil
}
if cfg.Peername != "testpeer" {
t.Error("expected peername 'testpeer'")
t.Run("basic", func(t *testing.T) {
cfg := &Config{}
err := cfg.LoadJSON(ccfgTestJSON)
if err != nil {
t.Fatal(err)
}
})
t.Run("peername", func(t *testing.T) {
cfg, err := loadJSON(t)
if err != nil {
t.Error(err)
}
if cfg.Peername != "testpeer" {
t.Error("expected peername 'testpeer'")
}
})
t.Run("expected replication factor", func(t *testing.T) {
cfg, err := loadJSON(t)
if err != nil {
t.Error(err)
}
if cfg.ReplicationFactorMin != 5 {
t.Error("expected replication factor min == 5")
}
})
t.Run("expected disable_repinning", func(t *testing.T) {
cfg, err := loadJSON(t)
if err != nil {
t.Error(err)
}
if !cfg.DisableRepinning {
t.Error("expected disable_repinning to be true")
}
})
loadJSON2 := func(t *testing.T, f func(j *configJSON)) (*Config, error) {
cfg := &Config{}
j := &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
f(j)
tst, err := json.Marshal(j)
if err != nil {
return cfg, err
}
err = cfg.LoadJSON(tst)
if err != nil {
return cfg, err
}
return cfg, nil
}
if cfg.ReplicationFactorMin != 5 {
t.Error("expected replication factor min == 5")
}
t.Run("bad id", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ID = "abc" })
if err == nil {
t.Error("expected error decoding ID")
}
})
if !cfg.DisableRepinning {
t.Error("expected disable_repinning to be true")
}
t.Run("empty default peername", func(t *testing.T) {
cfg, err := loadJSON2(t, func(j *configJSON) { j.Peername = "" })
if err != nil {
t.Error(err)
}
if cfg.Peername == "" {
t.Error("expected default peername")
}
})
j := &configJSON{}
t.Run("bad private key", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.PrivateKey = "abc" })
if err == nil {
t.Error("expected error parsing private key")
}
})
json.Unmarshal(ccfgTestJSON, j)
j.ID = "abc"
tst, _ := json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error decoding ID")
}
t.Run("bad listen multiaddress", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = "abc" })
if err == nil {
t.Error("expected error parsing listen_multiaddress")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.Peername = ""
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if cfg.Peername == "" {
t.Error("expected default peername")
}
t.Run("bad secret", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.Secret = "abc" })
if err == nil {
t.Error("expected error decoding secret")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.PrivateKey = "abc"
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error parsing private key")
}
t.Run("default replication factors", func(t *testing.T) {
cfg, err := loadJSON2(
t,
func(j *configJSON) {
j.ReplicationFactor = 0
j.ReplicationFactorMin = 0
j.ReplicationFactorMax = 0
},
)
if err != nil {
t.Error(err)
}
if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 {
t.Error("expected default replication factor")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ListenMultiaddress = "abc"
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error parsing listen_multiaddress")
}
t.Run("replication factor min/max override", func(t *testing.T) {
cfg, err := loadJSON2(t, func(j *configJSON) { j.ReplicationFactor = 3 })
if err != nil {
t.Error(err)
}
if cfg.ReplicationFactorMin != 3 || cfg.ReplicationFactorMax != 3 {
t.Error("expected replicationFactor Min/Max override")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.Secret = "abc"
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error decoding secret")
}
t.Run("only replication factor min set to -1", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ReplicationFactorMin = -1 })
if err == nil {
t.Error("expected error when only one replication factor is -1")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactor = 0
j.ReplicationFactorMin = 0
j.ReplicationFactorMax = 0
tst, _ = json.Marshal(j)
cfg.LoadJSON(tst)
if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 {
t.Error("expected default replication factor")
}
t.Run("replication factor min > max", func(t *testing.T) {
_, err := loadJSON2(
t,
func(j *configJSON) {
j.ReplicationFactorMin = 5
j.ReplicationFactorMax = 4
},
)
if err == nil {
t.Error("expected error when only rplMin > rplMax")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactor = 3
tst, _ = json.Marshal(j)
cfg.LoadJSON(tst)
if cfg.ReplicationFactorMin != 3 || cfg.ReplicationFactorMax != 3 {
t.Error("expected replicationFactor Min/Max override")
}
t.Run("default replication factor", func(t *testing.T) {
cfg, err := loadJSON2(
t,
func(j *configJSON) {
j.ReplicationFactorMin = 0
j.ReplicationFactorMax = 0
},
)
if err != nil {
t.Error(err)
}
if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 {
t.Error("expected default replication factors")
}
})
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactorMin = -1
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error when only one replication factor is -1")
}
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactorMin = 5
j.ReplicationFactorMax = 4
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if err == nil {
t.Error("expected error when only rplMin > rplMax")
}
j = &configJSON{}
json.Unmarshal(ccfgTestJSON, j)
j.ReplicationFactorMin = 0
j.ReplicationFactorMax = 0
tst, _ = json.Marshal(j)
err = cfg.LoadJSON(tst)
if cfg.ReplicationFactorMin != -1 || cfg.ReplicationFactorMax != -1 {
t.Error("expected default replication factors")
}
t.Run("env var override", func(t *testing.T) {
os.Setenv("CLUSTER_PEERNAME", "envsetpeername")
cfg := &Config{}
cfg.LoadJSON(ccfgTestJSON)
if cfg.Peername != "envsetpeername" {
t.Fatal("failed to override peername with env var")
}
})
}
func TestToJSON(t *testing.T) {

View File

@ -19,8 +19,8 @@ import (
"github.com/ipfs/ipfs-cluster/state/mapstate"
"github.com/ipfs/ipfs-cluster/test"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -6,8 +6,10 @@ import (
"fmt"
"sort"
"strings"
"time"
"github.com/ipfs/ipfs-cluster/api"
peer "github.com/libp2p/go-libp2p-peer"
)
func jsonFormatObject(resp interface{}) {
@ -24,6 +26,9 @@ func jsonFormatObject(resp interface{}) {
jsonFormatPrint(resp.(api.AddedOutput))
case api.Version:
jsonFormatPrint(resp.(api.Version))
case api.Metric:
serial := resp.(api.Metric)
textFormatPrintMetric(&serial)
case api.Error:
jsonFormatPrint(resp.(api.Error))
case []api.ID:
@ -51,6 +56,9 @@ func jsonFormatObject(resp interface{}) {
case []api.AddedOutput:
serials := resp.([]api.AddedOutput)
jsonFormatPrint(serials)
case []api.Metric:
serials := resp.([]api.Metric)
jsonFormatPrint(serials)
default:
checkErr("", errors.New("unsupported type returned"))
}
@ -84,6 +92,9 @@ func textFormatObject(resp interface{}) {
case api.Error:
serial := resp.(api.Error)
textFormatPrintError(&serial)
case api.Metric:
serial := resp.(api.Metric)
textFormatPrintMetric(&serial)
case []api.ID:
for _, item := range resp.([]api.ID) {
textFormatObject(item)
@ -100,6 +111,10 @@ func textFormatObject(resp interface{}) {
for _, item := range resp.([]api.AddedOutput) {
textFormatObject(item)
}
case []api.Metric:
for _, item := range resp.([]api.Metric) {
textFormatObject(item)
}
default:
checkErr("", errors.New("unsupported type returned"))
}
@ -202,6 +217,11 @@ func textFormatPrintAddedOutput(obj *api.AddedOutput) {
fmt.Printf("added %s %s\n", obj.Cid, obj.Name)
}
func textFormatPrintMetric(obj *api.Metric) {
date := time.Unix(0, obj.Expire).UTC().Format(time.RFC3339)
fmt.Printf("%s: %s | Expire: %s\n", peer.IDB58Encode(obj.Peer), obj.Value, date)
}
func textFormatPrintError(obj *api.Error) {
fmt.Printf("An error occurred:\n")
fmt.Printf(" Code: %d\n", obj.Code)

View File

@ -33,14 +33,14 @@ var simpleIpfs = `digraph cluster {
/* The nodes of the connectivity graph */
/* The cluster-service peers */
C0 [label="<peer.ID UBuxVH>" color="blue2"]
C1 [label="<peer.ID V35Ljb>" color="blue2"]
C2 [label="<peer.ID Z2ckU7>" color="blue2"]
C0 [label="<peer.ID Qm*eqhEhD>" color="blue2"]
C1 [label="<peer.ID Qm*cgHDQJ>" color="blue2"]
C2 [label="<peer.ID Qm*6MQmJu>" color="blue2"]
/* The ipfs peers */
I0 [label="<peer.ID PFKAGZ>" color="goldenrod"]
I1 [label="<peer.ID XbiVZd>" color="goldenrod"]
I2 [label="<peer.ID bU7273>" color="goldenrod"]
I0 [label="<peer.ID Qm*N5LSsq>" color="goldenrod"]
I1 [label="<peer.ID Qm*R3DZDV>" color="goldenrod"]
I2 [label="<peer.ID Qm*wbBsuL>" color="goldenrod"]
/* Edges representing active connections in the cluster */
/* The connections among cluster-service peers */
@ -115,17 +115,17 @@ var allIpfs = `digraph cluster {
/* The nodes of the connectivity graph */
/* The cluster-service peers */
C0 [label="<peer.ID UBuxVH>" color="blue2"]
C1 [label="<peer.ID V35Ljb>" color="blue2"]
C2 [label="<peer.ID Z2ckU7>" color="blue2"]
C0 [label="<peer.ID Qm*eqhEhD>" color="blue2"]
C1 [label="<peer.ID Qm*cgHDQJ>" color="blue2"]
C2 [label="<peer.ID Qm*6MQmJu>" color="blue2"]
/* The ipfs peers */
I0 [label="<peer.ID PFKAGZ>" color="goldenrod"]
I1 [label="<peer.ID QsdAdC>" color="goldenrod"]
I2 [label="<peer.ID VV2enw>" color="goldenrod"]
I3 [label="<peer.ID XbiVZd>" color="goldenrod"]
I4 [label="<peer.ID bU7273>" color="goldenrod"]
I5 [label="<peer.ID fCHNQ2>" color="goldenrod"]
I0 [label="<peer.ID Qm*N5LSsq>" color="goldenrod"]
I1 [label="<peer.ID Qm*S8xccb>" color="goldenrod"]
I2 [label="<peer.ID Qm*aaanM8>" color="goldenrod"]
I3 [label="<peer.ID Qm*R3DZDV>" color="goldenrod"]
I4 [label="<peer.ID Qm*wbBsuL>" color="goldenrod"]
I5 [label="<peer.ID Qm*tWZdeD>" color="goldenrod"]
/* Edges representing active connections in the cluster */
/* The connections among cluster-service peers */

View File

@ -27,7 +27,7 @@ const programName = `ipfs-cluster-ctl`
// Version is the cluster-ctl tool version. It should match
// the IPFS cluster's version.
const Version = "0.6.0"
const Version = "0.7.0"
var (
defaultHost = "/ip4/127.0.0.1/tcp/9094"
@ -762,6 +762,31 @@ graph of the connections. Output is a dot file encoding the cluster's connectio
return nil
},
},
{
Name: "metrics",
Usage: "List latest metrics logged by this peer",
Description: `
This command displays the latest valid metrics of the given type logged
by this peer for all current cluster peers.
Currently supported metrics depend on the informer component used,
but usually include:
- freespace
- ping
`,
ArgsUsage: "<metric name>",
Action: func(c *cli.Context) error {
metric := c.Args().First()
if metric == "" {
checkErr("", errors.New("provide a metric name"))
}
resp, cerr := globalClient.Metrics(metric)
formatResponse(c, resp, cerr)
return nil
},
},
},
},
{

View File
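
The new metrics command also has a programmatic counterpart in the REST API client (globalClient.Metrics above). A hedged sketch of using it, assuming the rest/client constructor of this release and the Metrics signature implied by the ctl code (an empty Config is assumed to fall back to the default /ip4/127.0.0.1/tcp/9094 endpoint):

package main

import (
	"fmt"

	client "github.com/ipfs/ipfs-cluster/api/rest/client"
)

func main() {
	c, err := client.NewClient(&client.Config{})
	if err != nil {
		panic(err)
	}
	// Mirrors `ipfs-cluster-ctl metrics ping`.
	metrics, err := c.Metrics("ping")
	if err != nil {
		panic(err)
	}
	for _, m := range metrics {
		fmt.Printf("%s: %s\n", m.Peer.Pretty(), m.Value)
	}
}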

@ -1,7 +1,6 @@
package main
import (
"fmt"
"os"
"path/filepath"
@ -72,12 +71,7 @@ func makeConfigs() (*config.Manager, *cfgs) {
}
}
func saveConfig(cfg *config.Manager, force bool) {
if _, err := os.Stat(configPath); err == nil && !force {
err := fmt.Errorf("%s exists. Try running: %s -f init", configPath, programName)
checkErr("", err)
}
func saveConfig(cfg *config.Manager) {
err := os.MkdirAll(filepath.Dir(configPath), 0700)
err = cfg.SaveJSON(configPath)
checkErr("saving new configuration", err)

View File

@ -30,6 +30,11 @@ const (
defaultLogLevel = "info"
)
const (
stateCleanupPrompt = "The peer's state will be removed from the load path. Existing pins may be lost."
configurationOverwritePrompt = "Configuration(service.json) will be overwritten."
)
// We store a commit id here
var commit string
@ -213,6 +218,28 @@ configuration.
cfgMgr, cfgs := makeConfigs()
defer cfgMgr.Shutdown() // wait for saves
var alreadyInitialized bool
if _, err := os.Stat(configPath); !os.IsNotExist(err) {
alreadyInitialized = true
}
if alreadyInitialized {
// acquire lock for config folder
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
if !c.Bool("force") && !yesNoPrompt(fmt.Sprintf("%s\n%s Continue? [y/n]:", stateCleanupPrompt, configurationOverwritePrompt)) {
return nil
}
err = cfgMgr.LoadJSONFromFile(configPath)
checkErr("reading configuration", err)
err = cleanupState(cfgs.consensusCfg)
checkErr("Cleaning up consensus data", err)
}
// Generate defaults for all registered components
err := cfgMgr.Default()
checkErr("generating default configuration", err)
@ -223,7 +250,7 @@ configuration.
}
// Save
saveConfig(cfgMgr, c.GlobalBool("force"))
saveConfig(cfgMgr)
return nil
},
},
@ -343,12 +370,18 @@ snapshot to be loaded as the cluster state when the cluster peer is restarted.
If an argument is provided, cluster will treat it as the path of the file to
import. If no argument is provided, cluster will read JSON from stdin.
`,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "force, f",
Usage: "forcefully proceed with replacing the current state with the given one, without prompting",
},
},
Action: func(c *cli.Context) error {
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
if !c.GlobalBool("force") {
if !c.Bool("force") {
if !yesNoPrompt("The peer's state will be replaced. Run with -h for details. Continue? [y/n]:") {
return nil
}
@ -381,13 +414,19 @@ this state from disk. This command renames cluster's data folder to <data-folde
deprecated data folders to <data-folder-name>.old.<n+1>, etc., for some rotation factor before permanently deleting
the mth data folder (m currently defaults to 5).
`,
Flags: []cli.Flag{
cli.BoolFlag{
Name: "force, f",
Usage: "forcefully proceed with rotating peer state without prompting",
},
},
Action: func(c *cli.Context) error {
err := locker.lock()
checkErr("acquiring execution lock", err)
defer locker.tryUnlock()
if !c.GlobalBool("force") {
if !yesNoPrompt("The peer's state will be removed from the load path. Existing pins may be lost. Continue? [y/n]:") {
if !c.Bool("force") {
if !yesNoPrompt(fmt.Sprintf("%s Continue? [y/n]:", stateCleanupPrompt)) {
return nil
}
}
@ -398,7 +437,6 @@ the mth data folder (m currently defaults to 5)
err = cleanupState(cfgs.consensusCfg)
checkErr("Cleaning up consensus data", err)
logger.Warningf("the %s folder has been rotated. Next start will use an empty state", cfgs.consensusCfg.GetDataFolder())
return nil
},
},

View File

@ -166,5 +166,10 @@ func exportState(state *mapstate.MapState, w io.Writer) error {
// CleanupState cleans the state
func cleanupState(cCfg *raft.Config) error {
return raft.CleanupRaft(cCfg.GetDataFolder(), cCfg.BackupsRotate)
err := raft.CleanupRaft(cCfg.GetDataFolder(), cCfg.BackupsRotate)
if err == nil {
logger.Warningf("the %s folder has been rotated. Next start will use an empty state", cCfg.GetDataFolder())
}
return err
}

View File

@ -50,7 +50,19 @@ var testingAPICfg = []byte(`{
"read_timeout": "0",
"read_header_timeout": "5s",
"write_timeout": "0",
"idle_timeout": "2m0s"
"idle_timeout": "2m0s",
"headers": {
"Access-Control-Allow-Headers": [
"X-Requested-With",
"Range"
],
"Access-Control-Allow-Methods": [
"GET"
],
"Access-Control-Allow-Origin": [
"*"
]
}
}`)
var testingIpfsCfg = []byte(`{

View File

@ -13,9 +13,9 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
consensus "github.com/libp2p/go-libp2p-consensus"
rpc "github.com/libp2p/go-libp2p-gorpc"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
libp2praft "github.com/libp2p/go-libp2p-raft"

View File

@ -15,22 +15,12 @@ import (
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
func cleanRaft(idn int) {
os.RemoveAll(fmt.Sprintf("raftFolderFromTests-%d", idn))
}
func consensusListenAddr(c *Consensus) ma.Multiaddr {
return c.host.Addrs()[0]
}
func consensusAddr(c *Consensus) ma.Multiaddr {
cAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", consensusListenAddr(c), c.host.ID().Pretty()))
return cAddr
}
func testPin(c cid.Cid) api.Pin {
p := api.PinCid(c)
p.ReplicationFactorMin = -1
@ -174,7 +164,7 @@ func TestConsensusAddPeer(t *testing.T) {
defer cc.Shutdown()
defer cc2.Shutdown()
cc.host.Peerstore().AddAddr(cc2.host.ID(), consensusListenAddr(cc2), peerstore.PermanentAddrTTL)
cc.host.Peerstore().AddAddrs(cc2.host.ID(), cc2.host.Addrs(), peerstore.PermanentAddrTTL)
err := cc.AddPeer(cc2.host.ID())
if err != nil {
t.Error("the operation did not make it to the log:", err)
@ -205,7 +195,7 @@ func TestConsensusRmPeer(t *testing.T) {
defer cc.Shutdown()
defer cc2.Shutdown()
cc.host.Peerstore().AddAddr(cc2.host.ID(), consensusListenAddr(cc2), peerstore.PermanentAddrTTL)
cc.host.Peerstore().AddAddrs(cc2.host.ID(), cc2.host.Addrs(), peerstore.PermanentAddrTTL)
err := cc.AddPeer(cc2.host.ID())
if err != nil {

View File

@ -36,31 +36,42 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
panic("received unexpected state type")
}
// Copy the Cid. We are about to pass it to goroutines
// that will work with it (read its fields). However,
// as soon as ApplyTo is done, the next operation will be deserialized
// on top of "op". This can cause data races with the slices in
// api.PinSerial, which don't get copied when passed.
pinS := op.Cid.Clone()
switch op.Type {
case LogOpPin:
err = state.Add(op.Cid.ToPin())
err = state.Add(pinS.ToPin())
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
op.consensus.rpcClient.Go(
"",
"Cluster",
"Track",
op.Cid,
pinS,
&struct{}{},
nil)
nil,
)
case LogOpUnpin:
err = state.Rm(op.Cid.ToPin().Cid)
err = state.Rm(pinS.DecodeCid())
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
op.consensus.rpcClient.Go(
"",
"Cluster",
"Untrack",
op.Cid,
pinS,
&struct{}{},
nil)
nil,
)
default:
logger.Error("unknown LogOp type. Ignoring")
}
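The cloning comment above guards against a subtle lifetime issue; here is a standalone sketch of the same pattern (not cluster code, using an invented slice-bearing serial type):

// The decoder reuses the same op value for every log entry, so a
// goroutine still holding op's slices can observe them being
// overwritten by the next deserialization. Cloning the slice-bearing
// fields before the handoff gives the goroutine its own copy.
type pinSerial struct {
	Allocations []string
}

func (p pinSerial) Clone() pinSerial {
	out := p
	out.Allocations = append([]string(nil), p.Allocations...)
	return out
}

func apply(op *pinSerial, track func(pinSerial)) {
	track(op.Clone()) // safe even if *op is decoded over afterwards
}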

45
deptools/Makefile Normal file
View File

@ -0,0 +1,45 @@
gx_version=v0.14.1
gx-go_version=v1.9.0
gateway=https://ipfs.io
local_gateway=http://127.0.0.1:8080
dist=dist.ipfs.io
dl_cmd=wget -nc
gx=gx_$(gx_version)
gx-go=gx-go_$(gx-go_version)
bin_env=$(shell go env GOHOSTOS)-$(shell go env GOHOSTARCH)
gx_tar=$(gx)_$(bin_env).tar.gz
gx-go_tar=$(gx-go)_$(bin_env).tar.gz
gx_dist_path=/ipns/$(dist)/gx/$(gx_version)/$(gx_tar)
gx-go_dist_path=/ipns/$(dist)/gx-go/$(gx-go_version)/$(gx-go_tar)
gx_download_local=$(dl_cmd) $(local_gateway)$(gx_dist_path)
gx_download=$(dl_cmd) $(gateway)$(gx_dist_path)
gx-go_download_local=$(dl_cmd) $(local_gateway)$(gx-go_dist_path)
gx-go_download=$(dl_cmd) $(gateway)$(gx-go_dist_path)
$(gx):
@echo "Downloading gx"
rm -f gx
$(gx_download_local) || $(gx_download)
tar -zxf $(gx_tar) --strip-components=1 gx/gx
mv gx $(gx)
ln -s $(gx) gx
rm $(gx_tar)
$(gx-go):
@echo "Downloading gx-go"
rm -f gx-go
$(gx-go_download_local) || $(gx-go_download)
tar -zxf $(gx-go_tar) --strip-components=1 gx-go/gx-go
mv gx-go $(gx-go)
ln -s $(gx-go) gx-go
rm $(gx-go_tar)
gx: $(gx) $(gx-go)
gx-clean:
@rm -f gx*

98
docker-compose.yml Normal file
View File

@ -0,0 +1,98 @@
version: '3.4'
# This is an example docker-compose file for IPFS Cluster
# It runs two Cluster peers (cluster0, cluster1) attached to two
# IPFS daemons (ipfs0, ipfs1).
#
# It expects a "compose" subfolder as follows where it will store configurations
# and states permanently:
#
# compose/
# |-- cluster0
# |-- cluster1
# |-- ipfs0
# |-- ipfs1
#
#
# During the first start, default configurations are created for all peers.
services:
##################################################################################
## Cluster PEER 0 ################################################################
##################################################################################
ipfs0:
container_name: ipfs0
image: ipfs/go-ipfs:release
ports:
- "4001:4001" # ipfs swarm
# - "5001:5001" # expose if needed/wanted
# - "8080:8080" # exposes if needed/wanted
volumes:
- ./compose/ipfs0:/data/ipfs
cluster0:
container_name: cluster0
image: ipfs/ipfs-cluster:latest
depends_on:
- ipfs0
environment:
CLUSTER_SECRET: ${CLUSTER_SECRET} # From shell variable
IPFS_API: /dns4/ipfs0/tcp/5001
ports:
- "127.0.0.1:9094:9094" # API
# - "9096:9096" # Cluster IPFS Proxy endpoint
volumes:
- ./compose/cluster0:/data/ipfs-cluster
##################################################################################
## Cluster PEER 1 ################################################################
##################################################################################
ipfs1:
container_name: ipfs1
image: ipfs/go-ipfs:release
ports:
- "4101:4001" # ipfs swarm
# - "5101:5001" # expose if needed/wanted
# - "8180:8080" # exposes if needed/wanted
volumes:
- ./compose/ipfs1:/data/ipfs
# cluster1 bootstraps to cluster0 if not bootstrapped before
cluster1:
container_name: cluster1
image: ipfs/ipfs-cluster:latest
depends_on:
- cluster0
- ipfs1
environment:
CLUSTER_SECRET: ${CLUSTER_SECRET} # From shell variable
IPFS_API: /dns4/ipfs1/tcp/5001
ports:
- "127.0.0.1:9194:9094" # API
# - "9196:9096" # Cluster IPFS Proxy endpoint
volumes:
- ./compose/cluster1:/data/ipfs-cluster
entrypoint:
- "/sbin/tini"
- "--"
# Translation: if the state folder does not exist, find cluster0's id and
# bootstrap to it.
command: >-
sh -c '
cmd="daemon --upgrade"
if [ ! -d /data/ipfs-cluster/raft ]; then
while ! ipfs-cluster-ctl --host /dns4/cluster0/tcp/9094 id; do
sleep 1
done
pid=`ipfs-cluster-ctl --host /dns4/cluster0/tcp/9094 id | grep -o -E "^(\w+)"`
sleep 10
cmd="daemon --bootstrap /dns4/cluster0/tcp/9096/ipfs/$$pid"
fi
exec /usr/local/bin/entrypoint.sh $$cmd
'
# For adding more peers, copy PEER 1 and rename things to ipfs2, cluster2.
# Keep bootstrapping to cluster0.

View File

@ -5,8 +5,8 @@ package disk
import (
"fmt"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/api"
)

View File

@ -5,7 +5,7 @@ import (
"errors"
"testing"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
rpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"

View File

@ -5,7 +5,7 @@ package numpin
import (
"fmt"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
rpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/api"
)

View File

@ -6,7 +6,7 @@ import (
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
type mockService struct{}

View File

@ -14,8 +14,8 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -269,7 +269,7 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
// Start first node
clusters[0] = createCluster(t, hosts[0], cfgs[0], raftCons[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0])
<-clusters[0].Ready()
bootstrapAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", clusters[0].host.Addrs()[0], clusters[0].id.Pretty()))
bootstrapAddr := clusterAddr(clusters[0])
// Start the rest and join
for i := 1; i < nClusters; i++ {

View File

@ -17,10 +17,10 @@ import (
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmdkit/files"
files "github.com/ipfs/go-ipfs-files"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
@ -454,7 +454,7 @@ func (ipfs *Connector) apiURL() string {
func (ipfs *Connector) ConnectSwarms() error {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
var idsSerial []api.IDSerial
idsSerial := make([]api.IDSerial, 0)
err := ipfs.rpcClient.Call(
"",
"Cluster",

View File

@ -14,8 +14,8 @@ import (
"github.com/ipfs/ipfs-cluster/monitor/metrics"
"github.com/ipfs/ipfs-cluster/rpcutil"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -71,14 +71,14 @@ func (mtrs *Store) Latest(name string) []api.Metric {
// PeerMetrics returns the latest metrics for a given peer ID for
// all known metric types. It may return expired metrics.
func (mtrs *Store) PeerMetrics(peer peer.ID) []api.Metric {
func (mtrs *Store) PeerMetrics(pid peer.ID) []api.Metric {
mtrs.mux.RLock()
defer mtrs.mux.RUnlock()
result := make([]api.Metric, 0)
for _, byPeer := range mtrs.byName {
window, ok := byPeer[peer]
window, ok := byPeer[pid]
if !ok {
continue
}

View File

@ -10,8 +10,8 @@ import (
// peerset
func PeersetFilter(metrics []api.Metric, peerset []peer.ID) []api.Metric {
peerMap := make(map[peer.ID]struct{})
for _, peer := range peerset {
peerMap[peer] = struct{}{}
for _, pid := range peerset {
peerMap[pid] = struct{}{}
}
filtered := make([]api.Metric, 0, len(metrics))

View File

@ -5,16 +5,17 @@ package pubsubmon
import (
"bytes"
"context"
"sync"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/monitor/metrics"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
logging "github.com/ipfs/go-log"
floodsub "github.com/libp2p/go-floodsub"
rpc "github.com/libp2p/go-libp2p-gorpc"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
msgpack "github.com/multiformats/go-multicodec/msgpack"
)
@ -34,8 +35,8 @@ type Monitor struct {
rpcReady chan struct{}
host host.Host
pubsub *floodsub.PubSub
subscription *floodsub.Subscription
pubsub *pubsub.PubSub
subscription *pubsub.Subscription
metrics *metrics.Store
checker *metrics.Checker
@ -59,7 +60,7 @@ func New(h host.Host, cfg *Config) (*Monitor, error) {
mtrs := metrics.NewStore()
checker := metrics.NewChecker(mtrs)
pubsub, err := floodsub.NewFloodSub(ctx, h)
pubsub, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
cancel()
return nil, err

View File

@ -3,6 +3,7 @@ package pubsubmon
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
@ -10,13 +11,19 @@ import (
libp2p "github.com/libp2p/go-libp2p"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
func init() {
// GossipSub needs to heartbeat to discover newly connected hosts.
// This speeds things up a little.
pubsub.GossipSubHeartbeatInterval = 50 * time.Millisecond
}
type metricFactory struct {
l sync.Mutex
counter int

View File

@ -15,15 +15,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmUEqyXr97aUbNmQADHYNknjwjjdVpJXEt1UZXmSG81EV4",
"hash": "QmUDTcnDp2WssbmiDLC6aYurUeyt7QeRakHUQMxA2mZ5iB",
"name": "go-libp2p",
"version": "6.0.12"
"version": "6.0.23"
},
{
"author": "hsanjuan",
"hash": "QmaR1KVXHKDyysrMUCT1WKbu9fkFs12U7nLDPhfkswonzj",
"hash": "Qmc2WbBR7qA8tYfXwEQKgiHWNn3tZS3e2BhaqqsPoghzYP",
"name": "go-libp2p-raft",
"version": "1.2.13"
"version": "1.2.14"
},
{
"author": "urfave",
@ -45,15 +45,15 @@
},
{
"author": "hsanjuan",
"hash": "QmXyteEWrYHVJFEA8oX9cSfRp6PJ2kiVsmsFqPMi9ue1Ek",
"hash": "QmPYiV9nwnXPxdn9zDgY4d9yaHwTS414sUb1K6nvQVHqqo",
"name": "go-libp2p-gorpc",
"version": "1.0.18"
"version": "1.0.24"
},
{
"author": "libp2p",
"hash": "QmZaQ3K9PRd5sYYoG1xbTGPtd3N7TYiKBRmcBUTsx8HVET",
"hash": "QmY4Q5JC4vxLEi8EpVxJM4rcRryEVtH1zRKVTAm6BKV1pg",
"name": "go-libp2p-pnet",
"version": "3.0.2"
"version": "3.0.4"
},
{
"author": "ZenGround0",
@ -63,27 +63,21 @@
},
{
"author": "dignifiedquire",
"hash": "QmZzgxSj8QpR58KmdeNj97eD66X6xeDAFNjpP2xTY9oKeQ",
"hash": "Qmc4w3gm2TqoEbTYjpPs5FXP8DEB6cuvZWPy6bUTKiht7a",
"name": "go-fs-lock",
"version": "0.1.7"
"version": "0.1.8"
},
{
"author": "hsanjuan",
"hash": "Qmc6tqtdKn1fVCGmU2StfULdXb8xPxmGh19NsYsgVkqjbw",
"hash": "QmbuqT17YGSLRGiDRSJqVoXzTXrj4R5y6u4px2q42iyf23",
"name": "go-libp2p-http",
"version": "1.1.1"
"version": "1.1.6"
},
{
"author": "ipfs",
"hash": "QmR8y7XSkmWSpae9vm7YRES6Bz93pTXX1abeSVKDuNEFeq",
"hash": "QmQcy2Frmpvm5D5oiWoKXfs71eh3MXTv7LN12J39hJioBi",
"name": "go-ipfs-api",
"version": "1.3.6"
},
{
"author": "whyrusleeping",
"hash": "QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb",
"name": "go-libp2p-floodsub",
"version": "0.9.28"
"version": "1.4.2"
},
{
"author": "whyrusleeping",
@ -99,15 +93,15 @@
},
{
"author": "hsanjuan",
"hash": "QmdSeG9s4EQ9TGruJJS9Us38TQDZtMmFGwzTYUDVqNTURm",
"hash": "QmTUTG9Jg9ZRA1EzTPGTDvnwfcfKhDMnqANnP9fe4rSjMR",
"name": "go-ipfs-chunker",
"version": "0.1.0"
"version": "0.1.3"
},
{
"author": "hector",
"hash": "QmPG32VXR5jmpo9q8R9FNdR4Ae97Ky9CiZE6SctJLUB79H",
"hash": "QmQyUyYcpKG1u53V7N25qRTGw5XwaAxTMKXbduqHotQztg",
"name": "go-ipfs-posinfo",
"version": "0.1.0"
"version": "0.1.2"
},
{
"author": "dustin",
@ -117,42 +111,50 @@
},
{
"author": "why",
"hash": "QmPL8bYtbACcSFFiSr4s2du7Na382NxRADR8hC7D9FkEA2",
"hash": "QmfB3oNXGGq9S4B2a9YeCajoATms3Zw2VvDm8fK7VeLSV8",
"name": "go-unixfs",
"version": "1.1.1"
"version": "1.1.16"
},
{
"author": "why",
"hash": "QmXv5mwmQ74r4aiHcNeQ4GAmfB3aWJuqaE4WyDfDfvkgLM",
"hash": "QmSei8kFMfqdJq7Q68d2LMnHbTWKKg2daA29ezUYFAUNgc",
"name": "go-merkledag",
"version": "1.1.1"
"version": "1.1.15"
},
{
"hash": "QmaXYSwxqJsX3EoGb1ZV2toZ9fXc8hWJPaBW1XAp1h2Tsp",
"hash": "QmQHnqaNULV8WeUGgh97o9K3KAW6kWQmDyNf9UuikgnPTe",
"name": "go-libp2p-kad-dht",
"version": "4.4.0"
},
{
"hash": "Qmbq7kGxgcpALGLPaWDyTa6KUq5kBUKdEvkvPZcBkJoLex",
"name": "go-log",
"version": "1.5.6"
},
{
"hash": "QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky",
"name": "go-ipfs-cmdkit",
"version": "1.1.3"
"version": "4.4.12"
},
{
"author": "hsanjuan",
"hash": "QmRkrpnhZqDxTxwGCsDbuZMr7uCFZHH6SGfrcjgEQwxF3t",
"hash": "QmTFCS9QwPQEihtjqi6zTX5n21J5y1yQwc3hnk7t3hZDLN",
"name": "go-mfs",
"version": "0.1.1"
"version": "0.1.17"
},
{
"author": "blang",
"hash": "QmYRGECuvQnRX73fcvPnGbYijBcGN2HbKZQ7jh26qmLiHG",
"name": "semver",
"version": "3.5.1"
},
{
"author": "magik6k",
"hash": "QmZMWMvWMVKCbHetJ4RgndbuEF1io2UpUxwQwtNjtYPzSC",
"name": "go-ipfs-files",
"version": "1.0.1"
},
{
"author": "lanzafame",
"hash": "QmYgGtLm9WJRgh6iuaZap8qVC1gqixFbZCNfhjLNBhWMCm",
"name": "envconfig",
"version": "1.3.1"
},
{
"author": "whyrusleeping",
"hash": "QmYmrxfax5xGfLF6SL2Bq7SDEzFZFyNcLvGi8ExdC5iiko",
"name": "go-libp2p-pubsub",
"version": "100.11.3"
}
],
"gxVersion": "0.11.0",
@ -160,6 +162,6 @@
"license": "MIT",
"name": "ipfs-cluster",
"releaseCmd": "git commit -S -a -m \"gx publish $VERSION\"",
"version": "0.6.0"
"version": "0.7.0"
}

View File

@ -50,8 +50,14 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
}
func clusterAddr(c *Cluster) ma.Multiaddr {
cAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", c.host.Addrs()[0], c.id.Pretty()))
return cAddr
for _, a := range c.host.Addrs() {
if _, err := a.ValueForProtocol(ma.P_IP4); err == nil {
p := peer.IDB58Encode(c.id)
cAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", a, p))
return cAddr
}
}
return nil
}
func TestClustersPeerAdd(t *testing.T) {

View File

@ -11,9 +11,9 @@ import (
"github.com/ipfs/ipfs-cluster/pintracker/optracker"
"github.com/ipfs/ipfs-cluster/pintracker/util"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -8,8 +8,8 @@ import (
"testing"
"time"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/ipfs/ipfs-cluster/api"

View File

@ -94,6 +94,7 @@ func (opt *OperationTracker) Status(c cid.Cid) (api.TrackerStatus, bool) {
// SetError transitions an operation for a Cid into PhaseError if its Status
// is PhaseDone. Any other phases are considered in-flight and not touched.
// For things already in error, the error message is updated.
// Remote pins are ignored too.
func (opt *OperationTracker) SetError(c cid.Cid, err error) {
opt.mu.Lock()
defer opt.mu.Unlock()
@ -102,6 +103,10 @@ func (opt *OperationTracker) SetError(c cid.Cid, err error) {
return
}
if ty := op.Type(); ty == OperationRemote {
return
}
if ph := op.Phase(); ph == PhaseDone || ph == PhaseError {
op.SetPhase(PhaseError)
op.SetError(err)

View File

@ -9,15 +9,16 @@ import (
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pintracker/stateless"
"github.com/ipfs/ipfs-cluster/test"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
)
var (
@ -60,6 +61,9 @@ func (mock *mockService) IPFSPinLsCid(ctx context.Context, in api.PinSerial, out
switch in.Cid {
case test.TestCid1, test.TestCid2:
*out = api.IPFSPinStatusRecursive
case test.TestCid4:
*out = api.IPFSPinStatusError
return errors.New("an ipfs error")
default:
*out = api.IPFSPinStatusUnpinned
}
@ -85,94 +89,6 @@ func (mock *mockService) IPFSPinLs(ctx context.Context, in string, out *map[stri
return nil
}
func (mock *mockService) Status(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
switch in.Cid {
case test.ErrorCid:
return test.ErrBadCid
case test.TestCid1:
c1, _ := cid.Decode(test.TestCid1)
*out = api.GlobalPinInfo{
Cid: c1,
PeerMap: map[peer.ID]api.PinInfo{
test.TestPeerID1: {
Cid: c1,
Peer: test.TestPeerID1,
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
},
}.ToSerial()
// case test.TestSlowCid1:
// sc1 := test.MustDecodeCid(test.TestSlowCid1)
// *out = api.GlobalPinInfo{
// Cid: sc1,
// PeerMap: map[peer.ID]api.PinInfo{
// test.TestPeerID1: {
// Cid: sc1,
// Peer: test.TestPeerID1,
// Status: api.TrackerStatusPinned,
// TS: time.Now(),
// },
// },
// }.ToSerial()
}
return nil
}
func (mock *mockService) StatusAll(ctx context.Context, in struct{}, out *[]api.GlobalPinInfoSerial) error {
c1, _ := cid.Decode(test.TestCid1)
c2, _ := cid.Decode(test.TestCid2)
c3, _ := cid.Decode(test.TestCid3)
slowC1 := test.MustDecodeCid(test.TestSlowCid1)
*out = ipfscluster.GlobalPinInfoSliceToSerial([]api.GlobalPinInfo{
{
Cid: c1,
PeerMap: map[peer.ID]api.PinInfo{
test.TestPeerID1: {
Cid: c1,
Peer: test.TestPeerID1,
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
},
},
{
Cid: c2,
PeerMap: map[peer.ID]api.PinInfo{
test.TestPeerID1: {
Cid: c2,
Peer: test.TestPeerID1,
Status: api.TrackerStatusPinning,
TS: time.Now(),
},
},
},
{
Cid: c3,
PeerMap: map[peer.ID]api.PinInfo{
test.TestPeerID1: {
Cid: c3,
Peer: test.TestPeerID1,
Status: api.TrackerStatusPinError,
TS: time.Now(),
},
},
},
{
Cid: slowC1,
PeerMap: map[peer.ID]api.PinInfo{
test.TestPeerID1: {
Cid: slowC1,
Peer: test.TestPeerID1,
Status: api.TrackerStatusPinning,
TS: time.Now(),
},
},
},
})
return nil
}
func (mock *mockService) Pins(ctx context.Context, in struct{}, out *[]api.PinSerial) error {
*out = []api.PinSerial{
api.PinWithOpts(test.MustDecodeCid(test.TestCid1), pinOpts).ToSerial(),
@ -1068,3 +984,49 @@ func TestTrackUntrackWithCancel(t *testing.T) {
})
}
}
func TestPinTracker_RemoteIgnoresError(t *testing.T) {
testF := func(t *testing.T, pt ipfscluster.PinTracker) {
remoteCid := test.MustDecodeCid(test.TestCid4)
remote := api.PinWithOpts(remoteCid, pinOpts)
remote.Allocations = []peer.ID{test.TestPeerID2}
remote.ReplicationFactorMin = 1
remote.ReplicationFactorMax = 1
err := pt.Track(remote)
if err != nil {
t.Fatal(err)
}
// Sync triggers IPFSPinLs which will return an error
// (see mock)
pi, err := pt.Sync(remoteCid)
if err != nil {
t.Fatal(err)
}
if pi.Status != api.TrackerStatusRemote || pi.Error != "" {
t.Error("Remote pin should not be in error")
}
pi = pt.Status(remoteCid)
if pi.Status != api.TrackerStatusRemote || pi.Error != "" {
t.Error("Remote pin should not be in error")
}
}
t.Run("basic pintracker", func(t *testing.T) {
pt := testMapPinTracker(t)
testF(t, pt)
})
t.Run("stateless pintracker", func(t *testing.T) {
pt := testStatelessPinTracker(t)
testF(t, pt)
})
}

View File

@ -12,9 +12,9 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pintracker/optracker"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
rpc "github.com/libp2p/go-libp2p-gorpc"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -7,8 +7,8 @@ import (
"testing"
"time"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"

View File

@ -37,7 +37,7 @@ func (rpcapi *RPCAPI) Pin(ctx context.Context, in api.PinSerial, out *struct{})
// Unpin runs Cluster.Unpin().
func (rpcapi *RPCAPI) Unpin(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
return rpcapi.c.Unpin(c)
}
@ -124,7 +124,7 @@ func (rpcapi *RPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]ap
// Status runs Cluster.Status().
func (rpcapi *RPCAPI) Status(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.Status(c)
*out = pinfo.ToSerial()
return err
@ -132,7 +132,7 @@ func (rpcapi *RPCAPI) Status(ctx context.Context, in api.PinSerial, out *api.Glo
// StatusLocal runs Cluster.StatusLocal().
func (rpcapi *RPCAPI) StatusLocal(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo := rpcapi.c.StatusLocal(c)
*out = pinfo.ToSerial()
return nil
@ -154,7 +154,7 @@ func (rpcapi *RPCAPI) SyncAllLocal(ctx context.Context, in struct{}, out *[]api.
// Sync runs Cluster.Sync().
func (rpcapi *RPCAPI) Sync(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.Sync(c)
*out = pinfo.ToSerial()
return err
@ -162,7 +162,7 @@ func (rpcapi *RPCAPI) Sync(ctx context.Context, in api.PinSerial, out *api.Globa
// SyncLocal runs Cluster.SyncLocal().
func (rpcapi *RPCAPI) SyncLocal(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.SyncLocal(c)
*out = pinfo.ToSerial()
return err
@ -177,7 +177,7 @@ func (rpcapi *RPCAPI) RecoverAllLocal(ctx context.Context, in struct{}, out *[]a
// Recover runs Cluster.Recover().
func (rpcapi *RPCAPI) Recover(ctx context.Context, in api.PinSerial, out *api.GlobalPinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.Recover(c)
*out = pinfo.ToSerial()
return err
@ -185,7 +185,7 @@ func (rpcapi *RPCAPI) Recover(ctx context.Context, in api.PinSerial, out *api.Gl
// RecoverLocal runs Cluster.RecoverLocal().
func (rpcapi *RPCAPI) RecoverLocal(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.RecoverLocal(c)
*out = pinfo.ToSerial()
return err
@ -205,12 +205,12 @@ func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in api.PinSerial, out *
// Returned metrics are Valid and belong to current
// Cluster peers.
metrics := rpcapi.c.monitor.LatestMetrics(pingMetricName)
peers := make([]peer.ID, len(metrics), len(metrics))
peers := make([]string, len(metrics), len(metrics))
for i, m := range metrics {
peers[i] = m.Peer
peers[i] = peer.IDB58Encode(m.Peer)
}
*out = api.PeersToStrings(peers)
*out = peers
return nil
}
@ -248,7 +248,7 @@ func (rpcapi *RPCAPI) Track(ctx context.Context, in api.PinSerial, out *struct{}
// Untrack runs PinTracker.Untrack().
func (rpcapi *RPCAPI) Untrack(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
return rpcapi.c.tracker.Untrack(c)
}
@ -260,7 +260,7 @@ func (rpcapi *RPCAPI) TrackerStatusAll(ctx context.Context, in struct{}, out *[]
// TrackerStatus runs PinTracker.Status().
func (rpcapi *RPCAPI) TrackerStatus(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo := rpcapi.c.tracker.Status(c)
*out = pinfo.ToSerial()
return nil
@ -275,7 +275,7 @@ func (rpcapi *RPCAPI) TrackerRecoverAll(ctx context.Context, in struct{}, out *[
// TrackerRecover runs PinTracker.Recover().
func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out *api.PinInfoSerial) error {
c := in.ToPin().Cid
c := in.DecodeCid()
pinfo, err := rpcapi.c.tracker.Recover(c)
*out = pinfo.ToSerial()
return err
@ -287,20 +287,20 @@ func (rpcapi *RPCAPI) TrackerRecover(ctx context.Context, in api.PinSerial, out
// IPFSPin runs IPFSConnector.Pin().
func (rpcapi *RPCAPI) IPFSPin(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
depth := in.ToPin().MaxDepth
return rpcapi.c.ipfs.Pin(ctx, c, depth)
}
// IPFSUnpin runs IPFSConnector.Unpin().
func (rpcapi *RPCAPI) IPFSUnpin(ctx context.Context, in api.PinSerial, out *struct{}) error {
c := in.ToPin().Cid
c := in.DecodeCid()
return rpcapi.c.ipfs.Unpin(ctx, c)
}
// IPFSPinLsCid runs IPFSConnector.PinLsCid().
func (rpcapi *RPCAPI) IPFSPinLsCid(ctx context.Context, in api.PinSerial, out *api.IPFSPinStatus) error {
c := in.ToPin().Cid
c := in.DecodeCid()
b, err := rpcapi.c.ipfs.PinLsCid(ctx, c)
*out = b
return err
@ -347,7 +347,7 @@ func (rpcapi *RPCAPI) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out
// IPFSBlockGet runs IPFSConnector.BlockGet().
func (rpcapi *RPCAPI) IPFSBlockGet(ctx context.Context, in api.PinSerial, out *[]byte) error {
c := in.ToPin().Cid
c := in.DecodeCid()
res, err := rpcapi.c.ipfs.BlockGet(c)
*out = res
return err

View File

@ -14,13 +14,13 @@ cluster_kill
test_expect_success IPFS,CLUSTER "state import fails on incorrect format" '
sleep 5 &&
echo "not exactly json" > badImportFile &&
test_expect_code 1 ipfs-cluster-service -f --config "test-config" state import badImportFile
test_expect_code 1 ipfs-cluster-service --config "test-config" state import -f badImportFile
'
test_expect_success IPFS,CLUSTER,IMPORTSTATE "state import succeeds on correct format" '
sleep 5
cid=`docker exec ipfs sh -c "echo test_53 | ipfs add -q"` &&
ipfs-cluster-service -f --debug --config "test-config" state import importState &&
ipfs-cluster-service --debug --config "test-config" state import -f importState &&
cluster_start &&
sleep 5 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&

View File

@ -14,7 +14,7 @@ test_expect_success IPFS,CLUSTER "state cleanup refreshes state on restart" '
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" &&
[ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] &&
cluster_kill && sleep 5 &&
ipfs-cluster-service -f --debug --config "test-config" state cleanup &&
ipfs-cluster-service --debug --config "test-config" state cleanup -f &&
cluster_start && sleep 5 &&
[ 0 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ]
'
@ -25,8 +25,8 @@ test_expect_success IPFS,CLUSTER "export + cleanup + import == noop" '
[ 1 -eq "$(ipfs-cluster-ctl --enc=json status | jq ". | length")" ] &&
cluster_kill && sleep 5 &&
ipfs-cluster-service --debug --config "test-config" state export -f import.json &&
ipfs-cluster-service -f --debug --config "test-config" state cleanup &&
ipfs-cluster-service -f --debug --config "test-config" state import import.json &&
ipfs-cluster-service --debug --config "test-config" state cleanup -f &&
ipfs-cluster-service --debug --config "test-config" state import -f import.json &&
cluster_start && sleep 5 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" &&

View File

@ -8,8 +8,8 @@ import (
"github.com/ipfs/ipfs-cluster/api"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
rpc "github.com/libp2p/go-libp2p-gorpc"
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
)

View File

@ -8,7 +8,7 @@ import (
"path/filepath"
"testing"
"github.com/ipfs/go-ipfs-cmdkit/files"
files "github.com/ipfs/go-ipfs-files"
)
const shardingTestDir = "shardTesting"

View File

@ -9,7 +9,7 @@ import (
// Version is the current cluster version. Version alignment between
// components, apis and tools ensures compatibility among them.
var Version = semver.MustParse("0.6.0")
var Version = semver.MustParse("0.7.0")
// RPCProtocol is used to send libp2p messages between cluster peers
var RPCProtocol = protocol.ID(