dirichlet.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. # mypy: allow-untyped-defs
  2. from typing import Optional
  3. import torch
  4. from torch import Tensor
  5. from torch.autograd import Function
  6. from torch.autograd.function import once_differentiable
  7. from torch.distributions import constraints
  8. from torch.distributions.exp_family import ExponentialFamily
  9. from torch.types import _size
  10. __all__ = ["Dirichlet"]
  11. # This helper is exposed for testing.
  12. def _Dirichlet_backward(x, concentration, grad_output):
  13. total = concentration.sum(-1, True).expand_as(concentration)
  14. grad = torch._dirichlet_grad(x, concentration, total)
  15. return grad * (grad_output - (x * grad_output).sum(-1, True))
  16. class _Dirichlet(Function):
  17. @staticmethod
  18. def forward(ctx, concentration):
  19. x = torch._sample_dirichlet(concentration)
  20. ctx.save_for_backward(x, concentration)
  21. return x
  22. @staticmethod
  23. @once_differentiable
  24. def backward(ctx, grad_output):
  25. x, concentration = ctx.saved_tensors
  26. return _Dirichlet_backward(x, concentration, grad_output)
  27. class Dirichlet(ExponentialFamily):
  28. r"""
  29. Creates a Dirichlet distribution parameterized by concentration :attr:`concentration`.
  30. Example::
  31. >>> # xdoctest: +IGNORE_WANT("non-deterministic")
  32. >>> m = Dirichlet(torch.tensor([0.5, 0.5]))
  33. >>> m.sample() # Dirichlet distributed with concentration [0.5, 0.5]
  34. tensor([ 0.1046, 0.8954])
  35. Args:
  36. concentration (Tensor): concentration parameter of the distribution
  37. (often referred to as alpha)
  38. """
  39. arg_constraints = {
  40. "concentration": constraints.independent(constraints.positive, 1)
  41. }
  42. support = constraints.simplex
  43. has_rsample = True
  44. def __init__(
  45. self,
  46. concentration: Tensor,
  47. validate_args: Optional[bool] = None,
  48. ) -> None:
  49. if concentration.dim() < 1:
  50. raise ValueError(
  51. "`concentration` parameter must be at least one-dimensional."
  52. )
  53. self.concentration = concentration
  54. batch_shape, event_shape = concentration.shape[:-1], concentration.shape[-1:]
  55. super().__init__(batch_shape, event_shape, validate_args=validate_args)
  56. def expand(self, batch_shape, _instance=None):
  57. new = self._get_checked_instance(Dirichlet, _instance)
  58. batch_shape = torch.Size(batch_shape)
  59. new.concentration = self.concentration.expand(batch_shape + self.event_shape)
  60. super(Dirichlet, new).__init__(
  61. batch_shape, self.event_shape, validate_args=False
  62. )
  63. new._validate_args = self._validate_args
  64. return new
  65. def rsample(self, sample_shape: _size = ()) -> Tensor:
  66. shape = self._extended_shape(sample_shape)
  67. concentration = self.concentration.expand(shape)
  68. return _Dirichlet.apply(concentration)
  69. def log_prob(self, value):
  70. if self._validate_args:
  71. self._validate_sample(value)
  72. return (
  73. torch.xlogy(self.concentration - 1.0, value).sum(-1)
  74. + torch.lgamma(self.concentration.sum(-1))
  75. - torch.lgamma(self.concentration).sum(-1)
  76. )
  77. @property
  78. def mean(self) -> Tensor:
  79. return self.concentration / self.concentration.sum(-1, True)
  80. @property
  81. def mode(self) -> Tensor:
  82. concentrationm1 = (self.concentration - 1).clamp(min=0.0)
  83. mode = concentrationm1 / concentrationm1.sum(-1, True)
  84. mask = (self.concentration < 1).all(dim=-1)
  85. mode[mask] = torch.nn.functional.one_hot(
  86. mode[mask].argmax(dim=-1), concentrationm1.shape[-1]
  87. ).to(mode)
  88. return mode
  89. @property
  90. def variance(self) -> Tensor:
  91. con0 = self.concentration.sum(-1, True)
  92. return (
  93. self.concentration
  94. * (con0 - self.concentration)
  95. / (con0.pow(2) * (con0 + 1))
  96. )
  97. def entropy(self):
  98. k = self.concentration.size(-1)
  99. a0 = self.concentration.sum(-1)
  100. return (
  101. torch.lgamma(self.concentration).sum(-1)
  102. - torch.lgamma(a0)
  103. - (k - a0) * torch.digamma(a0)
  104. - ((self.concentration - 1.0) * torch.digamma(self.concentration)).sum(-1)
  105. )
  106. @property
  107. def _natural_params(self) -> tuple[Tensor]:
  108. return (self.concentration,)
  109. def _log_normalizer(self, x):
  110. return x.lgamma().sum(-1) - torch.lgamma(x.sum(-1))