cluster-ctl add to ipfs

RPC call to put a block in ipfs
IPFSConnector method to implement the RPC call
cluster restapi reads from channel and puts
blocks into IPFS via RPC in ctl add handler

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
Wyatt Daviau 2018-02-09 00:23:24 -05:00 committed by Hector Sanjuan
parent 3e9b08ba60
commit 40f8eeedb5
4 changed files with 84 additions and 16 deletions

View File

@ -29,6 +29,7 @@ import (
p2phttp "github.com/hsanjuan/go-libp2p-http" p2phttp "github.com/hsanjuan/go-libp2p-http"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmdkit/files" "github.com/ipfs/go-ipfs-cmdkit/files"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p" libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
@ -515,8 +516,28 @@ func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
sendAcceptedResponse(w, errors.New("unsupported media type")) sendAcceptedResponse(w, errors.New("unsupported media type"))
return return
} }
outChan := make(chan *ipld.Node)
go func() {
for nodePtr := range outChan {
node := *nodePtr
/* Send block data to ipfs */
pinS := types.PinSerial{}
err := api.rpcClient.Call("",
"Cluster",
"IPFSBlockPut",
node.RawData(),
&pinS)
err := importer.ToPrint(f) /* Verify that block put cid matches*/
if err != nil {
logger.Warning(err)
}
if node.String() != pinS.Cid { // node string is just cid string
logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", node.String(), pinS.Cid)
}
}
}()
err := dex.ImportToChannel(f, outChan, context.Background())
/* buf := make([]byte, 256) /* buf := make([]byte, 256)
for { for {
file, err := f.NextFile() file, err := f.NextFile()

View File

@ -90,6 +90,8 @@ type IPFSConnector interface {
// RepoSize returns the current repository size as expressed // RepoSize returns the current repository size as expressed
// by "repo stat". // by "repo stat".
RepoSize() (uint64, error) RepoSize() (uint64, error)
// BlockPut directly adds a block of data to the IPFS repo
BlockPut([]byte) (api.Pin, error)
} }
// Peered represents a component which needs to be aware of the peers // Peered represents a component which needs to be aware of the peers

View File

@ -22,6 +22,7 @@ import (
rpc "github.com/hsanjuan/go-libp2p-gorpc" rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmdkit/files"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
@ -109,6 +110,10 @@ type ipfsStream struct {
Protocol string Protocol string
} }
type ipfsBlockPutResp struct {
Key string
}
// NewConnector creates the component and leaves it ready to be started // NewConnector creates the component and leaves it ready to be started
func NewConnector(cfg *Config) (*Connector, error) { func NewConnector(cfg *Config) (*Connector, error) {
err := cfg.Validate() err := cfg.Validate()
@ -589,7 +594,7 @@ func (ipfs *Connector) ID() (api.IPFSID, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
id := api.IPFSID{} id := api.IPFSID{}
body, err := ipfs.postCtx(ctx, "id") body, err := ipfs.postCtx(ctx, "id", nil)
if err != nil { if err != nil {
id.Error = err.Error() id.Error = err.Error()
return id, err return id, err
@ -643,7 +648,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) e
} }
path := fmt.Sprintf("pin/add?arg=%s&recursive=%t", hash, recursive) path := fmt.Sprintf("pin/add?arg=%s&recursive=%t", hash, recursive)
_, err = ipfs.postCtx(ctx, path) _, err = ipfs.postCtx(ctx, path, nil)
if err == nil { if err == nil {
logger.Info("IPFS Pin request succeeded: ", hash) logger.Info("IPFS Pin request succeeded: ", hash)
} }
@ -664,7 +669,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
} }
if pinStatus.IsPinned() { if pinStatus.IsPinned() {
path := fmt.Sprintf("pin/rm?arg=%s", hash) path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.postCtx(ctx, path) _, err := ipfs.postCtx(ctx, path, nil)
if err == nil { if err == nil {
logger.Info("IPFS Unpin request succeeded:", hash) logger.Info("IPFS Unpin request succeeded:", hash)
} }
@ -680,7 +685,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) { func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter) body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter, nil)
// Some error talking to the daemon // Some error talking to the daemon
if err != nil { if err != nil {
@ -708,7 +713,7 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPin
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash) lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash)
body, err := ipfs.postCtx(ctx, lsPath) body, err := ipfs.postCtx(ctx, lsPath, nil)
// Network error, daemon down // Network error, daemon down
if body == nil && err != nil { if body == nil && err != nil {
@ -735,15 +740,16 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPin
return api.IPFSPinStatusFromString(pinObj.Type), nil return api.IPFSPinStatusFromString(pinObj.Type), nil
} }
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string) (*http.Response, error) { func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string, contentType string, postBody io.Reader) (*http.Response, error) {
logger.Debugf("posting %s", path) logger.Debugf("posting %s", path)
urlstr := fmt.Sprintf("%s/%s", apiURL, path) urlstr := fmt.Sprintf("%s/%s", apiURL, path)
req, err := http.NewRequest("POST", urlstr, nil) req, err := http.NewRequest("POST", urlstr, postBody)
if err != nil { if err != nil {
logger.Error("error creating POST request:", err) logger.Error("error creating POST request:", err)
} }
req.Header.Set("Content-Type", contentType)
req = req.WithContext(ctx) req = req.WithContext(ctx)
res, err := ipfs.client.Do(req) res, err := ipfs.client.Do(req)
if err != nil { if err != nil {
@ -772,8 +778,8 @@ func checkResponse(path string, code int, body []byte) error {
// postCtx makes a POST request against // postCtx makes a POST request against
// the ipfs daemon, reads the full body of the response and // the ipfs daemon, reads the full body of the response and
// returns it after checking for errors. // returns it after checking for errors.
func (ipfs *Connector) postCtx(ctx context.Context, path string) ([]byte, error) { func (ipfs *Connector) postCtx(ctx context.Context, path string, contentType string, postBody io.Reader) ([]byte, error) {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path) res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path, contentType, postBody)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -832,7 +838,11 @@ func (ipfs *Connector) ConnectSwarms() error {
// This is a best effort attempt // This is a best effort attempt
// We ignore errors which happens // We ignore errors which happens
// when passing in a bunch of addresses // when passing in a bunch of addresses
_, err := ipfs.postCtx(ctx, fmt.Sprintf("swarm/connect?arg=%s", addr)) _, err := ipfs.postCtx(
ctx,
fmt.Sprintf("swarm/connect?arg=%s", addr),
nil,
)
if err != nil { if err != nil {
logger.Debug(err) logger.Debug(err)
continue continue
@ -849,7 +859,7 @@ func (ipfs *Connector) ConnectSwarms() error {
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) { func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
res, err := ipfs.postCtx(ctx, "config/show") res, err := ipfs.postCtx(ctx, "config/show", nil)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return nil, err return nil, err
@ -895,7 +905,7 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err
func (ipfs *Connector) FreeSpace() (uint64, error) { func (ipfs *Connector) FreeSpace() (uint64, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat") res, err := ipfs.postCtx(ctx, "repo/stat", nil)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return 0, err return 0, err
@ -915,7 +925,7 @@ func (ipfs *Connector) FreeSpace() (uint64, error) {
func (ipfs *Connector) RepoSize() (uint64, error) { func (ipfs *Connector) RepoSize() (uint64, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat") res, err := ipfs.postCtx(ctx, "repo/stat", nil)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return 0, err return 0, err
@ -930,12 +940,12 @@ func (ipfs *Connector) RepoSize() (uint64, error) {
return stats.RepoSize, nil return stats.RepoSize, nil
} }
// SwarmPeers returns the peers currently connected to this ipfs daemon // SwarmPeers returns the peers currently connected to this ipfs daemon.
func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) { func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout) ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel() defer cancel()
swarm := api.SwarmPeers{} swarm := api.SwarmPeers{}
res, err := ipfs.postCtx(ctx, "swarm/peers") res, err := ipfs.postCtx(ctx, "swarm/peers", nil)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return swarm, err return swarm, err
@ -979,3 +989,31 @@ func extractArgument(u *url.URL) (string, bool) {
} }
return "", false return "", false
} }
// BlockPut triggers an ipfs block put on the given data, inserting the block
// into the ipfs daemon's repo.
func (ipfs *Connector) BlockPut(data []byte) (api.Pin, error) {
pin := api.Pin{}
r := ioutil.NopCloser(bytes.NewReader(data))
rFile := files.NewReaderFile("", "", r, nil)
sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory
multiFileR := files.NewMultiFileReader(sliceFile, true)
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
res, err := ipfs.post("block/put", contentType, multiFileR)
if err != nil {
return pin, err
}
var keyRaw ipfsBlockPutResp
err = json.Unmarshal(res, &keyRaw)
if err != nil {
return pin, err
}
c, err := cid.Decode(keyRaw.Key)
if err != nil {
return pin, err
}
return api.PinCid(c), nil
}

View File

@ -302,6 +302,13 @@ func (rpcapi *RPCAPI) IPFSSwarmPeers(ctx context.Context, in struct{}, out *api.
return err return err
} }
// IPFSBlockPut runs IPFSConnector.BlockPut().
func (rpcapi *RPCAPI) IPFSBlockPut(in []byte, out *api.PinSerial) error {
res, err := rpcapi.c.ipfs.BlockPut(in)
*out = res.ToSerial()
return err
}
/* /*
Consensus component methods Consensus component methods
*/ */