MaskrcnnBenchmark 源码解析-训练/推演核心代码(engine)

源码文件

./maskrcnn_benchmark/engine/trainer.py
./maskrcnn_benchmark/engine/inference.py

模型测试/推演时, 无需计算每个参数的梯度, 因此会大大减少 GPU 的显存占用空间

trainer.py 文件概览

本文件中的代码是用来定义模型训练的步骤的, 当我们确定了模型, 数据集载入器, 优化器, 更新策略等相关组件以后, 就可以调用该文件中的函数来进行模型训练, 下面先简单看一下本文件的函数概览.

1
2
3
4
5
6
7
8
9
10
11
12
# ./maskrcnn_benchmark/engine/trainer.py

# 导入各种包及函数
import torch

def reduce_loss_dict(loss_dict):
# 计算 reduce 后的损失函数字典
# ...

def do_train(...):
# 模型训练核心代码
# ...

trainer 导入各种包及函数

下面是本文件导入的包及函数, 我们会对这些包进行简要的介绍, 并且会给出详细解析的博文链接.

1
2
3
4
5
6
7
8
9
10
11
12
13
# ./maskrcnn_benchmark/engine/trainer.py

# 常规包
import datetime
import logging
import time

import torch
import torch.distributed as dist # 分布式相关

from maskrcnn_benchmark.utils.comm import get_world_size

from maskrcnn_benchmark.utils.metric_logger import MetricLogger

get_world_size: 该函数封装了 troch.distributed.get_world_size() 函数, 位于 ./maskrcnn_benchmark/utils/comm.py 文件中, 详细解析请看comm
MetricLogger: 该类中包含了同文件的 SmoothedValue 类作为其数据成员, SmoothedValue 类用于跟踪一系列的值, 同时还会提供访问这些值的滑动平均值或全局平均值, 而位于 MetricLogger 定义了将这些值打印到屏幕或保存到文件中的代码, 这两个类位于 ./maskrcnn_benchmark/utils.metric_logger 文件中, 详细解析请看metric_logger

trainer.do_train() 模型训练核心函数

我们按照函数的一般调用顺序, 首先来看看模型训练的核心逻辑代码的实现, 具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# ./maskrcnn_benchmark/engine/trainer.py

def do_train(
model, # 从 build_detection_model 函数得到的模型对象
data_loader, # PyTorch 的 DataLoader 对象, 对应自己的数据集
optimizer, # torch.optim.sgd.SGD 对象
scheduler, # 学习率的更新策略, 封装在 solver/lr_scheduler.py 文件中
checkpointer, # DetectronCheckpointer, 用于自动转换 Caffe2 Detectron 的模型文件
device, # torch.device, 指定训练的设备
checkpoint_period, # 整形, 指定了模型的保存迭代间隔, 默认为 2500
arguments, # 额外的其他参数, 字典类型, 一般情况只有 arguments[iteratioin], 初值为0
):
# 记录日志信息
logger = logging.getLogger("maskrcnn_benchmark.trainer")
logger.info("Start training")

# 用于记录一些变量的滑动平均值和全局平均值
meters = MetricLogger(delimiter=" ") # delimiter为定界符, 这里用两个空格作为定界符

# 数据载入器重写了 len 函数, 使其返回载入器需要提供batch的次数, 即 cfg.SOLVER.MAX_ITER
max_iter = len(data_loader)

start_iter = arguments["iteration"] # 默认为0, 但是会根据载入的权重文件, 变成其他值.

model.train() # 将 model 的模式置为 train, train() 函数的参数 mode 默认值为True.

start_training_time = time.time()
end = time.time() # 计时

# 遍历 data_loader, 第二个参数是设置序号的开始序号,
# data_loader 的返回值为(images, targets, shape)
for iteration, (images, targets, _) in enumerate(data_loader, start_iter):
data_time = time.time() - end # 获取一个 batch 所需的时间
iteration = iteration + 1
arguments["iteration"] = iteration

scheduler.step() # 更新一次学习率

images = images.to(device) # 把 images 移动到指定设备上
targets = [target.to(device) for target in targets] # 移动到指定设备上

loss_dict = model(images, targets) # 根据 images 和 targets 计算 loss

losses = sum(loss for loss in loss_dict.values()) # 将各个loss合并

# 根据 GPUs 的数量对 loss 进行 reduce, 详细请看关于 reduce_loss_dict() 函数的解析
loss_dict_reduced = reduce_loss_dict(loss_dict)
losses_reduced = sum(loss for loss in loss_dict_reduced.values) # 合并loss
meters.update(loss=losses_reduced, **loss_dict_reduced) # 更新滑动平均值

optimizer.zero_grad() # 清除梯度缓存
losses.backward() # 计算梯度
optimizer.step() # 更新参数

batch_time = time.time()-end # 进行一次 batch 所需时间
end = time.time()
meters.update(time=batch_time, data=data_time)

# 根据时间的滑动平均值计算大约还剩多长时间结束训练
eta_seconds = meters.time.global_avg * (max_iter - iteration)
eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))

# 每经过20次迭代, 输出一次训练状态
if iteration % 20 == 0 or iteration == max_iter:
logger.info(
meters.delimiter.join(
[
"eta: {eta}",
"iter: {iter}",
"{meters}",
"lr: {lr:.6f}",
"max mem: {memory:.0f}",
]
).format(
eta=eta_string,
iter=iteration,
meters=str(means),
lr=optimizer.param_groups[0]["lr"],
memory=torch.cuda.max_memory_allocated() / 1024.0 / 1024.0,
)
)
# 每经过 checkpoint_period 次迭代后, 就将模型保存
if iteration % checkpoint_period == 0:
checkpointer.save("model_{:07d}".format(iteration), **arguments)
# 达到最大迭代次数后, 也进行保存
if iteration == max_iter:
checkpointer.save("model_final", **arguments)

# 输出总的训练耗时
total_training_time = time.time() - start_training_time
total_time_str = str(datetime.timedelta(seconds=total_training_time))
logger.info(
"Total training time: {} ({:.4f} s / it)".format(
total_time_str, total_training_time / (max_iter)
)
)

trainer.reduce_loss_dict() 计算 reduced_loss

下面的代码会计算多 GPU 时的 reduce loss, 直接来看代码解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# ../maskrcnn_benchmark/engine/trainer.py

def reduce_loss_dict(loss_dict):
# 对loss进行reduce, 使其可以利用 rank 0进行处理

world_size = get_world_size()
if world_size < 2: # 单 GPU, 直接返回, 无需reduce
return loss_dict

with torch.no_grad(): # 不要计算任何参数的梯度
loss_names = []
all_losses = []
for k, v in loss_dict.items():
loss_names.append(k) # 获取键
all_losses.append(v) # 获取值

# 将列表中的 loss 连接起来组成一个一维的tensor, tensor的每个元素代表一个 loss.
all_losses = torch.stack(all_losses, dim=0)

# import torch.distributed as dist
dist.reduce(all_losses, dst=0)
if dist.get_rank() == 0:
# only main process gets accumulated, so only divide by
# world_size in this case
call_losses /= world_size
reduced_losses = {k: v for k, v in zip(loss_nams, all_losses)}
return reduced_losses

inference.py 文件概览

该文件定义了模型推演时的代码逻辑, 在拥有预训练的模型文件时, 可以通过调用本文件的函数来进行模型测试或模型推演, 文件的函数概览如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ./maskrcnn_benchmark/engine/inference.py

# 导入各种包及函数
import torch
# ...

def compute_on_dataset(model, data_loader, device):
# 计算结果
# ...

def _accumulate_predictions_from_multiple_gpus(predictions_per_gpu):
# 累积预测
# ...

def inference(...):
# 模型测试/推演核心代码
# ...

inference 导入各种包及函数

我们首先来看看该文件导入了哪些包及函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ./maskrcnn_benchmark/engine/inference.py

# 导入常规包
import datetime
import logging
import time
import

import torch
from tqdm import tqdm

# 导入评价函数, 包含 coco_evaluation 和 voc_evaluation
from maskrcnn_benchmark.data import datasets
from ..utils.comm import is_main_process
from ..utils.comm import scatter_gather
from ..utils.comm import synchronize

evaluate: 该函数封装了 coco_evaluationvoc_evaluation, 具体解析可以查看data.
其余三个函数都来自 comm.py 文件, 详情着看 comm

inference.inference() 模型测试/推演核心代码

下面我们按照函数的调用顺序, 先来看看模型测试/推演的核心代码, 解析如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# ./maskrcnn_benchmark/engine/inference.py
def inference(
model, # 从 build_detection_model 函数得到的模型对象
data_loader, # PyTorch 的 DataLoader 对象, 对应自定义的数据集
dataset_name, # str, 数据集的名字
iou_types=("bbox",), # iou的类型, 默认为 bbox
box_only=False, # cfg.MODEL.RPN_ONLY="False"
device="cuda", # cfg.MODEL.DEVICE="cuda"
expected_results=(), # cfg.TEST.EXPECTED_RESULTS=[]
expected_results_sigma_tol=4, # cfg.TEST.EXPECTED_RESULTS_SIGMA_TOL=4
output_folder=None, # 自定义输出文件夹
):
# 获取设备
device = torch.device(divice)
num_devices = (
torch.distributed.get_world_size()
if torch.distributed.is_initialized
else 1
)

# 日志信息
logger = logging.getLogger("maskrcnn_benchmark.inference")

dataset = data_loader.dataset # 自定义的数据集类, 如 coco.COCODataset

logger.info("Start evaluation on {} dataset({} images).".format(dataset_name, len(dataset)))

# 开始计时
start_time = time.time()

# 调用本文件的函数, 获得预测结果, 关于该函数的解析可看后文
predictions = compute_on_dataset(model, data_loader, device)

# 调用下面的语句, 使得等到所有的进程都结束以后再计算总耗时
synchronize()

# 计算总耗时记入log
total_time = time.time() - start_time
total_time_str = str(datetime.timedelta(seconds=total_time))
logger.info(
"Total inference time: {} ({} s / img per device, on {} devices)".format(
total_time_str, total_time * num_devices / len(dataset), num_devices
)
)

# 调用本文件的函数, 将所有GPU设备上的预测结果累加并返回
predictions = _accumulate_predictions_from_multiple_gpus(predictions)

if output_folder:
# 将结果保存
torch.save(predictions, os.path.join(output_folder, "predictions.pth"))

extra_args = dict(
box_only=box_only,
iou_types=iou_types,
expected_results=expected_results,
expected_results_sigma_tol=expected_results_sigma_tol,
)

# 调用评价函数, 返回预测结果的质量
return evaluate(
dataset=dataset,
predictions=predictions,
output_folder=output_folder,
**extra_args
)

inference.compute_on_dataset() 计算结果

在上面的函数中, 调用了 compute_on_dataset() 函数来获得预测结果, 该函数的主要逻辑就是利用训练好的模型来预测模型的输出, 然后再将放在字典里返回, 代码解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ./maskrcnn_benchmark/engine/inference.py

def compute_on_dataset(model, data_loader, device):
model.eval() # 将模型状态置于eval, 主要影响 dropout, BN 等操作的行为
results_dict = {}
cpu_device = torch.device("cpu")
for i, batch in enumerate(tqdm(data_loader)):
images, targets, image_ids = batch
images = images.to(device) # 将图片移动至 gpu 上(默认device="cuda")
with torch.no_grad(): # 使用model运算时, 不用计算梯度
output = model(images)
output = [o.to(cpu_device) for o in output] # 将计算结果转移到cpu上

# 更新结果字典
results_dict.update(
{img_id: result for img_id, result in zip(image_ids, output)}
)
return results_dict

inference._accumulate_predictions_from_multiple_gpus()

由于 MaskrcnnBenchmark 默认是支持分布式的, 因此可以在多个 GPU 上计算结果, 但是最终需要把所有的结果都合并起来, 这正是 _accumulate_predictions_from_multiple_gpus() 的功能, 函数解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# ./maskrcnn_benchmark/engine/inference.py

def _accumulate_predictions_from_multiple_gpus(predictions_per_gpu):
# from ..utils.comm import scatter_gather
all_predictions = scatter_gather(predictions_per_gpu)
if not is_main_process():
return
# merge the list of dicts
predictions = {}
for p in all_predictions:
predictions.update(p)
# convert a dict where the key is the index in a list
image_ids = list(sorted(predictions.keys()))
if len(image_ids) != image_ids[-1] + 1:
logger = logging.getLogger("maskrcnn_benchmark.inference")
logger.warning(
"Number of images that were gathered from multiple processes is not "
"a contiguous set. Some images might be missing from the evaluation"
)

# convert to a list
predictions = [predictions[i] for i in image_ids]
return predictions