Make it clearer
This commit is contained in:
parent
108fcff8a9
commit
0e7ed97e59
67
cluster.go
67
cluster.go
|
@ -1678,6 +1678,18 @@ func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
|
||||||
return trustedPeers, nil
|
return trustedPeers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, t time.Time) {
|
||||||
|
for _, p := range peers {
|
||||||
|
gpin.PeerMap[peer.IDB58Encode(p)] = &api.PinInfo{
|
||||||
|
Cid: h,
|
||||||
|
Peer: p,
|
||||||
|
PeerName: p.String(),
|
||||||
|
Status: status,
|
||||||
|
TS: t,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
|
func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
|
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -1687,15 +1699,17 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
|
||||||
PeerMap: make(map[string]*api.PinInfo),
|
PeerMap: make(map[string]*api.PinInfo),
|
||||||
}
|
}
|
||||||
var dests []peer.ID
|
var dests []peer.ID
|
||||||
var members []peer.ID
|
var remote []peer.ID
|
||||||
var err error
|
|
||||||
timeNow := time.Now()
|
timeNow := time.Now()
|
||||||
|
|
||||||
|
// set dests (allocated peers, we will contact them through rpc) and remote (un-allocated peers,
|
||||||
|
// we will set remote status)
|
||||||
if c.config.FollowerMode {
|
if c.config.FollowerMode {
|
||||||
// during follower mode return status only on self peer
|
// during follower mode return status only on self peer
|
||||||
dests = []peer.ID{c.host.ID()}
|
dests = []peer.ID{c.host.ID()}
|
||||||
|
remote = []peer.ID{}
|
||||||
} else {
|
} else {
|
||||||
members, err = c.consensus.Peers(ctx)
|
members, err := c.consensus.Peers(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1703,31 +1717,27 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
|
||||||
|
|
||||||
// If pin is not part of the pinset, mark it unpinned
|
// If pin is not part of the pinset, mark it unpinned
|
||||||
pin, err := c.PinGet(ctx, h)
|
pin, err := c.PinGet(ctx, h)
|
||||||
if err != nil {
|
if err == state.ErrNotFound {
|
||||||
if err != state.ErrNotFound {
|
setTrackerStatus(gpin, h, members, api.TrackerStatusUnpinned, timeNow)
|
||||||
logger.Error(err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, member := range members {
|
|
||||||
gpin.PeerMap[peer.IDB58Encode(member)] = &api.PinInfo{
|
|
||||||
Cid: h,
|
|
||||||
Peer: member,
|
|
||||||
PeerName: member.String(),
|
|
||||||
Status: api.TrackerStatusUnpinned,
|
|
||||||
TS: timeNow,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return gpin, nil
|
return gpin, nil
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if len(pin.Allocations) > 0 {
|
if len(pin.Allocations) > 0 {
|
||||||
dests = pin.Allocations
|
dests = pin.Allocations
|
||||||
|
remote = peersSubtract(members, dests)
|
||||||
} else {
|
} else {
|
||||||
dests = members
|
dests = members
|
||||||
|
remote = []peer.ID{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set status remote on un-allocated peers
|
||||||
|
setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, timeNow)
|
||||||
|
|
||||||
lenDests := len(dests)
|
lenDests := len(dests)
|
||||||
replies := make([]*api.PinInfo, lenDests, lenDests)
|
replies := make([]*api.PinInfo, lenDests, lenDests)
|
||||||
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenDests)
|
ctxs, cancels := rpcutil.CtxsWithCancel(ctx, lenDests)
|
||||||
|
@ -1768,27 +1778,6 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.config.FollowerMode || len(dests) == len(members) {
|
|
||||||
return gpin, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// set status remote on un-allocated peers
|
|
||||||
for _, member := range members {
|
|
||||||
id := peer.IDB58Encode(member)
|
|
||||||
_, ok := gpin.PeerMap[id]
|
|
||||||
if ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
gpin.PeerMap[id] = &api.PinInfo{
|
|
||||||
Cid: h,
|
|
||||||
Peer: member,
|
|
||||||
PeerName: member.String(),
|
|
||||||
Status: api.TrackerStatusRemote,
|
|
||||||
TS: timeNow,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return gpin, nil
|
return gpin, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
22
util.go
22
util.go
|
@ -146,3 +146,25 @@ func xor(a, b distance) distance {
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// peersSubtract subtracts peers ID slice b from peers ID slice a.
|
||||||
|
func peersSubtract(a []peer.ID, b []peer.ID) []peer.ID {
|
||||||
|
lenb := len(b)
|
||||||
|
result := make([]peer.ID, len(a)-lenb)
|
||||||
|
bMap := make(map[peer.ID]struct{}, lenb)
|
||||||
|
|
||||||
|
for _, p := range b {
|
||||||
|
bMap[p] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range a {
|
||||||
|
_, ok := bMap[p]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user