Include Name as GlobalPinInfo key and consolidate redundant keys

GlobalPinInfo objects carried redundant information (Cid, Peer) that takes
space and time to serialize.

This has been addressed by having GlobalPinInfo embed PinInfoShort rather than
PinInfo. This new types ommits redundant fields.
This commit is contained in:
Hector Sanjuan 2020-05-16 00:32:28 +02:00
parent 4a0c8195eb
commit c026299b95
11 changed files with 268 additions and 186 deletions

View File

@ -490,16 +490,12 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob
if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfo{
PeerMap: map[string]*types.PinInfoShort{
peer.Encode(test.PeerID1): {
Cid: in,
Peer: test.PeerID1,
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
},
peer.Encode(test.PeerID2): {
Cid: in,
Peer: test.PeerID2,
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
},
@ -508,16 +504,12 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob
} else { // pinning
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfo{
PeerMap: map[string]*types.PinInfoShort{
peer.Encode(test.PeerID1): {
Cid: in,
Peer: test.PeerID1,
Status: types.TrackerStatusPinning,
TS: wait.pinStart,
},
peer.Encode(test.PeerID2): {
Cid: in,
Peer: test.PeerID2,
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
},

View File

@ -960,7 +960,7 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
pin.Cid,
&pinInfo,
)
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo))
api.sendResponse(w, autoStatus, err, pinInfo.ToGlobal())
} else {
var pinInfo types.GlobalPinInfo
err := api.rpcClient.CallContext(
@ -1019,7 +1019,7 @@ func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
pin.Cid,
&pinInfo,
)
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo))
api.sendResponse(w, autoStatus, err, pinInfo.ToGlobal())
} else {
var pinInfo types.GlobalPinInfo
err := api.rpcClient.CallContext(
@ -1127,19 +1127,10 @@ func (api *API) parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID
return pid
}
func pinInfoToGlobal(pInfo *types.PinInfo) *types.GlobalPinInfo {
return &types.GlobalPinInfo{
Cid: pInfo.Cid,
PeerMap: map[string]*types.PinInfo{
peer.Encode(pInfo.Peer): pInfo,
},
}
}
func pinInfosToGlobal(pInfos []*types.PinInfo) []*types.GlobalPinInfo {
gPInfos := make([]*types.GlobalPinInfo, len(pInfos))
for i, p := range pInfos {
gPInfos[i] = pinInfoToGlobal(p)
gPInfos[i] = p.ToGlobal()
}
return gPInfos
}

View File

@ -247,10 +247,11 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{
// indexed by cluster peer.
type GlobalPinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"n"`
// https://github.com/golang/go/issues/28827
// Peer IDs are of string Kind(). We can't use peer IDs here
// as Go ignores TextMarshaler.
PeerMap map[string]*PinInfo `json:"peer_map" codec:"pm,omitempty"`
PeerMap map[string]*PinInfoShort `json:"peer_map" codec:"pm,omitempty"`
}
// String returns the string representation of a GlobalPinInfo.
@ -263,17 +264,47 @@ func (gpi *GlobalPinInfo) String() string {
return str
}
// PinInfo holds information about local pins.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
// Add adds a PinInfo object to a GlobalPinInfo
func (gpi *GlobalPinInfo) Add(pi *PinInfo) {
if !gpi.Cid.Defined() {
gpi.Cid = pi.Cid
gpi.Name = pi.Name
}
if gpi.PeerMap == nil {
gpi.PeerMap = make(map[string]*PinInfoShort)
}
gpi.PeerMap[peer.Encode(pi.Peer)] = &pi.PinInfoShort
}
// PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo
// objects and does not carry redundant information as PinInfo would.
type PinInfoShort struct {
PeerName string `json:"peername" codec:"pn,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
}
// PinInfo holds information about local pins. This is used by the Pin
// Trackers.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"m,omitempty"`
Peer peer.ID `json:"Peer" codec:"p,omitempty"`
PinInfoShort
}
// ToGlobal converts a PinInfo object to a GlobalPinInfo with
// a single peer corresponding to the given PinInfo.
func (pi *PinInfo) ToGlobal() *GlobalPinInfo {
gpi := GlobalPinInfo{}
gpi.Add(pi)
return &gpi
}
// Version holds version information
type Version struct {
Version string `json:"version" codec:"v"`

View File

@ -1646,15 +1646,18 @@ func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
return trustedPeers, nil
}
func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, t time.Time) {
func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, name string, t time.Time) {
for _, p := range peers {
gpin.PeerMap[peer.Encode(p)] = &api.PinInfo{
gpin.Add(&api.PinInfo{
Cid: h,
Name: name,
Peer: p,
PinInfoShort: api.PinInfoShort{
PeerName: p.String(),
Status: status,
TS: t,
}
},
})
}
}
@ -1662,34 +1665,58 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End()
gpin := &api.GlobalPinInfo{
Cid: h,
PeerMap: make(map[string]*api.PinInfo),
}
// The object we will return
gpin := &api.GlobalPinInfo{}
// allocated peers, we will contact them through rpc
var dests []peer.ID
// un-allocated peers, we will set remote status
var remote []peer.ID
timeNow := time.Now()
// set dests and remote
if c.config.FollowerMode {
// during follower mode return status only on self peer
dests = []peer.ID{c.host.ID()}
remote = []peer.ID{}
} else {
members, err := c.consensus.Peers(ctx)
if err != nil {
// If pin is not part of the pinset, mark it unpinned
pin, err := c.PinGet(ctx, h)
if err != nil && err != state.ErrNotFound {
logger.Error(err)
return nil, err
}
// If pin is not part of the pinset, mark it unpinned
pin, err := c.PinGet(ctx, h)
// When NotFound return directly with an unpinned
// status.
if err == state.ErrNotFound {
setTrackerStatus(gpin, h, members, api.TrackerStatusUnpinned, timeNow)
var members []peer.ID
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}
setTrackerStatus(
gpin,
h,
members,
api.TrackerStatusUnpinned,
"",
timeNow,
)
return gpin, nil
}
// The pin exists.
gpin.Cid = h
gpin.Name = pin.Name
// Make the list of peers that will receive the request.
if c.config.FollowerMode {
// during follower mode return only local status.
dests = []peer.ID{c.host.ID()}
remote = []peer.ID{}
} else {
members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
@ -1705,7 +1732,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
}
// set status remote on un-allocated peers
setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, timeNow)
setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, pin.Name, timeNow)
lenDests := len(dests)
replies := make([]*api.PinInfo, lenDests)
@ -1726,7 +1753,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
// No error. Parse and continue
if e == nil {
gpin.PeerMap[peer.Encode(dests[i])] = r
gpin.Add(r)
continue
}
@ -1737,14 +1764,17 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
// Deal with error cases (err != nil): wrap errors in PinInfo
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
gpin.PeerMap[peer.Encode(dests[i])] = &api.PinInfo{
gpin.Add(&api.PinInfo{
Cid: h,
Name: pin.Name,
Peer: dests[i],
PinInfoShort: api.PinInfoShort{
PeerName: dests[i].String(),
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
}
},
})
}
return gpin, nil
@ -1784,23 +1814,16 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
rpcutil.CopyPinInfoSliceToIfaces(replies),
)
mergePins := func(pins []*api.PinInfo) {
for _, p := range pins {
setPinInfo := func(p *api.PinInfo) {
if p == nil {
continue
return
}
item, ok := fullMap[p.Cid]
info, ok := fullMap[p.Cid]
if !ok {
fullMap[p.Cid] = &api.GlobalPinInfo{
Cid: p.Cid,
PeerMap: map[string]*api.PinInfo{
peer.Encode(p.Peer): p,
},
}
} else {
item.PeerMap[peer.Encode(p.Peer)] = p
}
info = &api.GlobalPinInfo{}
fullMap[p.Cid] = info
}
info.Add(p)
}
erroredPeers := make(map[peer.ID]string)
@ -1812,21 +1835,27 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
}
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e)
erroredPeers[members[i]] = e.Error()
} else {
mergePins(r)
continue
}
for _, pin := range r {
setPinInfo(pin)
}
}
// Merge any errors
for p, msg := range erroredPeers {
for c := range fullMap {
fullMap[c].PeerMap[peer.Encode(p)] = &api.PinInfo{
setPinInfo(&api.PinInfo{
Cid: c,
Name: "",
Peer: p,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
}
},
})
}
}

View File

@ -142,17 +142,15 @@ func textFormatPrintID(obj *api.ID) {
func textFormatPrintGPInfo(obj *api.GlobalPinInfo) {
var b strings.Builder
var name string
peers := make([]string, 0, len(obj.PeerMap))
for k, v := range obj.PeerMap {
for k := range obj.PeerMap {
peers = append(peers, k)
name = v.Name // All PinInfos will have the same name
}
sort.Strings(peers)
fmt.Fprintf(&b, "%s", obj.Cid)
if name != "" {
fmt.Fprintf(&b, " | %s", name)
if obj.Name != "" {
fmt.Fprintf(&b, " | %s", obj.Name)
}
b.WriteString(":\n")

View File

@ -458,7 +458,7 @@ func printStatusOnline(absPath, clusterName string) error {
}
}
pinInfo := gpi.PeerMap[pid]
printPin(gpi.Cid, pinInfo.Status.String(), pinInfo.Name, pinInfo.Error)
printPin(gpi.Cid, pinInfo.Status.String(), gpi.Name, pinInfo.Error)
}
return nil
}

View File

@ -830,7 +830,7 @@ func TestClustersStatusAll(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h := test.Cid1
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h, api.PinOptions{Name: "test"})
pinDelay()
// Global status
f := func(t *testing.T, c *Cluster) {
@ -844,6 +844,11 @@ func TestClustersStatusAll(t *testing.T) {
if !statuses[0].Cid.Equals(h) {
t.Error("bad cid in status")
}
if statuses[0].Name != "test" {
t.Error("globalPinInfo should have the name")
}
info := statuses[0].PeerMap
if len(info) != nClusters {
t.Error("bad info in status")
@ -877,7 +882,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
h := test.Cid1
clusters[0].Pin(ctx, h, api.PinOptions{})
clusters[0].Pin(ctx, h, api.PinOptions{Name: "test"})
pinDelay()
// shutdown 1 cluster peer
@ -899,6 +904,14 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
t.Fatal("bad status. Expected one item")
}
if !statuses[0].Cid.Equals(h) {
t.Error("wrong Cid in globalPinInfo")
}
if statuses[0].Name != "test" {
t.Error("wrong Name in globalPinInfo")
}
// Raft and CRDT behave differently here
switch consensus {
case "raft":
@ -913,10 +926,6 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
pid := peer.Encode(clusters[1].id)
errst := stts.PeerMap[pid]
if !errst.Cid.Equals(h) {
t.Error("errored pinInfo should have a good cid")
}
if errst.Status != api.TrackerStatusClusterError {
t.Error("erroring status should be set to ClusterError:", errst.Status)
}
@ -932,10 +941,6 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
if pinfo.Status != api.TrackerStatusClusterError {
t.Error("erroring status should be ClusterError:", pinfo.Status)
}
if !pinfo.Cid.Equals(h) {
t.Error("errored status should have a good cid")
}
case "crdt":
// CRDT will not have contacted the offline peer because
// its metric expired and therefore is not in the

View File

@ -144,19 +144,23 @@ func (opt *OperationTracker) unsafePinInfo(ctx context.Context, op *Operation) a
return api.PinInfo{
Cid: cid.Undef,
Peer: opt.pid,
PinInfoShort: api.PinInfoShort{
PeerName: opt.peerName,
Status: api.TrackerStatusUnpinned,
TS: time.Now(),
Error: "",
},
}
}
return api.PinInfo{
Cid: op.Cid(),
Peer: opt.pid,
PinInfoShort: api.PinInfoShort{
PeerName: opt.peerName,
Status: op.ToTrackerStatus(),
TS: op.Timestamp(),
Error: op.Error(),
},
}
}

View File

@ -184,23 +184,31 @@ func TestPinTracker_StatusAll(t *testing.T) {
[]*api.PinInfo{
{
Cid: test.Cid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
{
Cid: test.Cid2,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusRemote,
},
},
{
Cid: test.Cid3,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusRemote,
},
},
{
// in state but not on IPFS
Cid: test.Cid4,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinError,
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -275,9 +283,11 @@ func TestPinTracker_Status(t *testing.T) {
},
api.PinInfo{
Cid: test.Cid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
},
{
"basic stateless status/unpinned",
args{
@ -286,9 +296,11 @@ func TestPinTracker_Status(t *testing.T) {
},
api.PinInfo{
Cid: test.Cid5,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusUnpinned,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -323,25 +335,33 @@ func TestPinTracker_RecoverAll(t *testing.T) {
[]*api.PinInfo{
{
Cid: test.Cid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
{
Cid: test.Cid2,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusRemote,
},
},
{
Cid: test.Cid3,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusRemote,
},
},
{
// This will recover and status
// is ignored as it could come back as
// queued, pinning or error.
Cid: test.Cid4,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinError,
},
},
},
false,
},
}
@ -400,8 +420,10 @@ func TestPinTracker_Recover(t *testing.T) {
},
api.PinInfo{
Cid: test.Cid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
false,
},
}
@ -439,8 +461,10 @@ func TestUntrackTrack(t *testing.T) {
},
api.PinInfo{
Cid: test.Cid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
false,
},
}
@ -480,8 +504,10 @@ func TestTrackUntrackWithCancel(t *testing.T) {
},
api.PinInfo{
Cid: test.SlowCid1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
},
},
false,
},
}

View File

@ -307,8 +307,10 @@ func (spt *Tracker) Status(ctx context.Context, c cid.Cid) *api.PinInfo {
pinInfo := &api.PinInfo{
Cid: c,
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
TS: time.Now(),
},
}
// check global state to see if cluster should even be caring about
@ -452,9 +454,11 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[string]*api.PinInfo,
Cid: c,
Name: "", // to be filled later
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
Status: ips.ToTrackerStatus(),
TS: time.Now(),
},
}
pins[cidstr] = p
}
@ -498,8 +502,10 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[string]
Cid: p.Cid,
Name: p.Name,
Peer: spt.peerID,
PinInfoShort: api.PinInfoShort{
PeerName: spt.peerName,
TS: time.Now(),
},
}
switch {

View File

@ -224,10 +224,8 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api
*out = []*api.GlobalPinInfo{
{
Cid: Cid1,
PeerMap: map[string]*api.PinInfo{
PeerMap: map[string]*api.PinInfoShort{
pid: {
Cid: Cid1,
Peer: PeerID1,
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
@ -235,10 +233,8 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api
},
{
Cid: Cid2,
PeerMap: map[string]*api.PinInfo{
PeerMap: map[string]*api.PinInfoShort{
pid: {
Cid: Cid2,
Peer: PeerID1,
Status: api.TrackerStatusPinning,
TS: time.Now(),
},
@ -246,10 +242,8 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api
},
{
Cid: Cid3,
PeerMap: map[string]*api.PinInfo{
PeerMap: map[string]*api.PinInfoShort{
pid: {
Cid: Cid3,
Peer: PeerID1,
Status: api.TrackerStatusPinError,
TS: time.Now(),
},
@ -269,10 +263,8 @@ func (mock *mockCluster) Status(ctx context.Context, in cid.Cid, out *api.Global
}
*out = api.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*api.PinInfo{
PeerMap: map[string]*api.PinInfoShort{
peer.Encode(PeerID1): {
Cid: in,
Peer: PeerID1,
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
@ -364,15 +356,19 @@ func (mock *mockPinTracker) StatusAll(ctx context.Context, in struct{}, out *[]*
{
Cid: Cid1,
Peer: PeerID1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
},
{
Cid: Cid3,
Peer: PeerID1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinError,
TS: time.Now(),
},
},
}
return nil
}
@ -385,8 +381,10 @@ func (mock *mockPinTracker) Status(ctx context.Context, in cid.Cid, out *api.Pin
*out = api.PinInfo{
Cid: in,
Peer: PeerID2,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
}
return nil
}
@ -400,8 +398,10 @@ func (mock *mockPinTracker) Recover(ctx context.Context, in cid.Cid, out *api.Pi
*out = api.PinInfo{
Cid: in,
Peer: PeerID1,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinned,
TS: time.Now(),
},
}
return nil
}