Source code for jange.ops.neighbors

from typing import Optional

import networkx as nx
from sklearn import neighbors as sknn

from jange import base, ops, stream


class NearestNeighborsOperation(ops.base.ScikitBasedOperation):
    """Finds the nearest neighbors of every item in a data stream.

    This operation wraps `sklearn.neighbors.NearestNeighbors` and uses its
    `kneighbors` method as the prediction function.

    Parameters
    ----------
    n_neighbors : int
        number of neighbors to look up for each item. default `10`

    metric : str
        distance metric passed to `sklearn.neighbors.NearestNeighbors`.
        default `cosine`

    name : str
        name of this operation. default `nearest_neighbors`
    """

    def __init__(
        self, n_neighbors: int = 10, metric="cosine", name: str = "nearest_neighbors"
    ) -> None:
        model = sknn.NearestNeighbors(n_neighbors=n_neighbors, metric=metric)
        super().__init__(model=model, predict_fn_name="kneighbors", name=name)

    def _predict(self, ds, predict_params: Optional[dict] = None):
        """Yield the neighbors of every item in `ds` together with the contexts.

        Parameters
        ----------
        ds : stream.DataStream
            stream holding the features to query; its `context` is read in full
        predict_params : Optional[dict]
            accepted for signature compatibility; it is not used by this
            method. Defaults to `None` rather than a shared mutable `{}`.

        Yields
        ------
        tuple
            `(output, context)` where `output` has one entry per row of the
            batch. Each entry is a list of dicts with the keys `context`,
            `distance` and `item_idx` describing one neighbor.
        """
        # nn needs access to full dataset to determine nearest neighbors,
        # so the batch size is the size of the whole stream
        ctx = list(ds.context)
        bs = len(ctx)
        for batch, context in self._get_batch(bs, ds, ctx):
            distances, indices = self.model.kneighbors(batch)
            # neighbor indices are resolved against the contexts of the batch
            output = [
                [
                    {"context": context[i], "distance": d, "item_idx": i}
                    for d, i in zip(dists, indxs)
                ]
                for dists, indxs in zip(distances, indices)
            ]
            yield output, context
class SimilarPairOperation(NearestNeighborsOperation):
    """Finds similar pairs

    This operation uses nearest neighbors algorithms from sklearn.neighbors
    package to find similar items in a dataset and convert them into pairs.
    Unlike nearest neighbors, where you get `n_neighbor` items for each item
    in the input, similar pairs will only return distinct occurrences of any
    two items. The input data stream should contain a numpy array or a scipy
    sparse matrix.

    The output stream holds tuples `(context1, context2, similarity)` sorted
    by similarity in descending order.

    Parameters
    ----------
    sim_threshold : float
        minimum similarity a pair must reach to be kept. default `0.8`

    metric : str
        one of `valid_metrics`. default `cosine`

    n_neighbors : int
        number of neighbors to look up for each item. default `10`

    name : str
        name of this operation. default `similar_pair`

    Attributes
    ----------
    sim_threshold : float
        minimum similarity threshold that each pair should have to be
        considered as being similar

    model : any model from sklearn.neighbors package.
        default `sklearn.neighbors.NearestNeighbors`

    name : str
        name of this operation. default `similar_pair`

    Raises
    ------
    ValueError
        if `metric` is not one of `valid_metrics`

    Example
    -------
    >>> features_ds = stream.DataStream(np.random.uniform(size=(20, 100)))
    >>> op = SimilarPairOperation(sim_threshold=0.9)
    >>> similar_pairs = features_ds.apply(op)
    """

    valid_metrics = ["cosine", "euclidean"]

    def __init__(
        self,
        sim_threshold=0.8,
        metric="cosine",
        n_neighbors=10,
        name: str = "similar_pair",
    ) -> None:
        if metric not in self.valid_metrics:
            raise ValueError(
                f"metric should be one of {self.valid_metrics} but got {metric}"
            )
        self.sim_threshold = sim_threshold
        super().__init__(n_neighbors=n_neighbors, metric=metric, name=name)

    def _get_similariry_from_distance(self, metric: str, distance: float):
        """Convert a distance into a similarity score for the given metric.

        Cosine distance `d` maps to `1 - d`, euclidean distance `d` maps to
        `1 / (1 + d)`. Any other metric raises `ValueError`.
        """
        if metric == "cosine":
            return 1 - distance
        elif metric == "euclidean":
            return 1 / (1 + distance)
        else:
            raise ValueError(f"unknown metric {metric}")

    def _get_pairs(self, dist_indices_ds):
        """Turn per-item neighbor lists into distinct, thresholded pairs.

        Parameters
        ----------
        dist_indices_ds : stream.DataStream
            stream where every item is a list of neighbor dicts with the keys
            `distance` and `item_idx`

        Returns
        -------
        list
            tuples `(context1, context2, similarity)` sorted by similarity in
            descending order; each unordered pair appears at most once
        """
        items = list(dist_indices_ds)
        context = list(dist_indices_ds.context)
        metric = self.model.metric

        seen_pairs = set()
        pairs = []
        # The position of an item in the stream is used as its own index.
        # NOTE(review): this assumes the stream keeps the order of the fitted
        # rows, which the `context[...]` lookups below already rely on.
        for row_idx, neighbors in enumerate(items):
            for nbor in neighbors:
                nbor_idx = nbor["item_idx"]
                # Skip the self-match by index instead of by position: when
                # several items are at the same distance (e.g. duplicates) the
                # item itself is not guaranteed to be the first neighbor.
                if nbor_idx == row_idx:
                    continue

                doc1_id, doc2_id = sorted([row_idx, nbor_idx])
                pair_key = (doc1_id, doc2_id)
                if pair_key in seen_pairs:
                    continue

                sim = self._get_similariry_from_distance(
                    metric, distance=nbor["distance"]
                )
                if sim < self.sim_threshold:
                    continue

                pairs.append((context[doc1_id], context[doc2_id], sim))
                seen_pairs.add(pair_key)

        pairs.sort(key=lambda p: p[2], reverse=True)
        return pairs

    def run(self, ds: stream.DataStream) -> stream.DataStream:
        """Find the neighbors of every item in `ds` and return them as pairs."""
        dist_indices_ds = super().run(ds)
        pairs = self._get_pairs(dist_indices_ds)
        # contexts do not make sense anymore
        return stream.DataStream(pairs, applied_ops=ds.applied_ops + [self])
class GroupingOperation(base.Operation):
    """Operation to group a list of pairs.

    This operation is similar to clustering but instead requires a list of
    pairs. The pairs become the edges of a graph and every connected
    component of that graph forms one group.

    e.g. if there are pairs [("a", "b"), ("b", "c"), ("e", "f")]
    then the groups formed will be [{'a', 'b', 'c'}, {'e', 'f'}]

    The items in the DataStream should be tuples where each tuple indicates
    a pair as follows: `<item1, item2, *other_properties>`. Only `item1` and
    `item2` are read; any further entries of the tuple are ignored and do not
    appear in the output.

    Typically, the output of `ops.neighbors.SimilarPairOperation` is passed
    to this operation.

    Parameters
    ----------
    name : str
        name of this operation, default `grouping`

    Attributes
    ----------
    name : str
        name of this operation
    """

    def __init__(self, name: str = "grouping") -> None:
        super().__init__(name=name)

    def run(self, ds: stream.DataStream) -> stream.DataStream:
        """Group the paired items of `ds` by graph connectivity."""
        graph = nx.Graph()
        # only the first two entries of every item take part in the edge
        graph.add_edges_from((pair[0], pair[1]) for pair in ds)
        components = list(nx.connected_components(graph))
        return stream.DataStream(components, applied_ops=ds.applied_ops + [self])