From 4a5a613d945f55ac677cccfba7f5c7da666df460 Mon Sep 17 00:00:00 2001 From: Wyatt Daviau Date: Fri, 16 Feb 2018 07:45:16 -0500 Subject: [PATCH] rpc call and config added License: MIT Signed-off-by: Wyatt Daviau --- api/types.go | 39 ++++++++++++++++++++++++++++++++++++ ipfs-cluster-service/main.go | 1 + logging.go | 1 + rpc_api.go | 13 ++++++++++++ shard/sharder.go | 26 ++++++++++++++++++++++-- 5 files changed, 78 insertions(+), 2 deletions(-) diff --git a/api/types.go b/api/types.go index 16ff5a41..0c5cde2c 100644 --- a/api/types.go +++ b/api/types.go @@ -15,7 +15,10 @@ import ( "strings" "time" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + dag "github.com/ipfs/go-ipfs/merkledag" + ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-peer" protocol "github.com/libp2p/go-libp2p-protocol" @@ -27,6 +30,11 @@ import ( _ "github.com/multiformats/go-multiaddr-dns" ) +func init() { + ipld.Register(cid.DagProtobuf, dag.DecodeProtobufBlock) + ipld.Register(cid.Raw, dag.DecodeRawBlock) +} + var logger = logging.Logger("apitypes") // TrackerStatus values @@ -619,6 +627,37 @@ func (pins PinSerial) ToPin() Pin { } } +// NodeSerial encodes an ipld node. The cid and raw data are bundled together +// to be decoded into a block using NewBlockWithCid which can further be +// decoded into an ipld node +type NodeSerial struct { + CidS string + Data []byte +} + +func (nS NodeSerial) ToIPLDNode() (ipld.Node, error) { + c, err := cid.Decode(nS.CidS) + if err != nil { + return nil, err + } + blk, err := blocks.NewBlockWithCid(nS.Data, c) + if err != nil { + return nil, err + } + node, err := ipld.Decode(blk) + if err != nil { + return nil, err + } + return node, nil +} + +func ToNodeSerial(node ipld.Node) NodeSerial { + return NodeSerial{ + CidS: node.Cid().String(), + Data: node.RawData(), + } +} + // Metric transports information about a peer.ID. It is used to decide // pin allocations by a PinAllocator. IPFS cluster is agnostic to // the Value, which should be interpreted by the PinAllocator. diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index c4387967..dbcecbb2 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -204,6 +204,7 @@ configuration. }, Action: func(c *cli.Context) error { userSecret, userSecretDefined := userProvidedSecret(c.Bool("custom-secret")) + cfgMgr, cfgs := makeConfigs() defer cfgMgr.Shutdown() // wait for saves diff --git a/logging.go b/logging.go index 881f681c..41ebae59 100644 --- a/logging.go +++ b/logging.go @@ -18,6 +18,7 @@ var LoggingFacilities = map[string]string{ "diskinfo": "INFO", "apitypes": "INFO", "config": "INFO", + "shard": "INFO", } // LoggingFacilitiesExtra provides logging identifiers diff --git a/rpc_api.go b/rpc_api.go index 36f23778..ba804b37 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -342,6 +342,19 @@ func (rpcapi *RPCAPI) ConsensusPeers(ctx context.Context, in struct{}, out *[]pe return err } +/* + Sharder methods +*/ + +// ShardAddNode runs Sharder.AddNode(). +func (rpcapi *RPCAPI) ShardAddNode(in api.NodeSerial, out *struct{}) error { + node, err := in.ToIPLDNode() + if err != nil { + return err + } + return rpcapi.c.sharder.AddNode(node) +} + /* Peer Manager methods */ diff --git a/shard/sharder.go b/shard/sharder.go index 0f07008a..31c12f60 100644 --- a/shard/sharder.go +++ b/shard/sharder.go @@ -4,10 +4,15 @@ import ( // "github.com/ipfs/ipfs-cluster/api" rpc "github.com/hsanjuan/go-libp2p-gorpc" + cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-peer" + mh "github.com/multiformats/go-multihash" ) +var logger = logging.Logger("shard") + // Sharder aggregates incident ipfs file dag nodes into a shard, or group of // nodes. The Sharder builds a reference node in the ipfs-cluster DAG to // reference the nodes in a shard. This component distributes shards among @@ -19,12 +24,14 @@ type Sharder struct { currentShard ipld.Node byteCount int byteThreshold int + + allocSize int } // NewSharder returns a new sharder for use by an ipfs-cluster. In the future // this may take in a shard-config -func NewSharder() (*Sharder, error) { - return &Sharder{}, nil +func NewSharder(cfg *Config) (*Sharder, error) { + return &Sharder{allocSize: cfg.AllocSize}, nil } // SetClient registers the rpcClient used by the Sharder to communicate with @@ -38,6 +45,21 @@ func (s *Sharder) Shutdown() error { return nil } +/* what we really need to do is track links in a cluster dag +proto object, and only after we have tallied them all up do we +call WrapObject to serialize go slice of links into a cbor node*/ +func newClusterDAGNode() (*cbor.Node, error) { + return cbor.WrapObject(struct{}{}, mh.SHA2_256, mh.DefaultLengths[mh.SHA2_256]) +} + +func clusterDAGAppend(clusterNode *cbor.Node, child ipld.Node) error { + lnk, err := ipld.MakeLink(child) + if err != nil { + return err + } + +} + // AddNode func (s *Sharder) AddNode(node ipld.Node) error {