150 lines
4.8 KiB
150 lines
4.8 KiB
import logging
import multiprocessing as mp
import traceback

from numba.cuda.testing import unittest, CUDATestCase
from numba.cuda.testing import (skip_on_cudasim, skip_with_cuda_python,
                                skip_under_cuda_memcheck)
from numba.tests.support import linux_only


def child_test():
    """Launch a kernel from several host threads with PTDS enabled.

    Intended to run in a freshly spawned process so that the per-thread
    default stream (PTDS) setting is applied before any CUDA driver call is
    made. Each host thread launches the same kernel on its own pair of device
    arrays, the results are checked against a NumPy computation, and the
    debug log of the ``numba.cuda.cudadrv.driver`` logger is returned so the
    caller can inspect which driver API functions were used.

    Returns:
        str: Everything the driver logger emitted while the test ran.

    Raises:
        AssertionError: If any thread's output array differs from the
            expected result.
    """
    from numba import cuda, int32, void
    from numba.core import config
    import io
    import numpy as np
    import threading

    # Enable PTDS before we make any CUDA driver calls. Enabling it first
    # ensures that PTDS APIs are used because the CUDA driver looks up API
    # functions on first use and memoizes them.
    config.CUDA_PER_THREAD_DEFAULT_STREAM = 1

    # Set up log capture for the Driver API so we can see what API calls were
    # used. The logger level must be lowered explicitly, otherwise debug
    # records never reach the handler.
    logbuf = io.StringIO()
    handler = logging.StreamHandler(logbuf)
    cudadrv_logger = logging.getLogger('numba.cuda.cudadrv.driver')
    cudadrv_logger.addHandler(handler)
    cudadrv_logger.setLevel(logging.DEBUG)

    # Set up data for our test, and copy over to the device
    N = 2 ** 16
    # NOTE(review): the thread and addition counts below were reconstructed;
    # confirm them against the upstream test. The largest possible result
    # (999 * N_ADDITIONS) must stay within int32 range.
    N_THREADS = 10
    N_ADDITIONS = 4096

    # Seed the RNG for repeatability
    np.random.seed(1)
    x = np.random.randint(low=0, high=1000, size=N, dtype=np.int32)
    r = np.zeros_like(x)

    # One input and output array for each thread
    xs = [cuda.to_device(x) for _ in range(N_THREADS)]
    rs = [cuda.to_device(r) for _ in range(N_THREADS)]

    # Compute the grid size and get the [per-thread] default stream
    n_threads = 256
    n_blocks = N // n_threads
    stream = cuda.default_stream()

    # A simple multiplication-by-addition kernel. What it does exactly is not
    # too important; only that we have a kernel that does something.
    @cuda.jit(void(int32[::1], int32[::1]))
    def f(r, x):
        i = cuda.grid(1)

        # Valid indices are 0 .. len(r) - 1, so a thread whose index equals
        # len(r) must bail out as well (hence >= rather than >).
        if i >= len(r):
            return

        # Accumulate x into r
        for j in range(N_ADDITIONS):
            r[i] += x[i]

    # This function will be used to launch the kernel from each thread on its
    # own unique data.
    def kernel_thread(n):
        f[n_blocks, n_threads, stream](rs[n], xs[n])

    # Create threads
    threads = [threading.Thread(target=kernel_thread, args=(i,))
               for i in range(N_THREADS)]

    # Start all threads
    for thread in threads:
        thread.start()

    # Wait for all threads to finish, to ensure that we don't synchronize with
    # the device until all kernels are scheduled.
    for thread in threads:
        thread.join()

    # Synchronize with the device
    cuda.synchronize()

    # Check output is as expected
    expected = x * N_ADDITIONS
    for result in rs:
        np.testing.assert_equal(result.copy_to_host(), expected)

    # Return the driver log output to the calling process for checking
    handler.flush()
    return logbuf.getvalue()


def child_test_wrapper(result_queue):
    """Run ``child_test`` and report its outcome through ``result_queue``.

    Exactly one ``(success, output)`` tuple is put on the queue. On success
    ``output`` is the value returned by ``child_test``; on failure it is the
    formatted traceback of whatever was raised.

    Args:
        result_queue: Any object with a ``put`` method (a multiprocessing
            queue when used from the test case).
    """
    try:
        output = child_test()
        success = True
    # Catch anything raised so it can be propagated. This is deliberately as
    # broad as a bare ``except``: the parent blocks on the queue, so a result
    # has to be delivered no matter what went wrong in the child.
    except BaseException:
        output = traceback.format_exc()
        success = False

    result_queue.put((success, output))


# Run on Linux only until the reason for test hangs on Windows (Issue #8635,
# https://github.com/numba/numba/issues/8635) is diagnosed
@linux_only
@skip_under_cuda_memcheck('Hangs cuda-memcheck')
@skip_on_cudasim('Streams not supported on the simulator')
class TestPTDS(CUDATestCase):
    """Check that enabling PTDS makes the driver use the PTDS API variants."""

    @skip_with_cuda_python('Function names unchanged for PTDS with NV Binding')
    def test_ptds(self):
        """Run ``child_test`` in a spawned process and inspect its driver log.

        The child's log must mention the per-thread-default-stream variants
        of the driver functions and must not mention the legacy variants.
        """
        # Run a test with PTDS enabled in a child process
        ctx = mp.get_context('spawn')
        result_queue = ctx.Queue()
        proc = ctx.Process(target=child_test_wrapper, args=(result_queue,))
        proc.start()
        # Read the result before joining: a process that has put data on a
        # queue does not terminate until that data has been consumed, so
        # joining first can deadlock when the captured log is large.
        success, output = result_queue.get()
        proc.join()

        # Ensure the child process ran to completion before checking its output
        if not success:
            self.fail(output)

        # Functions with a per-thread default stream variant that we expect to
        # see in the output.
        # NOTE(review): the device-to-host entry was reconstructed from the
        # child's use of copy_to_host(); confirm against the upstream test.
        ptds_functions = ('cuMemcpyHtoD_v2_ptds', 'cuLaunchKernel_ptsz',
                          'cuMemcpyDtoH_v2_ptds')

        for fn in ptds_functions:
            with self.subTest(fn=fn, expected=True):
                self.assertIn(fn, output)

        # Non-PTDS versions of the functions that we should not see in the
        # output:
        legacy_functions = ('cuMemcpyHtoD_v2', 'cuLaunchKernel',
                            'cuMemcpyDtoH_v2')

        for fn in legacy_functions:
            with self.subTest(fn=fn, expected=False):
                # Ensure we only spot these function names appearing without a
                # _ptds or _ptsz suffix by checking including the end of the
                # line in the log
                fn_at_end = f'{fn}\n'
                self.assertNotIn(fn_at_end, output)


# Allow the test module to be run directly as a script.
if __name__ == '__main__':
    unittest.main()