rec_att_head.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. # copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from __future__ import absolute_import
  15. from __future__ import division
  16. from __future__ import print_function
  17. import paddle
  18. import paddle.nn as nn
  19. import paddle.nn.functional as F
  20. import numpy as np
  21. class AttentionHead(nn.Layer):
  22. def __init__(self, in_channels, out_channels, hidden_size, **kwargs):
  23. super(AttentionHead, self).__init__()
  24. self.input_size = in_channels
  25. self.hidden_size = hidden_size
  26. self.num_classes = out_channels
  27. self.attention_cell = AttentionGRUCell(
  28. in_channels, hidden_size, out_channels, use_gru=False
  29. )
  30. self.generator = nn.Linear(hidden_size, out_channels)
  31. def _char_to_onehot(self, input_char, onehot_dim):
  32. input_ont_hot = F.one_hot(input_char, onehot_dim)
  33. return input_ont_hot
  34. def forward(self, inputs, targets=None, batch_max_length=25):
  35. batch_size = inputs.shape[0]
  36. num_steps = batch_max_length
  37. hidden = paddle.zeros((batch_size, self.hidden_size))
  38. output_hiddens = []
  39. if targets is not None:
  40. for i in range(num_steps):
  41. char_onehots = self._char_to_onehot(
  42. targets[:, i], onehot_dim=self.num_classes
  43. )
  44. (outputs, hidden), alpha = self.attention_cell(
  45. hidden, inputs, char_onehots
  46. )
  47. output_hiddens.append(paddle.unsqueeze(outputs, axis=1))
  48. output = paddle.concat(output_hiddens, axis=1)
  49. probs = self.generator(output)
  50. else:
  51. targets = paddle.zeros(shape=[batch_size], dtype="int32")
  52. probs = None
  53. char_onehots = None
  54. outputs = None
  55. alpha = None
  56. for i in range(num_steps):
  57. char_onehots = self._char_to_onehot(
  58. targets, onehot_dim=self.num_classes
  59. )
  60. (outputs, hidden), alpha = self.attention_cell(
  61. hidden, inputs, char_onehots
  62. )
  63. probs_step = self.generator(outputs)
  64. if probs is None:
  65. probs = paddle.unsqueeze(probs_step, axis=1)
  66. else:
  67. probs = paddle.concat(
  68. [probs, paddle.unsqueeze(probs_step, axis=1)], axis=1
  69. )
  70. next_input = probs_step.argmax(axis=1)
  71. targets = next_input
  72. if not self.training:
  73. probs = paddle.nn.functional.softmax(probs, axis=2)
  74. return probs
  75. class AttentionGRUCell(nn.Layer):
  76. def __init__(self, input_size, hidden_size, num_embeddings, use_gru=False):
  77. super(AttentionGRUCell, self).__init__()
  78. self.i2h = nn.Linear(input_size, hidden_size, bias_attr=False)
  79. self.h2h = nn.Linear(hidden_size, hidden_size)
  80. self.score = nn.Linear(hidden_size, 1, bias_attr=False)
  81. self.rnn = nn.GRUCell(
  82. input_size=input_size + num_embeddings, hidden_size=hidden_size
  83. )
  84. self.hidden_size = hidden_size
  85. def forward(self, prev_hidden, batch_H, char_onehots):
  86. batch_H_proj = self.i2h(batch_H)
  87. prev_hidden_proj = paddle.unsqueeze(self.h2h(prev_hidden), axis=1)
  88. res = paddle.add(batch_H_proj, prev_hidden_proj)
  89. res = paddle.tanh(res)
  90. e = self.score(res)
  91. alpha = F.softmax(e, axis=1)
  92. alpha = paddle.transpose(alpha, [0, 2, 1])
  93. context = paddle.squeeze(paddle.mm(alpha, batch_H), axis=1)
  94. concat_context = paddle.concat([context, char_onehots], 1)
  95. cur_hidden = self.rnn(concat_context, prev_hidden)
  96. return cur_hidden, alpha
  97. class AttentionLSTM(nn.Layer):
  98. def __init__(self, in_channels, out_channels, hidden_size, **kwargs):
  99. super(AttentionLSTM, self).__init__()
  100. self.input_size = in_channels
  101. self.hidden_size = hidden_size
  102. self.num_classes = out_channels
  103. self.attention_cell = AttentionLSTMCell(
  104. in_channels, hidden_size, out_channels, use_gru=False
  105. )
  106. self.generator = nn.Linear(hidden_size, out_channels)
  107. def _char_to_onehot(self, input_char, onehot_dim):
  108. input_ont_hot = F.one_hot(input_char, onehot_dim)
  109. return input_ont_hot
  110. def forward(self, inputs, targets=None, batch_max_length=25):
  111. batch_size = inputs.shape[0]
  112. num_steps = batch_max_length
  113. hidden = (
  114. paddle.zeros((batch_size, self.hidden_size)),
  115. paddle.zeros((batch_size, self.hidden_size)),
  116. )
  117. output_hiddens = []
  118. if targets is not None:
  119. for i in range(num_steps):
  120. # one-hot vectors for a i-th char
  121. char_onehots = self._char_to_onehot(
  122. targets[:, i], onehot_dim=self.num_classes
  123. )
  124. hidden, alpha = self.attention_cell(hidden, inputs, char_onehots)
  125. hidden = (hidden[1][0], hidden[1][1])
  126. output_hiddens.append(paddle.unsqueeze(hidden[0], axis=1))
  127. output = paddle.concat(output_hiddens, axis=1)
  128. probs = self.generator(output)
  129. else:
  130. targets = paddle.zeros(shape=[batch_size], dtype="int32")
  131. probs = None
  132. char_onehots = None
  133. alpha = None
  134. for i in range(num_steps):
  135. char_onehots = self._char_to_onehot(
  136. targets, onehot_dim=self.num_classes
  137. )
  138. hidden, alpha = self.attention_cell(hidden, inputs, char_onehots)
  139. probs_step = self.generator(hidden[0])
  140. hidden = (hidden[1][0], hidden[1][1])
  141. if probs is None:
  142. probs = paddle.unsqueeze(probs_step, axis=1)
  143. else:
  144. probs = paddle.concat(
  145. [probs, paddle.unsqueeze(probs_step, axis=1)], axis=1
  146. )
  147. next_input = probs_step.argmax(axis=1)
  148. targets = next_input
  149. if not self.training:
  150. probs = paddle.nn.functional.softmax(probs, axis=2)
  151. return probs
  152. class AttentionLSTMCell(nn.Layer):
  153. def __init__(self, input_size, hidden_size, num_embeddings, use_gru=False):
  154. super(AttentionLSTMCell, self).__init__()
  155. self.i2h = nn.Linear(input_size, hidden_size, bias_attr=False)
  156. self.h2h = nn.Linear(hidden_size, hidden_size)
  157. self.score = nn.Linear(hidden_size, 1, bias_attr=False)
  158. if not use_gru:
  159. self.rnn = nn.LSTMCell(
  160. input_size=input_size + num_embeddings, hidden_size=hidden_size
  161. )
  162. else:
  163. self.rnn = nn.GRUCell(
  164. input_size=input_size + num_embeddings, hidden_size=hidden_size
  165. )
  166. self.hidden_size = hidden_size
  167. def forward(self, prev_hidden, batch_H, char_onehots):
  168. batch_H_proj = self.i2h(batch_H)
  169. prev_hidden_proj = paddle.unsqueeze(self.h2h(prev_hidden[0]), axis=1)
  170. res = paddle.add(batch_H_proj, prev_hidden_proj)
  171. res = paddle.tanh(res)
  172. e = self.score(res)
  173. alpha = F.softmax(e, axis=1)
  174. alpha = paddle.transpose(alpha, [0, 2, 1])
  175. context = paddle.squeeze(paddle.mm(alpha, batch_H), axis=1)
  176. concat_context = paddle.concat([context, char_onehots], 1)
  177. cur_hidden = self.rnn(concat_context, prev_hidden)
  178. return cur_hidden, alpha