Adder: fix #1006: Specify block format, hash and length when adding

Currently we were only specifying the block format. When adding with
a custom hash function, even though we produced the right cids, IPFS
did not know the hash function and ended up storing them using SHA256.

Additionally, since NodeWithMeta serializes the CID, we do not need
to carry a Format parameter (which specifies the Codec): it is already
embedded.

Tests have been added and BlockPut in ipfshttp now checks that the
response's CID matches the data sent. This will catch errors like
what was happening, but also any data corruption between cluster and
IPFS during the block upload.
This commit is contained in:
Hector Sanjuan 2020-03-06 16:35:15 +01:00
parent 5d44275d0e
commit ede1b89252
6 changed files with 78 additions and 46 deletions

View File

@ -93,20 +93,10 @@ func ipldNodeToNodeWithMeta(n ipld.Node) *api.NodeWithMeta {
logger.Warning(err) logger.Warning(err)
} }
format, ok := cid.CodecToStr[n.Cid().Type()]
if !ok {
format = ""
logger.Warning("unsupported cid type, treating as v0")
}
if n.Cid().Prefix().Version == 0 {
format = "v0"
}
return &api.NodeWithMeta{ return &api.NodeWithMeta{
Cid: n.Cid(), Cid: n.Cid(),
Data: n.RawData(), Data: n.RawData(),
CumSize: size, CumSize: size,
Format: format,
} }
} }

View File

@ -883,7 +883,6 @@ type NodeWithMeta struct {
Data []byte `codec:"d,omitempty"` Data []byte `codec:"d,omitempty"`
Cid cid.Cid `codec:"c,omitempty"` Cid cid.Cid `codec:"c,omitempty"`
CumSize uint64 `codec:"s,omitempty"` // Cumulative size CumSize uint64 `codec:"s,omitempty"` // Cumulative size
Format string `codec:"f,omitempty"`
} }
// Size returns how big is the block. It is different from CumSize, which // Size returns how big is the block. It is different from CumSize, which

View File

@ -27,6 +27,7 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc" rpc "github.com/libp2p/go-libp2p-gorpc"
madns "github.com/multiformats/go-multiaddr-dns" madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
"github.com/multiformats/go-multihash"
"go.opencensus.io/plugin/ochttp" "go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext" "go.opencensus.io/plugin/ochttp/propagation/tracecontext"
@ -112,6 +113,11 @@ type ipfsSwarmPeersResp struct {
Peers []ipfsPeer Peers []ipfsPeer
} }
type ipfsBlockPutResp struct {
Key string
Size int
}
type ipfsPeer struct { type ipfsPeer struct {
Peer string Peer string
} }
@ -887,16 +893,49 @@ func (ipfs *Connector) BlockPut(ctx context.Context, b *api.NodeWithMeta) error
) )
multiFileR := files.NewMultiFileReader(mapDir, true) multiFileR := files.NewMultiFileReader(mapDir, true)
if b.Format == "" {
b.Format = "v0" q := make(url.Values, 3)
prefix := b.Cid.Prefix()
format, ok := cid.CodecToStr[prefix.Codec]
if !ok {
return fmt.Errorf("cannot find name for the blocks' CID codec: %x", prefix.Codec)
} }
url := "block/put?f=" + b.Format q.Set("format", format)
mhType, ok := multihash.Codes[prefix.MhType]
if !ok {
return fmt.Errorf("cannot find name for the blocks' Multihash type: %x", prefix.MhType)
}
q.Set("mhtype", mhType)
q.Set("mhlen", strconv.Itoa(prefix.MhLength))
url := "block/put?" + q.Encode()
contentType := "multipart/form-data; boundary=" + multiFileR.Boundary() contentType := "multipart/form-data; boundary=" + multiFileR.Boundary()
_, err := ipfs.postCtx(ctx, url, contentType, multiFileR) body, err := ipfs.postCtx(ctx, url, contentType, multiFileR)
if err != nil {
return err return err
} }
var res ipfsBlockPutResp
err = json.Unmarshal(body, &res)
if err != nil {
return err
}
logger.Debug("block/put response CID", res.Key)
respCid, err := cid.Decode(res.Key)
if err != nil {
logger.Error("cannot parse CID from BlockPut response")
return err
}
if !respCid.Equals(b.Cid) {
return fmt.Errorf("blockPut response CID (%s) does not match the block sent (%s)", respCid, b.Cid)
}
return nil
}
// BlockGet retrieves an ipfs block with the given cid // BlockGet retrieves an ipfs block with the given cid
func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) { func (ipfs *Connector) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockGet") ctx, span := trace.StartSpan(ctx, "ipfsconn/ipfshttp/BlockGet")

View File

@ -288,11 +288,9 @@ func TestBlockPut(t *testing.T) {
defer mock.Close() defer mock.Close()
defer ipfs.Shutdown(ctx) defer ipfs.Shutdown(ctx)
data := []byte(test.Cid4Data)
err := ipfs.BlockPut(ctx, &api.NodeWithMeta{ err := ipfs.BlockPut(ctx, &api.NodeWithMeta{
Data: data, Data: []byte(test.Cid4Data),
Cid: test.Cid4, Cid: test.Cid4,
Format: "raw",
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -316,7 +314,6 @@ func TestBlockGet(t *testing.T) {
err = ipfs.BlockPut(ctx, &api.NodeWithMeta{ err = ipfs.BlockPut(ctx, &api.NodeWithMeta{
Data: test.ShardData, Data: test.ShardData,
Cid: test.ShardCid, Cid: test.ShardCid,
Format: "cbor",
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -10,9 +10,11 @@ var (
Cid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq") Cid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmq")
Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma") Cid2, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmma")
Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb") Cid3, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmb")
Cid4, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuc57") Cid4Data = "Cid4Data"
// Cid resulting from block put using blake2b-256 and raw format
Cid4, _ = cid.Decode("bafk2bzaceawsyhsnrwwy5mtit2emnjfalkxsyq2p2ptd6fuliolzwwjbs42fq")
Cid5, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") Cid5, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd")
Cid4Data = "Cid4Data" // Cid resulting from block put NOT ipfs add
SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd") SlowCid1, _ = cid.Decode("QmP63DkAFEnDYNjDYBpyNDfttu1fvUw99x1brscPzpqmmd")
CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc") CidResolved, _ = cid.Decode("zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuabc")
// ErrorCid is meant to be used as a Cid which causes errors. i.e. the // ErrorCid is meant to be used as a Cid which causes errors. i.e. the

View File

@ -18,9 +18,9 @@ import (
"github.com/ipfs/ipfs-cluster/datastore/inmem" "github.com/ipfs/ipfs-cluster/datastore/inmem"
"github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state"
"github.com/ipfs/ipfs-cluster/state/dsstate" "github.com/ipfs/ipfs-cluster/state/dsstate"
"github.com/multiformats/go-multihash"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
cors "github.com/rs/cors" cors "github.com/rs/cors"
) )
@ -291,8 +291,9 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
goto ERROR goto ERROR
} }
if ok { if ok {
if c.Equals(Cid4) { // this a v1 cid. Do not return default-base32 if c.Equals(Cid4) {
w.Write([]byte(`{ "Keys": { "zb2rhiKhUepkTMw7oFfBUnChAN7ABAvg2hXUwmTBtZ6yxuc57": { "Type": "recursive" }}}`)) // this a v1 cid. Do not return default-base32 but base58btc encoding of it
w.Write([]byte(`{ "Keys": { "zCT5htkdztJi3x4zBNHo8TRvGHPLTdHUdCLKgTGMgQcRKSLoWxK1": { "Type": "recursive" }}}`))
return return
} }
@ -349,25 +350,29 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) {
} }
// Parse cid from data and format and add to mock block-store // Parse cid from data and format and add to mock block-store
query := r.URL.Query() query := r.URL.Query()
format, ok := query["f"] format := cid.Codecs[query.Get("format")]
if !ok || len(format) != 1 { mhType := multihash.Names[query.Get("mhtype")]
goto ERROR mhLen, _ := strconv.Atoi(query.Get("mhLen"))
}
var c string var builder cid.Builder
hash := u.Hash(data) if format == cid.DagProtobuf && mhType == multihash.SHA2_256 {
codec, ok := cid.Codecs[format[0]] builder = cid.V0Builder{}
if !ok {
goto ERROR
}
if format[0] == "v0" {
c = cid.NewCidV0(hash).String()
} else { } else {
c = cid.NewCidV1(codec, hash).String() builder = cid.V1Builder{
Codec: format,
MhType: mhType,
MhLength: mhLen,
} }
m.BlockStore[c] = data }
c, err := builder.Sum(data)
if err != nil {
goto ERROR
}
m.BlockStore[c.String()] = data
resp := mockBlockPutResp{ resp := mockBlockPutResp{
Key: c, Key: c.String(),
} }
j, _ := json.Marshal(resp) j, _ := json.Marshal(resp)
w.Write(j) w.Write(j)