Source code for claf.tokens.text_handler


from collections import Counter
import logging
import time

from tqdm import tqdm

from claf.data.data_handler import CachePath, DataHandler
from claf.data.utils import padding_tokens, transpose
from claf.tokens.token_maker import TokenMaker
from claf.tokens.vocabulary import Vocab
from claf import utils as common_utils

logger = logging.getLogger(__name__)


class TextHandler:
    """ Text Handler

    - vocab and token_counter
    - raw_features -> indexed_features
    - raw_features -> tensor

    * Args:
        token_makers: Dictionary consisting of
            - key: token_name
            - value: TokenMaker (claf.tokens.token_maker)

    * Kwargs:
        lazy_indexing: Apply `Lazy Evaluation` to text indexing
    """

    def __init__(self, token_makers, lazy_indexing=True):
        self.token_makers = token_makers
        self.lazy_indexing = lazy_indexing
        self.data_handler = DataHandler(cache_path=CachePath.TOKEN_COUNTER)
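
    # A minimal construction sketch. `word_token_maker` and the "word" key are
    # hypothetical names; in practice token_makers comes from the token config.
    #
    #   handler = TextHandler({"word": word_token_maker}, lazy_indexing=True)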

    def build_vocabs(self, token_counters):
        logger.info("Start build vocab")
        vocab_start_time = time.time()

        vocabs = {}
        for token_name, token_maker in self.token_makers.items():
            is_defined_config = type(token_maker.vocab_config) == dict
            if is_defined_config:
                token_counter = token_counters[token_name]
                vocab = self._build_vocab_with_config(token_name, token_maker, token_counter)
            else:
                vocab = Vocab(token_name)
                vocab.init()
            vocabs[token_name] = vocab

            logger.info(
                f" => {token_name} vocab size: {len(vocab)} (use predefined vocab: {vocab.pretrained_path is not None})"
            )

        vocab_elapsed_time = time.time() - vocab_start_time
        logger.info(f"Complete build vocab... elapsed_time: {vocab_elapsed_time}\n")

        # Setting Indexer (vocab)
        for token_name, token_maker in self.token_makers.items():
            token_maker.set_vocab(vocabs[token_name])
        return vocabs
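
    # Typical flow, with illustrative names ("word" is a hypothetical
    # token_name). Note that build_vocabs also sets each vocab on its
    # token_maker as a side effect, so the indexers are ready afterwards.
    #
    #   token_counters = handler.make_token_counters(texts)
    #   vocabs = handler.build_vocabs(token_counters)
    #   print(len(vocabs["word"]))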

    def _build_vocab_with_config(self, token_name, token_maker, token_counter):
        token_maker.vocab_config["token_name"] = token_name
        vocab = Vocab(**token_maker.vocab_config)
        if vocab.pretrained_path is not None:
            vocab.build_with_pretrained_file(token_counter)
        else:
            vocab.build(token_counter)
        return vocab

    def is_all_vocab_use_pretrained(self):
        for token_name, token_maker in self.token_makers.items():
            if token_maker.vocab_config.get("pretrained_path", None) is None:
                return False
            if token_maker.vocab_config.get("pretrained_token", "") != Vocab.PRETRAINED_ALL:
                return False
        return True

    def make_token_counters(self, texts, config=None):
        token_counters = {}
        for token_name, token_maker in self.token_makers.items():
            token_vocab_config = token_maker.vocab_config
            if type(token_vocab_config) == dict:
                if token_vocab_config.get("pretrained_token", None) == Vocab.PRETRAINED_ALL:
                    # do not use token_counter from dataset -> make empty token_counter
                    texts = [""]

            token_counter = self._make_token_counter(
                texts, token_maker.tokenizer, config=config, desc=f"{token_name}-vocab"
            )
            logger.info(f" * {token_name} token_counter size: {len(token_counter)}")
            token_counters[token_name] = token_counter
        return token_counters
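
    # Sketch of inputs and outputs (hedged): `texts` is a list of raw strings
    # and each returned value is a collections.Counter over tokens. With a
    # whitespace-style tokenizer and a hypothetical "word" token_name:
    #
    #   token_counters = handler.make_token_counters(["hello world", "hello"])
    #   token_counters["word"].most_common(1)  # -> [("hello", 2)]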

    def _make_token_counter(self, texts, tokenizer, config=None, desc=None):
        tokenizer_name = tokenizer.name
        cache_token_counter = None
        if config is not None:
            data_reader_config = config.data_reader
            cache_token_counter = self.data_handler.cache_token_counter(
                data_reader_config, tokenizer_name
            )

        if cache_token_counter:
            return cache_token_counter
        else:
            tokens = [
                token for text in tqdm(texts, desc=desc) for token in tokenizer.tokenize(text)
            ]
            flatten_list = list(common_utils.flatten(tokens))
            token_counter = Counter(flatten_list)

            if config is not None:  # Cache TokenCounter
                self.data_handler.cache_token_counter(
                    data_reader_config, tokenizer_name, obj=token_counter
                )
            return token_counter
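
    # Note: DataHandler.cache_token_counter serves double duty above; called
    # without `obj` it attempts to load a previously cached Counter, and called
    # with `obj=` it writes the freshly built Counter back to the cache.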

    def index(self, datas, text_columns):
        logger.info(f"Start token indexing, Lazy: {self.lazy_indexing}")
        indexing_start_time = time.time()

        for data_type, data in datas.items():
            if type(data) == list:  # Multi-Data Indexing
                for d in data:
                    self._index_features(
                        d.features, text_columns, desc=f"indexing features ({data_type})"
                    )
            else:
                self._index_features(
                    data.features, text_columns, desc=f"indexing features ({data_type})"
                )

        indexing_elapsed_time = time.time() - indexing_start_time
        logger.info(f"Complete token indexing... elapsed_time: {indexing_elapsed_time} \n")
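
    # Usage sketch (names are assumptions): `datas` maps a data type to an
    # object exposing `.features`, e.g. a dataset built by a DataReader, and
    # text_columns names the feature keys that hold text.
    #
    #   handler.index({"train": train_data}, text_columns=["context", "question"])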

    def _index_features(self, features, text_columns, desc=None, suppress_tqdm=False):
        for feature in tqdm(features, desc=desc, disable=suppress_tqdm):
            for key, text in feature.items():
                if key not in text_columns:
                    continue

                # Set data_type (text => {"text": ..., "token1": ..., ...})
                if type(feature[key]) != dict:
                    feature[key] = {"text": text}
                if type(text) == dict:
                    text = text["text"]

                for token_name, token_maker in self.token_makers.items():
                    param_key = token_maker.indexer.param_key
                    if param_key == key:
                        continue
                    feature[key][token_name] = self._index_token(token_maker, text, feature)

    def _index_token(self, token_maker, text, data):
        def index():
            indexer = token_maker.indexer
            params = {}
            if token_maker.type_name == TokenMaker.EXACT_MATCH_TYPE:
                param_text = data[indexer.param_key]
                if type(param_text) == dict:
                    param_text = param_text["text"]
                params["query_text"] = param_text
            return indexer.index(text, **params)

        if self.lazy_indexing:
            return index
        else:
            return index()
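
    # Note on lazy indexing: when self.lazy_indexing is True, _index_token
    # returns the `index` closure itself, so feature[key][token_name] holds a
    # zero-argument callable and the indexing cost is paid only when it is
    # invoked; with lazy_indexing=False the indexed value is computed eagerly.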

    def raw_to_tensor_fn(self, data_reader, cuda_device=None, helper={}):
        # `helper` is a shared, mutable accumulator: metadata from
        # read_one_example is merged into it on every call and returned
        # alongside the tensors.
        def raw_to_tensor(inputs):
            is_one = True  # batch_size 1 flag
            feature, _helper = data_reader.read_one_example(inputs)

            nonlocal helper
            helper.update(_helper)

            if type(feature) == list:
                is_one = False
                features = feature
            else:
                features = [feature]

            self._index_features(features, data_reader.text_columns, suppress_tqdm=True)

            if is_one:
                indexed_features = features[0]
            else:
                # when features > 1, need to transpose (list_of_dict -> dict_of_list)
                indexed_features = {}
                for key in features[0]:
                    feature_with_key = [feature[key] for feature in features]
                    indexed_features[key] = transpose(feature_with_key, skip_keys=["text"])

            for key in indexed_features:
                for token_name in self.token_makers:
                    if token_name not in indexed_features[key]:
                        continue

                    indexed_values = indexed_features[key][token_name]
                    if is_one:
                        indexed_values = [indexed_values]

                    tensor = padding_tokens(indexed_values, token_name=token_name)
                    if cuda_device is not None and type(tensor) != list:
                        tensor = tensor.cuda(cuda_device)
                    indexed_features[key][token_name] = tensor

            for key in indexed_features:
                if "text" in indexed_features[key]:
                    del indexed_features[key]["text"]

            return indexed_features, helper

        return raw_to_tensor
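
# End-to-end sketch of the runtime path (all names hypothetical: data_reader,
# token_makers, texts, and the input dict shape depend on the configured
# DataReader):
#
#   handler = TextHandler(token_makers, lazy_indexing=False)
#   token_counters = handler.make_token_counters(texts)
#   handler.build_vocabs(token_counters)
#   raw_to_tensor = handler.raw_to_tensor_fn(data_reader, cuda_device=None)
#   tensors, helper = raw_to_tensor({"context": "...", "question": "..."})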