2017 lines
84 KiB
Python
2017 lines
84 KiB
Python
import copy
|
|
import operator
|
|
|
|
import types as pytypes
|
|
import operator
|
|
import warnings
|
|
from dataclasses import make_dataclass
|
|
|
|
import llvmlite.ir
|
|
import numpy as np
|
|
|
|
import numba
|
|
from numba.parfors import parfor
|
|
from numba.core import types, ir, config, compiler, sigutils, cgutils
|
|
from numba.core.ir_utils import (
|
|
add_offset_to_labels,
|
|
replace_var_names,
|
|
remove_dels,
|
|
legalize_names,
|
|
rename_labels,
|
|
get_name_var_table,
|
|
visit_vars_inner,
|
|
get_definition,
|
|
guard,
|
|
get_call_table,
|
|
is_pure,
|
|
get_np_ufunc_typ,
|
|
get_unused_var_name,
|
|
is_const_call,
|
|
fixup_var_define_in_scope,
|
|
transfer_scope,
|
|
find_max_label,
|
|
get_global_func_typ,
|
|
)
|
|
from numba.core.typing import signature
|
|
from numba.core import lowering
|
|
from numba.parfors.parfor import ensure_parallel_support
|
|
from numba.core.errors import (
|
|
NumbaParallelSafetyWarning, NotDefinedError, CompilerError, InternalError,
|
|
)
|
|
from numba.parfors.parfor_lowering_utils import ParforLoweringBuilder
|
|
|
|
|
|
class ParforLower(lowering.Lower):
    """Lowering class that extends the standard ``lowering.Lower`` so that
    ``parfor.Parfor`` nodes can be lowered as well."""

    def lower_inst(self, inst):
        """Lower ``inst``; parfor nodes take the parallel lowering path."""
        # Anything that is not a parfor node is left to the base class.
        if not isinstance(inst, parfor.Parfor):
            super().lower_inst(inst)
            return
        _lower_parfor_parallel(self, inst)

    @property
    def _disable_sroa_like_opt(self):
        """
        Always True.

        Force disable this because Parfor use-defs is incompatible---it only
        considers use-defs in blocks that must be executing.
        See https://github.com/numba/numba/commit/017e2ff9db87fc34149b49dd5367ecbf0bb45268
        """
        return True
|
|
|
|
|
|
def _lower_parfor_parallel(lowerer, parfor):
    """Lower ``parfor`` with its own ``parfor.lowerer`` callable when one is
    set, otherwise with the standard parallel lowering.

    Returns whatever the selected lowering function returns.
    """
    custom_lowerer = parfor.lowerer
    if custom_lowerer is not None:
        return custom_lowerer(lowerer, parfor)
    return _lower_parfor_parallel_std(lowerer, parfor)
|
|
|
|
|
|
def _lower_parfor_parallel_std(lowerer, parfor):
    """Lowerer that handles LLVM code generation for parfor.
    This function lowers a parfor IR node to LLVM.
    The general approach is as follows:
    1) The code from the parfor's init block is lowered normally
       in the context of the current function.
    2) The body of the parfor is transformed into a gufunc function.
    3) Code is inserted into the main function that calls do_scheduling
       to divide the iteration space for each thread, allocates
       reduction arrays, calls the gufunc function, and then invokes
       the reduction function across the reduction arrays to produce
       the final reduction values.

    ``lowerer`` is the active lowering object and ``parfor`` the parfor node
    being lowered.  The function returns nothing; its effect is the code
    emitted through ``lowerer``.  ``lowerer.fndesc.typemap`` is replaced by a
    shallow copy while this function runs and put back on the normal exit
    path (it is not restored if an exception propagates).
    """
    from numba.np.ufunc.parallel import get_thread_count

    ensure_parallel_support()
    typingctx = lowerer.context.typing_context
    targetctx = lowerer.context
    builder = lowerer.builder
    # We copy the typemap here because for race condition variable we'll
    # update their type to array so they can be updated by the gufunc.
    orig_typemap = lowerer.fndesc.typemap
    # replace original typemap with copy and restore the original at the end.
    lowerer.fndesc.typemap = copy.copy(orig_typemap)
    if config.DEBUG_ARRAY_OPT:
        print("lowerer.fndesc", lowerer.fndesc, type(lowerer.fndesc))
    typemap = lowerer.fndesc.typemap
    varmap = lowerer.varmap

    if config.DEBUG_ARRAY_OPT:
        print("_lower_parfor_parallel")
        parfor.dump()

    loc = parfor.init_block.loc
    scope = parfor.init_block.scope

    # produce instructions for init_block
    if config.DEBUG_ARRAY_OPT:
        print("init_block = ", parfor.init_block, " ", type(parfor.init_block))
    for instr in parfor.init_block.body:
        if config.DEBUG_ARRAY_OPT:
            print("lower init_block instr = ", instr)
        lowerer.lower_inst(instr)

    # Make sure every race variable has storage in the enclosing function.
    for racevar in parfor.races:
        if racevar not in varmap:
            rvtyp = typemap[racevar]
            rv = ir.Var(scope, racevar, loc)
            lowerer._alloca_var(rv.name, rvtyp)

    # alias_map / arg_aliases are filled in by the call below; only the
    # truthiness of alias_map is used later (passed to the gufunc builder).
    alias_map = {}
    arg_aliases = {}
    numba.parfors.parfor.find_potential_aliases_parfor(parfor, parfor.params, typemap,
                                                       lowerer.func_ir, alias_map, arg_aliases)
    if config.DEBUG_ARRAY_OPT:
        print("alias_map", alias_map)
        print("arg_aliases", arg_aliases)

    # run get_parfor_outputs() and get_parfor_reductions() before gufunc creation
    # since Jumps are modified so CFG of loop_body dict will become invalid
    assert parfor.params is not None

    parfor_output_arrays = numba.parfors.parfor.get_parfor_outputs(
        parfor, parfor.params)
    parfor_redvars, parfor_reddict = parfor.redvars, parfor.reddict
    if config.DEBUG_ARRAY_OPT:
        print("parfor_redvars:", parfor_redvars)
        print("parfor_reddict:", parfor_reddict)

    # init reduction array allocation here.
    nredvars = len(parfor_redvars)
    # Maps reduction variable name -> variable holding its per-thread array.
    redarrs = {}
    # Variables created here for which an ir.Del is lowered at the end.
    to_cleanup = []
    if nredvars > 0:
        # reduction arrays outer dimension equal to thread count
        scope = parfor.init_block.scope
        loc = parfor.init_block.loc
        pfbdr = ParforLoweringBuilder(lowerer=lowerer, scope=scope, loc=loc)

        # Get the Numba internal function to call to get the thread count.
        get_num_threads = pfbdr.bind_global_function(
            fobj=numba.np.ufunc.parallel._iget_num_threads,
            ftype=get_global_func_typ(numba.np.ufunc.parallel._iget_num_threads),
            args=()
        )

        # Insert the call to assign the thread count to a variable.
        num_threads_var = pfbdr.assign(
            rhs=pfbdr.call(get_num_threads, args=[]),
            typ=types.intp,
            name="num_threads_var")

        # For each reduction variable...
        for i in range(nredvars):
            red_name = parfor_redvars[i]
            # Get the type of the reduction variable.
            redvar_typ = lowerer.fndesc.typemap[red_name]
            # Get the ir.Var for the reduction variable.
            redvar = ir.Var(scope, red_name, loc)
            # Get the type of the array that holds the per-thread
            # reduction variables.
            # (redtyp_to_redarraytype is defined outside the visible source.)
            redarrvar_typ = redtyp_to_redarraytype(redvar_typ)
            reddtype = redarrvar_typ.dtype
            if config.DEBUG_ARRAY_OPT:
                print(
                    "reduction_info",
                    red_name,
                    redvar_typ,
                    redarrvar_typ,
                    reddtype,
                    types.DType(reddtype),
                    num_threads_var,
                    type(num_threads_var)
                )

            # If this is reduction over an array,
            # the reduction array has just one added per-worker dimension.
            if isinstance(redvar_typ, types.npytypes.Array):
                redarrdim = redvar_typ.ndim + 1
            else:
                redarrdim = 1

            # Reduction array is created and initialized to the initial reduction value.

            # First create a var for the numpy empty ufunc.
            glbl_np_empty = pfbdr.bind_global_function(
                fobj=np.empty,
                ftype=get_np_ufunc_typ(np.empty),
                args=(
                    types.UniTuple(types.intp, redarrdim),
                ),
                kws={'dtype': types.DType(reddtype)}
            )

            # The outermost dimension of the reduction array is the thread count.
            size_var_list = [num_threads_var]

            # If this is a reduction over an array...
            if isinstance(redvar_typ, types.npytypes.Array):
                # Add code to get the shape of the array being reduced over.
                redshape_var = pfbdr.assign(
                    rhs=ir.Expr.getattr(redvar, "shape", loc),
                    typ=types.UniTuple(types.intp, redvar_typ.ndim),
                    name="redarr_shape",
                )

                # Add the dimension sizes of the array being reduced over to the tuple of sizes passed to empty.
                for j in range(redvar_typ.ndim):
                    onedimvar = pfbdr.assign(
                        rhs=ir.Expr.static_getitem(redshape_var, j, None, loc),
                        typ=types.intp,
                        name="redshapeonedim",
                    )
                    size_var_list.append(onedimvar)

            # Empty call takes tuple of sizes. Create here and fill in outer dimension (num threads).
            size_var = pfbdr.make_tuple_variable(
                size_var_list, name='tuple_size_var',
            )

            # Resolve dtype
            cval = pfbdr._typingctx.resolve_value_type(reddtype)
            dt = pfbdr.make_const_variable(cval=cval, typ=types.DType(reddtype))
            # Add call to empty passing the size var tuple.
            empty_call = pfbdr.call(glbl_np_empty, args=[size_var, dt])

            redarr_var = pfbdr.assign(
                rhs=empty_call, typ=redarrvar_typ, name="redarr",
            )

            # Remember mapping of original reduction array to the newly created per-worker reduction array.
            redarrs[redvar.name] = redarr_var
            to_cleanup.append(redarr_var)

            init_val = parfor_reddict[red_name].init_val

            # Choose the value each per-thread slot is initialised with:
            # the reduction's init_val when it has one, otherwise the
            # current value of the reduction variable itself.
            if init_val is not None:
                if isinstance(redvar_typ, types.npytypes.Array):
                    # Create an array of identity values for the reduction.
                    # First, create a variable for np.full.
                    full_func_node = pfbdr.bind_global_function(
                        fobj=np.full,
                        ftype=get_np_ufunc_typ(np.full),
                        args=(
                            types.UniTuple(types.intp, redvar_typ.ndim),
                            reddtype,
                        ),
                        kws={'dtype': types.DType(reddtype)},
                    )

                    # Then create a var with the identity value.
                    init_val_var = pfbdr.make_const_variable(
                        cval=init_val,
                        typ=reddtype,
                        name="init_val",
                    )

                    # Then, call np.full with the shape of the reduction array and the identity value.
                    full_call = pfbdr.call(
                        full_func_node, args=[redshape_var, init_val_var, dt],
                    )

                    redtoset = pfbdr.assign(
                        rhs=full_call,
                        typ=redvar_typ,
                        name="redtoset",
                    )
                    # redtoset is an array from np.full() and must be released
                    to_cleanup.append(redtoset)
                else:
                    redtoset = pfbdr.make_const_variable(
                        cval=init_val,
                        typ=reddtype,
                        name="redtoset",
                    )
            else:
                redtoset = redvar

                # NOTE(review): the nesting of this debug block under the
                # `else` branch was reconstructed from a source whose
                # indentation was lost -- confirm against the repository.
                if config.DEBUG_ARRAY_OPT_RUNTIME:
                    res_print_str = "res_print1 for redvar " + str(redvar) + ":"
                    strconsttyp = types.StringLiteral(res_print_str)

                    lhs = pfbdr.make_const_variable(
                        cval=res_print_str,
                        typ=strconsttyp,
                        name="str_const",
                    )

                    res_print = ir.Print(args=[lhs, redvar],
                                         vararg=None, loc=loc)
                    lowerer.fndesc.calltypes[res_print] = signature(types.none,
                                                                    typemap[lhs.name],
                                                                    typemap[redvar.name])
                    print("res_print_redvar", res_print)
                    lowerer.lower_inst(res_print)

            # For each thread, initialize the per-worker reduction array to
            # the current reduction array value.

            # Get the Numba type of the variable that holds the thread count.
            num_thread_type = typemap[num_threads_var.name]
            # Get the LLVM type of the thread count variable.
            ntllvm_type = targetctx.get_value_type(num_thread_type)
            # Create a LLVM variable to hold the loop index.
            alloc_loop_var = cgutils.alloca_once(builder, ntllvm_type)
            # Associate this LLVM variable to a Numba IR variable so that
            # we can use setitem IR builder.
            # Create a Numba IR variable.
            numba_ir_loop_index_var = scope.redefine("$loop_index", loc)
            # Give that variable the right type.
            typemap[numba_ir_loop_index_var.name] = num_thread_type
            # Associate this Numba variable to the LLVM variable in the
            # lowerer's varmap.
            lowerer.varmap[numba_ir_loop_index_var.name] = alloc_loop_var
            # Insert a loop into the outputted LLVM that goes from 0 to
            # the current thread count.
            with cgutils.for_range(builder, lowerer.loadvar(num_threads_var.name), intp=ntllvm_type) as loop:
                # Store the loop index into the alloca'd LLVM loop index variable.
                builder.store(loop.index, alloc_loop_var)
                # Initialize one element of the reduction array using the Numba
                # IR variable associated with this loop's index.
                pfbdr.setitem(obj=redarr_var, index=numba_ir_loop_index_var, val=redtoset)

    # compile parfor body as a separate function to be used with GUFuncWrapper
    flags = parfor.flags.copy()
    flags.error_model = "numpy"
    # Can't get here unless flags.auto_parallel == ParallelOptions(True)
    index_var_typ = typemap[parfor.loop_nests[0].index_variable.name]
    # index variables should have the same type, check rest of indices
    for l in parfor.loop_nests[1:]:
        assert typemap[l.index_variable.name] == index_var_typ
    # The module-level flag is set only for the duration of the gufunc
    # creation and is always reset, even when that creation raises.
    numba.parfors.parfor.sequential_parfor_lowering = True
    try:
        # (_create_gufunc_for_parfor_body is defined outside the visible source.)
        (func,
         func_args,
         func_sig,
         func_arg_types,
         exp_name_to_tuple_var) = _create_gufunc_for_parfor_body(
            lowerer, parfor, typemap, typingctx, targetctx, flags, {},
            bool(alias_map), index_var_typ, parfor.races)
    finally:
        numba.parfors.parfor.sequential_parfor_lowering = False

    # get the shape signature
    func_args = ['sched'] + func_args
    num_reductions = len(parfor_redvars)
    num_inputs = len(func_args) - len(parfor_output_arrays) - num_reductions
    if config.DEBUG_ARRAY_OPT:
        print("func_args = ", func_args)
        print("num_inputs = ", num_inputs)
        print("parfor_outputs = ", parfor_output_arrays)
        print("parfor_redvars = ", parfor_redvars)
        print("num_reductions = ", num_reductions)
    gu_signature = _create_shape_signature(
        parfor.get_shape_classes,
        num_inputs,
        num_reductions,
        func_args,
        func_sig,
        parfor.races,
        typemap)
    if config.DEBUG_ARRAY_OPT:
        print("gu_signature = ", gu_signature)

    # call the func in parallel by wrapping it with ParallelGUFuncBuilder
    loop_ranges = [(l.start, l.stop, l.step) for l in parfor.loop_nests]
    if config.DEBUG_ARRAY_OPT:
        print("loop_nests = ", parfor.loop_nests)
        print("loop_ranges = ", loop_ranges)
    # (call_parallel_gufunc is defined outside the visible source.)
    call_parallel_gufunc(
        lowerer,
        func,
        gu_signature,
        func_sig,
        func_args,
        func_arg_types,
        loop_ranges,
        parfor_redvars,
        parfor_reddict,
        redarrs,
        parfor.init_block,
        index_var_typ,
        parfor.races,
        exp_name_to_tuple_var)

    # num_threads_var is only bound when there are reduction variables,
    # hence the same guard as the allocation section above.
    if nredvars > 0:
        _parfor_lowering_finalize_reduction(
            parfor, redarrs, lowerer, parfor_reddict, num_threads_var,
        )

    # Cleanup reduction variable
    for v in to_cleanup:
        lowerer.lower_inst(ir.Del(v.name, loc=loc))
    # Restore the original typemap of the function that was replaced temporarily at the
    # beginning of this function.
    lowerer.fndesc.typemap = orig_typemap

    if config.DEBUG_ARRAY_OPT:
        print("_lower_parfor_parallel done")
|
|
|
|
|
|
# Frozen record describing one reduction variable while its per-thread
# results are being combined; instances are built in
# _parfor_lowering_finalize_reduction.
_ReductionInfo = make_dataclass(
    "_ReductionInfo",
    [
        # The parfor_reddict entry for the reduction variable.
        "redvar_info",
        # Name of the reduction variable.
        "redvar_name",
        # Typemap entry for the reduction variable.
        "redvar_typ",
        # Variable holding the per-thread reduction array.
        "redarr_var",
        # Typemap entry for the per-thread reduction array.
        "redarr_typ",
        # Value loaded from the reduction variable via lowerer.loadvar().
        "init_val",
    ],
    frozen=True,
)
|
|
|
|
|
|
def _parfor_lowering_finalize_reduction(
        parfor,
        redarrs,
        lowerer,
        parfor_reddict,
        thread_count_var,
        ):
    """Emit the code that folds every thread's intermediate value into the
    final value of each reduction variable.

    ``redarrs`` maps a reduction variable name to the variable holding its
    per-thread array; ``parfor_reddict`` maps the same names to their
    reduction descriptions.
    """
    # Conceptually, for each reduction variable:
    #     tmp = redarr[0]
    #     for i in range(1, thread_count):
    #         tmp = reduce_op(redarr[i], tmp)
    #     reduction_result = tmp
    for redvar_name, redarr_var in redarrs.items():
        info = _ReductionInfo(
            redvar_typ=lowerer.fndesc.typemap[redvar_name],
            redarr_typ=lowerer.fndesc.typemap[redarr_var.name],
            init_val=lowerer.loadvar(redvar_name),
            redvar_info=parfor_reddict[redvar_name],
            redvar_name=redvar_name,
            redarr_var=redarr_var,
        )
        # A known in-place operator takes the simple path; everything else
        # goes through the generic reduction lowering.
        if info.redvar_info.redop is not None:
            _lower_trivial_inplace_binops(
                parfor, lowerer, thread_count_var, info)
        else:
            _lower_non_trivial_reduce(
                parfor, lowerer, thread_count_var, info)
|
|
|
|
|
|
class ParforsUnexpectedReduceNodeError(InternalError):
    """Raised when a reduce node is not one of the instruction forms the
    reduction lowering knows how to handle."""

    def __init__(self, inst):
        message = f"Unknown reduce instruction node: {inst}"
        super().__init__(message)
|
|
|
|
|
|
def _lower_trivial_inplace_binops(parfor, lowerer, thread_count_var, reduce_info):
    """Lower trivial inplace-binop reduction.

    Walks ``reduce_info.redvar_info.reduce_nodes`` in order.  Var-to-Var
    assignments are lowered directly; an in-place binop whose right-hand
    side is the ``<redvar>#init`` variable is lowered as a call that combines
    the per-thread results.  Processing stops at the first assignment that
    targets the reduction variable.

    Raises ``ParforsUnexpectedReduceNodeError`` for any other kind of node.
    """
    # Last in-place operator lowered.  It stays None when the loop finishes
    # (or breaks) without meeting one, so the debug print below must not
    # assume it is bound.
    fn = None
    for inst in reduce_info.redvar_info.reduce_nodes:
        # Var assigns to Var?
        if _lower_var_to_var_assign(lowerer, inst):
            pass
        # Is inplace-binop for the reduction?
        elif _is_inplace_binop_and_rhs_is_init(inst, reduce_info.redvar_name):
            fn = inst.value.fn
            redvar_result = _emit_binop_reduce_call(
                fn, lowerer, thread_count_var, reduce_info,
            )
            lowerer.storevar(redvar_result, name=inst.target.name)
        # Otherwise?
        else:
            raise ParforsUnexpectedReduceNodeError(inst)

        # XXX: This seems like a hack to stop the loop with this condition.
        if _fix_redvar_name_ssa_mismatch(parfor, lowerer, inst,
                                         reduce_info.redvar_name):
            break
    # NOTE(review): placed after the loop; the original indentation was lost
    # in the source this was reconstructed from -- confirm.
    if config.DEBUG_ARRAY_OPT_RUNTIME:
        varname = reduce_info.redvar_name
        # Fall back to a placeholder instead of failing on an unbound name
        # when no in-place binop was lowered.
        fn_name = getattr(fn, "__name__", "unknown")
        lowerer.print_variable(
            f"{parfor.loc}: parfor {fn_name} reduction {varname} =",
            varname,
        )
|
|
|
|
|
|
def _lower_non_trivial_reduce(parfor, lowerer, thread_count_var, reduce_info):
    """Lower a reduction that is not a plain in-place binop, such as a call
    to `functools.reduce()`.

    The reduce nodes are emitted inside a loop over the thread count; each
    iteration feeds one thread's intermediate result through the nodes.
    Raises ``ParforsUnexpectedReduceNodeError`` for an unrecognised node.
    """
    redvar_name = reduce_info.redvar_name
    init_name = f"{redvar_name}#init"
    # The init variable has no type yet at this point, so register one.
    lowerer.fndesc.typemap.setdefault(init_name, reduce_info.redvar_typ)

    thread_count = lowerer.loadvar(thread_count_var.name)
    with cgutils.for_range(lowerer.builder, thread_count) as loop:
        thread_index = loop.index
        for inst in reduce_info.redvar_info.reduce_nodes:
            if not _lower_var_to_var_assign(lowerer, inst):
                # Not a Var->Var copy: it has to be the reduction operation,
                # i.e. an assignment that reads the init variable.
                reads_init = isinstance(inst, ir.Assign) and any(
                    var.name == init_name for var in inst.list_vars()
                )
                if not reads_init:
                    raise ParforsUnexpectedReduceNodeError(inst)
                # Load this thread's element into the init variable, then
                # lower the reduction statement itself.
                elem = _emit_getitem_call(thread_index, lowerer, reduce_info)
                lowerer.storevar(elem, init_name)
                lowerer.lower_inst(inst)

            # XXX: This seems like a hack to stop the loop with this condition.
            if _fix_redvar_name_ssa_mismatch(parfor, lowerer, inst,
                                             redvar_name):
                break

    if config.DEBUG_ARRAY_OPT_RUNTIME:
        varname = reduce_info.redvar_name
        lowerer.print_variable(
            f"{parfor.loc}: parfor non-trivial reduction {varname} =",
            varname,
        )
|
|
|
|
def _lower_var_to_var_assign(lowerer, inst):
    """Lower ``inst`` when it is a plain Var->Var assignment.

    Returns True if, and only if, ``inst`` was such an assignment (in which
    case the copy has been emitted); otherwise nothing is emitted and the
    result is False.
    """
    is_var_copy = isinstance(inst, ir.Assign) and isinstance(inst.value, ir.Var)
    if not is_var_copy:
        return False
    source_value = lowerer.loadvar(inst.value.name)
    lowerer.storevar(source_value, name=inst.target.name)
    return True
|
|
|
|
def _emit_getitem_call(idx, lowerer, reduce_info):
    """Emit the code for ``redarr_var[idx]`` and return the resulting value.
    """
    # Kernel compiled into the function being lowered.
    def reducer_getitem(redarr, index):
        return redarr[index]

    redarr_value = lowerer.loadvar(reduce_info.redarr_var.name)
    getitem_sig = signature(
        reduce_info.redvar_typ, reduce_info.redarr_typ, types.intp,
    )
    return lowerer.context.compile_internal(
        lowerer.builder, reducer_getitem, getitem_sig, (redarr_value, idx),
    )
|
|
|
|
|
|
def _emit_binop_reduce_call(binop, lowerer, thread_count_var, reduce_info):
    """Emit call to the ``binop`` for the reduction variable.

    ``binop`` selects one of two kernels that fold the per-thread array into
    ``reduce_info.init_val``.  The kernel is compiled into the current
    function and the resulting value is returned.

    Raises ``KeyError`` when ``binop`` is not one of the five operators in
    the dispatch table below.
    """

    # Folds every per-thread entry into ``init`` by addition.
    def reduction_add(thread_count, redarr, init):
        c = init
        for i in range(thread_count):
            c += redarr[i]
        return c

    # Folds every per-thread entry into ``init`` by multiplication.
    def reduction_mul(thread_count, redarr, init):
        c = init
        for i in range(thread_count):
            c *= redarr[i]
        return c

    # Subtraction shares the additive kernel and both divisions share the
    # multiplicative one.
    # NOTE(review): presumably the per-thread partial results of those
    # reductions combine additively / multiplicatively -- confirm.
    kernel = {
        operator.iadd: reduction_add,
        operator.isub: reduction_add,
        operator.imul: reduction_mul,
        operator.ifloordiv: reduction_mul,
        operator.itruediv: reduction_mul,
    }[binop]

    ctx = lowerer.context
    builder = lowerer.builder
    redarr_typ = reduce_info.redarr_typ
    arg_arr = lowerer.loadvar(reduce_info.redarr_var.name)

    # Runtime debugging aid: print the per-thread array and the reduction
    # variable before they are combined.
    if config.DEBUG_ARRAY_OPT_RUNTIME:
        init_var = reduce_info.redarr_var.scope.get(reduce_info.redvar_name)
        res_print = ir.Print(
            args=[reduce_info.redarr_var, init_var], vararg=None,
            loc=lowerer.loc,
        )
        typemap = lowerer.fndesc.typemap
        lowerer.fndesc.calltypes[res_print] = signature(
            types.none, typemap[reduce_info.redarr_var.name],
            typemap[init_var.name],
        )
        lowerer.lower_inst(res_print)

    arg_thread_count = lowerer.loadvar(thread_count_var.name)
    args = (arg_thread_count, arg_arr, reduce_info.init_val)
    # Kernel signature: (thread_count, redarr, init) -> reduction value.
    sig = signature(
        reduce_info.redvar_typ, types.uintp, redarr_typ, reduce_info.redvar_typ,
    )

    redvar_result = ctx.compile_internal(builder, kernel, sig, args)
    return redvar_result
|
|
|
|
|
|
def _is_inplace_binop_and_rhs_is_init(inst, redvar_name):
    """Tell whether ``inst`` assigns an in-place binop whose right operand is
    the ``<redvar_name>#init`` variable.
    """
    # Each test guards the attribute access of the next one.
    return (
        isinstance(inst, ir.Assign)
        and isinstance(inst.value, ir.Expr)
        and inst.value.op == "inplace_binop"
        and inst.value.rhs.name == f"{redvar_name}#init"
    )
|
|
|
|
|
|
def _fix_redvar_name_ssa_mismatch(parfor, lowerer, inst, redvar_name):
    """Fix reduction variable name mismatch due to SSA.

    Returns True when ``inst`` is an assignment to the same source-level
    variable as ``redvar_name`` (after copying the target into the reduction
    variable if their names differ), and False otherwise.
    """
    # Reduction statements are only processed post-gufunc execution until an
    # assignment whose left-hand side is the reduction variable shows up;
    # this copes with parfors that assign the reduction variable repeatedly.
    if not isinstance(inst, ir.Assign):
        return False

    target = inst.target
    try:
        reduction_var = parfor.init_block.scope.get_exact(redvar_name)
    except NotDefinedError:
        # Ideally unreachable: a reduction variable missing from the scope
        # points at a problem in another rewrite pass.
        same_source_var = redvar_name == target.name
    else:
        # SSA may give the redvar and the assignment target different names
        # although both denote the same source-level variable.
        same_source_var = (
            reduction_var.unversioned_name == target.unversioned_name
        )

    if not same_source_var:
        return False

    if redvar_name != target.name:
        # Names differ: copy the target's value into the reduction variable.
        lowerer.storevar(lowerer.loadvar(target.name), name=redvar_name)
    return True
|
|
|
|
def _create_shape_signature(
        get_shape_classes,
        num_inputs,
        num_reductions,
        args,
        func_sig,
        races,
        typemap):
    '''Create shape signature for GUFunc

    get_shape_classes - callable returning the shape classes of a variable
    num_inputs - only used in debug output
    num_reductions - number of trailing entries of args that are reductions
    args - argument names; the first entry is the 'sched' argument
    func_sig - not used by this function
    races - names of race variables, which get a class of (-1,)
    typemap - passed through to get_shape_classes

    Returns a tuple (gu_sin, gu_sout): gu_sin holds one tuple of dimension
    symbols per argument and gu_sout is always an empty list.
    '''
    if config.DEBUG_ARRAY_OPT:
        print("_create_shape_signature", num_inputs, num_reductions, args, races)
        for i in args[1:]:
            print("argument", i, type(i), get_shape_classes(i, typemap=typemap))

    num_inouts = len(args) - num_reductions
    # maximum class number for array shapes
    classes = [
        get_shape_classes(var, typemap=typemap) if var not in races else (-1,)
        for var in args[1:]
    ]
    class_set = set()
    for shape_class in classes:
        if shape_class:
            class_set.update(shape_class)
    max_class = max(class_set) + 1 if class_set else 0
    classes.insert(0, (max_class,))  # force set the class of 'sched' argument
    class_set.add(max_class)
    # One further class number is reserved for the thread count.
    class_set.add(max_class + 1)

    # Give every non-negative class its own letter.
    # TODO: use prefix + class number instead of single char
    class_map = {}
    next_ord = ord('a')
    for n in class_set:
        if n >= 0:
            class_map[n] = chr(next_ord)
            next_ord += 1
    threadcount_ordinal = chr(next_ord)

    def symbol_for(c):
        # A negative class gets a fresh symbol each time it is seen;
        # other classes reuse the letter assigned above.
        nonlocal next_ord
        if c >= 0:
            return class_map[c]
        next_ord += 1
        return chr(next_ord)

    gu_sin = []
    gu_sout = []
    if config.DEBUG_ARRAY_OPT:
        print("args", args)
        print("classes", classes)
        print("threadcount_ordinal", threadcount_ordinal)
    for count, (cls, _arg) in enumerate(zip(classes, args), start=1):
        dim_syms = tuple(symbol_for(c) for c in cls) if cls else ()
        if count > num_inouts:
            # Add the threadcount_ordinal to represent the thread count
            # to the start of the reduction array.
            gu_sin.append((threadcount_ordinal,) + dim_syms[1:])
        else:
            gu_sin.append(dim_syms)
    return (gu_sin, gu_sout)
|
|
|
|
def _print_block(block):
    '''Print every instruction of an IR block, prefixed with its position.
    '''
    position = 0
    for statement in block.body:
        print(" ", position, " ", statement)
        position += 1
|
|
|
|
def _print_body(body_dict):
    '''Print each label of a dict of IR blocks followed by the block's
    instructions.
    '''
    for label in body_dict:
        print("label: ", label)
        _print_block(body_dict[label])
|
|
|
|
|
|
def wrap_loop_body(loop_body):
    '''Return a shallow copy of ``loop_body`` after appending, to the block
    with the highest label, a jump to the block with the lowest label.

    Only the dict is copied; the block objects are shared, so the appended
    jump is visible through ``loop_body`` as well.
    '''
    wrapped = loop_body.copy()
    labels = wrapped.keys()
    entry_label, tail_label = min(labels), max(labels)
    tail_block = wrapped[tail_label]
    tail_block.body.append(ir.Jump(entry_label, tail_block.loc))
    return wrapped
|
|
|
|
def unwrap_loop_body(loop_body):
    '''Drop the last instruction of the block with the highest label.

    The block's ``body`` is rebound to a new, shorter list; the previous list
    object is left untouched.
    '''
    tail_block = loop_body[max(loop_body.keys())]
    tail_block.body = tail_block.body[:-1]
|
|
|
|
def add_to_def_once_sets(a_def, def_once, def_more):
    '''Record one more definition of ``a_def`` in the two bookkeeping sets.

    First definition seen: the name goes into def_once.
    Second definition seen: the name moves from def_once to def_more.
    Any later definition: the name is already in def_more, nothing changes.
    '''
    if a_def in def_more:
        return
    if a_def not in def_once:
        def_once.add(a_def)
        return
    def_once.remove(a_def)
    def_more.add(a_def)
|
|
|
|
def compute_def_once_block(block, def_once, def_more, getattr_taken, typemap, module_assigns):
    '''Update the defined-once / defined-more-than-once bookkeeping with the
    assignments of a single block.

    block - the block to process
    def_once - set of variable names known to be defined exactly once
    def_more - set of variable names known to be defined more than once
    getattr_taken - dict mapping variable name to tuple of object and attribute taken
    typemap - mapping used to look up the type of call arguments
    module_assigns - dict mapping variable name to the Global that they came from
    '''
    # Assignments are the only instructions that define variables.
    for assign in block.find_insts(ir.Assign):
        target_name = assign.target.name
        add_to_def_once_sets(target_name, def_once, def_more)

        rhs = assign.value
        # "a = Global(...)" where the global is a module: remember its name.
        if isinstance(rhs, ir.Global) and isinstance(rhs.value, pytypes.ModuleType):
            module_assigns[target_name] = rhs.value.__name__

        if not isinstance(rhs, ir.Expr):
            continue

        # "a = b.c" where b has been defined exactly once so far.
        if rhs.op == 'getattr' and rhs.value.name in def_once:
            getattr_taken[target_name] = (rhs.value.name, rhs.attr)

        if rhs.op != 'call':
            continue

        if rhs.func.name in getattr_taken:
            # The callee came from "b.c": recover b (base_obj) and c (base_attr).
            base_obj, base_attr = getattr_taken[rhs.func.name]
            # Calling a method on an object could modify the object and is
            # thus like a def of that object.  Only a module/attribute pair
            # that is_const_call reports as safe is exempt; when the module
            # behind base_obj is unknown, assume the worst.
            may_modify = (
                base_obj not in module_assigns
                or not is_const_call(module_assigns[base_obj], base_attr)
            )
            if may_modify:
                add_to_def_once_sets(base_obj, def_once, def_more)

        # A mutable object passed to a function may be changed by it and
        # therefore cannot be hoisted: count the call as a def of each
        # argument whose type has a truthy 'mutable' attribute.
        for argvar in rhs.args:
            arg_name = argvar.name if isinstance(argvar, ir.Var) else argvar
            if getattr(typemap[arg_name], 'mutable', False):
                add_to_def_once_sets(arg_name, def_once, def_more)
|
|
|
|
def compute_def_once_internal(loop_body, def_once, def_more, getattr_taken, typemap, module_assigns):
    '''Update def_once, def_more and getattr_taken for every block of
    ``loop_body``, descending into the init block and loop body of any
    nested parfor.
    '''
    shared = (def_once, def_more, getattr_taken, typemap, module_assigns)
    for block in loop_body.values():
        compute_def_once_block(block, *shared)
        # Nested parfors are not covered by the block scan above, so they
        # are handled explicitly here.
        for inst in block.body:
            if not isinstance(inst, parfor.Parfor):
                continue
            compute_def_once_block(inst.init_block, *shared)
            compute_def_once_internal(inst.loop_body, *shared)
|
|
|
|
def compute_def_once(loop_body, typemap):
    '''Return ``(def_once, def_more)``: the sets of variable names defined
    exactly once and more than once in the given blocks.
    '''
    defined_once, defined_more = set(), set()
    # The two dicts are scratch state used only during the traversal.
    compute_def_once_internal(
        loop_body, defined_once, defined_more, {}, typemap, {},
    )
    return defined_once, defined_more
|
|
|
|
def find_vars(var, varset):
    '''Add the name of ``var`` to ``varset`` and return ``var`` unchanged.

    ``var`` must be an ``ir.Var``; this is checked with an assertion.
    '''
    assert isinstance(var, ir.Var)
    name = var.name
    varset.add(name)
    return var
|
|
|
|
def _hoist_internal(inst, dep_on_param, call_table, hoisted, not_hoisted,
                    typemap, stored_arrays):
    '''Decide whether the assignment ``inst`` can be hoisted and record the
    outcome.

    On success ``inst`` is appended to ``hoisted`` and True is returned.
    Otherwise ``(inst, reason)`` is appended to ``not_hoisted`` and False is
    returned; the reason is one of "stored array", "dependency" or
    "not pure".
    '''
    debug = config.DEBUG_ARRAY_OPT >= 1
    target_name = inst.target.name

    if target_name in stored_arrays:
        not_hoisted.append((inst, "stored array"))
        if debug:
            print("Instruction", inst, " could not be hoisted because the created array is stored.")
        return False

    # Collect the variables read by the right-hand side and keep those that
    # are not in dep_on_param.
    uses = set()
    visit_vars_inner(inst.value, find_vars, uses)
    diff = uses.difference(dep_on_param)
    if debug:
        print("_hoist_internal:", inst, "uses:", uses, "diff:", diff)

    if len(diff) > 0:
        reason = "dependency"
        explanation = " could not be hoisted because of a dependency."
    elif not is_pure(inst.value, None, call_table):
        reason = "not pure"
        explanation = " could not be hoisted because it isn't pure."
    else:
        if debug:
            print("Will hoist instruction", inst, typemap[target_name])
        hoisted.append(inst)
        # Non-array results are added to dep_on_param so that later
        # instructions using them are not rejected for that dependency.
        if not isinstance(typemap[target_name], types.npytypes.Array):
            dep_on_param += [target_name]
        return True

    not_hoisted.append((inst, reason))
    if debug:
        print("Instruction", inst, explanation)
    return False
|
|
|
|
def find_setitems_block(setitems, itemsset, block, typemap):
    """Scan one block for setitem statements, descending into nested parfors.

    The name of every written-to target goes into ``setitems``.  The name of
    every stored value whose type reports ``mutable`` goes into ``itemsset``.
    """
    for stmt in block.body:
        if isinstance(stmt, parfor.Parfor):
            # Nested parfor: look at its init block and its loop body.
            find_setitems_block(setitems, itemsset, stmt.init_block, typemap)
            find_setitems_body(setitems, itemsset, stmt.loop_body, typemap)
            continue
        if not isinstance(stmt, (ir.StaticSetItem, ir.SetItem)):
            continue

        setitems.add(stmt.target.name)
        # Storing an immutable object into an array is safe to hoist.  If
        # the stored object is mutable and gets hoisted, several entries of
        # the outer array could end up referencing the same object, so a
        # change through one index would show up at the others.
        stored_type = typemap[stmt.value.name]
        if getattr(stored_type, "mutable", False):
            itemsset.add(stmt.value.name)
|
|
|
|
def find_setitems_body(setitems, itemsset, loop_body, typemap):
    """
    Collect setitem information for every block of ``loop_body``.

    ``setitems`` receives the arrays that are written into and ``itemsset``
    receives the mutable objects (mostly arrays) that are written into
    other arrays.
    """
    for blk in loop_body.values():
        find_setitems_block(setitems, itemsset, blk, typemap)
|
|
|
|
def empty_container_allocator_hoist(inst, dep_on_param, call_table, hoisted,
                                    not_hoisted, typemap, stored_arrays):
    """Try to hoist ``inst`` when it is an assignment from ``np.empty(...)``.

    Returns the result of ``_hoist_internal`` for such an assignment, and
    False for every other kind of instruction.
    """
    if not isinstance(inst, ir.Assign):
        return False
    rhs = inst.value
    if not isinstance(rhs, ir.Expr):
        return False
    if rhs.op != 'call':
        return False
    callee = rhs.func.name
    if callee not in call_table:
        return False

    # The call table describes np.empty as the list ['empty', np].
    if call_table[callee] == ['empty', np]:
        return _hoist_internal(inst, dep_on_param, call_table, hoisted,
                               not_hoisted, typemap, stored_arrays)
    return False
|
|
|
|
def hoist(parfor_params, loop_body, typemap, wrapped_blocks):
    """Pull hoistable statements out of ``loop_body``.

    Statements accepted by ``empty_container_allocator_hoist`` or
    ``_hoist_internal`` are removed from their block (and from the init
    block of any directly nested parfor) and collected instead.

    Returns ``(hoisted, not_hoisted)``: the list of removed instructions and
    the list of ``(instruction, reason)`` pairs that were rejected.
    """
    dep_on_param = copy.copy(parfor_params)
    hoisted = []
    not_hoisted = []

    # Variables defined exactly once in the loop body are the candidates.
    def_once, def_more = compute_def_once(loop_body, typemap)
    call_table, _ = get_call_table(wrapped_blocks)

    setitems = set()
    itemsset = set()
    find_setitems_body(setitems, itemsset, loop_body, typemap)
    # Parameters written through a setitem are dropped from the dependency
    # list.
    dep_on_param = list(set(dep_on_param).difference(setitems))
    if config.DEBUG_ARRAY_OPT >= 1:
        print("hoist - def_once:", def_once, "setitems:", setitems, "itemsset:", itemsset, "dep_on_param:", dep_on_param, "parfor_params:", parfor_params)
    for written in setitems:
        # NOTE(review): presumably counts the setitem as one more definition
        # of the array -- confirm against add_to_def_once_sets.
        add_to_def_once_sets(written, def_once, def_more)

    def was_hoisted(stmt):
        # Same test order as for any statement: np.empty allocation first,
        # then a generic assignment to a once-defined variable.
        if empty_container_allocator_hoist(stmt, dep_on_param, call_table,
                                           hoisted, not_hoisted, typemap,
                                           itemsset):
            return True
        if isinstance(stmt, ir.Assign) and stmt.target.name in def_once:
            return _hoist_internal(stmt, dep_on_param, call_table,
                                   hoisted, not_hoisted, typemap, itemsset)
        return False

    for block in loop_body.values():
        kept = []
        for inst in block.body:
            if was_hoisted(inst):
                # Hoisted statements are not kept in the block.
                continue
            if isinstance(inst, parfor.Parfor):
                if config.DEBUG_ARRAY_OPT >= 1:
                    print("parfor")
                    inst.dump()
                # Apply the same filtering to the nested parfor's init block.
                inst.init_block.body = [
                    ib_inst for ib_inst in inst.init_block.body
                    if not was_hoisted(ib_inst)
                ]
            kept.append(inst)
        block.body = kept
    return hoisted, not_hoisted
|
|
|
|
def redtyp_is_scalar(redtype):
    """Return True unless the reduction type ``redtype`` is an array type."""
    is_array = isinstance(redtype, types.npytypes.Array)
    return not is_array
|
|
|
|
def redtyp_to_redarraytype(redtyp):
    """Map a reduction variable type to the type of the reduction array
    that holds the per-worker results.
    """
    if isinstance(redtyp, types.npytypes.Array):
        # No array-of-arrays: an array reduction gets one extra leading
        # dimension and keeps the element dtype.
        return types.npytypes.Array(redtyp.dtype, 1 + redtyp.ndim, "C")
    # Any other reduction type becomes a 1-D array of that type.
    return types.npytypes.Array(redtyp, 1, "C")
|
|
|
|
def redarraytype_to_sig(redarraytyp):
    """Return the gufunc argument type for a reduction array type.

    The result is a new array type with the same dtype, ndim and layout as
    ``redarraytyp``.
    """
    assert isinstance(redarraytyp, types.npytypes.Array)
    dtype = redarraytyp.dtype
    ndim = redarraytyp.ndim
    layout = redarraytyp.layout
    return types.npytypes.Array(dtype, ndim, layout)
|
|
|
|
def legalize_names_with_typemap(names, typemap):
    """Legalize ``names`` and mirror the types of renamed entries.

    ``ir_utils.legalize_names`` replaces illegal characters in internal IR
    variable names (e.g. a period) with a legal one (underscore).  The
    typemap only knows the original names, so every name that actually
    changed gets a second typemap entry under its legalized spelling.

    Returns the original-name -> legal-name dictionary.
    """
    legal_names = legalize_names(names)
    for original, legalized in legal_names.items():
        if original == legalized:
            # Nothing was rewritten; the existing typemap entry suffices.
            continue
        typemap[legalized] = typemap[original]
    return legal_names
|
|
|
|
def to_scalar_from_0d(x):
    """Return ``x.dtype`` for a zero-dimensional array-compatible type,
    and ``x`` itself for anything else.
    """
    if isinstance(x, types.ArrayCompatible) and x.ndim == 0:
        return x.dtype
    return x
|
|
|
|
def _create_gufunc_for_parfor_body(
        lowerer,
        parfor,
        typemap,
        typingctx,
        targetctx,
        flags,
        locals,
        has_aliases,
        index_var_typ,
        races):
    '''
    Takes a parfor and creates a gufunc function for its body.
    There are two parts to this function.
    1) Code to iterate across the iteration space as defined by the schedule.
    2) The parfor body that does the work for a single point in the iteration space.
    Part 1 is created as Python text for simplicity with a sentinel assignment to mark the point
    in the IR where the parfor body should be added.
    This Python text is 'exec'ed into existence and its IR retrieved with run_frontend.
    The IR is scanned for the sentinel assignment where that basic block is split and the IR
    for the parfor body inserted.

    Notes on the arguments:
    * The ``typemap`` argument is immediately superseded by
      ``lowerer.fndesc.typemap``, which is updated in place with the types
      of every name created here.
    * ``flags.noalias`` is forced to True for the duration of the kernel
      compilation when ``has_aliases`` is false, and is always restored
      afterwards, including when compilation raises.
    * ``races`` must support ``difference``; reduction variables are removed
      from it and a NumbaParallelSafetyWarning is issued for each remaining
      name.

    Returns the tuple ``(kernel_func, parfor_args, kernel_sig,
    func_arg_types, expanded_name_to_tuple_var)``.

    Raises CompilerError when a named-tuple input is found but the
    definition of its class cannot be located.
    '''
    if config.DEBUG_ARRAY_OPT >= 1:
        print("starting _create_gufunc_for_parfor_body")

    loc = parfor.init_block.loc

    # The parfor body and the main function body share ir.Var nodes.
    # We have to do some replacements of Var names in the parfor body to make them
    # legal parameter names.  If we don't copy then the Vars in the main function also
    # would incorrectly change their name.
    loop_body = copy.copy(parfor.loop_body)
    remove_dels(loop_body)

    parfor_dim = len(parfor.loop_nests)
    loop_indices = [l.index_variable.name for l in parfor.loop_nests]

    # Get all the parfor params.
    parfor_params = parfor.params
    # Get just the outputs of the parfor.
    # (The module is spelled out in full because the ``parfor`` argument
    # shadows the imported ``parfor`` module inside this function.)
    parfor_outputs = numba.parfors.parfor.get_parfor_outputs(parfor, parfor_params)
    # Get all parfor reduction vars, and operators.
    typemap = lowerer.fndesc.typemap
    parfor_redvars, parfor_reddict = numba.parfors.parfor.get_parfor_reductions(
        lowerer.func_ir, parfor, parfor_params, lowerer.fndesc.calltypes)
    # Compute just the parfor inputs as a set difference.
    parfor_inputs = sorted(
        list(
            set(parfor_params) -
            set(parfor_outputs) -
            set(parfor_redvars)))

    if config.DEBUG_ARRAY_OPT >= 1:
        print("parfor_params = ", parfor_params, " ", type(parfor_params))
        print("parfor_outputs = ", parfor_outputs, " ", type(parfor_outputs))
        print("parfor_inputs = ", parfor_inputs, " ", type(parfor_inputs))
        print("parfor_redvars = ", parfor_redvars, " ", type(parfor_redvars))

    # -------------------------------------------------------------------------
    # Convert tuples to individual parameters.
    tuple_expanded_parfor_inputs = []
    tuple_var_to_expanded_names = {}
    expanded_name_to_tuple_var = {}
    next_expanded_tuple_var = 0
    parfor_tuple_params = []
    # For each input to the parfor.
    for pi in parfor_inputs:
        # Get the type of the input.
        pi_type = typemap[pi]
        # If it is a UniTuple or Tuple we will do the conversion.
        if isinstance(pi_type, types.UniTuple) or isinstance(pi_type, types.NamedUniTuple):
            # Get the size and dtype of the tuple.
            tuple_count = pi_type.count
            tuple_dtype = pi_type.dtype
            # Only do tuples up to config.PARFOR_MAX_TUPLE_SIZE length.
            assert(tuple_count <= config.PARFOR_MAX_TUPLE_SIZE)
            this_var_expansion = []
            for i in range(tuple_count):
                # Generate a new name for the individual part of the tuple var.
                expanded_name = "expanded_tuple_var_" + str(next_expanded_tuple_var)
                # Add that name to the new list of inputs to the gufunc.
                tuple_expanded_parfor_inputs.append(expanded_name)
                this_var_expansion.append(expanded_name)
                # Remember a mapping from new param name to original tuple
                # var and the index within the tuple.
                expanded_name_to_tuple_var[expanded_name] = (pi, i)
                next_expanded_tuple_var += 1
                # Set the type of the new parameter.
                typemap[expanded_name] = tuple_dtype
            # Remember a mapping from the original tuple var to the
            # individual parts.
            tuple_var_to_expanded_names[pi] = this_var_expansion
            parfor_tuple_params.append(pi)
        elif isinstance(pi_type, types.Tuple) or isinstance(pi_type, types.NamedTuple):
            # This is the same as above for UniTuple except that each part of
            # the tuple can have a different type and we fetch that type with
            # pi_type.types[offset].
            tuple_count = pi_type.count
            tuple_types = pi_type.types
            # Only do tuples up to config.PARFOR_MAX_TUPLE_SIZE length.
            assert(tuple_count <= config.PARFOR_MAX_TUPLE_SIZE)
            this_var_expansion = []
            for i in range(tuple_count):
                expanded_name = "expanded_tuple_var_" + str(next_expanded_tuple_var)
                tuple_expanded_parfor_inputs.append(expanded_name)
                this_var_expansion.append(expanded_name)
                expanded_name_to_tuple_var[expanded_name] = (pi, i)
                next_expanded_tuple_var += 1
                typemap[expanded_name] = tuple_types[i]
            tuple_var_to_expanded_names[pi] = this_var_expansion
            parfor_tuple_params.append(pi)
        else:
            tuple_expanded_parfor_inputs.append(pi)
    parfor_inputs = tuple_expanded_parfor_inputs
    if config.DEBUG_ARRAY_OPT >= 1:
        print("parfor_inputs post tuple handling = ", parfor_inputs, " ", type(parfor_inputs))
    # -------------------------------------------------------------------------

    # Reduction variables are handled separately below, so they are not
    # reported (or rewritten) as races.
    races = races.difference(set(parfor_redvars))
    for race in races:
        msg = ("Variable %s used in parallel loop may be written "
               "to simultaneously by multiple workers and may result "
               "in non-deterministic or unintended results." % race)
        warnings.warn(NumbaParallelSafetyWarning(msg, loc))
    replace_var_with_array(races, loop_body, typemap, lowerer.fndesc.calltypes)

    # Reduction variables are represented as arrays, so they go under
    # different names.
    parfor_redarrs = []
    parfor_red_arg_types = []
    for var in parfor_redvars:
        arr = var + "_arr"
        parfor_redarrs.append(arr)
        redarraytype = redtyp_to_redarraytype(typemap[var])
        parfor_red_arg_types.append(redarraytype)
        redarrsig = redarraytype_to_sig(redarraytype)
        if arr in typemap:
            assert(typemap[arr] == redarrsig)
        else:
            typemap[arr] = redarrsig

    # Reorder all the params so that inputs go first then outputs.
    parfor_params = parfor_inputs + parfor_outputs + parfor_redarrs

    if config.DEBUG_ARRAY_OPT >= 1:
        print("parfor_params = ", parfor_params, " ", type(parfor_params))
        print("loop_indices = ", loop_indices, " ", type(loop_indices))
        print("loop_body = ", loop_body, " ", type(loop_body))
        _print_body(loop_body)

    # Some Var are not legal parameter names so create a dict of potentially illegal
    # param name to guaranteed legal name.
    param_dict = legalize_names_with_typemap(parfor_params + parfor_redvars + parfor_tuple_params, typemap)
    if config.DEBUG_ARRAY_OPT >= 1:
        print(
            "param_dict = ",
            sorted(
                param_dict.items()),
            " ",
            type(param_dict))

    # Some loop_indices are not legal parameter names so create a dict of potentially illegal
    # loop index to guaranteed legal name.
    ind_dict = legalize_names_with_typemap(loop_indices, typemap)
    # Compute a new list of legal loop index names.
    legal_loop_indices = [ind_dict[v] for v in loop_indices]
    if config.DEBUG_ARRAY_OPT >= 1:
        print("ind_dict = ", sorted(ind_dict.items()), " ", type(ind_dict))
        print(
            "legal_loop_indices = ",
            legal_loop_indices,
            " ",
            type(legal_loop_indices))
        for pd in parfor_params:
            print("pd = ", pd)
            print("pd type = ", typemap[pd], " ", type(typemap[pd]))

    # Get the types of each parameter.
    param_types = [to_scalar_from_0d(typemap[v]) for v in parfor_params]
    # Calculate types of args passed to gufunc.
    func_arg_types = [typemap[v] for v in (parfor_inputs + parfor_outputs)] + parfor_red_arg_types
    if config.DEBUG_ARRAY_OPT >= 1:
        print("new param_types:", param_types)
        print("new func_arg_types:", func_arg_types)

    # Replace illegal parameter names in the loop body with legal ones.
    replace_var_names(loop_body, param_dict)
    # remember the name before legalizing as the actual arguments
    parfor_args = parfor_params
    # Change parfor_params to be legal names.
    parfor_params = [param_dict[v] for v in parfor_params]
    parfor_params_orig = parfor_params

    # ``ascontig`` is hard-wired to False here, so the "param" suffix and the
    # np.ascontiguousarray copies further down are currently never emitted.
    parfor_params = []
    ascontig = False
    for pindex in range(len(parfor_params_orig)):
        if (ascontig and
                pindex < len(parfor_inputs) and
                isinstance(param_types[pindex], types.npytypes.Array)):
            parfor_params.append(parfor_params_orig[pindex]+"param")
        else:
            parfor_params.append(parfor_params_orig[pindex])

    # Change parfor body to replace illegal loop index vars with legal ones.
    replace_var_names(loop_body, ind_dict)
    loop_body_var_table = get_name_var_table(loop_body)
    sentinel_name = get_unused_var_name("__sentinel__", loop_body_var_table)

    if config.DEBUG_ARRAY_OPT >= 1:
        print(
            "legal parfor_params = ",
            parfor_params,
            " ",
            type(parfor_params))

    # Determine the unique names of the scheduling and gufunc functions.
    # sched_func_name = "__numba_parfor_sched_%s" % (hex(hash(parfor)).replace("-", "_"))
    gufunc_name = "__numba_parfor_gufunc_%s" % (
        hex(hash(parfor)).replace("-", "_"))
    if config.DEBUG_ARRAY_OPT:
        # print("sched_func_name ", type(sched_func_name), " ", sched_func_name)
        print("gufunc_name ", type(gufunc_name), " ", gufunc_name)

    gufunc_txt = ""

    # Create the gufunc function.
    gufunc_txt += "def " + gufunc_name + \
        "(sched, " + (", ".join(parfor_params)) + "):\n"

    globls = {"np": np, "numba": numba}

    # First thing in the gufunc, we reconstruct tuples from their
    # individual parts, e.g., orig_tup_name = (part1, part2,).
    # The rest of the code of the function will use the original tuple name.
    for tup_var, exp_names in tuple_var_to_expanded_names.items():
        tup_type = typemap[tup_var]
        gufunc_txt += " " + param_dict[tup_var]
        # Determine if the tuple is a named tuple.
        if (isinstance(tup_type, types.NamedTuple) or
                isinstance(tup_type, types.NamedUniTuple)):
            named_tup = True
        else:
            named_tup = False

        if named_tup:
            # It is a named tuple so try to find the global that defines the
            # named tuple.
            func_def = guard(get_definition, lowerer.func_ir, tup_var)
            named_tuple_def = None
            if config.DEBUG_ARRAY_OPT:
                print("func_def:", func_def, type(func_def))
            if func_def is not None:
                if (isinstance(func_def, ir.Expr) and
                        func_def.op == 'call'):
                    named_tuple_def = guard(get_definition, lowerer.func_ir, func_def.func)
                    if config.DEBUG_ARRAY_OPT:
                        print("named_tuple_def:", named_tuple_def, type(named_tuple_def))
                elif isinstance(func_def, ir.Arg):
                    named_tuple_def = typemap[func_def.name]
                    if config.DEBUG_ARRAY_OPT:
                        print("named_tuple_def:", named_tuple_def,
                              type(named_tuple_def), named_tuple_def.name)
            if named_tuple_def is not None:
                if (isinstance(named_tuple_def, ir.Global) or
                        isinstance(named_tuple_def, ir.FreeVar)):
                    gval = named_tuple_def.value
                    if config.DEBUG_ARRAY_OPT:
                        print("gval:", gval, type(gval))
                    globls[named_tuple_def.name] = gval
                elif isinstance(named_tuple_def, types.containers.BaseNamedTuple):
                    named_tuple_name = named_tuple_def.name.split('(')[0]
                    if config.DEBUG_ARRAY_OPT:
                        print("name:", named_tuple_name,
                              named_tuple_def.instance_class,
                              type(named_tuple_def.instance_class))
                    globls[named_tuple_name] = named_tuple_def.instance_class
            else:
                if config.DEBUG_ARRAY_OPT:
                    print("Didn't find definition of namedtuple for globls.")
                # tup_var is a variable *name* (a str), so it carries no
                # location of its own; report the parfor's location instead.
                raise CompilerError("Could not find definition of " + str(tup_var),
                                    loc)
            gufunc_txt += " = " + tup_type.instance_class.__name__ + "("
            for name, field_name in zip(exp_names, tup_type.fields):
                gufunc_txt += field_name + "=" + param_dict[name] + ","
        else:
            # Just a regular tuple so use (part0, part1, ...)
            gufunc_txt += " = (" + ", ".join([param_dict[x] for x in exp_names])
            if len(exp_names) == 1:
                # Add comma for tuples with singular values.  We can't unilaterally
                # add a comma always because (,) isn't valid.
                gufunc_txt += ","

        gufunc_txt += ")\n"

    for pindex in range(len(parfor_inputs)):
        if ascontig and isinstance(param_types[pindex], types.npytypes.Array):
            gufunc_txt += (" " + parfor_params_orig[pindex]
                           + " = np.ascontiguousarray(" + parfor_params[pindex] + ")\n")

    gufunc_thread_id_var = "ParallelAcceleratorGufuncThreadId"

    if len(parfor_redarrs) > 0:
        gufunc_txt += " " + gufunc_thread_id_var + " = "
        gufunc_txt += "numba.np.ufunc.parallel._iget_thread_id()\n"

    # Add initialization of reduction variables
    for arr, var in zip(parfor_redarrs, parfor_redvars):
        gufunc_txt += " " + param_dict[var] + \
            "=" + param_dict[arr] + "[" + gufunc_thread_id_var + "]\n"
        if config.DEBUG_ARRAY_OPT_RUNTIME:
            gufunc_txt += " print(\"thread id =\", ParallelAcceleratorGufuncThreadId)\n"
            gufunc_txt += " print(\"initial reduction value\",ParallelAcceleratorGufuncThreadId," + param_dict[var] + "," + param_dict[var] + ".shape)\n"
            gufunc_txt += " print(\"reduction array\",ParallelAcceleratorGufuncThreadId," + param_dict[arr] + "," + param_dict[arr] + ".shape)\n"

    # For each dimension of the parfor, create a for loop in the generated gufunc function.
    # Iterate across the proper values extracted from the schedule.
    # The form of the schedule is start_dim0, start_dim1, ..., start_dimN, end_dim0,
    # end_dim1, ..., end_dimN
    for eachdim in range(parfor_dim):
        for indent in range(eachdim + 1):
            gufunc_txt += " "
        sched_dim = eachdim
        gufunc_txt += ("for " +
                       legal_loop_indices[eachdim] +
                       " in range(sched[" +
                       str(sched_dim) +
                       "], sched[" +
                       str(sched_dim +
                           parfor_dim) +
                       "] + np.uint8(1)):\n")

    if config.DEBUG_ARRAY_OPT_RUNTIME:
        for indent in range(parfor_dim + 1):
            gufunc_txt += " "
        gufunc_txt += "print("
        for eachdim in range(parfor_dim):
            gufunc_txt += "\"" + legal_loop_indices[eachdim] + "\"," + legal_loop_indices[eachdim] + ","
        gufunc_txt += ")\n"

    # Add the sentinel assignment so that we can find the loop body position
    # in the IR.
    for indent in range(parfor_dim + 1):
        gufunc_txt += " "
    gufunc_txt += sentinel_name + " = 0\n"
    # Add assignments of reduction variables (for returning the value)
    for arr, var in zip(parfor_redarrs, parfor_redvars):
        if config.DEBUG_ARRAY_OPT_RUNTIME:
            gufunc_txt += " print(\"final reduction value\",ParallelAcceleratorGufuncThreadId," + param_dict[var] + ")\n"
            gufunc_txt += " print(\"final reduction array\",ParallelAcceleratorGufuncThreadId," + param_dict[arr] + ")\n"
        # After the gufunc loops, copy the accumulated temp value back to reduction array.
        gufunc_txt += " " + param_dict[arr] + \
            "[" + gufunc_thread_id_var + "] = " + param_dict[var] + "\n"
    gufunc_txt += " return None\n"

    if config.DEBUG_ARRAY_OPT:
        print("gufunc_txt = ", type(gufunc_txt), "\n", gufunc_txt)
        print("globls:", globls, type(globls))
    # Force gufunc outline into existence.
    # NOTE: the executed text is assembled above from legalized names and
    # fixed fragments only.
    locls = {}
    exec(gufunc_txt, globls, locls)
    gufunc_func = locls[gufunc_name]

    if config.DEBUG_ARRAY_OPT:
        print("gufunc_func = ", type(gufunc_func), "\n", gufunc_func)
    # Get the IR for the gufunc outline.
    gufunc_ir = compiler.run_frontend(gufunc_func)
    if config.DEBUG_ARRAY_OPT:
        print("gufunc_ir dump ", type(gufunc_ir))
        gufunc_ir.dump()
        print("loop_body dump ", type(loop_body))
        _print_body(loop_body)

    # rename all variables in gufunc_ir afresh
    var_table = get_name_var_table(gufunc_ir.blocks)
    new_var_dict = {}
    reserved_names = [sentinel_name] + \
        list(param_dict.values()) + legal_loop_indices
    for name, var in var_table.items():
        if not (name in reserved_names):
            new_var_dict[name] = parfor.init_block.scope.redefine(name, loc).name
    replace_var_names(gufunc_ir.blocks, new_var_dict)
    if config.DEBUG_ARRAY_OPT:
        print("gufunc_ir dump after renaming ")
        gufunc_ir.dump()
    # The first gufunc parameter is the schedule: a 1-D C array of the
    # index type.
    gufunc_param_types = [types.npytypes.Array(
        index_var_typ, 1, "C")] + param_types
    if config.DEBUG_ARRAY_OPT:
        print(
            "gufunc_param_types = ",
            type(gufunc_param_types),
            "\n",
            gufunc_param_types)

    gufunc_stub_last_label = find_max_label(gufunc_ir.blocks) + 1

    # Add gufunc stub last label to each parfor.loop_body label to prevent
    # label conflicts.
    loop_body = add_offset_to_labels(loop_body, gufunc_stub_last_label)
    # new label for splitting sentinel block
    new_label = find_max_label(loop_body) + 1

    # If enabled, add a print statement after every assignment.
    if config.DEBUG_ARRAY_OPT_RUNTIME:
        for label, block in loop_body.items():
            new_block = block.copy()
            new_block.clear()
            loc = block.loc
            scope = block.scope
            for inst in block.body:
                new_block.append(inst)
                # Append print after assignment
                if isinstance(inst, ir.Assign):
                    # Only apply to numbers
                    if typemap[inst.target.name] not in types.number_domain:
                        continue

                    # Make constant string
                    strval = "{} =".format(inst.target.name)
                    strconsttyp = types.StringLiteral(strval)

                    lhs = scope.redefine("str_const", loc)
                    # lhs = ir.Var(scope, mk_unique_var("str_const"), loc)
                    assign_lhs = ir.Assign(value=ir.Const(value=strval, loc=loc),
                                           target=lhs, loc=loc)
                    typemap[lhs.name] = strconsttyp
                    new_block.append(assign_lhs)

                    # Make print node
                    print_node = ir.Print(args=[lhs, inst.target], vararg=None, loc=loc)
                    new_block.append(print_node)
                    sig = numba.core.typing.signature(types.none,
                                                      typemap[lhs.name],
                                                      typemap[inst.target.name])
                    lowerer.fndesc.calltypes[print_node] = sig
            loop_body[label] = new_block

    if config.DEBUG_ARRAY_OPT:
        print("parfor loop body")
        _print_body(loop_body)

    wrapped_blocks = wrap_loop_body(loop_body)
    hoisted, not_hoisted = hoist(parfor_params, loop_body, typemap, wrapped_blocks)
    # Hoisted statements are placed in the gufunc's first block, just before
    # that block's final statement.
    start_block = gufunc_ir.blocks[min(gufunc_ir.blocks.keys())]
    start_block.body = start_block.body[:-1] + hoisted + [start_block.body[-1]]
    unwrap_loop_body(loop_body)

    # store hoisted into diagnostics
    diagnostics = lowerer.metadata['parfor_diagnostics']
    diagnostics.hoist_info[parfor.id] = {'hoisted': hoisted,
                                         'not_hoisted': not_hoisted}

    if config.DEBUG_ARRAY_OPT:
        print("After hoisting")
        _print_body(loop_body)

    # Search all the block in the gufunc outline for the sentinel assignment.
    for label, block in gufunc_ir.blocks.items():
        for i, inst in enumerate(block.body):
            if isinstance(
                    inst,
                    ir.Assign) and inst.target.name == sentinel_name:
                # We found the sentinel assignment.
                loc = inst.loc
                scope = block.scope
                # split block across __sentinel__
                # A new block is allocated for the statements prior to the sentinel
                # but the new block maintains the current block label.
                prev_block = ir.Block(scope, loc)
                prev_block.body = block.body[:i]
                # The current block is used for statements after the sentinel.
                block.body = block.body[i + 1:]
                # But the current block gets a new label.
                body_first_label = min(loop_body.keys())

                # The previous block jumps to the minimum labelled block of the
                # parfor body.
                prev_block.append(ir.Jump(body_first_label, loc))
                # Add all the parfor loop body blocks to the gufunc function's
                # IR.
                for (l, b) in loop_body.items():
                    gufunc_ir.blocks[l] = transfer_scope(b, scope)
                body_last_label = max(loop_body.keys())
                gufunc_ir.blocks[new_label] = block
                gufunc_ir.blocks[label] = prev_block
                # Add a jump from the last parfor body block to the block containing
                # statements after the sentinel.
                gufunc_ir.blocks[body_last_label].append(
                    ir.Jump(new_label, loc))
                break
        else:
            # No sentinel in this block; keep searching the next one.
            continue
        # The sentinel was handled; the blocks dict was modified, so stop
        # iterating over it.
        break

    if config.DEBUG_ARRAY_OPT:
        print("gufunc_ir last dump before renaming")
        gufunc_ir.dump()

    gufunc_ir.blocks = rename_labels(gufunc_ir.blocks)
    remove_dels(gufunc_ir.blocks)

    if config.DEBUG_ARRAY_OPT:
        print("gufunc_ir last dump")
        gufunc_ir.dump()
        print("flags", flags)
        print("typemap", typemap)

    # ``flags`` belongs to the caller: remember its noalias setting and make
    # sure it is put back even if the compilation below fails.
    old_alias = flags.noalias
    if not has_aliases:
        if config.DEBUG_ARRAY_OPT:
            print("No aliases found so adding noalias flag.")
        flags.noalias = True

    try:
        fixup_var_define_in_scope(gufunc_ir.blocks)

        class ParforGufuncCompiler(compiler.CompilerBase):
            def define_pipelines(self):
                from numba.core.compiler_machinery import PassManager
                dpb = compiler.DefaultPassBuilder
                pm = PassManager("full_parfor_gufunc")
                parfor_gufunc_passes = dpb.define_parfor_gufunc_pipeline(self.state)
                pm.passes.extend(parfor_gufunc_passes.passes)
                lowering_passes = dpb.define_parfor_gufunc_nopython_lowering_pipeline(self.state)
                pm.passes.extend(lowering_passes.passes)

                pm.finalize()
                return [pm]

        kernel_func = compiler.compile_ir(
            typingctx,
            targetctx,
            gufunc_ir,
            gufunc_param_types,
            types.none,
            flags,
            locals,
            pipeline_class=ParforGufuncCompiler)
    finally:
        flags.noalias = old_alias

    kernel_sig = signature(types.none, *gufunc_param_types)
    if config.DEBUG_ARRAY_OPT:
        print("finished create_gufunc_for_parfor_body. kernel_sig = ", kernel_sig)

    return kernel_func, parfor_args, kernel_sig, func_arg_types, expanded_name_to_tuple_var
|
|
|
|
def replace_var_with_array_in_block(vars, block, typemap, calltypes):
    """Build a new statement list for ``block`` in which every assignment to
    a name in ``vars`` becomes a store into element 0 of that name.

    Nested parfors have their init block and loop body rewritten in place.
    Types of the temporaries are added to ``typemap`` and the signature of
    each generated setitem is added to ``calltypes``.  Returns the new list;
    ``block.body`` itself is not reassigned here.
    """
    rewritten = []
    for stmt in block.body:
        if isinstance(stmt, ir.Assign) and stmt.target.name in vars:
            loc = stmt.loc
            scope = stmt.target.scope
            target = stmt.target

            # index temporary: <const> = 0
            zero_var = scope.redefine("$const_ind_0", loc)
            typemap[zero_var.name] = types.uintp
            rewritten.append(ir.Assign(ir.Const(0, loc), zero_var, loc))

            # value temporary: <val> = original right-hand side
            val_var = scope.redefine("$val", loc)
            typemap[val_var.name] = typemap[target.name]
            rewritten.append(ir.Assign(stmt.value, val_var, loc))

            # target[<const>] = <val>
            store = ir.SetItem(target, zero_var, val_var, loc)
            elem_type = typemap[target.name]
            calltypes[store] = signature(
                types.none, types.npytypes.Array(elem_type, 1, "C"), types.intp, elem_type)
            rewritten.append(store)
            continue

        if isinstance(stmt, parfor.Parfor):
            # The init block is wrapped in a one-entry dict so the
            # block-dict helper can process it.
            replace_var_with_array_internal(vars, {0: stmt.init_block}, typemap, calltypes)
            replace_var_with_array_internal(vars, stmt.loop_body, typemap, calltypes)

        rewritten.append(stmt)
    return rewritten
|
|
|
|
def replace_var_with_array_internal(vars, loop_body, typemap, calltypes):
    """Rewrite the body of every block in ``loop_body`` in place using
    ``replace_var_with_array_in_block``.
    """
    for blk in loop_body.values():
        blk.body = replace_var_with_array_in_block(vars, blk, typemap, calltypes)
|
|
|
|
def replace_var_with_array(vars, loop_body, typemap, calltypes):
    """Turn each variable in ``vars`` into a 1-D array variable.

    Assignments in ``loop_body`` are rewritten into element-0 stores, then
    the typemap entry of every name is replaced by a 1-D "C" array of its
    previous type.
    """
    replace_var_with_array_internal(vars, loop_body, typemap, calltypes)
    for name in vars:
        elem_type = typemap[name]
        # Remove the old entry before storing the new array type.
        typemap.pop(name, None)
        typemap[name] = types.npytypes.Array(elem_type, 1, "C")
|
|
|
|
def call_parallel_gufunc(lowerer, cres, gu_signature, outer_sig, expr_args, expr_arg_types,
                         loop_ranges, redvars, reddict, redarrdict, init_block, index_var_typ, races,
                         exp_name_to_tuple_var):
    """
    Adds the call to the gufunc function from the main function.

    Through ``lowerer.builder`` this emits the code that builds a schedule
    for the iteration space, packs the ``args`` / ``shapes`` / ``steps`` /
    ``data`` arguments, calls the wrapper produced by
    ``build_gufunc_wrapper`` and finally releases the schedule.

    Parameters
    ----------
    lowerer
        Active lowering object; its ``context``, ``builder``, ``loadvar``
        and ``getvar`` are used.
    cres
        Compile result of the gufunc kernel; ``cres.library`` and
        ``cres.fndesc.llvm_func_name`` are read.
    gu_signature
        Pair ``(sin, sout)`` of lists of dimension-symbol sequences. The
        first entry of ``sin`` describes the schedule and is popped in place.
    outer_sig
        Signature of the kernel. ``outer_sig.args[1:]`` must be as long as
        ``expr_args`` once the schedule entry has been removed.
    expr_args
        List of variable names passed to the kernel. The first entry (the
        schedule) is popped in place.
    expr_arg_types
        Numba types, indexed in step with ``expr_args`` after that pop.
    loop_ranges
        List of ``(start, stop, step)`` per dimension, each an ``ir.Var`` or
        a constant; ``step`` must be 1. Entries are overwritten in place
        with the loaded LLVM values.
    redvars
        Names of the reduction variables.
    reddict
        Not used by this function.
    redarrdict
        Maps each name in ``redvars`` to the IR variable of its reduction
        array.
    init_block
        Not used by this function.
    index_var_typ
        Type of the loop index; ``.signed`` selects the signed or unsigned
        scheduling routine and the schedule element type.
    races
        Container of variable names. An array-compatible argument found in
        it is passed through a single-element stack slot and written back
        to its variable after the kernel call.
    exp_name_to_tuple_var
        Maps an expanded name to ``(tuple variable name, offset)``.
    """
    context = lowerer.context
    builder = lowerer.builder

    from numba.np.ufunc.parallel import (build_gufunc_wrapper,
                                         _launch_threads)

    if config.DEBUG_ARRAY_OPT:
        print("make_parallel_loop")
        print("outer_sig = ", outer_sig.args, outer_sig.return_type,
              outer_sig.recvr, outer_sig.pysig)
        print("loop_ranges = ", loop_ranges)
        print("expr_args", expr_args)
        print("expr_arg_types", expr_arg_types)
        print("gu_signature", gu_signature)

    # Build the wrapper for GUFunc
    # NOTE(review): the normalized (args, return_type) pair is not needed
    # below; the call is kept so that whatever checking normalize_signature
    # performs on outer_sig still runs -- confirm before removing it.
    sigutils.normalize_signature(outer_sig)
    llvm_func = cres.library.get_function(cres.fndesc.llvm_func_name)
    sin, sout = gu_signature

    # These are necessary for build_gufunc_wrapper to find external symbols
    _launch_threads()

    info = build_gufunc_wrapper(llvm_func, cres, sin, sout,
                                cache=False, is_parfors=True)
    wrapper_name = info.name
    cres.library._ensure_finalized()

    if config.DEBUG_ARRAY_OPT:
        print("parallel function = ", wrapper_name, cres)

    # loadvars for loop_ranges
    def load_range(v):
        if isinstance(v, ir.Var):
            return lowerer.loadvar(v.name)
        else:
            return context.get_constant(types.uintp, v)

    num_dim = len(loop_ranges)
    for i, (start, stop, step) in enumerate(loop_ranges):
        start = load_range(start)
        stop = load_range(stop)
        assert step == 1  # We do not support loop steps other than 1
        step = load_range(step)
        # Overwrites the caller's list entry with the loaded LLVM values.
        loop_ranges[i] = (start, stop, step)

        if config.DEBUG_ARRAY_OPT:
            print("call_parallel_gufunc loop_ranges[{}] = ".format(i), start,
                  stop, step)
            cgutils.printf(builder, "loop range[{}]: %d %d (%d)\n".format(i),
                           start, stop, step)

    # Commonly used LLVM types and constants
    byte_t = llvmlite.ir.IntType(8)
    byte_ptr_t = llvmlite.ir.PointerType(byte_t)
    byte_ptr_ptr_t = llvmlite.ir.PointerType(byte_ptr_t)
    intp_t = context.get_value_type(types.intp)
    uintp_t = context.get_value_type(types.uintp)
    intp_ptr_t = llvmlite.ir.PointerType(intp_t)
    uintp_ptr_t = llvmlite.ir.PointerType(uintp_t)
    zero = context.get_constant(types.uintp, 0)
    one = context.get_constant(types.uintp, 1)
    one_type = one.type
    sizeof_intp = context.get_abi_sizeof(intp_t)

    # Prepare sched, first pop it out of expr_args, outer_sig, and gu_signature
    expr_args.pop(0)
    sched_sig = sin.pop(0)

    if config.DEBUG_ARRAY_OPT:
        print("Parfor has potentially negative start", index_var_typ.signed)

    # The signedness of the index picks both the schedule element type and
    # the scheduling routine that fills the schedule.
    if index_var_typ.signed:
        sched_type = intp_t
        sched_ptr_type = intp_ptr_t
        scheduling_name = "do_scheduling_signed"
    else:
        sched_type = uintp_t
        sched_ptr_type = uintp_ptr_t
        scheduling_name = "do_scheduling_unsigned"

    # Call do_scheduling with appropriate arguments
    dim_starts = cgutils.alloca_once(
        builder, sched_type,
        size=context.get_constant(types.uintp, num_dim), name="dim_starts")
    dim_stops = cgutils.alloca_once(
        builder, sched_type,
        size=context.get_constant(types.uintp, num_dim), name="dim_stops")
    for i, (start, stop, step) in enumerate(loop_ranges):
        if start.type != one_type:
            start = builder.sext(start, one_type)
        if stop.type != one_type:
            stop = builder.sext(stop, one_type)
        if step.type != one_type:
            step = builder.sext(step, one_type)
        # subtract 1 because do-scheduling takes inclusive ranges
        stop = builder.sub(stop, one)
        dim_index = context.get_constant(types.uintp, i)
        builder.store(start, builder.gep(dim_starts, [dim_index]))
        builder.store(stop, builder.gep(dim_stops, [dim_index]))

    # Prepare to call get/set parallel_chunksize and get the number of threads.
    get_chunksize = cgutils.get_or_insert_function(
        builder.module,
        llvmlite.ir.FunctionType(uintp_t, []),
        name="get_parallel_chunksize")

    set_chunksize = cgutils.get_or_insert_function(
        builder.module,
        llvmlite.ir.FunctionType(llvmlite.ir.VoidType(), [uintp_t]),
        name="set_parallel_chunksize")

    get_num_threads = cgutils.get_or_insert_function(
        builder.module,
        llvmlite.ir.FunctionType(llvmlite.ir.IntType(types.intp.bitwidth), []),
        "get_num_threads")

    # Get the current number of threads.
    num_threads = builder.call(get_num_threads, [])
    # Get the current chunksize so we can use it and restore the value later.
    current_chunksize = builder.call(get_chunksize, [])

    # A non-positive thread count is reported and turned into a RuntimeError
    # raised from the generated code.
    with cgutils.if_unlikely(builder, builder.icmp_signed('<=', num_threads,
                                                          num_threads.type(0))):
        cgutils.printf(builder, "num_threads: %d\n", num_threads)
        context.call_conv.return_user_exc(builder, RuntimeError,
                                          ("Invalid number of threads. "
                                           "This likely indicates a bug in Numba.",))

    # Call get_sched_size from gufunc_scheduler.cpp that incorporates the size of the work,
    # the number of threads and the selected chunk size. This will tell us how many entries
    # in the schedule we will need.
    get_sched_size_fnty = llvmlite.ir.FunctionType(
        uintp_t, [uintp_t, uintp_t, intp_ptr_t, intp_ptr_t])
    get_sched_size = cgutils.get_or_insert_function(
        builder.module,
        get_sched_size_fnty,
        name="get_sched_size")
    num_divisions = builder.call(
        get_sched_size,
        [num_threads,
         context.get_constant(types.uintp, num_dim),
         dim_starts,
         dim_stops])
    # Set the chunksize to zero so that any nested calls get the default chunk size behavior.
    builder.call(set_chunksize, [zero])

    # Each entry in the schedule is 2 times the number of dimensions long.
    multiplier = context.get_constant(types.uintp, num_dim * 2)
    # Compute the total number of entries in the schedule.
    sched_size = builder.mul(num_divisions, multiplier)

    # Prepare to dynamically allocate memory to hold the schedule.
    alloc_sched_fnty = llvmlite.ir.FunctionType(sched_ptr_type, [uintp_t])
    alloc_sched_func = cgutils.get_or_insert_function(
        builder.module,
        alloc_sched_fnty,
        name="allocate_sched")
    # Call gufunc_scheduler.cpp to allocate the schedule.
    # This may or may not do pooling.
    alloc_space = builder.call(alloc_sched_func, [sched_size])
    # Allocate a slot in the entry block to store the schedule pointer.
    sched = cgutils.alloca_once(builder, sched_ptr_type)
    # Store the schedule pointer into that slot.
    builder.store(alloc_space, sched)

    debug_flag = 1 if config.DEBUG_ARRAY_OPT else 0
    scheduling_fnty = llvmlite.ir.FunctionType(
        intp_ptr_t,
        [uintp_t, intp_ptr_t, intp_ptr_t, uintp_t, sched_ptr_type, intp_t])
    do_scheduling = cgutils.get_or_insert_function(builder.module,
                                                   scheduling_fnty,
                                                   name=scheduling_name)

    # Call the scheduling routine that decides how to break up the work.
    builder.call(
        do_scheduling,
        [context.get_constant(types.uintp, num_dim),
         dim_starts,
         dim_stops,
         num_divisions,
         builder.load(sched),
         context.get_constant(types.intp, debug_flag)])

    # Get the LLVM vars for the Numba IR reduction array vars.
    redarrs = [lowerer.loadvar(redarrdict[x].name) for x in redvars]

    nredvars = len(redvars)
    ninouts = len(expr_args) - nredvars

    def load_potential_tuple_var(x):
        """Given a variable name, if that variable is not a new name
        introduced as the extracted part of a tuple then just return
        the variable loaded from its name.  However, if the variable
        does represent part of a tuple, as recognized by the name of
        the variable being present in the exp_name_to_tuple_var dict,
        then we load the original tuple var instead that we get from
        the dict and then extract the corresponding element of the
        tuple, also stored and returned to use in the dict (i.e., offset).
        """
        if x in exp_name_to_tuple_var:
            orig_tup, offset = exp_name_to_tuple_var[x]
            tup_var = lowerer.loadvar(orig_tup)
            return builder.extract_value(tup_var, offset)
        return lowerer.loadvar(x)

    # ----------------------------------------------------------------------------
    # Prepare arguments: args, shapes, steps, data
    all_args = [load_potential_tuple_var(x) for x in expr_args[:ninouts]] + redarrs
    num_args = len(all_args)
    num_inps = len(sin) + 1
    args = cgutils.alloca_once(
        builder,
        byte_ptr_t,
        size=context.get_constant(types.intp, 1 + num_args),
        name="pargs")
    array_strides = []
    # sched goes first
    builder.store(builder.bitcast(builder.load(sched), byte_ptr_t), args)
    array_strides.append(context.get_constant(types.intp, sizeof_intp))
    rv_to_arg_dict = {}

    def store_array_arg(arr_val, arr_ty, slot):
        """Record every stride of an array argument and store its data
        pointer into ``slot``."""
        ary = context.make_array(arr_ty)(context, builder, arr_val)
        # All strides are recorded, starting with the first dimension.
        array_strides.extend(
            cgutils.unpack_tuple(builder, ary.strides, arr_ty.ndim))
        builder.store(builder.bitcast(ary.data, byte_ptr_t), slot)

    # followed by other arguments
    for i in range(num_args):
        arg = all_args[i]
        var = expr_args[i]
        aty = expr_arg_types[i]
        dst = builder.gep(args, [context.get_constant(types.intp, i + 1)])
        if i >= ninouts:  # reduction variables
            store_array_arg(arg, aty, dst)
        elif isinstance(aty, types.ArrayCompatible):
            if var in races:
                # A race variable is passed through a one-element stack slot
                # and copied back to its variable after the kernel call.
                typ = (context.get_data_type(aty.dtype)
                       if aty.dtype != types.boolean
                       else llvmlite.ir.IntType(1))

                rv_arg = cgutils.alloca_once(builder, typ)
                builder.store(arg, rv_arg)
                builder.store(builder.bitcast(rv_arg, byte_ptr_t), dst)
                rv_to_arg_dict[var] = (arg, rv_arg)

                array_strides.append(
                    context.get_constant(types.intp,
                                         context.get_abi_sizeof(typ)))
            else:
                store_array_arg(arg, aty, dst)
        else:
            is_input = i < num_inps
            scalar_ty = aty
            if is_input and isinstance(aty, types.Optional):
                # Unpack optional type
                scalar_ty = aty.type
                arg = context.cast(builder, arg, aty, scalar_ty)
            typ = (context.get_data_type(scalar_ty)
                   if not isinstance(scalar_ty, types.Boolean)
                   else llvmlite.ir.IntType(1))
            ptr = cgutils.alloca_once(builder, typ)
            if is_input:
                # Scalar input, need to store the value in an array of size 1
                builder.store(arg, ptr)
            # For a scalar output the slot is only allocated, not initialised.
            builder.store(builder.bitcast(ptr, byte_ptr_t), dst)

    # ----------------------------------------------------------------------------
    # Next, we prepare the individual dimension info recorded in gu_signature
    sig_dim_dict = {}
    # Dimension symbols in order of first appearance; sched's symbol is first.
    occurrences = [sched_sig[0]]
    sig_dim_dict[sched_sig[0]] = context.get_constant(types.intp, 2 * num_dim)
    gu_sigs = sin + sout
    assert len(expr_args) == len(all_args)
    assert len(expr_args) == len(expr_arg_types)
    assert len(expr_args) == len(gu_sigs)
    assert len(expr_args) == len(outer_sig.args[1:])
    for var, arg, aty, gu_sig in zip(expr_args, all_args,
                                     expr_arg_types, gu_sigs):
        # The symbols of gu_sig are matched against the trailing dimensions
        # of an array argument, so start at the first such dimension.
        if isinstance(aty, types.npytypes.Array):
            i = aty.ndim - len(gu_sig)
        else:
            i = 0
        if config.DEBUG_ARRAY_OPT:
            print("var =", var, "gu_sig =", gu_sig, "type =", aty, "i =", i)

        for dim_sym in gu_sig:
            if config.DEBUG_ARRAY_OPT:
                print("var = ", var, " type = ", aty)
            if var in races:
                sig_dim_dict[dim_sym] = context.get_constant(types.intp, 1)
            else:
                ary = context.make_array(aty)(context, builder, arg)
                dim_sizes = cgutils.unpack_tuple(builder, ary.shape, aty.ndim)
                sig_dim_dict[dim_sym] = dim_sizes[i]

            if dim_sym not in occurrences:
                if config.DEBUG_ARRAY_OPT:
                    print("dim_sym = ", dim_sym, ", i = ", i)
                    cgutils.printf(builder, dim_sym + " = %d\n", sig_dim_dict[dim_sym])
                occurrences.append(dim_sym)
            i = i + 1

    # ----------------------------------------------------------------------------
    # Prepare shapes, which is a single number (outer loop size), followed by
    # the size of individual shape variables.
    nshapes = len(sig_dim_dict) + 1
    shapes = cgutils.alloca_once(builder, intp_t, size=nshapes, name="pshape")
    # The outer loop size is the number of divisions in the schedule.
    builder.store(num_divisions, shapes)
    # Individual shape variables go next
    for i, dim_sym in enumerate(occurrences, start=1):
        if config.DEBUG_ARRAY_OPT:
            cgutils.printf(builder, dim_sym + " = %d\n", sig_dim_dict[dim_sym])
        builder.store(
            sig_dim_dict[dim_sym],
            builder.gep(shapes, [context.get_constant(types.intp, i)]))

    # ----------------------------------------------------------------------------
    # Prepare steps for each argument. Note that all steps are counted in
    # bytes.
    num_steps = num_args + 1 + len(array_strides)
    steps = cgutils.alloca_once(
        builder, intp_t,
        size=context.get_constant(types.intp, num_steps), name="psteps")
    # First goes the step size for sched, which is 2 * num_dim
    builder.store(context.get_constant(types.intp, 2 * num_dim * sizeof_intp),
                  steps)
    # The per-argument step of every other argument is stored as zero.
    for i in range(num_args):
        dst = builder.gep(steps, [context.get_constant(types.intp, 1 + i)])
        builder.store(zero, dst)
    # The collected strides follow the per-argument steps.
    for j, stride in enumerate(array_strides):
        dst = builder.gep(
            steps, [context.get_constant(types.intp, 1 + num_args + j)])
        builder.store(stride, dst)

    # ----------------------------------------------------------------------------
    # prepare data
    data = cgutils.get_null_value(byte_ptr_t)

    fnty = llvmlite.ir.FunctionType(llvmlite.ir.VoidType(),
                                    [byte_ptr_ptr_t, intp_ptr_t,
                                     intp_ptr_t, byte_ptr_t])

    fn = cgutils.get_or_insert_function(builder.module, fnty, wrapper_name)
    context.active_code_library.add_linking_library(info.library)

    if config.DEBUG_ARRAY_OPT:
        cgutils.printf(builder, "before calling kernel %p\n", fn)
    builder.call(fn, [args, shapes, steps, data])
    if config.DEBUG_ARRAY_OPT:
        cgutils.printf(builder, "after calling kernel %p\n", fn)

    # Restore the chunksize that was in effect before it was zeroed above.
    builder.call(set_chunksize, [current_chunksize])

    # Deallocate the schedule's memory.
    dealloc_sched_fnty = llvmlite.ir.FunctionType(llvmlite.ir.VoidType(),
                                                  [sched_ptr_type])
    dealloc_sched_func = cgutils.get_or_insert_function(
        builder.module,
        dealloc_sched_fnty,
        name="deallocate_sched")
    builder.call(dealloc_sched_func, [builder.load(sched)])

    # Copy each race variable's slot back into the variable it came from.
    for name, (_, rv_arg) in rv_to_arg_dict.items():
        only_elem_ptr = builder.gep(rv_arg, [context.get_constant(types.intp, 0)])
        builder.store(builder.load(only_elem_ptr), lowerer.getvar(name))

    context.active_code_library.add_linking_library(cres.library)
|