removing pointer across rpc

instead add logic is a submodule imported in api and cluster comp

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
This commit is contained in:
Wyatt Daviau 2018-04-24 22:12:30 -04:00 committed by Hector Sanjuan
parent 48479a0ca5
commit f7c3dcce5b
4 changed files with 49 additions and 36 deletions

View File

@ -10,10 +10,26 @@ import (
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/importer"
rpc "github.com/hsanjuan/go-libp2p-gorpc"
"github.com/ipfs/go-ipfs-cmdkit/files"
logging "github.com/ipfs/go-log"
)
func (c *Cluster) consumeLocalAdd(
// AddSession stores utilities of the calling component needed for the add
type AddSession struct {
rpcClient *rpc.Client
logger logging.EventLogger
}
// NewAddSession creates a new AddSession for adding a file
func NewAddSession(rpcClient *rpc.Client, logger logging.EventLogger) *AddSession {
return &AddSession{
rpcClient: rpcClient,
logger: logger,
}
}
func (a *AddSession) consumeLocalAdd(
args map[string]string,
outObj *api.NodeWithMeta,
replMin, replMax int,
@ -23,19 +39,19 @@ func (c *Cluster) consumeLocalAdd(
outObj.Format = ""
args["cid"] = outObj.Cid // root node stored on last call
var hash string
err := c.rpcClient.Call(
err := a.rpcClient.Call(
"",
"Cluster",
"IPFSBlockPut",
*outObj,
&hash)
if outObj.Cid != hash {
logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", outObj.Cid, hash)
a.logger.Warningf("mismatch. node cid: %s\nrpc cid: %s", outObj.Cid, hash)
}
return err
}
func (c *Cluster) finishLocalAdd(
func (a *AddSession) finishLocalAdd(
args map[string]string,
replMin, replMax int,
) error {
@ -51,7 +67,7 @@ func (c *Cluster) finishLocalAdd(
ReplicationFactorMin: replMin,
ReplicationFactorMax: replMax,
}
return c.rpcClient.Call(
return a.rpcClient.Call(
"",
"Cluster",
"Pin",
@ -60,7 +76,7 @@ func (c *Cluster) finishLocalAdd(
)
}
func (c *Cluster) consumeShardAdd(
func (a *AddSession) consumeShardAdd(
args map[string]string,
outObj *api.NodeWithMeta,
replMin, replMax int,
@ -72,7 +88,7 @@ func (c *Cluster) consumeShardAdd(
outObj.ReplMax = replMax
outObj.ReplMin = replMin
var retStr string
err := c.rpcClient.Call(
err := a.rpcClient.Call(
"",
"Cluster",
"SharderAddNode",
@ -84,7 +100,7 @@ func (c *Cluster) consumeShardAdd(
return err
}
func (c *Cluster) finishShardAdd(
func (a *AddSession) finishShardAdd(
args map[string]string,
replMin, replMax int,
) error {
@ -92,7 +108,7 @@ func (c *Cluster) finishShardAdd(
if !ok {
return errors.New("bad state: shardID passed incorrectly")
}
err := c.rpcClient.Call(
err := a.rpcClient.Call(
"",
"Cluster",
"SharderFinalize",
@ -102,7 +118,7 @@ func (c *Cluster) finishShardAdd(
return err
}
func (c *Cluster) consumeImport(ctx context.Context,
func (a *AddSession) consumeImport(ctx context.Context,
outChan <-chan *api.NodeWithMeta,
printChan <-chan *api.AddedOutput,
errChan <-chan error,
@ -162,7 +178,7 @@ func (c *Cluster) consumeImport(ctx context.Context,
if err := finish(args, replMin, replMax); err != nil {
return nil, err
}
logger.Debugf("succeeding sharding import")
a.logger.Debugf("succeeding sharding import")
return toPrint, nil
}
@ -170,7 +186,7 @@ func (c *Cluster) consumeImport(ctx context.Context,
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(
func (a *AddSession) AddFile(ctx context.Context,
reader *multipart.Reader,
params url.Values,
) ([]api.AddedOutput, error) {
@ -190,7 +206,7 @@ func (c *Cluster) AddFile(
Mediatype: "multipart/form-data",
Reader: reader,
}
ctx, cancel := context.WithCancel(c.ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
printChan, outChan, errChan := importer.ToChannel(
ctx,
@ -211,14 +227,14 @@ func (c *Cluster) AddFile(
var consume func(map[string]string, *api.NodeWithMeta, int, int) error
var finish func(map[string]string, int, int) error
if shard == "true" {
consume = c.consumeShardAdd
finish = c.finishShardAdd
consume = a.consumeShardAdd
finish = a.finishShardAdd
} else {
consume = c.consumeLocalAdd
finish = c.finishLocalAdd
consume = a.consumeLocalAdd
finish = a.finishLocalAdd
}
return c.consumeImport(
return a.consumeImport(
ctx,
outChan,
printChan,

View File

@ -20,6 +20,7 @@ import (
"sync"
"time"
add "github.com/ipfs/ipfs-cluster/add"
types "github.com/ipfs/ipfs-cluster/api"
mux "github.com/gorilla/mux"
@ -512,17 +513,9 @@ func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
return
}
queryValues := r.URL.Query()
fI := types.FileInfo{
Reader: reader,
Params: queryValues,
}
var toPrint []types.AddedOutput
err = api.rpcClient.Call("",
"Cluster",
"AddFile",
fI,
&toPrint)
params := r.URL.Query()
addSess := add.NewAddSession(api.rpcClient, logger)
toPrint, err := addSess.AddFile(api.ctx, reader, params)
if err != nil {
sendErrorResponse(w, 500, err.Error())
}

View File

@ -4,9 +4,11 @@ import (
"context"
"errors"
"fmt"
"mime/multipart"
"sync"
"time"
add "github.com/ipfs/ipfs-cluster/add"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/rpcutil"
@ -1160,6 +1162,15 @@ func (c *Cluster) unpinShard(cdagCid, shardCid *cid.Cid) error {
return c.consensus.LogUnpin(shardPin)
}
// AddFile adds a file to the ipfs daemons of the cluster. The ipfs importer
// pipeline is used to DAGify the file. Depending on input parameters this
// DAG can be added locally to the calling cluster peer's ipfs repo, or
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params map[string][]string) ([]api.AddedOutput, error) {
addSess := add.NewAddSession(c.rpcClient, logger)
return addSess.AddFile(c.ctx, reader, params)
}
// Version returns the current IPFS Cluster version.
func (c *Cluster) Version() string {
return Version

View File

@ -53,13 +53,6 @@ func (rpcapi *RPCAPI) Pins(ctx context.Context, in struct{}, out *[]api.PinSeria
return nil
}
// AddFile runs Cluster.AddFile
func (rpcapi *RPCAPI) AddFile(ctx context.Context, in api.FileInfo, out *[]api.AddedOutput) error {
toPrint, err := rpcapi.c.AddFile(in.Reader, in.Params)
*out = toPrint
return err
}
// PinGet runs Cluster.PinGet().
func (rpcapi *RPCAPI) PinGet(ctx context.Context, in api.PinSerial, out *api.PinSerial) error {
cidarg := in.ToPin()