-
Notifications
You must be signed in to change notification settings - Fork 247
Drop negative measurements on record #389
Changes from all commits
0e41971
3a3c449
1f17fdf
407b2bb
1d1ba09
e4374fa
c77429a
1a4756e
c7b65b2
bce908c
87e21f3
ec6bb8b
2148de7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,9 +12,14 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| import logging | ||
|
|
||
| from opencensus.stats import bucket_boundaries | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class BaseAggregationData(object): | ||
| """Aggregation Data represents an aggregated value from a collection | ||
|
|
||
|
|
@@ -126,12 +131,17 @@ def __init__(self, | |
| self._sum_of_sqd_deviations = sum_of_sqd_deviations | ||
| if bounds is None: | ||
| bounds = [] | ||
| else: | ||
| assert bounds == list(sorted(set(bounds))) | ||
|
|
||
| if counts_per_bucket is None: | ||
| counts_per_bucket = [0 for ii in range(len(bounds) + 1)] | ||
| elif len(counts_per_bucket) != len(bounds) + 1: | ||
| raise ValueError("counts_per_bucket length does not match bounds " | ||
| "length") | ||
| else: | ||
| assert all(cc >= 0 for cc in counts_per_bucket) | ||
| assert len(counts_per_bucket) == len(bounds) + 1 | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Asserts aren't guaranteed to run in production, since some people might run Python without assertions, but we should always raise ValueErrors or AssertionErrors for these, I think. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry, I should have elaborated. Caveat #1 from this blog post mentions it. The
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tried to use asserts in a way consistent with the blog. An assertion failure here should mean a programming error, since we only ever expect to call this internally. I.e. precondition checks for API methods should raise ValueError. That said, there's no bright line separating the API from the internals here, and we don't lose anything by changing these to
||
|
|
||
| assert bounds == sorted(bounds) | ||
| assert all(bb > 0 for bb in bounds) | ||
|
|
||
| self._counts_per_bucket = counts_per_bucket | ||
| self._bounds = bucket_boundaries.BucketBoundaries( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,31 +164,13 @@ def to_metric(self, desc, view): | |
| labels=labels) | ||
| elif isinstance(agg_data, | ||
| aggregation_data_module.DistributionAggregationData): | ||
|
|
||
| assert(agg_data.bounds == sorted(agg_data.bounds)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should do the same in the stats/stackdriver exporter?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here? I don't think we need this assertion everywhere we use distribution aggregations; I've only got it in the prometheus exporter, since guaranteeing the bounds were sorted let me simplify the code a bit. |
||
| points = {} | ||
| # Histograms are cumulative in Prometheus. | ||
| # 1. Sort buckets in ascending order but, retain | ||
| # their indices for reverse lookup later on. | ||
| # TODO: If there is a guarantee that distribution elements | ||
| # are always sorted, then skip the sorting. | ||
| indices_map = {} | ||
| buckets = [] | ||
| i = 0 | ||
| for boundarie in view.aggregation.boundaries.boundaries: | ||
| if boundarie not in indices_map \ | ||
| or indices_map == {}: # pragma: NO COVER | ||
| indices_map[str(boundarie)] = i | ||
| buckets.append(str(boundarie)) | ||
| i += 1 | ||
|
|
||
| buckets.sort() | ||
|
|
||
| # 2. Now that the buckets are sorted by magnitude | ||
| # we can create cumulative indicesmap them back by reverse index | ||
| cum_count = 0 | ||
| for bucket in buckets: | ||
| i = indices_map[bucket] | ||
| cum_count += int(agg_data.counts_per_bucket[i]) | ||
| points[bucket] = cum_count | ||
| for ii, bound in enumerate(agg_data.bounds): | ||
| cum_count += agg_data.counts_per_bucket[ii] | ||
| points[str(bound)] = cum_count | ||
| labels = desc['labels'] if points is None else None | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| return HistogramMetricFamily(name=desc['name'], | ||
| documentation=desc['documentation'], | ||
|
|
@@ -217,7 +199,7 @@ def to_metric(self, desc, view): | |
| % type(agg_data)) | ||
|
|
||
| def collect(self): # pragma: NO COVER | ||
| """ Collect fetches the statistics from OpenCensus | ||
c24t marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Collect fetches the statistics from OpenCensus | ||
| and delivers them as Prometheus Metrics. | ||
| Collect is invoked everytime a prometheus.Gatherer is run | ||
| for example when the HTTP endpoint is invoked by Prometheus. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,9 +13,14 @@ | |
| # limitations under the License. | ||
|
|
||
| from datetime import datetime | ||
| import logging | ||
|
|
||
| from opencensus.tags import execution_context | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class MeasurementMap(object): | ||
| """Measurement Map is a map from Measures to measured values | ||
| to be recorded at the same time | ||
|
|
@@ -33,6 +38,10 @@ def __init__(self, measure_to_view_map, attachments=None): | |
| self._measurement_map = {} | ||
| self._measure_to_view_map = measure_to_view_map | ||
| self._attachments = attachments | ||
| # If the user tries to record a negative value for any measurement, | ||
| # refuse to record all measurements from this map. Recording negative | ||
| # measurements will become an error in a later release. | ||
| self._invalid = False | ||
|
|
||
| @property | ||
| def measurement_map(self): | ||
|
|
@@ -51,10 +60,16 @@ def attachments(self): | |
|
|
||
| def measure_int_put(self, measure, value): | ||
| """associates the measure of type Int with the given value""" | ||
| if value < 0: | ||
| # Should be an error in a later release. | ||
| logger.warning("Cannot record negative values") | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that we're breaking with the java impl here in that we don't mark the map invalid until record time. |
||
| self._measurement_map[measure] = value | ||
|
|
||
| def measure_float_put(self, measure, value): | ||
| """associates the measure of type Float with the given value""" | ||
| if value < 0: | ||
| # Should be an error in a later release. | ||
| logger.warning("Cannot record negative values") | ||
| self._measurement_map[measure] = value | ||
|
|
||
| def measure_put_attachment(self, key, value): | ||
|
|
@@ -75,11 +90,27 @@ def measure_put_attachment(self, key, value): | |
|
|
||
| self._attachments[key] = value | ||
|
|
||
| def record(self, tag_map_tags=execution_context.get_current_tag_map()): | ||
| def record(self, tag_map_tags=None): | ||
| """records all the measures at the same time with a tag_map. | ||
| tag_map could either be explicitly passed to the method, or implicitly | ||
| read from current execution context. | ||
| """ | ||
| if tag_map_tags is None: | ||
| tag_map_tags = execution_context.get_current_tag_map() | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unrelated to this PR: I'm assuming we want to get this at call time and not at module load time. |
||
| if self._invalid: | ||
| logger.warning("Measurement map has included negative value " | ||
| "measurements, refusing to record") | ||
| return | ||
| for measure, value in self.measurement_map.items(): | ||
| if value < 0: | ||
| self._invalid = True | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would set this in the put method; there's no need to iterate over the map here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't have a strong preference, but this is intentional since I don't think it makes sense to raise until the user tries to record.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Agreed. But my point was that if you set this in the put method, you wouldn't need to iterate over the map here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, but this would mean marking the map invalid if the user puts the same measure twice before recording, first with a negative value and second positive. Obviously they shouldn't do this, but there's no reason in principle to invalidate the map if they do. It also lets us log the particular negative value when |
||
| logger.warning("Dropping values, value to record must be " | ||
| "non-negative") | ||
| logger.info("Measure '{}' has negative value ({}), refusing " | ||
| "to record measurements from {}" | ||
| .format(measure.name, value, self)) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not that we're doing it in other places in the library, but I think it's a good idea to include more context at a more verbose logging level so users can turn up specific loggers to see what's going wrong in detail. Reviewers: let me know what you think. |
||
| return | ||
|
|
||
| self.measure_to_view_map.record( | ||
| tags=tag_map_tags, | ||
| measurement_map=self.measurement_map, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do the above check during this loop too, and then we only have to iterate through the list of boundaries once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This loop should normally break on the first item; I think the slight readability improvement is worth it here.