ipfs-cluster/state/mapstate/map_state.go
Adrian Lanzafame 3b3f786d68
add opencensus tracing and metrics
This commit adds support for OpenCensus tracing
and metrics collection. This required support for
context.Context propogation throughout the cluster
codebase, and in particular, the ipfscluster component
interfaces.

The tracing propogates across RPC and HTTP boundaries.
The current default tracing backend is Jaeger.

The metrics currently exports the metrics exposed by
the opencensus http plugin as well as the pprof metrics
to a prometheus endpoint for scraping.
The current default metrics backend is Prometheus.

Metrics are currently exposed by default due to low
overhead, can be turned off if desired, whereas tracing
is off by default as it has a much higher performance
overhead, though the extent of the performance hit can be
adjusted with smaller sampling rates.

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
2019-02-04 18:53:21 +10:00

203 lines
5.4 KiB
Go

// Package mapstate implements the State interface for IPFS Cluster by using
// a map to keep track of the consensus-shared state.
package mapstate
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"sync"
msgpack "github.com/multiformats/go-multicodec/msgpack"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/ipfs/ipfs-cluster/api"
"go.opencensus.io/trace"
)
// Version is the map state Version. States with old versions should
// perform an upgrade before.
const Version = 5
var logger = logging.Logger("mapstate")
// MapState is a very simple database to store the state of the system
// using a Go map. It is thread safe. It implements the State interface.
type MapState struct {
pinMux sync.RWMutex
PinMap map[string]api.PinSerial
Version int
}
// NewMapState initializes the internal map and returns a new MapState object.
func NewMapState() *MapState {
return &MapState{
PinMap: make(map[string]api.PinSerial),
Version: Version,
}
}
// Add adds a Pin to the internal map.
func (st *MapState) Add(ctx context.Context, c api.Pin) error {
ctx, span := trace.StartSpan(ctx, "state/map/Add")
defer span.End()
st.pinMux.Lock()
defer st.pinMux.Unlock()
st.PinMap[c.Cid.String()] = c.ToSerial()
return nil
}
// Rm removes a Cid from the internal map.
func (st *MapState) Rm(ctx context.Context, c cid.Cid) error {
ctx, span := trace.StartSpan(ctx, "state/map/Rm")
defer span.End()
st.pinMux.Lock()
defer st.pinMux.Unlock()
delete(st.PinMap, c.String())
return nil
}
// Get returns Pin information for a CID.
// The returned object has its Cid and Allocations
// fields initialized, regardless of the
// presence of the provided Cid in the state.
// To check the presence, use MapState.Has(cid.Cid).
func (st *MapState) Get(ctx context.Context, c cid.Cid) (api.Pin, bool) {
ctx, span := trace.StartSpan(ctx, "state/map/Get")
defer span.End()
if !c.Defined() {
return api.PinCid(c), false
}
st.pinMux.RLock()
defer st.pinMux.RUnlock()
pins, ok := st.PinMap[c.String()]
if !ok { // make sure no panics
return api.PinCid(c), false
}
return pins.ToPin(), true
}
// Has returns true if the Cid belongs to the State.
func (st *MapState) Has(ctx context.Context, c cid.Cid) bool {
ctx, span := trace.StartSpan(ctx, "state/map/Has")
defer span.End()
st.pinMux.RLock()
defer st.pinMux.RUnlock()
_, ok := st.PinMap[c.String()]
return ok
}
// List provides the list of tracked Pins.
func (st *MapState) List(ctx context.Context) []api.Pin {
ctx, span := trace.StartSpan(ctx, "state/map/List")
defer span.End()
st.pinMux.RLock()
defer st.pinMux.RUnlock()
cids := make([]api.Pin, 0, len(st.PinMap))
for _, v := range st.PinMap {
if v.Cid == "" {
continue
}
cids = append(cids, v.ToPin())
}
return cids
}
// Migrate restores a snapshot from the state's internal bytes and if
// necessary migrates the format to the current version.
func (st *MapState) Migrate(ctx context.Context, r io.Reader) error {
ctx, span := trace.StartSpan(ctx, "state/map/Migrate")
defer span.End()
bs, err := ioutil.ReadAll(r)
if err != nil {
return err
}
err = st.Unmarshal(bs)
if err != nil {
return err
}
if st.Version == Version { // Unmarshal restored for us
return nil
}
bytesNoVersion := bs[1:] // Restore is aware of encoding format
err = st.migrateFrom(st.Version, bytesNoVersion)
if err != nil {
return err
}
st.Version = Version
return nil
}
// GetVersion returns the current version of this state object.
// It is not necessarily up to date
func (st *MapState) GetVersion() int {
return st.Version
}
// Marshal encodes the state using msgpack
func (st *MapState) Marshal() ([]byte, error) {
// FIXME: Re-enable this span when raft Marshable interface has contexts
//ctx, span := trace.StartSpan(ctx, "state/map/Marshal")
//defer span.End()
logger.Debugf("Marshal-- Marshalling state of version %d", st.Version)
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
if err := enc.Encode(st); err != nil {
return nil, err
}
// First byte indicates the version (probably should make this a varint
// if we stick to this encoding)
vCodec := make([]byte, 1)
vCodec[0] = byte(st.Version)
ret := append(vCodec, buf.Bytes()...)
//logger.Debugf("Marshal-- The final marshaled bytes: %x\n", ret)
return ret, nil
}
// Unmarshal decodes the state using msgpack. It first decodes just
// the version number. If this is not the current version the bytes
// are stored within the state's internal reader, which can be migrated
// to the current version in a later call to restore. Note: Out of date
// version is not an error
func (st *MapState) Unmarshal(bs []byte) error {
// FIXME: Re-enable this span when raft Marshable interface has contexts
// ctx, span := trace.StartSpan(ctx, "state/map/Unmarshal")
// defer span.End()
// Check version byte
// logger.Debugf("The incoming bytes to unmarshal: %x", bs)
if len(bs) < 1 {
return errors.New("cannot unmarshal from empty bytes")
}
v := int(bs[0])
logger.Debugf("The interpreted version: %d", v)
if v != Version { // snapshot is out of date
st.Version = v
return nil
}
// snapshot is up to date
buf := bytes.NewBuffer(bs[1:])
newState := MapState{}
dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf)
err := dec.Decode(&newState)
if err != nil {
logger.Error(err)
}
st.PinMap = newState.PinMap
st.Version = newState.Version
return err
}