Source code for jange.base

import functools
import logging
import warnings
from typing import Any, Iterable, List, Optional

import cytoolz
import more_itertools
from scipy.sparse import issparse


class LoggerMixin(object):
    @property
    def logger(self):
        name = ".".join([__name__, self.__class__.__name__])
        return logging.getLogger(name)


class OperationCollection(list):
    def __add__(self, other):
        result = super().__add__(other)
        return OperationCollection(result)

    def find_by_name(self, name: str, first_only=True):
        """Finds the operation(s) by its name.

        Parameters
        ----------
        name : str
            name of the operation
        first_only : bool, optional
            if True then returns first operation with
            the given name otherwise returns all operations
            matching the given name, by default True

        Returns
        -------
        Operation

        Example
        -------
        >>> ds.applied_ops.find_by_name(name="tfidf")
        >>> ds.applied_ops.find_by_name(name="token_filter", first_only=False)
        """
        if first_only:
            try:
                return next((op for op in self if op.name == name))
            except StopIteration:
                raise LookupError(f"Operation with name {name} was not found")
        else:
            return OperationCollection(op for op in self if op.name == name)

    def find_up_to(self, name: str):
        """Returns all operations upto the operation matching the given name

        Parameters
        ----------
        name : str
            name of the operation

        Returns
        -------
        OperationCollection[Operation]
            Collection of operations that were applied upto the operation with
            given name

        Example
        -------
        To return all operations applied to a DataStream up to an operation with
        name 'tfidf'
        >>> feature_extraction_ops = ds.find_up_to(name="tfidf")
        """
        found = False
        output = OperationCollection()
        for op in self:
            output.append(op)
            if op.name == name:
                found = True
                break

        if found:
            return output
        else:
            raise LookupError(f"Operation with name {name} was not found")


[docs]class DataStream: """A class representing a stream of data. A data stream is created as a result of some operation. DataStream object can be iterated which basically iterates through the underlying data. The underlying data is stored in `items` attribute which can be any iterable object. Parameters ---------- items : iterable an iterable that contains the raw data applied_ops : Optional[List[Operation]] a list of operations that were applied to create this stream of data Example ------- >>> ds = DataStream(items=[1, 2, 3]) >>> print(list(ds)) >>> [1, 2, 3] Attributes ---------- applied_ops : List[Operation] a list of operations that were applied to create this stream of data items : iterable an iterable that contains the raw data """ def __init__( self, items: Iterable[Any], applied_ops: Optional[List] = None, context: Optional[Iterable[Any]] = None, ): if items is None: raise ValueError("items cannot be None") # items is countable # if items count is 0 then raise Exception # if context is None then generate a list of context # if context is generator then raise Exception # if context is countable and does not have same length as items # then raise Exception if self._is_countable(items): if self._count_items(items) == 0: raise ValueError( f"items must contain atleast one element but got {items}" ) elif context is None: context = list(range(len(items))) elif not self._is_countable(context): raise ValueError( f"context cannot be a generator when items is not a generator but got {context}" ) else: items_len = self._count_items(items) context_len = self._count_items(context) if items_len != context_len: raise ValueError( "items and context should have same length " f"but received items with length={items_len} and context with length={context_len}" ) # items is not countable i.e. a generator # if context is None then create a context else: if context is None: context, items = more_itertools.unzip(enumerate(items)) self.items = items self.context = context if applied_ops and not isinstance(applied_ops, OperationCollection): applied_ops = OperationCollection(applied_ops) self.applied_ops = applied_ops or OperationCollection() def _is_countable(self, x): return hasattr(x, "__len__") def _count_items(self, items): if not self._is_countable(items): raise AttributeError( "Length of this datastream cannot be determined because the items are from a generator" ) if issparse(items): return items.shape[0] else: return len(items) def __iter__(self): for item in self.items: yield item @property def total_items(self): return self._count_items(self.items) @property def is_countable(self) -> bool: return self._is_countable(self.items) @property def item_type(self): if self.is_countable: return type(self.items[0]) else: first, items = cytoolz.peek(self.items) self.items = items return type(first) def apply(self, *ops, result_collector: dict = None, op_kwargs: dict = {}): x = self for op in ops: kwargs = op_kwargs.get(op.name, {}) x = op.run(x, **kwargs) if result_collector is not None: result_collector[op] = x return x def __repr__(self) -> str: item_type = self.item_type total_items = self.total_items if self.is_countable else "unknown" return f"DataStream(item_type={item_type}, is_finite={self.is_countable}, total_items={total_items})"
class Operation(LoggerMixin): accepts_types = [] produces_types = [] def __init__(self, name: Optional[str] = None) -> None: super().__init__() name = name or self.__class__.__name__ self.name = name def run(self, ds: DataStream) -> DataStream: raise NotImplementedError() def __call__(self, ds: DataStream) -> DataStream: return self.run(ds) class FunctionOperation(Operation): def __init__(self, name: str, fn): super().__init__(name=name if name else fn.__name__) self.fn = fn def run(self, ds): return self.fn(ds) class TrainableMixin: def __init__(self) -> None: self.should_train = True def accepts(*types, strict=False): """Class decorator for specifying what types of items in the DataStream are supported by the Operation class """ def _accepts(cls: Operation): cls.strict_validation = strict cls.accepts_types = types f = getattr(cls, "run") @functools.wraps(f) def wrapper(self, ds: DataStream, *args, **kwargs): item_type = ds.item_type if self.accepts_types and not any( issubclass(item_type, accepted_type) for accepted_type in self.accepts_types ): msg = f"Operation {self} expected DataStream having item_type = any{self.accepts_types} but got {item_type}" if self.strict_validation: raise Exception(msg) else: warnings.warn(msg) return f(self, ds, *args, **kwargs) cls.run = wrapper return cls return _accepts def produces(*types): """Class decorator for specifying what type of item will be in the DataStream that this Operation returns """ def _produces(cls): cls.produces_types = types return cls return _produces def operation(name=None, accepts=(), produces=(), strict=False): """Decorator for returning FunctionOperation from the given function Example ------- >>> @operation(accepts=str, produces=str) >>> def uppercase_op(ds): >>> return (text.upper() for text in ds) >>> op = uppercase_op() >>> op.run(stream.DataStream(["hi", "hello"])) """ def _operation(f): @functools.wraps(f) def wrapper(*args, **kwargs): o = FunctionOperation(name=name if name else f.__name__, fn=f) o.strict_validation = strict o.accepts_types = ( accepts if isinstance(accepts, (list, tuple)) else [accepts] ) o.produces_types = ( produces if isinstance(produces, (list, tuple)) else [produces] ) return o return wrapper return _operation