# Source code for claf.tokens.elmo

"""
This code is from allenai/allennlp
(https://github.com/allenai/allennlp/blob/master/allennlp/modules/elmo.py)
"""

import json
import logging
from typing import Union, List, Dict, Any, Optional, Tuple
import warnings

import numpy
from overrides import overrides
import torch
from torch.nn.utils.rnn import PackedSequence, pad_packed_sequence
from torch.nn.modules import Dropout


with warnings.catch_warnings():  # pragma: no cover
    warnings.filterwarnings("ignore", category=FutureWarning)
    import h5py

from claf.modules.layer import Highway, ScalarMix
from claf.modules.encoder import _EncoderBase, LstmCellWithProjection


logger = logging.getLogger(__name__)  # pylint: disable=invalid-name

# pylint: disable=attribute-defined-outside-init


class Elmo(torch.nn.Module):  # pragma: no cover
    """
    Compute ELMo representations using a pre-trained bidirectional language model.

    See "Deep contextualized word representations", Peters et al. for details.

    This module takes character id input and computes ``num_output_representations``
    different layers of ELMo representations.  Typically ``num_output_representations``
    is 1 or 2.  For example, in the case of the SRL model in the above paper,
    ``num_output_representations=1`` where ELMo was included at the input token
    representation layer.  In the case of the SQuAD model,
    ``num_output_representations=2`` as ELMo was also included at the GRU output layer.

    In the implementation below, we learn separate scalar weights for each output
    layer, but only run the biLM once on each input sequence for efficiency.

    Parameters
    ----------
    options_file : ``str``, required.
        ELMo JSON options file
    weight_file : ``str``, required.
        ELMo hdf5 weight file
    num_output_representations: ``int``, required.
        The number of ELMo representation layers to output.
    requires_grad: ``bool``, optional
        If True, compute gradient of ELMo parameters for fine tuning.
    do_layer_norm : ``bool``, optional, (default=False).
        Should we apply layer normalization (passed to ``ScalarMix``)?
    dropout : ``float``, optional, (default = 0.5).
        The dropout to be applied to the ELMo representations.
    vocab_to_cache : ``List[str]``, optional, (default = None).
        A list of words to pre-compute and cache character convolutions for. If you use
        this option, Elmo expects that you pass word indices of shape
        (batch_size, timesteps) to forward, instead of character indices. If you use
        this option and pass a word which wasn't pre-cached, this will break.
    module : ``torch.nn.Module``, optional, (default = None).
        If provided, then use this module instead of the pre-trained ELMo biLM.
        If using this option, then pass ``None`` for both ``options_file``
        and ``weight_file``.  The module must provide a public attribute
        ``num_layers`` with the number of internal layers and its ``forward``
        method must return a ``dict`` with ``activations`` and ``mask`` keys
        (see ``_ElmoBiLm`` for an example).  Note that ``requires_grad`` is also
        ignored with this option.

    Raises
    ------
    ValueError
        If ``module`` is given together with ``options_file`` or ``weight_file``.
    """

    def __init__(
        self,
        options_file: str,
        weight_file: str,
        num_output_representations: int,
        requires_grad: bool = False,
        do_layer_norm: bool = False,
        dropout: float = 0.5,
        vocab_to_cache: List[str] = None,
        module: torch.nn.Module = None,
    ) -> None:
        super(Elmo, self).__init__()

        # Use the module-level logger so records carry this module's name.
        logger.info("Initializing ELMo")
        if module is not None:
            if options_file is not None or weight_file is not None:
                raise ValueError("Don't provide options_file or weight_file with module")
            self._elmo_lstm = module
        else:
            self._elmo_lstm = _ElmoBiLm(
                options_file,
                weight_file,
                requires_grad=requires_grad,
                vocab_to_cache=vocab_to_cache,
            )
        self._has_cached_vocab = vocab_to_cache is not None
        self._dropout = Dropout(p=dropout)
        self._scalar_mixes: Any = []
        for k in range(num_output_representations):
            scalar_mix = ScalarMix(self._elmo_lstm.num_layers, do_layer_norm=do_layer_norm)
            # Registered under a stable name; ``forward`` looks the mixes up by that name.
            self.add_module("scalar_mix_{}".format(k), scalar_mix)
            self._scalar_mixes.append(scalar_mix)

    def get_output_dim(self):
        """Return the output dimension reported by the wrapped biLM module."""
        return self._elmo_lstm.get_output_dim()

    def forward(
        self, inputs: torch.Tensor, word_inputs: torch.Tensor = None  # pylint: disable=arguments-differ
    ) -> Dict[str, Union[torch.Tensor, List[torch.Tensor]]]:
        """
        Parameters
        ----------
        inputs: ``torch.Tensor``, required.
            Shape ``(batch_size, timesteps, 50)`` of character ids representing the
            current batch.  Inputs with extra leading dimensions are flattened before
            the biLM is run and restored afterwards.
        word_inputs : ``torch.Tensor``, optional, (default = None).
            If you passed a cached vocab, you can in addition pass a tensor of shape
            ``(batch_size, timesteps)``, which represent word ids which have been
            pre-cached.  If no vocab was cached, the word ids are ignored and a
            warning is logged.

        Returns
        -------
        Dict with keys:
        ``'elmo_representations'``: ``List[torch.Tensor]``
            A ``num_output_representations`` list of ELMo representations for the input
            sequence.  Each representation is shape
            ``(batch_size, timesteps, embedding_dim)``
        ``'mask'``:  ``torch.Tensor``
            Shape ``(batch_size, timesteps)`` long tensor with sequence mask.
        """
        # reshape the input if needed
        original_shape = inputs.size()
        if len(original_shape) > 3:
            timesteps, num_characters = original_shape[-2:]
            reshaped_inputs = inputs.view(-1, timesteps, num_characters)
        else:
            reshaped_inputs = inputs

        if word_inputs is not None:
            original_word_size = word_inputs.size()
            if self._has_cached_vocab and len(original_word_size) > 2:
                reshaped_word_inputs = word_inputs.view(-1, original_word_size[-1])
            elif not self._has_cached_vocab:
                # Word ids are only meaningful with a cached vocab; drop them and
                # fall back to the character inputs.
                logger.warning(
                    "Word inputs were passed to ELMo but it does not have a cached vocab."
                )
                reshaped_word_inputs = None
            else:
                reshaped_word_inputs = word_inputs
        else:
            reshaped_word_inputs = word_inputs

        # run the biLM
        bilm_output = self._elmo_lstm(reshaped_inputs, reshaped_word_inputs)
        layer_activations = bilm_output["activations"]
        mask_with_bos_eos = bilm_output["mask"]

        # compute the elmo representations
        representations = []
        for i in range(len(self._scalar_mixes)):
            scalar_mix = getattr(self, "scalar_mix_{}".format(i))
            representation_with_bos_eos = scalar_mix(layer_activations, mask_with_bos_eos)
            representation_without_bos_eos, mask_without_bos_eos = remove_sentence_boundaries(
                representation_with_bos_eos, mask_with_bos_eos
            )
            representations.append(self._dropout(representation_without_bos_eos))

        # reshape if necessary
        if word_inputs is not None and len(original_word_size) > 2:
            mask = mask_without_bos_eos.view(original_word_size)
            elmo_representations = [
                representation.view(original_word_size + (-1,))
                for representation in representations
            ]
        elif len(original_shape) > 3:
            mask = mask_without_bos_eos.view(original_shape[:-1])
            elmo_representations = [
                representation.view(original_shape[:-1] + (-1,))
                for representation in representations
            ]
        else:
            mask = mask_without_bos_eos
            elmo_representations = representations

        return {"elmo_representations": elmo_representations, "mask": mask}

    @classmethod
    def from_params(cls, params) -> "Elmo":
        """
        Build an ``Elmo`` from a parameter object.

        ``params`` must provide ``add_file_to_archive``, ``pop``, ``pop_bool``,
        ``pop_float`` and ``assert_empty`` (presumably an AllenNLP-style ``Params``
        object -- confirm against the caller).
        """
        # Add files to archive
        params.add_file_to_archive("options_file")
        params.add_file_to_archive("weight_file")

        options_file = params.pop("options_file")
        weight_file = params.pop("weight_file")
        requires_grad = params.pop("requires_grad", False)
        num_output_representations = params.pop("num_output_representations")
        do_layer_norm = params.pop_bool("do_layer_norm", False)
        dropout = params.pop_float("dropout", 0.5)
        params.assert_empty(cls.__name__)

        return cls(
            options_file=options_file,
            weight_file=weight_file,
            num_output_representations=num_output_representations,
            requires_grad=requires_grad,
            do_layer_norm=do_layer_norm,
            dropout=dropout,
        )
def remove_sentence_boundaries(
    tensor: torch.Tensor, mask: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:  # pragma: no cover
    """
    Strip the begin/end-of-sentence entries from a batch of sentences.

    The input has shape ``(batch_size, timesteps, dim)`` and the result has shape
    ``(batch_size, timesteps - 2, dim)``.  Sentences are assumed to be right-padded
    and to start at index 0 (i.e. ``mask[:, 0]`` is assumed to be 1).

    This function is the inverse of ``add_sentence_boundary_token_ids``.

    Parameters
    ----------
    tensor : ``torch.Tensor``
        A tensor of shape ``(batch_size, timesteps, dim)``
    mask : ``torch.Tensor``
         A tensor of shape ``(batch_size, timesteps)``

    Returns
    -------
    tensor_without_boundary_tokens : ``torch.Tensor``
        The tensor after removing the boundary tokens of shape
        ``(batch_size, timesteps - 2, dim)``
    new_mask : ``torch.Tensor``
        The new long mask for the tensor of shape ``(batch_size, timesteps - 2)``.
    """
    # TODO: matthewp, profile this transfer
    lengths = mask.sum(dim=1).detach().cpu().numpy()

    trimmed_shape = list(tensor.data.shape)
    trimmed_shape[1] -= 2
    stripped = tensor.new_zeros(*trimmed_shape)
    stripped_mask = tensor.new_zeros((trimmed_shape[0], trimmed_shape[1]), dtype=torch.long)

    for row, length in enumerate(lengths):
        # Rows holding at most the two boundary markers stay all-zero / fully masked.
        if length > 2:
            inner = length - 2
            stripped[row, :inner, :] = tensor[row, 1 : (length - 1), :]
            stripped_mask[row, :inner] = 1

    return stripped, stripped_mask
class _ElmoBiLm(torch.nn.Module):  # pragma: no cover
    """
    Run a pre-trained bidirectional language model, outputting the activations at each
    layer for weighting together into an ELMo representation (with ``Elmo``).
    This is a lower level class, useful for advanced uses, but most users should
    use ``Elmo`` directly.

    Parameters
    ----------
    options_file : ``str``
        ELMo JSON options file
    weight_file : ``str``
        ELMo hdf5 weight file
    requires_grad: ``bool``, optional
        If True, compute gradient of ELMo parameters for fine tuning.
    vocab_to_cache : ``List[str]``, optional, (default = None).
        A list of words to pre-compute and cache character convolutions for. If you use
        this option, _ElmoBiLm expects that you pass word indices of shape
        (batch_size, timesteps) to forward, instead of character indices. If you use
        this option and pass a word which wasn't pre-cached, this will break.
        NOTE(review): nothing in this class populates ``_word_embedding``, so the
        cached lookup path in ``forward`` looks unreachable here -- confirm.

    Raises
    ------
    ValueError
        If the options file does not enable ``use_skip_connections`` for the LSTM.
    """

    def __init__(
        self,
        options_file: str,
        weight_file: str,
        requires_grad: bool = False,
        vocab_to_cache: List[str] = None,
    ) -> None:
        super(_ElmoBiLm, self).__init__()

        self._token_embedder = _ElmoCharacterEncoder(
            options_file, weight_file, requires_grad=requires_grad
        )

        self._requires_grad = requires_grad
        if requires_grad and vocab_to_cache:
            logger.warning(
                "You are fine tuning ELMo and caching char CNN word vectors. "
                "This behaviour is not guaranteed to be well defined, particularly. "
                "if not all of your inputs will occur in the vocabulary cache."
            )
        # This is an embedding, used to look up cached
        # word vectors built from character level cnn embeddings.
        self._word_embedding = None
        self._bos_embedding: torch.Tensor = None
        self._eos_embedding: torch.Tensor = None

        # JSON is UTF-8 by specification, so do not depend on the locale default.
        with open(options_file, "r", encoding="utf-8") as fin:
            options = json.load(fin)
        if not options["lstm"].get("use_skip_connections"):
            raise ValueError("We only support pretrained biLMs with residual connections")
        self._elmo_lstm = ElmoLstm(
            input_size=options["lstm"]["projection_dim"],
            hidden_size=options["lstm"]["projection_dim"],
            cell_size=options["lstm"]["dim"],
            num_layers=options["lstm"]["n_layers"],
            memory_cell_clip_value=options["lstm"]["cell_clip"],
            state_projection_clip_value=options["lstm"]["proj_clip"],
            requires_grad=requires_grad,
        )
        self._elmo_lstm.load_weights(weight_file)
        # Number of representation layers including context independent layer
        self.num_layers = options["lstm"]["n_layers"] + 1

    def get_output_dim(self):
        """Return twice the token embedder's output dimension."""
        return 2 * self._token_embedder.get_output_dim()

    def forward(
        self, inputs: torch.Tensor, word_inputs: torch.Tensor = None  # pylint: disable=arguments-differ
    ) -> Dict[str, Union[torch.Tensor, List[torch.Tensor]]]:
        """
        Parameters
        ----------
        inputs: ``torch.Tensor``, required.
            Shape ``(batch_size, timesteps, 50)`` of character ids representing the
            current batch.
        word_inputs : ``torch.Tensor``, optional, (default = None).
            If you passed a cached vocab, you can in addition pass a tensor of shape
            ``(batch_size, timesteps)``, which represent word ids which have been
            pre-cached.

        Returns
        -------
        Dict with keys:

        ``'activations'``: ``List[torch.Tensor]``
            A list of activations at each layer of the network, each of shape
            ``(batch_size, timesteps + 2, embedding_dim)``
        ``'mask'``:  ``torch.Tensor``
            Shape ``(batch_size, timesteps + 2)`` long tensor with sequence mask.

        Note that the output tensors all include additional special begin and end of
        sequence markers.
        """
        if self._word_embedding is not None and word_inputs is not None:
            try:
                mask_without_bos_eos = (word_inputs > 0).long()
                # The character cnn part is cached - just look it up.
                embedded_inputs = self._word_embedding(word_inputs)  # type: ignore
                # shape (batch_size, timesteps + 2, embedding_dim)
                type_representation, mask = add_sentence_boundary_token_ids(
                    embedded_inputs, mask_without_bos_eos, self._bos_embedding, self._eos_embedding
                )
            except RuntimeError:
                # Back off to running the character convolutions,
                # as we might not have the words in the cache.
                token_embedding = self._token_embedder(inputs)
                mask = token_embedding["mask"]
                type_representation = token_embedding["token_embedding"]
        else:
            token_embedding = self._token_embedder(inputs)
            mask = token_embedding["mask"]
            type_representation = token_embedding["token_embedding"]
        lstm_outputs = self._elmo_lstm(type_representation, mask)

        # Prepare the output.  The first layer is duplicated.
        # Because of minor differences in how masking is applied depending
        # on whether the char cnn layers are cached, we'll be defensive and
        # multiply by the mask here. It's not strictly necessary, as the
        # mask passed on is correct, but the values in the padded areas
        # of the char cnn representations can change.
        output_tensors = [
            torch.cat([type_representation, type_representation], dim=-1)
            * mask.float().unsqueeze(-1)
        ]
        for layer_activations in torch.chunk(lstm_outputs, lstm_outputs.size(0), dim=0):
            output_tensors.append(layer_activations.squeeze(0))

        return {"activations": output_tensors, "mask": mask}
def add_sentence_boundary_token_ids(
    tensor: torch.Tensor, mask: torch.Tensor, sentence_begin_token: Any, sentence_end_token: Any
) -> Tuple[torch.Tensor, torch.Tensor]:  # pragma: no cover
    """
    Add begin/end of sentence tokens to the batch of sentences.
    Given a batch of sentences with size ``(batch_size, timesteps)`` or
    ``(batch_size, timesteps, dim)`` this returns a tensor of shape
    ``(batch_size, timesteps + 2)`` or ``(batch_size, timesteps + 2, dim)`` respectively.

    Sentences are assumed to be padded on the right: the end marker is written
    directly after the last unmasked position of each row.

    Returns both the new tensor and updated mask.

    Parameters
    ----------
    tensor : ``torch.Tensor``
        A tensor of shape ``(batch_size, timesteps)`` or ``(batch_size, timesteps, dim)``
    mask : ``torch.Tensor``
         A tensor of shape ``(batch_size, timesteps)``
    sentence_begin_token: Any (anything that can be broadcast in torch for assignment)
        For 2D input, a scalar with the <S> id. For 3D input, a tensor with length dim.
    sentence_end_token: Any (anything that can be broadcast in torch for assignment)
        For 2D input, a scalar with the </S> id. For 3D input, a tensor with length dim.

    Returns
    -------
    tensor_with_boundary_tokens : ``torch.Tensor``
        The tensor with the appended and prepended boundary tokens. If the input was 2D,
        it has shape (batch_size, timesteps + 2) and if the input was 3D, it has shape
        (batch_size, timesteps + 2, dim).
    new_mask : ``torch.Tensor``
        The new long mask for the tensor, taking into account the appended tokens
        marking the beginning and end of the sentence.

    Raises
    ------
    ValueError
        If ``tensor`` is neither 2D nor 3D.
    """
    lengths = mask.sum(dim=1)
    # TODO: matthewp, profile this transfer
    sequence_lengths = lengths.detach().cpu().numpy()
    tensor_shape = list(tensor.data.shape)
    new_shape = list(tensor_shape)
    new_shape[1] = tensor_shape[1] + 2
    tensor_with_boundary_tokens = tensor.new_zeros(*new_shape)
    if len(tensor_shape) == 2:
        tensor_with_boundary_tokens[:, 1:-1] = tensor
        tensor_with_boundary_tokens[:, 0] = sentence_begin_token
        for i, j in enumerate(sequence_lengths):
            tensor_with_boundary_tokens[i, j + 1] = sentence_end_token
    elif len(tensor_shape) == 3:
        tensor_with_boundary_tokens[:, 1:-1, :] = tensor
        # The begin marker sits at index 0 of every row, so one broadcast write suffices.
        tensor_with_boundary_tokens[:, 0, :] = sentence_begin_token
        for i, j in enumerate(sequence_lengths):
            tensor_with_boundary_tokens[i, j + 1, :] = sentence_end_token
    else:
        raise ValueError("add_sentence_boundary_token_ids only accepts 2D and 3D input")

    # Build the mask from the sequence lengths instead of from the tensor values:
    # real-valued embeddings may legitimately contain no positive component, which
    # a value-based test would wrongly treat as padding.
    positions = torch.arange(new_shape[1], device=tensor.device)
    new_lengths = lengths.to(tensor.device) + 2
    new_mask = (positions.unsqueeze(0) < new_lengths.unsqueeze(1)).long()

    return tensor_with_boundary_tokens, new_mask
def _make_bos_eos(
    character: int,
    padding_character: int,
    beginning_of_word_character: int,
    end_of_word_character: int,
    max_word_length: int,
):  # pragma: no cover
    """
    Build the character-id list for a sentence boundary pseudo-word.

    The result has ``max_word_length`` entries: begin-of-word, ``character``,
    end-of-word, followed by padding.
    """
    char_ids = [padding_character] * max_word_length
    char_ids[0] = beginning_of_word_character
    char_ids[1] = character
    char_ids[2] = end_of_word_character
    return char_ids


class _ElmoCharacterEncoder(torch.nn.Module):  # pragma: no cover
    """
    Compute context insensitive token representations using the character CNN of a
    pretrained biLM.

    This embedder has input character ids of size (batch_size, sequence_length, 50)
    and returns (batch_size, sequence_length + 2, embedding_dim), where embedding_dim
    is specified in the options file (typically 512).

    We add special entries at the beginning and end of each sequence corresponding
    to <S> and </S>, the beginning and end of sentence tokens.

    Note: this is a lower level class useful for advanced usage.  Most users should
    use ``Elmo`` instead.

    Parameters
    ----------
    options_file : ``str``
        ELMo JSON options file
    weight_file : ``str``
        ELMo hdf5 weight file
    requires_grad: ``bool``, optional
        If True, compute gradient of ELMo parameters for fine tuning.

    The relevant section of the options file is something like:

    .. code-block:: python

        {'char_cnn': {
            'activation': 'relu',
            'embedding': {'dim': 4},
            'filters': [[1, 4], [2, 8], [3, 16], [4, 32], [5, 64]],
            'max_characters_per_token': 50,
            'n_characters': 262,
            'n_highway': 2
            }
        }
    """

    def __init__(self, options_file: str, weight_file: str, requires_grad: bool = False) -> None:
        super(_ElmoCharacterEncoder, self).__init__()

        # JSON is UTF-8 by specification, so do not depend on the locale default.
        with open(options_file, "r", encoding="utf-8") as fin:
            self._options = json.load(fin)
        self._weight_file = weight_file

        self.output_dim = self._options["lstm"]["projection_dim"]
        self.requires_grad = requires_grad

        self._load_weights()

        max_word_length = 50

        # char ids 0-255 come from utf-8 encoding bytes
        # assign 256-300 to special chars
        beginning_of_sentence_character = 256  # <begin sentence>
        end_of_sentence_character = 257  # <end sentence>
        beginning_of_word_character = 258  # <begin word>
        end_of_word_character = 259  # <end word>
        padding_character = 260  # <padding>

        beginning_of_sentence_characters = _make_bos_eos(
            beginning_of_sentence_character,
            padding_character,
            beginning_of_word_character,
            end_of_word_character,
            max_word_length,
        )
        end_of_sentence_characters = _make_bos_eos(
            end_of_sentence_character,
            padding_character,
            beginning_of_word_character,
            end_of_word_character,
            max_word_length,
        )

        # Cache the arrays for use in forward -- +1 due to masking.
        self._beginning_of_sentence_characters = torch.from_numpy(
            numpy.array(beginning_of_sentence_characters) + 1
        )
        self._end_of_sentence_characters = torch.from_numpy(
            numpy.array(end_of_sentence_characters) + 1
        )

    def get_output_dim(self):
        """Return the projection dimension read from the options file."""
        return self.output_dim

    @overrides
    def forward(self, inputs: torch.Tensor) -> Dict[str, torch.Tensor]:  # pylint: disable=arguments-differ
        """
        Compute context insensitive token embeddings for ELMo representations.

        Parameters
        ----------
        inputs: ``torch.Tensor``
            Shape ``(batch_size, sequence_length, 50)`` of character ids representing the
            current batch.

        Returns
        -------
        Dict with keys:
        ``'token_embedding'``: ``torch.Tensor``
            Shape ``(batch_size, sequence_length + 2, embedding_dim)`` tensor with context
            insensitive token representations.
        ``'mask'``:  ``torch.Tensor``
            Shape ``(batch_size, sequence_length + 2)`` long tensor with sequence mask.

        Raises
        ------
        ValueError
            If the configured char CNN activation is neither ``tanh`` nor ``relu``.
        """
        # Add BOS/EOS
        mask = ((inputs > 0).long().sum(dim=-1) > 0).long()
        character_ids_with_bos_eos, mask_with_bos_eos = add_sentence_boundary_token_ids(
            inputs, mask, self._beginning_of_sentence_characters, self._end_of_sentence_characters
        )

        # the character id embedding
        max_chars_per_token = self._options["char_cnn"]["max_characters_per_token"]
        # (batch_size * sequence_length, max_chars_per_token, embed_dim)
        character_embedding = torch.nn.functional.embedding(
            character_ids_with_bos_eos.view(-1, max_chars_per_token), self._char_embedding_weights
        )

        # run convolutions
        cnn_options = self._options["char_cnn"]
        if cnn_options["activation"] == "tanh":
            # ``torch.nn.functional.tanh`` is deprecated in favour of ``torch.tanh``.
            activation = torch.tanh
        elif cnn_options["activation"] == "relu":
            activation = torch.nn.functional.relu
        else:
            raise ValueError("Unknown activation")

        # (batch_size * sequence_length, embed_dim, max_chars_per_token)
        character_embedding = torch.transpose(character_embedding, 1, 2)
        convs = []
        for i in range(len(self._convolutions)):
            # Look the convolution up by its registered submodule name.
            conv = getattr(self, "char_conv_{}".format(i))
            convolved = conv(character_embedding)
            # (batch_size * sequence_length, n_filters for this width)
            convolved, _ = torch.max(convolved, dim=-1)
            convolved = activation(convolved)
            convs.append(convolved)

        # (batch_size * sequence_length, n_filters)
        token_embedding = torch.cat(convs, dim=-1)

        # apply the highway layers (batch_size * sequence_length, n_filters)
        token_embedding = self._highways(token_embedding)

        # final projection  (batch_size * sequence_length, embedding_dim)
        token_embedding = self._projection(token_embedding)

        # reshape to (batch_size, sequence_length, embedding_dim)
        batch_size, sequence_length, _ = character_ids_with_bos_eos.size()

        return {
            "mask": mask_with_bos_eos,
            "token_embedding": token_embedding.view(batch_size, sequence_length, -1),
        }

    def _load_weights(self):
        """Load every pretrained component from the hdf5 weight file."""
        self._load_char_embedding()
        self._load_cnn_weights()
        self._load_highway()
        self._load_projection()

    def _load_char_embedding(self):
        """Load the character embedding table, reserving row 0 for padding."""
        with h5py.File(self._weight_file, "r") as fin:
            char_embed_weights = fin["char_embed"][...]

        # One extra all-zero row at index 0: character ids are shifted by +1 for masking.
        weights = numpy.zeros(
            (char_embed_weights.shape[0] + 1, char_embed_weights.shape[1]), dtype="float32"
        )
        weights[1:, :] = char_embed_weights

        self._char_embedding_weights = torch.nn.Parameter(
            torch.FloatTensor(weights), requires_grad=self.requires_grad
        )

    def _load_cnn_weights(self):
        """
        Create one ``Conv1d`` per configured filter width and load its weights.

        Raises ``ValueError`` if a stored kernel does not match the layer shape.
        """
        cnn_options = self._options["char_cnn"]
        filters = cnn_options["filters"]
        char_embed_dim = cnn_options["embedding"]["dim"]

        convolutions = []
        # Open the weight file once for all filters rather than once per filter.
        with h5py.File(self._weight_file, "r") as fin:
            for i, (width, num) in enumerate(filters):
                conv = torch.nn.Conv1d(
                    in_channels=char_embed_dim, out_channels=num, kernel_size=width, bias=True
                )
                # load the weights
                weight = fin["CNN"]["W_cnn_{}".format(i)][...]
                bias = fin["CNN"]["b_cnn_{}".format(i)][...]

                w_reshaped = numpy.transpose(weight.squeeze(axis=0), axes=(2, 1, 0))
                if w_reshaped.shape != tuple(conv.weight.data.shape):
                    raise ValueError("Invalid weight file")
                conv.weight.data.copy_(torch.FloatTensor(w_reshaped))
                conv.bias.data.copy_(torch.FloatTensor(bias))

                conv.weight.requires_grad = self.requires_grad
                conv.bias.requires_grad = self.requires_grad

                convolutions.append(conv)
                self.add_module("char_conv_{}".format(i), conv)

        self._convolutions = convolutions

    def _load_highway(self):
        """Create the highway layers and load their transform/carry weights."""
        # pylint: disable=protected-access
        # the highway layers have same dimensionality as the number of cnn filters
        cnn_options = self._options["char_cnn"]
        filters = cnn_options["filters"]
        n_filters = sum(f[1] for f in filters)
        n_highway = cnn_options["n_highway"]

        # create the layers, and load the weights
        self._highways = Highway(n_filters, n_highway, activation=torch.nn.functional.relu)
        # Open the weight file once for all layers rather than once per layer.
        with h5py.File(self._weight_file, "r") as fin:
            for k in range(n_highway):
                group = fin["CNN_high_{}".format(k)]
                layer = self._highways._layers[k]

                # The AllenNLP highway is one matrix multplication with concatenation of
                # transform and carry weights.
                # The weights are transposed due to multiplication order assumptions in tf
                # vs pytorch (tf.matmul(X, W) vs pytorch.matmul(W, X))
                w_transform = numpy.transpose(group["W_transform"][...])
                # -1.0 since AllenNLP is g * x + (1 - g) * f(x) but tf is (1 - g) * x + g * f(x)
                w_carry = -1.0 * numpy.transpose(group["W_carry"][...])
                weight = numpy.concatenate([w_transform, w_carry], axis=0)
                layer.weight.data.copy_(torch.FloatTensor(weight))
                layer.weight.requires_grad = self.requires_grad

                b_transform = group["b_transform"][...]
                b_carry = -1.0 * group["b_carry"][...]
                bias = numpy.concatenate([b_transform, b_carry], axis=0)
                layer.bias.data.copy_(torch.FloatTensor(bias))
                layer.bias.requires_grad = self.requires_grad

    def _load_projection(self):
        """Create the final linear projection and load its weights."""
        cnn_options = self._options["char_cnn"]
        filters = cnn_options["filters"]
        n_filters = sum(f[1] for f in filters)

        self._projection = torch.nn.Linear(n_filters, self.output_dim, bias=True)
        with h5py.File(self._weight_file, "r") as fin:
            weight = fin["CNN_proj"]["W_proj"][...]
            bias = fin["CNN_proj"]["b_proj"][...]
            self._projection.weight.data.copy_(torch.FloatTensor(numpy.transpose(weight)))
            self._projection.bias.data.copy_(torch.FloatTensor(bias))

            self._projection.weight.requires_grad = self.requires_grad
            self._projection.bias.requires_grad = self.requires_grad
class ElmoLstm(_EncoderBase):  # pragma: no cover
    """
    A stacked, bidirectional LSTM built from
    :class:`~claf.modules.encoder.LstmCellWithProjection` cells, with highway
    layers between the inputs to layers.  The forward and backward directions are
    fed independently - their states are not concatenated between layers.

    This LSTM also keeps its `own` state, updated on every call to ``forward``.
    The state is resized dynamically for different batch sizes and is meant for
    non-continuous inputs (i.e. inputs which aren't formatted as a stream, such as
    text used for a language modelling task, which is how stateful RNNs are
    typically used).  This is non-standard, but can be thought of as having an
    "end of sentence" state, which is carried across different sentences.

    Parameters
    ----------
    input_size : ``int``, required
        The dimension of the inputs to the LSTM.
    hidden_size : ``int``, required
        The dimension of the outputs of the LSTM.
    cell_size : ``int``, required.
        The dimension of the memory cell of the
        :class:`~claf.modules.encoder.LstmCellWithProjection`.
    num_layers : ``int``, required
        The number of bidirectional LSTMs to use.
    requires_grad: ``bool``, optional
        If True, compute gradient of ELMo parameters for fine tuning.
    recurrent_dropout_probability: ``float``, optional (default = 0.0)
        The dropout probability to be used in a dropout scheme as stated in
        `A Theoretically Grounded Application of Dropout in Recurrent Neural Networks
        <https://arxiv.org/abs/1512.05287>`_ .
    state_projection_clip_value: ``float``, optional, (default = None)
        The magnitude with which to clip the hidden_state after projecting it.
    memory_cell_clip_value: ``float``, optional, (default = None)
        The magnitude with which to clip the memory cell.
    """

    def __init__(
        self,
        input_size: int,
        hidden_size: int,
        cell_size: int,
        num_layers: int,
        requires_grad: bool = False,
        recurrent_dropout_probability: float = 0.0,
        memory_cell_clip_value: Optional[float] = None,
        state_projection_clip_value: Optional[float] = None,
    ) -> None:
        super(ElmoLstm, self).__init__(stateful=True)

        # Required to be wrapped with a :class:`PytorchSeq2SeqWrapper`.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_size = cell_size
        self.requires_grad = requires_grad

        self.forward_layers = []
        self.backward_layers = []

        # Only the first layer consumes ``input_size``; deeper layers take the
        # previous layer's ``hidden_size`` output.
        layer_input_size = input_size
        for layer_index in range(num_layers):
            forward_cell = LstmCellWithProjection(
                layer_input_size,
                hidden_size,
                cell_size,
                True,
                recurrent_dropout_probability,
                memory_cell_clip_value,
                state_projection_clip_value,
            )
            backward_cell = LstmCellWithProjection(
                layer_input_size,
                hidden_size,
                cell_size,
                False,
                recurrent_dropout_probability,
                memory_cell_clip_value,
                state_projection_clip_value,
            )
            layer_input_size = hidden_size

            self.add_module("forward_layer_{}".format(layer_index), forward_cell)
            self.add_module("backward_layer_{}".format(layer_index), backward_cell)
            self.forward_layers.append(forward_cell)
            self.backward_layers.append(backward_cell)
[docs] def forward( self, inputs: torch.Tensor, mask: torch.LongTensor # pylint: disable=arguments-differ ) -> torch.Tensor: """ Parameters ---------- inputs : ``torch.Tensor``, required. A Tensor of shape ``(batch_size, sequence_length, hidden_size)``. mask : ``torch.LongTensor``, required. A binary mask of shape ``(batch_size, sequence_length)`` representing the non-padded elements in each sequence in the batch. Returns ------- A ``torch.Tensor`` of shape (num_layers, batch_size, sequence_length, hidden_size), where the num_layers dimension represents the LSTM output from that layer. """ batch_size, total_sequence_length = mask.size() stacked_sequence_output, final_states, restoration_indices = self.sort_and_run_forward( self._lstm_forward, inputs, mask ) num_layers, num_valid, returned_timesteps, encoder_dim = stacked_sequence_output.size() # Add back invalid rows which were removed in the call to sort_and_run_forward. if num_valid < batch_size: zeros = stacked_sequence_output.new_zeros( num_layers, batch_size - num_valid, returned_timesteps, encoder_dim ) stacked_sequence_output = torch.cat([stacked_sequence_output, zeros], 1) # The states also need to have invalid rows added back. new_states = [] for state in final_states: state_dim = state.size(-1) zeros = state.new_zeros(num_layers, batch_size - num_valid, state_dim) new_states.append(torch.cat([state, zeros], 1)) final_states = new_states # It's possible to need to pass sequences which are padded to longer than the # max length of the sequence to a Seq2StackEncoder. However, packing and unpacking # the sequences mean that the returned tensor won't include these dimensions, because # the RNN did not need to process them. We add them back on in the form of zeros here. 
sequence_length_difference = total_sequence_length - returned_timesteps if sequence_length_difference > 0: zeros = stacked_sequence_output.new_zeros( num_layers, batch_size, sequence_length_difference, stacked_sequence_output[0].size(-1), ) stacked_sequence_output = torch.cat([stacked_sequence_output, zeros], 2) self._update_states(final_states, restoration_indices) # Restore the original indices and return the sequence. # Has shape (num_layers, batch_size, sequence_length, hidden_size) return stacked_sequence_output.index_select(1, restoration_indices)
def _lstm_forward(
    self,
    inputs: PackedSequence,
    initial_state: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
    """
    Run every forward/backward layer pair over the batch, collecting the
    concatenated per-layer outputs and final states.

    Parameters
    ----------
    inputs : ``PackedSequence``, required.
        A batch first ``PackedSequence`` to run the stacked LSTM over.
    initial_state : ``Tuple[torch.Tensor, torch.Tensor]``, optional, (default = None)
        A tuple (state, memory) representing the initial hidden state and memory
        of the LSTM, with shape (num_layers, batch_size, 2 * hidden_size) and
        (num_layers, batch_size, 2 * cell_size) respectively.

    Returns
    -------
    output_sequence : ``torch.FloatTensor``
        The encoded sequence of shape (num_layers, batch_size, sequence_length, hidden_size)
    final_states: ``Tuple[torch.FloatTensor, torch.FloatTensor]``
        The per-layer final (state, memory) states of the LSTM, with shape
        (num_layers, batch_size, 2 * hidden_size) and  (num_layers, batch_size, 2 * cell_size)
        respectively. The last dimension is duplicated because it contains the state/memory
        for both the forward and backward layers.

    Raises
    ------
    ValueError
        If ``initial_state`` is given but its leading dimension differs from the
        number of forward layers.
    """
    layer_count = len(self.forward_layers)
    if initial_state is None:
        per_layer_states: List[Optional[Tuple[torch.Tensor, torch.Tensor]]] = [
            None
        ] * layer_count
    else:
        if initial_state[0].size()[0] != layer_count:
            raise ValueError(
                "Initial states were passed to forward() but the number of "
                "initial states does not match the number of layers."
            )
        # One (state, memory) pair per layer, each keeping a leading dim of 1.
        per_layer_states = list(
            zip(initial_state[0].split(1, 0), initial_state[1].split(1, 0))
        )

    padded_inputs, batch_lengths = pad_packed_sequence(inputs, batch_first=True)
    fwd_sequence = padded_inputs
    bwd_sequence = padded_inputs

    layer_outputs = []
    layer_hidden = []
    layer_memory = []
    for layer_index, state in enumerate(per_layer_states):
        fwd_layer = getattr(self, f"forward_layer_{layer_index}")
        bwd_layer = getattr(self, f"backward_layer_{layer_index}")

        # Remember this layer's inputs for the skip connection below.
        fwd_residual = fwd_sequence
        bwd_residual = bwd_sequence

        if state is None:
            fwd_state = None
            bwd_state = None
        else:
            # The last dimension holds the forward half followed by the backward half.
            fwd_hidden, bwd_hidden = state[0].split(self.hidden_size, 2)
            fwd_memory, bwd_memory = state[1].split(self.cell_size, 2)
            fwd_state = (fwd_hidden, fwd_memory)
            bwd_state = (bwd_hidden, bwd_memory)

        fwd_sequence, fwd_state = fwd_layer(fwd_sequence, batch_lengths, fwd_state)
        bwd_sequence, bwd_state = bwd_layer(bwd_sequence, batch_lengths, bwd_state)

        # Skip connections on every layer but the first: add the layer input
        # to the layer output (in place).
        if layer_index > 0:
            fwd_sequence += fwd_residual
            bwd_sequence += bwd_residual

        layer_outputs.append(torch.cat([fwd_sequence, bwd_sequence], -1))
        layer_hidden.append(torch.cat([fwd_state[0], bwd_state[0]], -1))
        layer_memory.append(torch.cat([fwd_state[1], bwd_state[1]], -1))

    stacked_outputs: torch.FloatTensor = torch.stack(layer_outputs)
    # Join the per-layer hidden states and memories along the layer dimension.
    final_states: Tuple[torch.FloatTensor, torch.FloatTensor] = (
        torch.cat(layer_hidden, 0),
        torch.cat(layer_memory, 0),
    )
    return stacked_outputs, final_states
def load_weights(self, weight_file: str) -> None:
    """
    Load the pre-trained weights from the file.

    For every (layer, direction) cell, the packed ``W_0`` kernel, the ``B`` bias
    and the ``W_P_0`` projection are read from the hdf5 file, reordered to the
    gate layout the cell expects, and copied into the cell's parameters. Each
    loaded parameter has ``requires_grad`` set to ``self.requires_grad``.

    Parameters
    ----------
    weight_file : ``str``, required.
        Path of the hdf5 weight file to read.
    """
    trainable = self.requires_grad

    def assign(parameter, array):
        # Copy the numpy values into the parameter, then set its trainability.
        parameter.data.copy_(torch.FloatTensor(array))
        parameter.requires_grad = trainable

    with h5py.File(weight_file, "r") as weights:
        layer_pairs = zip(self.forward_layers, self.backward_layers)
        for layer_index, direction_cells in enumerate(layer_pairs):
            for direction_index, cell in enumerate(direction_cells):
                # cell is an instance of LSTMCellWithProjection
                cell_size = cell.cell_size
                input_size = cell.input_size
                group = weights["RNN_%s" % direction_index]["RNN"]["MultiRNNCell"][
                    "Cell%s" % layer_index
                ]["LSTMCell"]

                # Rows holding the second and third gate of the packed matrices.
                second_gate = slice(1 * cell_size, 2 * cell_size)
                third_gate = slice(2 * cell_size, 3 * cell_size)

                # tensorflow packs together both W and U matrices into one matrix,
                # but pytorch maintains individual matrices. In addition, tensorflow
                # packs the gates as input, memory, forget, output but pytorch
                # uses input, forget, memory, output. Swapping the second and third
                # gate rows of the whole kernel reorders W and U at the same time.
                tf_kernel = numpy.transpose(group["W_0"][...])
                torch_kernel = tf_kernel.copy()
                torch_kernel[second_gate, :] = tf_kernel[third_gate, :]
                torch_kernel[third_gate, :] = tf_kernel[second_gate, :]

                # The first input_size columns are W (input), the rest are U (recurrent).
                assign(cell.input_linearity.weight, torch_kernel[:, :input_size])
                assign(cell.state_linearity.weight, torch_kernel[:, input_size:])

                # tensorflow adds 1.0 to the forget gate bias at run time instead of
                # storing it in the parameters, so add it before reordering the gates.
                tf_bias = group["B"][...]
                tf_bias[third_gate] += 1
                torch_bias = tf_bias.copy()
                torch_bias[second_gate] = tf_bias[third_gate]
                torch_bias[third_gate] = tf_bias[second_gate]
                assign(cell.state_linearity.bias, torch_bias)

                # the projection weights
                assign(cell.state_projection.weight, numpy.transpose(group["W_P_0"][...]))