#%%
import ssl
# SECURITY NOTE(review): this replaces the standard library's default HTTPS
# context factory with the *unverified* one, so HTTPS connections opened through
# the stdlib in this process no longer validate server certificates.
# Presumably a workaround for downloading model weights/datasets on machines
# with broken certificate stores - TODO confirm it is still required, and scope
# it to the specific download if so.
ssl._create_default_https_context = ssl._create_unverified_context
import random
import math
import time
import numpy as np
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.metrics import (
silhouette_score,
calinski_harabasz_score,
davies_bouldin_score,
)
import spacy
from pymfe.mfe import MFE
import torch
import torchvision
from torchvision.models import (
resnet18,
vgg16,
densenet121,
alexnet,
mobilenet_v3_small,
) # , vit_l_32
import torch.nn.functional as F
from modlee.utils import closest_power_of_2, _make_serializable
fixed_resize = 32
import logging, warnings
# Quiet the named third-party loggers: only ERROR and above is emitted and
# records are not propagated to ancestor loggers.
for _logger in ["sklearn", "pymfe", "numpy"]:
    pl_logger = logging.getLogger(_logger)
    pl_logger.propagate = False
    pl_logger.setLevel(logging.ERROR)
# Fragments of warning messages that should be suppressed.
warnings_to_ignore = [
    "Number of distinct clusters",
    "Will set it as 'np.nan'",
    "Input data for shapiro has range zero",
    "Can't extract feature",
    "invalid value encountered in divide",
    "Mean of empty slice",
    "It is not possible make equal discretization",
    "invalid value encountered in scalar divide",
    "invalid value encountered in double_scalars",
    "invalid value encountered in double scalars",
]
# warnings.filterwarnings matches its regex against the *start* of the message,
# so each fragment is wrapped in ".*" to match anywhere in the text.
# NOTE: fragments are interpolated unescaped, i.e. interpreted as regex.
for warning_to_ignore in warnings_to_ignore:
    warnings.filterwarnings("ignore", f".*{warning_to_ignore}.*")
# Module-level flag; presumably read by importers to check that this module
# loaded successfully - TODO confirm against callers.
module_available = True
[docs]
def bench_kmeans_unsupervised(batch, n_clusters=(2, 4, 8, 16, 32), testing=False):
    """
    Calculate k-means clusters for a batch of data.

    Every requested cluster count is fitted independently on the flattened
    batch, and the fit is scored with inertia, silhouette, Calinski-Harabasz
    and Davies-Bouldin metrics.

    :param batch: The batch of data; everything but the first dimension is flattened.
    :param n_clusters: Cluster counts to evaluate, defaults to (2, 4, 8, 16, 32).
    :param testing: Flag for testing and calculating with a smaller set of cluster
        counts ([2, 4, 8]), defaults to False.
    :return: A dictionary of {'kmeans': {n_clusters: metrics}}; cluster counts that
        are not smaller than the number of samples are skipped.
    """
    if testing:
        n_clusters = [2, 4, 8]
    # flatten all but first dimension
    batch = batch.reshape(batch.shape[0], -1)
    batch_size = batch.shape[0]
    kmeans_results = {}
    for nc in n_clusters:
        # The clustering scores below only accept between 2 and n_samples - 1
        # distinct labels, so nc == batch_size has to be skipped as well.
        if nc >= batch_size:
            continue
        start_time = time.time()
        kmeans = KMeans(n_clusters=nc, init="random", n_init="auto")
        labels = kmeans.fit_predict(batch)
        inertia = kmeans.inertia_
        silhouette_avg = silhouette_score(batch, labels)
        ch_score = calinski_harabasz_score(batch, labels)
        db_score = davies_bouldin_score(batch, labels)
        end_time = time.time()
        kmeans_results[nc] = {
            "inertia": inertia,
            "silhouette_score": silhouette_avg,
            "calinski_harabasz_score": ch_score,
            "davies_bouldin_score": db_score,
            # wall-clock time of the fit plus all three scores
            "time_taken": end_time - start_time,
        }
    return {"kmeans": kmeans_results}
[docs]
def pad_image_channels(x, desired_channels=3):
    """
    Pad an image with extra, zero-filled channels.
    Uses dimension order [batch, channel, width, height].

    :param x: The image tensor to pad.
    :param desired_channels: Desired number of channels, defaults to 3.
    :return: The padded tensor; ``x`` itself is returned unchanged when it already
        has at least ``desired_channels`` channels.
    """
    # Calculate the number of channels to pad
    channels_to_pad = desired_channels - x.shape[1]
    if channels_to_pad <= 0:
        # Nothing to add; a negative count would make torch.zeros raise
        return x
    # Create the zero padding on the same device as x so torch.cat does not fail
    # on a device mismatch. The dtype is intentionally left at the default so the
    # result dtype of the concatenation is the same as before this change.
    padding_tensor = torch.zeros(
        (x.shape[0], channels_to_pad, x.shape[2], x.shape[3]), device=x.device
    )
    # Concatenate the original tensor and the padding tensor along the channel dimension
    return torch.cat((x, padding_tensor), dim=1)
[docs]
def sample_image_channels(x, num_sample=3):
    """
    Pick random channels (without repetition) from an image batch
    laid out as [batch_size, channel, width, height].

    :param x: The image tensor to sample from.
    :param num_sample: Number of channels to sample, defaults to 3.
    :return: A tensor holding only the sampled channels, the same channels for
        every item in the batch.
    """
    # Shuffle all channel indices, then keep the leading num_sample of them
    shuffled_channels = torch.randperm(x.shape[1])
    return x[:, shuffled_channels[:num_sample]]
[docs]
def sample_image_from_video(x, num_channels=1):
    """
    Sample one random frame per example from a video tensor
    [batch_size, frames, channels, width, height].

    :param x: The video tensor.
    :param num_channels: How many random frame indices are drawn per example;
        only the first drawn index is actually used.
    :return: A tensor of images [batch_size, channels, width, height].
    """
    # Draw random frame indices for each batch element
    random_frame_indices = torch.randint(0, x.shape[1], (x.shape[0], num_channels))
    # Pair every batch index with its first drawn frame index. Integer indexing
    # already drops the frame axis, so no squeeze is needed afterwards; squeezing
    # axis 1 here would wrongly remove the channel axis of single-channel videos.
    batch_indices = torch.arange(x.shape[0])
    return x[batch_indices, random_frame_indices[:, 0]]
[docs]
def manipulate_x_5(x):
    """
    Reduce a 5-dimensional tensor, assumed to be video-like
    [batch_size, frames, channels, width, height], by handing it to
    ``sample_image_from_video``.

    :param x: The tensor.
    :return: The subsample of the tensor produced by ``sample_image_from_video``.
    """
    return sample_image_from_video(x)
[docs]
def manipulate_x_4(x):
    """
    Bring a 4-dimensional, image-like tensor [batch_size, channels, width, height]
    to exactly 3 channels and resize it to the module-wide ``fixed_resize``.

    :param x: The image to process.
    :return: The tensor after channel padding/sampling and bilinear resizing.
    """
    channel_count = x.shape[1]
    if channel_count > 3:
        # too many channels: keep a random subset
        x = sample_image_channels(x)
    elif channel_count < 3:
        # too few channels: pad with zero channels
        x = pad_image_channels(x)
    # fixed_resize is only read here, so no `global` declaration is required
    return F.interpolate(x, size=fixed_resize, mode="bilinear", align_corners=False)
[docs]
def manipulate_x_3(x):
    """
    Process a 3-dimensional tensor [batch_size, width, height] by resizing to a fixed size.

    Resizing is best-effort: if the interpolation fails, the input is returned
    unchanged instead of raising.

    :param x: The tensor.
    :return: The tensor, resized to the module-wide ``fixed_resize``, or ``x``
        itself when resizing fails.
    """
    try:
        # Add a channel axis for the interpolation call, then drop it again
        resized_tensor = F.interpolate(
            x.unsqueeze(1), size=fixed_resize, mode="bilinear", align_corners=False
        )
        resized_tensor = resized_tensor.squeeze(1)
    except Exception:
        # Deliberate best-effort fallback. `Exception` (rather than a bare
        # except) keeps KeyboardInterrupt/SystemExit propagating.
        return x
    return resized_tensor
[docs]
def manipulate_x_2(x):
    """
    Truncate a 2D tensor to at most 10000 entries along its last dimension.

    :param x: The tensor to subsample.
    :return: The first 10000 columns of the tensor, or the tensor itself when its
        last dimension is not larger than that.
    """
    max_width = 10000
    # very wide data: keep only the leading columns
    return x[:, :max_width] if x.shape[-1] > max_width else x
[docs]
def manipulate_x_1(x):
    """
    Give a 1D tensor a second axis.

    :param x: The tensor.
    :return: The tensor with a new size-1 dimension inserted at position 1,
        i.e. shape [n] becomes [n, 1].
    """
    # Indexing with None inserts the new axis right after the first one
    return x[:, None]
[docs]
def get_image_features(x, testing=False):
    """
    Get features for a batch of image data.

    The batch is first pushed through the ``manipulate_x_*`` helper matching its
    number of dimensions. The returned dictionary currently holds a placeholder
    ``torch.zeros(1, 1)`` per model name plus the untouched input under ``"raw"``;
    the manipulated tensor is not part of the result.

    :param x: The batch of image data, with 1 to 5 dimensions.
    :param testing: Flag to calculate on a smaller test subsample of the data
        (only the ``"resnet18"`` entry), defaults to False.
    :return: A dictionary of the features.
    """
    # assumptions: x has the following structure (num,ch,h,w), or (num,?,ch,h,w)
    # cases
    # - x shape : (num,3,h,w): all below should work if h&w are compatible
    # - x shape : (num,h,w): output only raw
    # - x shape : (num,w): output only raw
    # - x shape : (num): output only raw
    # --- manipulate x ---
    # - x shape : (num,<3,h,w): pad with zero channels
    # - x shape : (num,>3,h,w): randomly sample 3 channels
    # - x shape : (num,?,ch,h,w): randomly take a slice of "video", then treat as above case
    x_raw = x
    # The checks are sequential on purpose: the output of one step can fall
    # into the next case (e.g. 5D -> 4D).
    if len(x.size()) == 5:
        if x.size()[2] != min(list(x.size())):
            print(
                "WARNING: We require datasets to be formatted (num_dataset_examples,num_images,num_ch,h,w). Encountered tensor from dataset of size {}".format(
                    x.size()
                )
            )
        x = manipulate_x_5(x)
    if len(x.size()) == 4:
        if x.size()[1] != min(list(x.size())):
            print(
                "WARNING: We require datasets to be formatted (num_dataset_examples,num_ch,h,w). Encountered tensor from dataset of size {}".format(
                    x.size()
                )
            )
        x = manipulate_x_4(x)
    if len(x.size()) == 3:
        x = manipulate_x_3(x)
    if len(x.size()) == 2:
        x = manipulate_x_2(x)
    if len(x.size()) == 1:
        x = manipulate_x_1(x)
    assert (
        len(x.size()) <= 5
    ), "datastats: We can only accommodate datasets of up up to 5 dimensions, Encountered tensor from dataset of size {}".format(
        x.size()
    )
    assert (
        len(x.size()) > 1
    ), "datastats: We can only accommodate datasets of between 2 and 5 dimensions, Encountered tensor from dataset of size {}".format(
        x.size()
    )
    # TODO - Deprecated, whole function should be refactored or removed.
    # Feature extraction through pretrained models is disabled, so no model is
    # loaded any more (previously the testing branch still downloaded and built
    # a resnet18 that was never used). Each model name keeps a placeholder entry
    # so the keys of the returned dictionary stay the same.
    model_names = ["resnet18"] if testing else ["resnet18", "vgg16"]
    feature_dict = {model_name: torch.zeros(1, 1) for model_name in model_names}
    feature_dict["raw"] = x_raw
    return feature_dict
# NEED TO UPDATE THIS ON OTHER SIDE
[docs]
def _as_batch_elements(batch):
    """Return the elements of a batch; a non list/tuple batch is a single element."""
    return batch if isinstance(batch, (list, tuple)) else (batch,)


def sample_dataloader(train_dataloader, num_sample):
    """
    Sample batches from a dataloader.

    The dataloader is iterated twice: once to count batches, once to collect the
    randomly chosen ones. The last batch is never sampled because it may be
    smaller than the others, so a dataloader with a single batch yields no
    batch elements.

    :param train_dataloader: The dataloader to sample from.
    :param num_sample: The number of samples.
    :return: A tuple of dataset_size (number of batches times batch size),
        batch_elements (one concatenated tensor per batch element), and the
        shapes of those tensors.
    """
    # goal: take dataloader, sample batches, separate elements into own arrays for independent analysis
    # assumptions:
    # - batches may be shuffled or not
    # - user may have memory constraints so loading all batches into memory is not viable
    # - prior to calling this function we know how many samples we want to take
    # - we don't know the full size of the dataset and we need to return this
    # 1: Loop through all batches counting number of batches and getting batch_size
    num_batches = 0
    try:
        for i, batch in enumerate(train_dataloader):
            if i == 0:
                if isinstance(batch, (list, tuple)):
                    batch_size = batch[0].size()[0]
                    num_batch_elements = len(batch)
                else:
                    # assume train_dataloader returns a tensor
                    batch_size = batch.size()[0]
                    num_batch_elements = 1
            num_batches += 1
    except Exception:
        # Could not inspect the batches directly; fall back to the dataloader's
        # own metadata.
        batch_size = train_dataloader.batch_size
        num_batches = len(train_dataloader.dataset) // batch_size
        num_batch_elements = len(_as_batch_elements(next(iter(train_dataloader))))
    assert num_batches != 0, "num_batches={}".format(num_batches)
    # 2: randomly sample batches at specific inds that total sampling size ~ num_sample: easier on memory
    num_batches_to_sample = min(
        num_sample // batch_size, num_batches - 1
    )  # handles case where num_sample>size of dataset
    inds_to_sample = set(
        random.sample(range(num_batches - 1), num_batches_to_sample)
    )  # -1 avoids incommensurate batches
    assert (
        len(inds_to_sample) == num_batches_to_sample
    ), "len(inds_to_sample)={},num_batches_to_sample={}".format(
        len(inds_to_sample), num_batches_to_sample
    )
    sampled_batches = [
        batch for i, batch in enumerate(train_dataloader) if i in inds_to_sample
    ]
    assert num_batches_to_sample == len(
        sampled_batches
    ), "num_batches_to_sample={} != len(sampled_batches)={}".format(
        num_batches_to_sample, len(sampled_batches)
    )
    # 3: organize batch_elements into their own arrays
    dataset_size = num_batches * batch_size
    batch_elements = []
    try:
        for i in range(num_batch_elements):
            # _as_batch_elements makes a bare-tensor batch count as one element
            # instead of indexing into its first row.
            batch_elements.append(
                torch.concat(
                    [
                        torch.Tensor(_as_batch_elements(b)[i]).cpu()
                        for b in sampled_batches
                    ]
                )
            )
    except Exception:
        # Best-effort: elements that cannot be converted to a tensor (and any
        # elements after them) are left out rather than failing the call.
        logging.getLogger(__name__).debug(
            "sample_dataloader: stopped collecting batch elements", exc_info=True
        )
    batch_elements_orig_shapes = [b.shape for b in batch_elements]
    return dataset_size, batch_elements, batch_elements_orig_shapes
[docs]
def get_n_samples(dataloader, n_samples=100):
    """
    Get a number of samples from a dataloader.

    Consecutive batches are drawn from a single pass over the dataloader and
    concatenated element-wise until ``n_samples`` items are available or the
    dataloader is exhausted.

    :param dataloader: The dataloader; each batch is a sequence of batch elements.
    :param n_samples: The number of samples, defaults to 100.
    :return: A list of batch elements, each holding at most n_samples items
        (fewer only if the dataloader does not provide enough).
    """
    # One iterator for the whole call: calling iter() again for every extra batch
    # would restart the dataloader and repeat its first batch when unshuffled.
    batches = iter(dataloader)
    # Copy into a fresh list so tuple batches work and the batch object handed
    # out by the dataloader is never modified in place.
    collected = list(next(batches))
    while len(collected[0]) < n_samples:
        try:
            extra_batch = next(batches)
        except StopIteration:
            # dataloader exhausted: return what is available
            break
        for i, element in enumerate(extra_batch):
            if isinstance(element, torch.Tensor):
                collected[i] = torch.cat((collected[i], element), dim=0)
            else:
                collected[i] = list(collected[i]) + list(element)
    # Truncate at the end so an oversized first batch is cut down as well
    return [element[:n_samples] for element in collected]
[docs]
class TextDataMetafeatures(DataMetafeatures):
    """
    Data metafeatures for text: embeds string samples drawn from the dataloader
    with a spaCy pipeline and stores per-axis statistics of those embeddings
    in ``self.embedding``.
    """

    def __init__(self, dataloader, nlp_model=None, *args, **kwargs):
        """
        :param dataloader: The dataloader to sample text from; forwarded to the
            parent constructor (which presumably stores it as ``self.dataloader``
            - TODO confirm).
        :param nlp_model: A callable spaCy-style pipeline whose result exposes
            ``.vector``; defaults to loading ``en_core_web_sm``.
        """
        super().__init__(dataloader, *args, **kwargs)
        # TODO - consider using a larger model embedding e.g. en_code_web_sm -> 300D
        # and truncate
        # A model passed by the caller is kept; previously it was dropped, which
        # left self.nlp_model undefined and made get_embedding fail.
        self.nlp_model = nlp_model if nlp_model else spacy.load('en_core_web_sm')
        self.embedding = self.get_embedding()

    def get_embedding(self, index=None, max_len=100, *args, **kwargs):
        """
        Get embeddings from the dataloader.

        :param index: The index in a batch of the string elements to embed.
            Defaults to None, which selects the first batch element whose first
            item is a string. NOTE: 0 is treated the same as None.
        :param max_len: Maximum number of embedding dimensions to keep, defaults to 100.
        :raises IndexError: If no batch element holds strings.
        :return: A dictionary of {embd_mean_i : value, embd_std_i : value}, one
            pair per kept embedding dimension.
        """
        samples = get_n_samples(self.dataloader)
        # Find the index of the first batch of strings
        if not index:
            index = 0
            while not isinstance(samples[index][0], str):
                index += 1
                if index == len(samples):
                    raise IndexError(f"No string elements in {self}, cannot calculate embedding with spaCy")
        embds = [self.nlp_model(text).vector for text in samples[index]]
        embds = torch.Tensor(embds)
        # keep only the leading max_len embedding dimensions
        embds = embds[:, :max_len]
        # Return distributions of each embedding axis
        # TODO - consider how batch elements are sorted, should they be indexed by the index that the
        # string elements appear? at "embd_{INDEX}..."
        # TODO - consider this assumption, not indexing the embeddings at all
        ret = {f'embd_mean_{i}': v.item() for i, v in enumerate(embds.mean(axis=0))}
        ret.update({f'embd_std_{i}': v.item() for i, v in enumerate(embds.std(axis=0))})
        return ret