Cluster Embeddings Operator

Description

The Cluster Embeddings operator groups high-dimensional embeddings into clusters using the KMeans, Agglomerative Clustering, or Affinity Propagation algorithms from scikit-learn. It supports both audio and video modalities. When the number of clusters is not specified, it uses Affinity Propagation, which determines the number of clusters automatically.

Model Information

  • KMeans: Partitions data into a specified number of clusters by minimizing within-cluster variance.

  • Agglomerative Clustering: Hierarchical clustering that merges pairs of clusters based on distance.

  • Affinity Propagation: Clusters data by sending messages between points, automatically determining the number of clusters.

  • Vector Size: Any dimensionality supported by scikit-learn clustering algorithms.

  • Usage: Groups similar embeddings for downstream tasks such as retrieval, summarization, or visualization.

Dependencies

  • scikit-learn >= 1.6.1

  • numpy >= 1.26,<2.2.0

How to Run the Tests

  1. Ensure that you are in the root directory of the feluda project.

  2. Install dependencies (in your virtual environment):

    uv pip install "./operators/cluster_embeddings"
    uv pip install "feluda[dev]"
    
  3. Run the tests:

    pytest operators/cluster_embeddings/test.py
    

Usage

from feluda.operators import ClusterEmbeddings

# Initialize the operator
operator = ClusterEmbeddings()

# Prepare input data
input_data = [
    {"payload": "A", "embedding": [0, 1]},
    {"payload": "B", "embedding": [1, 0]},
    {"payload": "C", "embedding": [100, 101]},
    {"payload": "D", "embedding": [101, 100]},
]

# Run clustering (audio modality, KMeans)
result = operator.run(input_data, n_clusters=2, modality="audio")
print(result)

# Run clustering (video modality, Agglomerative)
result = operator.run(input_data, n_clusters=2, modality="video")
print(result)

# Run clustering with automatic cluster count (Affinity Propagation)
result = operator.run(input_data, modality="audio")
print(result)

class operators.cluster_embeddings.cluster_embeddings.ClusterEmbeddings

Bases: Operator

Operator to cluster embeddings using KMeans, Affinity Propagation, and Agglomerative clustering algorithms.

__init__() → None

Initialize the ClusterEmbeddings operator.

static gen_data(payloads: list[dict], labels: numpy.ndarray) → dict

Generate formatted output data.

Parameters:
  • payloads (list[dict]) – List of payloads, in the same order as the labels

  • labels (np.ndarray) – An array of cluster labels

Returns:

A dictionary mapping each cluster label to the list of payloads assigned to that cluster

Return type:

dict
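
The grouping step described above can be sketched in plain Python. This is a minimal illustration, not the operator's code: the function name group_payloads is hypothetical, and using the bare integer label as the dictionary key is an assumption, since the exact key format is not stated here.

```python
from collections import defaultdict


def group_payloads(payloads, labels):
    """Group payloads by cluster label (hypothetical helper, not part of feluda)."""
    clusters = defaultdict(list)
    # payloads[i] and labels[i] are assumed to describe the same item
    for payload, label in zip(payloads, labels):
        # int() turns NumPy integer labels into plain Python ints
        clusters[int(label)].append(payload)
    return dict(clusters)


grouped = group_payloads(["A", "B", "C", "D"], [0, 0, 1, 1])
print(grouped)
```

Payloads keep their input order within each cluster, because the loop walks both sequences once from start to end.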

static kmeans(matrix: list[list], n_clusters: int) → numpy.ndarray

Cluster embeddings using KMeans.

Parameters:
  • matrix (list[list]) – List of embeddings, one vector per item

  • n_clusters (int) – Number of clusters to form

Returns:

An array of cluster labels for each embedding

Return type:

numpy.ndarray
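
As a rough illustration of what a helper like this might do, the sketch below calls scikit-learn's KMeans directly on the embeddings from the usage example. The function name kmeans_labels and the n_init and random_state values are assumptions made for the example, not the operator's actual settings.

```python
import numpy as np
from sklearn.cluster import KMeans


def kmeans_labels(matrix, n_clusters):
    """Return one cluster label per embedding (illustrative sketch only)."""
    # n_init and random_state are assumed values, chosen for repeatable runs
    model = KMeans(n_clusters=n_clusters, n_init=10, random_state=0)
    return model.fit_predict(np.asarray(matrix, dtype=float))


embeddings = [[0, 1], [1, 0], [100, 101], [101, 100]]
kmeans_result = kmeans_labels(embeddings, n_clusters=2)
print(kmeans_result)
```

The label numbers themselves are arbitrary. What matters is which embeddings share a label: here the first two points fall in one cluster and the last two in the other.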

static agglomerative(matrix: list[list], n_clusters: int) → numpy.ndarray

Cluster embeddings using Agglomerative Clustering.

Parameters:
  • matrix (list[list]) – List of embeddings, one vector per item

  • n_clusters (int) – Number of clusters to form

Returns:

An array of cluster labels for each embedding

Return type:

numpy.ndarray
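
A comparable sketch for the hierarchical case, using scikit-learn's AgglomerativeClustering. The function name agglomerative_labels is hypothetical, and the example relies on scikit-learn's default Ward linkage, which may differ from what the operator configures.

```python
import numpy as np
from sklearn.cluster import AgglomerativeClustering


def agglomerative_labels(matrix, n_clusters):
    """Return one cluster label per embedding (illustrative sketch only)."""
    # scikit-learn's default linkage ("ward") is used; the operator's choice is not stated
    model = AgglomerativeClustering(n_clusters=n_clusters)
    return model.fit_predict(np.asarray(matrix, dtype=float))


embeddings = [[0, 1], [1, 0], [100, 101], [101, 100]]
agglomerative_result = agglomerative_labels(embeddings, n_clusters=2)
print(agglomerative_result)
```

Merging starts from single points and repeatedly joins the closest pair of clusters, so the two nearby pairs are merged first and the process stops once two clusters remain.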

static affinity_propagation(matrix: list[list]) → numpy.ndarray

Cluster embeddings using Affinity Propagation.

Used when the number of clusters is not known in advance.

Parameters:
  • matrix (list[list]) – List of embeddings, one vector per item

Returns:

An array of cluster labels for each embedding

Return type:

numpy.ndarray
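
The sketch below shows the same idea with scikit-learn's AffinityPropagation, which takes no cluster count. The function name affinity_labels and the random_state value are assumptions for the example. All other settings are scikit-learn defaults and may not match the operator's.

```python
import numpy as np
from sklearn.cluster import AffinityPropagation


def affinity_labels(matrix):
    """Return one cluster label per embedding without a preset cluster count."""
    # random_state is an assumed value, fixed only to make runs repeatable
    model = AffinityPropagation(random_state=0)
    return model.fit_predict(np.asarray(matrix, dtype=float))


embeddings = [[0, 1], [1, 0], [100, 101], [101, 100]]
affinity_result = affinity_labels(embeddings)
# The number of clusters is discovered by the algorithm, not passed in
print(affinity_result, len(set(affinity_result.tolist())))
```

Because the cluster count emerges from the data, it is worth inspecting the labels after fitting. scikit-learn emits a ConvergenceWarning if the message passing does not settle within the iteration limit.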

run(input_data: list[dict], n_clusters: int | None = None, modality: str | None = None) → dict

Run the operator.

Parameters:
  • input_data (list[dict]) – List of dictionaries, each containing an "embedding" key (the vector to cluster) and a "payload" key (the item it belongs to)

  • n_clusters (int, optional) – Number of clusters. Defaults to None, in which case Affinity Propagation determines the number of clusters

  • modality (str, optional) – Source modality of the embeddings, "audio" or "video". Defaults to None

Returns:

A dictionary mapping each cluster label to the list of payloads assigned to that cluster

Return type:

dict
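
The usage example pairs the audio modality with KMeans, the video modality with Agglomerative Clustering, and a missing n_clusters with Affinity Propagation. The sketch below restates that selection logic in plain Python. The function names split_input and choose_algorithm are hypothetical, and raising ValueError for any other combination is an assumption, not documented behavior.

```python
def split_input(input_data):
    """Separate payloads from embeddings, keeping their order aligned."""
    payloads = [item["payload"] for item in input_data]
    matrix = [item["embedding"] for item in input_data]
    return payloads, matrix


def choose_algorithm(n_clusters=None, modality=None):
    """Pick an algorithm name from the arguments (inferred from the usage example)."""
    if n_clusters is None:
        # No cluster count given: fall back to Affinity Propagation
        return "affinity_propagation"
    if modality == "audio":
        return "kmeans"
    if modality == "video":
        return "agglomerative"
    # Assumption: any other combination is rejected
    raise ValueError(f"unsupported modality: {modality!r}")


payloads, matrix = split_input([
    {"payload": "A", "embedding": [0, 1]},
    {"payload": "B", "embedding": [1, 0]},
])
print(payloads, matrix, choose_algorithm(n_clusters=2, modality="audio"))
```

Keeping payloads and embeddings in two parallel lists means the label at position i returned by a clustering algorithm can be matched back to the payload at position i.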

cleanup() → None

Clean up resources used by the operator.

state() → dict

Return the current state of the operator.

Returns:

State of the operator

Return type:

dict