diff --git a/api/pb/types.pb.go b/api/pb/types.pb.go new file mode 100644 index 00000000..285b274e --- /dev/null +++ b/api/pb/types.pb.go @@ -0,0 +1,200 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: types.proto + +package api_pb + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Pin_PinType int32 + +const ( + Pin_BadType Pin_PinType = 0 + Pin_DataType Pin_PinType = 1 + Pin_MetaType Pin_PinType = 2 + Pin_ClusterDAGType Pin_PinType = 3 + Pin_ShardType Pin_PinType = 4 +) + +var Pin_PinType_name = map[int32]string{ + 0: "BadType", + 1: "DataType", + 2: "MetaType", + 3: "ClusterDAGType", + 4: "ShardType", +} + +var Pin_PinType_value = map[string]int32{ + "BadType": 0, + "DataType": 1, + "MetaType": 2, + "ClusterDAGType": 3, + "ShardType": 4, +} + +func (x Pin_PinType) String() string { + return proto.EnumName(Pin_PinType_name, int32(x)) +} + +func (Pin_PinType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{0, 0} +} + +type Pin struct { + Cid []byte `protobuf:"bytes,1,opt,name=Cid,proto3" json:"Cid,omitempty"` + Type Pin_PinType `protobuf:"varint,2,opt,name=Type,proto3,enum=api.pb.Pin_PinType" json:"Type,omitempty"` + Allocations [][]byte `protobuf:"bytes,3,rep,name=Allocations,proto3" json:"Allocations,omitempty"` + MaxDepth int32 `protobuf:"zigzag32,4,opt,name=MaxDepth,proto3" json:"MaxDepth,omitempty"` + Reference []byte `protobuf:"bytes,5,opt,name=Reference,proto3" json:"Reference,omitempty"` + ReplicationFactorMin int32 `protobuf:"zigzag32,8,opt,name=ReplicationFactorMin,proto3" json:"ReplicationFactorMin,omitempty"` + ReplicationFactorMax int32 `protobuf:"zigzag32,9,opt,name=ReplicationFactorMax,proto3" json:"ReplicationFactorMax,omitempty"` + Name string `protobuf:"bytes,10,opt,name=Name,proto3" json:"Name,omitempty"` + ShardSize uint64 `protobuf:"varint,11,opt,name=ShardSize,proto3" json:"ShardSize,omitempty"` + Metadata map[string]string `protobuf:"bytes,12,rep,name=Metadata,proto3" json:"Metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Pin) Reset() { *m = Pin{} } +func (m *Pin) String() string { return proto.CompactTextString(m) } +func (*Pin) ProtoMessage() {} +func (*Pin) Descriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{0} +} + +func (m *Pin) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Pin.Unmarshal(m, b) +} +func (m *Pin) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Pin.Marshal(b, m, deterministic) +} +func (m *Pin) XXX_Merge(src proto.Message) { + xxx_messageInfo_Pin.Merge(m, src) +} +func (m *Pin) XXX_Size() int { + return xxx_messageInfo_Pin.Size(m) +} +func (m *Pin) XXX_DiscardUnknown() { + xxx_messageInfo_Pin.DiscardUnknown(m) +} + +var xxx_messageInfo_Pin proto.InternalMessageInfo + +func (m *Pin) GetCid() []byte { + if m != nil { + return m.Cid + } + return nil +} + +func (m *Pin) GetType() Pin_PinType { + if m != nil { + return m.Type + } + return Pin_BadType +} + +func (m *Pin) GetAllocations() [][]byte { + if m != nil { + return m.Allocations + } + return nil +} + +func (m *Pin) GetMaxDepth() int32 { + if m != nil { + return m.MaxDepth + } + return 0 +} + +func (m *Pin) GetReference() []byte { + if m != nil { + return m.Reference + } + return nil +} + +func (m *Pin) GetReplicationFactorMin() int32 { + if m != nil { + return m.ReplicationFactorMin + } + return 0 +} + +func (m *Pin) GetReplicationFactorMax() int32 { + if m != nil { + return m.ReplicationFactorMax + } + return 0 +} + +func (m *Pin) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Pin) GetShardSize() uint64 { + if m != nil { + return m.ShardSize + } + return 0 +} + +func (m *Pin) GetMetadata() map[string]string { + if m != nil { + return m.Metadata + } + return nil +} + +func init() { + proto.RegisterEnum("api.pb.Pin_PinType", Pin_PinType_name, Pin_PinType_value) + proto.RegisterType((*Pin)(nil), "api.pb.Pin") + proto.RegisterMapType((map[string]string)(nil), "api.pb.Pin.MetadataEntry") +} + +func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } + +var fileDescriptor_d938547f84707355 = []byte{ + // 352 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0x4d, 0x0f, 0xd2, 0x30, + 0x18, 0xb6, 0xac, 0xb0, 0xed, 0xdd, 0x20, 0xf3, 0x95, 0x43, 0x25, 0x1e, 0x1a, 0x2e, 0xee, 0xb4, + 0x03, 0xc6, 0xc4, 0xe8, 0x09, 0x41, 0x4d, 0x48, 0x30, 0xa4, 0xe8, 0x0f, 0x28, 0xac, 0x86, 0xc6, + 0xb9, 0x2d, 0xa3, 0x18, 0xe6, 0x7f, 0xf0, 0x3f, 0x9b, 0x76, 0x08, 0x98, 0xe0, 0xa1, 0xc9, 0xf3, + 0xf1, 0x7e, 0xb4, 0x4f, 0x0a, 0x91, 0x69, 0x6b, 0x75, 0xcc, 0xea, 0xa6, 0x32, 0x15, 0x0e, 0x64, + 0xad, 0xb3, 0x7a, 0x37, 0xfd, 0x4d, 0xc1, 0xdb, 0xe8, 0x12, 0x13, 0xf0, 0x16, 0x3a, 0x67, 0x84, + 0x93, 0x34, 0x16, 0x16, 0xe2, 0x4b, 0xa0, 0x5f, 0xda, 0x5a, 0xb1, 0x1e, 0x27, 0xe9, 0x68, 0xf6, + 0x2c, 0xeb, 0x1a, 0xb2, 0x8d, 0x2e, 0xed, 0xb1, 0x96, 0x70, 0x05, 0xc8, 0x21, 0x9a, 0x17, 0x45, + 0xb5, 0x97, 0x46, 0x57, 0xe5, 0x91, 0x79, 0xdc, 0x4b, 0x63, 0x71, 0x2f, 0xe1, 0x04, 0x82, 0xb5, + 0x3c, 0x2f, 0x55, 0x6d, 0x0e, 0x8c, 0x72, 0x92, 0x3e, 0x15, 0x57, 0x8e, 0x2f, 0x20, 0x14, 0xea, + 0x9b, 0x6a, 0x54, 0xb9, 0x57, 0xac, 0xef, 0xd6, 0xdf, 0x04, 0x9c, 0xc1, 0x58, 0xa8, 0xba, 0xd0, + 0xdd, 0xa4, 0x8f, 0x72, 0x6f, 0xaa, 0x66, 0xad, 0x4b, 0x16, 0xb8, 0x29, 0x0f, 0xbd, 0xc7, 0x3d, + 0xf2, 0xcc, 0xc2, 0xff, 0xf5, 0xc8, 0x33, 0x22, 0xd0, 0xcf, 0xf2, 0x87, 0x62, 0xc0, 0x49, 0x1a, + 0x0a, 0x87, 0xed, 0xcd, 0xb6, 0x07, 0xd9, 0xe4, 0x5b, 0xfd, 0x4b, 0xb1, 0x88, 0x93, 0x94, 0x8a, + 0x9b, 0x80, 0xaf, 0x21, 0x58, 0x2b, 0x23, 0x73, 0x69, 0x24, 0x8b, 0xb9, 0x97, 0x46, 0xb3, 0xe7, + 0xf7, 0x11, 0xfd, 0xf5, 0x3e, 0x94, 0xa6, 0x69, 0xc5, 0xb5, 0x74, 0xf2, 0x0e, 0x86, 0xff, 0x58, + 0x36, 0xf8, 0xef, 0xaa, 0x75, 0xc1, 0x87, 0xc2, 0x42, 0x1c, 0x43, 0xff, 0xa7, 0x2c, 0x4e, 0x5d, + 0xf2, 0xa1, 0xe8, 0xc8, 0xdb, 0xde, 0x1b, 0x32, 0xfd, 0x0a, 0xfe, 0x25, 0x7a, 0x8c, 0xc0, 0x7f, + 0x2f, 0x73, 0x0b, 0x93, 0x27, 0x18, 0x43, 0xb0, 0x94, 0x46, 0x3a, 0x46, 0x2c, 0xb3, 0x2b, 0x1c, + 0xeb, 0x21, 0xc2, 0x68, 0x51, 0x9c, 0x8e, 0x46, 0x35, 0xcb, 0xf9, 0x27, 0xa7, 0x79, 0x38, 0xbc, + 0xbc, 0xcc, 0x51, 0xba, 0xa2, 0xc1, 0x20, 0xf1, 0x57, 0x34, 0xf0, 0x93, 0x60, 0x37, 0x70, 0xdf, + 0xe3, 0xd5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x63, 0x03, 0x06, 0x76, 0x2d, 0x02, 0x00, 0x00, +} diff --git a/api/pb/types.proto b/api/pb/types.proto new file mode 100644 index 00000000..81326c22 --- /dev/null +++ b/api/pb/types.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +package api.pb; + +message Pin { + bytes Cid = 1; + enum PinType { + BadType = 0; // 1 << iota + DataType = 1; // 2 << iota + MetaType = 2; + ClusterDAGType = 3; + ShardType = 4; + } + PinType Type = 2; + repeated bytes Allocations = 3; + sint32 MaxDepth = 4; + bytes Reference = 5; + reserved 6,7; + sint32 ReplicationFactorMin = 8; + sint32 ReplicationFactorMax = 9; + string Name = 10; + uint64 ShardSize = 11; + map Metadata = 12; +} \ No newline at end of file diff --git a/api/types.go b/api/types.go index 433a9344..83fc2f8c 100644 --- a/api/types.go +++ b/api/types.go @@ -17,6 +17,9 @@ import ( "strings" "time" + pb "github.com/ipfs/ipfs-cluster/api/pb" + + proto "github.com/gogo/protobuf/proto" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" peer "github.com/libp2p/go-libp2p-peer" @@ -704,10 +707,11 @@ func (pT PinType) String() string { // PinOptions wraps user-defined options for Pins type PinOptions struct { - ReplicationFactorMin int `json:"replication_factor_min"` - ReplicationFactorMax int `json:"replication_factor_max"` - Name string `json:"name"` - ShardSize uint64 `json:"shard_size"` + ReplicationFactorMin int `json:"replication_factor_min" codec:"rn,omitempty"` + ReplicationFactorMax int `json:"replication_factor_max" codec:"rx,omitempty"` + Name string `json:"name" codec:"n,omitempty"` + ShardSize uint64 `json:"shard_size" codec:"s,omitempty"` + Metadata map[string]string `json:"metadata" codec:"m,omitempty"` } // Pin carries all the information associated to a CID that is pinned @@ -749,10 +753,7 @@ func PinCid(c cid.Cid) Pin { // its PinOptions fields with the given options. func PinWithOpts(c cid.Cid, opts PinOptions) Pin { p := PinCid(c) - p.ReplicationFactorMin = opts.ReplicationFactorMin - p.ReplicationFactorMax = opts.ReplicationFactorMax - p.Name = opts.Name - p.ShardSize = opts.ShardSize + p.PinOptions = opts return p } @@ -760,11 +761,11 @@ func PinWithOpts(c cid.Cid, opts PinOptions) Pin { type PinSerial struct { PinOptions - Cid string `json:"cid"` - Type uint64 `json:"type"` - Allocations []string `json:"allocations"` - MaxDepth int `json:"max_depth"` - Reference string `json:"reference"` + Cid string `json:"cid" codec:"c,omitempty"` + Type uint64 `json:"type" codec:"t,omitempty"` + Allocations []string `json:"allocations" codec:"a,omitempty"` + MaxDepth int `json:"max_depth" codec:"d,omitempty"` + Reference string `json:"reference" codec:"r,omitempty"` } // ToSerial converts a Pin to PinSerial. @@ -778,7 +779,6 @@ func (pin Pin) ToSerial() PinSerial { ref = pin.Reference.String() } - n := pin.Name allocs := PeersToStrings(pin.Allocations) return PinSerial{ @@ -787,15 +787,93 @@ func (pin Pin) ToSerial() PinSerial { Type: uint64(pin.Type), MaxDepth: pin.MaxDepth, Reference: ref, - PinOptions: PinOptions{ - Name: n, - ReplicationFactorMin: pin.ReplicationFactorMin, - ReplicationFactorMax: pin.ReplicationFactorMax, - ShardSize: pin.ShardSize, - }, + PinOptions: pin.PinOptions, } } +func convertPinType(t PinType) pb.Pin_PinType { + var i pb.Pin_PinType + for t != 1 { + if t == 0 { + return pb.Pin_BadType + } + t = t >> 1 + i++ + } + return i +} + +// ProtoMarshal marshals this Pin using probobuf. +func (pin *Pin) ProtoMarshal() ([]byte, error) { + allocs := make([][]byte, len(pin.Allocations), len(pin.Allocations)) + for i, pid := range pin.Allocations { + bs, err := pid.Marshal() + if err != nil { + return nil, err + } + allocs[i] = bs + } + + pbPin := &pb.Pin{ + Cid: pin.Cid.Bytes(), + Type: convertPinType(pin.Type), + Allocations: allocs, + MaxDepth: int32(pin.MaxDepth), + Reference: pin.Reference.Bytes(), + ReplicationFactorMin: int32(pin.ReplicationFactorMin), + ReplicationFactorMax: int32(pin.ReplicationFactorMax), + Name: pin.Name, + ShardSize: pin.ShardSize, + Metadata: pin.Metadata, + } + return proto.Marshal(pbPin) +} + +// ProtoUnmarshal unmarshals this fields from protobuf-encoded bytes. +func (pin *Pin) ProtoUnmarshal(data []byte) error { + pbPin := pb.Pin{} + err := proto.Unmarshal(data, &pbPin) + if err != nil { + return err + } + ci, err := cid.Cast(pbPin.GetCid()) + if err != nil { + pin.Cid = cid.Undef + } else { + pin.Cid = ci + } + + pin.Type = 1 << uint64(pbPin.GetType()) + + pbAllocs := pbPin.GetAllocations() + lenAllocs := len(pbAllocs) + allocs := make([]peer.ID, lenAllocs, lenAllocs) + for i, pidb := range pbAllocs { + pid, err := peer.IDFromBytes(pidb) + if err != nil { + return err + } + allocs[i] = pid + } + + pin.Allocations = allocs + pin.MaxDepth = int(pbPin.GetMaxDepth()) + ref, err := cid.Cast(pbPin.GetReference()) + if err != nil { + pin.Reference = cid.Undef + + } else { + pin.Reference = ref + } + pin.Reference = ref + pin.ReplicationFactorMin = int(pbPin.GetReplicationFactorMin()) + pin.ReplicationFactorMax = int(pbPin.GetReplicationFactorMax()) + pin.Name = pbPin.GetName() + pin.ShardSize = pbPin.GetShardSize() + pin.Metadata = pbPin.GetMetadata() + return nil +} + // Equals checks if two pins are the same (with the same allocations). // If allocations are the same but in different order, they are still // considered equivalent. diff --git a/api/types_test.go b/api/types_test.go index 93367a0a..87e447f9 100644 --- a/api/types_test.go +++ b/api/types_test.go @@ -3,6 +3,7 @@ package api import ( "fmt" "reflect" + "strings" "testing" "time" @@ -304,3 +305,47 @@ func BenchmarkPinSerial_DecodeCid(b *testing.B) { pinS.DecodeCid() } } + +func TestConvertPinType(t *testing.T) { + for _, t1 := range []PinType{BadType, ShardType} { + i := convertPinType(t1) + t2 := PinType(1 << uint64(i)) + if t2 != t1 { + t.Error("bad conversion") + } + } +} + +func checkDupTags(t *testing.T, name string, typ reflect.Type, tags map[string]struct{}) { + if tags == nil { + tags = make(map[string]struct{}) + } + for i := 0; i < typ.NumField(); i++ { + f := typ.Field(i) + + if f.Type.Kind() == reflect.Struct && f.Anonymous { + checkDupTags(t, name, f.Type, tags) + continue + } + + tag := f.Tag.Get(name) + if tag == "" { + continue + } + val := strings.Split(tag, ",")[0] + + t.Logf("%s: '%s:%s'", f.Name, name, val) + _, ok := tags[val] + if ok { + t.Errorf("%s: tag %s already used", f.Name, val) + } + tags[val] = struct{}{} + } +} + +// TestPinTags checks that we are not re-using the same codec tag for +// different fields in the Pin object. +func TestPinTags(t *testing.T) { + typ := reflect.TypeOf(PinSerial{}) + checkDupTags(t, "codec", typ, nil) +} diff --git a/cluster.go b/cluster.go index 81e0951a..b340c88d 100644 --- a/cluster.go +++ b/cluster.go @@ -739,7 +739,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error { // Note that our regular bootstrap process is still running in the // background since we created the cluster. go func() { - c.dht.BootstrapOnce(c.ctx, dht.DefaultBootstrapConfig) + c.dht.BootstrapOnce(ctx, dht.DefaultBootstrapConfig) }() // wait for leader and for state to catch up @@ -1093,7 +1093,7 @@ func (c *Cluster) setupPin(ctx context.Context, pin *api.Pin) error { existing, err := c.PinGet(ctx, pin.Cid) if err == nil && existing.Type != pin.Type { // it exists - return errors.New("cannot repin CID with different tracking method, clear state with pin rm to proceed") + return fmt.Errorf("cannot repin CID with different tracking method, clear state with pin rm to proceed. New: %s. Was: %s", pin.Type, existing.Type) } return checkPinType(pin) } diff --git a/cmd/ipfs-cluster-service/state.go b/cmd/ipfs-cluster-service/state.go index 9cdcc694..77e69045 100644 --- a/cmd/ipfs-cluster-service/state.go +++ b/cmd/ipfs-cluster-service/state.go @@ -6,12 +6,12 @@ import ( "encoding/json" "errors" "io" - "io/ioutil" ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/api" "github.com/ipfs/ipfs-cluster/consensus/raft" "github.com/ipfs/ipfs-cluster/pstoremgr" + "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state/mapstate" "go.opencensus.io/trace" ) @@ -59,7 +59,7 @@ func export(ctx context.Context, w io.Writer) error { // restoreStateFromDisk returns a mapstate containing the latest // snapshot, a flag set to true when the state format has the // current version and an error -func restoreStateFromDisk(ctx context.Context) (*mapstate.MapState, bool, error) { +func restoreStateFromDisk(ctx context.Context) (state.State, bool, error) { ctx, span := trace.StartSpan(ctx, "daemon/restoreStateFromDisk") defer span.End() @@ -82,11 +82,7 @@ func restoreStateFromDisk(ctx context.Context) (*mapstate.MapState, bool, error) // duplicate reader to both check version and migrate var buf bytes.Buffer r2 := io.TeeReader(r, &buf) - raw, err := ioutil.ReadAll(r2) - if err != nil { - return nil, false, err - } - err = stateFromSnap.Unmarshal(raw) + err = stateFromSnap.Unmarshal(r2) if err != nil { return nil, false, err } @@ -144,11 +140,7 @@ func validateVersion(ctx context.Context, cfg *ipfscluster.Config, cCfg *raft.Co } else if snapExists && err != nil { logger.Error("error after reading last snapshot. Snapshot potentially corrupt.") } else if snapExists && err == nil { - raw, err2 := ioutil.ReadAll(r) - if err2 != nil { - return err2 - } - err2 = state.Unmarshal(raw) + err2 := state.Unmarshal(r) if err2 != nil { logger.Error("error unmarshalling snapshot. Snapshot potentially corrupt.") return err2 @@ -167,7 +159,7 @@ func validateVersion(ctx context.Context, cfg *ipfscluster.Config, cCfg *raft.Co } // ExportState saves a json representation of a state -func exportState(ctx context.Context, state *mapstate.MapState, w io.Writer) error { +func exportState(ctx context.Context, state state.State, w io.Writer) error { ctx, span := trace.StartSpan(ctx, "daemon/exportState") defer span.End() diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index 4fc6f458..3cf5c7a4 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -98,7 +98,7 @@ func TestConsensusPin(t *testing.T) { time.Sleep(250 * time.Millisecond) st, err := cc.State(ctx) if err != nil { - t.Fatal("error gettinng state:", err) + t.Fatal("error getting state:", err) } pins := st.List(ctx) @@ -228,14 +228,14 @@ func TestConsensusRmPeer(t *testing.T) { // Remove unexisting peer err = cc.RmPeer(ctx, test.TestPeerID1) if err != nil { - t.Error("the operation did not make it to the log:", err) + t.Fatal("the operation did not make it to the log:", err) } // Remove real peer. At least the leader can succeed err = cc2.RmPeer(ctx, cc.host.ID()) err2 := cc.RmPeer(ctx, cc2.host.ID()) if err != nil && err2 != nil { - t.Error("could not remove peer:", err, err2) + t.Fatal("could not remove peer:", err, err2) } err = cc.raft.WaitForPeer(ctx, cc2.host.ID().Pretty(), true) diff --git a/consensus/raft/log_op.go b/consensus/raft/log_op.go index e21789cb..966173e1 100644 --- a/consensus/raft/log_op.go +++ b/consensus/raft/log_op.go @@ -66,6 +66,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { case LogOpPin: err = state.Add(ctx, pinS.ToPin()) if err != nil { + logger.Error(err) goto ROLLBACK } // Async, we let the PinTracker take care of any problems @@ -81,6 +82,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) { case LogOpUnpin: err = state.Rm(ctx, pinS.DecodeCid()) if err != nil { + logger.Error(err) goto ROLLBACK } // Async, we let the PinTracker take care of any problems diff --git a/state/dsstate/datastore.go b/state/dsstate/datastore.go new file mode 100644 index 00000000..5819d7f5 --- /dev/null +++ b/state/dsstate/datastore.go @@ -0,0 +1,311 @@ +// Package dsstate implements the IPFS Cluster state interface using +// an underlying go-datastore. +package dsstate + +import ( + "context" + "errors" + "io" + + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/state" + + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + query "github.com/ipfs/go-datastore/query" + dshelp "github.com/ipfs/go-ipfs-ds-help" + logging "github.com/ipfs/go-log" + codec "github.com/ugorji/go/codec" + + trace "go.opencensus.io/trace" +) + +var _ state.State = (*State)(nil) + +var logger = logging.Logger("dsstate") + +// State implements the IPFS Cluster "state" interface by wrapping +// a go-datastore and choosing how api.Pin objects are stored +// in it. It also provides serialization methods for the whole +// state which are datastore-independent. +type State struct { + ds ds.Datastore + codecHandle codec.Handle + namespace ds.Key + version int +} + +// DefaultHandle returns the codec handler of choice (Msgpack). +func DefaultHandle() codec.Handle { + h := &codec.MsgpackHandle{} + return h +} + +// New returns a new state using the given datastore. +// +// The version number is used to set the state version in the cases where the +// state is new. +// +// All keys are namespaced with the given string, allowing that this datastore +// can be sharded in different namespaces. +// +// The Handle controls options for the serialization of items and the state +// itself. +func New(dstore ds.Datastore, version int, namespace string, handle codec.Handle) (*State, error) { + if handle == nil { + handle = DefaultHandle() + } + + st := &State{ + ds: dstore, + codecHandle: handle, + namespace: ds.NewKey(namespace), + } + + curVersion := st.GetVersion() + if curVersion < 0 { + return nil, errors.New("error reading state version") + } + + // initialize + if curVersion == 0 { + err := st.SetVersion(version) + if err != nil { + return nil, err + } + } + return st, nil +} + +// Add adds a new Pin or replaces an existing one. +func (st *State) Add(ctx context.Context, c api.Pin) error { + _, span := trace.StartSpan(ctx, "state/dsstate/Add") + defer span.End() + + ps, err := st.serializePin(&c) + if err != nil { + return err + } + return st.ds.Put(st.key(c.Cid), ps) +} + +// Rm removes an existing Pin. It is a no-op when the +// item does not exist. +func (st *State) Rm(ctx context.Context, c cid.Cid) error { + _, span := trace.StartSpan(ctx, "state/dsstate/Rm") + defer span.End() + + err := st.ds.Delete(st.key(c)) + if err == ds.ErrNotFound { + return nil + } + return err +} + +// Get returns a Pin from the store and whether it +// was present. When not present, a default pin +// is returned. +func (st *State) Get(ctx context.Context, c cid.Cid) (api.Pin, bool) { + _, span := trace.StartSpan(ctx, "state/dsstate/Get") + defer span.End() + + v, err := st.ds.Get(st.key(c)) + if err != nil { + return api.PinCid(c), false + } + p, err := st.deserializePin(c, v) + if err != nil { + return api.PinCid(c), false + } + return *p, true +} + +// Has returns whether a Cid is stored. +func (st *State) Has(ctx context.Context, c cid.Cid) bool { + _, span := trace.StartSpan(ctx, "state/dsstate/Has") + defer span.End() + + ok, err := st.ds.Has(st.key(c)) + if err != nil { + logger.Error(err) + } + return ok && err == nil +} + +// List returns the unsorted list of all Pins that have been added to the +// datastore. +func (st *State) List(ctx context.Context) []api.Pin { + _, span := trace.StartSpan(ctx, "state/dsstate/List") + defer span.End() + + q := query.Query{ + Prefix: st.namespace.String(), + } + + results, err := st.ds.Query(q) + if err != nil { + return []api.Pin{} + } + defer results.Close() + + var pins []api.Pin + versionKey := st.versionKey() + + for r := range results.Next() { + if r.Error != nil { + logger.Errorf("error in query result: %s", r.Error) + return pins + } + k := ds.NewKey(r.Key) + if k.Equal(versionKey) { + continue + } + + ci, err := st.unkey(k) + if err != nil { + logger.Error("key: ", k, "error: ", err) + logger.Error(string(r.Value)) + continue + } + + p, err := st.deserializePin(ci, r.Value) + if err != nil { + logger.Errorf("error deserializing pin (%s): %s", r.Key, err) + continue + } + + pins = append(pins, *p) + } + return pins +} + +// Migrate migrates an older state version to the current one. +// This is a no-op for now. +func (st *State) Migrate(ctx context.Context, r io.Reader) error { + ctx, span := trace.StartSpan(ctx, "state/map/Migrate") + defer span.End() + return nil +} + +func (st *State) versionKey() ds.Key { + return st.namespace.Child(ds.NewKey("/version")) +} + +// GetVersion returns the current state version. +func (st *State) GetVersion() int { + v, err := st.ds.Get(st.versionKey()) + if err != nil { + if err == ds.ErrNotFound { + return 0 // fine + } + logger.Error("error getting version: ", err) + return -1 + } + if len(v) != 1 { + logger.Error("bad version length") + return -1 + } + return int(v[0]) +} + +// SetVersion allows to manually modify the state version. +func (st *State) SetVersion(v int) error { + err := st.ds.Put(st.versionKey(), []byte{byte(v)}) + if err != nil { + logger.Error("error storing version:", v) + return err + } + st.version = v + return nil +} + +type serialEntry struct { + Key string `codec:"k"` + Value []byte `codec:"v"` +} + +// Marshal dumps the state to a writer. It does this by encoding every +// key/value in the store. The keys are stored without the namespace part to +// reduce the size of the snapshot. +func (st *State) Marshal(w io.Writer) error { + q := query.Query{ + Prefix: st.namespace.String(), + } + + results, err := st.ds.Query(q) + if err != nil { + return err + } + defer results.Close() + + enc := codec.NewEncoder(w, st.codecHandle) + + for r := range results.Next() { + if r.Error != nil { + logger.Errorf("error in query result: %s", r.Error) + return r.Error + } + + k := ds.NewKey(r.Key) + // reduce snapshot size by not storing the prefix + err := enc.Encode(serialEntry{ + Key: k.BaseNamespace(), + Value: r.Value, + }) + if err != nil { + logger.Error(err) + return err + } + } + return nil +} + +// Unmarshal reads and parses a previous dump of the state. +// All the parsed key/values are added to the store. As of now, +// Unmarshal does not empty the existing store from any values +// before unmarshaling from the given reader. +func (st *State) Unmarshal(r io.Reader) error { + dec := codec.NewDecoder(r, st.codecHandle) + for { + var entry serialEntry + if err := dec.Decode(&entry); err == io.EOF { + break + } else if err != nil { + return err + } + k := st.namespace.Child(ds.NewKey(entry.Key)) + err := st.ds.Put(k, entry.Value) + if err != nil { + logger.Error("error adding unmarshaled key to datastore:", err) + return err + } + } + + return nil +} + +// convert Cid to /namespace/cidKey +func (st *State) key(c cid.Cid) ds.Key { + k := dshelp.CidToDsKey(c) + return st.namespace.Child(k) +} + +// convert /namespace/cidKey to Cid +func (st *State) unkey(k ds.Key) (cid.Cid, error) { + return dshelp.DsKeyToCid(ds.NewKey(k.BaseNamespace())) +} + +// this decides how a Pin object is serialized to be stored in the +// datastore. Changing this may require a migration! +func (st *State) serializePin(c *api.Pin) ([]byte, error) { + return c.ProtoMarshal() +} + +// this deserializes a Pin object from the datastore. It should be +// the exact opposite from serializePin. +func (st *State) deserializePin(c cid.Cid, buf []byte) (*api.Pin, error) { + p := &api.Pin{} + err := p.ProtoUnmarshal(buf) + p.Cid = c + return p, err +} diff --git a/state/interface.go b/state/interface.go index 6d4141f1..3dcea12a 100644 --- a/state/interface.go +++ b/state/interface.go @@ -30,7 +30,7 @@ type State interface { // Return the version of this state GetVersion() int // Marshal serializes the state to a byte slice - Marshal() ([]byte, error) + Marshal(io.Writer) error // Unmarshal deserializes the state from marshaled bytes - Unmarshal([]byte) error + Unmarshal(io.Reader) error } diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go index 1332cd36..db936fe3 100644 --- a/state/mapstate/map_state.go +++ b/state/mapstate/map_state.go @@ -3,53 +3,56 @@ package mapstate import ( + "bufio" "bytes" "context" - "errors" "io" "io/ioutil" - "sync" - - msgpack "github.com/multiformats/go-multicodec/msgpack" - - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" "github.com/ipfs/ipfs-cluster/api" - "go.opencensus.io/trace" + "github.com/ipfs/ipfs-cluster/state" + "github.com/ipfs/ipfs-cluster/state/dsstate" + + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + sync "github.com/ipfs/go-datastore/sync" + logging "github.com/ipfs/go-log" + + trace "go.opencensus.io/trace" ) // Version is the map state Version. States with old versions should // perform an upgrade before. -const Version = 5 +const Version = 6 var logger = logging.Logger("mapstate") -// MapState is a very simple database to store the state of the system -// using a Go map. It is thread safe. It implements the State interface. +// MapState is mostly a MapDatastore to store the pinset. type MapState struct { - pinMux sync.RWMutex - PinMap map[string]api.PinSerial - Version int + dst *dsstate.State } // NewMapState initializes the internal map and returns a new MapState object. -func NewMapState() *MapState { - return &MapState{ - PinMap: make(map[string]api.PinSerial), - Version: Version, +func NewMapState() state.State { + // We need to keep a custom Migrate until everyone is + // on version 6. Then we could fully remove all the + // wrapping struct and just return the dsstate directly. + + mapDs := ds.NewMapDatastore() + mutexDs := sync.MutexWrap(mapDs) + + dsSt, err := dsstate.New(mutexDs, Version, "", dsstate.DefaultHandle()) + if err != nil { + panic(err) } + return &MapState{dsSt} } // Add adds a Pin to the internal map. func (st *MapState) Add(ctx context.Context, c api.Pin) error { ctx, span := trace.StartSpan(ctx, "state/map/Add") defer span.End() - - st.pinMux.Lock() - defer st.pinMux.Unlock() - st.PinMap[c.Cid.String()] = c.ToSerial() - return nil + return st.dst.Add(ctx, c) } // Rm removes a Cid from the internal map. @@ -57,10 +60,7 @@ func (st *MapState) Rm(ctx context.Context, c cid.Cid) error { ctx, span := trace.StartSpan(ctx, "state/map/Rm") defer span.End() - st.pinMux.Lock() - defer st.pinMux.Unlock() - delete(st.PinMap, c.String()) - return nil + return st.dst.Rm(ctx, c) } // Get returns Pin information for a CID. @@ -72,131 +72,104 @@ func (st *MapState) Get(ctx context.Context, c cid.Cid) (api.Pin, bool) { ctx, span := trace.StartSpan(ctx, "state/map/Get") defer span.End() - if !c.Defined() { - return api.PinCid(c), false - } - st.pinMux.RLock() - defer st.pinMux.RUnlock() - pins, ok := st.PinMap[c.String()] - if !ok { // make sure no panics - return api.PinCid(c), false - } - return pins.ToPin(), true + return st.dst.Get(ctx, c) } // Has returns true if the Cid belongs to the State. func (st *MapState) Has(ctx context.Context, c cid.Cid) bool { ctx, span := trace.StartSpan(ctx, "state/map/Has") defer span.End() - - st.pinMux.RLock() - defer st.pinMux.RUnlock() - _, ok := st.PinMap[c.String()] - return ok + return st.dst.Has(ctx, c) } // List provides the list of tracked Pins. func (st *MapState) List(ctx context.Context) []api.Pin { ctx, span := trace.StartSpan(ctx, "state/map/List") defer span.End() - - st.pinMux.RLock() - defer st.pinMux.RUnlock() - cids := make([]api.Pin, 0, len(st.PinMap)) - for _, v := range st.PinMap { - if v.Cid == "" { - continue - } - cids = append(cids, v.ToPin()) - } - return cids + return st.dst.List(ctx) } // Migrate restores a snapshot from the state's internal bytes and if // necessary migrates the format to the current version. func (st *MapState) Migrate(ctx context.Context, r io.Reader) error { - ctx, span := trace.StartSpan(ctx, "state/map/Migrate") + _, span := trace.StartSpan(ctx, "state/map/Migrate") defer span.End() - bs, err := ioutil.ReadAll(r) + // TODO: Remove after migration to v6! + // Read the full state - Unfortunately there is no way to + // migrate v5 to v6 without doing this. + full, err := ioutil.ReadAll(r) + if err != nil { return err } - err = st.Unmarshal(bs) + + // Try to do our special unmarshal + buf := bytes.NewBuffer(full) + err = st.Unmarshal(buf) if err != nil { return err } - if st.Version == Version { // Unmarshal restored for us + + v := st.GetVersion() + + if v == Version { // Unmarshal worked return nil } - bytesNoVersion := bs[1:] // Restore is aware of encoding format - err = st.migrateFrom(st.Version, bytesNoVersion) + + // we need to migrate. Discard first byte. + buf2 := bytes.NewBuffer(full[1:]) + err = st.migrateFrom(v, buf2) if err != nil { return err } - st.Version = Version + st.dst.SetVersion(Version) return nil } // GetVersion returns the current version of this state object. // It is not necessarily up to date func (st *MapState) GetVersion() int { - return st.Version + return st.dst.GetVersion() } -// Marshal encodes the state using msgpack -func (st *MapState) Marshal() ([]byte, error) { - // FIXME: Re-enable this span when raft Marshable interface has contexts - //ctx, span := trace.StartSpan(ctx, "state/map/Marshal") - //defer span.End() - - logger.Debugf("Marshal-- Marshalling state of version %d", st.Version) - buf := new(bytes.Buffer) - enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf) - if err := enc.Encode(st); err != nil { - return nil, err - } - // First byte indicates the version (probably should make this a varint - // if we stick to this encoding) - vCodec := make([]byte, 1) - vCodec[0] = byte(st.Version) - ret := append(vCodec, buf.Bytes()...) - //logger.Debugf("Marshal-- The final marshaled bytes: %x\n", ret) - return ret, nil +// Marshal encodes the state to the given writer. This implements +// go-libp2p-raft.Marshable. +func (st *MapState) Marshal(w io.Writer) error { + return st.dst.Marshal(w) } -// Unmarshal decodes the state using msgpack. It first decodes just -// the version number. If this is not the current version the bytes -// are stored within the state's internal reader, which can be migrated -// to the current version in a later call to restore. Note: Out of date -// version is not an error -func (st *MapState) Unmarshal(bs []byte) error { - // FIXME: Re-enable this span when raft Marshable interface has contexts - // ctx, span := trace.StartSpan(ctx, "state/map/Unmarshal") - // defer span.End() +// Unmarshal decodes the state from the given reader. This implements +// go-libp2p-raft.Marshable. +func (st *MapState) Unmarshal(r io.Reader) error { + // TODO: Re-do when on version 6. + // This is only to enable migration. - // Check version byte - // logger.Debugf("The incoming bytes to unmarshal: %x", bs) - if len(bs) < 1 { - return errors.New("cannot unmarshal from empty bytes") + // Extract the first byte in case this is an old + // state. + iobuf := bufio.NewReader(r) + vByte, err := iobuf.ReadByte() + if err != nil { + return err } - v := int(bs[0]) - logger.Debugf("The interpreted version: %d", v) - if v != Version { // snapshot is out of date - st.Version = v + err = iobuf.UnreadByte() + if err != nil { + return err + } + + // Try to unmarshal normally + err = st.dst.Unmarshal(iobuf) + if err == nil { return nil } - // snapshot is up to date - buf := bytes.NewBuffer(bs[1:]) - newState := MapState{} - dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) - err := dec.Decode(&newState) + // Error. Assume it's an old state + // and try to migrate it. + err = st.dst.SetVersion(int(vByte)) if err != nil { - logger.Error(err) + return err } - - st.PinMap = newState.PinMap - st.Version = newState.Version - return err + // We set the version but did not unmarshal. + // This signals that a migration is needed. + return nil } diff --git a/state/mapstate/map_state_test.go b/state/mapstate/map_state_test.go index 19325199..76f5728e 100644 --- a/state/mapstate/map_state_test.go +++ b/state/mapstate/map_state_test.go @@ -18,6 +18,7 @@ var testPeerID1, _ = peer.IDB58Decode("QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLe var c = api.Pin{ Cid: testCid1, + Type: api.DataType, Allocations: []peer.ID{testPeerID1}, MaxDepth: -1, PinOptions: api.PinOptions{ @@ -55,12 +56,21 @@ func TestGet(t *testing.T) { }() ms := NewMapState() ms.Add(ctx, c) - get, _ := ms.Get(ctx, c.Cid) - if get.Cid.String() != c.Cid.String() || - get.Allocations[0] != c.Allocations[0] || - get.ReplicationFactorMax != c.ReplicationFactorMax || + get, ok := ms.Get(ctx, c.Cid) + if !ok { + t.Fatal("not found") + } + if get.Cid.String() != c.Cid.String() { + t.Error("bad cid decoding: ", get.Cid) + } + + if get.Allocations[0] != c.Allocations[0] { + t.Error("bad allocations decoding:", get.Allocations) + } + + if get.ReplicationFactorMax != c.ReplicationFactorMax || get.ReplicationFactorMin != c.ReplicationFactorMin { - t.Error("returned something different") + t.Error("bad replication factors decoding") } } @@ -86,22 +96,29 @@ func TestMarshalUnmarshal(t *testing.T) { ctx := context.Background() ms := NewMapState() ms.Add(ctx, c) - b, err := ms.Marshal() + buf := new(bytes.Buffer) + err := ms.Marshal(buf) if err != nil { t.Fatal(err) } ms2 := NewMapState() - err = ms2.Unmarshal(b) + err = ms2.Unmarshal(buf) if err != nil { t.Fatal(err) } - if ms.Version != ms2.Version { + if ms.GetVersion() != ms2.GetVersion() { t.Fatal(err) } - get, _ := ms2.Get(ctx, c.Cid) + get, ok := ms2.Get(ctx, c.Cid) + if !ok { + t.Fatal("cannot get pin") + } if get.Allocations[0] != testPeerID1 { t.Error("expected different peer id") } + if !get.Cid.Equals(c.Cid) { + t.Error("expected different cid") + } } func TestMigrateFromV1(t *testing.T) { @@ -121,13 +138,14 @@ func TestMigrateFromV1(t *testing.T) { vCodec[0] = byte(v1State.Version) v1Bytes := append(vCodec, buf.Bytes()...) + buf2 := bytes.NewBuffer(v1Bytes) // Unmarshal first to check this is v1 ms := NewMapState() - err = ms.Unmarshal(v1Bytes) + err = ms.Unmarshal(buf2) if err != nil { t.Error(err) } - if ms.Version != 1 { + if ms.GetVersion() != 1 { t.Error("unmarshal picked up the wrong version") } // Migrate state to current version diff --git a/state/mapstate/migrate.go b/state/mapstate/migrate.go index 7970e981..d6600744 100644 --- a/state/mapstate/migrate.go +++ b/state/mapstate/migrate.go @@ -6,8 +6,9 @@ package mapstate // - add a case to the switch statement for the previous format version // - update the code copying the from mapStateVx to mapState import ( - "bytes" + "context" "errors" + "io" msgpack "github.com/multiformats/go-multicodec/msgpack" @@ -18,7 +19,7 @@ import ( // to other state formats type migrateable interface { next() migrateable - unmarshal([]byte) error + unmarshal(io.Reader) error } /* V1 */ @@ -29,9 +30,8 @@ type mapStateV1 struct { } // Unmarshal the serialization of a v1 state -func (st *mapStateV1) unmarshal(bs []byte) error { - buf := bytes.NewBuffer(bs) - dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) +func (st *mapStateV1) unmarshal(r io.Reader) error { + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(r) return dec.Decode(st) } @@ -63,9 +63,8 @@ type mapStateV2 struct { Version int } -func (st *mapStateV2) unmarshal(bs []byte) error { - buf := bytes.NewBuffer(bs) - dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) +func (st *mapStateV2) unmarshal(r io.Reader) error { + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(r) return dec.Decode(st) } @@ -99,9 +98,8 @@ type mapStateV3 struct { Version int } -func (st *mapStateV3) unmarshal(bs []byte) error { - buf := bytes.NewBuffer(bs) - dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) +func (st *mapStateV3) unmarshal(r io.Reader) error { + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(r) return dec.Decode(st) } @@ -137,9 +135,8 @@ type mapStateV4 struct { Version int } -func (st *mapStateV4) unmarshal(bs []byte) error { - buf := bytes.NewBuffer(bs) - dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) +func (st *mapStateV4) unmarshal(r io.Reader) error { + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(r) return dec.Decode(st) } @@ -170,48 +167,37 @@ func (st *mapStateV4) next() migrateable { /* V5 */ -// Uncomment for next migration -// type pinOptionsV5 struct { -// ReplicationFactorMin int `json:"replication_factor_min"` -// ReplicationFactorMax int `json:"replication_factor_max"` -// Name string `json:"name"` -// ShardSize uint64 `json:"shard_size"` -// } - -// type pinSerialV5 struct { -// pinOptionsV5 - -// Cid string `json:"cid"` -// Type uint64 `json:"type"` -// Allocations []string `json:"allocations"` -// MaxDepth int `json:"max_depth"` -// Reference string `json:"reference"` -// } - type mapStateV5 struct { - PinMap map[string]api.PinSerial + PinMap map[string]api.PinSerial // this has not changed Version int } -func (st *mapStateV5) unmarshal(bs []byte) error { - buf := bytes.NewBuffer(bs) - dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) +func (st *mapStateV5) unmarshal(r io.Reader) error { + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(r) return dec.Decode(st) } func (st *mapStateV5) next() migrateable { - return nil + v6 := NewMapState() + for _, v := range st.PinMap { + v6.Add(context.Background(), v.ToPin()) + } + return v6.(*MapState) +} + +// Last time we use this migration approach. +func (st *MapState) next() migrateable { return nil } +func (st *MapState) unmarshal(r io.Reader) error { + return st.dst.Unmarshal(r) } // Migrate code -func finalCopy(st *MapState, internal *mapStateV5) { - for k, v := range internal.PinMap { - st.PinMap[k] = v - } +func finalCopy(st *MapState, internal *MapState) { + st.dst = internal.dst } -func (st *MapState) migrateFrom(version int, snap []byte) error { +func (st *MapState) migrateFrom(version int, snap io.Reader) error { var m, next migrateable switch version { case 1: @@ -226,6 +212,9 @@ func (st *MapState) migrateFrom(version int, snap []byte) error { case 4: var mst4 mapStateV4 m = &mst4 + case 5: + var mst5 mapStateV5 + m = &mst5 default: return errors.New("version migration not supported") } @@ -238,11 +227,11 @@ func (st *MapState) migrateFrom(version int, snap []byte) error { for { next = m.next() if next == nil { - mst5, ok := m.(*mapStateV5) + mst6, ok := m.(*MapState) if !ok { return errors.New("migration ended prematurely") } - finalCopy(st, mst5) + finalCopy(st, mst6) return nil } m = next diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index e612b156..dad3445c 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -12,13 +12,13 @@ import ( "strings" "time" - "github.com/rs/cors" - "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/state" "github.com/ipfs/ipfs-cluster/state/mapstate" cid "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" + cors "github.com/rs/cors" ) // Some values used by the ipfs mock @@ -34,7 +34,7 @@ type IpfsMock struct { server *httptest.Server Addr string Port int - pinMap *mapstate.MapState + pinMap state.State BlockStore map[string][]byte }