Source code for stilpy.timegaps


from typing import Dict, Union, Any, Iterable, Iterator, List, Tuple, Optional
from operator import itemgetter
from itertools import groupby
from datetime import timedelta

from stilpy.timeinterval import TimeInterval

# Type alias for a list of ``dict`` objects.
dicts = List[dict]

class TimeGaps:
    """Collection of time intervals built from tagged datetime records.

    It's an iterator of `TimeInterval Objects`_. Iteration is single-pass:
    ``__iter__`` returns the instance itself and the read position is never
    reset, so once it is exhausted a second ``for`` loop yields nothing.
    ``len()``, ``in`` and ``repr()`` are not affected by the read position.

    Attributes
    ----------
    grouper_tags : list of dicts or None
        One dictionary per group, mapping every key passed as ``group_by``
        to the value found in the ``iterable`` that defines that group.
        ``None`` when the instance was created without ``group_by``.
    grouped_intervals : list of TimeGaps
        The time intervals separated by groups. For every group a new
        ``TimeGaps`` iterator is built and stored in a ``list``. The groups
        are created using the ``group_by`` argument passed to the instance.

    Raises
    ------
    StopIteration
        Raised when the ``__next__()`` method is called but there are no
        more records to read.
    AttributeError
        Raised when the ``grouper_tags`` or ``grouped_intervals`` attribute
        is reassigned (both are read-only properties).
    """

    ERROR_ITERABLE = "it's not a supported iterable."

    def __init__(
            self,
            iterable: Iterable,
            tag_loc: Union[str, int],
            i_tag: Any,
            f_tag: Any,
            dt_loc: Union[str, int],
            *args: Optional[str],
            group_by: Optional[Any] = None
    ) -> None:
        """
        Parameters
        ----------
        iterable : iterable of iterables
            An iterable whose items are ``dicts`` or dict-like objects.
            Lists, tuples and objects with a ``__dict__`` attribute are
            accepted as well. Every item must contain:

            1. A ``datetime`` object or a string formatted datetime.
            2. A value that tells whether that datetime is the initial or
               the final time point of a time interval.
        tag_loc : str or int
            Where to find the value that tells which end of the interval
            each element defines. For a ``list`` or a ``tuple`` it is an
            index, for a ``dict`` it is a key.
        i_tag : Any
            The value that marks an element as an initial time point.
            Required, there is no default.
        f_tag : Any
            The value that marks an element as a final time point.
            Required, there is no default.
        dt_loc : str or int
            The location, inside each element of the iterable, of the
            datetime information.
        args : str, optional
            Keys (or attribute names, depending on the ``iterable``) whose
            values are forwarded to ``TimeInterval`` as keyword arguments,
            for example a name or an age. They cannot be ``int`` because
            keyword argument names must be strings; a ``TypeError`` is
            raised otherwise. If both records of an interval contain a key
            with different values, the argument is a tuple like
            ``(<start_record_value>, <end_record_value>)``.
        group_by : Any, optional
            The tags used to make the correct pairs between the different
            records of the ``iterable``. If it is ``None``, the intervals
            are made considering only ``i_tag`` and ``f_tag``. A ``list``,
            a ``tuple`` or a single value can be passed, but every tag must
            be contained in the items of the main ``iterable``.

        Raises
        ------
        IndexError
            If the ``iterable`` is empty.
        ValueError
            If the items inside the main ``iterable`` are not supported.
        TypeError
            When ``int`` values are passed in ``args`` as additional
            attributes for ``TimeInterval``.
        """
        self._tag_loc = tag_loc
        self._i_tag = i_tag
        self._f_tag = f_tag
        self._dt_loc = dt_loc
        self._args = args
        self._index = 0
        self._lis_of_dicts = self._to_list_of_dicts(iterable)
        self._grouper_tags, self._groups = TimeGaps._create_groups(
            self._lis_of_dicts, group_by
        )
        self._intervals = sorted(self._list_intervals())

    @property
    def grouper_tags(self) -> Optional[List[dict]]:
        """Return a ``list`` of dictionaries with the grouper tags.

        For example, if the groups were made with ``'name'`` and
        ``'surname'``, every dictionary looks like
        ``{'name': 'John', 'surname': 'Smith'}``. It is ``None`` when no
        ``group_by`` was given.
        """
        return self._grouper_tags

    @property
    def grouped_intervals(self) -> List['TimeGaps']:
        """Return a ``list`` with a new ``TimeGaps`` iterator per group.

        If there are no groups, the list holds a single ``TimeGaps`` object
        built from all the records with the same tag and datetime settings
        as this instance.
        """
        return [
            TimeGaps(
                group,
                self._tag_loc,
                self._i_tag,
                self._f_tag,
                self._dt_loc,
                *self._args
            )
            for group in self._groups
        ]

    def total_duration(self, default: Any = None) -> Union[timedelta, Any]:
        """Return the total duration of all the intervals.

        Parameters
        ----------
        default : Any, optional
            The value returned if any ``TimeInterval`` in the collection
            reports a falsy ``is_perfect`` (an interval missing its start
            or its end).

        Returns
        -------
        timedelta
            The sum of the ``duration`` attribute of every interval, when
            all of them are perfect.
        Any
            The ``default`` argument, as soon as one imperfect interval is
            found.
        """
        total = timedelta()
        for interval in self._intervals:
            if not interval.is_perfect:
                return default
            total += interval.duration
        return total

    def total_duration_anyway(self) -> timedelta:
        """Return the total duration, ignoring imperfect intervals.

        Returns
        -------
        timedelta
            The sum of the ``duration`` attribute of every interval whose
            ``is_perfect`` is truthy. Intervals without a valid duration
            are skipped instead of aborting the sum.
        """
        total = timedelta()
        for interval in self._intervals:
            if interval.is_perfect:
                total += interval.duration
        return total

    def _to_list_of_dicts(self, iterable: Iterable) -> List[dict]:
        """Cast the iterable of items to a list of dict-like records.

        The kind of item is decided by looking at the first item only.

        Raises
        ------
        IndexError
            If ``iterable`` is empty.
        ValueError
            If the first item is not a supported kind of object. The
            exception arguments are the item type and ``ERROR_ITERABLE``.
        """
        records = list(iterable)
        if not records:
            raise IndexError('Iterable length cant\'t be 0')

        first = records[0]
        attrs = dir(first)
        if 'index' in attrs:
            # Sequences (list, tuple): the position becomes the key.
            return [dict(enumerate(el)) for el in records]
        if 'items' in attrs:
            # Already dict-like, used as they are.
            return records
        if 'keys' in attrs:
            return [{k: el[k] for k in el.keys()} for el in records]
        if '__dict__' in attrs:
            return [el.__dict__ for el in records]
        # Report the type of the item; the container is always a list here.
        raise ValueError(type(first), self.ERROR_ITERABLE)

    @staticmethod
    def _create_groups(
            iterable: Iterable,
            group_by: Union[None, str, int, Iterable]
    ) -> Tuple[Optional[List[dict]], List[List[dict]]]:
        """Split the records in groups that share the ``group_by`` values.

        Returns
        -------
        tuple
            ``(group_tags, groups)``. ``group_tags`` has one dictionary per
            group mapping every grouping key to its value, and ``groups``
            has the list of records of each group in the same order. When
            ``group_by`` is ``None`` the result is ``(None, [iterable])``.
        """
        if group_by is None:
            return None, [iterable]

        # Normalise to a tuple of keys so a single key, a one element
        # list and several keys are all handled the same way.
        if isinstance(group_by, (str, int)):
            keys = (group_by,)
        else:
            keys = tuple(group_by)

        def grouper(record):
            return tuple(record[key] for key in keys)

        group_tags = []
        groups = []
        # groupby only joins consecutive items, so sort by the same key.
        for tag, group in groupby(sorted(iterable, key=grouper), grouper):
            group_tags.append(dict(zip(keys, tag)))
            groups.append(list(group))
        return group_tags, groups

    def _sort_iter(self, iterable: List[dict]) -> List[dict]:
        """Return the records sorted by their datetime value."""
        return sorted(iterable, key=itemgetter(self._dt_loc))

    def _list_intervals(self) -> List['TimeInterval']:
        """Create the list of intervals of every group."""
        intervals = []
        for group in self._groups:
            intervals.extend(self._intervalize(self._sort_iter(group)))
        return intervals

    def _intervalize(
            self, sorted_iter: List[dict]
    ) -> Iterator['TimeInterval']:
        """Yield the ``TimeInterval`` objects of a sorted list of records.

        A start record immediately followed by an end record makes a
        complete interval. A start or an end record without its pair makes
        an interval with only that limit. Records whose tag is neither
        ``i_tag`` nor ``f_tag`` are skipped.
        """
        skip_next = False
        # Compare every item with the next one to find its pair.
        for i, el in enumerate(sorted_iter):
            if skip_next:
                # This record was already used as the end of a pair.
                skip_next = False
                continue

            has_next = (i + 1) < len(sorted_iter)
            next_el = sorted_iter[i + 1] if has_next else None
            el_kwargs = self._pairs_in_args(el)

            if self._are_pair(el, next_el):
                new_kwargs = TimeGaps._join_args_in_dict(
                    el_kwargs, self._pairs_in_args(next_el)
                )
                interval = TimeInterval(
                    start=el[self._dt_loc],
                    end=next_el[self._dt_loc],
                    **new_kwargs
                )
                skip_next = True
            elif self._is_start(el):
                interval = TimeInterval(start=el[self._dt_loc], **el_kwargs)
            elif self._is_end(el):
                interval = TimeInterval(end=el[self._dt_loc], **el_kwargs)
            else:
                # Not an interval limit: there is nothing to build.
                continue
            yield interval

    def _pairs_in_args(self, el: Dict[str, Any]) -> Dict[str, Any]:
        """Return the items of ``el`` whose key was requested in ``args``."""
        return {k: v for k, v in el.items() if k in self._args}

    def _are_pair(self, el1: dict, el2: Union[dict, None]) -> bool:
        """Return True if the first element is start and the second end."""
        return (
            el2 is not None
            and self._is_start(el1)
            and self._is_end(el2)
        )

    @staticmethod
    def _join_args_in_dict(
            start_el: Dict[str, Any],
            end_el: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Merge the extra arguments of the two records of an interval.

        A key present in both records with different values is stored as
        the tuple ``(start_value, end_value)``. Any other key keeps its
        single value.
        """
        new_args = {}
        for key, val in start_el.items():
            if key in end_el and end_el[key] != val:
                new_args[key] = (val, end_el[key])
            else:
                new_args[key] = val
        # Only the keys that are unique to the end record are left to add.
        for key, val in end_el.items():
            new_args.setdefault(key, val)
        return new_args

    def _is_start(self, element: dict) -> bool:
        """True if it is a start element, else False."""
        return bool(element[self._tag_loc] == self._i_tag)

    def _is_end(self, element: dict) -> bool:
        """True if it is an end element, else False."""
        return bool(element[self._tag_loc] == self._f_tag)

    def __iter__(self) -> Iterator['TimeInterval']:
        return self

    def __next__(self) -> 'TimeInterval':
        if self._index >= len(self._intervals):
            raise StopIteration
        index = self._index
        self._index += 1
        return self._intervals[index]

    def __contains__(self, value: 'TimeInterval') -> bool:
        return value in self._intervals

    def __len__(self) -> int:
        return len(self._intervals)

    def __repr__(self) -> str:
        out = ', '.join(repr(interval) for interval in self._intervals)
        return 'TimeGaps({})'.format(out)