Source code for operators.dimension_reduction.dimension_reduction

import gc
from dataclasses import dataclass
from typing import Any

import numpy as np
import umap
from sklearn.manifold import TSNE

from feluda import Operator


class ReductionModel:
    """Base class for dimension reduction models."""

    def __init__(self, params: Any) -> None:
        self.params = params

    @staticmethod
    def validate_embeddings(embeddings_array: np.ndarray) -> np.ndarray:
        """Validate embeddings array, converting list to numpy array if needed.

        Args:
            embeddings_array: Either a list or numpy array of embeddings


        Raises:
            ValueError: If the embeddings are invalid
        """
        if not isinstance(embeddings_array, np.ndarray):
            raise ValueError("Embeddings must be a list or numpy array")

        if embeddings_array.ndim != 2:
            raise ValueError("Embeddings should be a 2D array")

        if embeddings_array.shape[0] == 0 or embeddings_array.shape[1] == 0:
            raise ValueError("Embeddings array cannot be empty or have zero dimensions")

        if not np.issubdtype(embeddings_array.dtype, np.number):
            raise ValueError("Embeddings must contain numeric values")

        if np.any(np.isnan(embeddings_array)) or np.any(np.isinf(embeddings_array)):
            raise ValueError("Embeddings contain NaN or infinite values")


@dataclass
class TSNEParams:
    """Settings bundle for a t-SNE dimensionality reduction run.

    Attributes:
        n_components: Target number of output dimensions (default: 2)
        perplexity: t-SNE perplexity value (default: 30.0)
        learning_rate: Step size used by the optimizer (default: 150.0)
        max_iter: Upper bound on optimization iterations (default: 1000)
        random_state: Seed used for reproducible results (default: 42)
        method: Gradient algorithm, 'barnes_hut' or 'exact'
            (default: 'barnes_hut')
    """

    n_components: int = 2
    perplexity: float = 30.0
    learning_rate: float = 150.0
    max_iter: int = 1000
    random_state: int = 42
    method: str = "barnes_hut"

    def __post_init__(self) -> None:
        """Reject out-of-range t-SNE settings.

        Raises:
            ValueError: For the first rule, in declaration order, that the
                supplied values violate.
        """
        # Predicates are wrapped in lambdas so they are evaluated lazily and
        # strictly in order; only the first violated rule is reported.
        rules = (
            (lambda: self.n_components < 1, "n_components must be at least 1"),
            (lambda: self.perplexity <= 0, "perplexity must be positive"),
            (lambda: self.learning_rate <= 0, "learning_rate must be positive"),
            (lambda: self.max_iter < 1, "max_iter must be at least 1"),
            (
                lambda: self.method not in {"barnes_hut", "exact"},
                "method must be 'barnes_hut' or 'exact'",
            ),
        )
        for is_violated, message in rules:
            if is_violated():
                raise ValueError(message)


@dataclass
class UMAPParams:
    """Settings bundle for a UMAP dimensionality reduction run.

    Attributes:
        n_components: Target number of output dimensions (default: 2)
        n_neighbors: Local neighborhood size (default: 15)
        min_dist: Minimum spacing between embedded points (default: 0.1)
        metric: Name of the distance metric (default: 'euclidean')
        random_state: Seed used for reproducible results (default: 42)
    """

    n_components: int = 2
    n_neighbors: int = 15
    min_dist: float = 0.1
    metric: str = "euclidean"
    random_state: int = 42

    def __post_init__(self) -> None:
        """Reject out-of-range UMAP settings.

        Raises:
            ValueError: For the first rule, in declaration order, that the
                supplied values violate.
        """
        # Predicates are wrapped in lambdas so they are evaluated lazily and
        # strictly in order; only the first violated rule is reported.
        rules = (
            (lambda: self.n_components < 1, "n_components must be at least 1"),
            (lambda: self.n_neighbors < 2, "n_neighbors must be at least 2"),
            (
                lambda: not (0 <= self.min_dist <= 1),
                "min_dist must be between 0 and 1",
            ),
        )
        for is_violated, message in rules:
            if is_violated():
                raise ValueError(message)


class TSNEReduction(ReductionModel):
    """T-SNE (t-Distributed Stochastic Neighbor Embedding) dimension
    reduction.
    """

    def __init__(self, params: TSNEParams) -> None:
        """Initialize the t-SNE model with parameters.

        Args:
            params: TSNE configuration parameters

        Raises:
            ValueError: If the t-SNE model fails to initialize
        """
        super().__init__(params)
        try:
            self.model = TSNE(
                n_components=params.n_components,
                perplexity=params.perplexity,
                learning_rate=params.learning_rate,
                max_iter=params.max_iter,
                random_state=params.random_state,
                method=params.method,
            )
            print("t-SNE model successfully initialized")
        except Exception as e:
            # Chain the original error so its traceback is kept as the cause.
            raise ValueError(f"Failed to initialize t-SNE model: {e}") from e

    def run(self, embeddings_array: np.ndarray) -> np.ndarray:
        """Apply the t-SNE model to reduce the dimensionality of embeddings.

        Args:
            embeddings_array: A 2D numpy array of embeddings to be reduced

        Returns:
            numpy.ndarray: The reduced embeddings as a 2D array.

        Raises:
            ValueError: If the embeddings fail validation
            RuntimeError: If fitting the t-SNE model fails
        """
        # Validation runs outside the try block so its ValueError is not
        # re-wrapped as a RuntimeError.
        self.validate_embeddings(embeddings_array)
        try:
            return self.model.fit_transform(embeddings_array)
        except Exception as e:
            raise RuntimeError(f"t-SNE reduction failed: {e}") from e


class UMAPReduction(ReductionModel):
    """UMAP (Uniform Manifold Approximation and Projection) dimension
    reduction.
    """

    def __init__(self, params: UMAPParams) -> None:
        """Initialize the UMAP model with parameters.

        Args:
            params: UMAP configuration parameters

        Raises:
            ValueError: If the UMAP model fails to initialize
        """
        super().__init__(params)
        try:
            self.model = umap.UMAP(
                n_components=params.n_components,
                n_neighbors=params.n_neighbors,
                min_dist=params.min_dist,
                metric=params.metric,
                random_state=params.random_state,
            )
            print("UMAP model successfully initialized")
        except Exception as e:
            # Chain the original error so its traceback is kept as the cause.
            raise ValueError(f"Failed to initialize UMAP model: {e}") from e

    def run(self, embeddings_array: np.ndarray) -> np.ndarray:
        """Apply the UMAP model to reduce the dimensionality of embeddings.

        Args:
            embeddings_array: A 2D numpy array of embeddings to be reduced

        Returns:
            numpy.ndarray: The reduced embeddings as a 2D array.

        Raises:
            ValueError: If the embeddings fail validation
            RuntimeError: If fitting the UMAP model fails
        """
        # Validation runs outside the try block so its ValueError is not
        # re-wrapped as a RuntimeError.
        self.validate_embeddings(embeddings_array)
        try:
            return self.model.fit_transform(embeddings_array)
        except Exception as e:
            raise RuntimeError(f"UMAP reduction failed: {e}") from e


class DimensionReduction(Operator):
    """Main interface for dimensionality reduction."""

    def __init__(self, model_type: str, params: dict[str, Any] | None = None) -> None:
        """Initialize the dimension reduction operator.

        Args:
            model_type: Type of model to use ('tsne' or 'umap')
            params: Optional dictionary of parameters for the model

        Raises:
            ValueError: If the model type is not supported or initialization
                fails
        """
        if params is None:
            params = {}

        # Declared up front so state() and cleanup() are safe before run().
        self.embeddings: np.ndarray | None = None
        self.payloads: tuple | None = None
        self.reduced: np.ndarray | None = None

        try:
            self.reduction_model: ReductionModel = self.get_reduction_model(
                model_type, params
            )
        except Exception as e:
            raise ValueError(
                f"Failed to initialize dimension reduction model: {e}"
            ) from e

    @staticmethod
    def get_reduction_model(model_type: str, params: dict[str, Any]) -> ReductionModel:
        """Create a dimension reduction model based on the model type.

        Args:
            model_type: Type of model ('tsne' or 'umap'), case-insensitive
            params: Dictionary of parameters for the model

        Returns:
            A dimension reduction model instance

        Raises:
            ValueError: If the model type is not supported
        """
        model_type_lower = model_type.lower()
        if model_type_lower == "tsne":
            tsne_params = TSNEParams(**params)
            return TSNEReduction(tsne_params)
        if model_type_lower == "umap":
            umap_params = UMAPParams(**params)
            return UMAPReduction(umap_params)
        raise ValueError(f"Unsupported model type: {model_type}")

    @staticmethod
    def gen_data(payloads: list, reduced_embeddings: np.ndarray) -> list[dict]:
        """Generate the formatted output.

        Args:
            payloads (list): List of payloads.
            reduced_embeddings (np.ndarray): An array of reduced embeddings.

        Returns:
            list: A list of dictionaries containing the payload and
                corresponding embedding.
        """
        return [
            {
                "payload": payload,
                "reduced_embedding": reduced_embedding.tolist(),
            }
            for payload, reduced_embedding in zip(
                payloads, reduced_embeddings, strict=False
            )
        ]

    def run(self, input_data: list[dict]) -> list[dict]:
        """Reduce the dimensionality of the provided embeddings using the
        initialized model.

        Args:
            input_data (list): A list of dictionaries containing payload and
                embeddings to be reduced.
                Example:
                    [
                        {"payload": "123", "embedding": [1, 2, 3]},
                        {"payload": "124", "embedding": [1, 0, 1]}
                    ]

        Returns:
            list: The reduced embeddings and the corresponding payload as a
                list of dictionaries.
                Example:
                    [
                        {"payload": "123", "reduced_embedding": [1, 2]},
                        {"payload": "124", "reduced_embedding": [1, 0]}
                    ]

        Raises:
            ValueError: If the embeddings input is not a non-empty list.
            KeyError: If the input data is invalid.
        """
        if not isinstance(input_data, list) or len(input_data) == 0:
            raise ValueError("Input should be a non-empty list.")

        try:
            embeddings, payloads = zip(
                *[(data["embedding"], data["payload"]) for data in input_data],
                strict=False,
            )
        except KeyError as e:
            raise KeyError(f"Missing key in input data: {e}") from e

        self.payloads = payloads
        self.embeddings = np.array(embeddings)
        self.reduced = self.reduction_model.run(self.embeddings)

        return self.gen_data(self.payloads, self.reduced)

    def cleanup(self) -> None:
        """Clean up resources used by the operator."""
        # Rebinding to None drops the references; a preceding `del` is redundant.
        self.reduction_model = None
        self.embeddings = None
        self.payloads = None
        self.reduced = None
        gc.collect()

    def state(self) -> dict:
        """Return the current state of the operator.

        Returns:
            dict: State of the operator. "embeddings", "payloads" and
                "reduced" are None until run() has been called.

        Raises:
            RuntimeError: If the reduction model is missing or has been
                released by cleanup().
        """
        # cleanup() leaves the attribute present but None, so hasattr() alone
        # cannot detect a released model.
        if getattr(self, "reduction_model", None) is None:
            raise RuntimeError("Reduction model is not initialized.")

        embeddings = getattr(self, "embeddings", None)
        reduced = getattr(self, "reduced", None)
        return {
            "model": self.reduction_model,
            "embeddings": None if embeddings is None else embeddings.tolist(),
            "payloads": getattr(self, "payloads", None),
            "reduced": None if reduced is None else reduced.tolist(),
        }