ipfs-cluster/cmd/ipfs-cluster-service/state.go
Adrian Lanzafame 3b3f786d68
add opencensus tracing and metrics
This commit adds support for OpenCensus tracing
and metrics collection. This required support for
context.Context propagation throughout the cluster
codebase, and in particular, the ipfscluster component
interfaces.

The tracing propagates across RPC and HTTP boundaries.
The current default tracing backend is Jaeger.

The metrics currently exports the metrics exposed by
the opencensus http plugin as well as the pprof metrics
to a prometheus endpoint for scraping.
The current default metrics backend is Prometheus.

Metrics are currently exposed by default due to low
overhead, can be turned off if desired, whereas tracing
is off by default as it has a much higher performance
overhead, though the extent of the performance hit can be
adjusted with smaller sampling rates.

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
2019-02-04 18:53:21 +10:00

196 lines
5.2 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"io/ioutil"
ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/pstoremgr"
"github.com/ipfs/ipfs-cluster/state/mapstate"
"go.opencensus.io/trace"
)
// errNoSnapshot is returned by restoreStateFromDisk when raft.LastStateRaw
// reports that no snapshot exists.
var errNoSnapshot = errors.New("no snapshot found")
// upgrade restores the state from the latest snapshot on disk (migrating it
// when its format is outdated) and saves the result as a new Raft snapshot.
// When the stored state already has the current version it only logs a
// warning and returns nil.
func upgrade(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "daemon/upgrade")
	defer span.End()

	migrated, upToDate, err := restoreStateFromDisk(ctx)
	switch {
	case err != nil:
		return err
	case upToDate:
		logger.Warning("Skipping migration of up-to-date state")
		return nil
	}

	manager, configs := makeConfigs()
	if err = manager.LoadJSONFromFile(configPath); err != nil {
		return err
	}

	// The peer list is what the peerstore file holds plus the ID taken from
	// the cluster configuration.
	peerstore := pstoremgr.New(nil, configs.clusterCfg.GetPeerstorePath())
	knownPeers := ipfscluster.PeersFromMultiaddrs(peerstore.LoadPeerstore())
	raftPeers := append(knownPeers, configs.clusterCfg.ID)
	return raft.SnapshotSave(configs.consensusCfg, migrated, raftPeers)
}
// export restores the state from disk and writes it to w via exportState.
func export(ctx context.Context, w io.Writer) error {
	ctx, span := trace.StartSpan(ctx, "daemon/export")
	defer span.End()

	// The "is current version" flag is not needed for exporting.
	restored, _, err := restoreStateFromDisk(ctx)
	if err == nil {
		err = exportState(ctx, restored, w)
	}
	return err
}
// restoreStateFromDisk loads the configuration from configPath and returns a
// mapstate containing the latest snapshot, a flag set to true when the stored
// state format already has the current version (so no migration was run),
// and an error.
//
// It returns errNoSnapshot when no snapshot exists. An error raised while
// looking up the snapshot is returned unchanged instead of being replaced by
// errNoSnapshot, so that read failures are not reported as a missing snapshot.
func restoreStateFromDisk(ctx context.Context) (*mapstate.MapState, bool, error) {
	ctx, span := trace.StartSpan(ctx, "daemon/restoreStateFromDisk")
	defer span.End()

	cfgMgr, cfgs := makeConfigs()
	err := cfgMgr.LoadJSONFromFile(configPath)
	if err != nil {
		return nil, false, err
	}

	r, snapExists, err := raft.LastStateRaw(cfgs.consensusCfg)
	// Check the error before the existence flag: a failed lookup also
	// reports snapExists == false and must not be masked as "no snapshot".
	if err != nil {
		return nil, false, err
	}
	if !snapExists {
		return nil, false, errNoSnapshot
	}

	raw, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, false, err
	}

	stateFromSnap := mapstate.NewMapState()
	err = stateFromSnap.Unmarshal(raw)
	if err != nil {
		return nil, false, err
	}
	if stateFromSnap.GetVersion() == mapstate.Version {
		return stateFromSnap, true, nil
	}

	// Migrate takes a reader, so hand it a fresh one over the same bytes
	// that were used for the version check.
	err = stateFromSnap.Migrate(ctx, bytes.NewReader(raw))
	if err != nil {
		return nil, false, err
	}
	return stateFromSnap, false, nil
}
// stateImport decodes a JSON array of serialized pins from r, adds every pin
// to a new mapstate and saves that state as a Raft snapshot.
func stateImport(ctx context.Context, r io.Reader) error {
	ctx, span := trace.StartSpan(ctx, "daemon/stateImport")
	defer span.End()

	manager, configs := makeConfigs()
	if err := manager.LoadJSONFromFile(configPath); err != nil {
		return err
	}

	serials := make([]api.PinSerial, 0)
	if err := json.NewDecoder(r).Decode(&serials); err != nil {
		return err
	}

	imported := mapstate.NewMapState()
	for _, serial := range serials {
		if err := imported.Add(ctx, serial.ToPin()); err != nil {
			return err
		}
	}

	// The peer list is what the peerstore file holds plus the ID taken from
	// the cluster configuration.
	peerstore := pstoremgr.New(nil, configs.clusterCfg.GetPeerstorePath())
	knownPeers := ipfscluster.PeersFromMultiaddrs(peerstore.LoadPeerstore())
	raftPeers := append(knownPeers, configs.clusterCfg.ID)
	return raft.SnapshotSave(configs.consensusCfg, imported, raftPeers)
}
// validateVersion checks the version of the state stored in the latest Raft
// snapshot, if there is one. It returns nil when no snapshot exists or when
// the stored state has the current mapstate version. It returns an error when
// the snapshot cannot be looked up, read or unmarshalled, or when its version
// is outdated; in the latter case it also logs migration instructions.
func validateVersion(ctx context.Context, cfg *ipfscluster.Config, cCfg *raft.Config) error {
	ctx, span := trace.StartSpan(ctx, "daemon/validateVersion")
	defer span.End()

	saved := mapstate.NewMapState()
	snapshot, found, err := raft.LastStateRaw(cCfg)
	switch {
	case err != nil && !found:
		logger.Error("error before reading latest snapshot.")
		return err
	case err != nil:
		logger.Error("error after reading last snapshot. Snapshot potentially corrupt.")
		return err
	case !found:
		// No existing state, no check needed.
		return nil
	}

	raw, err := ioutil.ReadAll(snapshot)
	if err != nil {
		return err
	}
	if err = saved.Unmarshal(raw); err != nil {
		logger.Error("error unmarshalling snapshot. Snapshot potentially corrupt.")
		return err
	}
	if saved.GetVersion() == mapstate.Version {
		return nil
	}

	logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
	logger.Error("Out of date ipfs-cluster state is saved.")
	logger.Error("To migrate to the new version, run ipfs-cluster-service state upgrade.")
	logger.Error("To launch a node without this state, rename the consensus data directory.")
	logger.Error("Hint, the default is .ipfs-cluster/ipfs-cluster-data.")
	logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
	return errors.New("outdated state version stored")
}
// exportState writes a JSON representation of state to w: the pins listed by
// the state are converted to their serial form and encoded as an indented
// JSON array.
func exportState(ctx context.Context, state *mapstate.MapState, w io.Writer) error {
	ctx, span := trace.StartSpan(ctx, "daemon/exportState")
	defer span.End()

	// Convert every pin to its serializable form.
	pins := state.List(ctx)
	serialized := make([]api.PinSerial, 0, len(pins))
	for _, p := range pins {
		serialized = append(serialized, p.ToSerial())
	}

	// Encode the result as indented JSON into the writer.
	encoder := json.NewEncoder(w)
	encoder.SetIndent("", " ")
	return encoder.Encode(serialized)
}
// cleanupState calls raft.CleanupRaft on the consensus data folder with the
// configured BackupsRotate value and, on success, logs a warning that the
// folder was rotated. Any error from CleanupRaft is returned unchanged.
func cleanupState(cCfg *raft.Config) error {
	if err := raft.CleanupRaft(cCfg.GetDataFolder(), cCfg.BackupsRotate); err != nil {
		return err
	}
	logger.Warningf("the %s folder has been rotated. Next start will use an empty state", cCfg.GetDataFolder())
	return nil
}