# binomial.py (6.3 KB)
  1. # mypy: allow-untyped-defs
  2. from typing import Optional, Union
  3. import torch
  4. from torch import Tensor
  5. from torch.distributions import constraints
  6. from torch.distributions.distribution import Distribution
  7. from torch.distributions.utils import (
  8. broadcast_all,
  9. lazy_property,
  10. logits_to_probs,
  11. probs_to_logits,
  12. )
  13. __all__ = ["Binomial"]
  14. def _clamp_by_zero(x):
  15. # works like clamp(x, min=0) but has grad at 0 is 0.5
  16. return (x.clamp(min=0) + x - x.clamp(max=0)) / 2
  17. class Binomial(Distribution):
  18. r"""
  19. Creates a Binomial distribution parameterized by :attr:`total_count` and
  20. either :attr:`probs` or :attr:`logits` (but not both). :attr:`total_count` must be
  21. broadcastable with :attr:`probs`/:attr:`logits`.
  22. Example::
  23. >>> # xdoctest: +IGNORE_WANT("non-deterministic")
  24. >>> m = Binomial(100, torch.tensor([0 , .2, .8, 1]))
  25. >>> x = m.sample()
  26. tensor([ 0., 22., 71., 100.])
  27. >>> m = Binomial(torch.tensor([[5.], [10.]]), torch.tensor([0.5, 0.8]))
  28. >>> x = m.sample()
  29. tensor([[ 4., 5.],
  30. [ 7., 6.]])
  31. Args:
  32. total_count (int or Tensor): number of Bernoulli trials
  33. probs (Tensor): Event probabilities
  34. logits (Tensor): Event log-odds
  35. """
  36. # pyrefly: ignore [bad-override]
  37. arg_constraints = {
  38. "total_count": constraints.nonnegative_integer,
  39. "probs": constraints.unit_interval,
  40. "logits": constraints.real,
  41. }
  42. has_enumerate_support = True
  43. def __init__(
  44. self,
  45. total_count: Union[Tensor, int] = 1,
  46. probs: Optional[Tensor] = None,
  47. logits: Optional[Tensor] = None,
  48. validate_args: Optional[bool] = None,
  49. ) -> None:
  50. if (probs is None) == (logits is None):
  51. raise ValueError(
  52. "Either `probs` or `logits` must be specified, but not both."
  53. )
  54. if probs is not None:
  55. (
  56. self.total_count,
  57. # pyrefly: ignore [read-only]
  58. self.probs,
  59. ) = broadcast_all(total_count, probs)
  60. self.total_count = self.total_count.type_as(self.probs)
  61. else:
  62. assert logits is not None # helps mypy
  63. (
  64. self.total_count,
  65. # pyrefly: ignore [read-only]
  66. self.logits,
  67. ) = broadcast_all(total_count, logits)
  68. self.total_count = self.total_count.type_as(self.logits)
  69. self._param = self.probs if probs is not None else self.logits
  70. batch_shape = self._param.size()
  71. super().__init__(batch_shape, validate_args=validate_args)
  72. def expand(self, batch_shape, _instance=None):
  73. new = self._get_checked_instance(Binomial, _instance)
  74. batch_shape = torch.Size(batch_shape)
  75. new.total_count = self.total_count.expand(batch_shape)
  76. if "probs" in self.__dict__:
  77. new.probs = self.probs.expand(batch_shape)
  78. new._param = new.probs
  79. if "logits" in self.__dict__:
  80. new.logits = self.logits.expand(batch_shape)
  81. new._param = new.logits
  82. super(Binomial, new).__init__(batch_shape, validate_args=False)
  83. new._validate_args = self._validate_args
  84. return new
  85. def _new(self, *args, **kwargs):
  86. return self._param.new(*args, **kwargs)
  87. @constraints.dependent_property(is_discrete=True, event_dim=0)
  88. # pyrefly: ignore [bad-override]
  89. def support(self):
  90. return constraints.integer_interval(0, self.total_count)
  91. @property
  92. def mean(self) -> Tensor:
  93. return self.total_count * self.probs
  94. @property
  95. def mode(self) -> Tensor:
  96. return ((self.total_count + 1) * self.probs).floor().clamp(max=self.total_count)
  97. @property
  98. def variance(self) -> Tensor:
  99. return self.total_count * self.probs * (1 - self.probs)
  100. @lazy_property
  101. def logits(self) -> Tensor:
  102. return probs_to_logits(self.probs, is_binary=True)
  103. @lazy_property
  104. def probs(self) -> Tensor:
  105. return logits_to_probs(self.logits, is_binary=True)
  106. @property
  107. def param_shape(self) -> torch.Size:
  108. return self._param.size()
  109. def sample(self, sample_shape=torch.Size()):
  110. shape = self._extended_shape(sample_shape)
  111. with torch.no_grad():
  112. return torch.binomial(
  113. self.total_count.expand(shape), self.probs.expand(shape)
  114. )
  115. def log_prob(self, value):
  116. if self._validate_args:
  117. self._validate_sample(value)
  118. log_factorial_n = torch.lgamma(self.total_count + 1)
  119. log_factorial_k = torch.lgamma(value + 1)
  120. log_factorial_nmk = torch.lgamma(self.total_count - value + 1)
  121. # k * log(p) + (n - k) * log(1 - p) = k * (log(p) - log(1 - p)) + n * log(1 - p)
  122. # (case logit < 0) = k * logit - n * log1p(e^logit)
  123. # (case logit > 0) = k * logit - n * (log(p) - log(1 - p)) + n * log(p)
  124. # = k * logit - n * logit - n * log1p(e^-logit)
  125. # (merge two cases) = k * logit - n * max(logit, 0) - n * log1p(e^-|logit|)
  126. normalize_term = (
  127. self.total_count * _clamp_by_zero(self.logits)
  128. + self.total_count * torch.log1p(torch.exp(-torch.abs(self.logits)))
  129. - log_factorial_n
  130. )
  131. return (
  132. value * self.logits - log_factorial_k - log_factorial_nmk - normalize_term
  133. )
  134. def entropy(self):
  135. total_count = int(self.total_count.max())
  136. if not self.total_count.min() == total_count:
  137. raise NotImplementedError(
  138. "Inhomogeneous total count not supported by `entropy`."
  139. )
  140. log_prob = self.log_prob(self.enumerate_support(False))
  141. return -(torch.exp(log_prob) * log_prob).sum(0)
  142. def enumerate_support(self, expand=True):
  143. total_count = int(self.total_count.max())
  144. if not self.total_count.min() == total_count:
  145. raise NotImplementedError(
  146. "Inhomogeneous total count not supported by `enumerate_support`."
  147. )
  148. values = torch.arange(
  149. 1 + total_count, dtype=self._param.dtype, device=self._param.device
  150. )
  151. values = values.view((-1,) + (1,) * len(self._batch_shape))
  152. if expand:
  153. values = values.expand((-1,) + self._batch_shape)
  154. return values