# negative_binomial.py (5.1 KB)
# mypy: allow-untyped-defs
from typing import Optional, Union

import torch
import torch.nn.functional as F
from torch import Tensor
from torch.distributions import constraints
from torch.distributions.distribution import Distribution
from torch.distributions.gamma import Gamma
from torch.distributions.utils import (
    broadcast_all,
    lazy_property,
    logits_to_probs,
    probs_to_logits,
)


__all__ = ["NegativeBinomial"]


  16. class NegativeBinomial(Distribution):
  17. r"""
  18. Creates a Negative Binomial distribution, i.e. distribution
  19. of the number of successful independent and identical Bernoulli trials
  20. before :attr:`total_count` failures are achieved. The probability
  21. of success of each Bernoulli trial is :attr:`probs`.
  22. Args:
  23. total_count (float or Tensor): non-negative number of negative Bernoulli
  24. trials to stop, although the distribution is still valid for real
  25. valued count
  26. probs (Tensor): Event probabilities of success in the half open interval [0, 1)
  27. logits (Tensor): Event log-odds for probabilities of success
  28. """
  29. # pyrefly: ignore [bad-override]
  30. arg_constraints = {
  31. "total_count": constraints.greater_than_eq(0),
  32. "probs": constraints.half_open_interval(0.0, 1.0),
  33. "logits": constraints.real,
  34. }
  35. support = constraints.nonnegative_integer
  36. def __init__(
  37. self,
  38. total_count: Union[Tensor, float],
  39. probs: Optional[Tensor] = None,
  40. logits: Optional[Tensor] = None,
  41. validate_args: Optional[bool] = None,
  42. ) -> None:
  43. if (probs is None) == (logits is None):
  44. raise ValueError(
  45. "Either `probs` or `logits` must be specified, but not both."
  46. )
  47. if probs is not None:
  48. (
  49. self.total_count,
  50. # pyrefly: ignore [read-only]
  51. self.probs,
  52. ) = broadcast_all(total_count, probs)
  53. self.total_count = self.total_count.type_as(self.probs)
  54. else:
  55. assert logits is not None # helps mypy
  56. (
  57. self.total_count,
  58. # pyrefly: ignore [read-only]
  59. self.logits,
  60. ) = broadcast_all(total_count, logits)
  61. self.total_count = self.total_count.type_as(self.logits)
  62. self._param = self.probs if probs is not None else self.logits
  63. batch_shape = self._param.size()
  64. super().__init__(batch_shape, validate_args=validate_args)
  65. def expand(self, batch_shape, _instance=None):
  66. new = self._get_checked_instance(NegativeBinomial, _instance)
  67. batch_shape = torch.Size(batch_shape)
  68. new.total_count = self.total_count.expand(batch_shape)
  69. if "probs" in self.__dict__:
  70. new.probs = self.probs.expand(batch_shape)
  71. new._param = new.probs
  72. if "logits" in self.__dict__:
  73. new.logits = self.logits.expand(batch_shape)
  74. new._param = new.logits
  75. super(NegativeBinomial, new).__init__(batch_shape, validate_args=False)
  76. new._validate_args = self._validate_args
  77. return new
  78. def _new(self, *args, **kwargs):
  79. return self._param.new(*args, **kwargs)
  80. @property
  81. def mean(self) -> Tensor:
  82. return self.total_count * torch.exp(self.logits)
  83. @property
  84. def mode(self) -> Tensor:
  85. return ((self.total_count - 1) * self.logits.exp()).floor().clamp(min=0.0)
  86. @property
  87. def variance(self) -> Tensor:
  88. return self.mean / torch.sigmoid(-self.logits)
  89. @lazy_property
  90. def logits(self) -> Tensor:
  91. return probs_to_logits(self.probs, is_binary=True)
  92. @lazy_property
  93. def probs(self) -> Tensor:
  94. return logits_to_probs(self.logits, is_binary=True)
  95. @property
  96. def param_shape(self) -> torch.Size:
  97. return self._param.size()
  98. @lazy_property
  99. def _gamma(self) -> Gamma:
  100. # Note we avoid validating because self.total_count can be zero.
  101. return Gamma(
  102. concentration=self.total_count,
  103. rate=torch.exp(-self.logits),
  104. validate_args=False,
  105. )
  106. def sample(self, sample_shape=torch.Size()):
  107. with torch.no_grad():
  108. rate = self._gamma.sample(sample_shape=sample_shape)
  109. return torch.poisson(rate)
  110. def log_prob(self, value):
  111. if self._validate_args:
  112. self._validate_sample(value)
  113. log_unnormalized_prob = self.total_count * F.logsigmoid(
  114. -self.logits
  115. ) + value * F.logsigmoid(self.logits)
  116. log_normalization = (
  117. -torch.lgamma(self.total_count + value)
  118. + torch.lgamma(1.0 + value)
  119. + torch.lgamma(self.total_count)
  120. )
  121. # The case self.total_count == 0 and value == 0 has probability 1 but
  122. # lgamma(0) is infinite. Handle this case separately using a function
  123. # that does not modify tensors in place to allow Jit compilation.
  124. log_normalization = log_normalization.masked_fill(
  125. self.total_count + value == 0.0, 0.0
  126. )
  127. return log_unnormalized_prob - log_normalization