ipfsproxy: hijack repo/gc and trigger cluster-wide GC
This adds hijacking of the repo/gc endpoint to the proxy to do cluster-wide gc.
This commit is contained in:
parent
db352534df
commit
e1faf12bae
|
@ -249,6 +249,10 @@ func New(cfg *Config) (*Server, error) {
|
||||||
Path("/repo/stat").
|
Path("/repo/stat").
|
||||||
HandlerFunc(proxy.repoStatHandler).
|
HandlerFunc(proxy.repoStatHandler).
|
||||||
Name("RepoStat")
|
Name("RepoStat")
|
||||||
|
hijackSubrouter.
|
||||||
|
Path("/repo/gc").
|
||||||
|
HandlerFunc(proxy.repoGCHandler).
|
||||||
|
Name("RepoGC")
|
||||||
|
|
||||||
// Everything else goes to the IPFS daemon.
|
// Everything else goes to the IPFS daemon.
|
||||||
router.PathPrefix("/").Handler(reverseProxy)
|
router.PathPrefix("/").Handler(reverseProxy)
|
||||||
|
@ -647,6 +651,63 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ipfsRepoGCResp struct {
|
||||||
|
Key cid.Cid `json:",omitempty"`
|
||||||
|
Error string `json:",omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
queryValues := r.URL.Query()
|
||||||
|
streamErrors := queryValues.Get("stream-errors") == "true"
|
||||||
|
// ignoring `quiet` since it only affects text output
|
||||||
|
|
||||||
|
proxy.setHeaders(w.Header(), r)
|
||||||
|
|
||||||
|
w.Header().Set("Trailer", "X-Stream-Error")
|
||||||
|
var repoGC api.GlobalRepoGC
|
||||||
|
err := proxy.rpcClient.CallContext(
|
||||||
|
r.Context(),
|
||||||
|
"",
|
||||||
|
"Cluster",
|
||||||
|
"RepoGC",
|
||||||
|
struct{}{},
|
||||||
|
&repoGC,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
ipfsErrorResponder(w, err.Error(), -1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
var ipfsRepoGC ipfsRepoGCResp
|
||||||
|
mError := multiError{}
|
||||||
|
for _, gc := range repoGC.PeerMap {
|
||||||
|
for _, key := range gc.Keys {
|
||||||
|
if streamErrors {
|
||||||
|
ipfsRepoGC = ipfsRepoGCResp{Key: key.Key, Error: key.Error}
|
||||||
|
} else {
|
||||||
|
ipfsRepoGC = ipfsRepoGCResp{Key: key.Key}
|
||||||
|
if key.Error != "" {
|
||||||
|
mError.add(key.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cluster tags start with small letter, but IPFS tags with capital letter.
|
||||||
|
if err := enc.Encode(ipfsRepoGC); err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mErrStr := mError.Error()
|
||||||
|
if !streamErrors && mErrStr != "" {
|
||||||
|
w.Header().Set("X-Stream-Error", mErrStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// slashHandler returns a handler which converts a /a/b/c/<argument> request
|
// slashHandler returns a handler which converts a /a/b/c/<argument> request
|
||||||
// into an /a/b/c/<argument>?arg=<argument> one. And uses the given origHandler
|
// into an /a/b/c/<argument>?arg=<argument> one. And uses the given origHandler
|
||||||
// for it. Our handlers expect that arguments are passed in the ?arg query
|
// for it. Our handlers expect that arguments are passed in the ?arg query
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -534,6 +535,82 @@ func TestProxyRepoStat(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProxyRepoGC(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
proxy, mock := testIPFSProxy(t)
|
||||||
|
defer mock.Close()
|
||||||
|
defer proxy.Shutdown(ctx)
|
||||||
|
|
||||||
|
type testcase struct {
|
||||||
|
name string
|
||||||
|
streamErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
|
testcases := []testcase{
|
||||||
|
testcase{
|
||||||
|
name: "With streaming errors",
|
||||||
|
streamErrors: true,
|
||||||
|
},
|
||||||
|
testcase{
|
||||||
|
name: "Without streaming errors",
|
||||||
|
streamErrors: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testcases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
res1, err := http.Post(fmt.Sprintf("%s/repo/gc?stream-errors=%t", proxyURL(proxy), tc.streamErrors), "", nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer res1.Body.Close()
|
||||||
|
if res1.StatusCode != http.StatusOK {
|
||||||
|
t.Error("request should have succeeded")
|
||||||
|
}
|
||||||
|
|
||||||
|
var repoGC []ipfsRepoGCResp
|
||||||
|
dec := json.NewDecoder(res1.Body)
|
||||||
|
for {
|
||||||
|
resp := ipfsRepoGCResp{}
|
||||||
|
|
||||||
|
if err := dec.Decode(&resp); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
repoGC = append(repoGC, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !repoGC[0].Key.Equals(test.Cid1) {
|
||||||
|
t.Errorf("expected a different cid, expected: %s, found: %s", test.Cid1, repoGC[0].Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
xStreamError, ok := res1.Trailer["X-Stream-Error"]
|
||||||
|
if !ok {
|
||||||
|
t.Error("trailer header X-Stream-Error not set")
|
||||||
|
}
|
||||||
|
if tc.streamErrors {
|
||||||
|
if repoGC[4].Error != test.ErrLinkNotFound.Error() {
|
||||||
|
t.Error("expected a different error")
|
||||||
|
}
|
||||||
|
if len(xStreamError) != 0 {
|
||||||
|
t.Error("expected X-Stream-Error header to be empty")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if repoGC[4].Error != "" {
|
||||||
|
t.Error("did not expect to stream error")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(xStreamError) == 0 || xStreamError[0] != (test.ErrLinkNotFound.Error()+";") {
|
||||||
|
t.Error("expected X-Stream-Error header with link not found error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestProxyAdd(t *testing.T) {
|
func TestProxyAdd(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
proxy, mock := testIPFSProxy(t)
|
proxy, mock := testIPFSProxy(t)
|
||||||
|
|
19
api/ipfsproxy/util.go
Normal file
19
api/ipfsproxy/util.go
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
package ipfsproxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MultiError contains the results of multiple errors.
|
||||||
|
type multiError struct {
|
||||||
|
err strings.Builder
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *multiError) add(err string) {
|
||||||
|
e.err.WriteString(err)
|
||||||
|
e.err.WriteString("; ")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *multiError) Error() string {
|
||||||
|
return e.err.String()
|
||||||
|
}
|
|
@ -17,9 +17,13 @@ import (
|
||||||
rpc "github.com/libp2p/go-libp2p-gorpc"
|
rpc "github.com/libp2p/go-libp2p-gorpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrBadCid is returned when using ErrorCid. Operations with that CID always
|
var (
|
||||||
// fail.
|
// ErrBadCid is returned when using ErrorCid. Operations with that CID always
|
||||||
var ErrBadCid = errors.New("this is an expected error when using ErrorCid")
|
// fail.
|
||||||
|
ErrBadCid = errors.New("this is an expected error when using ErrorCid")
|
||||||
|
// ErrLinkNotFound is error returned when no link is found
|
||||||
|
ErrLinkNotFound = errors.New("no link by that name")
|
||||||
|
)
|
||||||
|
|
||||||
// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns
|
// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns
|
||||||
// a client to it.
|
// a client to it.
|
||||||
|
@ -339,6 +343,18 @@ func (mock *mockCluster) RepoGCLocal(ctx context.Context, in struct{}, out *api.
|
||||||
{
|
{
|
||||||
Key: Cid1,
|
Key: Cid1,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Key: Cid2,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Cid3,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: Cid4,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Error: ErrLinkNotFound.Error(),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user