from typing import Dict, List, Optional, Sequence, Tuple, Type, Union, overload
import numpy as np
import torch
from numpy.typing import NDArray
from torch import Tensor
__all__ = [
"ArrayOps",
"linear_interpolation",
"adjust_vector_angle",
"align_vectors",
]
[docs]
class ArrayOps:
[docs]
@staticmethod
def floor(
array: Union[NDArray, Tensor, Sequence[Union[float, int]], float, int]
) -> Union[NDArray, Tensor]:
if isinstance(array, (np.ndarray, list, tuple, float, int)):
return np.floor(array)
if isinstance(array, Tensor):
return array.floor()
raise TypeError(f"Invalid type for flooring: {type(array)}")
[docs]
@staticmethod
def ceil(
array: Union[NDArray, Tensor, Sequence[Union[float, int]], float, int]
) -> Union[NDArray, Tensor]:
if isinstance(array, (np.ndarray, list, tuple, float, int)):
return np.ceil(array)
if isinstance(array, Tensor):
return array.ceil()
raise TypeError(f"Invalid type for ceiling: {type(array)}")
[docs]
@staticmethod
def take(
array: Union[NDArray, Tensor, List],
index: Union[NDArray, Tensor, List, int],
dim: int = 0,
) -> Union[NDArray, Tensor]:
if isinstance(array, (np.ndarray, list, tuple)):
if not isinstance(index, np.ndarray):
index = np.array(index)
return np.take(array, index.astype(int), axis=dim)
if isinstance(array, Tensor):
if not isinstance(index, Tensor):
index = torch.tensor(index)
return array.index_select(dim, index.type(torch.long))
raise TypeError(f"Invalid type for taking: {type(array)}")
[docs]
@staticmethod
def cast(
x: Union[NDArray, Tensor, Sequence[Union[float, int]]],
data_type: Type[Union[np.ndarray, Tensor]],
_dtype: Optional[Union[Type[torch.dtype], Type[np.dtype], Type]] = None,
) -> Union[NDArray, Tensor]:
"""Typecast some multidimensional data to numpy array or torch Tensor.
Args:
x (Union[NDArray, Tensor, Sequence[Union[float, int]]]): The input array or tensor.
data_type (Type[Union[np.ndarray, Tensor]]): The data type to cast the input array or tensor to.
_dtype (Optional[Union[Type[torch.dtype], Type[np.dtype], Type]], optional): The new data type of the values inside the array. None means original dtype is kept. Defaults to None.
Raises:
ValueError: If the `data_type` is not np.ndarray or Tensor.
Returns:
Union[NDArray, Tensor]: The casted array or tensor.
"""
if data_type == np.ndarray:
return np.array(x, dtype=_dtype)
if data_type == Tensor:
type_map: Dict = {int: torch.int64, float: torch.float64}
_dtype = type_map.get(_dtype, _dtype)
if isinstance(x, np.ndarray):
x = torch.from_numpy(x)
if isinstance(x, Tensor):
return x.to(dtype=_dtype) # type: ignore
return torch.tensor(x, dtype=_dtype) # type: ignore
raise ValueError(f"Invalid data_type for array casting: {data_type}")
[docs]
@staticmethod
def norm(
x: Union[NDArray, Tensor, Sequence[Union[float, int]]],
dim: Optional[int] = None,
keepdim=False,
) -> Union[NDArray, Tensor]:
"""
Compute the norm of a given array or tensor along a specified dimension.
Args:
x (Union[NDArray, Tensor, Sequence[Union[float, int]]]): The input array or tensor.
dim (Optional[int]): The dimension along which to compute the norm. If None, the norm is computed over the entire array or tensor. Default is None.
keepdim (bool): Whether to keep the dimension of the input array or tensor after computing the norm. Default is False.
Returns:
Union[NDArray, Tensor]: The norm of the input array or tensor.
Raises:
TypeError: If the input type is not supported.
"""
if isinstance(x, (np.ndarray, list, tuple)):
return np.linalg.norm(x, axis=dim, keepdims=keepdim) # type: ignore
if isinstance(x, Tensor):
return x.norm(dim=dim, keepdim=keepdim)
raise TypeError(f"Invalid type for norm: {type(x)}")
[docs]
@staticmethod
def svd(
x: Union[NDArray, Tensor, Sequence[Sequence[Union[float, int]]]]
) -> Tuple[Union[NDArray, Tensor], Union[NDArray, Tensor], Union[NDArray, Tensor]]:
"""
Compute the singular value decomposition of a given array or tensor.
Args:
x (Union[NDArray, Tensor, Sequence[Sequence[Union[float, int]]]]): The input array or tensor.
Returns:
Tuple[Union[NDArray, Tensor], Union[NDArray, Tensor], Union[NDArray, Tensor]]: The (Rotation, coordinate scaling, reflection) matrices of the input array or tensor.
Raises:
TypeError: If the input type is not supported.
"""
if isinstance(x, (np.ndarray, list, tuple)):
return np.linalg.svd(x)
if isinstance(x, Tensor):
U, S, V = x.svd()
return U, S, V.T
raise TypeError(f"Invalid type for svd: {type(x)}")
[docs]
@staticmethod
def top_k(
x: Union[NDArray, Tensor, Sequence[Union[float, int]]],
k: int,
dim: int = -1,
largest=True,
) -> Union[Tuple[NDArray, NDArray], Tuple[Tensor, Tensor]]:
"""
Compute the top k values and their indices along a specified dimension of a given array or tensor.
Args:
x (Union[NDArray, Tensor, Sequence[Union[float, int]]]): The input array or tensor.
k (int): The number of top values to return.
dim (int, optional): The dimension along which to compute the top k values. Default is -1.
largest (bool, optional): Whether to return the largest or smallest k values. Default is True.
Returns:
Tuple[Union[NDArray, Tensor], Union[NDArray, Tensor]]: The top k values and their indices along the specified dimension.
Raises:
TypeError: If the input type is not supported.
"""
x_tensor = ArrayOps.cast(x, Tensor)
values, indices = x_tensor.topk(k, dim=dim, largest=largest, sorted=True)
if isinstance(x, np.ndarray):
return values.numpy(), indices.numpy()
return values, indices
@overload
@staticmethod
def concatenate(arrays: Sequence[NDArray], dim: int = 0) -> NDArray: ...
@overload
@staticmethod
def concatenate(arrays: Sequence[Tensor], dim: int = 0) -> Tensor: ...
[docs]
@staticmethod
def concatenate(
arrays: Union[Sequence[NDArray], Sequence[Tensor]], dim: int = 0
) -> Union[NDArray, Tensor]:
"""
Concatenate a sequence of arrays or tensors along a specified dimension.
Args:
arrays (Union[Sequence[NDArray], Sequence[Tensor]]): The sequence of arrays or tensors to concatenate.
dim (int, optional): The dimension along which to concatenate the arrays or tensors. Default is 0.
Returns:
Union[NDArray, Tensor]: The concatenated array or tensor.
Raises:
TypeError: If the input type is not supported.
"""
if isinstance(arrays[0], np.ndarray):
return np.concatenate(arrays, axis=dim)
if isinstance(arrays[0], Tensor):
return torch.concatenate(
[(x if isinstance(x, Tensor) else torch.tensor(x)) for x in arrays],
dim=dim,
)
raise TypeError(f"Invalid type for concatenation: {type(arrays[0])}")
@overload
@staticmethod
def abs(x: NDArray) -> NDArray: ...
@overload
@staticmethod
def abs(x: Tensor) -> Tensor: ...
[docs]
@staticmethod
def abs(x: Union[NDArray, Tensor]) -> Union[NDArray, Tensor]:
"""
Compute the absolute value of a given array or tensor.
Args:
x (Union[NDArray, Tensor]): The input array or tensor.
Returns:
Union[NDArray, Tensor]: The absolute value of the input array or tensor.
Raises:
TypeError: If the input type is not supported.
"""
if isinstance(x, np.ndarray):
return np.abs(x)
if isinstance(x, Tensor):
return x.abs()
raise TypeError(f"Invalid type for absolute value: {type(x)}")
@overload
@staticmethod
def copy(x: NDArray) -> NDArray: ...
@overload
@staticmethod
def copy(x: Tensor) -> Tensor: ...
[docs]
@staticmethod
def copy(x: Union[NDArray, Tensor]) -> Union[NDArray, Tensor]:
"""
Create a copy of a given array or tensor.
Args:
x (Union[NDArray, Tensor]): The input array or tensor.
Returns:
Union[NDArray, Tensor]: A deep copy of the input array or tensor.
Raises:
TypeError: If the input type is not supported.
"""
if isinstance(x, np.ndarray):
return np.copy(x)
if isinstance(x, Tensor):
return torch.clone(x)
raise TypeError(f"Invalid type for copying array: {type(x)}")
[docs]
@staticmethod
def linspace(
start: Union[float, int],
end: Union[float, int],
n_steps: int,
data_type: Type[Union[np.ndarray, Tensor]] = np.ndarray,
endpoint=True,
) -> Union[NDArray, Tensor]:
"""Generate an array or tensor with equally spaced values between `start` and `end`.
Args:
start (Union[float, int]): The starting value of the sequence. The value is inclusive.
end (Union[float, int]): The end value of the sequence. The value is inclusive if `endpoint` is True.
n_steps (int): The number of samples to generate. Must be non-negative.
data_type (Type[Union[np.ndarray, Tensor]], optional): The data type of the output array. Defaults to np.ndarray.
endpoint (bool, optional): Whether to include the `end` value in the sequence. Defaults to True.
Raises:
ValueError: If `data_type` is not np.ndarray or Tensor.
Returns:
Union[NDArray, Tensor]: The generated array or tensor.
"""
if data_type == np.ndarray:
return np.linspace(start, end, n_steps, endpoint=endpoint)
if data_type == Tensor:
return torch.linspace(start, end, n_steps + int(not endpoint))[
: -int(not endpoint) or None
]
raise ValueError(f"Invalid `data_type` arg for creating linspace: {data_type}")
[docs]
@staticmethod
def random_normal(
size: Sequence[int],
loc: Union[float, int] = 0,
scale: Union[float, int] = 1,
start: Union[float, int] = float("-inf"),
end: Union[float, int] = float("inf"),
data_type: Type[Union[np.ndarray, Tensor]] = np.ndarray,
) -> Union[NDArray, Tensor]:
"""Generate an array or tensor of the specified shape filled with random values from a normal (Gaussian) distribution.
Optionally truncate the distribution to the range [`start`, `end`].
Args:
size (Sequence[int]): The shape of the output array or tensor.
loc (Union[float, int], optional): The mean ("centre") of the distribution. Defaults to 0.
scale (Union[float, int], optional): The standard deviation (spread or "width") of the distribution. Must be non-negative. Defaults to 1.
start (Union[float, int], optional): The lower bound of the distribution. Defaults to float("-inf").
end (Union[float, int], optional): The upper bound of the distribution. Defaults to float("inf").
data_type (Type[Union[np.ndarray, Tensor]], optional): The data type of the output array. Defaults to np.ndarray.
Raises:
ValueError: If `data_type` is not np.ndarray or torch.Tensor.
Returns:
Union[NDArray, Tensor]: The random values filled array or tensor.
Note:
Uses torch's random number generator to generate random values even for NumPy arrays.
"""
if data_type not in (np.ndarray, Tensor):
raise ValueError(f"Invalid `data_type` arg for random normal: {data_type}")
random_values = (
torch.normal(loc, scale, size)
if start == float("-inf") and end == float("inf")
else torch.nn.init.trunc_normal_(torch.empty(size), loc, scale, start, end)
)
if data_type == np.ndarray:
random_values = random_values.numpy()
return random_values
@overload
@staticmethod
def steps(
n_steps: int,
anchors: Tensor = torch.Tensor([0, -1, 2]),
random_uniform_frac: float = 0.2,
random_normal_frac: float = 0.3,
n_clusters: int = 1,
cluster_std: Optional[float] = None,
anchor_spacing_blend: float = 0.5,
) -> Tensor: ...
@overload
@staticmethod
def steps(
n_steps: int,
anchors: Union[NDArray, Sequence[Union[float, int]]] = (0, 1),
random_uniform_frac: float = 0.2,
random_normal_frac: float = 0.3,
n_clusters: int = 1,
cluster_std: Optional[float] = None,
anchor_spacing_blend: float = 0.5,
) -> NDArray: ...
[docs]
@staticmethod
def steps(
n_steps: int,
anchors: Union[NDArray, Tensor, Sequence[Union[float, int]]] = (0, 1),
random_uniform_frac: float = 0.2,
random_normal_frac: float = 0.3,
n_clusters: int = 1,
cluster_std: Optional[float] = None,
anchor_spacing_blend: float = 0.5,
) -> Union[NDArray, Tensor]:
"""Generates a sequence of steps based on a combination of linear interpolation,
random uniform distribution, and random normal distribution.
Args:
n_steps (int): The total number of steps to generate.
anchors (Union[NDArray, Tensor, Sequence[Union[float, int]]], optional): The points between & through which the steps are interpolated. Defaults to (0, 1).
random_uniform_frac (float, optional): The fraction of steps generated using a random uniform distribution. Must be between 0 and 1. Defaults to 0.2.
random_normal_frac (float, optional): The fraction of steps generated using a random normal distribution. Must be between 0 and 1. Defaults to 0.3.
n_clusters (int, optional): The number of concentrated spots to add using the random normal (gaussian distribution) steps (around cluster centroids selected from a uniform distribution). Defaults to 1.
cluster_std (Optional[float], optional): The standard deviation (spread) of the normal distribution generating the concentrated spots. If None, it is calculated based on the anchor gap and number of clusters (`std(gaps)/10/n_clusters`). Defaults to None.
anchor_spacing_blend (float, optional): A blend factor between equal anchor spacing (1) and spacing based on the distances between anchor points (0). Defaults to 0.5.
Raises:
ValueError: If the sum of `random_uniform_frac` and `random_normal_frac` exceeds 1, or if either is negative.
Returns:
Union[NDArray, Tensor]: The generated sequence of steps.
Examples:
.. code-block:: python
import torch
from sign_language_translator.utils import ArrayOps
# you should plot the following arrays on a graph for better understanding
anchors = [0, 1, -2, 0, 5, 2]
# Basic linear interpolation with no randomness and equal anchor spacing
steps = ArrayOps.steps(9, anchors, 0, 0, 0, 0, anchor_spacing_blend=0)
# array([ 0. , 0.25, -1.5 , -0.75, 1. , 2.75, 4.5 , 3.75, 2. ])
# Linear interpolation with no randomness and anchor spacing based on distances
steps = ArrayOps.steps(9, anchors, 0, 0, 0, 0, anchor_spacing_blend=1)
# array([ 0. , 0.625, 0.25 , -1.625, -1. , 0.625, 3.75 , 3.875, 2. ])
# A blend of equal and distance-based anchor spacing with no randomness
steps = ArrayOps.steps(9, torch.Tensor(anchors), 0, 0, 0, 0, anchor_spacing_blend=0.5)
# Tensor([ 0. , 0.921, -0.655, -1.625, -0.167, 1.987, 4.231, 3.81 , 2. ])
# Adding uniform randomness to the steps
steps = ArrayOps.steps(9, anchors, 0.5, 0, 0, 0, anchor_spacing_blend=1)
# array([ 0. , 0.25 , -1. , -0.895, 0.214, 1.346, 3.75 , 3.777, 2. ])
# Adding 2 concentration spots using gaussian randomness
steps = ArrayOps.steps(9, anchors, 0, 0.5, 2, 0.1, anchor_spacing_blend=0)
# array([ 0. , 0.99 , 0.924, -1.5 , 1. , 4.5 , 4.872, 4.025, 2. ])
# Combining uniform and normal randomness
steps = ArrayOps.steps(9, anchors, 0.2, 0.3, 2, 0.1, anchor_spacing_blend=0.5)
# array([ 0. , 0.069, -1.333, 0.468, 1.538, 3.835, 4.897, 4.267, 2. ])
"""
if (
random_uniform_frac < 0
or random_normal_frac < 0
or random_uniform_frac + random_normal_frac > 1
):
raise ValueError(
f"Invalid fractions: {random_uniform_frac=}, {random_normal_frac=}."
"Provide non-negative values summing to <=1."
)
if anchor_spacing_blend < 0 or anchor_spacing_blend > 1:
raise ValueError(f"Invalid {anchor_spacing_blend=}. Expected 0<=blend<=1")
if not isinstance(anchors, (np.ndarray, Tensor)):
anchors = np.array(anchors)
n_uniform_steps = int(n_steps * random_uniform_frac)
n_normal_steps = (
int(n_steps * random_normal_frac / n_clusters) if n_clusters > 0 else 0
)
n_linear_steps = n_steps - n_uniform_steps - n_normal_steps * n_clusters
# calculate the distance between anchors and normalize it and then blend with equal anchor spacing
anchor_gap = np.cumsum(np.abs(np.diff(anchors, axis=-1, prepend=anchors[0])))
anchor_gap = anchor_gap / anchor_gap[-1] * (1 - anchor_spacing_blend)
anchor_gap = anchor_gap + np.linspace(0, 1, len(anchors)) * anchor_spacing_blend
cluster_std = cluster_std or float(np.std(anchor_gap)) / 10 * max(n_clusters, 1)
new_x = np.concatenate(
[
np.linspace(0, 1, n_linear_steps),
ArrayOps.random_uniform((n_uniform_steps,), 0, 1),
]
+ [
ArrayOps.random_normal(
(n_normal_steps,), np.random.rand(), cluster_std, 0, 1
)
for _ in range(n_clusters)
]
)
new_x = np.sort(new_x)
return linear_interpolation(anchors, new_x, old_x=anchor_gap, dim=-1)
@overload
def linear_interpolation(
array: Union[NDArray[np.number], Sequence],
new_x: Union[Sequence[Union[int, float]], NDArray[np.number], Tensor],
old_x: Union[Sequence[Union[int, float]], NDArray[np.number], Tensor, None] = None,
dim: int = 0,
) -> NDArray: ...
@overload
def linear_interpolation(
array: Tensor,
new_x: Union[Sequence[Union[int, float]], NDArray[np.number], Tensor],
old_x: Union[Sequence[Union[int, float]], NDArray[np.number], Tensor, None] = None,
dim: int = 0,
) -> Tensor: ...
[docs]
def linear_interpolation(
array: Union[NDArray[np.number], Tensor, Sequence],
new_x: Union[Sequence[Union[int, float]], NDArray[np.number], Tensor],
old_x: Union[Sequence[Union[int, float]], NDArray[np.number], Tensor, None] = None,
dim: int = 0,
) -> Union[NDArray, Tensor]:
"""
Perform linear interpolation on a multidimensional array or tensor along a dimension.
This function essentially connects all consecutive values in a multidimensional array with
straight lines along a specified dimension, so that intermediate values can be calculated.
It takes the input array, a set of new indexes or alternatively new & old coordinate values,
and a dimension along which to perform interpolation.
Parameters:
array (NDArray[np.number] | Tensor | List): The input array or tensor to interpolate.
new_x (Sequence[int | float] | NDArray[np.number] | Tensor): The new index values or coordinate values at which to calculate the intermediate values from `array`. Must be 1D. Order of values does not matter. if `old_x` is not provided, these values are relative to the index of the data in `array` i.e. [0, 1, 2, ...] and negative indexes are allowed. If `old_x` is provided, all `new_x` values must be within it's bounds.
old_x (Sequence[int | float] | NDArray[np.number] | Tensor | None, optional): The old *coordinate* values corresponding to the data in `array` along the `dim`. Must be 1D and strictly sorted ascending. Can contain negative numbers. If None, method assumes it to be a linear sequence starting at 0 and growing with step +1 i.e. `[0, 1, 2, ...]` like the index of `array`.
dim (int, optional): The dimension along which to perform interpolation. Default is 0.
Returns:
NDArray | Tensor: The result of linear interpolation along the specified dimension.
Raises:
ValueError: If `new_x` or `old_x` is not 1 dimensional.
Examples:
.. code-block:: python
data = np.array([1, 2, 3, 5])
new_indexes = np.array([1.5, 0.5, 2.5])
interpolated_data = linear_interpolation(data, new_indexes)
print(interpolated_data)
# array([2.5, 1.5, 4. ])
old_x = [0, 4, 4.5, 5]
new_x = [0, 1, 2, 2.5, 3, 4, 5]
interpolated_data = linear_interpolation(data, new_x, old_x=old_x)
print(interpolated_data)
# array([1. , 1.25 , 1.5 , 1.625, 1.75 , 2. , 5. ])
positional_embedding_table = torch.randn(100, 768) # (max_seq_len, embedding_dim)
intermediate_positions = torch.linspace(0, 99, 500)
new_embedding_table = linear_interpolation(positional_embedding_table, intermediate_positions, dim=0)
# new_embedding_table.shape -> (500, 768) # (new_max_seq_len, embedding_dim)
Note:
This function supports both NumPy arrays and PyTorch tensors as input and preserves gradient.
"""
if not isinstance(array, (np.ndarray, Tensor)):
array = np.array(array)
indexes, dim = __validate_lin_interp_args(array, new_x, old_x, dim)
# interpolate (Magic!)
floored_indexes = ArrayOps.floor(indexes)
ceiled_indexes = ArrayOps.ceil(indexes)
fraction = (indexes - floored_indexes).reshape(
[len(indexes) if d == dim else 1 for d in range(array.ndim)]
)
interpolated = ArrayOps.take(array, floored_indexes, dim) * (1 - fraction)
interpolated = interpolated + ArrayOps.take(array, ceiled_indexes, dim) * fraction
return interpolated
def __validate_lin_interp_args(array: Union[NDArray, Tensor], new_x, old_x, dim: int):
if dim < -1 * array.ndim or dim >= array.ndim:
raise ValueError(f"Invalid dim: {dim} use -{array.ndim} <= d <= {array.ndim-1}")
if dim < 0:
dim = dim + array.ndim
if old_x is not None:
if len(old_x) != array.shape[dim]:
raise ValueError(f"Invalid old_x: ({len(old_x)=}) != ({array.shape[dim]=})")
old_x, new_x = np.array(old_x), np.array(new_x)
if (out_of_bounds := (new_x < old_x.min()) | (new_x > old_x.max())).any():
raise ValueError(
f"Invalid values in new_x: {new_x[out_of_bounds].tolist()}. "
f"Must be in range {old_x.min()} <= x <= {old_x.max()}."
)
differences = old_x[:-1] - old_x[1:]
if not ((is_descending := (differences > 0).all()) or (differences < 0).all()):
raise ValueError("Provide sorted `old_x` (otherwise rearrange the array)")
indexes = (
np.interp(new_x, old_x[::-1], np.arange(len(old_x))[::-1])
if is_descending
else np.interp(new_x, old_x, np.arange(len(old_x)))
)
else:
if not isinstance(new_x, (np.ndarray, Tensor)):
new_x = np.array(new_x)
if (
out_of_bounds := (new_x < -array.shape[dim]) | (new_x >= array.shape[dim])
).any():
raise ValueError(
f"Invalid new indexes: {new_x[out_of_bounds.tolist()].tolist()}. "
f"Must be in range {-array.shape[dim]} <= x <= {array.shape[dim]-1}."
)
indexes = new_x
indexes = ArrayOps.cast(indexes, type(array))
if indexes.ndim != 1:
raise ValueError(f"Invalid indexes shape: {indexes.shape}. Must be 1D.")
return indexes, dim
@overload
def adjust_vector_angle(
vector_1: Union[NDArray, Sequence[float]],
vector_2: Union[NDArray, Sequence[float]],
scaling_factor: float,
post_normalize: bool = False,
) -> Tuple[NDArray, NDArray]: ...
@overload
def adjust_vector_angle(
vector_1: Tensor,
vector_2: Tensor,
scaling_factor: float,
post_normalize: bool = False,
) -> Tuple[Tensor, Tensor]: ...
[docs]
def adjust_vector_angle(
vector_1: Union[NDArray, Tensor, Sequence[float]],
vector_2: Union[NDArray, Tensor, Sequence[float]],
scaling_factor: float,
post_normalize: bool = False,
) -> Tuple[Union[NDArray, Tensor], Union[NDArray, Tensor]]:
"""Move a pair of vectors away or towards each other in the same plane.
Converge or Diverge a pair of vectors by increasing or decreasing their distance from each other.
The norm or the length of the vectors is preserved.
Args:
vector_1 (NDArray | Tensor): A 1D array of size n representing a word in an n dimensional vector space.
vector_2 (NDArray | Tensor): A 1D array of size n representing another word in an n dimensional vector space.
scaling_factor (float): The scaling factor by which the vector difference should be enhanced or diminished. The fraction of distance between the vectors where new vector should land. (sf > 1 diverges the two vectors. sf = 1 leaves the two vectors unchanged. 0.5 < sf < 1 converges the two vectors. sf = 0.5 makes the two vectors equal to their mean. sf = 0 swaps the two vectors. sf < 0.5 move the vectors away from their mean but in opposite direction.)
post_normalize (bool, optional): Make the magnitude of both output vectors equal to 1 after they have been rotated. Defaults to False.
Returns:
Tuple[NDArray | Tensor, NDArray | Tensor]: moved vectors.
Notes:
.. code-block:: python
# sf > 1 diverges the two vectors
# new_v1 = v2 + 2.00 * (v1 - v2) = 2 * v1 - v2 # more v1, less v2.
# new_v2 = v1 - 2.00 * (v1 - v2) = 2 * v2 - v1 # more v2, less v1.
# sf = 1 leaves the two vectors unchanged
# new_v1 = v2 + 1.00 * (v1 - v2) = v1
# new_v2 = v1 - 1.00 * (v1 - v2) = v2
# 0.5 < sf < 1 converges the two vectors
# new_v1 = v2 + 0.75 * (v1 - v2) = 0.75 * v1 + 0.25 * v2 # weighted average
# new_v1 = v1 - 0.75 * (v1 - v2) = 0.75 * v2 + 0.25 * v1 # weighted average
# sf = 0.5 makes the two vectors equal
# new_v1 = v2 + 0.50 * (v1 - v2) = 0.5 * v1 + 0.5 * v2 # mean
# new_v1 = v1 - 0.50 * (v1 - v2) = 0.5 * v2 + 0.5 * v1 # mean
# sf = 0. swaps the two vectors
# new_v1 = v2 + 0.00 * (v1 - v2) = v2
# new_v2 = v1 + 0.00 * (v1 - v2) = v1
# sf < 0.5 move the vectors away from their mean but in opposite direction
# new_v1 = v2 + (-1) * (v1 - v2) = 2 * v2 - v1 # more v2, less v1.
# new_v2 = v1 - (-1) * (v1 - v2) = 2 * v1 - v2 # more v1, less v2.
"""
# TODO: make it work for batches of vectors or nd-arrays
# TODO: Handle NaNs
if not isinstance(vector_1, (np.ndarray, Tensor)):
vector_1 = np.array(vector_1)
if not isinstance(vector_2, (np.ndarray, Tensor)):
vector_2 = np.array(vector_2)
v1_norm = ArrayOps.norm(vector_1, dim=None)
v2_norm = ArrayOps.norm(vector_2, dim=None)
# make the magnitude of both vectors = 1
vector_1 = vector_1 / (v1_norm or 1)
vector_2 = vector_2 / (v2_norm or 1)
# figure out the dimension of divergence
v1_minus_v2 = vector_1 - vector_2
# move each vector away or towards the other by equal amount
new_v1 = vector_2 + scaling_factor * v1_minus_v2
new_v2 = vector_1 - scaling_factor * v1_minus_v2
# make the magnitude of the new vectors = 1
new_v1 = new_v1 / (ArrayOps.norm(new_v1) or 1)
new_v2 = new_v2 / (ArrayOps.norm(new_v2) or 1)
if not post_normalize:
# restore original magnitudes
new_v1 = new_v1 * v1_norm
new_v2 = new_v2 * v2_norm
return new_v1, new_v2
@overload
def align_vectors(
source_matrix: NDArray, target_matrix: NDArray, pre_normalize: bool = True
) -> NDArray: ...
@overload
def align_vectors(
source_matrix: Tensor, target_matrix: Tensor, pre_normalize: bool = True
) -> Tensor: ...
[docs]
def align_vectors(
source_matrix: Union[NDArray, Tensor],
target_matrix: Union[NDArray, Tensor],
pre_normalize: bool = True,
) -> Union[NDArray, Tensor]:
"""
Align the source matrix to the target matrix using the orthogonal transformation.
Args:
source_matrix (NDArray | Tensor): A 2D array of shape (dictionary_length, embedding_dimension) containing word vectors from source model (or language).
target_matrix (NDArray | Tensor): A 2D array of shape (dictionary_length, embedding_dimension) containing word vectors from target model (or language).
normalize_vectors (bool, optional): Whether to normalize the training vectors before SVD. Defaults to True.
Returns:
NDArray | Tensor: An orthogonal transformation which aligns the source language to the target language.
Note:
This function supports both NumPy arrays and PyTorch tensors as input.
(Based on: https://github.com/babylonhealth/fastText_multilingual)
"""
if not isinstance(source_matrix, (np.ndarray, Tensor)):
source_matrix = np.array(source_matrix)
if not isinstance(target_matrix, (np.ndarray, Tensor)):
target_matrix = np.array(target_matrix)
# optionally normalize the training vectors
if pre_normalize:
src_norm = ArrayOps.norm(source_matrix, dim=1, keepdim=True)
src_norm[src_norm == 0] = 1
source_matrix = source_matrix / src_norm
tgt_norm = ArrayOps.norm(target_matrix, dim=1, keepdim=True)
tgt_norm[tgt_norm == 0] = 1
target_matrix = target_matrix / tgt_norm
# perform the SVD
product = source_matrix.T @ target_matrix
U, _, V = ArrayOps.svd(product)
# return orthogonal transformation which aligns source language to the target
return U @ V