// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" "errors" "fmt" "sync" "sync/atomic" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // Default periodic reader timing. const ( defaultTimeout = time.Millisecond * 30000 defaultInterval = time.Millisecond * 60000 ) // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { interval time.Duration timeout time.Duration producers []Producer } // newPeriodicReaderConfig returns a periodicReaderConfig configured with // options. func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { c := periodicReaderConfig{ interval: envDuration(envInterval, defaultInterval), timeout: envDuration(envTimeout, defaultTimeout), } for _, o := range options { c = o.applyPeriodic(c) } return c } // PeriodicReaderOption applies a configuration option value to a PeriodicReader. type PeriodicReaderOption interface { applyPeriodic(periodicReaderConfig) periodicReaderConfig } // periodicReaderOptionFunc applies a set of options to a periodicReaderConfig. type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig // applyPeriodic returns a periodicReaderConfig with option(s) applied. func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig { return o(conf) } // WithTimeout configures the time a PeriodicReader waits for an export to // complete before canceling it. This includes an export which occurs as part // of Shutdown or ForceFlush if the user passed context does not have a // deadline. If the user passed context does have a deadline, it will be used // instead. // // This option overrides any value set for the // OTEL_METRIC_EXPORT_TIMEOUT environment variable. // // If this option is not used or d is less than or equal to zero, 30 seconds // is used as the default. func WithTimeout(d time.Duration) PeriodicReaderOption { return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { if d <= 0 { return conf } conf.timeout = d return conf }) } // WithInterval configures the intervening time between exports for a // PeriodicReader. // // This option overrides any value set for the // OTEL_METRIC_EXPORT_INTERVAL environment variable. // // If this option is not used or d is less than or equal to zero, 60 seconds // is used as the default. func WithInterval(d time.Duration) PeriodicReaderOption { return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { if d <= 0 { return conf } conf.interval = d return conf }) } // NewPeriodicReader returns a Reader that collects and exports metric data to // the exporter at a defined interval. By default, the returned Reader will // collect and export data every 60 seconds, and will cancel any attempts that // exceed 30 seconds, collect and export combined. The collect and export time // are not counted towards the interval between attempts. // // The Collect method of the returned Reader continues to gather and return // metric data to the user. It will not automatically send that data to the // exporter. That is left to the user to accomplish. func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader { conf := newPeriodicReaderConfig(options) ctx, cancel := context.WithCancel(context.Background()) r := &PeriodicReader{ interval: conf.interval, timeout: conf.timeout, exporter: exporter, flushCh: make(chan chan error), cancel: cancel, done: make(chan struct{}), rmPool: sync.Pool{ New: func() interface{} { return &metricdata.ResourceMetrics{} }, }, } r.externalProducers.Store(conf.producers) go func() { defer func() { close(r.done) }() r.run(ctx, conf.interval) }() return r } // PeriodicReader is a Reader that continuously collects and exports metric // data at a set interval. type PeriodicReader struct { sdkProducer atomic.Value mu sync.Mutex isShutdown bool externalProducers atomic.Value interval time.Duration timeout time.Duration exporter Exporter flushCh chan chan error done chan struct{} cancel context.CancelFunc shutdownOnce sync.Once rmPool sync.Pool } // Compile time check the periodicReader implements Reader and is comparable. var _ = map[Reader]struct{}{&PeriodicReader{}: {}} // newTicker allows testing override. var newTicker = time.NewTicker // run continuously collects and exports metric data at the specified // interval. This will run until ctx is canceled or times out. func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) { ticker := newTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: err := r.collectAndExport(ctx) if err != nil { otel.Handle(err) } case errCh := <-r.flushCh: errCh <- r.collectAndExport(ctx) ticker.Reset(interval) case <-ctx.Done(): return } } } // register registers p as the producer of this reader. func (r *PeriodicReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register periodic reader" global.Error(errDuplicateRegister, msg) } } // temporality reports the Temporality for the instrument kind provided. func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) } // aggregation returns what Aggregation to use for kind. func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type. return r.exporter.Aggregation(kind) } // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *PeriodicReader) collectAndExport(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. rm := r.rmPool.Get().(*metricdata.ResourceMetrics) err := r.Collect(ctx, rm) if err == nil { err = r.export(ctx, rm) } r.rmPool.Put(rm) return err } // Collect gathers all metric data related to the Reader from // the SDK and other Producers and stores the result in rm. The metric // data is not exported to the configured exporter, it is left to the caller to // handle that if desired. // // Collect will return an error if called after shutdown. // Collect will return an error if rm is a nil ResourceMetrics. // Collect will return an error if the context's Done channel is closed. // // This method is safe to call concurrently. func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { if rm == nil { return errors.New("periodic reader: *metricdata.ResourceMetrics is nil") } // TODO (#3047): When collect is updated to accept output as param, pass rm. return r.collect(ctx, r.sdkProducer.Load(), rm) } // collect unwraps p as a produceHolder and returns its produce results. func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error { if p == nil { return ErrReaderNotRegistered } ph, ok := p.(produceHolder) if !ok { // The atomic.Value is entirely in the periodicReader's control so // this should never happen. In the unforeseen case that this does // happen, return an error instead of panicking so a users code does // not halt in the processes. err := fmt.Errorf("periodic reader: invalid producer: %T", p) return err } err := ph.produce(ctx, rm) if err != nil { return err } var errs []error for _, producer := range r.externalProducers.Load().([]Producer) { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } global.Debug("PeriodicReader collection", "Data", rm) return unifyErrors(errs) } // export exports metric data m using r's exporter. func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error { return r.exporter.Export(ctx, m) } // ForceFlush flushes pending telemetry. // // This method is safe to call concurrently. func (r *PeriodicReader) ForceFlush(ctx context.Context) error { // Prioritize the ctx timeout if it is set. if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.timeout) defer cancel() } errCh := make(chan error, 1) select { case r.flushCh <- errCh: select { case err := <-errCh: if err != nil { return err } close(errCh) case <-ctx.Done(): return ctx.Err() } case <-r.done: return ErrReaderShutdown case <-ctx.Done(): return ctx.Err() } return r.exporter.ForceFlush(ctx) } // Shutdown flushes pending telemetry and then stops the export pipeline. // // This method is safe to call concurrently. func (r *PeriodicReader) Shutdown(ctx context.Context) error { err := ErrReaderShutdown r.shutdownOnce.Do(func() { // Prioritize the ctx timeout if it is set. if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.timeout) defer cancel() } // Stop the run loop. r.cancel() <-r.done // Any future call to Collect will now return ErrReaderShutdown. ph := r.sdkProducer.Swap(produceHolder{ produce: shutdownProducer{}.produce, }) if ph != nil { // Reader was registered. // Flush pending telemetry. m := r.rmPool.Get().(*metricdata.ResourceMetrics) err = r.collect(ctx, ph, m) if err == nil { err = r.export(ctx, m) } r.rmPool.Put(m) } sErr := r.exporter.Shutdown(ctx) if err == nil || err == ErrReaderShutdown { err = sErr } r.mu.Lock() defer r.mu.Unlock() r.isShutdown = true // release references to Producer(s) r.externalProducers.Store([]Producer{}) }) return err } // MarshalLog returns logging data about the PeriodicReader. func (r *PeriodicReader) MarshalLog() interface{} { r.mu.Lock() down := r.isShutdown r.mu.Unlock() return struct { Type string Exporter Exporter Registered bool Shutdown bool Interval time.Duration Timeout time.Duration }{ Type: "PeriodicReader", Exporter: r.exporter, Registered: r.sdkProducer.Load() != nil, Shutdown: down, Interval: r.interval, Timeout: r.timeout, } }