diff --git a/README.md b/README.md index d7811edd8..0310f8338 100644 --- a/README.md +++ b/README.md @@ -47,18 +47,18 @@ This section lists the components that are included in the Observe Distribution |----------------------------------------------------------|-------------------------------------------------------|--------------------------------------------------------|--------------------------------------|-----------------------------| | [awsecscontainermetrics][awsecscontainermetricsreceiver] | [attributes][attributesprocessor] | [debug][debugexporter] | [file_storage][filestorage] | [count][countconnector] | | [docker_stats][dockerstatsreceiver] | [batch][batchprocessor] | [file][fileexporter] | [health_check][healthcheckextension] | [forward][forwardconnector] | -| [elasticsearch][elasticsearchreceiver] | [filter][filterprocessor] | [otlphttp][otlphttpexporter] | [zpages][zpagesextension] | | -| [filelog][filelogreceiver] | [k8sattributes][k8sattributesprocessor] | [prometheusremotewrite][prometheusremotewriteexporter] | | | -| [filestats][filestatsreceiver] | [memory_limiter][memorylimiterprocessor] | | | | -| [hostmetrics][hostmetricsreceiver] | [observek8sattributes][observek8sattributesprocessor] | | | | -| [iis][iisreceiver] | [probabilisticsampler][probabilisticsamplerprocessor] | | | | -| [journald][journaldreceiver] | [redaction][redactionprocessor] | | | | -| [k8s_cluster][k8sclusterreceiver] | [resource][resourceprocessor] | | | | -| [k8sobjects][k8sobjectsreceiver] | [resourcedetection][resourcedetectionprocessor] | | | | -| [kafkametrics][kafkametricsreceiver] | [span][spanprocessor] | | | | -| [kafka][kafkareceiver] | [tailsampling][tailsamplingprocessor] | | | | -| [kubeletstats][kubeletstatsreceiver] | [transform][transformprocessor] | | | | -| [mongodb][mongodbreceiver] | | | | | +| [elasticsearch][elasticsearchreceiver] | [deltatocumulative][deltatocumulativeprocessor] | [otlphttp][otlphttpexporter] | [zpages][zpagesextension] | | +| 
[filelog][filelogreceiver] | [filter][filterprocessor] | [prometheusremotewrite][prometheusremotewriteexporter] | | | +| [filestats][filestatsreceiver] | [k8sattributes][k8sattributesprocessor] | | | | +| [hostmetrics][hostmetricsreceiver] | [memory_limiter][memorylimiterprocessor] | | | | +| [iis][iisreceiver] | [observek8sattributes][observek8sattributesprocessor] | | | | +| [journald][journaldreceiver] | [probabilisticsampler][probabilisticsamplerprocessor] | | | | +| [k8s_cluster][k8sclusterreceiver] | [redaction][redactionprocessor] | | | | +| [k8sobjects][k8sobjectsreceiver] | [resource][resourceprocessor] | | | | +| [kafkametrics][kafkametricsreceiver] | [resourcedetection][resourcedetectionprocessor] | | | | +| [kafka][kafkareceiver] | [span][spanprocessor] | | | | +| [kubeletstats][kubeletstatsreceiver] | [tailsampling][tailsamplingprocessor] | | | | +| [mongodb][mongodbreceiver] | [transform][transformprocessor] | | | | | [otlp][otlpreceiver] | | | | | | [prometheus][prometheusreceiver] | | | | | | [redis][redisreceiver] | | | | | @@ -66,8 +66,6 @@ This section lists the components that are included in the Observe Distribution | [tcplog][tcplogreceiver] | | | | | | [udplog][udplogreceiver] | | | | | | [windowseventlog][windowseventlogreceiver] | | | | | -| | | | | | -| | | | | | [awsecscontainermetricsreceiver]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/receiver/awsecscontainermetricsreceiver [dockerstatsreceiver]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/receiver/dockerstatsreceiver @@ -92,6 +90,7 @@ This section lists the components that are included in the Observe Distribution [windowseventlogreceiver]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/receiver/windowseventlogreceiver [attributesprocessor]: 
https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/processor/attributesprocessor [batchprocessor]: https://github.com/open-telemetry/opentelemetry-collector/tree/v0.110.0/processor/batchprocessor +[deltatocumulativeprocessor]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/processor/deltatocumulativeprocessor [filterprocessor]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/processor/filterprocessor [k8sattributesprocessor]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.110.0/processor/k8sattributesprocessor [memorylimiterprocessor]: https://github.com/open-telemetry/opentelemetry-collector/tree/v0.110.0/processor/memorylimiterprocessor diff --git a/builder-config.yaml b/builder-config.yaml index c83eb254b..d88cb8f3b 100644 --- a/builder-config.yaml +++ b/builder-config.yaml @@ -18,6 +18,7 @@ processors: - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.110.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.110.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.110.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.110.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.110.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor v0.110.0 diff --git a/go.mod b/go.mod index 4c47ae9cd..bf4a056aa 100644 --- a/go.mod +++ b/go.mod @@ -163,6 +163,7 @@ require ( 
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.110.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.110.0 // indirect @@ -185,6 +186,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.110.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.110.0 // indirect diff --git a/observecol/components.go b/observecol/components.go index 5f6820965..f45d82b01 100644 --- a/observecol/components.go +++ b/observecol/components.go @@ -23,6 +23,7 @@ import ( batchprocessor 
"go.opentelemetry.io/collector/processor/batchprocessor" memorylimiterprocessor "go.opentelemetry.io/collector/processor/memorylimiterprocessor" attributesprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor" + deltatocumulativeprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" k8sattributesprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor" probabilisticsamplerprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor" redactionprocessor "github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor" @@ -143,6 +144,7 @@ func components() (otelcol.Factories, error) { batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), attributesprocessor.NewFactory(), + deltatocumulativeprocessor.NewFactory(), k8sattributesprocessor.NewFactory(), probabilisticsamplerprocessor.NewFactory(), redactionprocessor.NewFactory(), @@ -161,6 +163,7 @@ func components() (otelcol.Factories, error) { factories.ProcessorModules[batchprocessor.NewFactory().Type()] = "go.opentelemetry.io/collector/processor/batchprocessor v0.110.0" factories.ProcessorModules[memorylimiterprocessor.NewFactory().Type()] = "go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.110.0" factories.ProcessorModules[attributesprocessor.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.110.0" + factories.ProcessorModules[deltatocumulativeprocessor.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.110.0" factories.ProcessorModules[k8sattributesprocessor.NewFactory().Type()] = 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.110.0" factories.ProcessorModules[probabilisticsamplerprocessor.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.110.0" factories.ProcessorModules[redactionprocessor.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor v0.110.0" diff --git a/observecol/go.mod b/observecol/go.mod index 1d2dfa1b1..b2c45d160 100644 --- a/observecol/go.mod +++ b/observecol/go.mod @@ -13,6 +13,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.110.0 + github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.110.0 @@ -205,6 +206,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.110.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics 
v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.110.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.110.0 // indirect diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/LICENSE b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/doc.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/doc.go new file mode 100644 index 000000000..457bd7434 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/doc.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// identity types for metrics and sample streams. +// +// Use the `Of*(T) -> I` functions to obtain a unique, comparable (==) and +// hashable (map key) identity value I for T. 
+package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/metric.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/metric.go new file mode 100644 index 000000000..ceaea90b8 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/metric.go @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + +import ( + "hash" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type metric = Metric + +type Metric struct { + scope + + name string + unit string + ty pmetric.MetricType + + monotonic bool + temporality pmetric.AggregationTemporality +} + +func (i Metric) Hash() hash.Hash64 { + sum := i.scope.Hash() + sum.Write([]byte(i.name)) + sum.Write([]byte(i.unit)) + + var mono byte + if i.monotonic { + mono = 1 + } + sum.Write([]byte{byte(i.ty), mono, byte(i.temporality)}) + return sum +} + +func (i Metric) Scope() Scope { + return i.scope +} + +func OfMetric(scope Scope, m pmetric.Metric) Metric { + id := Metric{ + scope: scope, + name: m.Name(), + unit: m.Unit(), + ty: m.Type(), + } + + switch m.Type() { + case pmetric.MetricTypeSum: + sum := m.Sum() + id.monotonic = sum.IsMonotonic() + id.temporality = sum.AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + exp := m.ExponentialHistogram() + id.monotonic = true + id.temporality = exp.AggregationTemporality() + case pmetric.MetricTypeHistogram: + hist := m.Histogram() + id.monotonic = true + id.temporality = 
hist.AggregationTemporality() + } + + return id +} + +func OfResourceMetric(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { + return OfMetric(OfScope(OfResource(res), scope), metric) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/resource.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/resource.go new file mode 100644 index 000000000..990fb71e6 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/resource.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + +import ( + "hash" + "hash/fnv" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +type resource = Resource + +type Resource struct { + attrs [16]byte +} + +func (r Resource) Hash() hash.Hash64 { + sum := fnv.New64a() + sum.Write(r.attrs[:]) + return sum +} + +func OfResource(r pcommon.Resource) Resource { + return Resource{ + attrs: pdatautil.MapHash(r.Attributes()), + } +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/scope.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/scope.go new file mode 100644 index 000000000..db516bc14 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/scope.go @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package identity // import 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + +import ( + "hash" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +type scope = Scope + +type Scope struct { + resource resource + + name string + version string + attrs [16]byte +} + +func (s Scope) Hash() hash.Hash64 { + sum := s.resource.Hash() + sum.Write([]byte(s.name)) + sum.Write([]byte(s.version)) + sum.Write(s.attrs[:]) + return sum +} + +func (s Scope) Resource() Resource { + return s.resource +} + +func OfScope(res Resource, scope pcommon.InstrumentationScope) Scope { + return Scope{ + resource: res, + name: scope.Name(), + version: scope.Version(), + attrs: pdatautil.MapHash(scope.Attributes()), + } +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/stream.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/stream.go new file mode 100644 index 000000000..19988f773 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/stream.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + +import ( + "hash" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +type Stream struct { + metric + attrs [16]byte +} + +func (i Stream) Hash() hash.Hash64 { + sum := i.metric.Hash() + sum.Write(i.attrs[:]) + return sum +} + +func (i Stream) Metric() Metric { + return i.metric +} + +func OfStream[DataPoint attrPoint](m Metric, dp DataPoint) Stream { + 
return Stream{metric: m, attrs: pdatautil.MapHash(dp.Attributes())} +} + +type attrPoint interface { + Attributes() pcommon.Map +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/strings.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/strings.go new file mode 100644 index 000000000..7339f95a5 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity/strings.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + +import ( + "fmt" +) + +func (r Resource) String() string { + return fmt.Sprintf("resource/%x", r.Hash().Sum64()) +} + +func (s Scope) String() string { + return fmt.Sprintf("scope/%x", s.Hash().Sum64()) +} + +func (m Metric) String() string { + return fmt.Sprintf("metric/%x", m.Hash().Sum64()) +} + +func (s Stream) String() string { + return fmt.Sprintf("stream/%x", s.Hash().Sum64()) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness/priority_queue.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness/priority_queue.go new file mode 100644 index 000000000..f1b01743f --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness/priority_queue.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + +import ( + "container/heap" + "time" + + 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// PriorityQueue represents a way to store entries sorted by their priority. +// Pop() will return the oldest entry of the set. +type PriorityQueue interface { + // Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted + Update(id identity.Stream, newPrio time.Time) + // Peek will return the entry at the HEAD of the queue *without* removing it from the queue + Peek() (identity.Stream, time.Time) + // Pop will remove the entry at the HEAD of the queue and return it + Pop() (identity.Stream, time.Time) + // Len will return the number of entries in the queue + Len() int +} + +// heapQueue implements heap.Interface. +// We use it as the inner implementation of a heap-based sorted queue +type heapQueue []*queueItem + +type queueItem struct { + key identity.Stream + prio time.Time + index int +} + +func (pq heapQueue) Len() int { return len(pq) } + +func (pq heapQueue) Less(i, j int) bool { + // We want Pop to give us the lowest priority + return pq[i].prio.Before(pq[j].prio) +} + +func (pq heapQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *heapQueue) Push(x any) { + n := len(*pq) + item := x.(*queueItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *heapQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +type heapPriorityQueue struct { + inner heapQueue + itemLookup map[identity.Stream]*queueItem +} + +func NewPriorityQueue() PriorityQueue { + pq := &heapPriorityQueue{ + inner: heapQueue{}, + itemLookup: map[identity.Stream]*queueItem{}, + } + heap.Init(&pq.inner) + + return pq +} + +func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { + // Check if the entry already exists in the queue + item, 
+// Staleness is a wrapper over a map that adds an additional "staleness" value to each entry.
+// call ExpireOldEntries() to automatically remove all entries from the map whose staleness value is
+func (s *Staleness[T]) ExpireOldEntries() { + now := NowFunc() + for { + if s.Len() == 0 { + return + } + _, ts := s.pq.Peek() + if now.Sub(ts) < s.Max { + break + } + id, _ := s.pq.Pop() + s.items.Delete(id) + } +} + +func (s *Staleness[T]) Len() int { + return s.items.Len() +} + +func (s *Staleness[T]) Next() time.Time { + _, ts := s.pq.Peek() + return ts +} + +func (s *Staleness[T]) Evict() (identity.Stream, bool) { + _, ts := s.pq.Peek() + if NowFunc().Sub(ts) < s.Max { + return identity.Stream{}, false + } + + id, _ := s.pq.Pop() + s.items.Delete(id) + return id, true +} + +func (s *Staleness[T]) Clear() { + s.items.Clear() +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams/streams.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams/streams.go new file mode 100644 index 000000000..5f0d715b6 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams/streams.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// Sequence of streams that can be iterated upon +type Seq[T any] func(yield func(identity.Stream, T) bool) bool + +// Map defines a collection of items tracked by a stream-id and the operations +// on it +type Map[T any] interface { + Load(identity.Stream) (T, bool) + Store(identity.Stream, T) error + Delete(identity.Stream) + Items() func(yield func(identity.Stream, T) bool) bool + Len() int + Clear() +} + +var _ Map[any] = HashMap[any](nil) + +type HashMap[T any] 
map[identity.Stream]T + +func (m HashMap[T]) Load(id identity.Stream) (T, bool) { + v, ok := (map[identity.Stream]T)(m)[id] + return v, ok +} + +func (m HashMap[T]) Store(id identity.Stream, v T) error { + (map[identity.Stream]T)(m)[id] = v + return nil +} + +func (m HashMap[T]) Delete(id identity.Stream) { + delete((map[identity.Stream]T)(m), id) +} + +func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool { + return func(yield func(identity.Stream, T) bool) bool { + for id, v := range (map[identity.Stream]T)(m) { + if !yield(id, v) { + break + } + } + return false + } +} + +func (m HashMap[T]) Len() int { + return len((map[identity.Stream]T)(m)) +} + +func (m HashMap[T]) Clear() { + clear(m) +} + +// Evictors remove the "least important" stream based on some strategy such as +// the oldest, least active, etc. +// +// Returns whether a stream was evicted and if so the now gone stream id +type Evictor interface { + Evict() (gone identity.Stream, ok bool) +} + +type DataPointSlice[DP DataPoint[DP]] interface { + Len() int + At(i int) DP + AppendEmpty() DP +} + +type DataPoint[Self any] interface { + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + CopyTo(dest Self) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/LICENSE b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/Makefile b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/Makefile new file mode 100644 index 000000000..c1496226e --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common \ No newline at end of file diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/README.md b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/README.md new file mode 100644 index 000000000..87452c163 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/README.md @@ -0,0 +1,41 @@ +# Delta to cumulative processor + + +| Status | | +| ------------- |-----------| +| Stability | [alpha]: metrics | +| Distributions | [contrib] | +| Warnings | [Statefulness](#warnings) | +| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdeltatocumulative%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) [![Closed 
issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdeltatocumulative%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdeltatocumulative) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sh0rez](https://www.github.com/sh0rez), [@RichieSams](https://www.github.com/RichieSams), [@jpkrohling](https://www.github.com/jpkrohling) | + +[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha +[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib + + + +## Description + +The delta to cumulative processor (`deltatocumulativeprocessor`) converts +metrics from delta temporality to cumulative, by accumulating samples in memory. + +## Configuration + +``` yaml +processors: + deltatocumulative: + # how long until a series not receiving new samples is removed + [ max_stale: | default = 5m ] + + # upper limit of streams to track. new streams exceeding this limit + # will be dropped + [ max_streams: | default = 0 (off) ] + +``` + +There is no further configuration required. All delta samples are converted to cumulative. + +## Troubleshooting + +When [Telemetry is +enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry), this component exports [several metrics](./documentation.md). 
diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/config.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/config.go new file mode 100644 index 000000000..b97793d0b --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/config.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + +import ( + "fmt" + "time" + + "go.opentelemetry.io/collector/component" +) + +var _ component.ConfigValidator = (*Config)(nil) + +type Config struct { + MaxStale time.Duration `mapstructure:"max_stale"` + MaxStreams int `mapstructure:"max_streams"` +} + +func (c *Config) Validate() error { + if c.MaxStale <= 0 { + return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale) + } + if c.MaxStreams < 0 { + return fmt.Errorf("max_streams must be a positive number (got %d)", c.MaxStreams) + } + return nil +} + +func createDefaultConfig() component.Config { + return &Config{ + MaxStale: 5 * time.Minute, + + // disable. 
TODO: find good default + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603 + MaxStreams: 0, + } +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/doc.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/doc.go new file mode 100644 index 000000000..c8f961f6b --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/doc.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate mdatagen metadata.yaml + +// package deltatocumulativeprocessor implements a processor which +// converts metrics from delta temporality to cumulative. +package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/documentation.md b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/documentation.md new file mode 100644 index 000000000..55d85f06c --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/documentation.md @@ -0,0 +1,63 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# deltatocumulative + +## Internal Telemetry + +The following telemetry is emitted by this component. 
+ +### otelcol_deltatocumulative.datapoints.dropped + +number of datapoints dropped due to given 'reason' + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + +### otelcol_deltatocumulative.datapoints.processed + +number of datapoints processed + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + +### otelcol_deltatocumulative.gaps.length + +total duration where data was expected but not received + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| s | Sum | Int | true | + +### otelcol_deltatocumulative.streams.evicted + +number of streams evicted + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {stream} | Sum | Int | true | + +### otelcol_deltatocumulative.streams.limit + +upper limit of tracked streams + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {stream} | Gauge | Int | + +### otelcol_deltatocumulative.streams.max_stale + +duration after which streams inactive streams are dropped + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| s | Gauge | Int | + +### otelcol_deltatocumulative.streams.tracked + +number of streams tracked + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {dps} | Sum | Int | false | diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/factory.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/factory.go new file mode 100644 index 000000000..8a6a39408 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/factory.go @@ -0,0 
+1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" +) + +func NewFactory() processor.Factory { + return processor.NewFactory( + metadata.Type, + createDefaultConfig, + processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability), + ) +} + +func createMetricsProcessor(_ context.Context, set processor.Settings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) { + pcfg, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("configuration parsing error") + } + + telb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil, err + } + + return newProcessor(pcfg, set.Logger, telb, next), nil +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/add.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/add.go new file mode 100644 index 000000000..597f91824 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/add.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + +import ( + "math" + + 
"go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" +) + +func (dp Number) Add(in Number) Number { + switch in.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + v := dp.DoubleValue() + in.DoubleValue() + dp.SetDoubleValue(v) + case pmetric.NumberDataPointValueTypeInt: + v := dp.IntValue() + in.IntValue() + dp.SetIntValue(v) + } + dp.SetTimestamp(in.Timestamp()) + return dp +} + +func (dp Histogram) Add(in Histogram) Histogram { + // bounds different: no way to merge, so reset observation to new boundaries + if !pslice.Equal(dp.ExplicitBounds(), in.ExplicitBounds()) { + in.MoveTo(dp.HistogramDataPoint) + return dp + } + + // spec requires len(BucketCounts) == len(ExplicitBounds)+1. + // given we have limited error handling at this stage (and already verified boundaries are correct), + // doing a best-effort add of whatever we have appears reasonable. 
+ n := min(dp.BucketCounts().Len(), in.BucketCounts().Len()) + for i := 0; i < n; i++ { + sum := dp.BucketCounts().At(i) + in.BucketCounts().At(i) + dp.BucketCounts().SetAt(i, sum) + } + + dp.SetTimestamp(in.Timestamp()) + dp.SetCount(dp.Count() + in.Count()) + + if dp.HasSum() && in.HasSum() { + dp.SetSum(dp.Sum() + in.Sum()) + } else { + dp.RemoveSum() + } + + if dp.HasMin() && in.HasMin() { + dp.SetMin(math.Min(dp.Min(), in.Min())) + } else { + dp.RemoveMin() + } + + if dp.HasMax() && in.HasMax() { + dp.SetMax(math.Max(dp.Max(), in.Max())) + } else { + dp.RemoveMax() + } + + return dp +} + +func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram { + type H = ExpHistogram + + if dp.Scale() != in.Scale() { + hi, lo := expo.HiLo(dp, in, H.Scale) + from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale()) + expo.Downscale(hi.Positive(), from, to) + expo.Downscale(hi.Negative(), from, to) + hi.SetScale(lo.Scale()) + } + + if dp.ZeroThreshold() != in.ZeroThreshold() { + hi, lo := expo.HiLo(dp, in, H.ZeroThreshold) + expo.WidenZero(lo.DataPoint, hi.ZeroThreshold()) + } + + expo.Merge(dp.Positive(), in.Positive()) + expo.Merge(dp.Negative(), in.Negative()) + + dp.SetTimestamp(in.Timestamp()) + dp.SetCount(dp.Count() + in.Count()) + dp.SetZeroCount(dp.ZeroCount() + in.ZeroCount()) + + if dp.HasSum() && in.HasSum() { + dp.SetSum(dp.Sum() + in.Sum()) + } else { + dp.RemoveSum() + } + + if dp.HasMin() && in.HasMin() { + dp.SetMin(math.Min(dp.Min(), in.Min())) + } else { + dp.RemoveMin() + } + + if dp.HasMax() && in.HasMax() { + dp.SetMax(math.Max(dp.Max(), in.Max())) + } else { + dp.RemoveMax() + } + + return dp +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/data.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/data.go new file mode 100644 index 000000000..e6f7551fd --- 
/dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/data.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" +) + +type Point[Self any] interface { + StartTimestamp() pcommon.Timestamp + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + + Clone() Self + CopyTo(Self) + + Add(Self) Self +} + +type Typed[Self any] interface { + Point[Self] + Number | Histogram | ExpHistogram +} + +type Number struct { + pmetric.NumberDataPoint +} + +func Zero[P Typed[P]]() P { + var point P + switch ty := any(&point).(type) { + case *Number: + ty.NumberDataPoint = pmetric.NewNumberDataPoint() + case *Histogram: + ty.HistogramDataPoint = pmetric.NewHistogramDataPoint() + case *ExpHistogram: + ty.DataPoint = pmetric.NewExponentialHistogramDataPoint() + } + return point +} + +func (dp Number) Clone() Number { + clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()} + if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp Number) CopyTo(dst Number) { + dp.NumberDataPoint.CopyTo(dst.NumberDataPoint) +} + +type Histogram struct { + pmetric.HistogramDataPoint +} + +func (dp Histogram) Clone() Histogram { + clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()} + if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp Histogram) CopyTo(dst Histogram) { + dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint) 
+} + +type ExpHistogram struct { + expo.DataPoint +} + +func (dp ExpHistogram) Clone() ExpHistogram { + clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()} + if dp.DataPoint != (expo.DataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp ExpHistogram) CopyTo(dst ExpHistogram) { + dp.DataPoint.CopyTo(dst.DataPoint) +} + +type mustPoint[D Point[D]] struct{ _ D } + +var ( + _ = mustPoint[Number]{} + _ = mustPoint[Histogram]{} + _ = mustPoint[ExpHistogram]{} +) diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expo.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expo.go new file mode 100644 index 000000000..2011e3cd8 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expo.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package expo implements various operations on exponential histograms and their bucket counts +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import "go.opentelemetry.io/collector/pdata/pmetric" + +type ( + DataPoint = pmetric.ExponentialHistogramDataPoint + Buckets = pmetric.ExponentialHistogramDataPointBuckets +) + +// Abs returns a view into the buckets using an absolute scale +func Abs(bs Buckets) Absolute { + return Absolute{buckets: bs} +} + +type buckets = Buckets + +// Absolute addresses bucket counts using an absolute scale, such that it is +// interoperable with [Scale]. 
+// +// It spans from [[Absolute.Lower]:[Absolute.Upper]] +// +// NOTE: The zero-value is unusable, use [Abs] to construct +type Absolute struct { + buckets +} + +// Abs returns the value at absolute index 'at' +func (a Absolute) Abs(at int) uint64 { + if i, ok := a.idx(at); ok { + return a.BucketCounts().At(i) + } + return 0 +} + +// Upper returns the minimal index outside the set, such that every i < Upper +func (a Absolute) Upper() int { + return a.BucketCounts().Len() + int(a.Offset()) +} + +// Lower returns the minimal index inside the set, such that every i >= Lower +func (a Absolute) Lower() int { + return int(a.Offset()) +} + +func (a Absolute) idx(at int) (int, bool) { + idx := at - a.Lower() + return idx, idx >= 0 && idx < a.BucketCounts().Len() +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/merge.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/merge.go new file mode 100644 index 000000000..150e29a65 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/merge.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// Merge combines the counts of buckets a and b into a. 
+// Both buckets MUST be of same scale +func Merge(arel, brel Buckets) { + if brel.BucketCounts().Len() == 0 { + return + } + if arel.BucketCounts().Len() == 0 { + brel.CopyTo(arel) + return + } + + a, b := Abs(arel), Abs(brel) + + lo := min(a.Lower(), b.Lower()) + up := max(a.Upper(), b.Upper()) + + size := up - lo + + counts := pcommon.NewUInt64Slice() + counts.Append(make([]uint64, size-counts.Len())...) + + for i := 0; i < counts.Len(); i++ { + counts.SetAt(i, a.Abs(lo+i)+b.Abs(lo+i)) + } + + a.SetOffset(int32(lo)) + counts.MoveTo(a.BucketCounts()) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/ord.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/ord.go new file mode 100644 index 000000000..34d177be1 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/ord.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import "cmp" + +// HiLo returns the greater of a and b by comparing the result of applying fn to +// each. 
If equal, returns operands as passed +func HiLo[T any, N cmp.Ordered](a, b T, fn func(T) N) (hi, lo T) { + an, bn := fn(a), fn(b) + if cmp.Less(an, bn) { + return b, a + } + return a, b +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/scale.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/scale.go new file mode 100644 index 000000000..5201806fb --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/scale.go @@ -0,0 +1,115 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "fmt" + "math" +) + +type Scale int32 + +// Idx gives the bucket index v belongs into +func (scale Scale) Idx(v float64) int { + // from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function + + // Special case for power-of-two values. + if frac, exp := math.Frexp(v); frac == 0.5 { + return ((exp - 1) << scale) - 1 + } + + scaleFactor := math.Ldexp(math.Log2E, int(scale)) + // Note: math.Floor(value) equals math.Ceil(value)-1 when value + // is not a power of two, which is checked above. + return int(math.Floor(math.Log(v) * scaleFactor)) +} + +// Bounds returns the half-open interval (min,max] of the bucket at index. +// This means a value min < v <= max belongs to this bucket. 
+// +// NOTE: this is different from Go slice intervals, which are [a,b) +func (scale Scale) Bounds(index int) (min, max float64) { + // from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function + lower := func(index int) float64 { + inverseFactor := math.Ldexp(math.Ln2, int(-scale)) + return math.Exp(float64(index) * inverseFactor) + } + + return lower(index), lower(index + 1) +} + +// Downscale collapses the buckets of bs until scale 'to' is reached +func Downscale(bs Buckets, from, to Scale) { + switch { + case from == to: + return + case from < to: + // because even distribution within the buckets cannot be assumed, it is + // not possible to correctly upscale (split) buckets. + // any attempt to do so would yield erronous data. + panic(fmt.Sprintf("cannot upscale without introducing error (%d -> %d)", from, to)) + } + + for at := from; at > to; at-- { + Collapse(bs) + } +} + +// Collapse merges adjacent buckets and zeros the remaining area: +// +// before: 1 1 1 1 1 1 1 1 1 1 1 1 +// after: 2 2 2 2 2 2 0 0 0 0 0 0 +// +// Due to the "perfect subsetting" property of exponential histograms, this +// gives the same observation as before, but recorded at scale-1. See +// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponential-scale. +// +// Because every bucket now spans twice as much range, half of the allocated +// counts slice is technically no longer required. It is zeroed but left in +// place to avoid future allocations, because observations may happen in that +// area at a later time. +func Collapse(bs Buckets) { + counts := bs.BucketCounts() + size := counts.Len() / 2 + if counts.Len()%2 != 0 || bs.Offset()%2 != 0 { + size++ + } + + // merging needs to happen in pairs aligned to i=0. if offset is non-even, + // we need to shift the whole merging by one to make above condition true. 
+ shift := 0 + if bs.Offset()%2 != 0 { + bs.SetOffset(bs.Offset() - 1) + shift-- + } + bs.SetOffset(bs.Offset() / 2) + + for i := 0; i < size; i++ { + // size is ~half of len. we add two buckets per iteration. + // k jumps in steps of 2, shifted if offset makes this necessary. + k := i*2 + shift + + // special case: we just started and had to shift. the left half of the + // new bucket is not actually stored, so only use counts[0]. + if i == 0 && k == -1 { + counts.SetAt(i, counts.At(k+1)) + continue + } + + // new[k] = old[k]+old[k+1] + counts.SetAt(i, counts.At(k)) + if k+1 < counts.Len() { + counts.SetAt(i, counts.At(k)+counts.At(k+1)) + } + } + + // zero the excess area. its not needed to represent the observation + // anymore, but kept for two reasons: + // 1. future observations may need it, no need to re-alloc then if kept + // 2. [pcommon.Uint64Slice] can not, in fact, be sliced, so getting rid + // of it would alloc ¯\_(ツ)_/¯ + for i := size; i < counts.Len(); i++ { + counts.SetAt(i, 0) + } +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/zero.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/zero.go new file mode 100644 index 000000000..2d5401b39 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/zero.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + +import ( + "cmp" + "fmt" +) + +// WidenZero widens the zero-bucket to span at least [-width,width], possibly wider +// if min falls in the middle of a bucket. 
+// +// Both buckets counts MUST be of same scale. +func WidenZero(dp DataPoint, width float64) { + switch { + case width == dp.ZeroThreshold(): + return + case width < dp.ZeroThreshold(): + panic(fmt.Sprintf("min must be larger than current threshold (%f)", dp.ZeroThreshold())) + } + + scale := Scale(dp.Scale()) + zero := scale.Idx(width) // the largest bucket index inside the zero width + + widen := func(bs Buckets) { + abs := Abs(bs) + for i := abs.Lower(); i <= zero; i++ { + dp.SetZeroCount(dp.ZeroCount() + abs.Abs(i)) + } + + // right next to the new zero bucket, constrained to slice range + lo := clamp(zero+1, abs.Lower(), abs.Upper()) + abs.Slice(lo, abs.Upper()) + } + + widen(dp.Positive()) + widen(dp.Negative()) + + _, max := scale.Bounds(zero) + dp.SetZeroThreshold(max) +} + +// Slice drops data outside the range from <= i < to from the bucket counts. It behaves the same as Go's [a:b] +// +// Limitations: +// - due to a limitation of the pcommon package, slicing cannot happen in-place and allocates +// - in consequence, data outside the range is garbage collected +func (a Absolute) Slice(from, to int) { + lo, up := a.Lower(), a.Upper() + switch { + case from > to: + panic(fmt.Sprintf("bad bounds: must be from<=to (got %d<=%d)", from, to)) + case from < lo || to > up: + panic(fmt.Sprintf("%d:%d is out of bounds for %d:%d", from, to, lo, up)) + } + + first := from - lo + last := to - lo + + a.BucketCounts().FromRaw(a.BucketCounts().AsRaw()[first:last]) + a.SetOffset(int32(from)) +} + +// clamp constraints v to the range up..=lo +func clamp[N cmp.Ordered](v, lo, up N) N { + return max(lo, min(v, up)) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta/delta.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta/delta.go new file mode 100644 index 000000000..5539eb8c8 --- 
/dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package delta // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + +import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" +) + +func New[D data.Point[D]]() Accumulator[D] { + return Accumulator[D]{ + Map: make(exp.HashMap[D]), + } +} + +var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil) + +type Accumulator[D data.Point[D]] struct { + streams.Map[D] +} + +func (a Accumulator[D]) Store(id streams.Ident, dp D) error { + aggr, ok := a.Map.Load(id) + + // new series: initialize with current sample + if !ok { + clone := dp.Clone() + return a.Map.Store(id, clone) + } + + // drop bad samples + switch { + case dp.StartTimestamp() < aggr.StartTimestamp(): + // belongs to older series + return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()} + case dp.Timestamp() <= aggr.Timestamp(): + // out of order + return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} + } + + // detect gaps + var gap error + if dp.StartTimestamp() > aggr.Timestamp() { + gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()} + } + + res := aggr.Add(dp) + if err := a.Map.Store(id, res); err != nil { + return err + } + return gap +} + +type ErrOlderStart struct { + Start pcommon.Timestamp + Sample 
pcommon.Timestamp +} + +func (e ErrOlderStart) Error() string { + return fmt.Sprintf("dropped sample with start_time=%s, because series only starts at start_time=%s. consider checking for multiple processes sending the exact same series", e.Sample, e.Start) +} + +type ErrOutOfOrder struct { + Last pcommon.Timestamp + Sample pcommon.Timestamp +} + +func (e ErrOutOfOrder) Error() string { + return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last) +} + +type ErrGap struct { + From, To pcommon.Timestamp +} + +func (e ErrGap) Error() string { + return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe/ptr.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe/ptr.go new file mode 100644 index 000000000..8f40b8d27 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe/ptr.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// maybe provides utilities for representing data may or may not exist at +// runtime in a safe way. +// +// A typical approach to this are pointers, but they suffer from two issues: +// - Unsafety: permitting nil pointers must require careful checking on each use, +// which is easily forgotten +// - Blindness: nil itself does cannot differentiate between "set to nil" and +// "not set all", leading to unexepcted edge cases +// +// The [Ptr] type of this package provides a safe alternative with a clear +// distinction between "not set" and "set to nil". 
+package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" + +// Ptr references some value of type T that is not guaranteed to exist. +// Callers must use [Ptr.Try] to access the underlying value, checking the +// ok return value too. +// This provides a clear distinction between "not set" and "set to nil". +// +// Use [Some] and [None] to create Ptrs. +type Ptr[T any] struct { + to *T + ok bool +} + +// None returns a Ptr that represents "not-set". +// This is equal to a zero-value Ptr. +func None[T any]() Ptr[T] { + return Ptr[T]{to: nil, ok: false} +} + +// Some returns a pointer to the passed T. +// +// The ptr argument may be nil, in which case this represents "explicitly set to +// nil". +func Some[T any](ptr *T) Ptr[T] { + return Ptr[T]{to: ptr, ok: true} +} + +// Try attempts to de-reference the Ptr, giving one of three results: +// +// - nil, false: not-set +// - nil, true: explicitly set to nil +// - non-nil, true: set to some value +// +// This provides extra safety over bare pointers, because callers are forced by +// the compiler to either check or explicitly ignore the ok value. +func (ptr Ptr[T]) Try() (_ *T, ok bool) { + return ptr.to, ptr.ok +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata/generated_status.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata/generated_status.go new file mode 100644 index 000000000..25a4f5a32 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata/generated_status.go @@ -0,0 +1,16 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("deltatocumulative") + ScopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" +) + +const ( + MetricsStability = component.StabilityLevelAlpha +) diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go new file mode 100644 index 000000000..caf9cd16d --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -0,0 +1,105 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" +) + +// Deprecated: [v0.108.0] use LeveledMeter instead. 
+func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor") +} + +func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { + return settings.LeveledMeterProvider(level).Meter("github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + DeltatocumulativeDatapointsDropped metric.Int64Counter + DeltatocumulativeDatapointsProcessed metric.Int64Counter + DeltatocumulativeGapsLength metric.Int64Counter + DeltatocumulativeStreamsEvicted metric.Int64Counter + DeltatocumulativeStreamsLimit metric.Int64Gauge + DeltatocumulativeStreamsMaxStale metric.Int64Gauge + DeltatocumulativeStreamsTracked metric.Int64UpDownCounter + meters map[configtelemetry.Level]metric.Meter +} + +// TelemetryBuilderOption applies changes to default builder. 
+type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + for _, op := range options { + op.apply(&builder) + } + builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + var err, errs error + builder.DeltatocumulativeDatapointsDropped, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_deltatocumulative.datapoints.dropped", + metric.WithDescription("number of datapoints dropped due to given 'reason'"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeDatapointsProcessed, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_deltatocumulative.datapoints.processed", + metric.WithDescription("number of datapoints processed"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeGapsLength, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_deltatocumulative.gaps.length", + metric.WithDescription("total duration where data was expected but not received"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsEvicted, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_deltatocumulative.streams.evicted", + metric.WithDescription("number of streams evicted"), + metric.WithUnit("{stream}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsLimit, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge( + 
"otelcol_deltatocumulative.streams.limit", + metric.WithDescription("upper limit of tracked streams"), + metric.WithUnit("{stream}"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsMaxStale, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge( + "otelcol_deltatocumulative.streams.max_stale", + metric.WithDescription("duration after which streams inactive streams are dropped"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsTracked, err = builder.meters[configtelemetry.LevelBasic].Int64UpDownCounter( + "otelcol_deltatocumulative.streams.tracked", + metric.WithDescription("number of streams tracked"), + metric.WithUnit("{dps}"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/data.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/data.go new file mode 100644 index 000000000..0475ba2d4 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" +) + +type Data[D data.Point[D]] interface { + At(i int) D + Len() int + Ident() Ident +} + +type Sum Metric + +func (s Sum) At(i int) data.Number { + dp := Metric(s).Sum().DataPoints().At(i) + return data.Number{NumberDataPoint: dp} +} + +func 
(s Sum) Len() int { + return Metric(s).Sum().DataPoints().Len() +} + +func (s Sum) Ident() Ident { + return (*Metric)(&s).Ident() +} + +func (s Sum) Filter(expr func(data.Number) bool) { + s.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + return !expr(data.Number{NumberDataPoint: dp}) + }) +} + +type Histogram Metric + +func (s Histogram) At(i int) data.Histogram { + dp := Metric(s).Histogram().DataPoints().At(i) + return data.Histogram{HistogramDataPoint: dp} +} + +func (s Histogram) Len() int { + return Metric(s).Histogram().DataPoints().Len() +} + +func (s Histogram) Ident() Ident { + return (*Metric)(&s).Ident() +} + +func (s Histogram) Filter(expr func(data.Histogram) bool) { + s.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + return !expr(data.Histogram{HistogramDataPoint: dp}) + }) +} + +type ExpHistogram Metric + +func (s ExpHistogram) At(i int) data.ExpHistogram { + dp := Metric(s).ExponentialHistogram().DataPoints().At(i) + return data.ExpHistogram{DataPoint: dp} +} + +func (s ExpHistogram) Len() int { + return Metric(s).ExponentialHistogram().DataPoints().Len() +} + +func (s ExpHistogram) Ident() Ident { + return (*Metric)(&s).Ident() +} + +func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) { + s.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool { + return !expr(data.ExpHistogram{DataPoint: dp}) + }) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/iter.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/iter.go new file mode 100644 index 000000000..9902d22a2 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/iter.go @@ -0,0 +1,38 @@ +// 
Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" +) + +func All(md pmetric.Metrics) func(func(Metric) bool) { + return func(yield func(Metric) bool) { + var ok bool + pslice.All(md.ResourceMetrics())(func(rm pmetric.ResourceMetrics) bool { + pslice.All(rm.ScopeMetrics())(func(sm pmetric.ScopeMetrics) bool { + pslice.All(sm.Metrics())(func(m pmetric.Metric) bool { + ok = yield(From(rm.Resource(), sm.Scope(), m)) + return ok + }) + return ok + }) + return ok + }) + } +} + +func Filter(md pmetric.Metrics, keep func(Metric) bool) { + md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + return !keep(From(rm.Resource(), sm.Scope(), m)) + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/metrics.go new file mode 100644 index 000000000..50c802c70 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type Ident = identity.Metric + +type Metric struct { + res pcommon.Resource + scope pcommon.InstrumentationScope + pmetric.Metric +} + +func (m *Metric) Ident() Ident { + return identity.OfResourceMetric(m.res, m.scope, m.Metric) +} + +func (m *Metric) Resource() pcommon.Resource { + return m.res +} + +func (m *Metric) Scope() pcommon.InstrumentationScope { + return m.scope +} + +func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { + return Metric{res: res, scope: scope, Metric: metric} +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go new file mode 100644 index 000000000..6cc97af04 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pslice // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" + +type Slice[E any] interface { + At(int) E + Len() int +} + +func Equal[E comparable, S Slice[E]](a, b S) bool { + if a.Len() != b.Len() { + return false + } + for i := 0; i < a.Len(); i++ { + if a.At(i) != b.At(i) { + return false + } + } + return true +} + +func All[E any, S 
Slice[E]](slice S) func(func(E) bool) { + return func(yield func(E) bool) { + for i := 0; i < slice.Len(); i++ { + if !yield(slice.At(i)) { + break + } + } + } +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/data.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/data.go new file mode 100644 index 000000000..532b4b828 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "errors" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" +) + +func Datapoints[P data.Point[P], List metrics.Data[P]](dps List) func(func(identity.Stream, P) bool) { + return func(yield func(identity.Stream, P) bool) { + mid := dps.Ident() + pslice.All(dps)(func(dp P) bool { + id := identity.OfStream(mid, dp) + return yield(id, dp) + }) + } +} + +type filterable[D data.Point[D]] interface { + metrics.Data[D] + Filter(func(D) bool) +} + +// Apply does dps[i] = fn(dps[i]) for each item in dps. 
+// If fn returns [streams.Drop], the datapoint is removed from dps instead. +// If fn returns another error, the datapoint is also removed and the error returned eventually +func Apply[P data.Point[P], List filterable[P]](dps List, fn func(Ident, P) (P, error)) error { + var errs error + + mid := dps.Ident() + dps.Filter(func(dp P) bool { + id := identity.OfStream(mid, dp) + next, err := fn(id, dp) + if err != nil { + if !errors.Is(err, Drop) { + errs = errors.Join(errs, err) + } + return false + } + + next.CopyTo(dp) + return true + }) + + return errs +} + +// Drop signals the current item (stream or datapoint) is to be dropped +var Drop = errors.New("stream dropped") //nolint:revive // Drop is a good name for a signal, see fs.SkipAll diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/errors.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/errors.go new file mode 100644 index 000000000..c0638e091 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/errors.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "fmt" +) + +func Error(id Ident, err error) error { + return StreamErr{Ident: id, Err: err} +} + +type StreamErr struct { + Ident Ident + Err error +} + +func (e StreamErr) Error() string { + return fmt.Sprintf("%s: %s", e.Ident, e.Err) +} + +func (e StreamErr) Unwrap() error { + return e.Err +} diff --git 
a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/limit.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/limit.go new file mode 100644 index 000000000..dd1d92768 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "errors" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" +) + +func Limit[T any](m Map[T], max int) LimitMap[T] { + return LimitMap[T]{ + Map: m, Max: max, + Evictor: EvictorFunc(func() (identity.Stream, bool) { + return identity.Stream{}, false + }), + } +} + +type LimitMap[T any] struct { + Max int + + Evictor streams.Evictor + streams.Map[T] +} + +func (m LimitMap[T]) Store(id identity.Stream, v T) error { + _, exist := m.Map.Load(id) + + var errEv error + // if not already tracked and no space: try to evict + if !exist && m.Map.Len() >= m.Max { + errl := ErrLimit(m.Max) + gone, ok := m.Evictor.Evict() + if !ok { + // if no eviction possible, fail as there is no space + return errl + } + errEv = ErrEvicted{ErrLimit: errl, Ident: gone} + } + + // there was space, or we made space: store it + if err := m.Map.Store(id, v); err != nil { + return err + } + + // we may have evicted something, let the caller know + return errEv +} + +type ErrLimit int + +func (e ErrLimit) Error() string { + return 
fmt.Sprintf("stream limit of %d reached", e) +} + +func AtLimit(err error) bool { + var errLimit ErrLimit + return errors.As(err, &errLimit) +} + +type ErrEvicted struct { + ErrLimit + Ident Ident +} + +func (e ErrEvicted) Error() string { + return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident) +} + +type EvictorFunc func() (identity.Stream, bool) + +func (ev EvictorFunc) Evict() (identity.Stream, bool) { + return ev() +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/streams.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/streams.go new file mode 100644 index 000000000..1b34f806b --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams/streams.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" +) + +type Ident = identity.Stream + +type ( + Seq[T any] streams.Seq[T] + Map[T any] streams.Map[T] +) + +type Aggregator[D data.Point[D]] interface { + Aggregate(Ident, D) (D, error) +} + +func IntoAggregator[D data.Point[D]](m Map[D]) MapAggr[D] { + return MapAggr[D]{Map: m} +} + +type MapAggr[D data.Point[D]] struct { + Map[D] +} + +func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) { + 
err := a.Map.Store(id, dp) + v, _ := a.Map.Load(id) + return v, err +} + +type Evictor = streams.Evictor diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go new file mode 100644 index 000000000..8062fc838 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -0,0 +1,161 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" +) + +type Telemetry struct { + Metrics +} + +func New(telb *metadata.TelemetryBuilder) Telemetry { + return Telemetry{Metrics: Metrics{ + streams: Streams{ + tracked: telb.DeltatocumulativeStreamsTracked, + limit: telb.DeltatocumulativeStreamsLimit, + evicted: telb.DeltatocumulativeStreamsEvicted, + stale: telb.DeltatocumulativeStreamsMaxStale, + }, + dps: Datapoints{ + total: telb.DeltatocumulativeDatapointsProcessed, + dropped: telb.DeltatocumulativeDatapointsDropped, + }, + gaps: telb.DeltatocumulativeGapsLength, + }} +} + +type Streams struct { + 
tracked metric.Int64UpDownCounter + limit metric.Int64Gauge + evicted metric.Int64Counter + stale metric.Int64Gauge +} + +type Datapoints struct { + total metric.Int64Counter + dropped metric.Int64Counter +} + +type Metrics struct { + streams Streams + dps Datapoints + + gaps metric.Int64Counter +} + +func (tel Telemetry) WithLimit(max int64) { + tel.streams.limit.Record(context.Background(), max) +} + +func (tel Telemetry) WithStale(max time.Duration) { + tel.streams.stale.Record(context.Background(), int64(max.Seconds())) +} + +func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] { + return Items[T]{ + Map: items, + Metrics: metrics, + } +} + +func ObserveNonFatal[T any](items streams.Map[T], metrics *Metrics) Faults[T] { + return Faults[T]{ + Map: items, + Metrics: metrics, + } +} + +type Items[T any] struct { + streams.Map[T] + *Metrics +} + +func (i Items[T]) Store(id streams.Ident, v T) error { + inc(i.dps.total) + + _, old := i.Map.Load(id) + err := i.Map.Store(id, v) + if err == nil && !old { + inc(i.streams.tracked) + } + + return err +} + +func (i Items[T]) Delete(id streams.Ident) { + dec(i.streams.tracked) + i.Map.Delete(id) +} + +type Faults[T any] struct { + streams.Map[T] + *Metrics +} + +func (f Faults[T]) Store(id streams.Ident, v T) error { + var ( + olderStart delta.ErrOlderStart + outOfOrder delta.ErrOutOfOrder + gap delta.ErrGap + limit streams.ErrLimit + evict streams.ErrEvicted + ) + + err := f.Map.Store(id, v) + switch { + default: + return err + case errors.As(err, &olderStart): + inc(f.dps.dropped, reason("older-start")) + return streams.Drop + case errors.As(err, &outOfOrder): + inc(f.dps.dropped, reason("out-of-order")) + return streams.Drop + case errors.As(err, &limit): + inc(f.dps.dropped, reason("stream-limit")) + // no space to store stream, drop it instead of failing silently + return streams.Drop + case errors.As(err, &evict): + inc(f.streams.evicted) + case errors.As(err, &gap): + from := gap.From.AsTime() + 
to := gap.To.AsTime() + lost := to.Sub(from).Seconds() + f.gaps.Add(context.TODO(), int64(lost)) + } + + return nil +} + +var ( + _ streams.Map[any] = (*Items[any])(nil) + _ streams.Map[any] = (*Faults[any])(nil) +) + +type addable[Opts any] interface { + Add(context.Context, int64, ...Opts) +} + +func inc[A addable[O], O any](a A, opts ...O) { + a.Add(context.Background(), 1, opts...) +} + +func dec[A addable[O], O any](a A, opts ...O) { + a.Add(context.Background(), -1, opts...) +} + +func reason(reason string) metric.AddOption { + return metric.WithAttributes(attribute.String("reason", reason)) +} diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/metadata.yaml b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/metadata.yaml new file mode 100644 index 000000000..552c812e1 --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/metadata.yaml @@ -0,0 +1,63 @@ +type: deltatocumulative + +status: + class: processor + stability: + alpha: [metrics] + distributions: [contrib] + warnings: [Statefulness] + codeowners: + active: [sh0rez, RichieSams, jpkrohling] + + +telemetry: + metrics: + # streams + deltatocumulative.streams.tracked: + description: number of streams tracked + unit: "{dps}" + sum: + value_type: int + monotonic: false + enabled: true + deltatocumulative.streams.limit: + description: upper limit of tracked streams + unit: "{stream}" + gauge: + value_type: int + enabled: true + deltatocumulative.streams.evicted: + description: number of streams evicted + unit: "{stream}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.streams.max_stale: + description: duration after which streams inactive streams are dropped + unit: "s" + gauge: + value_type: int + enabled: true + # datapoints + 
deltatocumulative.datapoints.processed: + description: number of datapoints processed + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.datapoints.dropped: + description: number of datapoints dropped due to given 'reason' + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true + deltatocumulative.gaps.length: + description: total duration where data was expected but not received + unit: "s" + sum: + value_type: int + monotonic: true + enabled: true diff --git a/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/processor.go b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/processor.go new file mode 100644 index 000000000..219f657df --- /dev/null +++ b/vendor/github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/processor.go @@ -0,0 +1,182 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + +import ( + "context" + "errors" + "sync" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" +) + +var _ processor.Metrics = (*Processor)(nil) + +type Processor struct { + next consumer.Metrics + + log *zap.Logger + ctx context.Context + cancel context.CancelFunc + + sums Pipeline[data.Number] + expo Pipeline[data.ExpHistogram] + hist Pipeline[data.Histogram] + + mtx sync.Mutex +} + +func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder, next consumer.Metrics) *Processor { + ctx, cancel := context.WithCancel(context.Background()) + + tel := telemetry.New(telb) + proc := Processor{ + log: log, + ctx: ctx, + cancel: cancel, + next: next, + + sums: pipeline[data.Number](cfg, &tel), + expo: pipeline[data.ExpHistogram](cfg, &tel), + hist: pipeline[data.Histogram](cfg, &tel), + } + + return &proc +} + +type Pipeline[D data.Point[D]] struct { + aggr streams.Aggregator[D] + stale maybe.Ptr[staleness.Staleness[D]] +} + +func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D] { + var pipe Pipeline[D] + + var dps streams.Map[D] + dps = delta.New[D]() + dps = telemetry.ObserveItems(dps, &tel.Metrics) + + if cfg.MaxStale > 0 { + tel.WithStale(cfg.MaxStale) + stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) + pipe.stale = stale + dps, _ = stale.Try() + } + if cfg.MaxStreams > 0 { + tel.WithLimit(int64(cfg.MaxStreams)) + lim := 
streams.Limit(dps, cfg.MaxStreams) + if stale, ok := pipe.stale.Try(); ok { + lim.Evictor = stale + } + dps = lim + } + + dps = telemetry.ObserveNonFatal(dps, &tel.Metrics) + + pipe.aggr = streams.IntoAggregator(dps) + return pipe +} + +func (p *Processor) Start(_ context.Context, _ component.Host) error { + sums, sok := p.sums.stale.Try() + expo, eok := p.expo.stale.Try() + hist, hok := p.hist.stale.Try() + if !(sok && eok && hok) { + return nil + } + + go func() { + tick := time.NewTicker(time.Minute) + for { + select { + case <-p.ctx.Done(): + return + case <-tick.C: + p.mtx.Lock() + sums.ExpireOldEntries() + expo.ExpireOldEntries() + hist.ExpireOldEntries() + p.mtx.Unlock() + } + } + }() + return nil +} + +func (p *Processor) Shutdown(_ context.Context) error { + p.cancel() + return nil +} + +func (p *Processor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if err := context.Cause(p.ctx); err != nil { + return err + } + + p.mtx.Lock() + defer p.mtx.Unlock() + + var errs error + metrics.Filter(md, func(m metrics.Metric) bool { + var n int + //exhaustive:enforce + switch m.Type() { + case pmetric.MetricTypeGauge: + n = m.Gauge().DataPoints().Len() + case pmetric.MetricTypeSum: + sum := m.Sum() + if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Apply(metrics.Sum(m), p.sums.aggr.Aggregate) + errs = errors.Join(errs, err) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } + n = sum.DataPoints().Len() + case pmetric.MetricTypeHistogram: + hist := m.Histogram() + if hist.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Apply(metrics.Histogram(m), p.hist.aggr.Aggregate) + errs = errors.Join(errs, err) + hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } + n = hist.DataPoints().Len() + case 
pmetric.MetricTypeExponentialHistogram: + expo := m.ExponentialHistogram() + if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta { + err := streams.Apply(metrics.ExpHistogram(m), p.expo.aggr.Aggregate) + errs = errors.Join(errs, err) + expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } + n = expo.DataPoints().Len() + case pmetric.MetricTypeSummary: + n = m.Summary().DataPoints().Len() + } + return n > 0 + }) + if errs != nil { + return errs + } + + if md.MetricCount() == 0 { + return nil + } + return p.next.ConsumeMetrics(ctx, md) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ebd6512ad..f6ff90efc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -770,6 +770,11 @@ github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/ # github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.110.0 ## explicit; go 1.22.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker +# github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.110.0 +## explicit; go 1.22.0 +github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity +github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness +github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams # github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.110.0 ## explicit; go 1.22.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr @@ -924,6 +929,18 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters/in ## explicit; go 1.22.0 
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor/internal/metadata +# github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor v0.110.0 +## explicit; go 1.22.0 +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams +github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry # github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.110.0 ## explicit; go 1.22.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor