Finish rebase of sharding branch. It builds!

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
This commit is contained in:
Hector Sanjuan 2018-06-28 17:01:39 +02:00
parent 463840bab3
commit 68ec9316f7
12 changed files with 78 additions and 88 deletions

View File

@ -53,7 +53,7 @@ func (a *AddSession) finishLocalAdd(rootCid string, replMin, replMax int) error
pinS := api.PinSerial{
Cid: rootCid,
Type: api.DataType,
Type: int(api.DataType),
Recursive: true,
ReplicationFactorMin: replMin,
ReplicationFactorMax: replMax,

View File

@ -12,6 +12,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
@ -828,7 +829,7 @@ func parseCidOrError(w http.ResponseWriter, r *http.Request) types.PinSerial {
pin := types.PinSerial{
Cid: hash,
Type: types.DataType,
Type: int(types.DataType),
}
queryValues := r.URL.Query()

View File

@ -518,30 +518,6 @@ func (addrsS MultiaddrsSerial) ToMultiaddrs() []ma.Multiaddr {
return addrs
}
// PeersToStrings IDB58Encodes a list of peers.
func PeersToStrings(peers []peer.ID) []string {
strs := make([]string, len(peers))
for i, p := range peers {
if p != "" {
strs[i] = peer.IDB58Encode(p)
}
}
return strs
}
// StringsToPeers decodes peer.IDs from strings.
func StringsToPeers(strs []string) []peer.ID {
peers := make([]peer.ID, len(strs))
for i, p := range strs {
var err error
peers[i], err = peer.IDB58Decode(p)
if err != nil {
logger.Error(p, err)
}
}
return peers
}
// CidsToStrings encodes cid.Cids to strings.
func CidsToStrings(cids []*cid.Cid) []string {
strs := make([]string, len(cids))
@ -566,7 +542,8 @@ func StringsToCidSet(strs []string) *cid.Set {
// PinType values
const (
DataType PinType = iota + 1
BadType PinType = iota
DataType
MetaType
CdagType
ShardType
@ -603,7 +580,7 @@ func PinTypeFromString(str string) PinType {
case "all":
return AllType
default:
return PinType(0) // invalid string
return BadType
}
}

View File

@ -9,8 +9,8 @@ import (
"github.com/ipfs/ipfs-cluster/api"
offline "github.com/ipfs/go-ipfs-exchange-offline"
bserv "github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/exchange/offline"
balanced "github.com/ipfs/go-ipfs/importer/balanced"
ihelper "github.com/ipfs/go-ipfs/importer/helpers"
trickle "github.com/ipfs/go-ipfs/importer/trickle"

View File

@ -15,6 +15,7 @@ import (
"github.com/ipfs/ipfs-cluster/monitor/basic"
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/sharder"
)
type cfgs struct {
@ -27,6 +28,7 @@ type cfgs struct {
pubsubmonCfg *pubsubmon.Config
diskInfCfg *disk.Config
numpinInfCfg *numpin.Config
sharderCfg *sharder.Config
}
func makeConfigs() (*config.Manager, *cfgs) {
@ -40,6 +42,7 @@ func makeConfigs() (*config.Manager, *cfgs) {
pubsubmonCfg := &pubsubmon.Config{}
diskInfCfg := &disk.Config{}
numpinInfCfg := &numpin.Config{}
sharderCfg := &sharder.Config{}
cfg.RegisterComponent(config.Cluster, clusterCfg)
cfg.RegisterComponent(config.API, apiCfg)
cfg.RegisterComponent(config.IPFSConn, ipfshttpCfg)
@ -49,7 +52,8 @@ func makeConfigs() (*config.Manager, *cfgs) {
cfg.RegisterComponent(config.Monitor, pubsubmonCfg)
cfg.RegisterComponent(config.Informer, diskInfCfg)
cfg.RegisterComponent(config.Informer, numpinInfCfg)
return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, pubsubmonCfg, diskInfCfg, numpinInfCfg}
cfg.RegisterComponent(config.Sharder, sharderCfg)
return cfg, &cfgs{clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, trackerCfg, monCfg, pubsubmonCfg, diskInfCfg, numpinInfCfg, sharderCfg}
}
func saveConfig(cfg *config.Manager, force bool) {

View File

@ -23,6 +23,7 @@ import (
"github.com/ipfs/ipfs-cluster/monitor/pubsubmon"
"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/sharder"
"github.com/ipfs/ipfs-cluster/state/mapstate"
ma "github.com/multiformats/go-multiaddr"
@ -131,6 +132,9 @@ func createCluster(
ipfscluster.ReadyTimeout = cfgs.consensusCfg.WaitForLeaderTimeout + 5*time.Second
sharder, err := sharder.New(cfgs.sharderCfg)
checkErr("creating Sharder component", err)
return ipfscluster.NewCluster(
host,
cfgs.clusterCfg,
@ -142,6 +146,7 @@ func createCluster(
mon,
alloc,
informer,
sharder,
)
}

View File

@ -590,7 +590,7 @@ func (ipfs *Connector) ID() (api.IPFSID, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
id := api.IPFSID{}
body, err := ipfs.postCtx(ctx, "id", nil)
body, err := ipfs.postCtx(ctx, "id", "", nil)
if err != nil {
id.Error = err.Error()
return id, err
@ -644,7 +644,7 @@ func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) e
}
path := fmt.Sprintf("pin/add?arg=%s&recursive=%t", hash, recursive)
_, err = ipfs.postCtx(ctx, path, nil)
_, err = ipfs.postCtx(ctx, path, "", nil)
if err == nil {
logger.Info("IPFS Pin request succeeded: ", hash)
}
@ -665,7 +665,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
}
if pinStatus.IsPinned() {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.postCtx(ctx, path, nil)
_, err := ipfs.postCtx(ctx, path, "", nil)
if err == nil {
logger.Info("IPFS Unpin request succeeded:", hash)
}
@ -681,7 +681,7 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
func (ipfs *Connector) PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error) {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter, nil)
body, err := ipfs.postCtx(ctx, "pin/ls?type="+typeFilter, "", nil)
// Some error talking to the daemon
if err != nil {
@ -709,7 +709,7 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPin
ctx, cancel := context.WithTimeout(ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
lsPath := fmt.Sprintf("pin/ls?arg=%s&type=recursive", hash)
body, err := ipfs.postCtx(ctx, lsPath, nil)
body, err := ipfs.postCtx(ctx, lsPath, "", nil)
// Network error, daemon down
if body == nil && err != nil {
@ -837,6 +837,7 @@ func (ipfs *Connector) ConnectSwarms() error {
_, err := ipfs.postCtx(
ctx,
fmt.Sprintf("swarm/connect?arg=%s", addr),
"",
nil,
)
if err != nil {
@ -855,7 +856,7 @@ func (ipfs *Connector) ConnectSwarms() error {
func (ipfs *Connector) ConfigKey(keypath string) (interface{}, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "config/show", nil)
res, err := ipfs.postCtx(ctx, "config/show", "", nil)
if err != nil {
logger.Error(err)
return nil, err
@ -901,7 +902,7 @@ func getConfigValue(path []string, cfg map[string]interface{}) (interface{}, err
func (ipfs *Connector) FreeSpace() (uint64, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat", nil)
res, err := ipfs.postCtx(ctx, "repo/stat", "", nil)
if err != nil {
logger.Error(err)
return 0, err
@ -921,7 +922,7 @@ func (ipfs *Connector) FreeSpace() (uint64, error) {
func (ipfs *Connector) RepoSize() (uint64, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
res, err := ipfs.postCtx(ctx, "repo/stat", nil)
res, err := ipfs.postCtx(ctx, "repo/stat", "", nil)
if err != nil {
logger.Error(err)
return 0, err
@ -941,7 +942,7 @@ func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
swarm := api.SwarmPeers{}
res, err := ipfs.postCtx(ctx, "swarm/peers", nil)
res, err := ipfs.postCtx(ctx, "swarm/peers", "", nil)
if err != nil {
logger.Error(err)
return swarm, err
@ -965,6 +966,32 @@ func (ipfs *Connector) SwarmPeers() (api.SwarmPeers, error) {
return swarm, nil
}
// BlockPut triggers an ipfs block put on the given data, inserting the block
// into the ipfs daemon's repo.
func (ipfs *Connector) BlockPut(b api.NodeWithMeta) error {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
r := ioutil.NopCloser(bytes.NewReader(b.Data))
rFile := files.NewReaderFile("", "", r, nil)
sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory
multiFileR := files.NewMultiFileReader(sliceFile, true)
if b.Format == "" {
b.Format = "v0"
}
url := "block/put?f=" + b.Format
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
_, err := ipfs.postCtx(ctx, url, contentType, multiFileR)
return err
}
// BlockGet retrieves an ipfs block with the given cid
func (ipfs *Connector) BlockGet(c *cid.Cid) ([]byte, error) {
ctx, cancel := context.WithTimeout(ipfs.ctx, ipfs.config.IPFSRequestTimeout)
defer cancel()
url := "block/get?arg=" + c.String()
return ipfs.postCtx(ctx, url, "", nil)
}
// extractArgument extracts the cid argument from a url.URL, either via
// the query string parameters or from the url path itself.
func extractArgument(u *url.URL) (string, bool) {
@ -985,25 +1012,3 @@ func extractArgument(u *url.URL) (string, bool) {
}
return "", false
}
// BlockPut triggers an ipfs block put on the given data, inserting the block
// into the ipfs daemon's repo.
func (ipfs *Connector) BlockPut(b api.NodeWithMeta) error {
r := ioutil.NopCloser(bytes.NewReader(b.Data))
rFile := files.NewReaderFile("", "", r, nil)
sliceFile := files.NewSliceFile("", "", []files.File{rFile}) // IPFS reqs require a wrapping directory
multiFileR := files.NewMultiFileReader(sliceFile, true)
if b.Format == "" {
b.Format = "v0"
}
url := "block/put?f=" + b.Format
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
_, err := ipfs.post(url, contentType, multiFileR)
return err
}
// BlockGet retrieves an ipfs block with the given cid
func (ipfs *Connector) BlockGet(c *cid.Cid) ([]byte, error) {
url := "block/get?arg=" + c.String()
return ipfs.post(url, "", nil)
}

View File

@ -25,12 +25,6 @@
"name": "go-libp2p-raft",
"version": "1.2.10"
},
{
"author": "whyrusleeping",
"hash": "QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ",
"name": "go-cid",
"version": "0.7.18"
},
{
"author": "urfave",
"hash": "Qmc1AtgBdoUHP8oYSqU81NRYdzohmF45t5XNwVMvhCxsBA",
@ -81,9 +75,9 @@
},
{
"author": "ipfs",
"hash": "QmXmEFbXmC6ExXg3H8MV2FhminVLWf9qJYUdr83DnNCy86",
"hash": "QmZqrWkDmNwNzQYQdfLHMiKGbVo4jQwNib2iUBzWqGc9JE",
"name": "go-ipfs-api",
"version": "1.2.8"
"version": "1.2.9"
},
{
"author": "whyrusleeping",
@ -98,20 +92,15 @@
"version": "0.7.21"
},
{
"hash": "QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM",
"hash": "QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f",
"name": "go-ipfs-cmdkit",
"version": "1.0.0"
},
{
"hash": "QmXporsyf5xMvffd2eiTDoq85dNpYUynGJhfabzDjwP8uR",
"name": "go-ipfs",
"version": "0.4.14-rc1"
"version": "1.1.1"
},
{
"author": "whyrusleeping",
"hash": "Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES",
"hash": "QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb",
"name": "go-ipld-format",
"version": "0.5.3"
"version": "0.5.4"
},
{
"author": "satori",
@ -121,21 +110,26 @@
},
{
"author": "hsanjuan",
"hash": "QmPwNSAKhfSDEjQ2LYx8bemvnoyXYTaL96JxsAvjzphT75",
"hash": "QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4",
"name": "go-ipfs-blockstore",
"version": "0.0.3"
"version": "0.0.8"
},
{
"author": "hsanjuan",
"hash": "QmWo8jYc19ppG7YoTsrr2kEtLRbARTJho5oNXFTR6B7Peq",
"hash": "QmXnzH7wowyLZy8XJxxaQCVTgLMcDXdMBznmsrmQWCyiQV",
"name": "go-ipfs-chunker",
"version": "0.0.2"
"version": "0.0.6"
},
{
"author": "hector",
"hash": "Qmb3jLEFAQrqdVgWUajqEyuuDoavkSq1XQXz6tWdFWF995",
"hash": "QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe",
"name": "go-ipfs-posinfo",
"version": "0.0.1"
"version": "0.0.2"
},
{
"hash": "QmdmoY2foMtfYWLE4PWUzAjcPRNMLMM4Q4wdAE27s9JLxv",
"name": "go-ipfs",
"version": "0.4.16-rc1"
}
],
"gxVersion": "0.11.0",

View File

@ -204,7 +204,7 @@ func (mpt *MapPinTracker) Track(c api.Pin) error {
// The problem is remote/unpin operation won't be cancelled
// but I don't know how bad is that
// Also, this is dup code
if c.Type == ShardType {
if c.Type == api.ShardType {
// cancel any other ops
op := mpt.optracker.TrackNewOperation(c, optracker.OperationSharded, optracker.PhaseInProgress)
if op == nil {

View File

@ -25,6 +25,9 @@ const (
OperationUnpin
// OperationRemote represents an noop operation
OperationRemote
// OperationSharded represents a pin which points to shard
// FIXME
OperationSharded
)
//go:generate stringer -type=Phase

View File

@ -10,6 +10,7 @@ import (
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
uuid "github.com/satori/go.uuid"
)
@ -42,9 +43,9 @@ type Sharder struct {
// currentShard: make(map[string]*cid.Cid),
// NewSharder returns a new sharder for use by an ipfs-cluster. In the future
// New returns a new sharder for use by an ipfs-cluster. In the future
// this may take in a shard-config
func NewSharder(cfg *Config) (*Sharder, error) {
func New(cfg *Config) (*Sharder, error) {
logger.Debugf("The alloc size provided: %d", cfg.AllocSize)
return &Sharder{
allocSize: cfg.AllocSize,

View File

@ -116,7 +116,7 @@ func (st *mapStateV3) next() migrateable {
ReplicationFactorMin: v.ReplicationFactorMin,
ReplicationFactorMax: v.ReplicationFactorMax,
Recursive: true,
Type: api.DataType,
Type: int(api.DataType),
Parents: nil,
Clusterdag: "",
}