Sharding rough draft:

sharding passes manual tests on a single-node cluster,
adding the shards of a directory and pinning the
clusterDAG to cluster/ipfs state

License: MIT
Signed-off-by: Wyatt Daviau <wdaviau@cs.stanford.edu>
Wyatt Daviau 2018-02-19 14:51:32 -05:00 committed by Hector Sanjuan
parent fa74bc230d
commit 77a61890ff
6 changed files with 130 additions and 38 deletions

View File

@@ -306,10 +306,11 @@ func statusReached(target api.TrackerStatus, gblPinInfo api.GlobalPinInfo) (bool
// AddMultiFile adds new files to the ipfs cluster, importing and potentially
// sharding underlying dags across the ipfs repos of multiple cluster peers
func (c *Client) AddMultiFile(multiFileR *files.MultiFileReader) error {
func (c *Client) AddMultiFile(multiFileR *files.MultiFileReader, shard bool) error {
headers := make(map[string]string)
headers["Content-Type"] = "multipart/form-data; boundary=" + multiFileR.Boundary()
return c.doStream("POST", "/allocations", multiFileR, headers, nil)
return c.doStream("POST", fmt.Sprintf("/allocations?shard=%t", shard),
multiFileR, headers, nil)
}
// Eventually an Add(io.Reader) method for adding raw readers as a multifile should be here.
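
A minimal client-side sketch of the new call (hedged: the file helpers are from go-ipfs-cmdkit/files as imported below; "client", the file name, and its contents are illustrative, and the snippet assumes io/ioutil and strings are imported):

f := files.NewReaderFile("data.txt", "data.txt",
    ioutil.NopCloser(strings.NewReader("hello")), nil)
sf := files.NewSliceFile("", "", []files.File{f})
mfr := files.NewMultiFileReader(sf, true) // true: multipart/form-data encoding
// shard=true asks the cluster to spread the DAG across peers
err := client.AddMultiFile(mfr, true)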

View File

@@ -29,6 +29,7 @@ import (
p2phttp "github.com/hsanjuan/go-libp2p-http"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmdkit/files"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
@@ -493,31 +494,7 @@ func (api *API) graphHandler(w http.ResponseWriter, r *http.Request) {
sendResponse(w, err, graph)
}
func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
mediatype, _, _ := mime.ParseMediaType(contentType)
var f files.File
if mediatype == "multipart/form-data" {
reader, err := r.MultipartReader()
if err != nil {
sendAcceptedResponse(w, err)
return
}
f = &files.MultipartFile{
Mediatype: mediatype,
Reader: reader,
}
} else {
sendAcceptedResponse(w, errors.New("unsupported media type"))
return
}
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
outChan, err := importer.ToChannel(ctx, f)
func (api *API) addFile(ctx context.Context, outChan <-chan *ipld.Node, w http.ResponseWriter) error {
for nodePtr := range outChan {
node := *nodePtr
/* Send block data to ipfs */
@@ -545,7 +522,7 @@ func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
// We may not want to stop all work after one failure
logger.Error(err)
sendAcceptedResponse(w, errors.New("error forwarding block"))
return
return err
}
}
@@ -555,6 +532,89 @@ func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
//
// Before returning we will need to trigger a pin,
// probably doing the same thing as Pin if it's not a sharding call.
//
// This is actually a bit tricky because right now we are not
// recording the root of the imported file. We could have ToChannel
// provide a root channel, readable once the node channel closes,
// that reports the root at the end. We would probably need to
// rewrite add for this to work.
return nil
}
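
One way to realize the root-channel idea sketched in the comment above, as a hypothetical helper that is not part of this commit (it assumes the importer emits the DAG root last, as balanced and trickle builders typically do):

func withRoot(in <-chan *ipld.Node) (<-chan *ipld.Node, <-chan *cid.Cid) {
    out := make(chan *ipld.Node)
    rootCh := make(chan *cid.Cid, 1) // buffered so the forwarder never blocks
    go func() {
        defer close(out)
        defer close(rootCh)
        var last *cid.Cid
        for n := range in {
            last = (*n).Cid() // remember the most recent node
            out <- n
        }
        if last != nil {
            rootCh <- last // root becomes readable once 'out' closes
        }
    }()
    return out, rootCh
}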
func (api *API) addShardedFile(ctx context.Context, outChan <-chan *ipld.Node, w http.ResponseWriter) error {
for nodePtr := range outChan {
node := *nodePtr
nodeS := types.NodeSerial{
CidS: node.Cid().String(),
Data: node.RawData(),
}
err := api.rpcClient.Call("",
"Cluster",
"ShardAddNode",
nodeS,
&struct{}{})
if err != nil {
// TODO: even more important than in local add,
// we should think about the best way to handle this
// as we may not want to halt sharding with one error.
// Retry? Carry on with missing information? Get user
// feedback?
logger.Error(err)
sendAcceptedResponse(w,
errors.New("error adding block to shard"))
return err
}
}
// The last node of the final shard may not have pushed byteCount over the
// threshold, so force clusterDAG serialization and cluster pin tracking.
err := api.rpcClient.Call("",
"Cluster",
"ShardFlush",
struct{}{},
&struct{}{})
if err != nil {
logger.Error(err)
sendAcceptedResponse(w,
errors.New("error flushing final shard"))
return err
}
return nil
}
func (api *API) addFileHandler(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
mediatype, _, _ := mime.ParseMediaType(contentType)
var f files.File
if mediatype == "multipart/form-data" {
reader, err := r.MultipartReader()
if err != nil {
sendAcceptedResponse(w, err)
return
}
f = &files.MultipartFile{
Mediatype: mediatype,
Reader: reader,
}
} else {
sendAcceptedResponse(w, errors.New("unsupported media type"))
return
}
ctx, cancel := context.WithCancel(api.ctx)
defer cancel()
outChan, err := importer.ToChannel(ctx, f)
if err != nil {
sendAcceptedResponse(w, err)
return
}
queryValues := r.URL.Query()
shard := queryValues.Get("shard")
if shard == "true" {
err = api.addShardedFile(ctx, outChan, w)
} else {
err = api.addFile(ctx, outChan, w)
}
if err != nil {
// addFile/addShardedFile have already written an error response
return
}
sendAcceptedResponse(w, nil)
}
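
For manual testing, a request against the REST API might look like this (a sketch: 9094 is the default ipfs-cluster REST API port, and the exact multipart fields are whatever MultiFileReader produces):

curl -X POST -F file=@myfile "http://127.0.0.1:9094/allocations?shard=true"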

View File

@@ -52,6 +52,7 @@ const (
Monitor
Allocator
Informer
Sharder
)
// SectionType specifies to which section a component configuration belongs.
@@ -161,6 +162,7 @@ type jsonConfig struct {
Monitor jsonSection `json:"monitor,omitempty"`
Allocator jsonSection `json:"allocator,omitempty"`
Informer jsonSection `json:"informer,omitempty"`
Sharder jsonSection `json:"sharder,omitempty"`
}
// Default generates a default configuration by generating defaults for all
@@ -309,6 +311,7 @@ func (cfg *Manager) LoadJSON(bs []byte) error {
loadCompJSON(sections[Monitor], jcfg.Monitor)
loadCompJSON(sections[Allocator], jcfg.Allocator)
loadCompJSON(sections[Informer], jcfg.Informer)
loadCompJSON(sections[Sharder], jcfg.Sharder)
return cfg.Validate()
}
@@ -394,6 +397,8 @@ func (cfg *Manager) ToJSON() ([]byte, error) {
err = updateJSONConfigs(v, &jcfg.Allocator)
case Informer:
err = updateJSONConfigs(v, &jcfg.Informer)
case Sharder:
err = updateJSONConfigs(v, &jcfg.Sharder)
}
if err != nil {
return nil, err
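
Once the sharder section is registered, a service.json would grow a corresponding block; a hypothetical sketch (the real key names depend on shard.Config's JSON mapping, which this diff doesn't show, and the size value is illustrative):

{
  "sharder": {
    "alloc_size": 5000000
  }
}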

View File

@@ -197,6 +197,10 @@ is also TODO
Name: "recursive, r",
Usage: "add directory paths recursively, default false",
},
cli.BoolFlag{
Name: "shard",
Usage: "break the file into pieces (shards) and distributed among peers, default false",
},
},
Action: func(c *cli.Context) error {
paths := make([]string, c.NArg(), c.NArg())
@@ -207,7 +211,7 @@ is also TODO
// Files are all opened but not read until they are sent.
multiFileR, err := parseFileArgs(paths, c.Bool("recursive"))
checkErr("serializing all files", err)
cerr := globalClient.AddMultiFile(multiFileR)
cerr := globalClient.AddMultiFile(multiFileR, c.Bool("shard"))
if cerr != nil {
formatResponse(c, nil, cerr)
}
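
With the flag wired through, a manual test of a sharded add might look like this (a sketch, assuming this hunk belongs to the ctl add command; output format is not shown in this diff):

ipfs-cluster-ctl add --shard -r mydir/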

View File

@@ -350,6 +350,7 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe
func (rpcapi *RPCAPI) ShardAddNode(in api.NodeSerial, out *struct{}) error {
node, err := in.ToIPLDNode()
if err != nil {
logger.Errorf("Found error converting to ipld node: %s", err.Error())
return err
}
return rpcapi.c.sharder.AddNode(node)
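
For reference, a hypothetical sketch of what NodeSerial.ToIPLDNode has to do (the real implementation lives outside this diff): decode the CID string, rebuild the raw block, and hand it to the registered IPLD decoder. Here blocks is github.com/ipfs/go-block-format and ipld is github.com/ipfs/go-ipld-format.

func (n NodeSerial) ToIPLDNode() (ipld.Node, error) {
    c, err := cid.Decode(n.CidS)
    if err != nil {
        return nil, err
    }
    blk, err := blocks.NewBlockWithCid(n.Data, c)
    if err != nil {
        return nil, err
    }
    // ipld.Decode dispatches on the CID codec (dag-pb, dag-cbor, raw, ...)
    return ipld.Decode(blk)
}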

View File

@@ -2,6 +2,7 @@ package shard
import (
"errors"
"fmt"
"github.com/ipfs/ipfs-cluster/api"
@@ -34,12 +35,19 @@ type Sharder struct {
// NewSharder returns a new sharder for use by an ipfs-cluster. In the future
// this may take in a shard-config
func NewSharder(cfg *Config) (*Sharder, error) {
return &Sharder{allocSize: cfg.AllocSize}, nil
logger.Debugf("The alloc size provided: %d", cfg.AllocSize)
return &Sharder{allocSize: cfg.AllocSize,
currentShard: make(map[string]*cid.Cid),
}, nil
}
func (s *Sharder) unInit() bool {
return s.currentShard.links == nil && s.assignedPeer == peer.ID("") && s.byteCount == 0 &&
s.byteThreshold == 0
emptyShard := len(s.currentShard) == 0
noPeer := s.assignedPeer == peer.ID("")
noBytes := s.byteCount == 0
noThreshold := s.byteThreshold == 0
logger.Debugf("emptyShard: %t noPeer: %t noBytes: %t noThreshold: %t",
emptyShard, noPeer, noBytes, noThreshold)
return emptyShard && noPeer && noBytes && noThreshold
}
// SetClient registers the rpcClient used by the Sharder to communicate with
@@ -55,9 +63,7 @@ func (s *Sharder) Shutdown() error {
// Temporary storage of links to be serialized to ipld cbor once allocation is
// complete
type shardObj struct {
links []*cid.Cid
}
type shardObj map[string]*cid.Cid
// clusterDAGCountBytes tracks the number of bytes in the serialized cluster
// DAG node used to track this shard. For now ignoring these bytes
@@ -149,13 +155,16 @@ func (s *Sharder) initShard() error {
return err
}
s.byteCount = 0
s.currentShard = shardObj{}
s.currentShard = make(map[string]*cid.Cid)
logger.Debugf("Within initShard. thresh: %d", s.byteThreshold)
return nil
}
// AddNode includes the provided node into a shard in the cluster DAG
// that tracks this node's graph
func (s *Sharder) AddNode(node ipld.Node) error {
logger.Debug("adding node to shard")
logger.Debugf("sharder size: %d---sharder thresh: %d", s.byteCount, s.byteThreshold)
size, err := node.Size()
if err != nil {
return err
@@ -166,11 +175,15 @@ func (s *Sharder) AddNode(node ipld.Node) error {
format = ""
logger.Warning("unsupported cid type, treating as v0")
}
if c.Prefix().Version == 0 {
format = "v0"
}
if s.unInit() {
logger.Debug("initializing next shard of data")
if err := s.initShard(); err != nil {
return err
}
logger.Debugf("After first init. thresh: %d", s.byteThreshold)
} else {
if s.byteCount+size+s.clusterDAGCountBytes() > s.byteThreshold {
logger.Debug("shard at capacity, pin cluster DAG node")
@@ -180,14 +193,18 @@ func (s *Sharder) AddNode(node ipld.Node) error {
if err := s.initShard(); err != nil {
return err
}
logger.Debugf("After flushing. thresh: %d", s.byteThreshold)
}
}
// The shard is now initialized and can accommodate this node, by the
// config-enforced invariant that shard size always exceeds the ipfs
// block max chunk size
logger.Debugf("Adding size: %d to byteCount at: %d", size, s.byteCount)
s.byteCount += size
s.currentShard.links = append(s.currentShard.links, node.Cid())
key := fmt.Sprintf("%d", len(s.currentShard))
s.currentShard[key] = node.Cid()
var retStr string
b := api.BlockWithFormat{
Data: node.RawData(),
@@ -203,12 +220,15 @@ func (s *Sharder) AddNode(node ipld.Node) error {
// shard is being flushed.
func (s *Sharder) Flush() error {
// Serialize shard node and reset state
logger.Debugf("Flushing the current shard %v", s.currentShard)
shardNode, err := cbor.WrapObject(s.currentShard, mh.SHA2_256, mh.DefaultLengths[mh.SHA2_256])
if err != nil {
return err
}
logger.Debugf("The dag cbor Node Links: %v", shardNode.Links())
targetPeer := s.assignedPeer
s.currentShard = shardObj{}
s.currentShard = make(map[string]*cid.Cid)
s.assignedPeer = peer.ID("")
s.byteThreshold = 0
s.byteCount = 0
@@ -217,6 +237,7 @@ func (s *Sharder) Flush() error {
Data: shardNode.RawData(),
Format: "cbor",
}
logger.Debugf("Here is the serialized ipld: %x", b.Data)
err = s.rpcClient.Call(targetPeer, "Cluster", "IPFSBlockPut",
b, &retStr)
if err != nil {