ai-content-maker/.venv/Lib/site-packages/numba/core/withcontexts.py

536 lines
19 KiB
Python

from numba.core import types, errors, ir, sigutils, ir_utils
from numba.core.typing.typeof import typeof_impl
from numba.core.transforms import find_region_inout_vars
from numba.core.ir_utils import build_definitions
import numba
class WithContext(object):
"""A dummy object for use as contextmanager.
This can be used as a contextmanager.
"""
is_callable = False
def __enter__(self):
pass
def __exit__(self, typ, val, tb):
pass
def mutate_with_body(self, func_ir, blocks, blk_start, blk_end,
body_blocks, dispatcher_factory, extra):
"""Mutate the *blocks* to implement this contextmanager.
Parameters
----------
func_ir : FunctionIR
blocks : dict[ir.Block]
blk_start, blk_end : int
labels of the starting and ending block of the context-manager.
body_block: sequence[int]
A sequence of int's representing labels of the with-body
dispatcher_factory : callable
A callable that takes a `FunctionIR` and returns a `Dispatcher`.
"""
raise NotImplementedError
@typeof_impl.register(WithContext)
def typeof_contextmanager(val, c):
return types.ContextManager(val)
def _get_var_parent(name):
"""Get parent of the variable given its name
"""
# If not a temporary variable
if not name.startswith('$'):
# Return the base component of the name
return name.split('.', )[0]
def _clear_blocks(blocks, to_clear):
"""Remove keys in *to_clear* from *blocks*.
"""
for b in to_clear:
del blocks[b]
class _ByPassContextType(WithContext):
"""A simple context-manager that tells the compiler to bypass the body
of the with-block.
"""
def mutate_with_body(self, func_ir, blocks, blk_start, blk_end,
body_blocks, dispatcher_factory, extra):
assert extra is None
# Determine variables that need forwarding
vlt = func_ir.variable_lifetime
inmap = {_get_var_parent(k): k for k in vlt.livemap[blk_start]}
outmap = {_get_var_parent(k): k for k in vlt.livemap[blk_end]}
forwardvars = {inmap[k]: outmap[k] for k in filter(bool, outmap)}
# Transform the block
_bypass_with_context(blocks, blk_start, blk_end, forwardvars)
_clear_blocks(blocks, body_blocks)
bypass_context = _ByPassContextType()
class _CallContextType(WithContext):
"""A simple context-manager that tells the compiler to lift the body of the
with-block as another function.
"""
def mutate_with_body(self, func_ir, blocks, blk_start, blk_end,
body_blocks, dispatcher_factory, extra):
assert extra is None
vlt = func_ir.variable_lifetime
inputs, outputs = find_region_inout_vars(
blocks=blocks,
livemap=vlt.livemap,
callfrom=blk_start,
returnto=blk_end,
body_block_ids=set(body_blocks),
)
lifted_blks = {k: blocks[k] for k in body_blocks}
_mutate_with_block_callee(lifted_blks, blk_start, blk_end,
inputs, outputs)
# XXX: transform body-blocks to return the output variables
lifted_ir = func_ir.derive(
blocks=lifted_blks,
arg_names=tuple(inputs),
arg_count=len(inputs),
force_non_generator=True,
)
dispatcher = dispatcher_factory(lifted_ir)
newblk = _mutate_with_block_caller(
dispatcher, blocks, blk_start, blk_end, inputs, outputs,
)
blocks[blk_start] = newblk
_clear_blocks(blocks, body_blocks)
return dispatcher
call_context = _CallContextType()
class _ObjModeContextType(WithContext):
"""Creates a contextmanager to be used inside jitted functions to enter
*object-mode* for using interpreter features. The body of the with-context
is lifted into a function that is compiled in *object-mode*. This
transformation process is limited and cannot process all possible
Python code. However, users can wrap complicated logic in another
Python function, which will then be executed by the interpreter.
Use this as a function that takes keyword arguments only.
The argument names must correspond to the output variables from the
with-block. Their respective values can be:
1. strings representing the expected types; i.e. ``"float32"``.
2. compile-time bound global or nonlocal variables referring to the
expected type. The variables are read at compile time.
When exiting the with-context, the output variables are converted
to the expected nopython types according to the annotation. This process
is the same as passing Python objects into arguments of a nopython
function.
Example::
import numpy as np
from numba import njit, objmode, types
def bar(x):
# This code is executed by the interpreter.
return np.asarray(list(reversed(x.tolist())))
# Output type as global variable
out_ty = types.intp[:]
@njit
def foo():
x = np.arange(5)
y = np.zeros_like(x)
with objmode(y='intp[:]', z=out_ty): # annotate return type
# this region is executed by object-mode.
y += bar(x)
z = y
return y, z
.. note:: Known limitations:
- with-block cannot use incoming list objects.
- with-block cannot use incoming function objects.
- with-block cannot ``yield``, ``break``, ``return`` or ``raise`` \
such that the execution will leave the with-block immediately.
- with-block cannot contain `with` statements.
- random number generator states do not synchronize; i.e. \
nopython-mode and object-mode uses different RNG states.
.. note:: When used outside of no-python mode, the context-manager has no
effect.
.. warning:: This feature is experimental. The supported features may
change with or without notice.
"""
is_callable = True
def _legalize_args(self, func_ir, args, kwargs, loc, func_globals,
func_closures):
"""
Legalize arguments to the context-manager
Parameters
----------
func_ir: FunctionIR
args: tuple
Positional arguments to the with-context call as IR nodes.
kwargs: dict
Keyword arguments to the with-context call as IR nodes.
loc: numba.core.ir.Loc
Source location of the with-context call.
func_globals: dict
The globals dictionary of the calling function.
func_closures: dict
The resolved closure variables of the calling function.
"""
if args:
raise errors.CompilerError(
"objectmode context doesn't take any positional arguments",
)
typeanns = {}
def report_error(varname, msg, loc):
raise errors.CompilerError(
f"Error handling objmode argument {varname!r}. {msg}",
loc=loc,
)
for k, v in kwargs.items():
if isinstance(v, ir.Const) and isinstance(v.value, str):
typeanns[k] = sigutils._parse_signature_string(v.value)
elif isinstance(v, ir.FreeVar):
try:
v = func_closures[v.name]
except KeyError:
report_error(
varname=k,
msg=f"Freevar {v.name!r} is not defined.",
loc=loc,
)
typeanns[k] = v
elif isinstance(v, ir.Global):
try:
v = func_globals[v.name]
except KeyError:
report_error(
varname=k,
msg=f"Global {v.name!r} is not defined.",
loc=loc,
)
typeanns[k] = v
elif isinstance(v, ir.Expr) and v.op == "getattr":
try:
base_obj = func_ir.infer_constant(v.value)
typ = getattr(base_obj, v.attr)
except (errors.ConstantInferenceError, AttributeError):
report_error(
varname=k,
msg="Getattr cannot be resolved at compile-time.",
loc=loc,
)
else:
typeanns[k] = typ
else:
report_error(
varname=k,
msg=("The value must be a compile-time constant either as "
"a non-local variable or a getattr expression that "
"refers to a Numba type."),
loc=loc
)
# Legalize the types for objmode
for name, typ in typeanns.items():
self._legalize_arg_type(name, typ, loc)
return typeanns
def _legalize_arg_type(self, name, typ, loc):
"""Legalize the argument type
Parameters
----------
name: str
argument name.
typ: numba.core.types.Type
argument type.
loc: numba.core.ir.Loc
source location for error reporting.
"""
if getattr(typ, "reflected", False):
msgbuf = [
"Objmode context failed.",
f"Argument {name!r} is declared as "
f"an unsupported type: {typ}.",
f"Reflected types are not supported.",
]
raise errors.CompilerError(" ".join(msgbuf), loc=loc)
def mutate_with_body(self, func_ir, blocks, blk_start, blk_end,
body_blocks, dispatcher_factory, extra):
cellnames = func_ir.func_id.func.__code__.co_freevars
closures = func_ir.func_id.func.__closure__
func_globals = func_ir.func_id.func.__globals__
if closures is not None:
# Resolve free variables
func_closures = {}
for cellname, closure in zip(cellnames, closures):
try:
cellval = closure.cell_contents
except ValueError as e:
# empty cell will raise
if str(e) != "Cell is empty":
raise
else:
func_closures[cellname] = cellval
else:
# Missing closure object
func_closures = {}
args = extra['args'] if extra else ()
kwargs = extra['kwargs'] if extra else {}
typeanns = self._legalize_args(func_ir=func_ir,
args=args,
kwargs=kwargs,
loc=blocks[blk_start].loc,
func_globals=func_globals,
func_closures=func_closures,
)
vlt = func_ir.variable_lifetime
inputs, outputs = find_region_inout_vars(
blocks=blocks,
livemap=vlt.livemap,
callfrom=blk_start,
returnto=blk_end,
body_block_ids=set(body_blocks),
)
# Determine types in the output tuple
def strip_var_ver(x):
return x.split('.', 1)[0]
stripped_outs = list(map(strip_var_ver, outputs))
# Verify that only outputs are annotated
extra_annotated = set(typeanns) - set(stripped_outs)
if extra_annotated:
msg = (
'Invalid type annotation on non-outgoing variables: {}.'
'Suggestion: remove annotation of the listed variables'
)
raise errors.TypingError(msg.format(extra_annotated))
# Verify that all outputs are annotated
# Note on "$cp" variable:
# ``transforms.consolidate_multi_exit_withs()`` introduces the variable
# for the control-point to determine the correct exit block. This
# variable crosses the with-region boundary. Thus, it will be consider
# an output variable leaving the lifted with-region.
typeanns["$cp"] = types.int32
not_annotated = set(stripped_outs) - set(typeanns)
if not_annotated:
msg = (
'Missing type annotation on outgoing variable(s): {0}\n\n'
'Example code: with objmode({1}=\'<'
'add_type_as_string_here>\')\n'
)
stable_ann = sorted(not_annotated)
raise errors.TypingError(msg.format(stable_ann, stable_ann[0]))
# Get output types
outtup = types.Tuple([typeanns[v] for v in stripped_outs])
lifted_blks = {k: blocks[k] for k in body_blocks}
_mutate_with_block_callee(lifted_blks, blk_start, blk_end,
inputs, outputs)
lifted_ir = func_ir.derive(
blocks=lifted_blks,
arg_names=tuple(inputs),
arg_count=len(inputs),
force_non_generator=True,
)
dispatcher = dispatcher_factory(lifted_ir, objectmode=True,
output_types=outtup)
newblk = _mutate_with_block_caller(
dispatcher, blocks, blk_start, blk_end, inputs, outputs,
)
blocks[blk_start] = newblk
_clear_blocks(blocks, body_blocks)
return dispatcher
def __call__(self, *args, **kwargs):
# No effect when used in pure-python
return self
objmode_context = _ObjModeContextType()
def _bypass_with_context(blocks, blk_start, blk_end, forwardvars):
"""Given the starting and ending block of the with-context,
replaces the head block with a new block that jumps to the end.
*blocks* is modified inplace.
"""
sblk = blocks[blk_start]
scope = sblk.scope
loc = sblk.loc
newblk = ir.Block(scope=scope, loc=loc)
for k, v in forwardvars.items():
newblk.append(ir.Assign(value=scope.get_exact(k),
target=scope.get_exact(v),
loc=loc))
newblk.append(ir.Jump(target=blk_end, loc=loc))
blocks[blk_start] = newblk
def _mutate_with_block_caller(dispatcher, blocks, blk_start, blk_end,
inputs, outputs):
"""Make a new block that calls into the lifeted with-context.
Parameters
----------
dispatcher : Dispatcher
blocks : dict[ir.Block]
blk_start, blk_end : int
labels of the starting and ending block of the context-manager.
inputs: sequence[str]
Input variable names
outputs: sequence[str]
Output variable names
"""
sblk = blocks[blk_start]
scope = sblk.scope
loc = sblk.loc
newblock = ir.Block(scope=scope, loc=loc)
ir_utils.fill_block_with_call(
newblock=newblock,
callee=dispatcher,
label_next=blk_end,
inputs=inputs,
outputs=outputs,
)
return newblock
def _mutate_with_block_callee(blocks, blk_start, blk_end, inputs, outputs):
"""Mutate *blocks* for the callee of a with-context.
Parameters
----------
blocks : dict[ir.Block]
blk_start, blk_end : int
labels of the starting and ending block of the context-manager.
inputs: sequence[str]
Input variable names
outputs: sequence[str]
Output variable names
"""
if not blocks:
raise errors.NumbaValueError("No blocks in with-context block")
head_blk = min(blocks)
temp_blk = blocks[head_blk]
scope = temp_blk.scope
loc = temp_blk.loc
blocks[blk_start] = ir_utils.fill_callee_prologue(
block=ir.Block(scope=scope, loc=loc),
inputs=inputs,
label_next=head_blk,
)
blocks[blk_end] = ir_utils.fill_callee_epilogue(
block=ir.Block(scope=scope, loc=loc),
outputs=outputs,
)
class _ParallelChunksize(WithContext):
is_callable = True
"""A context-manager that on entry stores the current chunksize
for the executing parfors and then changes the current chunksize
to the programmer specified value. On exit the original
chunksize is restored.
"""
def mutate_with_body(self, func_ir, blocks, blk_start, blk_end,
body_blocks, dispatcher_factory, extra):
ir_utils.dprint_func_ir(func_ir, "Before with changes", blocks=blocks)
assert extra is not None
args = extra["args"]
assert len(args) == 1
arg = args[0]
scope = blocks[blk_start].scope
loc = blocks[blk_start].loc
if isinstance(arg, ir.Arg):
arg = ir.Var(scope, arg.name, loc)
set_state = []
restore_state = []
# global for Numba itself
gvar = scope.redefine("$ngvar", loc)
set_state.append(ir.Assign(ir.Global('numba', numba, loc), gvar, loc))
# getattr for set chunksize function in Numba
spcattr = ir.Expr.getattr(gvar, 'set_parallel_chunksize', loc)
spcvar = scope.redefine("$spc", loc)
set_state.append(ir.Assign(spcattr, spcvar, loc))
# call set_parallel_chunksize
orig_pc_var = scope.redefine("$save_pc", loc)
cs_var = scope.redefine("$cs_var", loc)
set_state.append(ir.Assign(arg, cs_var, loc))
spc_call = ir.Expr.call(spcvar, [cs_var], (), loc)
set_state.append(ir.Assign(spc_call, orig_pc_var, loc))
restore_spc_call = ir.Expr.call(spcvar, [orig_pc_var], (), loc)
restore_state.append(ir.Assign(restore_spc_call, orig_pc_var, loc))
blocks[blk_start].body = (blocks[blk_start].body[1:-1] +
set_state +
[blocks[blk_start].body[-1]])
blocks[blk_end].body = restore_state + blocks[blk_end].body
func_ir._definitions = build_definitions(blocks)
ir_utils.dprint_func_ir(func_ir, "After with changes", blocks=blocks)
def __call__(self, *args, **kwargs):
"""Act like a function and enforce the contract that
setting the chunksize takes only one integer input.
"""
if len(args) != 1 or kwargs or not isinstance(args[0], int):
raise ValueError("parallel_chunksize takes only a "
"single integer argument.")
self.chunksize = args[0]
return self
def __enter__(self):
self.orig_chunksize = numba.get_parallel_chunksize()
numba.set_parallel_chunksize(self.chunksize)
def __exit__(self, typ, val, tb):
numba.set_parallel_chunksize(self.orig_chunksize)
parallel_chunksize = _ParallelChunksize()