ai-content-maker/.venv/Lib/site-packages/thinc/tests/backends/test_ops.py

1607 lines
52 KiB
Python
Raw Permalink Normal View History

2024-05-03 04:18:51 +03:00
import inspect
import platform
from typing import Tuple, cast
import numpy
import pytest
from hypothesis import given, settings
from hypothesis.strategies import composite, integers
from numpy.testing import assert_allclose
from packaging.version import Version
from thinc.api import (
LSTM,
CupyOps,
NumpyOps,
Ops,
fix_random_seed,
get_current_ops,
get_ops,
use_ops,
)
from thinc.backends._custom_kernels import KERNELS, KERNELS_LIST, compile_mmh
from thinc.compat import has_cupy_gpu, has_torch, torch_version
from thinc.types import Floats2d
from thinc.util import torch2xp, xp2torch
from .. import strategies
from ..strategies import arrays_BI, ndarrays_of_shape
# Passed as `max_examples` to the hypothesis `settings` of the property tests.
MAX_EXAMPLES = 10

# Backend instances shared by the parametrized tests in this module.
VANILLA_OPS = Ops(numpy)  # type:ignore
NUMPY_OPS = NumpyOps()
BLIS_OPS = NumpyOps(use_blis=True)
CPU_OPS = [NUMPY_OPS, VANILLA_OPS]
# A CupyOps instance is only created when `has_cupy_gpu` is truthy.
XP_OPS = [NUMPY_OPS, CupyOps()] if has_cupy_gpu else [NUMPY_OPS]
ALL_OPS = [*XP_OPS, VANILLA_OPS]

FLOAT_TYPES = ["float32", "float64"]
INT_TYPES = ["int32", "int64"]

# Names of the reduction methods looked up on the ops objects.
REDUCTIONS = [
    "reduce_first",
    "reduce_last",
    "reduce_max",
    "reduce_mean",
    "reduce_sum",
]

# Pairs of (reduction name, whether a zero-length sequence must raise).
REDUCE_ZERO_LENGTH_RAISES = [
    ("reduce_first", True),
    ("reduce_last", True),
    ("reduce_max", True),
    # Mathematically, mean reduction should raise for zero-length sequences,
    # because floating point numbers are not a monoid under averaging.
    # However, floret relies on reduce_mean returning a zero-vector here.
    ("reduce_mean", False),
    ("reduce_sum", False),
]
def create_pytorch_funcs():
    """Build PyTorch reference implementations of the activation functions.

    ``math`` and ``torch`` are imported inside the function body rather than
    at module level.

    Returns:
        A list of ``(name, function)`` pairs, where each function takes a
        tensor ``x`` and returns the activation applied to it.
    """
    import math

    import torch

    F = torch.nn.functional

    def torch_relu(x):
        return F.relu(x)

    def torch_relu_k(x):
        return F.relu6(x)

    def torch_hard_sigmoid(x):
        scaled = x * 0.2 + 0.5
        return torch.clip(scaled, 0, 1)

    def torch_hard_tanh(x):
        return F.hardtanh(x)

    def torch_mish(x):
        return F.mish(x)

    def torch_swish(x):
        return F.silu(x)

    def torch_hard_swish(x):
        return x * torch_hard_sigmoid(x)

    def torch_hard_swish_mobilenet(x):
        return F.hardswish(x)

    def torch_sigmoid(x):
        return torch.sigmoid(x)

    def torch_dish(x):
        norm = (1 + x * x).sqrt()
        return 0.5 * x * (x / norm + 1)

    # https://github.com/huggingface/transformers/blob/master/src/transformers/activations.py#L37
    def torch_gelu_approx(x):
        inner = math.sqrt(2.0 / math.pi) * (x + 0.044715 * torch.pow(x, 3.0))
        return 0.5 * x * (1.0 + torch.tanh(inner))

    def torch_gelu(x):
        return F.gelu(x)

    names = (
        "relu",
        "relu_k",
        "hard_sigmoid",
        "hard_tanh",
        "mish",
        "swish",
        "hard_swish",
        "hard_swish_mobilenet",
        "dish",
        "gelu_approx",
        "gelu",
        "sigmoid",
    )
    implementations = (
        torch_relu,
        torch_relu_k,
        torch_hard_sigmoid,
        torch_hard_tanh,
        torch_mish,
        torch_swish,
        torch_hard_swish,
        torch_hard_swish_mobilenet,
        torch_dish,
        torch_gelu_approx,
        torch_gelu,
        torch_sigmoid,
    )
    return list(zip(names, implementations))
# The PyTorch reference functions are only built when `has_torch` is truthy;
# otherwise the list stays empty.
TORCH_FUNCS = create_pytorch_funcs() if has_torch else []
@pytest.mark.parametrize("op", [NumpyOps, CupyOps])
def test_ops_consistency(op):
    """Check that a backend class mirrors the public interface of ``Ops``.

    Every public attribute of the backend must exist on ``Ops``. For callable
    attributes, the parameter names and defaults (ignoring the first
    parameter) must match, and annotations must match wherever both sides
    provide one.
    """
    empty = inspect.Parameter.empty
    public_names = [name for name in dir(op) if not name.startswith("_")]
    for attr in public_names:
        assert hasattr(Ops, attr)
        method = getattr(op, attr)
        if not hasattr(method, "__call__"):
            continue
        # Drop the first parameter of both signatures before comparing.
        own = list(inspect.signature(method).parameters.values())[1:]
        base = list(inspect.signature(getattr(Ops, attr)).parameters.values())[1:]
        assert [p.name for p in own] == [p.name for p in base], attr
        assert [p.default for p in own] == [p.default for p in base], attr
        # If args are type annotated, their types should be the same
        for own_param, base_param in zip(own, base):
            own_annot = own_param.annotation
            base_annot = base_param.annotation
            if own_annot != empty and base_annot != empty:
                # Need to check string value to handle TypeVars etc.
                assert str(own_annot) == str(base_annot), attr
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.filterwarnings("ignore::RuntimeWarning")
def test_adam_incorrect_inputs(ops):
    """``adam`` accepts equally sized arrays and rejects any size mismatch."""
    one = ops.xp.zeros(1, dtype="f")
    two = ops.xp.zeros(2, dtype="f")
    scalars = (0.0, 0.0, 0.0, 0.0)
    # Baseline: all four arrays have the same size, so no error is expected.
    ops.adam(one, one, one, one, *scalars)
    # Swap the longer array into each of the four positions in turn.
    for position in range(4):
        arrays = [one, one, one, one]
        arrays[position] = two
        with pytest.raises(ValueError):
            ops.adam(*arrays, *scalars)
@pytest.mark.parametrize("ops", ALL_OPS)
def test_alloc(ops):
    """Allocation helpers return arrays of the expected dtype and rank."""
    families = (
        ((ops.alloc1f, ops.alloc2f, ops.alloc3f, ops.alloc4f), ops.alloc_f, numpy.float32),
        ((ops.alloc1i, ops.alloc2i, ops.alloc3i, ops.alloc4i), ops.alloc_i, numpy.int32),
    )
    for fixed_rank_methods, generic_method, expected_dtype in families:
        for rank, method in enumerate(fixed_rank_methods, start=1):
            shape = (1,) * rank
            # Rank-specific allocator, called with the dims unpacked.
            arr = method(*shape)
            assert arr.dtype == expected_dtype
            assert arr.ndim == len(shape)
            # Generic allocator, called with the shape tuple.
            arr = generic_method(shape)
            assert arr.dtype == expected_dtype
            assert arr.ndim == len(shape)
    assert ops.alloc(1).ndim == 1
@pytest.mark.parametrize("ops", XP_OPS)
def test_hash_gives_distinct_keys(ops):
    """``hash`` on five uint64 ids yields a (5, 4) uint32 array without zeros."""
    ids = ops.alloc1f(5, dtype="uint64")
    keys = ops.hash(ids, 0)
    assert keys.shape == (5, 4)
    assert keys.dtype == "uint32"
    # Check every key individually.
    for key_row in keys:
        for key in key_row:
            assert key != 0
@pytest.mark.parametrize("ops", XP_OPS)
def test_get_dropout_empty(ops):
    """A dropout rate of zero must produce a mask consisting only of ones."""
    shape = (2, 2)
    mask = ops.get_dropout_mask(shape, 0.0)
    assert mask.shape == shape
    # The earlier check `mask[mask == 1.0].all()` could never fail: it first
    # selected the elements equal to 1.0 and then asserted those were truthy.
    # Compare every element instead so that a wrong mask is detected.
    assert (mask == 1.0).all()
@pytest.mark.parametrize("ops", XP_OPS)
def test_get_dropout_not_empty(ops):
    """With a rate of 0.5 the mask has zeros and values above one."""
    expected_shape = (200, 200)
    mask = ops.get_dropout_mask(expected_shape, 0.5)
    has_scaled_up = (mask > 1.0).any()
    assert has_scaled_up
    has_dropped = (mask == 0.0).any()
    assert has_dropped
    assert mask.shape == expected_shape
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@pytest.mark.parametrize("index_dtype", ["int32", "uint32"])
def test_gather_add(ops, dtype, index_dtype):
    """``gather_add`` sums the table rows selected by each row of indices."""
    xp = ops.xp
    table = xp.arange(12, dtype=dtype).reshape(4, 3)
    indices = xp.array([[0, 2], [3, 1], [0, 1]], dtype=index_dtype)
    result = ops.gather_add(table, indices)
    # Each expected row is the sum of the two table rows named in `indices`.
    expected = [[6.0, 8.0, 10.0], [12.0, 14.0, 16.0], [3.0, 5.0, 7.0]]
    xp.testing.assert_allclose(result, expected)
@pytest.mark.parametrize("ops", XP_OPS)
@given(table=strategies.arrays_BI())
def test_gather_add_against_numpy(ops, table):
    """``gather_add`` agrees with fancy indexing followed by a sum."""
    xp = ops.xp
    table = ops.asarray(table)
    # 25 rows of 4 indices each, wrapped so they stay inside the table.
    indices = xp.arange(100, dtype="i").reshape(25, 4) % table.shape[0]
    actual = ops.gather_add(table, indices)
    reference = table[indices].sum(1)
    xp.testing.assert_allclose(actual, reference, atol=1e-5)
@pytest.mark.parametrize("ops", ALL_OPS)
def test_gather_add_oob_raises(ops):
    """An index beyond the last table row makes ``gather_add`` raise."""
    xp = ops.xp
    table = xp.arange(12, dtype="f").reshape(4, 3)
    # The table has four rows, so index 5 is out of bounds.
    bad_indices = xp.array([[0, 2], [3, 1], [5, 1]], dtype="i")
    with pytest.raises(IndexError):
        ops.gather_add(table, bad_indices)
@pytest.mark.parametrize("ops", CPU_OPS)
def test_seq2col_window_one_small(ops):
    """``seq2col`` with window 1 on a four-element, one-column sequence."""
    seq = ops.asarray([[1.0], [3.0], [4.0], [5]], dtype="float32")
    cols = ops.seq2col(seq, 1)
    # Results exposing `.get()` are converted through it before comparing.
    if hasattr(cols, "get"):
        cols = cols.get()
    expected_rows = [
        [0.0, 1.0, 3.0],
        [1.0, 3.0, 4.0],
        [3.0, 4.0, 5.0],
        [4.0, 5.0, 0.0],
    ]
    for row_index, expected_row in enumerate(expected_rows):
        assert_allclose(cols[row_index], expected_row)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BOP())
def test_maxout(ops, dtype, X):
    """``maxout`` returns the maximum over the last axis plus its index."""
    xp = ops.xp
    X = ops.asarray(X, dtype=dtype)
    reference_best = X.max(axis=-1).astype(dtype)
    best, which = ops.maxout(X)
    assert best.dtype == dtype
    xp.testing.assert_allclose(reference_best, best, rtol=0.001, atol=0.001)
    # `which` cannot be compared directly because the sort order might
    # differ. Use it to pick elements out of X and compare those against the
    # expected maxima instead.
    selected = xp.take_along_axis(X, xp.expand_dims(which, -1), axis=-1)
    xp.testing.assert_allclose(
        selected, xp.expand_dims(reference_best, -1), atol=1e-10
    )
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_maxout(ops, dtype):
    """``backprop_maxout`` scatters gradients to the recorded positions."""
    grads = ops.asarray2f([[1.0, 2.0], [3.0, 4.0]], dtype=dtype)
    which = ops.asarray2i([[1, 0], [2, 1]])
    dX = ops.backprop_maxout(grads, which, 3)
    assert dX.dtype == dtype
    expected = [
        [[0.0, 1.0, 0.0], [2.0, 0.0, 0.0]],
        [[0.0, 0.0, 3.0], [0.0, 4.0, 0.0]],
    ]
    ops.xp.testing.assert_allclose(dX, expected)

    # Index 3 does not fit when the third argument is 3.
    with pytest.raises(IndexError):
        ops.backprop_maxout(
            ops.asarray2f([[1.0, 2.0], [3.0, 4.0]]),
            ops.asarray2i([[1, 0], [3, 1]]),
            3,
        )
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_seq2col_window_one(ops, X):
    """Backend ``seq2col`` (nW=1) matches the base ``Ops`` implementation."""
    X = ops.asarray(X)
    # Base implementation, pointed at the same array module as `ops`.
    reference_ops = Ops()
    reference_ops.xp = ops.xp
    reference_input = reference_ops.alloc(X.shape) + X
    expected = reference_ops.seq2col(reference_ops.asarray(reference_input), nW=1)
    actual = ops.seq2col(X, nW=1)
    ops.xp.testing.assert_allclose(expected, actual, atol=0.001, rtol=0.001)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_seq2col_lengths_all_zero(ops, dtype):
    """``seq2col`` and its backprop return empty output for empty inputs."""
    # Factories, so that every call receives a freshly built lengths array.
    lengths_factories = (
        # Empty batch
        lambda: ops.xp.zeros((0,), dtype="int32"),
        # Zero-length sequence
        lambda: ops.asarray1i([0]),
        # Multiple zero-length sequences
        lambda: ops.asarray1i([0, 0]),
    )
    for make_lengths in lengths_factories:
        for func in (ops.seq2col, ops.backprop_seq2col):
            ops.xp.testing.assert_allclose(
                ops.alloc((0, 0), dtype=dtype),
                func(ops.alloc((0, 0), dtype=dtype), 1, lengths=make_lengths()),
            )
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_seq2col_lengths_zero_first_last(ops, dtype):
    """A zero-length sequence at the start or end leaves the output unchanged."""
    xp = ops.xp
    expected_cols = ops.asarray2f(
        [
            [0, 0, 0, 1, 2, 3, 4, 5, 6],
            [1, 2, 3, 4, 5, 6, 7, 8, 9],
            [4, 5, 6, 7, 8, 9, 10, 11, 12],
            [7, 8, 9, 10, 11, 12, 13, 14, 15],
            [10, 11, 12, 13, 14, 15, 0, 0, 0],
        ],
        dtype=dtype,
    )
    expected_grad = ops.asarray2f(
        [[2, 4, 6], [12, 15, 18], [21, 24, 27], [30, 33, 36], [26, 28, 30]],
        dtype=dtype,
    )

    # Initial zero-length sequence
    cols_leading = ops.seq2col(
        xp.arange(1.0, 16.0, dtype=dtype).reshape(5, 3),
        1,
        lengths=ops.asarray1i([0, 5]),
    )
    xp.testing.assert_allclose(expected_cols, cols_leading)
    grad_leading = ops.backprop_seq2col(
        expected_cols, 1, lengths=ops.asarray1i([0, 5])
    )
    xp.testing.assert_allclose(expected_grad, grad_leading)

    # Final zero-length sequence.
    cols_trailing = ops.seq2col(
        xp.arange(1.0, 16.0, dtype=dtype).reshape(5, 3),
        1,
        lengths=ops.asarray1i([5, 0]),
    )
    xp.testing.assert_allclose(expected_cols, cols_trailing)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_seq2col_lengths_zero_between(ops, dtype):
    """Zero-length sequences between two real sequences do not alter results."""
    xp = ops.xp
    expected_cols = ops.asarray2f(
        [
            [0, 0, 0, 1, 2, 3, 4, 5, 6],
            [1, 2, 3, 4, 5, 6, 7, 8, 9],
            [4, 5, 6, 7, 8, 9, 10, 11, 12],
            [7, 8, 9, 10, 11, 12, 13, 14, 15],
            [10, 11, 12, 13, 14, 15, 0, 0, 0],
            [0, 0, 0, 16, 17, 18, 19, 20, 21],
            [16, 17, 18, 19, 20, 21, 0, 0, 0],
        ],
        dtype=dtype,
    )
    expected_grad = ops.asarray2f(
        [
            [2, 4, 6],
            [12, 15, 18],
            [21, 24, 27],
            [30, 33, 36],
            [26, 28, 30],
            [32, 34, 36],
            [38, 40, 42],
        ],
        dtype=dtype,
    )
    # First one zero-length sequence in the middle, then two in a row.
    for length_pattern in ([5, 0, 2], [5, 0, 0, 2]):
        cols = ops.seq2col(
            xp.arange(1.0, 22.0, dtype=dtype).reshape(7, 3),
            1,
            lengths=ops.asarray1i(length_pattern),
        )
        xp.testing.assert_allclose(expected_cols, cols)
        grad = ops.backprop_seq2col(
            expected_cols, 1, lengths=ops.asarray1i(length_pattern)
        )
        xp.testing.assert_allclose(expected_grad, grad)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_seq2col_window_one_lengths(ops, dtype):
    """``seq2col`` with window 1 does not mix rows of different sequences."""
    X = ops.xp.arange(1.0, 16.0, dtype=dtype).reshape(5, 3)
    cols = ops.seq2col(X, 1, lengths=ops.asarray1i([1, 3, 1]))
    expected = ops.asarray2f(
        [
            [0, 0, 0, 1, 2, 3, 0, 0, 0],
            [0, 0, 0, 4, 5, 6, 7, 8, 9],
            [4, 5, 6, 7, 8, 9, 10, 11, 12],
            [7, 8, 9, 10, 11, 12, 0, 0, 0],
            [0, 0, 0, 13, 14, 15, 0, 0, 0],
        ],
        dtype=dtype,
    )
    ops.xp.testing.assert_allclose(expected, cols)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_seq2col_window_two_lengths(ops, dtype):
    """``seq2col`` with window 2 does not mix rows of different sequences."""
    X = ops.xp.arange(1.0, 16.0, dtype=dtype).reshape(5, 3)
    cols = ops.seq2col(X, 2, lengths=ops.asarray1i([1, 3, 1]))
    expected = ops.asarray2f(
        [
            [0, 0, 0, 0, 0, 0, 1, 2, 3, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 4, 5, 6, 7, 8, 9, 10, 11, 12],
            [0, 0, 0, 4, 5, 6, 7, 8, 9, 10, 11, 12, 0, 0, 0],
            [4, 5, 6, 7, 8, 9, 10, 11, 12, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 13, 14, 15, 0, 0, 0, 0, 0, 0],
        ],
        dtype=dtype,
    )
    ops.xp.testing.assert_allclose(expected, cols)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_seq2col_window_one_small(ops, dtype):
    """``backprop_seq2col`` with window 1 on a small hand-written input."""
    d_cols = ops.asarray(
        [[0.0, 0.0, 0.0], [-1.0, 0.0, 1.0], [2.0, 0.0, 0.0]], dtype=dtype
    )
    d_seq = ops.backprop_seq2col(d_cols, 1)
    # Anything that is not a numpy array is converted through `.get()`.
    if not isinstance(d_seq, numpy.ndarray):
        d_seq = d_seq.get()
    assert_allclose(d_seq, [[-1.0], [2.0], [1.0]], atol=0.001, rtol=0.001)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_backprop_seq2col_window_one(ops, dtype, X):
    """Backend ``backprop_seq2col`` (nW=1) matches the base ``Ops`` version.

    Examples whose column count is not a multiple of three, or whose largest
    absolute value is 30 or more, are skipped by returning early.
    """
    if X.shape[1] % 3:
        return None
    X = ops.asarray(X, dtype=dtype)
    if ops.xp.abs(X).max() >= 30:
        return None
    reference_ops = Ops()
    reference_ops.xp = ops.xp
    expected = reference_ops.backprop_seq2col(X, nW=1)
    actual = ops.backprop_seq2col(X, nW=1)
    # Print the rows whose sums diverge noticeably before the final assert.
    for row in range(expected.shape[0]):
        diff = expected[row].sum() - actual[row].sum()
        if diff > 0.1 or diff < -0.1:
            print(row, diff)
            print(expected[row])
            print(actual[row])
    ops.xp.testing.assert_allclose(expected, actual, atol=0.001, rtol=0.001)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_seq2col_window_one_lengths(ops, dtype):
    """``backprop_seq2col`` with window 1 honours sequence boundaries."""
    xp = ops.xp
    d_cols = xp.arange(0.1, 4.6, step=0.1, dtype=dtype).reshape(5, 9)
    d_seqs = ops.backprop_seq2col(d_cols, 1, lengths=ops.asarray1i([1, 3, 1]))
    expected = ops.asarray2f(
        [
            [0.4, 0.5, 0.6],
            [3.2, 3.4, 3.6],
            [6.6, 6.9, 7.2],
            [5.6, 5.8, 6.0],
            [4.0, 4.1, 4.2],
        ],
        dtype=dtype,
    )
    xp.testing.assert_allclose(expected, d_seqs, atol=1e-6)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_seq2col_window_two(ops, dtype):
    """``seq2col`` with window 2 on a four-element, one-column sequence."""
    seq = ops.asarray([[1.0], [2.0], [3.0], [4]], dtype=dtype)
    cols = ops.seq2col(seq, 2)
    # Anything that is not a numpy array is converted through `.get()`.
    if not isinstance(cols, numpy.ndarray):
        cols = cols.get()
    expected_rows = [
        [0.0, 0.0, 1.0, 2.0, 3.0],
        [0.0, 1.0, 2.0, 3.0, 4.0],
        [1.0, 2.0, 3.0, 4.0, 0.0],
        [2.0, 3.0, 4.0, 0.0, 0.0],
    ]
    for row_index, expected_row in enumerate(expected_rows):
        assert_allclose(cols[row_index], expected_row)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_seq2col_window_two_lengths(ops, dtype):
    """``backprop_seq2col`` with window 2 honours sequence boundaries."""
    xp = ops.xp
    d_cols = xp.arange(0.1, 7.6, step=0.1, dtype=dtype).reshape(5, 15)
    d_seqs = ops.backprop_seq2col(d_cols, 2, lengths=ops.asarray1i([1, 3, 1]))
    expected = ops.asarray2f(
        [
            [0.7, 0.8, 0.9],
            [10.2, 10.5, 10.8],
            [11.1, 11.4, 11.7],
            [12.0, 12.3, 12.6],
            [6.7, 6.8, 6.9],
        ],
        dtype=dtype,
    )
    xp.testing.assert_allclose(expected, d_seqs)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_seq2col_window_two(ops, dtype):
    """``backprop_seq2col`` with window 2 sums each row's contributions."""
    d_cols = ops.asarray(
        [
            [0.0, 0.0, 1.0, 2.0, 3.0],
            [0.0, 1.0, 2.0, 3.0, 4.0],
            [1.0, 2.0, 3.0, 4.0, 0.0],
            [2.0, 3.0, 4.0, 0.0, 0.0],
        ],
        dtype=dtype,
    )
    # Each expected entry adds up the values a sequence row contributed as a
    # feature: 1.0 appears three times above, 2.0 and 3.0 four times each,
    # and 4.0 three times.
    expected = ops.asarray(
        [
            [3.0],  # 1 + 1 + 1 + 0
            [8.0],  # 2 + 2 + 2 + 2
            [12.0],  # 3 + 3 + 3 + 3
            [12.0],  # 0 + 4 + 4 + 4
        ],
        dtype=dtype,
    )
    d_seq = ops.backprop_seq2col(d_cols, 2)
    ops.xp.testing.assert_allclose(d_seq, expected, atol=0.001, rtol=0.001)
@pytest.mark.skipif(not has_cupy_gpu, reason="needs GPU/CuPy")
@pytest.mark.parametrize("nW", [1, 2])
def test_large_seq2col_gpu_against_cpu(nW):
    """``seq2col`` gives the same result on CuPy as on NumPy for a big batch."""
    gpu = CupyOps()
    cpu = NumpyOps()

    # The batch is made large enough to require multiple CUDA grids.
    batch_size = 128 * 128 * 2  # threads per block * blocks * 2
    X_cpu = cpu.xp.random.randn(batch_size * 2).astype("float32").reshape(-1, 2)
    X_gpu = gpu.asarray2f(X_cpu)

    # Use somewhat interesting sequence lengths.
    lengths_cpu = cpu.asarray1i([1, 4, 2, 1] * (batch_size // 8))
    lengths_gpu = gpu.asarray1i(lengths_cpu)

    cols_cpu = cpu.seq2col(X_cpu, nW=nW, lengths=lengths_cpu)
    cols_gpu = gpu.seq2col(X_gpu, nW=nW, lengths=lengths_gpu)
    assert_allclose(cols_cpu, cols_gpu.get())
@pytest.mark.skipif(not has_cupy_gpu, reason="needs GPU/CuPy")
@pytest.mark.parametrize("nW", [1, 2])
def test_large_backprop_seq2col_gpu_against_cpu(nW):
    """``backprop_seq2col`` agrees between CuPy and NumPy for a big batch."""
    gpu = CupyOps()
    cpu = NumpyOps()

    # The batch is made large enough to require multiple CUDA grids.
    batch_size = 128 * 128 * 2  # threads per block * blocks * 2
    n_features = 2 * nW + 1
    flat = cpu.xp.random.randn(batch_size * n_features).astype("float32")
    d_cols_cpu = flat.reshape(-1, n_features)
    d_cols_gpu = gpu.asarray2f(d_cols_cpu)

    # Use somewhat interesting sequence lengths.
    lengths_cpu = cpu.asarray1i([1, 4, 2, 1] * (batch_size // 8))
    lengths_gpu = gpu.asarray1i(lengths_cpu)

    d_seqs_cpu = cpu.backprop_seq2col(d_cols_cpu, nW=nW, lengths=lengths_cpu)
    d_seqs_gpu = gpu.backprop_seq2col(d_cols_gpu, nW=nW, lengths=lengths_gpu)
    assert_allclose(d_seqs_cpu, d_seqs_gpu.get())
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_backprop_reduce_sum(ops, dtype, X):
    """``backprop_reduce_sum`` spreads each gradient row over its sequence.

    Examples whose largest absolute value is 5 or more are skipped by
    returning early.
    """
    X = ops.asarray(X, dtype=dtype)
    if ops.xp.abs(X).max() >= 5:
        return None
    # Every sequence is given length 3.
    lengths = ops.asarray([3] * len(X), dtype="i")
    out = ops.backprop_reduce_sum(X, lengths)
    assert out.dtype == dtype
    assert out.shape == (sum(lengths), X.shape[1])
    offset = 0
    for seq_index, seq_len in enumerate(lengths):
        stop = offset + seq_len
        # Summing a sequence's rows should give its gradient times its length.
        ops.xp.testing.assert_allclose(
            out[offset:stop].sum(axis=0),
            X[seq_index] * seq_len,
            rtol=0.01,
            atol=0.01,
        )
        offset = stop
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_softmax_sums_to_one(ops, X):
    """Each row of the softmax output sums to approximately one."""
    probabilities = ops.softmax(ops.asarray(X))
    for prob_row in probabilities:
        row_total = prob_row.sum()
        assert 0.99999 <= row_total <= 1.0001
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_softmax_works_inplace(ops, X):
    """With ``inplace=True`` the returned rows still sum to about one."""
    X = ops.softmax(ops.asarray(X), inplace=True)
    for prob_row in X:
        row_total = prob_row.sum()
        assert 0.99999 <= row_total <= 1.00001
def torch_softmax_with_temperature(
    X: Floats2d, dY: Floats2d, temperature: float
) -> Tuple[Floats2d, Floats2d]:
    """Compute a temperature-scaled softmax and its gradient with PyTorch.

    Args:
        X: Inputs; divided by ``temperature`` before the softmax.
        dY: Gradient passed to ``backward`` on the softmax output.
        temperature: Divisor applied to ``X``.

    Returns:
        A pair of the softmax output and the gradient accumulated on the
        input tensor, both converted back with ``torch2xp``.
    """
    import torch

    inputs = xp2torch(X, requires_grad=True)
    upstream_grad = xp2torch(dY)
    outputs = torch.nn.functional.softmax(inputs / temperature, dim=-1)
    outputs.backward(upstream_grad)

    Y = cast(Floats2d, torch2xp(outputs))
    dX = cast(Floats2d, torch2xp(cast(torch.Tensor, inputs.grad)))
    return Y, dX
@pytest.mark.skipif(not has_torch, reason="needs PyTorch")
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("temperature", [0.5, 1.0, 2.0])
def test_softmax_temperature(ops, temperature):
    """Softmax and its backprop with a temperature match the PyTorch reference."""
    xp = ops.xp
    X = xp.arange(-10, 10, 0.2, dtype="f").reshape(10, 10)
    dY = xp.eye(10, dtype="f")

    Y = ops.softmax(X, temperature=temperature)
    dX = ops.backprop_softmax(Y, dY, temperature=temperature)

    Y_reference, dX_reference = torch_softmax_with_temperature(X, dY, temperature)
    xp.testing.assert_allclose(Y, Y_reference, atol=1e-6)
    xp.testing.assert_allclose(dX, dX_reference, atol=1e-6)
@pytest.mark.parametrize("cpu_ops", [*CPU_OPS, BLIS_OPS])
def test_gemm_computes_correctly(cpu_ops):
    """``gemm`` with ``trans2`` or ``trans1`` matches ``numpy.dot``."""

    def add_uniform_noise(arr):
        # In-place add keeps the array's float32 dtype.
        arr += numpy.random.uniform(size=arr.size).reshape(arr.shape)

    # X @ W.T
    W = numpy.zeros((3, 2), dtype="f")
    X = numpy.zeros((4, 2), dtype="f")
    add_uniform_noise(W)
    add_uniform_noise(X)
    Y = cpu_ops.gemm(X, W, trans2=True)
    assert_allclose(numpy.dot(X, W.T), Y, atol=1e-4, rtol=1e-4)

    # X.T @ W
    W = numpy.zeros((2, 3), dtype="f")
    X = numpy.zeros((2, 4), dtype="f")
    add_uniform_noise(W)
    add_uniform_noise(X)
    Y = cpu_ops.gemm(X, W, trans1=True)
    assert_allclose(numpy.dot(X.T, W), Y, atol=1e-4, rtol=1e-4)

    # Same product again, this time passing an output buffer.
    cpu_ops.gemm(X, W, trans1=True, out=Y)
@pytest.mark.parametrize("cpu_ops", [*CPU_OPS, BLIS_OPS])
def test_gemm_out_used(cpu_ops):
    """``gemm`` writes its result into the array passed as ``out``."""
    zeros = numpy.zeros((2, 2), dtype="f")
    # Both operands are the very same zero matrix.
    a = zeros
    b = zeros
    out_buffer = numpy.ones((2, 2), dtype="f")
    cpu_ops.gemm(a, b, out=out_buffer)
    # The ones must have been overwritten by the all-zero product.
    assert numpy.array_equal(out_buffer, numpy.zeros((2, 2)))
@pytest.mark.parametrize("cpu_ops", CPU_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_flatten_unflatten_roundtrip(cpu_ops, X):
    """``unflatten`` reverses ``flatten``, both with and without padding."""
    rows = list(X)
    row_lengths = [len(row) for row in X]

    flat = cpu_ops.flatten(rows)
    assert flat.ndim == 1
    restored = cpu_ops.unflatten(flat, row_lengths)
    assert_allclose(X, restored)

    # With pad=1 the flattened array must come out longer.
    flat_padded = cpu_ops.flatten(list(X), pad=1, dtype="f")
    assert len(flat_padded) > len(flat)
    restored_padded = cpu_ops.unflatten(flat_padded, [len(row) for row in X], pad=1)
    assert_allclose(X, restored_padded)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES + INT_TYPES)
def test_pad(ops, dtype):
    """``pad`` zero-pads sequences to a common length, optionally rounded up."""
    xp = ops.xp

    # One-dimensional sequences of lengths 2 and 4.
    seqs = [xp.arange(1, 3, dtype=dtype), xp.arange(1, 5, dtype=dtype)]
    xp.testing.assert_allclose(ops.pad(seqs), [[1, 2, 0, 0], [1, 2, 3, 4]])
    xp.testing.assert_allclose(
        ops.pad(seqs, round_to=8),
        [[1, 2, 0, 0, 0, 0, 0, 0], [1, 2, 3, 4, 0, 0, 0, 0]],
    )

    # Two-dimensional sequences with 2 and 4 rows.
    seqs = [
        xp.arange(1, 5, dtype=dtype).reshape(2, 2),
        xp.arange(1, 9, dtype=dtype).reshape(4, 2),
    ]
    expected_plain = [
        [[1, 2], [3, 4], [0, 0], [0, 0]],
        [[1, 2], [3, 4], [5, 6], [7, 8]],
    ]
    xp.testing.assert_allclose(ops.pad(seqs), expected_plain)
    expected_rounded = [
        [[1, 2], [3, 4], [0, 0], [0, 0], [0, 0]],
        [[1, 2], [3, 4], [5, 6], [7, 8], [0, 0]],
    ]
    xp.testing.assert_allclose(ops.pad(seqs, round_to=5), expected_rounded)

    with pytest.raises(ValueError, match=r"Rounding for padding must at least be 1"):
        ops.pad(seqs, round_to=0)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_reduce_sum(ops, dtype):
    """``reduce_sum`` sums per sequence and validates the lengths."""
    xp = ops.xp
    X = ops.asarray2f(
        [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [1.0, 2.0], [3.0, 4.0]], dtype=dtype
    )

    summed = ops.reduce_sum(X, ops.asarray1i([3, 2]))
    xp.testing.assert_allclose(summed, [[9.0, 12.0], [4.0, 6.0]])

    # Zero-length array
    summed = ops.reduce_sum(X, ops.asarray1i([3, 0, 2]))
    xp.testing.assert_allclose(summed, [[9.0, 12.0], [0.0, 0.0], [4.0, 6.0]])

    # Lengths that add up to more rows than X has.
    with pytest.raises(IndexError):
        ops.reduce_sum(X, xp.array([5, 5, 5, 5], dtype="i"))

    # A negative length.
    with pytest.raises(ValueError):
        ops.reduce_sum(X, xp.array([-1, 10, 5, 5], dtype="i"))
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_fails_with_incorrect_length(ops, dtype):
    """``backprop_reduce_sum`` rejects a negative sequence length."""
    d_sums = ops.xp.arange(1, 7, dtype=dtype).reshape(2, 3)
    bad_lengths = ops.xp.array([-1, 2], dtype="int32")
    with pytest.raises(ValueError, match=r"lengths must be"):
        ops.backprop_reduce_sum(d_sums, bad_lengths)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_reduce_first(ops, dtype):
    """``reduce_first`` picks the first row of each sequence and validates lengths.

    The error-path checks call ``reduce_first`` itself. They previously
    called ``reduce_last``, so this function's own validation was never
    tested.
    """
    X = ops.asarray2f(
        [[1.0, 6.0], [2.0, 7.0], [3.0, 8.0], [4.0, 9.0], [5.0, 10.0]], dtype=dtype
    )
    lengths = ops.asarray1i([3, 2])
    Y, starts_ends = ops.reduce_first(X, lengths)
    ops.xp.testing.assert_array_equal(starts_ends, ops.asarray1i([0, 3, 5]))
    ops.xp.testing.assert_allclose(Y, [[1.0, 6.0], [4.0, 9.0]])

    # A zero-length sequence must be rejected.
    lengths = ops.asarray1i([3, 0, 2])
    with pytest.raises(ValueError, match=r"all sequence lengths must be > 0"):
        ops.reduce_first(X, lengths)

    # Lengths adding up to more rows than X has must be rejected.
    lengths = ops.asarray1i([3, 2, 1])
    with pytest.raises(IndexError, match=r"lengths must sum up to the number of rows"):
        ops.reduce_first(X, lengths)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_reduce_first(ops, dtype):
    """``backprop_reduce_first`` puts each gradient on its sequence's first row."""
    d_firsts = ops.asarray2f([[1.0, 3.0], [2.0, 4.0]], dtype=dtype)
    boundaries = ops.asarray1i([0, 3, 5])
    dX = ops.backprop_reduce_first(d_firsts, boundaries)
    # Rows 0 and 3 get the gradients; all other rows stay zero.
    expected = [[1.0, 3.0], [0.0, 0.0], [0.0, 0.0], [2.0, 4.0], [0.0, 0.0]]
    ops.xp.testing.assert_allclose(dX, expected)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_reduce_last(ops, dtype):
    """``reduce_last`` picks the last row of each sequence and validates lengths."""
    xp = ops.xp
    X = ops.asarray2f(
        [[1.0, 6.0], [2.0, 7.0], [3.0, 8.0], [4.0, 9.0], [5.0, 10.0]], dtype=dtype
    )
    Y, lasts = ops.reduce_last(X, ops.asarray1i([3, 2]))
    xp.testing.assert_array_equal(lasts, ops.asarray1i([2, 4]))
    xp.testing.assert_allclose(Y, [[3.0, 8.0], [5.0, 10.0]])

    # A zero-length sequence must be rejected.
    with_empty_sequence = ops.asarray1i([3, 0, 2])
    with pytest.raises(ValueError, match=r"all sequence lengths must be > 0"):
        ops.reduce_last(X, with_empty_sequence)

    # Lengths adding up to more rows than X has must be rejected.
    too_long = ops.asarray1i([3, 2, 1])
    with pytest.raises(IndexError, match=r"lengths must sum up to the number of rows"):
        ops.reduce_last(X, too_long)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_reduce_last(ops, dtype):
    """``backprop_reduce_last`` puts each gradient on its sequence's last row."""
    d_lasts = ops.asarray2f([[1.0, 3.0], [2.0, 4.0]], dtype=dtype)
    last_rows = ops.asarray1i([2, 4])
    dX = ops.backprop_reduce_last(d_lasts, last_rows)
    # Rows 2 and 4 get the gradients; all other rows stay zero.
    expected = [[0.0, 0.0], [0.0, 0.0], [1.0, 3.0], [0.0, 0.0], [2.0, 4.0]]
    ops.xp.testing.assert_allclose(dX, expected)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_reduce_max_sm(ops, dtype):
    """``reduce_max`` on a small random input with three sequences of two rows."""
    xp = ops.xp
    X = xp.zeros((6, 3), dtype=dtype)
    X += xp.random.uniform(-1, 1, X.shape)
    lengths = xp.array([2, 2, 2], dtype="i")
    maxes, which = ops.reduce_max(X, lengths)
    assert maxes.dtype == dtype
    # Every recorded index must be a valid row of X.
    assert xp.all(which >= 0)
    assert xp.all(which < X.shape[0])
    offset = 0
    for seq_index, seq_len in enumerate(lengths):
        stop = offset + seq_len
        expected_max = X[offset:stop].max(axis=0)
        xp.testing.assert_allclose(maxes[seq_index], expected_max)
        offset = stop
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_reduce_max(ops, dtype):
    """``reduce_max`` on uneven sequences, plus validation of the lengths."""
    xp = ops.xp
    m = xp.zeros((19, 5), dtype=dtype)
    m += xp.random.uniform(-1, 1, m.shape)
    lengths = xp.array([5, 5, 3, 6], dtype="i")
    # m[4, 0] = 1
    # m[0, 1] = 2
    # m[1, 3] = 3
    maxes, which = ops.reduce_max(m, lengths)
    assert maxes.dtype == dtype
    # Every recorded index must be a valid row of m.
    assert xp.all(which >= 0)
    assert xp.all(which < m.shape[0])
    offset = 0
    for seq_index, seq_len in enumerate(lengths):
        stop = offset + seq_len
        expected_max = m[offset:stop].max(axis=0)
        xp.testing.assert_allclose(maxes[seq_index], expected_max)
        offset = stop

    # Lengths that add up to more rows than m has.
    with pytest.raises(IndexError):
        ops.reduce_max(m, xp.array([5, 5, 5, 5], dtype="i"))

    # A negative length.
    with pytest.raises(ValueError):
        ops.reduce_max(m, xp.array([-1, 10, 5, 5], dtype="i"))

    # A zero-length sequence.
    with pytest.raises(ValueError):
        ops.reduce_max(m, xp.array([5, 5, 0, 3, 6], dtype="i"))
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_reduce_max(ops, dtype):
    """``backprop_reduce_max`` routes gradients to the recorded argmax rows."""
    xp = ops.xp

    d_maxes = xp.arange(1, 7, dtype=dtype).reshape(2, 3)
    which = xp.array([[2, 1, 0], [1, 0, 1]]).astype("int32")
    lengths = xp.array([3, 2], dtype="int32")
    dX = ops.backprop_reduce_max(d_maxes, which, lengths)
    assert dX.dtype == dtype
    expected = [
        [0.0, 0.0, 3.0],
        [0.0, 2.0, 0.0],
        [1.0, 0.0, 0.0],
        [0.0, 5.0, 0.0],
        [4.0, 0.0, 6.0],
    ]
    xp.testing.assert_allclose(dX, expected)

    # An index of 3 does not fit inside a sequence of length 3.
    with pytest.raises(IndexError):
        ops.backprop_reduce_max(
            xp.arange(1, 7, dtype="f").reshape(2, 3),
            xp.array([[2, 3, 0], [1, 0, 1]]).astype("int32"),
            xp.array([3, 2], dtype="int32"),
        )

    # A negative length.
    with pytest.raises(ValueError):
        ops.backprop_reduce_max(
            xp.arange(1, 7, dtype=dtype).reshape(2, 3),
            xp.array([[2, 1, 0], [1, 0, 1]]).astype("int32"),
            xp.array([-3, 2], dtype="int32"),
        )

    # A zero-length sequence.
    with pytest.raises(ValueError):
        ops.backprop_reduce_max(
            xp.arange(1, 7, dtype=dtype).reshape(2, 3),
            xp.array([[2, 1, 0], [1, 0, 1], [1, 0, 1]]).astype("int32"),
            xp.array([3, 0, 2], dtype="int32"),
        )
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_reduce_mean(ops, dtype):
    """``reduce_mean`` averages per sequence; empty sequences give zeros."""
    xp = ops.xp
    X = ops.asarray2f(
        [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [1.0, 2.0], [3.0, 4.0]], dtype=dtype
    )

    means = ops.reduce_mean(X, ops.asarray1i([3, 2]))
    xp.testing.assert_allclose(means, [[3.0, 4.0], [2.0, 3.0]])

    # Zero-length array
    means = ops.reduce_mean(X, ops.asarray1i([3, 0, 2]))
    xp.testing.assert_allclose(means, [[3.0, 4.0], [0.0, 0.0], [2.0, 3.0]])

    # Zero-length array last.
    X = ops.asarray2f([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=dtype)
    means = ops.reduce_mean(X, ops.asarray1i([3, 0]))
    xp.testing.assert_allclose(means, [[3.0, 4.0], [0.0, 0.0]])

    # Lengths that add up to more rows than X has.
    with pytest.raises(IndexError):
        ops.reduce_mean(X, xp.array([3, 3], dtype="i"))

    # A negative length.
    with pytest.raises(ValueError):
        ops.reduce_mean(X, xp.array([-1, 5], dtype="i"))
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
def test_backprop_reduce_mean(ops, dtype):
    """``backprop_reduce_mean`` divides each gradient by its sequence length."""
    xp = ops.xp
    d_means = xp.arange(1, 7, dtype=dtype).reshape(2, 3)
    dX = ops.backprop_reduce_mean(d_means, xp.array([4, 2], dtype="int32"))
    assert dX.dtype == dtype
    # Four copies of [1, 2, 3] / 4, then two copies of [4, 5, 6] / 2.
    first_sequence = [[0.25, 0.5, 0.75]] * 4
    second_sequence = [[2.0, 2.5, 3.0]] * 2
    xp.testing.assert_allclose(dX, first_sequence + second_sequence)

    # A negative length.
    with pytest.raises(ValueError, match=r"lengths must be"):
        ops.backprop_reduce_mean(
            xp.arange(1, 7, dtype=dtype).reshape(2, 3),
            xp.array([-1, 2], dtype="int32"),
        )
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@pytest.mark.parametrize("reduction", REDUCTIONS)
def test_reduce_empty_batch(ops, dtype, reduction):
    """Each reduction and its backprop handle a batch with no sequences."""
    forward = getattr(ops, reduction)
    backward = getattr(ops, f"backprop_{reduction}")
    lengths = ops.asarray1i([])
    result = forward(ops.alloc((0, 10), dtype=dtype), lengths)
    # The backprop arguments depend on what the forward pass returned.
    if reduction == "reduce_max":
        Y, which = result
        backward_args = (Y, which, lengths)
    elif isinstance(result, tuple):
        Y, extra = result
        backward_args = (Y, extra)
    else:
        Y = result
        backward_args = (Y, lengths)
    dX = backward(*backward_args)
    assert Y.shape == (0, 10)
    assert dX.shape == (0, 10)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@pytest.mark.parametrize("reduction", REDUCTIONS)
def test_reduce_empty_hidden(ops, dtype, reduction):
    """Each reduction and its backprop handle inputs with zero columns."""
    forward = getattr(ops, reduction)
    backward = getattr(ops, f"backprop_{reduction}")
    lengths = ops.asarray1i([2, 3])
    result = forward(ops.alloc((5, 0), dtype=dtype), lengths)
    # The backprop arguments depend on what the forward pass returned.
    if reduction == "reduce_max":
        Y, which = result
        backward_args = (Y, which, lengths)
    elif isinstance(result, tuple):
        Y, extra = result
        backward_args = (Y, extra)
    else:
        Y = result
        backward_args = (Y, lengths)
    dX = backward(*backward_args)
    assert Y.shape == (2, 0)
    assert dX.shape == (5, 0)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@pytest.mark.parametrize("reduction_raises", REDUCE_ZERO_LENGTH_RAISES)
def test_reduce_zero_seq_length(ops, dtype, reduction_raises):
    """A zero-length sequence either raises or reduces to a zero vector."""
    reduction_name, should_raise = reduction_raises
    reduce = getattr(ops, reduction_name)
    X = ops.asarray2f(
        [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [1.0, 2.0], [3.0, 4.0]], dtype=dtype
    )
    # The middle sequence is empty.
    lengths = ops.asarray1i([3, 0, 2])
    if not should_raise:
        # All non-raising reductions have zero as their identity element.
        reduced = reduce(X, lengths)
        ops.xp.testing.assert_allclose(reduced[1], [0.0, 0.0])
    else:
        with pytest.raises(ValueError):
            reduce(X, lengths)
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_mish(ops, X):
    """``mish`` keeps the input shape and produces no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.mish(inputs)
    assert outputs.shape == inputs.shape
    assert not ops.xp.isnan(outputs).any()
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize("dtype", FLOAT_TYPES)
@pytest.mark.parametrize(
    "op",
    [
        "backprop_clipped_linear",
        "backprop_dish",
        "backprop_gelu",
        "backprop_gelu_approx",
        "backprop_hard_sigmoid",
        "backprop_hard_swish",
        "backprop_hard_swish_mobilenet",
        "backprop_hard_tanh",
        "backprop_mish",
        "backprop_relu",
        "backprop_relu_k",
        "backprop_softmax",
        "backprop_swish",
    ],
)
def test_eltwise_backprop_rejects_incorrect_shapes(ops, dtype, op):
    """Element-wise backprop functions raise when argument shapes disagree."""
    xp = ops.xp
    backprop = getattr(ops, op)
    # Parameters without a default value decide how many arrays to pass.
    required_params = [
        param
        for param in inspect.signature(backprop).parameters.values()
        if param.default == inspect.Parameter.empty
    ]
    if len(required_params) != 3:
        # Two arrays: 20 elements versus 10 elements.
        with pytest.raises(ValueError):
            backprop(
                xp.arange(-10, 10, dtype=dtype),
                xp.arange(5, -5, -1, dtype=dtype),
            )
    else:
        # Three arrays: mismatch in the second argument ...
        with pytest.raises(ValueError):
            backprop(
                xp.zeros(10, dtype=dtype),
                xp.zeros(5, dtype=dtype),
                xp.zeros(10, dtype=dtype),
            )
        # ... and then in the third argument.
        with pytest.raises(ValueError):
            backprop(
                xp.zeros(10, dtype=dtype),
                xp.zeros(10, dtype=dtype),
                xp.zeros(5, dtype=dtype),
            )
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_relu_k(ops, X):
    """``relu_k`` keeps the shape, yields no NaNs and stays within [0, 6]."""
    inputs = ops.asarray(X)
    outputs = ops.relu_k(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
    # Count the elements on the right side of each bound.
    at_or_above_zero = (outputs >= 0).sum()
    assert at_or_above_zero == outputs.size
    at_or_below_six = (outputs <= 6.0).sum()
    assert at_or_below_six == outputs.size
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_swish(ops, X):
    """``swish`` output has the input's shape and contains no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.swish(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_hard_sigmoid(ops, X):
    """``hard_sigmoid`` keeps the shape, yields no NaNs and stays within [0, 1]."""
    inputs = ops.asarray(X)
    outputs = ops.hard_sigmoid(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
    # Count the elements on the right side of each bound.
    at_or_above_zero = (outputs >= 0).sum()
    assert at_or_above_zero == outputs.size
    at_or_below_one = (outputs <= 1.0).sum()
    assert at_or_below_one == outputs.size
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_hard_tanh(ops, X):
    """``hard_tanh`` keeps the shape, yields no NaNs and stays within [-1, 1]."""
    inputs = ops.asarray(X)
    outputs = ops.hard_tanh(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
    # Count the elements on the right side of each bound.
    at_or_above_minus_one = (outputs >= -1.0).sum()
    assert at_or_above_minus_one == outputs.size
    at_or_below_one = (outputs <= 1.0).sum()
    assert at_or_below_one == outputs.size
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_hard_swish(ops, X):
    """``hard_swish`` output has the input's shape and contains no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.hard_swish(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_hard_swish_mobilenet(ops, X):
    """``hard_swish_mobilenet`` output has the input's shape and no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.hard_swish_mobilenet(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_gelu_approx(ops, X):
    """``gelu_approx`` output has the input's shape and contains no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.gelu_approx(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_dish(ops, X):
    """``dish`` output has the input's shape and contains no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.dish(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_gelu(ops, X):
    """``gelu`` output has the input's shape and contains no NaNs."""
    inputs = ops.asarray(X)
    outputs = ops.gelu(inputs)
    assert outputs.shape == inputs.shape
    found_nan = ops.xp.isnan(outputs).any()
    assert not found_nan
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(X=strategies.arrays_BI())
def test_backprop_mish(ops, X):
    """An all-zero incoming gradient must yield an all-zero ``dX``."""
    inputs = ops.asarray(X)
    zero_gradient = ops.alloc(inputs.shape)
    d_inputs = ops.backprop_mish(zero_gradient, inputs)
    assert d_inputs.shape == inputs.shape
    is_zero = d_inputs == 0
    assert is_zero.all()
def get_lstm_args(depth, dirs, nO, batch_size, nI, draw=None):
    """Build the ``(params, H0, C0, X, size_at_t)`` inputs for the LSTM ops.

    The expected flat parameter count is computed by hand and checked against
    an initialized ``LSTM`` layer. With a hypothesis ``draw`` callable,
    ``params`` and ``X`` are drawn from ``ndarrays_of_shape``; otherwise
    ``params`` is all ones and ``X`` all zeros.
    """
    # Number of layers after the first, i.e. iterations of range(1, depth).
    extra_layers = len(range(1, depth))
    if dirs == 1:
        first_layer = (nO * 4) * nI + nO * 4 + nO * 4 * nO + nO * 4
        later_layer = nO * 4 * nO + nO * 4 + nO * 4 * nO + nO * 4
        n_params = first_layer + extra_layers * later_layer
    else:
        first_layer = (nO * 2) * nI + nO * 2 + nO * 2 * (nO // 2) + nO * 2
        later_layer = nO * 2 * nO + nO * 2 + nO * 2 * (nO // 2) + nO * 2
        # The whole stack is counted twice in this branch.
        n_params = (first_layer + extra_layers * later_layer) * 2
    lstm = LSTM(nO, nI, depth=depth, bi=dirs >= 2).initialize()
    assert lstm.get_param("LSTM").size == n_params
    # size_at_t is fixed to ones in both modes: drawing it via
    # ndarrays_of_shape(shape=(batch_size,), lo=1, dtype="int32") was
    # crashing hypothesis for an unknown reason.
    size_at_t = numpy.ones(shape=(batch_size,), dtype="int32")
    x_shape = (int(size_at_t.sum()), nI)
    if draw:
        params = draw(ndarrays_of_shape(n_params))
        X = draw(ndarrays_of_shape(x_shape))
    else:
        params = numpy.ones((n_params,), dtype="f")
        X = numpy.zeros(x_shape)
    state_shape = (depth, dirs, nO // dirs)
    H0 = numpy.zeros(state_shape)
    C0 = numpy.zeros(state_shape)
    return (params, H0, C0, X, size_at_t)
@composite
def draw_lstm_args(draw):
    """Hypothesis strategy producing randomly sized ``get_lstm_args`` inputs."""
    n_layers = draw(integers(1, 4))
    n_dirs = draw(integers(1, 2))
    # Scaling by the direction count keeps nO divisible by it.
    width_out = draw(integers(1, 16)) * n_dirs
    n_batch = draw(integers(1, 6))
    width_in = draw(integers(1, 16))
    return get_lstm_args(n_layers, n_dirs, width_out, n_batch, width_in, draw=draw)
@pytest.mark.parametrize("ops", XP_OPS)
@pytest.mark.parametrize(
    "depth,dirs,nO,batch_size,nI",
    [
        (1, 1, 1, 1, 1),
        (1, 1, 2, 1, 1),
        (1, 1, 2, 1, 2),
        (2, 1, 1, 1, 1),
        (2, 1, 2, 2, 2),
        (1, 2, 2, 1, 1),
        (2, 2, 2, 2, 2),
    ],
)
def test_lstm_forward_training(ops, depth, dirs, nO, batch_size, nI):
    """Compare ``ops.lstm_forward_training`` with the base ``Ops`` result."""
    baseline_ops = Ops()
    lstm_inputs = get_lstm_args(depth, dirs, nO, batch_size, nI)
    expected = baseline_ops.lstm_forward_training(*lstm_inputs)
    Y, fwd_state = ops.lstm_forward_training(*lstm_inputs)
    tolerance = {"atol": 1e-4, "rtol": 1e-3}
    # Compare forward-state entries 2 and 1, then the output itself.
    for idx in (2, 1):
        assert_allclose(fwd_state[idx], expected[1][idx], **tolerance)
    assert_allclose(Y, expected[0], **tolerance)
@pytest.mark.skipif(platform.machine() == "aarch64", reason="Flaky, skip temporarily")
@pytest.mark.parametrize("ops", XP_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(args=draw_lstm_args())
def test_lstm_forward_training_fuzz(ops, args):
    """Fuzzed comparison of ``lstm_forward_training`` with the base ``Ops``."""
    params, H0, C0, X, size_at_t = args
    lstm_inputs = (params, H0, C0, X, size_at_t)
    baseline_ops = Ops()
    expected = baseline_ops.lstm_forward_training(*lstm_inputs)
    Y, fwd_state = ops.lstm_forward_training(*lstm_inputs)
    tolerance = {"atol": 1e-4, "rtol": 1e-3}
    # Compare forward-state entries 2 and 1, then the output itself.
    for idx in (2, 1):
        assert_allclose(fwd_state[idx], expected[1][idx], **tolerance)
    assert_allclose(Y, expected[0], **tolerance)
def test_get_ops():
    """Check which ops class ``get_ops`` returns for each backend name."""
    for backend, ops_class in (("numpy", NumpyOps), ("cupy", CupyOps)):
        assert isinstance(get_ops(backend), ops_class)
    # "cpu" should give AppleOps when thinc_apple_ops is importable and
    # NumpyOps otherwise.
    try:
        from thinc_apple_ops import AppleOps

        assert isinstance(get_ops("cpu"), AppleOps)
    except ImportError:
        assert isinstance(get_ops("cpu"), NumpyOps)
    # Likewise, "cpu" should give BigEndianOps when thinc_bigendian_ops is
    # importable and NumpyOps otherwise.
    try:
        from thinc_bigendian_ops import BigEndianOps

        assert isinstance(get_ops("cpu"), BigEndianOps)
    except ImportError:
        assert isinstance(get_ops("cpu"), NumpyOps)
    # Unknown backend names are rejected.
    with pytest.raises(ValueError):
        get_ops("blah")
    assert Ops(numpy).xp == numpy
def test_use_ops():
    """``use_ops`` swaps the current ops inside the block and restores them after."""
    ops_before = get_current_ops()
    with use_ops("numpy"):
        assert get_current_ops().name == "numpy"
    with use_ops("cupy"):
        assert get_current_ops().name == "cupy"
    ops_after = get_current_ops()
    assert ops_before.name == ops_after.name
def test_minibatch():
    """Exercise ``ops.minibatch`` with fixed and generated sizes, arrays and shuffling."""
    fix_random_seed(0)
    ops = get_current_ops()
    items = [1, 2, 3, 4, 5, 6]

    # Constant batch size.
    assert list(ops.minibatch(3, items)) == [[1, 2, 3], [4, 5, 6]]

    # Batch sizes taken from a generator.
    by_generator = ops.minibatch((i for i in (3, 2, 1)), items)
    assert list(by_generator) == [[1, 2, 3], [4, 5], [6]]

    # Array input gives array batches.
    array_batches = list(ops.minibatch(3, numpy.asarray(items)))
    assert isinstance(array_batches[0], numpy.ndarray)
    expected_arrays = ([1, 2, 3], [4, 5, 6])
    for idx, expected in enumerate(expected_arrays):
        assert numpy.array_equal(array_batches[idx], numpy.asarray(expected))

    # Shuffling changes the contents but keeps the batch sizes.
    shuffled = list(ops.minibatch((i for i in (3, 2, 1)), items, shuffle=True))
    assert shuffled != [[1, 2, 3], [4, 5], [6]]
    for idx, expected_len in enumerate((3, 2, 1)):
        assert len(shuffled[idx]) == expected_len

    # These inputs are rejected.
    for rejected in ((i for i in range(100)), True):
        with pytest.raises(ValueError):
            ops.minibatch(10, rejected)
def test_multibatch():
    """Exercise ``ops.multibatch`` with arrays, lists, shuffling and bad inputs."""
    fix_random_seed(0)
    ops = get_current_ops()
    first = numpy.asarray([1, 2, 3, 4])
    second = numpy.asarray([5, 6, 7, 8])

    array_batches = list(ops.multibatch(2, first, second))
    flattened = numpy.concatenate(array_batches).tolist()
    assert flattened == [[1, 2], [5, 6], [3, 4], [7, 8]]

    # Shuffling keeps the batch structure.
    shuffled = list(ops.multibatch(2, first, second, shuffle=True))
    assert len(shuffled) == 2
    for batch in shuffled:
        assert len(batch) == 2

    list_batches = list(ops.multibatch(2, [1, 2, 3, 4], [5, 6, 7, 8]))
    assert list_batches == [[[1, 2], [5, 6]], [[3, 4], [7, 8]]]

    # Generators are rejected, alone or mixed with arrays.
    with pytest.raises(ValueError):
        ops.multibatch(10, (i for i in range(100)), (i for i in range(100)))
    with pytest.raises(ValueError):
        ops.multibatch(10, first, (i for i in range(100)), second)
def test_ngrams():
    """``ops.ngrams`` returns ``len - (n - 1)`` items, or none when n is out of range."""
    ops = get_current_ops()
    keys = numpy.asarray([1, 2, 3, 4, 5], dtype=numpy.uint64)
    n_keys = keys.shape[0]
    for n in range(1, 10):
        expected_count = max(0, n_keys - (n - 1))
        assert len(ops.ngrams(n, keys)) == expected_count
    # A negative n and an n longer than the input both give nothing.
    assert len(ops.ngrams(-1, keys)) == 0
    assert len(ops.ngrams(n_keys + 1, keys)) == 0
@pytest.mark.skipif(not has_torch, reason="needs PyTorch")
@pytest.mark.skipif(torch_version < Version("1.9.0"), reason="needs PyTorch 1.9.0")
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("dtype", ["float32", "float64"])
@pytest.mark.parametrize("torch_func", TORCH_FUNCS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(
    x=strategies.floats(min_value=-30, max_value=30),
    dY=strategies.floats(min_value=-1, max_value=1),
)
def test_compare_activations_to_torch(ops, dtype, x, dY, torch_func):
    """Compare an activation and its backprop with the PyTorch reference.

    ``torch_func`` is a ``(name, pytorch_callable)`` pair. The forward pass is
    checked out-of-place and in-place; the backward pass is called with
    whichever of ``dY``, ``X`` and ``Y`` the backprop op's signature accepts.
    """
    import torch

    func_name, pytorch_func = torch_func
    forward = getattr(ops, func_name)
    backward = getattr(ops, "backprop_" + func_name)

    # Forward pass. The isclose tolerance is loosened from the default 1e-08
    # because of GELU.
    x_thinc = ops.asarray([x], dtype=dtype)
    x_torch = xp2torch(x_thinc, requires_grad=True)
    y_torch = pytorch_func(x_torch)
    y_thinc = forward(x_thinc)
    y_torch.backward()
    assert x_thinc.dtype == y_thinc.dtype
    assert y_thinc is not x_thinc
    y_inplace = forward(x_thinc, inplace=True)
    assert y_inplace is x_thinc
    assert ops.xp.isclose(y_thinc, y_inplace, atol=1e-06)
    assert ops.xp.isclose(y_thinc, y_torch.detach(), atol=1e-05)

    # The in-place call overwrote x_thinc, so rebuild the input for backprop.
    x_thinc = ops.asarray([x], dtype=dtype)
    dY_thinc = ops.asarray([dY], dtype=dtype)
    dY_thinc_inplace = dY_thinc.copy()

    signature = inspect.signature(backward)
    params = set(signature.parameters) & {"dY", "X", "Y"}

    if params == {"dY", "X", "Y"}:
        dx_thinc = backward(dY_thinc, Y=y_thinc, X=x_thinc)
        assert dx_thinc.dtype == x_thinc.dtype
        assert dx_thinc is not dY_thinc
        dx_inplace = backward(dY=dY_thinc_inplace, Y=y_thinc, X=x_thinc, inplace=True)
        assert dx_inplace is dY_thinc_inplace
        assert ops.xp.isclose(dx_thinc, dx_inplace)
        expected_grad = x_torch.grad.item() * dY
        assert ops.xp.isclose(expected_grad, float(dx_thinc), atol=1e-06)
    elif params == {"Y", "dY"}:
        dx_thinc = backward(dY_thinc, Y=y_thinc)
        assert dx_thinc.dtype == x_thinc.dtype
        dx_inplace = backward(dY=dY_thinc_inplace, Y=y_thinc, inplace=True)
        assert ops.xp.isclose(dx_thinc, dx_inplace)
        expected_grad = x_torch.grad.item() * dY
        assert ops.xp.isclose(expected_grad, float(dx_thinc), atol=1e-06)
    elif params == {"dY", "X"}:
        dx_thinc = backward(dY_thinc, X=x_thinc)
        assert dx_thinc.dtype == x_thinc.dtype
        dx_inplace = backward(dY=dY_thinc_inplace, X=x_thinc, inplace=True)
        assert ops.xp.isclose(dx_thinc, dx_inplace)
        expected_grad = x_torch.grad.item() * dY
        dx_recomputed = float(backward(dY_thinc, X=x_thinc))
        assert ops.xp.isclose(expected_grad, dx_recomputed, atol=1e-06)
    else:
        raise NotImplementedError(
            f"No PyTorch comparison implemented for parameter set: {params}"
        )
@pytest.mark.parametrize("ops", ALL_OPS)
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(x=strategies.floats(min_value=-10, max_value=10))
def test_clipped_linear(ops, x):
    """``clipped_linear`` must agree with ``relu_k`` and ``hard_sigmoid``.

    Forward and backward passes are compared, using ``max_val=6.0`` for
    ``relu_k`` and ``slope=0.2, offset=0.5`` for ``hard_sigmoid``.
    """
    x_thinc = ops.xp.asarray([x])

    def unit_gradient():
        # A fresh gradient array for every backprop call.
        return ops.asarray1f([1.0])

    # relu_k equivalence.
    clipped = ops.clipped_linear(x_thinc, max_val=6.0)
    assert ops.xp.isclose(clipped, ops.relu_k(x_thinc))
    d_clipped = ops.backprop_clipped_linear(unit_gradient(), x_thinc, max_val=6.0)
    d_relu_k = ops.backprop_relu_k(unit_gradient(), x_thinc)
    assert ops.xp.isclose(d_clipped, d_relu_k)

    # hard_sigmoid equivalence.
    clipped = ops.clipped_linear(x_thinc, slope=0.2, offset=0.5)
    assert ops.xp.isclose(clipped, ops.hard_sigmoid(x_thinc))
    d_clipped = ops.backprop_clipped_linear(
        unit_gradient(), x_thinc, slope=0.2, offset=0.5
    )
    d_hard_sigmoid = ops.backprop_hard_sigmoid(unit_gradient(), x_thinc)
    assert ops.xp.isclose(d_clipped, d_hard_sigmoid)
@pytest.mark.parametrize("ops", ALL_OPS)
@pytest.mark.parametrize("byte_order", (">", "<", "=", "|"))
@settings(max_examples=MAX_EXAMPLES, deadline=None)
@given(x=strategies.floats(min_value=-10, max_value=10))
def test_to_numpy_byteorder(ops, byte_order, x):
    """``to_numpy`` keeps the values and applies the requested byte order."""
    source = ops.xp.asarray([x])
    converted = ops.to_numpy(source, byte_order=byte_order)
    assert numpy.array_equal(ops.to_numpy(source), ops.to_numpy(converted))
    if byte_order not in (">", "<"):
        # For "=" and "|" the dtype byte order must be left unchanged.
        assert source.dtype.byteorder == converted.dtype.byteorder
        return
    # hack from: https://stackoverflow.com/a/49740663
    twice_swapped = converted.dtype.newbyteorder("S").newbyteorder("S")
    assert twice_swapped.byteorder == byte_order
@pytest.mark.skipif(not has_cupy_gpu, reason="needs GPU/CuPy")
def test_custom_kernel_compilation():
    """Every name in ``KERNELS_LIST`` and the mmh kernel must resolve to something."""
    for kernel_name in KERNELS_LIST:
        assert KERNELS.get_function(kernel_name) is not None
    mmh_kernel = compile_mmh()
    assert mmh_kernel is not None
@pytest.mark.parametrize("ops", ALL_OPS)
def test_asarray_from_list_uint64(ops):
    """A list of uint64 values must round-trip through ``ops.asarray``."""
    # One value below and one above int64.max.
    values = [16, 11648197037703959513]
    round_tripped = list(ops.asarray(values, dtype="uint64"))
    assert values == round_tripped