121 lines
3.9 KiB
Python
121 lines
3.9 KiB
Python
import numpy as np
|
|
import math
|
|
from numba import cuda, double, void
|
|
from numba.cuda.testing import unittest, CUDATestCase
|
|
|
|
|
|
# Market parameters shared by the NumPy reference and the CUDA kernel:
# they are passed as the risk-free rate and the volatility arguments.
RISKFREE = 0.02
VOLATILITY = 0.30

# Polynomial coefficients consumed by the cumulative normal distribution
# approximation (the Abramowitz & Stegun 26.2.17 constants).
A1 = 0.31938153
A2 = -0.356563782
A3 = 1.781477937
A4 = -1.821255978
A5 = 1.330274429

# 1 / sqrt(2 * pi): normalisation factor of the standard normal density.
RSQRT2PI = 0.39894228040143267793994605993438
|
|
|
|
|
|
def cnd(d):
    """Approximate the standard normal cumulative distribution at ``d``.

    NumPy reference implementation. The tail mass beyond ``|d|`` is computed
    as the Gaussian density times a fifth-degree polynomial in
    ``k = 1 / (1 + 0.2316419 * |d|)`` built from the module-level
    coefficients ``A1`` .. ``A5``; for positive ``d`` the result is
    reflected to ``1 - tail``.

    Args:
        d: Scalar or NumPy array of evaluation points.

    Returns:
        The approximated CDF value(s), as returned by ``np.where``.
    """
    k = 1.0 / (1.0 + 0.2316419 * np.abs(d))

    # Horner evaluation, innermost coefficient first, of
    # A1 + k * (A2 + k * (A3 + k * (A4 + k * A5))).
    poly = A5
    for coeff in (A4, A3, A2, A1):
        poly = coeff + k * poly

    density = RSQRT2PI * np.exp(-0.5 * d * d)
    tail = density * (k * poly)

    # The polynomial form yields the tail beyond |d|; mirror it for d > 0.
    return np.where(d > 0, 1.0 - tail, tail)
|
|
|
|
|
|
def black_scholes(callResult, putResult, stockPrice, optionStrike, optionYears,
                  Riskfree, Volatility):
    """Price European call and put options with NumPy (reference version).

    The results are written in place into ``callResult`` and ``putResult``
    through slice assignment; nothing is returned.

    Args:
        callResult: Output array receiving the call prices.
        putResult: Output array receiving the put prices.
        stockPrice: Current price of the underlying, per option.
        optionStrike: Strike price, per option.
        optionYears: Time to expiry, per option.
        Riskfree: Risk-free interest rate.
        Volatility: Volatility of the underlying.
    """
    root_years = np.sqrt(optionYears)
    # Volatility scaled by sqrt(T); used both as the d1 divisor and as the
    # d1 -> d2 offset.
    vol_root_years = Volatility * root_years

    drift = (Riskfree + 0.5 * Volatility * Volatility) * optionYears
    d1 = (np.log(stockPrice / optionStrike) + drift) / vol_root_years
    d2 = d1 - vol_root_years

    n_d1 = cnd(d1)
    n_d2 = cnd(d2)

    # Strike discounted back from expiry at the risk-free rate.
    discounted_strike = optionStrike * np.exp(-Riskfree * optionYears)

    callResult[:] = stockPrice * n_d1 - discounted_strike * n_d2
    putResult[:] = (discounted_strike * (1.0 - n_d2)
                    - stockPrice * (1.0 - n_d1))
|
|
|
|
|
|
def randfloat(rand_var, low, high):
    """Linearly blend ``low`` and ``high`` using the weight ``rand_var``.

    Args:
        rand_var: Blend weight (scalar or NumPy array); 0 yields ``low`` and
            1 yields ``high``.
        low: Value returned for a weight of 0.
        high: Value returned for a weight of 1.

    Returns:
        ``(1 - rand_var) * low + rand_var * high``.
    """
    low_share = (1.0 - rand_var) * low
    high_share = rand_var * high
    return low_share + high_share
|
|
|
|
|
|
class TestBlackScholes(CUDATestCase):
    """Compare a CUDA Black-Scholes kernel against the NumPy reference."""

    def test_blackscholes(self):
        """Price random options on the host and on the device and compare.

        Random stock prices, strikes and expiries are priced with the NumPy
        ``black_scholes`` function and with a CUDA kernel that handles one
        option per thread. Both the call and the put prices coming back
        from the device are checked against the host results.
        """
        OPT_N = 400
        iterations = 2

        stockPrice = randfloat(np.random.random(OPT_N), 5.0, 30.0)
        optionStrike = randfloat(np.random.random(OPT_N), 1.0, 100.0)
        optionYears = randfloat(np.random.random(OPT_N), 0.25, 10.0)

        callResultNumpy = np.zeros(OPT_N)
        putResultNumpy = -np.ones(OPT_N)

        callResultNumba = np.zeros(OPT_N)
        putResultNumba = -np.ones(OPT_N)

        # numpy
        for _ in range(iterations):
            black_scholes(callResultNumpy, putResultNumpy, stockPrice,
                          optionStrike, optionYears, RISKFREE, VOLATILITY)

        @cuda.jit(double(double), device=True, inline=True)
        def cnd_cuda(d):
            # Scalar device-side counterpart of the NumPy ``cnd`` function.
            K = 1.0 / (1.0 + 0.2316419 * math.fabs(d))
            ret_val = (RSQRT2PI * math.exp(-0.5 * d * d) *
                       (K * (A1 + K * (A2 + K * (A3 + K * (A4 + K * A5))))))
            if d > 0:
                ret_val = 1.0 - ret_val
            return ret_val

        @cuda.jit(void(double[:], double[:], double[:], double[:], double[:],
                       double, double))
        def black_scholes_cuda(callResult, putResult, S, X, T, R, V):
            # One thread per option; i is the global thread index.
            i = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x
            # The grid is rounded up to whole blocks, so threads past the
            # end of the input have nothing to do.
            if i >= S.shape[0]:
                return
            sqrtT = math.sqrt(T[i])
            d1 = ((math.log(S[i] / X[i]) + (R + 0.5 * V * V) * T[i])
                  / (V * sqrtT))
            d2 = d1 - V * sqrtT
            cndd1 = cnd_cuda(d1)
            cndd2 = cnd_cuda(d2)

            expRT = math.exp((-1. * R) * T[i])
            callResult[i] = (S[i] * cndd1 - X[i] * expRT * cndd2)
            putResult[i] = (X[i] * expRT * (1.0 - cndd2) - S[i] * (1.0 - cndd1))

        # numba
        blockdim = 512, 1
        # Enough blocks to cover all OPT_N options.
        griddim = int(math.ceil(float(OPT_N) / blockdim[0])), 1
        stream = cuda.stream()
        d_callResult = cuda.to_device(callResultNumba, stream)
        d_putResult = cuda.to_device(putResultNumba, stream)
        d_stockPrice = cuda.to_device(stockPrice, stream)
        d_optionStrike = cuda.to_device(optionStrike, stream)
        d_optionYears = cuda.to_device(optionYears, stream)

        for _ in range(iterations):
            black_scholes_cuda[griddim, blockdim, stream](
                d_callResult, d_putResult, d_stockPrice, d_optionStrike,
                d_optionYears, RISKFREE, VOLATILITY)
        d_callResult.copy_to_host(callResultNumba, stream)
        d_putResult.copy_to_host(putResultNumba, stream)
        # The transfers were issued on the stream; wait for them to finish
        # before reading the host arrays.
        stream.synchronize()

        delta = np.abs(callResultNumpy - callResultNumba)
        L1norm = delta.sum() / np.abs(callResultNumpy).sum()

        max_abs_err = delta.max()
        # assertLess reports the offending value when the check fails.
        self.assertLess(L1norm, 1e-13)
        self.assertLess(max_abs_err, 1e-13)

        # The kernel also writes put prices; verify them as well instead of
        # discarding them. Only the relative L1 norm is checked here since
        # it does not depend on the magnitude of the put prices.
        put_delta = np.abs(putResultNumpy - putResultNumba)
        put_L1norm = put_delta.sum() / np.abs(putResultNumpy).sum()
        self.assertLess(put_L1norm, 1e-13)
|
|
|
|
|
|
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|