from sympy.concrete.summations import (Sum, summation) from sympy.core.basic import Basic from sympy.core.cache import cacheit from sympy.core.function import Lambda from sympy.core.numbers import I from sympy.core.relational import (Eq, Ne) from sympy.core.singleton import S from sympy.core.symbol import (Dummy, symbols) from sympy.core.sympify import sympify from sympy.functions.combinatorial.factorials import factorial from sympy.functions.elementary.exponential import exp from sympy.functions.elementary.integers import floor from sympy.functions.elementary.piecewise import Piecewise from sympy.logic.boolalg import And from sympy.polys.polytools import poly from sympy.series.series import series from sympy.polys.polyerrors import PolynomialError from sympy.stats.crv import reduce_rational_inequalities_wrap from sympy.stats.rv import (NamedArgsMixin, SinglePSpace, SingleDomain, random_symbols, PSpace, ConditionalDomain, RandomDomain, ProductDomain, Distribution) from sympy.stats.symbolic_probability import Probability from sympy.sets.fancysets import Range, FiniteSet from sympy.sets.sets import Union from sympy.sets.contains import Contains from sympy.utilities import filldedent from sympy.core.sympify import _sympify class DiscreteDistribution(Distribution): def __call__(self, *args): return self.pdf(*args) class SingleDiscreteDistribution(DiscreteDistribution, NamedArgsMixin): """ Discrete distribution of a single variable. Serves as superclass for PoissonDistribution etc.... Provides methods for pdf, cdf, and sampling See Also: sympy.stats.crv_types.* """ set = S.Integers def __new__(cls, *args): args = list(map(sympify, args)) return Basic.__new__(cls, *args) @staticmethod def check(*args): pass @cacheit def compute_cdf(self, **kwargs): """ Compute the CDF from the PDF. Returns a Lambda. """ x = symbols('x', integer=True, cls=Dummy) z = symbols('z', real=True, cls=Dummy) left_bound = self.set.inf # CDF is integral of PDF from left bound to z pdf = self.pdf(x) cdf = summation(pdf, (x, left_bound, floor(z)), **kwargs) # CDF Ensure that CDF left of left_bound is zero cdf = Piecewise((cdf, z >= left_bound), (0, True)) return Lambda(z, cdf) def _cdf(self, x): return None def cdf(self, x, **kwargs): """ Cumulative density function """ if not kwargs: cdf = self._cdf(x) if cdf is not None: return cdf return self.compute_cdf(**kwargs)(x) @cacheit def compute_characteristic_function(self, **kwargs): """ Compute the characteristic function from the PDF. Returns a Lambda. """ x, t = symbols('x, t', real=True, cls=Dummy) pdf = self.pdf(x) cf = summation(exp(I*t*x)*pdf, (x, self.set.inf, self.set.sup)) return Lambda(t, cf) def _characteristic_function(self, t): return None def characteristic_function(self, t, **kwargs): """ Characteristic function """ if not kwargs: cf = self._characteristic_function(t) if cf is not None: return cf return self.compute_characteristic_function(**kwargs)(t) @cacheit def compute_moment_generating_function(self, **kwargs): t = Dummy('t', real=True) x = Dummy('x', integer=True) pdf = self.pdf(x) mgf = summation(exp(t*x)*pdf, (x, self.set.inf, self.set.sup)) return Lambda(t, mgf) def _moment_generating_function(self, t): return None def moment_generating_function(self, t, **kwargs): if not kwargs: mgf = self._moment_generating_function(t) if mgf is not None: return mgf return self.compute_moment_generating_function(**kwargs)(t) @cacheit def compute_quantile(self, **kwargs): """ Compute the Quantile from the PDF. Returns a Lambda. """ x = Dummy('x', integer=True) p = Dummy('p', real=True) left_bound = self.set.inf pdf = self.pdf(x) cdf = summation(pdf, (x, left_bound, x), **kwargs) set = ((x, p <= cdf), ) return Lambda(p, Piecewise(*set)) def _quantile(self, x): return None def quantile(self, x, **kwargs): """ Cumulative density function """ if not kwargs: quantile = self._quantile(x) if quantile is not None: return quantile return self.compute_quantile(**kwargs)(x) def expectation(self, expr, var, evaluate=True, **kwargs): """ Expectation of expression over distribution """ # TODO: support discrete sets with non integer stepsizes if evaluate: try: p = poly(expr, var) t = Dummy('t', real=True) mgf = self.moment_generating_function(t) deg = p.degree() taylor = poly(series(mgf, t, 0, deg + 1).removeO(), t) result = 0 for k in range(deg+1): result += p.coeff_monomial(var ** k) * taylor.coeff_monomial(t ** k) * factorial(k) return result except PolynomialError: return summation(expr * self.pdf(var), (var, self.set.inf, self.set.sup), **kwargs) else: return Sum(expr * self.pdf(var), (var, self.set.inf, self.set.sup), **kwargs) def __call__(self, *args): return self.pdf(*args) class DiscreteDomain(RandomDomain): """ A domain with discrete support with step size one. Represented using symbols and Range. """ is_Discrete = True class SingleDiscreteDomain(DiscreteDomain, SingleDomain): def as_boolean(self): return Contains(self.symbol, self.set) class ConditionalDiscreteDomain(DiscreteDomain, ConditionalDomain): """ Domain with discrete support of step size one, that is restricted by some condition. """ @property def set(self): rv = self.symbols if len(self.symbols) > 1: raise NotImplementedError(filldedent(''' Multivariate conditional domains are not yet implemented.''')) rv = list(rv)[0] return reduce_rational_inequalities_wrap(self.condition, rv).intersect(self.fulldomain.set) class DiscretePSpace(PSpace): is_real = True is_Discrete = True @property def pdf(self): return self.density(*self.symbols) def where(self, condition): rvs = random_symbols(condition) assert all(r.symbol in self.symbols for r in rvs) if len(rvs) > 1: raise NotImplementedError(filldedent('''Multivariate discrete random variables are not yet supported.''')) conditional_domain = reduce_rational_inequalities_wrap(condition, rvs[0]) conditional_domain = conditional_domain.intersect(self.domain.set) return SingleDiscreteDomain(rvs[0].symbol, conditional_domain) def probability(self, condition): complement = isinstance(condition, Ne) if complement: condition = Eq(condition.args[0], condition.args[1]) try: _domain = self.where(condition).set if condition == False or _domain is S.EmptySet: return S.Zero if condition == True or _domain == self.domain.set: return S.One prob = self.eval_prob(_domain) except NotImplementedError: from sympy.stats.rv import density expr = condition.lhs - condition.rhs dens = density(expr) if not isinstance(dens, DiscreteDistribution): from sympy.stats.drv_types import DiscreteDistributionHandmade dens = DiscreteDistributionHandmade(dens) z = Dummy('z', real=True) space = SingleDiscretePSpace(z, dens) prob = space.probability(condition.__class__(space.value, 0)) if prob is None: prob = Probability(condition) return prob if not complement else S.One - prob def eval_prob(self, _domain): sym = list(self.symbols)[0] if isinstance(_domain, Range): n = symbols('n', integer=True) inf, sup, step = (r for r in _domain.args) summand = ((self.pdf).replace( sym, n*step)) rv = summation(summand, (n, inf/step, (sup)/step - 1)).doit() return rv elif isinstance(_domain, FiniteSet): pdf = Lambda(sym, self.pdf) rv = sum(pdf(x) for x in _domain) return rv elif isinstance(_domain, Union): rv = sum(self.eval_prob(x) for x in _domain.args) return rv def conditional_space(self, condition): # XXX: Converting from set to tuple. The order matters to Lambda # though so we should be starting with a set... density = Lambda(tuple(self.symbols), self.pdf/self.probability(condition)) condition = condition.xreplace({rv: rv.symbol for rv in self.values}) domain = ConditionalDiscreteDomain(self.domain, condition) return DiscretePSpace(domain, density) class ProductDiscreteDomain(ProductDomain, DiscreteDomain): def as_boolean(self): return And(*[domain.as_boolean for domain in self.domains]) class SingleDiscretePSpace(DiscretePSpace, SinglePSpace): """ Discrete probability space over a single univariate variable """ is_real = True @property def set(self): return self.distribution.set @property def domain(self): return SingleDiscreteDomain(self.symbol, self.set) def sample(self, size=(), library='scipy', seed=None): """ Internal sample method. Returns dictionary mapping RandomSymbol to realization value. """ return {self.value: self.distribution.sample(size, library=library, seed=seed)} def compute_expectation(self, expr, rvs=None, evaluate=True, **kwargs): rvs = rvs or (self.value,) if self.value not in rvs: return expr expr = _sympify(expr) expr = expr.xreplace({rv: rv.symbol for rv in rvs}) x = self.value.symbol try: return self.distribution.expectation(expr, x, evaluate=evaluate, **kwargs) except NotImplementedError: return Sum(expr * self.pdf, (x, self.set.inf, self.set.sup), **kwargs) def compute_cdf(self, expr, **kwargs): if expr == self.value: x = Dummy("x", real=True) return Lambda(x, self.distribution.cdf(x, **kwargs)) else: raise NotImplementedError() def compute_density(self, expr, **kwargs): if expr == self.value: return self.distribution raise NotImplementedError() def compute_characteristic_function(self, expr, **kwargs): if expr == self.value: t = Dummy("t", real=True) return Lambda(t, self.distribution.characteristic_function(t, **kwargs)) else: raise NotImplementedError() def compute_moment_generating_function(self, expr, **kwargs): if expr == self.value: t = Dummy("t", real=True) return Lambda(t, self.distribution.moment_generating_function(t, **kwargs)) else: raise NotImplementedError() def compute_quantile(self, expr, **kwargs): if expr == self.value: p = Dummy("p", real=True) return Lambda(p, self.distribution.quantile(p, **kwargs)) else: raise NotImplementedError()