import atexit
import logging
from pathlib import Path
import torch
from claf import nsml
from claf.config.factory import (
DataReaderFactory,
DataLoaderFactory,
TokenMakersFactory,
ModelFactory,
OptimizerFactory,
)
from claf import utils as common_utils
from claf.config.args import NestedNamespace
from claf.config.utils import convert_config2dict, pretty_json_dumps, set_global_seed
from claf.tokens.text_handler import TextHandler
from claf.learn.mode import Mode
from claf.learn.trainer import Trainer
from claf.learn import utils
# Module-level logger used by the Experiment class for config / summary output.
logger = logging.getLogger(name=__name__)
class Experiment:
    """
    Experiment settings with config.

    * Args:
        mode: Mode (ex. TRAIN, EVAL, INFER_EVAL, PREDICT)
        config: (NestedNamespace) Argument config according to mode

    In TRAIN mode ``self.argument`` and ``self.config`` are the same object.
    In every other mode a checkpoint is loaded (``load_setting``) and
    ``self.config`` is replaced by the config stored in that checkpoint, while
    ``self.argument`` keeps the arguments given for the current mode.
    """

    def __init__(self, mode, config):
        common_utils.set_logging_config(mode, config)

        # self.config: experiment overall config
        # self.argument: argument config according to mode
        self.argument = config
        self.config = config
        self.mode = mode

        self.common_setting(mode, config)
        if mode != Mode.TRAIN:  # evaluate and predict
            self.load_setting()

            # Set evaluation config. endswith() is used so that every mode whose
            # value ends with Mode.EVAL is handled (presumably INFER_EVAL too).
            if mode.endswith(Mode.EVAL):
                self.config.data_reader.train_file_path = ""
                self.config.data_reader.valid_file_path = self.argument.data_file_path
                self.config.cuda_devices = self.argument.cuda_devices
                self.config.iterator.cuda_devices = self.argument.cuda_devices

                if getattr(self.argument, "inference_latency", None):
                    self.config.max_latency = self.argument.inference_latency

        # Filled by set_predict_mode(preload=True); consumed by predict().
        self.predict_settings = None

    def common_setting(self, mode, config):
        """
        Common Setting - experiment config, use_gpu and cuda_device_ids

        * Args:
            mode: experiment Mode (not used here; kept for interface compatibility)
            config: (NestedNamespace) config converted into ``self.config_dict``
        """
        self.config_dict = convert_config2dict(config)

        self.config.cuda_devices = self._get_cuda_devices()
        self.config.slack_url = getattr(self.config, "slack_url", False)

    def _get_cuda_devices(self):
        """
        Resolve the GPU device ids to use.

        Fills in ``config.use_gpu`` when it is missing (True when CUDA is
        available or when running on NSML).

        * Returns:
            ``list(range(gpu_num))`` on NSML, the configured ``cuda_devices``
            otherwise, or None when GPUs are not used.
        """
        if getattr(self.config, "use_gpu", None) is None:
            self.config.use_gpu = torch.cuda.is_available() or nsml.IS_ON_NSML

        if not self.config.use_gpu:
            return None
        if nsml.IS_ON_NSML:
            return list(range(self.config.gpu_num))
        return getattr(self.config, "cuda_devices", None)

    def load_setting(self):
        """ Load Setting - need to load checkpoint case (ex. evaluate and predict) """
        cuda_devices = self.argument.cuda_devices
        checkpoint_path = self.argument.checkpoint_path
        prev_cuda_device_id = getattr(self.argument, "prev_cuda_device_id", None)

        self.model_checkpoint = self._read_checkpoint(
            cuda_devices, checkpoint_path, prev_cuda_device_id=prev_cuda_device_id
        )
        self._set_saved_config(cuda_devices)

    def _read_checkpoint(self, cuda_devices, checkpoint_path, prev_cuda_device_id=None):
        """
        Load a checkpoint file with ``torch.load``.

        * Args:
            cuda_devices: "cpu", None / empty (no GPU target) or a device id list
            checkpoint_path: path of the checkpoint file
            prev_cuda_device_id: cuda id the checkpoint was saved from; its
                tensors are remapped onto ``cuda_devices[0]``

        * Returns:
            the loaded checkpoint object
        """
        # "cpu", no device list, or no CUDA at all -> everything is loaded on CPU.
        # (Previously None / [] crashed on cuda_devices[0] when CUDA was available.)
        if cuda_devices == "cpu" or not cuda_devices or not torch.cuda.is_available():
            return torch.load(checkpoint_path, map_location="cpu")  # use CPU

        # different cuda_device id case (save/load)
        map_location = {f"cuda:{prev_cuda_device_id}": f"cuda:{cuda_devices[0]}"}
        return torch.load(checkpoint_path, map_location=map_location)

    def _set_saved_config(self, cuda_devices):
        """
        Replace ``self.config`` with the config stored in the loaded checkpoint.

        ``use_gpu`` from the current run and the given ``cuda_devices`` override
        the saved values.
        """
        saved_config_dict = self.model_checkpoint["config"]
        self.config_dict = saved_config_dict

        logger.info("Load saved_config ...")
        logger.info(pretty_json_dumps(saved_config_dict))

        saved_config = NestedNamespace()
        saved_config.load_from_json(saved_config_dict)

        is_use_gpu = self.config.use_gpu

        self.config = saved_config
        self.config.use_gpu = is_use_gpu
        self.config.cuda_devices = cuda_devices

    def __call__(self):
        """
        Run Trainer

        * Returns:
            the trainer's result for EVAL / INFER_EVAL / PREDICT modes, None for TRAIN

        * Raises:
            ValueError: unknown mode
        """
        set_global_seed(self.config.seed_num)  # For Reproducible

        if self.mode == Mode.TRAIN:
            # exit trigger slack notification.
            # atexit forwards these arguments; the webhook URL is the first
            # positional argument of send_message_to_slack (see _summary_experiments).
            if self.config.slack_url:
                atexit.register(
                    utils.send_message_to_slack,
                    self.config.slack_url,
                    title=f"Session Name: {nsml.SESSION_NAME} ",
                    message="Experiment process exited.",
                )

            train_loader, valid_loader, optimizer = self.set_train_mode()

            assert train_loader is not None
            assert optimizer is not None

            if valid_loader is None:
                self.trainer.train(train_loader, optimizer)
            else:
                self.trainer.train_and_evaluate(train_loader, valid_loader, optimizer)
            self._summary_experiments()

        elif self.mode == Mode.EVAL:
            valid_loader = self.set_eval_mode()

            assert valid_loader is not None
            return self.trainer.evaluate(valid_loader)

        elif self.mode == Mode.INFER_EVAL:
            raw_examples, raw_to_tensor_fn = self.set_eval_inference_latency_mode()

            assert raw_examples is not None
            assert raw_to_tensor_fn is not None

            # max_latency is only set when --inference_latency was given; otherwise
            # leave the trainer's own default in place instead of raising AttributeError.
            latency_kwargs = {}
            max_latency = getattr(self.config, "max_latency", None)
            if max_latency is not None:
                latency_kwargs["max_latency"] = max_latency
            return self.trainer.evaluate_inference_latency(
                raw_examples, raw_to_tensor_fn, **latency_kwargs
            )

        elif self.mode.endswith(Mode.PREDICT):
            raw_features, raw_to_tensor_fn, arguments = self.set_predict_mode()

            assert raw_features is not None
            assert raw_to_tensor_fn is not None
            return self.trainer.predict(
                raw_features,
                raw_to_tensor_fn,
                arguments,
                interactive=arguments.get("interactive", False),
            )
        else:
            raise ValueError(f"unknown mode: {self.mode}")

    def set_train_mode(self):
        """
        Training Mode

        - Pipeline
          1. read raw_data (DataReader)
          2. build vocabs (DataReader, Token)
          3. indexing tokens (DataReader, Token)
          4. convert to DataSet (DataReader)
          5. create DataLoader (DataLoader)
          6. define model and optimizer
          7. run!

        * Returns:
            (train_loader, valid_loader, optimizer)
        """
        logger.info("Config. \n" + pretty_json_dumps(self.config_dict) + "\n")

        data_reader, token_makers = self._create_data_and_token_makers()
        datas, helpers = data_reader.read()

        # Token & Vocab
        text_handler = TextHandler(token_makers, lazy_indexing=True)
        if text_handler.is_all_vocab_use_pretrained():
            token_counters = token_makers
        else:
            texts = data_reader.filter_texts(datas)
            token_counters = text_handler.make_token_counters(texts, config=self.config)

        vocabs = text_handler.build_vocabs(token_counters)
        text_handler.index(datas, data_reader.text_columns)

        # iterator
        vocab = vocabs[next(iter(vocabs))]
        datasets = data_reader.convert_to_dataset(datas, vocab, helpers=helpers)  # with name

        self.config.iterator.cuda_devices = self.config.cuda_devices
        train_loader, valid_loader, test_loader = self._create_by_factory(
            DataLoaderFactory, self.config.iterator, param={"datasets": datasets}
        )

        # calculate 'num_train_steps'
        num_train_steps = self._get_num_train_steps(train_loader)
        self.config.optimizer.num_train_steps = num_train_steps

        checkpoint_dir = Path(self.config.trainer.log_dir) / "checkpoint"
        checkpoints = None
        if checkpoint_dir.exists():
            checkpoints = self._load_exist_checkpoints(checkpoint_dir)  # contain model and optimizer

        if checkpoints is None:
            model = self._create_model(token_makers, helpers=helpers)
        else:
            model = self._create_model(token_makers, checkpoint=checkpoints)

        op_dict = self._create_by_factory(
            OptimizerFactory, self.config.optimizer, param={"model": model}
        )
        if checkpoints is not None:
            utils.load_optimizer_checkpoint(op_dict["optimizer"], checkpoints)

        self.set_trainer(model, op_dict=op_dict)
        return train_loader, valid_loader, op_dict["optimizer"]

    def _create_data_and_token_makers(self):
        """
        Create token makers and the data reader.

        The "tokenizers" entry produced by TokenMakersFactory is moved from the
        token makers into ``config.data_reader`` before the reader is built.

        * Returns:
            (data_reader, token_makers)
        """
        token_makers = self._create_by_factory(TokenMakersFactory, self.config.token)
        tokenizers = token_makers.pop("tokenizers")
        self.config.data_reader.tokenizers = tokenizers

        data_reader = self._create_by_factory(DataReaderFactory, self.config.data_reader)
        return data_reader, token_makers

    def _create_by_factory(self, factory, item_config, param=None):
        """ Instantiate ``factory(item_config)`` and call ``create(**param)``. """
        return factory(item_config).create(**(param or {}))

    def _get_num_train_steps(self, train_loader):
        """
        Total optimizer steps for the whole training run.

        One epoch is ``len(dataset) / batch_size / gradient_accumulation_steps``
        steps (truncated, but at least 1), multiplied by ``num_epochs``.
        """
        train_set_size = len(train_loader.dataset)
        batch_size = self.config.iterator.batch_size
        gradient_accumulation_steps = getattr(self.config.optimizer, "gradient_accumulation_steps", 1)
        num_epochs = self.config.trainer.num_epochs

        one_epoch_steps = int(train_set_size / batch_size / gradient_accumulation_steps)
        one_epoch_steps = max(one_epoch_steps, 1)
        return one_epoch_steps * num_epochs

    def _load_exist_checkpoints(self, checkpoint_dir):  # pragma: no cover
        """
        Ask the user whether to resume from an existing checkpoint in ``checkpoint_dir``.

        * Returns:
            a dict merging the selected model and optimizer checkpoints, or None
            when there is nothing to resume / the user enters 0

        * Raises:
            ValueError: the entered train_count is not one of the listed ones
        """
        sorted_paths = utils.get_sorted_path(checkpoint_dir, both_exist=True)
        train_counts = list(sorted_paths.keys())
        if not train_counts:
            return None

        separator = "-" * 50
        message = f"{separator}\n !! Find exist checkpoints {train_counts}.\n If you want to recover, input train_count in list.\n If you don't want to recover, input 0.\n{separator}"
        selected_train_count = common_utils.get_user_input(message)

        if selected_train_count == 0:
            return None
        if selected_train_count not in sorted_paths:
            raise ValueError(
                f"train_count {selected_train_count} is not in the list {train_counts}."
            )

        model_path = sorted_paths[selected_train_count]["model"]
        model_checkpoint = self._read_checkpoint(self.config.cuda_devices, model_path)
        optimizer_path = sorted_paths[selected_train_count]["optimizer"]
        optimizer_checkpoint = self._read_checkpoint("cpu", optimizer_path)

        checkpoints = {}
        checkpoints.update(model_checkpoint)
        checkpoints.update(optimizer_checkpoint)
        return checkpoints

    def _create_model(self, token_makers, checkpoint=None, helpers=None):
        """
        Build the model from ModelFactory and move it to the configured device.

        Init params / predict_helper come from the first data helper when no
        checkpoint is given, otherwise from the checkpoint (whose weights are
        then loaded into the model).
        """
        if checkpoint is None:
            assert helpers is not None
            first_key = next(iter(helpers))
            helper = helpers[first_key]  # get first helper
            model_init_params = helper.get("model", {})
            predict_helper = helper.get("predict_helper", {})
        else:
            model_init_params = checkpoint.get("init_params", {})
            predict_helper = checkpoint.get("predict_helper", {})

        model_params = {"token_makers": token_makers}
        model_params.update(model_init_params)

        model = self._create_by_factory(
            ModelFactory, self.config.model, param=model_params
        )
        # Save params
        model.init_params = model_init_params
        model.predict_helper = predict_helper

        if checkpoint is not None:
            model = utils.load_model_checkpoint(model, checkpoint)
        model = self._set_gpu_env(model)
        return model

    def _set_gpu_env(self, model):
        """
        Move ``model`` to the configured GPU(s) and log the device setting.

        The model is placed on the first listed device (where the inputs are sent,
        see ``cuda_devices[0]`` in the predict / inference paths); with more than
        one device it is wrapped in ``torch.nn.DataParallel``, which requires the
        module to live on ``device_ids[0]``.
        """
        num_gpu = 0
        if self.config.use_gpu:
            cuda_devices = self._get_cuda_devices() or []
            num_gpu = len(cuda_devices)

            if cuda_devices:
                model.cuda(cuda_devices[0])
            else:
                model.cuda()

            if num_gpu > 1:  # use_multi_gpu
                model = torch.nn.DataParallel(model, device_ids=cuda_devices)

        num_gpu_state = str(num_gpu)
        if num_gpu > 1:
            num_gpu_state += " (Multi-GPU)"

        # TODO: distributed training and 16-bits training (FP16)
        logger.info(f"use_gpu: {self.config.use_gpu} num_gpu: {num_gpu_state}, distributed training: False, 16-bits training: False")
        return model

    def set_trainer(self, model, op_dict=None, save_params=None):
        """
        Create ``self.trainer`` from ``config.trainer`` plus the model and optimizer extras.

        * Args:
            model: model to train / evaluate
            op_dict: dict from OptimizerFactory ("optimizer",
                "learning_rate_scheduler", "exponential_moving_average")
            save_params: unused; kept for backward compatibility
        """
        op_dict = op_dict or {}

        # Copy: vars() returns the namespace's own __dict__, and writing model /
        # scheduler into it would silently modify self.config.trainer.
        trainer_config = dict(vars(self.config.trainer))
        trainer_config["config"] = self.config_dict
        trainer_config["model"] = model
        trainer_config["learning_rate_scheduler"] = op_dict.get("learning_rate_scheduler", None)
        trainer_config["exponential_moving_average"] = op_dict.get(
            "exponential_moving_average", None
        )
        self.trainer = Trainer(**trainer_config)

        # Set NSML
        if nsml.IS_ON_NSML:
            utils.bind_nsml(model, optimizer=op_dict.get("optimizer", None))
            if getattr(self.config.nsml, "pause", None):
                nsml.paused(scope=locals())

    def _summary_experiments(self):
        """ Log the config, training logs and metric logs; optionally post a summary to Slack. """
        hr_text = "-" * 50
        summary_logs = f"\n\n\nExperiment Summary. {nsml.SESSION_NAME}\n{hr_text}\n"
        summary_logs += f"Config.\n{pretty_json_dumps(self.config_dict)}\n{hr_text}\n"
        summary_logs += (
            f"Training Logs.\n{pretty_json_dumps(self.trainer.training_logs)}\n{hr_text}\n"
        )
        summary_logs += f"Metric Logs.\n{pretty_json_dumps(self.trainer.metric_logs)}"

        logger.info(summary_logs)

        if self.config.slack_url:  # pragma: no cover
            simple_summary_title = f"Session Name: {nsml.SESSION_NAME} "
            if getattr(self.config, "base_config", None):
                simple_summary_title += f"({self.config.base_config})"

            simple_summary_logs = f" - Dataset: {self.config.data_reader.dataset} \n"
            # Newline so that "Best metrics" starts on its own line.
            simple_summary_logs += f" - Model: {self.config.model.name}\n"

            best_metrics = {"epoch": self.trainer.metric_logs["best_epoch"]}
            best_metrics.update(self.trainer.metric_logs["best"])

            simple_summary_logs += f" - Best metrics.\n {pretty_json_dumps(best_metrics)} "

            utils.send_message_to_slack(self.config.slack_url, title=simple_summary_title, message=simple_summary_logs)

    def _load_checkpoint_vocabs(self, token_makers):
        """ Set each token maker's vocab from ``self.model_checkpoint`` and return the vocabs. """
        vocabs = utils.load_vocabs(self.model_checkpoint)
        for token_name, token_maker in token_makers.items():
            token_maker.set_vocab(vocabs[token_name])
        return vocabs

    def set_eval_mode(self):
        """
        Evaluate Mode

        - Pipeline
          1. read raw_data (DataReader)
          2. load vocabs from checkpoint (DataReader, Token)
          3. indexing tokens (DataReader, Token)
          4. convert to DataSet (DataReader)
          5. create DataLoader (DataLoader)
          6. define and load model
          7. run!

        * Returns:
            valid_loader
        """
        data_reader, token_makers = self._create_data_and_token_makers()

        # DataReader
        datas, helpers = data_reader.read()

        # Token & Vocab
        vocabs = self._load_checkpoint_vocabs(token_makers)
        text_handler = TextHandler(token_makers, lazy_indexing=False)
        text_handler.index(datas, data_reader.text_columns)

        # iterator
        vocab = vocabs[next(iter(vocabs))]
        datasets = data_reader.convert_to_dataset(datas, vocab, helpers=helpers)  # with name

        self.config.iterator.cuda_devices = self.config.cuda_devices
        _, valid_loader, _ = self._create_by_factory(
            DataLoaderFactory, self.config.iterator, param={"datasets": datasets}
        )

        # Model
        model = self._create_model(token_makers, checkpoint=self.model_checkpoint)
        self.set_trainer(model)

        return valid_loader

    def set_eval_inference_latency_mode(self):
        """
        Evaluate Inference Latency Mode

        - Pipeline
          1. read raw_data (DataReader)
          2. load vocabs from checkpoint (DataReader, Token)
          3. define raw_to_tensor_fn (DataReader, Token)
          4. define and load model
          5. run!

        * Returns:
            (raw_examples, raw_to_tensor_fn)
        """
        data_reader, token_makers = self._create_data_and_token_makers()

        # Token & Vocab
        self._load_checkpoint_vocabs(token_makers)
        text_handler = TextHandler(token_makers, lazy_indexing=False)

        _, helpers = data_reader.read()
        raw_examples = helpers["valid"]["examples"]

        cuda_device = self.config.cuda_devices[0] if self.config.use_gpu else None
        raw_to_tensor_fn = text_handler.raw_to_tensor_fn(data_reader, cuda_device=cuda_device)

        # Model
        model = self._create_model(token_makers, checkpoint=self.model_checkpoint)
        self.set_trainer(model)

        return raw_examples, raw_to_tensor_fn

    def predict(self, raw_features):
        """
        Predict with the settings preloaded by ``set_predict_mode(preload=True)``.

        * Args:
            raw_features: dict of raw input features (merged into the arguments)

        * Raises:
            ValueError: predict settings were not preloaded, or raw_features is None
        """
        if self.predict_settings is None:
            raise ValueError(
                "To use 'predict()', you must call 'set_predict_mode()' first, with preload=True parameter"
            )
        if raw_features is None:
            raise ValueError("raw_features is required for 'predict()'.")

        raw_to_tensor_fn = self.predict_settings["raw_to_tensor_fn"]
        assert raw_to_tensor_fn is not None

        # Per-call copy so features from one call do not leak into the preloaded settings.
        arguments = dict(self.predict_settings["arguments"])
        arguments.update(raw_features)

        return self.trainer.predict(
            raw_features,
            raw_to_tensor_fn,
            arguments,
            interactive=arguments.get("interactive", False),
        )

    def set_predict_mode(self, preload=False):
        """
        Predict Mode

        - Pipeline
          1. read raw_data (Argument)
          2. load vocabs from checkpoint (DataReader, Token)
          3. define raw_to_tensor_fn (DataReader, Token)
          4. define and load model
          5. run!

        * Args:
            preload: store the settings in ``self.predict_settings`` for later
                ``predict()`` calls instead of returning them

        * Returns:
            (raw_features, raw_to_tensor_fn, arguments), or None when preload=True
        """
        data_reader, token_makers = self._create_data_and_token_makers()

        # Token & Vocab
        self._load_checkpoint_vocabs(token_makers)
        text_handler = TextHandler(token_makers, lazy_indexing=False)

        # Set predict config
        if getattr(self.argument, "interactive", False):
            raw_features = {feature_name: "" for feature_name in data_reader.text_columns}
        else:
            raw_features = {
                feature_name: getattr(self.argument, feature_name, None)
                for feature_name in data_reader.text_columns
            }

        cuda_device = self.config.cuda_devices[0] if self.config.use_gpu else None
        raw_to_tensor_fn = text_handler.raw_to_tensor_fn(
            data_reader,
            cuda_device=cuda_device,
            helper=self.model_checkpoint.get("predict_helper", {})
        )

        # Model
        model = self._create_model(token_makers, checkpoint=self.model_checkpoint)
        self.set_trainer(model)

        # Copy so later updates never write attributes onto self.argument.
        arguments = dict(vars(self.argument))

        if preload:
            self.predict_settings = {"raw_to_tensor_fn": raw_to_tensor_fn, "arguments": arguments}
        else:
            return raw_features, raw_to_tensor_fn, arguments