@@ -65,6 +65,7 @@ ViewDataImpl::ViewDataImpl(absl::Time start_time,
6565 aggregation_window_ (descriptor.aggregation_window_),
6666 type_(TypeForDescriptor(descriptor)),
6767 start_times_(),
68+ expiry_duration_(descriptor.expiry_duration_),
6869 start_time_(start_time) {
6970 switch (type_) {
7071 case Type::kDouble : {
@@ -195,6 +196,8 @@ void ViewDataImpl::Merge(const std::vector<std::string>& tag_values,
195196 const MeasureData& data, absl::Time now) {
196197 // A value is set here. Set a start time if it is unset.
197198 SetStartTimeIfUnset (tag_values, now);
199+ SetUpdateTime (tag_values, now);
200+ PurgeExpired (now);
198201 switch (type_) {
199202 case Type::kDouble : {
200203 if (aggregation_.type () == Aggregation::Type::kSum ) {
@@ -302,5 +305,60 @@ ViewDataImpl::ViewDataImpl(ViewDataImpl* source, absl::Time now)
302305 }
303306}
304307
308+ void ViewDataImpl::SetUpdateTime (const std::vector<std::string>& tag_values,
309+ absl::Time now) {
310+ if (expiry_duration_ == absl::ZeroDuration ()) {
311+ // No need to track last update time if expiry duration is not set.
312+ return ;
313+ }
314+
315+ auto update_time_map_iter = update_time_entries_.find (tag_values);
316+ if (update_time_map_iter == update_time_entries_.end ()) {
317+ // The timeseries is not tracked, add it to the update time list and map.
318+ update_times_.emplace_front (now, tag_values);
319+ update_time_entries_[tag_values] = update_times_.begin ();
320+ } else {
321+ // The timeseries is tracked, update its updated time and move it to the
322+ // front of the list.
323+ auto update_time_list_iter = update_time_map_iter->second ;
324+ update_time_list_iter->first = now;
325+ update_times_.splice (update_times_.begin (), update_times_,
326+ update_time_list_iter);
327+ }
328+ }
329+
330+ void ViewDataImpl::PurgeExpired (absl::Time now) {
331+ if (expiry_duration_ == absl::ZeroDuration () || update_times_.empty ()) {
332+ // No need to remove expired entries since either expiry is not set or there
333+ // is no data entry yet.
334+ return ;
335+ }
336+ // Remove data that has not been updated for expiry.
337+ while (now - update_times_.back ().first > expiry_duration_) {
338+ const auto & tags = update_times_.back ().second ;
339+ update_time_entries_.erase (tags);
340+ start_times_.erase (tags);
341+ switch (type_) {
342+ case Type::kDouble : {
343+ double_data_.erase (tags);
344+ break ;
345+ }
346+ case Type::kInt64 : {
347+ int_data_.erase (tags);
348+ break ;
349+ }
350+ case Type::kDistribution : {
351+ distribution_data_.erase (tags);
352+ break ;
353+ }
354+ case Type::kStatsObject : {
355+ interval_data_.erase (tags);
356+ break ;
357+ }
358+ }
359+ update_times_.pop_back ();
360+ }
361+ }
362+
305363} // namespace stats
306364} // namespace opencensus
0 commit comments