"""
This code is from allenai/allennlp
(https://github.com/allenai/allennlp/blob/master/allennlp/modules/elmo.py)
"""
import json
import logging
from typing import Union, List, Dict, Any, Optional, Tuple
import warnings
import numpy
from overrides import overrides
import torch
from torch.nn.utils.rnn import PackedSequence, pad_packed_sequence
from torch.nn.modules import Dropout
with warnings.catch_warnings(): # pragma: no cover
warnings.filterwarnings("ignore", category=FutureWarning)
import h5py
from claf.modules.layer import Highway, ScalarMix
from claf.modules.encoder import _EncoderBase, LstmCellWithProjection
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
# pylint: disable=attribute-defined-outside-init
[docs]class Elmo(torch.nn.Module): # pragma: no cover
"""
Compute ELMo representations using a pre-trained bidirectional language model.
See "Deep contextualized word representations", Peters et al. for details.
This module takes character id input and computes ``num_output_representations`` different layers
of ELMo representations. Typically ``num_output_representations`` is 1 or 2. For example, in
the case of the SRL model in the above paper, ``num_output_representations=1`` where ELMo was included at
the input token representation layer. In the case of the SQuAD model, ``num_output_representations=2``
as ELMo was also included at the GRU output layer.
In the implementation below, we learn separate scalar weights for each output layer,
but only run the biLM once on each input sequence for efficiency.
Parameters
----------
options_file : ``str``, required.
ELMo JSON options file
weight_file : ``str``, required.
ELMo hdf5 weight file
num_output_representations: ``int``, required.
The number of ELMo representation layers to output.
requires_grad: ``bool``, optional
If True, compute gradient of ELMo parameters for fine tuning.
do_layer_norm : ``bool``, optional, (default=False).
Should we apply layer normalization (passed to ``ScalarMix``)?
dropout : ``float``, optional, (default = 0.5).
The dropout to be applied to the ELMo representations.
vocab_to_cache : ``List[str]``, optional, (default = 0.5).
A list of words to pre-compute and cache character convolutions
for. If you use this option, Elmo expects that you pass word
indices of shape (batch_size, timesteps) to forward, instead
of character indices. If you use this option and pass a word which
wasn't pre-cached, this will break.
module : ``torch.nn.Module``, optional, (default = None).
If provided, then use this module instead of the pre-trained ELMo biLM.
If using this option, then pass ``None`` for both ``options_file``
and ``weight_file``. The module must provide a public attribute
``num_layers`` with the number of internal layers and its ``forward``
method must return a ``dict`` with ``activations`` and ``mask`` keys
(see `_ElmoBilm`` for an example). Note that ``requires_grad`` is also
ignored with this option.
"""
def __init__(
self,
options_file: str,
weight_file: str,
num_output_representations: int,
requires_grad: bool = False,
do_layer_norm: bool = False,
dropout: float = 0.5,
vocab_to_cache: List[str] = None,
module: torch.nn.Module = None,
) -> None:
super(Elmo, self).__init__()
logging.info("Initializing ELMo")
if module is not None:
if options_file is not None or weight_file is not None:
raise ValueError("Don't provide options_file or weight_file with module")
self._elmo_lstm = module
else:
self._elmo_lstm = _ElmoBiLm(
options_file,
weight_file,
requires_grad=requires_grad,
vocab_to_cache=vocab_to_cache,
)
self._has_cached_vocab = vocab_to_cache is not None
self._dropout = Dropout(p=dropout)
self._scalar_mixes: Any = []
for k in range(num_output_representations):
scalar_mix = ScalarMix(self._elmo_lstm.num_layers, do_layer_norm=do_layer_norm)
self.add_module("scalar_mix_{}".format(k), scalar_mix)
self._scalar_mixes.append(scalar_mix)
[docs] def get_output_dim(self):
return self._elmo_lstm.get_output_dim()
[docs] def forward(
self,
inputs: torch.Tensor,
word_inputs: torch.Tensor = None, # pylint: disable=arguments-differ
) -> Dict[str, Union[torch.Tensor, List[torch.Tensor]]]:
"""
Parameters
----------
inputs: ``torch.Tensor``, required.
Shape ``(batch_size, timesteps, 50)`` of character ids representing the current batch.
word_inputs : ``torch.Tensor``, required.
If you passed a cached vocab, you can in addition pass a tensor of shape
``(batch_size, timesteps)``, which represent word ids which have been pre-cached.
Returns
-------
Dict with keys:
``'elmo_representations'``: ``List[torch.Tensor]``
A ``num_output_representations`` list of ELMo representations for the input sequence.
Each representation is shape ``(batch_size, timesteps, embedding_dim)``
``'mask'``: ``torch.Tensor``
Shape ``(batch_size, timesteps)`` long tensor with sequence mask.
"""
# reshape the input if needed
original_shape = inputs.size()
if len(original_shape) > 3:
timesteps, num_characters = original_shape[-2:]
reshaped_inputs = inputs.view(-1, timesteps, num_characters)
else:
reshaped_inputs = inputs
if word_inputs is not None:
original_word_size = word_inputs.size()
if self._has_cached_vocab and len(original_word_size) > 2:
reshaped_word_inputs = word_inputs.view(-1, original_word_size[-1])
logger.warning(
"Word inputs were passed to ELMo but it does not have a cached vocab."
)
reshaped_word_inputs = None
else:
reshaped_word_inputs = word_inputs
else:
reshaped_word_inputs = word_inputs
# run the biLM
bilm_output = self._elmo_lstm(reshaped_inputs, reshaped_word_inputs)
layer_activations = bilm_output["activations"]
mask_with_bos_eos = bilm_output["mask"]
# compute the elmo representations
representations = []
for i in range(len(self._scalar_mixes)):
scalar_mix = getattr(self, "scalar_mix_{}".format(i))
representation_with_bos_eos = scalar_mix(layer_activations, mask_with_bos_eos)
representation_without_bos_eos, mask_without_bos_eos = remove_sentence_boundaries(
representation_with_bos_eos, mask_with_bos_eos
)
representations.append(self._dropout(representation_without_bos_eos))
# reshape if necessary
if word_inputs is not None and len(original_word_size) > 2:
mask = mask_without_bos_eos.view(original_word_size)
elmo_representations = [
representation.view(original_word_size + (-1,))
for representation in representations
]
elif len(original_shape) > 3:
mask = mask_without_bos_eos.view(original_shape[:-1])
elmo_representations = [
representation.view(original_shape[:-1] + (-1,))
for representation in representations
]
else:
mask = mask_without_bos_eos
elmo_representations = representations
return {"elmo_representations": elmo_representations, "mask": mask}
[docs] @classmethod
def from_params(cls, params) -> "Elmo":
# Add files to archive
params.add_file_to_archive("options_file")
params.add_file_to_archive("weight_file")
options_file = params.pop("options_file")
weight_file = params.pop("weight_file")
requires_grad = params.pop("requires_grad", False)
num_output_representations = params.pop("num_output_representations")
do_layer_norm = params.pop_bool("do_layer_norm", False)
dropout = params.pop_float("dropout", 0.5)
params.assert_empty(cls.__name__)
return cls(
options_file=options_file,
weight_file=weight_file,
num_output_representations=num_output_representations,
requires_grad=requires_grad,
do_layer_norm=do_layer_norm,
dropout=dropout,
)
[docs]def remove_sentence_boundaries(
tensor: torch.Tensor, mask: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]: # pragma: no cover
"""
Remove begin/end of sentence embeddings from the batch of sentences.
Given a batch of sentences with size ``(batch_size, timesteps, dim)``
this returns a tensor of shape ``(batch_size, timesteps - 2, dim)`` after removing
the beginning and end sentence markers. The sentences are assumed to be padded on the right,
with the beginning of each sentence assumed to occur at index 0 (i.e., ``mask[:, 0]`` is assumed
to be 1).
Returns both the new tensor and updated mask.
This function is the inverse of ``add_sentence_boundary_token_ids``.
Parameters
----------
tensor : ``torch.Tensor``
A tensor of shape ``(batch_size, timesteps, dim)``
mask : ``torch.Tensor``
A tensor of shape ``(batch_size, timesteps)``
Returns
-------
tensor_without_boundary_tokens : ``torch.Tensor``
The tensor after removing the boundary tokens of shape ``(batch_size, timesteps - 2, dim)``
new_mask : ``torch.Tensor``
The new mask for the tensor of shape ``(batch_size, timesteps - 2)``.
"""
# TODO: matthewp, profile this transfer
sequence_lengths = mask.sum(dim=1).detach().cpu().numpy()
tensor_shape = list(tensor.data.shape)
new_shape = list(tensor_shape)
new_shape[1] = tensor_shape[1] - 2
tensor_without_boundary_tokens = tensor.new_zeros(*new_shape)
new_mask = tensor.new_zeros((new_shape[0], new_shape[1]), dtype=torch.long)
for i, j in enumerate(sequence_lengths):
if j > 2:
tensor_without_boundary_tokens[i, : (j - 2), :] = tensor[i, 1 : (j - 1), :]
new_mask[i, : (j - 2)] = 1
return tensor_without_boundary_tokens, new_mask
class _ElmoBiLm(torch.nn.Module): # pragma: no cover
"""
Run a pre-trained bidirectional language model, outputing the activations at each
layer for weighting together into an ELMo representation (with
``allennlp.modules.seq2seq_encoders.Elmo``). This is a lower level class, useful
for advanced uses, but most users should use ``allennlp.modules.seq2seq_encoders.Elmo``
directly.
Parameters
----------
options_file : ``str``
ELMo JSON options file
weight_file : ``str``
ELMo hdf5 weight file
requires_grad: ``bool``, optional
If True, compute gradient of ELMo parameters for fine tuning.
vocab_to_cache : ``List[str]``, optional, (default = 0.5).
A list of words to pre-compute and cache character convolutions
for. If you use this option, _ElmoBiLm expects that you pass word
indices of shape (batch_size, timesteps) to forward, instead
of character indices. If you use this option and pass a word which
wasn't pre-cached, this will break.
"""
def __init__(
self,
options_file: str,
weight_file: str,
requires_grad: bool = False,
vocab_to_cache: List[str] = None,
) -> None:
super(_ElmoBiLm, self).__init__()
self._token_embedder = _ElmoCharacterEncoder(
options_file, weight_file, requires_grad=requires_grad
)
self._requires_grad = requires_grad
if requires_grad and vocab_to_cache:
logging.warning(
"You are fine tuning ELMo and caching char CNN word vectors. "
"This behaviour is not guaranteed to be well defined, particularly. "
"if not all of your inputs will occur in the vocabulary cache."
)
# This is an embedding, used to look up cached
# word vectors built from character level cnn embeddings.
self._word_embedding = None
self._bos_embedding: torch.Tensor = None
self._eos_embedding: torch.Tensor = None
with open(options_file, "r") as fin:
options = json.load(fin)
if not options["lstm"].get("use_skip_connections"):
raise ValueError("We only support pretrained biLMs with residual connections")
self._elmo_lstm = ElmoLstm(
input_size=options["lstm"]["projection_dim"],
hidden_size=options["lstm"]["projection_dim"],
cell_size=options["lstm"]["dim"],
num_layers=options["lstm"]["n_layers"],
memory_cell_clip_value=options["lstm"]["cell_clip"],
state_projection_clip_value=options["lstm"]["proj_clip"],
requires_grad=requires_grad,
)
self._elmo_lstm.load_weights(weight_file)
# Number of representation layers including context independent layer
self.num_layers = options["lstm"]["n_layers"] + 1
def get_output_dim(self):
return 2 * self._token_embedder.get_output_dim()
def forward(
self,
inputs: torch.Tensor,
word_inputs: torch.Tensor = None, # pylint: disable=arguments-differ
) -> Dict[str, Union[torch.Tensor, List[torch.Tensor]]]:
"""
Parameters
----------
inputs: ``torch.Tensor``, required.
Shape ``(batch_size, timesteps, 50)`` of character ids representing the current batch.
word_inputs : ``torch.Tensor``, required.
If you passed a cached vocab, you can in addition pass a tensor of shape ``(batch_size, timesteps)``,
which represent word ids which have been pre-cached.
Returns
-------
Dict with keys:
``'activations'``: ``List[torch.Tensor]``
A list of activations at each layer of the network, each of shape
``(batch_size, timesteps + 2, embedding_dim)``
``'mask'``: ``torch.Tensor``
Shape ``(batch_size, timesteps + 2)`` long tensor with sequence mask.
Note that the output tensors all include additional special begin and end of sequence
markers.
"""
if self._word_embedding is not None and word_inputs is not None:
try:
mask_without_bos_eos = (word_inputs > 0).long()
# The character cnn part is cached - just look it up.
embedded_inputs = self._word_embedding(word_inputs) # type: ignore
# shape (batch_size, timesteps + 2, embedding_dim)
type_representation, mask = add_sentence_boundary_token_ids(
embedded_inputs, mask_without_bos_eos, self._bos_embedding, self._eos_embedding
)
except RuntimeError:
# Back off to running the character convolutions,
# as we might not have the words in the cache.
token_embedding = self._token_embedder(inputs)
mask = token_embedding["mask"]
type_representation = token_embedding["token_embedding"]
else:
token_embedding = self._token_embedder(inputs)
mask = token_embedding["mask"]
type_representation = token_embedding["token_embedding"]
lstm_outputs = self._elmo_lstm(type_representation, mask)
# Prepare the output. The first layer is duplicated.
# Because of minor differences in how masking is applied depending
# on whether the char cnn layers are cached, we'll be defensive and
# multiply by the mask here. It's not strictly necessary, as the
# mask passed on is correct, but the values in the padded areas
# of the char cnn representations can change.
output_tensors = [
torch.cat([type_representation, type_representation], dim=-1)
* mask.float().unsqueeze(-1)
]
for layer_activations in torch.chunk(lstm_outputs, lstm_outputs.size(0), dim=0):
output_tensors.append(layer_activations.squeeze(0))
return {"activations": output_tensors, "mask": mask}
[docs]def add_sentence_boundary_token_ids(
tensor: torch.Tensor, mask: torch.Tensor, sentence_begin_token: Any, sentence_end_token: Any
) -> Tuple[torch.Tensor, torch.Tensor]: # pragma: no cover
"""
Add begin/end of sentence tokens to the batch of sentences.
Given a batch of sentences with size ``(batch_size, timesteps)`` or
``(batch_size, timesteps, dim)`` this returns a tensor of shape
``(batch_size, timesteps + 2)`` or ``(batch_size, timesteps + 2, dim)`` respectively.
Returns both the new tensor and updated mask.
Parameters
----------
tensor : ``torch.Tensor``
A tensor of shape ``(batch_size, timesteps)`` or ``(batch_size, timesteps, dim)``
mask : ``torch.Tensor``
A tensor of shape ``(batch_size, timesteps)``
sentence_begin_token: Any (anything that can be broadcast in torch for assignment)
For 2D input, a scalar with the <S> id. For 3D input, a tensor with length dim.
sentence_end_token: Any (anything that can be broadcast in torch for assignment)
For 2D input, a scalar with the </S> id. For 3D input, a tensor with length dim.
Returns
-------
tensor_with_boundary_tokens : ``torch.Tensor``
The tensor with the appended and prepended boundary tokens. If the input was 2D,
it has shape (batch_size, timesteps + 2) and if the input was 3D, it has shape
(batch_size, timesteps + 2, dim).
new_mask : ``torch.Tensor``
The new mask for the tensor, taking into account the appended tokens
marking the beginning and end of the sentence.
"""
# TODO: matthewp, profile this transfer
sequence_lengths = mask.sum(dim=1).detach().cpu().numpy()
tensor_shape = list(tensor.data.shape)
new_shape = list(tensor_shape)
new_shape[1] = tensor_shape[1] + 2
tensor_with_boundary_tokens = tensor.new_zeros(*new_shape)
if len(tensor_shape) == 2:
tensor_with_boundary_tokens[:, 1:-1] = tensor
tensor_with_boundary_tokens[:, 0] = sentence_begin_token
for i, j in enumerate(sequence_lengths):
tensor_with_boundary_tokens[i, j + 1] = sentence_end_token
new_mask = (tensor_with_boundary_tokens != 0).long()
elif len(tensor_shape) == 3:
tensor_with_boundary_tokens[:, 1:-1, :] = tensor
for i, j in enumerate(sequence_lengths):
tensor_with_boundary_tokens[i, 0, :] = sentence_begin_token
tensor_with_boundary_tokens[i, j + 1, :] = sentence_end_token
new_mask = ((tensor_with_boundary_tokens > 0).long().sum(dim=-1) > 0).long()
else:
raise ValueError("add_sentence_boundary_token_ids only accepts 2D and 3D input")
return tensor_with_boundary_tokens, new_mask
def _make_bos_eos(
character: int,
padding_character: int,
beginning_of_word_character: int,
end_of_word_character: int,
max_word_length: int,
): # pragma: no cover
char_ids = [padding_character] * max_word_length
char_ids[0] = beginning_of_word_character
char_ids[1] = character
char_ids[2] = end_of_word_character
return char_ids
class _ElmoCharacterEncoder(torch.nn.Module): # pragma: no cover
"""
Compute context sensitive token representation using pretrained biLM.
This embedder has input character ids of size (batch_size, sequence_length, 50)
and returns (batch_size, sequence_length + 2, embedding_dim), where embedding_dim
is specified in the options file (typically 512).
We add special entries at the beginning and end of each sequence corresponding
to <S> and </S>, the beginning and end of sentence tokens.
Note: this is a lower level class useful for advanced usage. Most users should
use ``ElmoTokenEmbedder`` or ``allennlp.modules.Elmo`` instead.
Parameters
----------
options_file : ``str``
ELMo JSON options file
weight_file : ``str``
ELMo hdf5 weight file
requires_grad: ``bool``, optional
If True, compute gradient of ELMo parameters for fine tuning.
The relevant section of the options file is something like:
.. example-code::
.. code-block:: python
{'char_cnn': {
'activation': 'relu',
'embedding': {'dim': 4},
'filters': [[1, 4], [2, 8], [3, 16], [4, 32], [5, 64]],
'max_characters_per_token': 50,
'n_characters': 262,
'n_highway': 2
}
}
"""
def __init__(self, options_file: str, weight_file: str, requires_grad: bool = False) -> None:
super(_ElmoCharacterEncoder, self).__init__()
with open(options_file, "r") as fin:
self._options = json.load(fin)
self._weight_file = weight_file
self.output_dim = self._options["lstm"]["projection_dim"]
self.requires_grad = requires_grad
self._load_weights()
max_word_length = 50
# char ids 0-255 come from utf-8 encoding bytes
# assign 256-300 to special chars
beginning_of_sentence_character = 256 # <begin sentence>
end_of_sentence_character = 257 # <end sentence>
beginning_of_word_character = 258 # <begin word>
end_of_word_character = 259 # <end word>
padding_character = 260 # <padding>
beginning_of_sentence_characters = _make_bos_eos(
beginning_of_sentence_character,
padding_character,
beginning_of_word_character,
end_of_word_character,
max_word_length,
)
end_of_sentence_characters = _make_bos_eos(
end_of_sentence_character,
padding_character,
beginning_of_word_character,
end_of_word_character,
max_word_length,
)
# Cache the arrays for use in forward -- +1 due to masking.
self._beginning_of_sentence_characters = torch.from_numpy(
numpy.array(beginning_of_sentence_characters) + 1
)
self._end_of_sentence_characters = torch.from_numpy(
numpy.array(end_of_sentence_characters) + 1
)
def get_output_dim(self):
return self.output_dim
@overrides
def forward(
self, inputs: torch.Tensor
) -> Dict[str, torch.Tensor]: # pylint: disable=arguments-differ
"""
Compute context insensitive token embeddings for ELMo representations.
Parameters
----------
inputs: ``torch.Tensor``
Shape ``(batch_size, sequence_length, 50)`` of character ids representing the
current batch.
Returns
-------
Dict with keys:
``'token_embedding'``: ``torch.Tensor``
Shape ``(batch_size, sequence_length + 2, embedding_dim)`` tensor with context
insensitive token representations.
``'mask'``: ``torch.Tensor``
Shape ``(batch_size, sequence_length + 2)`` long tensor with sequence mask.
"""
# Add BOS/EOS
mask = ((inputs > 0).long().sum(dim=-1) > 0).long()
character_ids_with_bos_eos, mask_with_bos_eos = add_sentence_boundary_token_ids(
inputs, mask, self._beginning_of_sentence_characters, self._end_of_sentence_characters
)
# the character id embedding
max_chars_per_token = self._options["char_cnn"]["max_characters_per_token"]
# (batch_size * sequence_length, max_chars_per_token, embed_dim)
character_embedding = torch.nn.functional.embedding(
character_ids_with_bos_eos.view(-1, max_chars_per_token), self._char_embedding_weights
)
# run convolutions
cnn_options = self._options["char_cnn"]
if cnn_options["activation"] == "tanh":
activation = torch.nn.functional.tanh
elif cnn_options["activation"] == "relu":
activation = torch.nn.functional.relu
else:
raise ValueError("Unknown activation")
# (batch_size * sequence_length, embed_dim, max_chars_per_token)
character_embedding = torch.transpose(character_embedding, 1, 2)
convs = []
for i in range(len(self._convolutions)):
conv = getattr(self, "char_conv_{}".format(i))
convolved = conv(character_embedding)
# (batch_size * sequence_length, n_filters for this width)
convolved, _ = torch.max(convolved, dim=-1)
convolved = activation(convolved)
convs.append(convolved)
# (batch_size * sequence_length, n_filters)
token_embedding = torch.cat(convs, dim=-1)
# apply the highway layers (batch_size * sequence_length, n_filters)
token_embedding = self._highways(token_embedding)
# final projection (batch_size * sequence_length, embedding_dim)
token_embedding = self._projection(token_embedding)
# reshape to (batch_size, sequence_length, embedding_dim)
batch_size, sequence_length, _ = character_ids_with_bos_eos.size()
return {
"mask": mask_with_bos_eos,
"token_embedding": token_embedding.view(batch_size, sequence_length, -1),
}
def _load_weights(self):
self._load_char_embedding()
self._load_cnn_weights()
self._load_highway()
self._load_projection()
def _load_char_embedding(self):
with h5py.File(self._weight_file, "r") as fin:
char_embed_weights = fin["char_embed"][...]
weights = numpy.zeros(
(char_embed_weights.shape[0] + 1, char_embed_weights.shape[1]), dtype="float32"
)
weights[1:, :] = char_embed_weights
self._char_embedding_weights = torch.nn.Parameter(
torch.FloatTensor(weights), requires_grad=self.requires_grad
)
def _load_cnn_weights(self):
cnn_options = self._options["char_cnn"]
filters = cnn_options["filters"]
char_embed_dim = cnn_options["embedding"]["dim"]
convolutions = []
for i, (width, num) in enumerate(filters):
conv = torch.nn.Conv1d(
in_channels=char_embed_dim, out_channels=num, kernel_size=width, bias=True
)
# load the weights
with h5py.File(self._weight_file, "r") as fin:
weight = fin["CNN"]["W_cnn_{}".format(i)][...]
bias = fin["CNN"]["b_cnn_{}".format(i)][...]
w_reshaped = numpy.transpose(weight.squeeze(axis=0), axes=(2, 1, 0))
if w_reshaped.shape != tuple(conv.weight.data.shape):
raise ValueError("Invalid weight file")
conv.weight.data.copy_(torch.FloatTensor(w_reshaped))
conv.bias.data.copy_(torch.FloatTensor(bias))
conv.weight.requires_grad = self.requires_grad
conv.bias.requires_grad = self.requires_grad
convolutions.append(conv)
self.add_module("char_conv_{}".format(i), conv)
self._convolutions = convolutions
def _load_highway(self):
# pylint: disable=protected-access
# the highway layers have same dimensionality as the number of cnn filters
cnn_options = self._options["char_cnn"]
filters = cnn_options["filters"]
n_filters = sum(f[1] for f in filters)
n_highway = cnn_options["n_highway"]
# create the layers, and load the weights
self._highways = Highway(n_filters, n_highway, activation=torch.nn.functional.relu)
for k in range(n_highway):
# The AllenNLP highway is one matrix multplication with concatenation of
# transform and carry weights.
with h5py.File(self._weight_file, "r") as fin:
# The weights are transposed due to multiplication order assumptions in tf
# vs pytorch (tf.matmul(X, W) vs pytorch.matmul(W, X))
w_transform = numpy.transpose(fin["CNN_high_{}".format(k)]["W_transform"][...])
# -1.0 since AllenNLP is g * x + (1 - g) * f(x) but tf is (1 - g) * x + g * f(x)
w_carry = -1.0 * numpy.transpose(fin["CNN_high_{}".format(k)]["W_carry"][...])
weight = numpy.concatenate([w_transform, w_carry], axis=0)
self._highways._layers[k].weight.data.copy_(torch.FloatTensor(weight))
self._highways._layers[k].weight.requires_grad = self.requires_grad
b_transform = fin["CNN_high_{}".format(k)]["b_transform"][...]
b_carry = -1.0 * fin["CNN_high_{}".format(k)]["b_carry"][...]
bias = numpy.concatenate([b_transform, b_carry], axis=0)
self._highways._layers[k].bias.data.copy_(torch.FloatTensor(bias))
self._highways._layers[k].bias.requires_grad = self.requires_grad
def _load_projection(self):
cnn_options = self._options["char_cnn"]
filters = cnn_options["filters"]
n_filters = sum(f[1] for f in filters)
self._projection = torch.nn.Linear(n_filters, self.output_dim, bias=True)
with h5py.File(self._weight_file, "r") as fin:
weight = fin["CNN_proj"]["W_proj"][...]
bias = fin["CNN_proj"]["b_proj"][...]
self._projection.weight.data.copy_(torch.FloatTensor(numpy.transpose(weight)))
self._projection.bias.data.copy_(torch.FloatTensor(bias))
self._projection.weight.requires_grad = self.requires_grad
self._projection.bias.requires_grad = self.requires_grad
[docs]class ElmoLstm(_EncoderBase): # pragma: no cover
"""
A stacked, bidirectional LSTM which uses
:class:`~allennlp.modules.lstm_cell_with_projection.LstmCellWithProjection`'s
with highway layers between the inputs to layers.
The inputs to the forward and backward directions are independent - forward and backward
states are not concatenated between layers.
Additionally, this LSTM maintains its `own` state, which is updated every time
``forward`` is called. It is dynamically resized for different batch sizes and is
designed for use with non-continuous inputs (i.e inputs which aren't formatted as a stream,
such as text used for a language modelling task, which is how stateful RNNs are typically used).
This is non-standard, but can be thought of as having an "end of sentence" state, which is
carried across different sentences.
Parameters
----------
input_size : ``int``, required
The dimension of the inputs to the LSTM.
hidden_size : ``int``, required
The dimension of the outputs of the LSTM.
cell_size : ``int``, required.
The dimension of the memory cell of the
:class:`~allennlp.modules.lstm_cell_with_projection.LstmCellWithProjection`.
num_layers : ``int``, required
The number of bidirectional LSTMs to use.
requires_grad: ``bool``, optional
If True, compute gradient of ELMo parameters for fine tuning.
recurrent_dropout_probability: ``float``, optional (default = 0.0)
The dropout probability to be used in a dropout scheme as stated in
`A Theoretically Grounded Application of Dropout in Recurrent Neural Networks
<https://arxiv.org/abs/1512.05287>`_ .
state_projection_clip_value: ``float``, optional, (default = None)
The magnitude with which to clip the hidden_state after projecting it.
memory_cell_clip_value: ``float``, optional, (default = None)
The magnitude with which to clip the memory cell.
"""
def __init__(
self,
input_size: int,
hidden_size: int,
cell_size: int,
num_layers: int,
requires_grad: bool = False,
recurrent_dropout_probability: float = 0.0,
memory_cell_clip_value: Optional[float] = None,
state_projection_clip_value: Optional[float] = None,
) -> None:
super(ElmoLstm, self).__init__(stateful=True)
# Required to be wrapped with a :class:`PytorchSeq2SeqWrapper`.
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.cell_size = cell_size
self.requires_grad = requires_grad
forward_layers = []
backward_layers = []
lstm_input_size = input_size
go_forward = True
for layer_index in range(num_layers):
forward_layer = LstmCellWithProjection(
lstm_input_size,
hidden_size,
cell_size,
go_forward,
recurrent_dropout_probability,
memory_cell_clip_value,
state_projection_clip_value,
)
backward_layer = LstmCellWithProjection(
lstm_input_size,
hidden_size,
cell_size,
not go_forward,
recurrent_dropout_probability,
memory_cell_clip_value,
state_projection_clip_value,
)
lstm_input_size = hidden_size
self.add_module("forward_layer_{}".format(layer_index), forward_layer)
self.add_module("backward_layer_{}".format(layer_index), backward_layer)
forward_layers.append(forward_layer)
backward_layers.append(backward_layer)
self.forward_layers = forward_layers
self.backward_layers = backward_layers
[docs] def forward(
self, inputs: torch.Tensor, mask: torch.LongTensor # pylint: disable=arguments-differ
) -> torch.Tensor:
"""
Parameters
----------
inputs : ``torch.Tensor``, required.
A Tensor of shape ``(batch_size, sequence_length, hidden_size)``.
mask : ``torch.LongTensor``, required.
A binary mask of shape ``(batch_size, sequence_length)`` representing the
non-padded elements in each sequence in the batch.
Returns
-------
A ``torch.Tensor`` of shape (num_layers, batch_size, sequence_length, hidden_size),
where the num_layers dimension represents the LSTM output from that layer.
"""
batch_size, total_sequence_length = mask.size()
stacked_sequence_output, final_states, restoration_indices = self.sort_and_run_forward(
self._lstm_forward, inputs, mask
)
num_layers, num_valid, returned_timesteps, encoder_dim = stacked_sequence_output.size()
# Add back invalid rows which were removed in the call to sort_and_run_forward.
if num_valid < batch_size:
zeros = stacked_sequence_output.new_zeros(
num_layers, batch_size - num_valid, returned_timesteps, encoder_dim
)
stacked_sequence_output = torch.cat([stacked_sequence_output, zeros], 1)
# The states also need to have invalid rows added back.
new_states = []
for state in final_states:
state_dim = state.size(-1)
zeros = state.new_zeros(num_layers, batch_size - num_valid, state_dim)
new_states.append(torch.cat([state, zeros], 1))
final_states = new_states
# It's possible to need to pass sequences which are padded to longer than the
# max length of the sequence to a Seq2StackEncoder. However, packing and unpacking
# the sequences mean that the returned tensor won't include these dimensions, because
# the RNN did not need to process them. We add them back on in the form of zeros here.
sequence_length_difference = total_sequence_length - returned_timesteps
if sequence_length_difference > 0:
zeros = stacked_sequence_output.new_zeros(
num_layers,
batch_size,
sequence_length_difference,
stacked_sequence_output[0].size(-1),
)
stacked_sequence_output = torch.cat([stacked_sequence_output, zeros], 2)
self._update_states(final_states, restoration_indices)
# Restore the original indices and return the sequence.
# Has shape (num_layers, batch_size, sequence_length, hidden_size)
return stacked_sequence_output.index_select(1, restoration_indices)
def _lstm_forward(
self,
inputs: PackedSequence,
initial_state: Optional[Tuple[torch.Tensor, torch.Tensor]] = None,
) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
"""
Parameters
----------
inputs : ``PackedSequence``, required.
A batch first ``PackedSequence`` to run the stacked LSTM over.
initial_state : ``Tuple[torch.Tensor, torch.Tensor]``, optional, (default = None)
A tuple (state, memory) representing the initial hidden state and memory
of the LSTM, with shape (num_layers, batch_size, 2 * hidden_size) and
(num_layers, batch_size, 2 * cell_size) respectively.
Returns
-------
output_sequence : ``torch.FloatTensor``
The encoded sequence of shape (num_layers, batch_size, sequence_length, hidden_size)
final_states: ``Tuple[torch.FloatTensor, torch.FloatTensor]``
The per-layer final (state, memory) states of the LSTM, with shape
(num_layers, batch_size, 2 * hidden_size) and (num_layers, batch_size, 2 * cell_size)
respectively. The last dimension is duplicated because it contains the state/memory
for both the forward and backward layers.
"""
if initial_state is None:
hidden_states: List[Optional[Tuple[torch.Tensor, torch.Tensor]]] = [None] * len(
self.forward_layers
)
elif initial_state[0].size()[0] != len(self.forward_layers):
raise ValueError(
"Initial states were passed to forward() but the number of "
"initial states does not match the number of layers."
)
else:
hidden_states = list(zip(initial_state[0].split(1, 0), initial_state[1].split(1, 0)))
inputs, batch_lengths = pad_packed_sequence(inputs, batch_first=True)
forward_output_sequence = inputs
backward_output_sequence = inputs
final_states = []
sequence_outputs = []
for layer_index, state in enumerate(hidden_states):
forward_layer = getattr(self, "forward_layer_{}".format(layer_index))
backward_layer = getattr(self, "backward_layer_{}".format(layer_index))
forward_cache = forward_output_sequence
backward_cache = backward_output_sequence
if state is not None:
forward_hidden_state, backward_hidden_state = state[0].split(self.hidden_size, 2)
forward_memory_state, backward_memory_state = state[1].split(self.cell_size, 2)
forward_state = (forward_hidden_state, forward_memory_state)
backward_state = (backward_hidden_state, backward_memory_state)
else:
forward_state = None
backward_state = None
forward_output_sequence, forward_state = forward_layer(
forward_output_sequence, batch_lengths, forward_state
)
backward_output_sequence, backward_state = backward_layer(
backward_output_sequence, batch_lengths, backward_state
)
# Skip connections, just adding the input to the output.
if layer_index != 0:
forward_output_sequence += forward_cache
backward_output_sequence += backward_cache
sequence_outputs.append(
torch.cat([forward_output_sequence, backward_output_sequence], -1)
)
# Append the state tuples in a list, so that we can return
# the final states for all the layers.
final_states.append(
(
torch.cat([forward_state[0], backward_state[0]], -1),
torch.cat([forward_state[1], backward_state[1]], -1),
)
)
stacked_sequence_outputs: torch.FloatTensor = torch.stack(sequence_outputs)
# Stack the hidden state and memory for each layer into 2 tensors of shape
# (num_layers, batch_size, hidden_size) and (num_layers, batch_size, cell_size)
# respectively.
final_hidden_states, final_memory_states = zip(*final_states)
final_state_tuple: Tuple[torch.FloatTensor, torch.FloatTensor] = (
torch.cat(final_hidden_states, 0),
torch.cat(final_memory_states, 0),
)
return stacked_sequence_outputs, final_state_tuple
[docs] def load_weights(self, weight_file: str) -> None:
"""
Load the pre-trained weights from the file.
"""
requires_grad = self.requires_grad
with h5py.File(weight_file, "r") as fin:
for i_layer, lstms in enumerate(zip(self.forward_layers, self.backward_layers)):
for j_direction, lstm in enumerate(lstms):
# lstm is an instance of LSTMCellWithProjection
cell_size = lstm.cell_size
dataset = fin["RNN_%s" % j_direction]["RNN"]["MultiRNNCell"][
"Cell%s" % i_layer
]["LSTMCell"]
# tensorflow packs together both W and U matrices into one matrix,
# but pytorch maintains individual matrices. In addition, tensorflow
# packs the gates as input, memory, forget, output but pytorch
# uses input, forget, memory, output. So we need to modify the weights.
tf_weights = numpy.transpose(dataset["W_0"][...])
torch_weights = tf_weights.copy()
# split the W from U matrices
input_size = lstm.input_size
input_weights = torch_weights[:, :input_size]
recurrent_weights = torch_weights[:, input_size:]
tf_input_weights = tf_weights[:, :input_size]
tf_recurrent_weights = tf_weights[:, input_size:]
# handle the different gate order convention
for torch_w, tf_w in [
[input_weights, tf_input_weights],
[recurrent_weights, tf_recurrent_weights],
]:
torch_w[(1 * cell_size) : (2 * cell_size), :] = tf_w[
(2 * cell_size) : (3 * cell_size), :
]
torch_w[(2 * cell_size) : (3 * cell_size), :] = tf_w[
(1 * cell_size) : (2 * cell_size), :
]
lstm.input_linearity.weight.data.copy_(torch.FloatTensor(input_weights))
lstm.state_linearity.weight.data.copy_(torch.FloatTensor(recurrent_weights))
lstm.input_linearity.weight.requires_grad = requires_grad
lstm.state_linearity.weight.requires_grad = requires_grad
# the bias weights
tf_bias = dataset["B"][...]
# tensorflow adds 1.0 to forget gate bias instead of modifying the
# parameters...
tf_bias[(2 * cell_size) : (3 * cell_size)] += 1
torch_bias = tf_bias.copy()
torch_bias[(1 * cell_size) : (2 * cell_size)] = tf_bias[
(2 * cell_size) : (3 * cell_size)
]
torch_bias[(2 * cell_size) : (3 * cell_size)] = tf_bias[
(1 * cell_size) : (2 * cell_size)
]
lstm.state_linearity.bias.data.copy_(torch.FloatTensor(torch_bias))
lstm.state_linearity.bias.requires_grad = requires_grad
# the projection weights
proj_weights = numpy.transpose(dataset["W_P_0"][...])
lstm.state_projection.weight.data.copy_(torch.FloatTensor(proj_weights))
lstm.state_projection.weight.requires_grad = requires_grad