diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 264b9367..2b2af8d9 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -29,6 +29,7 @@ import ( p2phttp "github.com/hsanjuan/go-libp2p-http" cid "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs-cmdkit/files" + ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" libp2p "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-host" @@ -515,8 +516,28 @@ func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) { sendAcceptedResponse(w, errors.New("unsupported media type")) return } + outChan := make(chan *ipld.Node) + go func() { + for nodePtr := range outChan { + node := *nodePtr + /* Send block data to ipfs */ + pinS := types.PinSerial{} + err := api.rpcClient.Call("", + "Cluster", + "IPFSBlockPut", + node.RawData(), + &pinS) - err := importer.ToPrint(f) + /* Verify that block put cid matches*/ + if err != nil { + logger.Warning(err) + } + if node.String() != pinS.Cid { // node string is just cid string + logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", node.String(), pinS.Cid) + } + } + }() + err := dex.ImportToChannel(f, outChan, context.Background()) /* buf := make([]byte, 256) for { file, err := f.NextFile() diff --git a/ipfscluster.go b/ipfscluster.go index f5072f1f..71295073 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -90,6 +90,8 @@ type IPFSConnector interface { // RepoSize returns the current repository size as expressed // by "repo stat". RepoSize() (uint64, error) + // BlockPut directly adds a block of data to the IPFS repo + BlockPut([]byte) (api.Pin, error) } // Peered represents a component which needs to be aware of the peers diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 89db540d..117ad218 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -22,6 +22,7 @@ import ( rpc "github.com/hsanjuan/go-libp2p-gorpc" cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-cmdkit/files" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" @@ -109,6 +110,10 @@ type ipfsStream struct { Protocol string } +type ipfsBlockPutResp struct { + Key string +} + // NewConnector creates the component and leaves it ready to be started func NewConnector(cfg *Config) (*Connector, error) { err := cfg.Validate() @@ -589,7 +594,7 @@ func (ipfs *Connector) ID() (api.IPFSID, error) { ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) defer cancel() id := api.IPFSID{} - body, err := ipfs.postCtx(ctx, "id") + body, err := ipfs.postCtx(ctx, "id", nil) if err != nil { id.Error = err.Error() return id, err @@ -643,7 +648,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) e } path := fmt.Sprintf("pin/add?arg=%s&recursive=%t", hash, recursive) - _, err = ipfs.postCtx(ctx, path) + _, err = ipfs.postCtx(ctx, path, nil) if err == nil { logger.Info("IPFS Pin request succeeded: ", hash) } @@ -664,7 +669,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error { } if pinStatus.IsPinned() { path := fmt.Sprintf("pin/rm?arg=%s", hash) - _, err := ipfs.postCtx(ctx, path) + _, err := ipfs.postCtx(ctx, path, nil) if err == nil { logger.Info("IPFS Unpin request succeeded:", hash) } @@ -680,7 +685,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error { func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) { ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) defer cancel() - body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter) + body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter, nil) // Some error talking to the daemon if err != nil { @@ -708,7 +713,7 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPin ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) defer cancel() lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash) - body, err := ipfs.postCtx(ctx, lsPath) + body, err := ipfs.postCtx(ctx, lsPath, nil) // Network error, daemon down if body == nil && err != nil { @@ -735,15 +740,16 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPin return api.IPFSPinStatusFromString(pinObj.Type), nil } -func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string) (*http.Response, error) { +func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) { logger.Debugf("posting %s", path) urlstr := fmt.Sprintf("%s/%s", apiURL, path) - req, err := http.NewRequest("POST", urlstr, nil) + req, err := http.NewRequest("POST", urlstr, postBody) if err != nil { logger.Error("error creating POST request:", err) } + req.Header.Set("Content-Type", contentType) req = req.WithContext(ctx) res, err := ipfs.client.Do(req) if err != nil { @@ -772,8 +778,8 @@ func checkResponse(path string, code int, body []byte) error { // postCtx makes a POST request against // the ipfs daemon, reads the full body of the response and // returns it after checking for errors. -func (ipfs *Connector) postCtx(ctx context.Context, path string) ([]byte, error) { - res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path) +func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) { + res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody) if err != nil { return nil, err } @@ -832,7 +838,11 @@ func (ipfs *Connector) ConnectSwarms() error { // This is a best effort attempt // We ignore errors which happens // when passing in a bunch of addresses - _, err := ipfs.postCtx(ctx, fmt.Sprintf("swarm/connect?arg=%s", addr)) + _, err := ipfs.postCtx( + ctx, + fmt.Sprintf("swarm/connect?arg=%s", addr), + nil, + ) if err != nil { logger.Debug(err) continue @@ -849,7 +859,7 @@ func (ipfs *Connector) ConnectSwarms() error { func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) { ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) defer cancel() - res, err := ipfs.postCtx(ctx, "config/show") + res, err := ipfs.postCtx(ctx, "config/show", nil) if err != nil { logger.Error(err) return nil, err @@ -895,7 +905,7 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err func (ipfs *Connector) FreeSpace() (uint64, error) { ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) defer cancel() - res, err := ipfs.postCtx(ctx, "repo/stat") + res, err := ipfs.postCtx(ctx, "repo/stat", nil) if err != nil { logger.Error(err) return 0, err @@ -915,7 +925,7 @@ func (ipfs *Connector) FreeSpace() (uint64, error) { func (ipfs *Connector) RepoSize() (uint64, error) { ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) defer cancel() - res, err := ipfs.postCtx(ctx, "repo/stat") + res, err := ipfs.postCtx(ctx, "repo/stat", nil) if err != nil { logger.Error(err) return 0, err @@ -930,12 +940,12 @@ func (ipfs *Connector) RepoSize() (uint64, error) { return stats.RepoSize, nil } -// SwarmPeers returns the peers currently connected to this ipfs daemon +// SwarmPeers returns the peers currently connected to this ipfs daemon. func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) { ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) defer cancel() swarm := api.SwarmPeers{} - res, err := ipfs.postCtx(ctx, "swarm/peers") + res, err := ipfs.postCtx(ctx, "swarm/peers", nil) if err != nil { logger.Error(err) return swarm, err @@ -979,3 +989,31 @@ func extractArgument(u *url.URL) (string, bool) { } return "", false } + +// BlockPut triggers an ipfs block put on the given data, inserting the block +// into the ipfs daemon's repo. +func (ipfs *Connector) BlockPut(data []byte) (api.Pin, error) { + pin := api.Pin{} + + r := ioutil.NopCloser(bytes.NewReader(data)) + rFile := files.NewReaderFile("", "", r, nil) + sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory + multiFileR := files.NewMultiFileReader(sliceFile, true) + contentType := "multipart/form-data; boundary=" + multiFileR.Boundary() + + res, err := ipfs.post("block/put", contentType, multiFileR) + if err != nil { + return pin, err + } + var keyRaw ipfsBlockPutResp + err = json.Unmarshal(res, &keyRaw) + if err != nil { + return pin, err + } + c, err := cid.Decode(keyRaw.Key) + if err != nil { + return pin, err + } + + return api.PinCid(c), nil +} diff --git a/rpc_api.go b/rpc_api.go index 5e2ef26c..6c1914b2 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -302,6 +302,13 @@ func (rpcapi *RPCAPI) IPFSSwarmPeers(ctx context.Context, in struct{}, out *api. return err } +// IPFSBlockPut runs IPFSConnector.BlockPut(). +func (rpcapi *RPCAPI) IPFSBlockPut(in []byte, out *api.PinSerial) error { + res, err := rpcapi.c.ipfs.BlockPut(in) + *out = res.ToSerial() + return err +} + /* Consensus component methods */