ipfs-cluster/consensus/raft/log_op.go
Hector Sanjuan b852dfa892 Fix #219: WIP: Remove duplicate peer accounting
This change removes the duplicities of the PeerManager component:

* No more commiting PeerAdd and PeerRm log entries
* The Raft peer set is the source of truth
* Basic broadcasting is used to communicate peer multiaddresses
  in the cluster
* A peer can only be added in a healthy cluster
* A peer can be removed from any cluster which can still commit
* This also adds support for multiple multiaddresses per peer

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
2017-11-08 20:04:04 +01:00

78 lines
1.7 KiB
Go

package raft
import (
"errors"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
consensus "github.com/libp2p/go-libp2p-consensus"
)
// Type of consensus operation
const (
LogOpPin = iota + 1
LogOpUnpin
)
// LogOpType expresses the type of a consensus Operation
type LogOpType int
// LogOp represents an operation for the OpLogConsensus system.
// It implements the consensus.Op interface and it is used by the
// Consensus component.
type LogOp struct {
Cid api.PinSerial
Type LogOpType
consensus *Consensus
}
// ApplyTo applies the operation to the State
func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(state.State)
var err error
if !ok {
// Should never be here
panic("received unexpected state type")
}
switch op.Type {
case LogOpPin:
err = state.Add(op.Cid.ToPin())
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
"Cluster",
"Track",
op.Cid,
&struct{}{},
nil)
case LogOpUnpin:
err = state.Rm(op.Cid.ToPin().Cid)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
"Cluster",
"Untrack",
op.Cid,
&struct{}{},
nil)
default:
logger.Error("unknown LogOp type. Ignoring")
}
return state, nil
ROLLBACK:
// We failed to apply the operation to the state
// and therefore we need to request a rollback to the
// cluster to the previous state. This operation can only be performed
// by the cluster leader.
logger.Error("Rollbacks are not implemented")
return nil, errors.New("a rollback may be necessary. Reason: " + err.Error())
}