From e1faf12bae1de724027a8a3fc7c6d270cd71d158 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Fri, 6 Dec 2019 17:38:57 +0530 Subject: [PATCH] ipfsproxy: hijack repo/gc and trigger cluster-wide GC This adds hijacking of the repo/gc endpoint to the proxy to do cluster-wide gc. --- api/ipfsproxy/ipfsproxy.go | 61 ++++++++++++++++++++++++++ api/ipfsproxy/ipfsproxy_test.go | 77 +++++++++++++++++++++++++++++++++ api/ipfsproxy/util.go | 19 ++++++++ test/rpc_api_mock.go | 22 ++++++++-- 4 files changed, 176 insertions(+), 3 deletions(-) create mode 100644 api/ipfsproxy/util.go diff --git a/api/ipfsproxy/ipfsproxy.go b/api/ipfsproxy/ipfsproxy.go index 1b244ab6..5fdbea71 100644 --- a/api/ipfsproxy/ipfsproxy.go +++ b/api/ipfsproxy/ipfsproxy.go @@ -249,6 +249,10 @@ func New(cfg *Config) (*Server, error) { Path("/repo/stat"). HandlerFunc(proxy.repoStatHandler). Name("RepoStat") + hijackSubrouter. + Path("/repo/gc"). + HandlerFunc(proxy.repoGCHandler). + Name("RepoGC") // Everything else goes to the IPFS daemon. router.PathPrefix("/").Handler(reverseProxy) @@ -647,6 +651,63 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) { return } +type ipfsRepoGCResp struct { + Key cid.Cid `json:",omitempty"` + Error string `json:",omitempty"` +} + +func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) { + queryValues := r.URL.Query() + streamErrors := queryValues.Get("stream-errors") == "true" + // ignoring `quiet` since it only affects text output + + proxy.setHeaders(w.Header(), r) + + w.Header().Set("Trailer", "X-Stream-Error") + var repoGC api.GlobalRepoGC + err := proxy.rpcClient.CallContext( + r.Context(), + "", + "Cluster", + "RepoGC", + struct{}{}, + &repoGC, + ) + if err != nil { + ipfsErrorResponder(w, err.Error(), -1) + return + } + + w.WriteHeader(http.StatusOK) + enc := json.NewEncoder(w) + var ipfsRepoGC ipfsRepoGCResp + mError := multiError{} + for _, gc := range repoGC.PeerMap { + for _, key := range gc.Keys { + if streamErrors { + ipfsRepoGC = ipfsRepoGCResp{Key: key.Key, Error: key.Error} + } else { + ipfsRepoGC = ipfsRepoGCResp{Key: key.Key} + if key.Error != "" { + mError.add(key.Error) + } + } + + // Cluster tags start with small letter, but IPFS tags with capital letter. + if err := enc.Encode(ipfsRepoGC); err != nil { + logger.Error(err) + } + } + } + + mErrStr := mError.Error() + if !streamErrors && mErrStr != "" { + w.Header().Set("X-Stream-Error", mErrStr) + } + + return +} + // slashHandler returns a handler which converts a /a/b/c/ request // into an /a/b/c/?arg= one. And uses the given origHandler // for it. Our handlers expect that arguments are passed in the ?arg query diff --git a/api/ipfsproxy/ipfsproxy_test.go b/api/ipfsproxy/ipfsproxy_test.go index 327421b2..92dc3aa6 100644 --- a/api/ipfsproxy/ipfsproxy_test.go +++ b/api/ipfsproxy/ipfsproxy_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -534,6 +535,82 @@ func TestProxyRepoStat(t *testing.T) { } +func TestProxyRepoGC(t *testing.T) { + ctx := context.Background() + proxy, mock := testIPFSProxy(t) + defer mock.Close() + defer proxy.Shutdown(ctx) + + type testcase struct { + name string + streamErrors bool + } + + testcases := []testcase{ + testcase{ + name: "With streaming errors", + streamErrors: true, + }, + testcase{ + name: "Without streaming errors", + streamErrors: false, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + res1, err := http.Post(fmt.Sprintf("%s/repo/gc?stream-errors=%t", proxyURL(proxy), tc.streamErrors), "", nil) + if err != nil { + t.Fatal(err) + } + defer res1.Body.Close() + if res1.StatusCode != http.StatusOK { + t.Error("request should have succeeded") + } + + var repoGC []ipfsRepoGCResp + dec := json.NewDecoder(res1.Body) + for { + resp := ipfsRepoGCResp{} + + if err := dec.Decode(&resp); err != nil { + if err == io.EOF { + break + } + t.Error(err) + } + + repoGC = append(repoGC, resp) + } + + if !repoGC[0].Key.Equals(test.Cid1) { + t.Errorf("expected a different cid, expected: %s, found: %s", test.Cid1, repoGC[0].Key) + } + + xStreamError, ok := res1.Trailer["X-Stream-Error"] + if !ok { + t.Error("trailer header X-Stream-Error not set") + } + if tc.streamErrors { + if repoGC[4].Error != test.ErrLinkNotFound.Error() { + t.Error("expected a different error") + } + if len(xStreamError) != 0 { + t.Error("expected X-Stream-Error header to be empty") + } + } else { + if repoGC[4].Error != "" { + t.Error("did not expect to stream error") + } + + if len(xStreamError) == 0 || xStreamError[0] != (test.ErrLinkNotFound.Error()+";") { + t.Error("expected X-Stream-Error header with link not found error") + } + } + }) + } +} + func TestProxyAdd(t *testing.T) { ctx := context.Background() proxy, mock := testIPFSProxy(t) diff --git a/api/ipfsproxy/util.go b/api/ipfsproxy/util.go new file mode 100644 index 00000000..fe29a2c1 --- /dev/null +++ b/api/ipfsproxy/util.go @@ -0,0 +1,19 @@ +package ipfsproxy + +import ( + "strings" +) + +// MultiError contains the results of multiple errors. +type multiError struct { + err strings.Builder +} + +func (e *multiError) add(err string) { + e.err.WriteString(err) + e.err.WriteString("; ") +} + +func (e *multiError) Error() string { + return e.err.String() +} diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index 1cc20a7e..0c563890 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -17,9 +17,13 @@ import ( rpc "github.com/libp2p/go-libp2p-gorpc" ) -// ErrBadCid is returned when using ErrorCid. Operations with that CID always -// fail. -var ErrBadCid = errors.New("this is an expected error when using ErrorCid") +var ( + // ErrBadCid is returned when using ErrorCid. Operations with that CID always + // fail. + ErrBadCid = errors.New("this is an expected error when using ErrorCid") + // ErrLinkNotFound is error returned when no link is found + ErrLinkNotFound = errors.New("no link by that name") +) // NewMockRPCClient creates a mock ipfs-cluster RPC server and returns // a client to it. @@ -339,6 +343,18 @@ func (mock *mockCluster) RepoGCLocal(ctx context.Context, in struct{}, out *api. { Key: Cid1, }, + { + Key: Cid2, + }, + { + Key: Cid3, + }, + { + Key: Cid4, + }, + { + Error: ErrLinkNotFound.Error(), + }, }, }