globalPinInfo should contain entries for all peers except for

followermode
This commit is contained in:
Kishan Mohanbhai Sagathiya 2019-11-05 19:09:15 +05:30
parent fdb47292d0
commit 3a7addc946
2 changed files with 64 additions and 21 deletions

View File

@ -1623,38 +1623,56 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid") ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End() defer span.End()
pin, err := c.PinGet(ctx, h) var dests []peer.ID
if err != nil { var err error
return nil, err var allocatedEverywhere bool
if c.config.FollowerMode {
// during follower mode return status only on self peer
dests = []peer.ID{c.host.ID()}
} else {
pin, err := c.PinGet(ctx, h)
if err != nil {
logger.Error(err)
return nil, err
}
if len(pin.Allocations) > 0 {
dests = pin.Allocations
} else {
dests, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
allocatedEverywhere = true
}
} }
gpin := &api.GlobalPinInfo{ lenDests := len(dests)
Cid: h, replies := make([]*api.PinInfo, lenDests, lenDests)
PeerMap: make(map[string]*api.PinInfo), ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenDests)
}
members := pin.Allocations
lenMembers := len(members)
replies := make([]*api.PinInfo, lenMembers, lenMembers)
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenMembers)
defer rpcutil.MultiCancel(cancels) defer rpcutil.MultiCancel(cancels)
errs := c.rpcClient.MultiCall( errs := c.rpcClient.MultiCall(
ctxs, ctxs,
members, dests,
comp, comp,
method, method,
h, h,
rpcutil.CopyPinInfoToIfaces(replies), rpcutil.CopyPinInfoToIfaces(replies),
) )
gpin := &api.GlobalPinInfo{
Cid: h,
PeerMap: make(map[string]*api.PinInfo),
}
for i, r := range replies { for i, r := range replies {
e := errs[i] e := errs[i]
// No error. Parse and continue // No error. Parse and continue
if e == nil { if e == nil {
gpin.PeerMap[peer.IDB58Encode(members[i])] = r gpin.PeerMap[peer.IDB58Encode(dests[i])] = r
continue continue
} }
@ -1664,17 +1682,44 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
} }
// Deal with error cases (err != nil): wrap errors in PinInfo // Deal with error cases (err != nil): wrap errors in PinInfo
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e) logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
gpin.PeerMap[peer.IDB58Encode(members[i])] = &api.PinInfo{ gpin.PeerMap[peer.IDB58Encode(dests[i])] = &api.PinInfo{
Cid: h, Cid: h,
Peer: members[i], Peer: dests[i],
PeerName: members[i].String(), PeerName: dests[i].String(),
Status: api.TrackerStatusClusterError, Status: api.TrackerStatusClusterError,
TS: time.Now(), TS: time.Now(),
Error: e.Error(), Error: e.Error(),
} }
} }
if c.config.FollowerMode || allocatedEverywhere {
return gpin, nil
}
// set status remote on un-allocated peers
members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
for _, member := range members {
id := peer.IDB58Encode(member)
_, ok := gpin.PeerMap[id]
if ok {
continue
}
gpin.PeerMap[id] = &api.PinInfo{
Cid: h,
Peer: member,
PeerName: member.String(),
Status: api.TrackerStatusRemote,
TS: time.Now(),
}
}
return gpin, nil return gpin, nil
} }

2
go.mod
View File

@ -55,8 +55,6 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.1.1 github.com/libp2p/go-libp2p-pubsub v0.1.1
github.com/libp2p/go-libp2p-raft v0.1.3 github.com/libp2p/go-libp2p-raft v0.1.3
github.com/libp2p/go-ws-transport v0.1.2 github.com/libp2p/go-ws-transport v0.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/multiformats/go-multiaddr v0.1.1 github.com/multiformats/go-multiaddr v0.1.1
github.com/multiformats/go-multiaddr-dns v0.1.1 github.com/multiformats/go-multiaddr-dns v0.1.1
github.com/multiformats/go-multiaddr-net v0.1.0 github.com/multiformats/go-multiaddr-net v0.1.0