diff --git a/cmd/ipfs-cluster-service/daemon.go b/cmd/ipfs-cluster-service/daemon.go index 13baa63c..c2915a2d 100644 --- a/cmd/ipfs-cluster-service/daemon.go +++ b/cmd/ipfs-cluster-service/daemon.go @@ -130,6 +130,12 @@ func createCluster( ctx, err = tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty())) checkErr("tag context with host id", err) + err = observations.SetupMetrics(cfgs.Metrics) + checkErr("setting up Metrics", err) + + tracer, err := observations.SetupTracing(cfgs.Tracing) + checkErr("setting up Tracing", err) + var apis []ipfscluster.API if cfgMgr.IsLoadedFromJSON(config.API, cfgs.Restapi.ConfigKey()) { var api *rest.API @@ -188,12 +194,6 @@ func createCluster( ipfscluster.ReadyTimeout = cfgs.Raft.WaitForLeaderTimeout + 5*time.Second - err = observations.SetupMetrics(cfgs.Metrics) - checkErr("setting up Metrics", err) - - tracer, err := observations.SetupTracing(cfgs.Tracing) - checkErr("setting up Tracing", err) - cons, err := setupConsensus( cfgHelper, host, diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index 1c200ce0..9e26f4b9 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -18,6 +18,7 @@ import ( "time" "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/observations" cid "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" @@ -32,6 +33,7 @@ import ( "go.opencensus.io/plugin/ochttp" "go.opencensus.io/plugin/ochttp/propagation/tracecontext" + "go.opencensus.io/stats" "go.opencensus.io/trace" ) @@ -58,6 +60,8 @@ type Connector struct { updateMetricCount uint64 + ipfsPinCount int64 + shutdownLock sync.Mutex shutdown bool wg sync.WaitGroup @@ -171,10 +175,23 @@ func NewConnector(cfg *Config) (*Connector, error) { client: c, } + initializeMetrics(ctx) + go ipfs.run() return ipfs, nil } +func initializeMetrics(ctx context.Context) { + // initialize metrics + stats.Record(ctx, observations.PinsIpfsPins.M(0)) + stats.Record(ctx, 
observations.PinsPinAdd.M(0)) + stats.Record(ctx, observations.PinsPinAddError.M(0)) + stats.Record(ctx, observations.BlocksPut.M(0)) + stats.Record(ctx, observations.BlocksAddedSize.M(0)) + stats.Record(ctx, observations.BlocksAdded.M(0)) + stats.Record(ctx, observations.BlocksAddedError.M(0)) +} + // connects all ipfs daemons when // we receive the rpcReady signal. func (ipfs *Connector) run() { @@ -392,10 +409,14 @@ func (ipfs *Connector) Pin(ctx context.Context, pin api.Pin) error { } }() + stats.Record(ipfs.ctx, observations.PinsPinAdd.M(1)) err = ipfs.pinProgress(ctx, hash, maxDepth, outPins) if err != nil { + stats.Record(ipfs.ctx, observations.PinsPinAddError.M(1)) return err } + totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, 1) + stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins)) logger.Info("IPFS Pin request succeeded: ", hash) return nil @@ -456,6 +477,8 @@ func (ipfs *Connector) pinUpdate(ctx context.Context, from, to api.Cid) error { if err != nil { return err } + totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, 1) + stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins)) logger.Infof("IPFS Pin Update request succeeded. 
%s -> %s (unpin=false)", from, to) return nil } @@ -489,6 +512,9 @@ func (ipfs *Connector) Unpin(ctx context.Context, hash api.Cid) error { return nil } + totalPins := atomic.AddInt64(&ipfs.ipfsPinCount, -1) + stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPins)) + logger.Info("IPFS Unpin request succeeded:", hash) return nil } @@ -506,6 +532,13 @@ func (ipfs *Connector) PinLs(ctx context.Context, typeFilters []string, out chan defer cancel() var err error + var totalPinCount int64 + defer func() { + if err == nil { + atomic.StoreInt64(&ipfs.ipfsPinCount, totalPinCount) + stats.Record(ipfs.ctx, observations.PinsIpfsPins.M(totalPinCount)) + } + }() nextFilter: for i, typeFilter := range typeFilters { @@ -545,6 +578,7 @@ nextFilter: logger.Error(err) return err case out <- ipfsPin: + totalPinCount++ } } } @@ -948,6 +982,10 @@ func (ci *chanIterator) Node() files.Node { ci.seenMu.Lock() ci.seen.Add(ci.current.Cid.Hash()) ci.seenMu.Unlock() + + stats.Record(ci.ctx, observations.BlocksAdded.M(1)) + stats.Record(ci.ctx, observations.BlocksAddedSize.M(int64(len(ci.current.Data)))) + return files.NewBytesFile(ci.current.Data) } @@ -1104,6 +1142,9 @@ func (ipfs *Connector) BlockStream(ctx context.Context, blocks <-chan api.NodeWi } }() + if err != nil { + stats.Record(ipfs.ctx, observations.BlocksAddedError.M(1)) + } return err } diff --git a/observations/metrics.go b/observations/metrics.go index 04bfed87..00d45b1c 100644 --- a/observations/metrics.go +++ b/observations/metrics.go @@ -32,12 +32,22 @@ var ( // metrics var ( // This metric is managed in state/dsstate. - Pins = stats.Int64("pins", "Total number of pins", stats.UnitDimensionless) + Pins = stats.Int64("pins", "Total number of cluster pins", stats.UnitDimensionless) // These metrics are managed by the pintracker/optracker module. 
- PinsQueued = stats.Int64("pins/pin_queued", "Number of pins queued for pinning", stats.UnitDimensionless) - PinsPinning = stats.Int64("pins/pinning", "Number of pins currently pinning", stats.UnitDimensionless) - PinsPinError = stats.Int64("pins/pin_error", "Number of pins in pin_error state", stats.UnitDimensionless) + PinsQueued = stats.Int64("pins/pin_queued", "Current number of pins queued for pinning", stats.UnitDimensionless) + PinsPinning = stats.Int64("pins/pinning", "Current number of pins currently pinning", stats.UnitDimensionless) + PinsPinError = stats.Int64("pins/pin_error", "Current number of pins in pin_error state", stats.UnitDimensionless) + + // These metrics are managed in the ipfshttp module. + PinsIpfsPins = stats.Int64("pins/ipfs_pins", "Current number of items pinned on IPFS", stats.UnitDimensionless) + PinsPinAdd = stats.Int64("pins/pin_add", "Total number of IPFS pin requests", stats.UnitDimensionless) + PinsPinAddError = stats.Int64("pins/pin_add_errors", "Total number of failed pin requests", stats.UnitDimensionless) + BlocksPut = stats.Int64("blocks/put", "Total number of blocks/put requests", stats.UnitDimensionless) + BlocksAddedSize = stats.Int64("blocks/added_size", "Total size of blocks added in bytes", stats.UnitDimensionless) + + BlocksAdded = stats.Int64("blocks/added", "Total number of blocks added", stats.UnitDimensionless) + BlocksAddedError = stats.Int64("blocks/put_error", "Total number of block/put errors", stats.UnitDimensionless) ) // views, which is just the aggregation of the metrics @@ -69,11 +79,53 @@ var ( Aggregation: view.LastValue(), } + PinsIpfsPinsView = &view.View{ + Measure: PinsIpfsPins, + Aggregation: view.LastValue(), + } + + PinsPinAddView = &view.View{ + Measure: PinsPinAdd, + Aggregation: view.Sum(), + } + + PinsPinAddErrorView = &view.View{ + Measure: PinsPinAddError, + Aggregation: view.Sum(), + } + + BlocksPutView = &view.View{ + Measure: BlocksPut, + Aggregation: view.Sum(), + } + + 
BlocksAddedSizeView = &view.View{ + Measure: BlocksAddedSize, + Aggregation: view.Sum(), + } + + BlocksAddedView = &view.View{ + Measure: BlocksAdded, + Aggregation: view.Sum(), + } + + BlocksAddedErrorView = &view.View{ + Measure: BlocksAddedError, + Aggregation: view.Sum(), + } + DefaultViews = []*view.View{ PinsView, PinsQueuedView, PinsPinningView, PinsPinErrorView, + PinsIpfsPinsView, + PinsPinAddView, + PinsPinAddErrorView, + BlocksPutView, + BlocksAddedSizeView, + BlocksAddedView, + BlocksAddedErrorView, } )