ipfs-cluster/consensus/raft/log_op.go

77 lines
1.7 KiB
Go
Raw Normal View History

package raft
import (
"errors"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"
consensus "github.com/libp2p/go-libp2p-consensus"
)
// Type of consensus operation
const (
LogOpPin = iota + 1
LogOpUnpin
)
// LogOpType expresses the type of a consensus Operation
type LogOpType int
// LogOp represents an operation for the OpLogConsensus system.
// It implements the consensus.Op interface and it is used by the
// Consensus component.
type LogOp struct {
Cid api.PinSerial
Type LogOpType
consensus *Consensus
}
// ApplyTo applies the operation to the State
func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
state, ok := cstate.(state.State)
var err error
if !ok {
// Should never be here
panic("received unexpected state type")
}
switch op.Type {
case LogOpPin:
err = state.Add(op.Cid.ToPin())
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
"Cluster",
"Track",
op.Cid,
&struct{}{},
nil)
case LogOpUnpin:
err = state.Rm(op.Cid.ToPin().Cid)
if err != nil {
goto ROLLBACK
}
// Async, we let the PinTracker take care of any problems
op.consensus.rpcClient.Go("",
"Cluster",
"Untrack",
op.Cid,
&struct{}{},
nil)
default:
logger.Error("unknown LogOp type. Ignoring")
}
return state, nil
ROLLBACK:
// We failed to apply the operation to the state
// and therefore we need to request a rollback to the
// cluster to the previous state. This operation can only be performed
// by the cluster leader.
logger.Error("Rollbacks are not implemented")
return nil, errors.New("a rollback may be necessary. Reason: " + err.Error())
}