Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions opencensus/stats/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from opencensus.stats import bucket_boundaries
from opencensus.stats import aggregation_data


logger = logging.getLogger(__name__)


class Type(object):
""" The type of aggregation function used on a View.

Expand Down Expand Up @@ -123,11 +128,26 @@ class DistributionAggregation(BaseAggregation):
:param aggregation_type: represents the type of this aggregation

"""
def __init__(
self,
boundaries=None,
distribution=None,
aggregation_type=Type.DISTRIBUTION):

def __init__(self,
boundaries=None,
distribution=None,
aggregation_type=Type.DISTRIBUTION):
if boundaries:
if not all(boundaries[ii] < boundaries[ii + 1]
for ii in range(len(boundaries) - 1)):
raise ValueError("bounds must be sorted in increasing order")
for ii, bb in enumerate(boundaries):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do the above check during this loop too, and then we only have to iterate through the list of boundaries once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop should normally break on the first item, I think the slight readability improvement is worth it here.

if bb > 0:
break
else:
ii += 1
if ii:
logger.warning("Dropping {} negative bucket boundaries, the "
"values must be strictly > 0"
.format(ii))
boundaries = boundaries[ii:]

super(DistributionAggregation, self).__init__(
buckets=boundaries, aggregation_type=aggregation_type)
self._boundaries = bucket_boundaries.BucketBoundaries(boundaries)
Expand Down
16 changes: 13 additions & 3 deletions opencensus/stats/aggregation_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from opencensus.stats import bucket_boundaries


logger = logging.getLogger(__name__)


class BaseAggregationData(object):
"""Aggregation Data represents an aggregated value from a collection

Expand Down Expand Up @@ -126,12 +131,17 @@ def __init__(self,
self._sum_of_sqd_deviations = sum_of_sqd_deviations
if bounds is None:
bounds = []
else:
assert bounds == list(sorted(set(bounds)))

if counts_per_bucket is None:
counts_per_bucket = [0 for ii in range(len(bounds) + 1)]
elif len(counts_per_bucket) != len(bounds) + 1:
raise ValueError("counts_per_bucket length does not match bounds "
"length")
else:
assert all(cc >= 0 for cc in counts_per_bucket)
assert len(counts_per_bucket) == len(bounds) + 1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserts aren't guaranteed to run in production, since some people might run Python without assertions, but we always raise ValueErrors or AssertionErrors for these, I think.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have elaborated. Caveat #1 from this blog post mentions it. The assert docs also mention it, but not necessarily emphasising how they shouldn't be used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to use asserts in a way consistent with the blog. An assertion failure here should mean a programming error since we only ever expect to call DistributionAggregationData.__init__ from DistributionAggregation.__init__, where we actually do raise a ValueError if the user gives us unsorted bounds.

I.e. precondition checks for API methods should raise Type/ValueErrors (and etc.), but precondition checks in internal-only code can assert instead since bad user input should never cause them to fail. The logic doesn't change if the assertions are removed, and they should only fail if there's a bug in the library.

That said, there's no bright line separating the API from the internals here, and we don't lose anything by changing these to ValueErrors (other than signaling the difference between expected and unexpected errors). If you've got a strong preference I don't mind changing it.


assert bounds == sorted(bounds)
assert all(bb > 0 for bb in bounds)

self._counts_per_bucket = counts_per_bucket
self._bounds = bucket_boundaries.BucketBoundaries(
Expand Down
30 changes: 6 additions & 24 deletions opencensus/stats/exporters/prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,31 +164,13 @@ def to_metric(self, desc, view):
labels=labels)
elif isinstance(agg_data,
aggregation_data_module.DistributionAggregationData):

assert(agg_data.bounds == sorted(agg_data.bounds))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do same in the stats/stackdriver exporter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here? I don't think we need this assertion everywhere we use distribution aggregations, I've only got it in the prometheus exporter since guaranteeing the bounds were sorted let me simplify the code a bit.

points = {}
# Histograms are cumulative in Prometheus.
# 1. Sort buckets in ascending order but, retain
# their indices for reverse lookup later on.
# TODO: If there is a guarantee that distribution elements
# are always sorted, then skip the sorting.
indices_map = {}
buckets = []
i = 0
for boundarie in view.aggregation.boundaries.boundaries:
if boundarie not in indices_map \
or indices_map == {}: # pragma: NO COVER
indices_map[str(boundarie)] = i
buckets.append(str(boundarie))
i += 1

buckets.sort()

# 2. Now that the buckets are sorted by magnitude
# we can create cumulative indicesmap them back by reverse index
cum_count = 0
for bucket in buckets:
i = indices_map[bucket]
cum_count += int(agg_data.counts_per_bucket[i])
points[bucket] = cum_count
for ii, bound in enumerate(agg_data.bounds):
cum_count += agg_data.counts_per_bucket[ii]
points[str(bound)] = cum_count
labels = desc['labels'] if points is None else None
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

points is never null here, but I didn't touch it because I don't know what prometheus expects. @mayurkale22 let me know if labels should be null when there are no points in the distribution and I'll change it.

return HistogramMetricFamily(name=desc['name'],
documentation=desc['documentation'],
Expand Down Expand Up @@ -217,7 +199,7 @@ def to_metric(self, desc, view):
% type(agg_data))

def collect(self): # pragma: NO COVER
""" Collect fetches the statistics from OpenCensus
"""Collect fetches the statistics from OpenCensus
and delivers them as Prometheus Metrics.
Collect is invoked everytime a prometheus.Gatherer is run
for example when the HTTP endpoint is invoked by Prometheus.
Expand Down
9 changes: 6 additions & 3 deletions opencensus/stats/measure_to_view_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from opencensus.stats.view_data import ViewData
from collections import defaultdict
import logging
import copy
import logging

from opencensus.stats import view_data as view_data_module


class MeasureToViewMap(object):
Expand Down Expand Up @@ -91,10 +92,12 @@ def register_view(self, view, timestamp):
if registered_measure is None:
self._registered_measures[measure.name] = measure
self._measure_to_view_data_list_map[view.measure.name].append(
ViewData(view=view, start_time=timestamp, end_time=timestamp))
view_data_module.ViewData(view=view, start_time=timestamp,
end_time=timestamp))

def record(self, tags, measurement_map, timestamp, attachments=None):
"""records stats with a set of tags"""
assert all(vv >= 0 for vv in measurement_map.values())
for measure, value in measurement_map.items():
if measure != self._registered_measures.get(measure.name):
return
Expand Down
33 changes: 32 additions & 1 deletion opencensus/stats/measurement_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
# limitations under the License.

from datetime import datetime
import logging

from opencensus.tags import execution_context


logger = logging.getLogger(__name__)


class MeasurementMap(object):
"""Measurement Map is a map from Measures to measured values
to be recorded at the same time
Expand All @@ -33,6 +38,10 @@ def __init__(self, measure_to_view_map, attachments=None):
self._measurement_map = {}
self._measure_to_view_map = measure_to_view_map
self._attachments = attachments
# If the user tries to record a negative value for any measurement,
# refuse to record all measurements from this map. Recording negative
# measurements will become an error in a later release.
self._invalid = False

@property
def measurement_map(self):
Expand All @@ -51,10 +60,16 @@ def attachments(self):

def measure_int_put(self, measure, value):
    """Store an int measurement for `measure` in this map.

    Negative values are warned about here; whether they are ultimately
    dropped is decided when `record` is called.
    """
    is_negative = value < 0
    if is_negative:
        # Should be an error in a later release.
        logger.warning("Cannot record negative values")
    self._measurement_map[measure] = value

def measure_float_put(self, measure, value):
    """Store a float measurement for `measure` in this map.

    Negative values are warned about here; whether they are ultimately
    dropped is decided when `record` is called.
    """
    is_negative = value < 0
    if is_negative:
        # Should be an error in a later release.
        logger.warning("Cannot record negative values")
    self._measurement_map[measure] = value

def measure_put_attachment(self, key, value):
Expand All @@ -75,11 +90,27 @@ def measure_put_attachment(self, key, value):

self._attachments[key] = value

def record(self, tag_map_tags=execution_context.get_current_tag_map()):
def record(self, tag_map_tags=None):
"""records all the measures at the same time with a tag_map.
tag_map could either be explicitly passed to the method, or implicitly
read from current execution context.
"""
if tag_map_tags is None:
tag_map_tags = execution_context.get_current_tag_map()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, I'm assuming we want to get this at call time and not module load time.

if self._invalid:
logger.warning("Measurement map has included negative value "
"measurements, refusing to record")
return
for measure, value in self.measurement_map.items():
if value < 0:
self._invalid = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would set this in put method, no need to iterate on map here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong preference, but this is intentional since I don't think it makes sense to raise until the user tries to record.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it makes sense to raise until the user tries to record.

Agreed. But my point was if you set self._invalid = True in the put method, during record time you just have to check if (self._invalid == True) instead of iterating on map.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, but this would mean marking the map invalid if the user puts the same measure twice before recording, first with a negative value and second positive. Obviously they shouldn't do this, but there's no reason in principle to invalidate the map if they do. It also lets us log the particular negative value when record is called (vs. doing it earlier in put).

logger.warning("Dropping values, value to record must be "
"non-negative")
logger.info("Measure '{}' has negative value ({}), refusing "
"to record measurements from {}"
.format(measure.name, value, self))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that we're doing it other places in the library, but I think it's a good idea to include more context at a more verbose logging level so users can turn up specific loggers to see what's going wrong in detail. Reviewers: let me know what you think.

return

self.measure_to_view_map.record(
tags=tag_map_tags,
measurement_map=self.measurement_map,
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/stats/exporter/test_prometheus_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2"
VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation(
[0.0, 16.0 * MiB, 256.0 * MiB])
[16.0 * MiB, 256.0 * MiB])
VIDEO_SIZE_VIEW = view_module.View(
VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY],
VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION)
Expand Down Expand Up @@ -189,7 +189,7 @@ def test_collector_to_metric_histogram(self):
self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
self.assertEqual('histogram', metric.type)
self.assertEqual(5, len(metric.samples))
self.assertEqual(4, len(metric.samples))

def test_collector_to_metric_invalid_dist(self):
agg = mock.Mock()
Expand Down Expand Up @@ -232,7 +232,7 @@ def test_collector_collect(self):
self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
self.assertEqual('histogram', metric.type)
self.assertEqual(5, len(metric.samples))
self.assertEqual(4, len(metric.samples))


class TestPrometheusStatsExporter(unittest.TestCase):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/stats/exporter/test_stackdriver_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

VIDEO_SIZE_VIEW_NAME = "my.org/views/video_size_test2"
VIDEO_SIZE_DISTRIBUTION = aggregation_module.DistributionAggregation(
[0.0, 16.0 * MiB, 256.0 * MiB])
[16.0 * MiB, 256.0 * MiB])
VIDEO_SIZE_VIEW = view_module.View(
VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY],
VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION)
Expand Down
25 changes: 21 additions & 4 deletions tests/unit/stats/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ def test_constructor_defaults(self):
distribution_aggregation.aggregation_type)

def test_constructor_explicit(self):
    """Check that explicit boundaries/distribution args are stored as-is."""
    boundaries = [1, 2]
    distribution = [0, 1, 2]
    agg = aggregation_module.DistributionAggregation(
        boundaries=boundaries, distribution=distribution)

    self.assertEqual(boundaries, agg.boundaries.boundaries)
    self.assertEqual(distribution, agg.distribution)
    self.assertEqual(aggregation_module.Type.DISTRIBUTION,
                     agg.aggregation_type)

Expand All @@ -122,3 +122,20 @@ def test_min_max(self):

self.assertEqual(da.aggregation_data.min, -10)
self.assertEqual(da.aggregation_data.max, 10)

def test_init_bad_boundaries(self):
    """Check that boundaries must be sorted and unique."""
    # Both out-of-order and duplicate boundaries are rejected.
    for bad_bounds in ([1, 3, 2], [1, 1, 2]):
        with self.assertRaises(ValueError):
            aggregation_module.DistributionAggregation(bad_bounds)

def test_init_negative_boundaries(self):
    """Check that non-positive boundaries are dropped."""
    # Mixed bounds: everything <= 0 is stripped from the front.
    mixed = aggregation_module.DistributionAggregation([-2, -1, 0, 1, 2])
    self.assertEqual([1, 2], mixed.boundaries.boundaries)
    self.assertEqual([1, 2], mixed.aggregation_data.bounds)

    # All-negative bounds: everything is dropped.
    all_negative = aggregation_module.DistributionAggregation([-2, -1])
    self.assertEqual([], all_negative.boundaries.boundaries)
    self.assertEqual([], all_negative.aggregation_data.bounds)
Loading