MaskrcnnBenchmark 源码解析-模型定义(modeling)之RPN网络

源码文件

在 Faster R-CNN 中, 首次提出了 RPN 网络, 该网络用于生成目标检测任务所需要候选区域框, 在 MaskrcnnBenchmark 中, 关于 RPN 网络的定义位于 ./maskrcnn_benchmark/modeling/rpn/ 文件夹中, 该文件夹包含以下四个文件:

class GeneralizedRCNN(nn.Module) 类中, 会通过 self.rpn = build_rpn(cfg) 函数来创建 RPN 网络, 该函数位于 ./maskrcnn_benchmark/modeling/rpn/rpn.py 文件中, 下面我们就先来看看该文件的内部实现.

rpn.py 区域候选框网络

rpn.py 文件中的最后, 定义了 build_fpn() 函数, 如下所示:

1
2
def build_fpn(cfg):
return RPNModule(cfg)

可以看出, 构建 RPN 网络的核心定义在 class RPNModule 中, 该类的定义如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# ./maskrcnn_benchmark/modeling/rpn/rpn.py

import torch
import torch.nn.functional as F
from torch import nn

from maskrcnn_benchmark.modeling import registry
from maskrcnn_benchmark.modeling.box_coder import BoxCoder
from .loss import make_rpn_loss_evaluator
from .anchor_generator import make_anchor_generator
from .inference import make_rpn_postprocessor

@registry.RPN_HEADS.register("SingleConvHead")
class RPNHead(nn.Module):
# 利用分类层和回归层添加一个简单的 RPN heads
# ...

class RPNModule(torch.nn.Module):
# 用于 RPN 计算的 Module.
# 从 backbone 中获取特征图谱用于计算
def __init__(self, cfg):#
super(RPNModule, self).__init__()

self.cfg = cfg.clone()

# from .anchor_generator import make_anchor_generator
# 根据配置文件的信息输出对应的 anchor, 详细的实现逻辑需要查看 anchor_generator.py文件
anchor_generator = make_anchor_generator(cfg)

# cfg.MODEL.BACKBONE.OUT_CHANNELS = 256 (默认为256*4), 即stage2的输出通道数
in_channels = cfg.MODEL.BACKBONE.OUT_CHANNELS

# 创建 rpn heads, 具体的代码解析请看 class RPNHead
# cfg.MODEL.RPN.RPN_HEAD = "SingleConvRPNHead"
rpn_head = registry.RPN_HEADS[cfg.MODEL.RPN.RPN_HEAD]

# num_anchors_per_location() 是 AnchorGenerator 的成员函数, 具体解析请看后文
head = rpn_head(
cfg, in_channels, anchor_generator.num_anchors_per_location()[0]
)

# from maskrcnn_benchmark.modeling.box_coder import BoxCoder
# class BoxCoder(object) 类定义在 ./maskrcnn_benchmark/modeling/box_coder.py 文件中
# 其主要功能是将 bounding boxes 的表示形式编码成易于训练的形式(详细信息可查看R-CNN原文)
rpn_box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))

# from .inference import make_rpn_postprocessor
# 根据配置信息对候选框进行后处理, 选取合适的框用于训练
box_selector_train = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=Fasle)
# 选取合适的框用于测试
box_selector_test = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=False)

# 利用得到的box获取损失函数
loss_evaluator = make_rpn_loss_evaluator(cfg, rpn_box_coder)

# 设置相应的成员
self.anchor_generator = anchor_generator
self.head = head
self.box_selector_train = box_selector_train
self.box_selector_test = box_selector_test
self.loss_evaluator = loss_evaluator

# 定义前向传播过程
def forward(self, images, features, targets=None):
# images (ImageList):
# features (list[Tensor]):
# targets (list[BoxList]):

# 返回值:
# boxes (list[BoxList]):
# losses (dict[Tensor]):

# 利用给定的特征图谱计算相应的 rpn 结果
objectness, rpn_box_regression = self.head(features)

# 在图片上生成 anchors
anchors = self.anchor_generator(images, features)

# 当处在训练状态时, 调用 _foward_train(), 当处在推演状态时, 调用 _forward_test()
if self.training:
return self._forward_train(anchors, objectness, rpn_box_regression, targets)
else:
self._forward_test(anchors, objectness, rpn_box_regression)

# 训练状态时的前向传播函数
def _forward_train(self, anchors, objectness, rpn_box_regression, targets):
if self.cfg.MODEL.RPN_ONLY:
# 当处在 rpn-only 的训练模式时, 网络的 loss 仅仅与rpn的 objectness 和
# rpn_box_regression values 有关, 因此无需将 anchors 转化成 boxes
boxes = anchors
else:
# 对于 end-to-end 模型来说, anchors 必须被转化成 boxes,
# 然后采样到目标检测网络的 batch 中用于训练, 注意此时不更新网络参数
with torch.no_grad():
boxes = self.box_selector_train(
anchors, objectness, rpn_box_regression, targets
)

# 获取损失函数的结果
loss_objectness, loss_rpn_box_reg = self.loss_evaluator(
anchors, objectness, rpn_box_regression, targets
)

# 创建一个loss字典
losses = {
"loss_objectness": loss_objectness,
"loss_rpn_box_reg": loss_rpn_box_reg,
}

return boxes, losses

# 测试状态时的前向传播函数
def _forward_test(self, anchors, objectness, rpn_box_regression):
# 将 anchors 转化成对应的 boxes
boxes = self.box_selector_test(anchors, objectness, rpn_box_regression)

if self.cfg.MODEL.RPN_ONLY:
# 对于 end-to-end 模型来说, RPN proposals 仅仅只是网络的一个中间状态,
# 我们无需将它们以降序顺序存储, 直接返回 FPN 结果即可
# 但是对于 RPN-only 模式下, RPN 的输出就是最终结果, 我们需要以置信度从高
# 到低的顺序保存结果并返回.
inds = [
box.get_field("objectness").sort(descending=True)[1] for box in boxes
]
boxes = [box[ind] for box, ind in zip(boxes, inds)]

return boxes, {}

class RPNModule 中, 使用了 class RPNHead 作为其头部, 下面我们就来看一下该类的定义及实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# ../maskrcnn_benchmark/modeling/rpn/rpn.py

@registry.RPN_HEADS.register("SingleConvRPNHead")
class RPNHead(nn.Module):
# 添加 classification 和 regression heads

def __init__(self, cfg, in_channels, num_anchors):
# cfg: 配置信息
# in_channels (int): 输入特征的通道数
# num_anchors (int): 需要预测的 anchors 的数量

super(RPNHead, self).__init__()

# 维持通道数不变
self.conv = nn.Conv2d(
in_channels, in_channels, kernel_size=3, stride=1, padding=1
)

# objectness 预测层, 输出的 channels 数为 anchors 的数量.(每一点对应 k 个 anchors)
self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)

# 预测 box 回归的网络层
self.bbox_pred = nn.Conv2d(
in_channels, num_anchors * 4, kernel_size=1, stride=1
)

# 对定义的网络层参数进行初始化
for l in [self.conv, self.cls_logits, self.bbox_pred]:
torch.nn.init.normal_(l.weight, std=0.01)
torch.nn.init.constant_(l.bias, 0)

# 定义 rpn head 的前向传播过程
def forward(self, x):
logits = []
bbox_reg = []
for feature in x:
# 先执行卷积+激活
t = F.relu(self.conv(feature))

# 根据卷积+激活后的结果预测objectness
logits.append(self.cls_logits(t))

# 根据卷积+激活后的结果预测 bbox
bbox_reg.append(self.bbox_pred(t))

return logits, bbox_reg

在定义 RPNModule 时, 分别使用了 make_anchor_generator(), make_rpn_postprocessor()make_rpn_loss_evaluator() 函数来构建模型的 anchor_generator, box_selector 以及 loss_evaluator, 这三个函数分别定义在其他的三个文件中, 下面我们就根据函数的调用顺序, 对这几个文件展开解析.

anchor_generator.py 生成 anchors

首先是 make_anchor_generator() 函数, 该函数定义在 rpn/ 文件夹下的 anchor_generator.py 文件中, 由于该文件内容较多, 因此, 我们先看一下文件的整体概览, 了解一下文件中都包含了哪些类和函数.

文件概览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# ./maskrcnn_benchmark/modeling/rpn/anchor_generator.py

# 包的导入
from maskrcnn_benchmark.structures.bounding_box import BoxList
# ...

class BufferList(nn.Module):
# 和 nn.ParameterList 差不多, 但是是针对 buffers 的

def __init__(self, buffers=None):
# 初始化函数
# ...

def extend(self, buffers):
# buffer 扩展
# ...

def __len__(self):
# 获取 buffer 长度
return len(self._buffers)

def __iter__(self):
# buffer 迭代器
return iter(self._buffers.values())

class AnchorGenerator(nn.Module):
# 对于给定的一系列 image sizes 和 feature maps, 计算对应的 anchors

def __init__(...):
# 初始化函数
# ...

def num_anchors_per_location(self):
# 获取每个位置的 anchors 数量
return [len(cell_anchors) for cell_anchors in self.cell_anchors]

def grid_anchors(self, grid_sizes):
# 获取 anchors
# ...

def add_visibility_to(self, boxlist):
# ???
# ...

def forward(self, image_list, feature_maps):
# 定义前向传播过程
# ...


def make_anchor_generator(config):
# 根据配置信息创建 AnchorGenerator 对象实例
# ...

def generator_anchors(...):
# 根据给定的 stride, sizes, aspect_ratio 等参数返回一个 anchor box 组成的矩阵
# ...

def _generate_anchors(base_size, scales, aspect_ratios):
# 返回 anchor windows ??
# ...

def _whctrs(anchor):
# 返回某个 anchor 的宽高以及中心坐标
# ...

def _mkanchors(ws, hs, x_ctr, y_ctr):
# 给定关于一系列 centers 的宽和高, 返回对应的 anchors
# ...

make_anchor_generator() 函数

由于外部文件往往通过 make_anchor_generator(config) 函数来获取对应的 anchors, 因此, 我们就从这个函数入手解析, 代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def make_anchor_generator(config):

# 定义了 RPN 网络的默认的 anchor 的面积大小
# 默认值为: (32, 64, 128, 256, 512)
anchor_sizes = config.MODEL.RPN.ANCHOR_SIZES

# 定义了 RPN 网络 anchor 的高宽比
# 默认值为: (0.5, 1.0, 2.0)
aspect_ratios = config.MODEL.RPN.ASPECT_RATIOS

# 定义了 RPN 网络中 feature map 采用的 stride,
# 对于 FPN 来说, strides 的值应该与 scales 的值匹配
# 默认值为: (16,)
anchor_stride = config.MODEL.RPN.ANCHOR_STRIDE

# 移除那些超过图片 STRADDLE_THRESH 个像素大小的 anchors, 起到剪枝作用
# 默认值为 0, 如果想要关闭剪枝功能, 则将该值置为 -1 或者一个很大的数, 如 100000
straddle_thresh = config.MODEL.RPN.STRADDLE_THRESH

if config.MODEL.RPN.USE_FPN:
# 当使用 fpn 时, 要确保rpn与fpn的相关参数匹配
assert len(anchor_stride) == len(
anchor_sizes
), "FPN should have len(ANCHOR_STRIDE) == len(ANCHOR_SIZES)"
else:
assert len(anchor_stride) == 1, "Non-FPN should have a single ANCHOR_STRIDE"

# 当获取到相关的参数以后, 创建一个 AnchorGenerator 实例并将其返回
anchor_generator = AnchorGenerator(
anchor_sizes, aspect_ratios, anchor_stride, straddle_thresh
)
return anchor_generator

AnchorGenerator 类

根据上面的函数我们知道, make_anchor_generator(config) 函数会根据对应的配置文件创建一个 AnchorGenerator 的实例, 因此, 我们下面就对 class AnchorGenerator(nn.Module) 类进行解析, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# ./maskrcnn_benchmark/modeling/rpn/anchor_generator.py

class AnchorGenerator(nn.Module):
# 对于给定的 image sizes 和 feature maps, 计算对应的 anchors

def __init__(
self,
sizes=(128, 256, 512),
aspect_ratios=(0.5, 1.0, 2.0)
anchor_strides=(8, 16, 32),
straddle_thresh=0,
):
super(AnchorGenerator, self).__init__()

if len(anchor_strides) == 1:
# 如果 anchor_strides 的长度为1, 说明没有 fpn 部分, 则直接调用相关函数
anchor_stride = anchor_strides[0]

# 此处调用了本文件的 generate_anchors 函数, 详解见后文
cell_anchors = [
generate_anchors(anchor_stride, sizes, aspect_ratios).float()
]
else:
if len(anchor_strides) != len(sizes):
raise RuntimeError("FPN should have #anchor_strides == #sizes")

# 调用 generate_anchors 函数
cell_anchors = [
generate_anchors(anchor_stride, (size,), aspect_ratios).float()
for anchor_stride, size in zip(anchor_strides, sizes)
]

# 将 strides, cell_anchors, straddle_thresh 作为 AnchorGenerator 的成员
self.strides = anchor_strides
self.cell_anchors = BufferList(cell_anchors) # 使用了 BufferList 类
self.straddle_thresh = straddle_thresh

# 返回每一个 location 上对应的 anchors 的数量
def num_anchors_per_location(self):
return [len(cell_anchors) for cell_anchors in self.cell_anchors]

# 用于生成所有特征图谱的 anchors, 会被 forward 函数调用.
def grid_anchors(self, grid_sizes):
# 创建一个空的 anchors 列表
anchors = []

# 针对各种组合
for size, stride, base_anchors in zip(
grid_sizes, self.strides, self.cell_anchors
):
# 获取 grid 的尺寸和 base_anchors 的 device
grid_height, grid_width = size
device = base_anchors.device

# 按照步长获取偏移量
shifts_x = torch.arange(
0, grid_width * stride, step=stride, dtype=torch.float32, device=device
)
# 获取 y 的偏移量
shifts_y = torch.arange(
0, grid_height * stride, step=stride, dtype=torch.float32, device=device
)

# 创建关于 shifts_y, shifts_x 的 meshgrid
shift_y, shift_x = torch.meshgrid(shifts_y, shifts_x)

# 将二者展开成一维
shift_x = shift_x.reshape(-1)
shift_y = shift_y.reshape(-1)

shifts = torch.stack((shift_x, shift_y, shift_x, shift_y), dim=1)

anchors.append(
(shifts.view(-1, 1, 4) + base_anchors.view(1, -1, 4)).reshape(-1, 4)
)
return anchors

def add_visibility_to(self, boxlist):
## TODO... 解析


def forward(self, image_list, feature_maps):
# TODO... 解析

根据参数生成 anchors

class AnchorGenerator 中, 利用了 generate_anchors() 函数来生成对应的 anchors, 该函数是生成 anchors 的入口函数, 在生成 anchors 时, 需要进行一些计算和转换, 其大致流程和对应的实现函数如下所示

  1. 获取生成 anchors 必要的参数, 包括: stride, sizes, 和 aspect_ratios, 其中, stride 代表特征图谱上的 anchors 的基础尺寸, sizes 代表 anchor 对应在原始图片中的大小(以像素为单位), 因此, 我们容易知道 anchor 在特征图谱上的放缩比例为 sizes/stride, aspect_ratios 代表 anchors 的高宽比, 于是, 最终返回的 anchors 的数量就是 sizes (在特征图谱上固定 base_window 的尺寸, 根据比例的不同来对应不同大小的物体)的数量和 aspect_ratios 数量的乘积;
  2. 在获取特征图谱上对应的 base_size(stride)后, 我们将其表示成 [x1, y1, x2, y2](坐标是相对于 anchor 的中心而言的) 的 box 形式. 例如对于 stride=4 的情况, 我们将其表示成 [0, 0, 3, 3], 此部分的实现位于 _generate_anchors(...) 函数中
  3. 然后根据 aspect_ratios 的值来获取不同的 anchor boxes 的尺寸, 例如, 对于 stride=4 的 base_anchor 来说, 如果参数 aspect_ratios[0.5, 1.0, 2.0], 那么它就应该返回面积不变, 但是高宽比分别为 [0.5, 1.0, 2.0] 的三个 box 的坐标, 也就是应该返回下面的 box 数组(注意到这里 box 的比例实际上是 [5/2, 1, 2/5], 并不是绝对符合 aspect_ratios, 这是因为像素点只能为整数, 后面还能对这些坐标取整). 这部分的实现位于 _ratio_enum() 函数中;

    1
    2
    3
    [[-1.   0.5  4.   2.5]
    [ 0. 0. 3. 3. ]
    [ 0.5 -1. 2.5 4. ]]
  4. 在获取到不同比例的特征图谱上的 box 坐标以后, 我们就该利用 scales = sizes/stride 来将这些 box 坐标映射到原始图像中, 也就是按照对应的比例将这些 box 放大, 对于我们刚刚举的例子 scales = 32/4 = 8 来说, 最终的 box 的坐标如下所示. 这部分的代码实现位于 _scale_num() 函数中.

    1
    2
    3
    [[-22., -10.,  25.,  13.],
    [-14., -14., 17., 17.],
    [-10., -22., 13., 25.]]

代码解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# ./maskrcnn_benchmark/modeling/rpn/anchor_generator.py

def generate_anchors(
stride=16, sizes=(32, 64, 128, 256, 512), aspect_ratios=(0.5, 1, 2)
):
# 该函数会生成一个 anchor boxes 列表, 列表中的元素为以 (x1, x2, y1, y2) 形式表示的 box
# 这些 box 的坐标是相对于 anchor 的中心而言的, 其大小为 sizes 数组中元素的平方
# 这里的默认参数对应的是使用 resnet-C4 作为 backbone 的 faster_rcnn 模型
# 如果使用了 FPN, 则不同的 size 会对应到不同的特征图谱上, 下面我们利用 FPN 的参数来讲解代码
# fpn 第一阶段参数值为:(注意sizes必须写成元组或列表的形成)
# stride=4, sizes=(32,), aspect_ratios=(0.5, 1, 2)

return _generate_anchors( # 调用 _generate_anchors() 函数
stride, # stride=4
np.array(sizes, dtype=np.float) / stride, # sizes / stride = 32 / 4 = 8
np.array(aspect_ratios, dtype=np.float), # [0.5, 1, 2]
)




def _generate_anchors(base_size, scales, aspect_ratios):
# 根据调用语句知, 参数值分别为: 4, 8, [0.5, 1, 2]

# 首先得到 anchor 的 base box 坐标(相对于 anchor 中心而言), [0, 0, 3, 3]
anchor = np.array([1, 1, base_size, base_size], dtype=np.float) - 1

# 根据 base_box 和给定的高宽比, 得到拥有不同高宽比的 anchors,
# 此处是使 anchor 的比例转化成 [0.5, 1, 2], 对应的 box 为:
#[[-1. 0.5 4. 2.5]
# [ 0. 0. 3. 3. ]
# [ 0.5 -1. 2.5 4. ]]
# 注意到这里的 box 的比例实际为 [5/2, 1, 2/5], 具体原理可查看 _ratio_enum() 函数解析
anchors = _ratio_enum(anchor, aspect_ratios)

# 得到不同高宽比的 anchors 以后, 按照给定的比例(scales)将其缩放到原始图像中,
# 此处 scales 的值只有一个, 即为 8, 因此, 将上面的 boxes 放大 8 倍(指的是宽高各放大 8 倍, 故总面积会放大64倍), 得到新的 boxes 坐标如下:
#[[-22., -10., 25., 13.],
# [-14., -14., 17., 17.],
# [-10., -22., 13., 25.]]
# 这里的 vstack 用于将 3 个 1×4 的数组合并成一个 3×4 的数组, 如上所示.
# anchors[i, :] 代表的是一个 box 的坐标, 如: [-1. 0.5 4. 2.5]
anchors = np.vstack(
[_scale_enum(anchors[i, :], scales) for i in range(anchors.shape[0])]
)

# 将numpy数组转换成tensors, 然后返回, anchor的 shape 为: (n, 4), 其中 n 为 anchors 的数量
return torch.from_numpy(anchors)

在上面的函数上, 分别使用了 _ratio_enum()_scale_enum() 函数来实现高宽比和放缩比的组合, 下面, 我们就先对这两个函数进行解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
./maskrcnn_benchmark/modeling/rpn/anchor_generator.py

def _ratio_enum(anchor, ratios):
# 该函数按照给定的 ratios 将 base anchor 转化成具有不同高宽比的多个 anchor boxes, 假设:
# anchor: [0. 0. 3. 3.]
# ratios: [0.5, 1.0, 2.0]

# 获取 anchor 的宽, 高, 以及中心点的坐标
w, h, x_ctr, y_ctr = _whctrs(anchor)

# 获取 anchor 的面积
size = w * h

# 根据高宽比获取 size_ratios 变量, 后续会用该变量对 box 的高宽比进行转化
size_ratios = size / ratios

# ws = sqrt(size) / sqrt(ratios)
# hs = sqrt(size) * sqrt(ratios)
# 高宽比 = hs/ws = sqrt(ratios) * sqrt(ratios) = ratios
# round 代表四舍五入
ws = np.round(np.sqrt(size_ratios))
hs = np.round(ws * ratios)

# 根据新的 w 和 h, 生成新的 box 坐标(x1, x2, y1, y2) 并将其返回
anchors = _mkanchors(ws, hs, x_ctr, y_ctr)
return anchors

接下来看看对放缩比进行遍历的函数 _scale_enum() 的代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
def _scale_enum(anchor, scales):
# anchor: [-1. 0.5 4. 2.5] (举例)
# scales: 8

# 获取 anchor 的宽, 高, 以及中心坐标
w, h, x_ctr, y_ctr = _whctrs(anchor)

# 将宽和高各放大8倍
ws = w * scales
hs = h * scales

# 根据新的宽, 高, 中心坐标, 将 anchor 转化成 (x1, x2, y1, y2) 的形式
return anchors

_ratio_enum()_scale_enum() 函数中, 都使用了 _whctrs()_mkanchors 函数, 前者可以根据 box 的坐标信息得到 box 的宽高以及中心点坐标, 后者则是根据宽高以及中心点坐标得到 box 的 (x1, y1, x2, y2) 形式, 这两个函数的代码解析如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
./maskrcnn_benchmark/modeling/rpn/anchor_generator.py

def _whctrs(anchor):
# 根据左上角和右下角坐标返回该 box 的宽高以及中心点坐标
w = anchor[2] - anchor[0] + 1
h = anchor[3] - anchor[1] + 1
x_ctr = anchor[0] + 0.5 * (w - 1)
y_ctr = anchor[1] + 0.5 * (h - 1)
return w, h, x_ctr, y_ctr

def _mkanchors(ws, hs, x_ctr, y_ctr):
# 将给定的宽, 高以及中心点坐标转化成 (x1, y1, x2, y2) 的坐标形式

# 这里新增加了一个维度, 以便可以是有 hstack 将结果叠加.
ws = ws[:, np.newaxis]
hs = hs[:, np.newaxis]

# 将结果组合起来并返回
anchors = np.hstack(
(
x_ctr - 0.5 * (ws - 1),
y_ctr - 0.5 * (hs - 1),
x_ctr + 0.5 * (ws - 1),
y_ctr + 0.5 * (hs - 1),
)
)

return anchors

经过以上步骤, 最终的 anchors 会被作为初始化参数来实例化一个 class BufferList(nn.Module) 对象, 这一部分的详细解析可以翻到上面的关于 class AnchorGenerator(nn.Module) 的解析.

inference.py 文件解析

class RPNModule(torch.nn.Module) 中, 使用了下面的语句来分别创建训练时和测试时的 box selector:

1
2
3
4
rpn_box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))

box_selector_train = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=True)
box_selector_test = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=False)

box_selector_train 为例, 该方法用于在 end-to-end 的模型中, 返回训练用的 boxes, 调用形式如下:

1
2
3
boxes = self.box_selector_train(
anchors, objectness, rpn_box_regression, targets
)

由于 make_rpn_postprocessor() 函数位于 inference.py 文件中, 因此, 我们将在这一小节对该文件进行解析.(关于 BoxCoder 的解析请看模型定义-其他辅助文件解析)

inference.py 文件概览

./maskrcnn_benchmark/modeling/rpn/inference.py 文件概览如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# ./maskrcnn_benchmark/modeling/rpn/inference.py

# 导入各种包及函数
from maskrcnn_benchmark.modeling.box_coder import BoxCoder

class RPNPostProcessor(torch.nn.Module):
# 在将 proposals 喂到网络的 heads 之前, 先对 RPN 输出的 boxes 执行后处理

def __init__(...):
# 初始化函数
# ...

def add_gt_proposals(self, proposals, targets):
# ...

def forward_for_single_feature_map(self, anchors, objectness, box_regression):
# ...

def forward(self, anchors, objectness, box_regression, targets=None):
# ...

def select_over_all_levels(self, boxlists):
# ...

def make_rpn_postprocessor(config, rpn_box_coder, is_train):
# ...

make_rpn_postprocessor() 入口函数

rpn.py 中使用了 make_rpn_postprocessor() 函数来创建 class RPNPostProcessor(nn.Module) 实例, 该函数的第二个参数是一个 class BoxCoder(object) 的实例, 关于该类的解析请看模型定义-其他辅助文件解析).
make_rpn_postprocessor() 函数解析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# ./maskrcnn_benchmark/modeling/rpn/rpn.py

def make_rpn_postprocessor(config, rpn_box_coder, is_train):
# rpn_box_coder: BoxCoder 实例

# eg: 2000
fpn_post_nms_top_n = config.MODEL.RPN.FPN_POST_NMS_TOP_N_TRAIN

if not is_train:
# eg: 1000
fpn_post_nms_top_n = config.MODEL.RPN.FPN_POST_NMS_TOP_N_TRAIN

# eg: 2000
pre_nms_top_n = config.MODEL.RPN.PRE_NMS_TOP_N_TRAIN

# eg: 2000
post_nms_top_n = config.MODEL.RPN.POST_NMS_TOP_N_TRAIN

if not is_train:
# eg: 1000
pre_nms_top_n = config.MODEL.RPN.PRE_NMS_TOP_N_TEST
# eg: 1000
post_nms_top_n = config.MODEL.RPN.POST_NMS_TOP_N_TEST

# eg: 0.7
nms_thresh = config.MODEL.RPN.NMS_THRESH

# eg: 0
min_size = config.MODEL.RPN.MIN_SIZE

# 根据配置参数创建一个 RPNPostProcessor 实例
box_selector = RPNPostProcessor(
pre_nms_top_n=pre_nms_top_n,
post_nms_top_n=post_nms_top_n,
nms_thresh=nms_thresh,
min_size=min_size,
box_coder=rpn_box_coder,
fpn_post_nms_top_n=fpn_post_nms_top_n,
)
return box_selector

可以看到, 上面函数的主要功能就是根据配置文件的信息创建一个 RPNPostProcessor 的实例对象, 下面我们来看看这个类的定义.

RPNPostProcessor 类

RPNPostProcessor 类中的 proposals 使用了 BoxList 数据结构, 关于该结构的定义可以看BoxList结构解析. 另外, 还使用了三个函数: cat_boxlist(), boxlist_nms(), 以及 remove_small_boxes(), 关于它们的详细解析请看BoxList Ops 解析

RPNPostProcessor 类的代码解析如下:

初始化函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# ./maskrcnn_benchmark/modeling/rpn/inference.py

class RPNPostProcessor(torch.nn.Module):
# 该类主要完成对 RPN boxes 的后处理功能(在将 boxes 送到 heads 之前执行)

def __init__(
self,
pre_nms_top_n,
post_nms_top_n,
nms_thresh,
min_size,
box_coder=None,
fpn_post_nms_top_n=None,
):
# pre_nms_top_n (int)
# post_nms_top_n (int)
# nms_thresh (float)
# min_size (int)
# box_coder (BoxCoder)
# fpn_post_nms_top_n (int)
super(RPNPostProcessor, self).__init__()

# 将传送进来的参数都变成成员变量
self.pre_nms_top_n = pre_nms_top_n
self.post_nms_top_n = post_nms_top_n
self.nms_thresh = nms_thresh
self.min_size = min_size

# 创建一个 BoxCoder 实例
if box_coder is None:
box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
self.box_coder = box_coder

if fpn_post_nms_top_n is None:
fpn_post_nms_top_n = post_nms_top_n
self.fpn_post_nms_top_n = fpn_post_nms_top_n

添加真实候选框函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# ./maskrcnn_benchmark/modeling/rpn/inference.py

class RPNPostProcessor(torch.nn.Module):
def __init__(...):
# ...

def add_gt_proposals(self, proposals, targets):
# 将真实的边框标签 targets 添加到当前的 BoxList 列表数据中.
# proposals: list[BoxList]
# proposals: list[BoxList]

# 获取当前正在操作的设备
device = proposals[0].bbox.device

# 调用 BoxList 的 copy_with_fields 方法进行深度复制, gt_boxes 是一个列表
# 其元素的类型是 BoxList
gt_boxes = [target.copy_with_fields([]) for target in targets]

# 添加一个字典键, "objectness", 值为当前 BoxList 元素中的 box 的数量长度的一维 tensor
for gt_box in gt_boxes:
gt_box.add_field("objectness", torch.ones(len(gt_box), device=device))

# from maskrcnn_benchmark.structures.boxlist_ops import cat_boxlist
# 调用 boxlist_ops.py 中的 cat_boxlist 函数将 proposal 和 gt_box 合并成一个 BoxList
proposals = [
cat_boxlist((proposal, gt_box))
for proposal, gt_box zip(proposals, gt_boxes)
]
return proposals

在单一的特征图谱上执行前向传播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# ./maskrcnn_benchmark/modeling/rpn/inference.py

class RPNPostProcessor(torch.nn.Module):
def __init__(...):
# ...

def add_gt_proposals(...):
# ...

def forward_for_single_feature_map(self, anchors, objectness, box_regression):
# anchors: list[BoxList]
# objectness: tensor of size N, A, H, W, A 代表每个像素点 anchors 的数量
# N 代表 batchsize, H, W 代表特征图谱的高和宽
# box_regression: tensor of size N, A * 4, H, W

# 获取当前的设备
device = objectness.device

# 获取 objectness 的 shape
N, A, H, W = objectness.shape

# 将格式转换成和 anchors 相同的格式, 先改变维度的排列, 然后改变 shape 的形式
objectness = objectness.permute(0, 2, 3, 1).reshape(N, -1) # shape: (N, H*W*A)

# sigmoid 归一化
objectness = objectness.sigmoid()

# 相似的操作, 应用在 box_regression 上
box_regression = box_regression.view(N, -1, 4, H, W).permute(0, 3, 4, 1, 2)
box_regression = box_regression.reshape(N, -1, 4)

# 计算 anchors 的总数量

num_anchors = A * H * W

# 确保 pre_nms_top_n 不会超过 anchors 的总数量, 以免产生错误
pre_nms_top_n = min(self.pre_nms_top_n, num_anchors)

# 调用 PyTorch 的 topk 函数, 该函数返回两个列表, 一个是 topk 的值, 一个是对应下标
objectness, topk_idx = objectness.topk(pre_nms_top_n, dim=1, sorted=True)

# 创建 batch 的下标, shape 为 N×1, 按顺序递增, 如:[[0],[1],...,[N-1]]
batch_idx = torch.arange(N, device=device)[:, None]
# 获取所有 batch 的 top_k box
box_regression = box_regression[batch_idx, topk_idx]

# 获取所有 anchor 的尺寸
image_shapes = [box.size for box in anchors]

# 获取所有 box, 将 anchors 连接成一个列表
concat_anchors = torch.cat([a.bbox for a in anchors], dim=0)

# 重新按照 batch 划分, 同时获取每个 batch 的 topk
concat_anchors = concat_anchors.reshape(N, -1, 4)[batch_idx, topk_idx]

# 将最终的结果解码成方便表示的形式(原本为方便训练的形式)
proposals = self.box_coder.decode(
box_regression.view(-1,4), concat_anchors.view(-1,4)
)

proposals = proposals.view(N, -1, 4)

result = [] # 组建结果并返回
for proposal, score, im_shape in zip(proposals, objectness, image_shapes):
# 根据当前的结果创建一个 BoxList 实例
boxlist = BoxList(proposal, im_shape, mode="xyxy")
# 添加 score
boxlist.add_field("objectness", score)
# 防止 box 超出 image 的边界
boxlist = boxlist.clip_to_image(remove_empty=False)
# 移除过小的 box
boxlist = remove_small_boxes(boxlist, self.min_size)
# 在当前的 box 上执行 nms 算法
boxlist = boxlist_nms(
boxlist,
self.nms_thresh,
max_proposals=self.post_nms_top_n,
score_field="objectness",
)
result.append(boxlist)
return result

前向传播函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# ./maskrcnn_benchmark/modeling/rpn/inference.py

class RPNPostProcessor(torch.nn.Module):

def __ init__(...):
# ...

def add_gt_proposals(...):
# ...

def forward_for_single_feature_map(...):
# ...

def forward(self, anchors, objectness, box_regression, targets=None):
# anchors: list[list[BoxList]]
# objectness: list[tensor]
# box_regression: list[tensor]

# 返回值:
# boxlists (list[BoxList]): 经过 box decoding 和 NMS 操作的处理后的 anchors,

# 创建一个空的 box 列表
sampled_boxes = []
num_levels = len(objectness)
anchors = list(zip(*anchors))

# 调用类的 forward_for_single_feature_map() 成员函数
for a, o, b in zip(anchors, objectness, box_regression):
sampled_boxes.append(self.forward_for_single_feature_map(a, o, b))

boxlists = list(zip(*sampled_boxes))
# 调用 boxlist_ops.py 文件中的 cat_boxlist函数
boxlists = [cat_boxlist(boxliist) for boxlist in boxlists]

if num_levels > 1:
# 调用类的 select_over_all_levels 成员函数
boxlists = self.select_over_all_levels(boxlists)

# 添加 gt bboxes 到 proposals 当中去
if self.training and targets is not None:
# 调用类的 add_gt_proposals 成员函数
boxlists = self.add_gt_proposals(boxlists, targets)

return boxlists

在所有层次上进行选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# ./maskrcnn_benchmark/modeling/rpn/inference.py

class RPNPostProcessor(torch.nn.Module):
def __init__(...):
# ...

def add_gt_proposals(...):
# ...

def forward_for_single_feature_map(...):
# ...

def forward(...):
# ...

def select_over_all_level(self, boxlists):
# 在训练阶段和测试阶段的行为不同, 在训练阶段, post_nms_top_n 是在所有的 proposals 上进行的,
# 而在测试阶段, 是在每一个图片上的 proposals 上进行的

num_images = len(boxlists)

if self.training:
# 连接 "objectness"
objectness = torch.cat(
[boxlist.get_field("objectness") for boxlist in boxlists], dim=0
)

# 获取box的数量
box_sizes = [len(boxlist) for boxlist in boxlists]

# 防止 post_nms_top_n 超过 anchors 总数, 产生错误
post_nms_top_n = min(self.fpn_post_nms_top_n, len(objectness))

# 获取 topk 的下标
_, inds_sorted = torch.topk(objectness, post_nms_top_n, dim=0, sorted=True)

inds_mask = torch.zeros_like(objectness, dtype=torch.uint8)
inds_mask[inds_sorted] = 1
inds_mask = inds_mask.split(box_sizes)

# 获取所有满足条件的box
for i in range(num_images):
boxlists[i] = boxlists[i][inds_mask[i]]
else:
for i in range(num_images):
objectness = boxlists[i].get_field("objectness")
post_nms_top_n = min(self.fpn_post_nms_top_n, len(objectness))
_, inds_sorted = torch.topk(
objectness, post_nms_top_n, dim=0, sorted=True
)
boxlists[i] = boxlists[i][inds_sorted]

return boxlists

loss.py

rpn.py 中, 使用了下面的语句来创建损失函数评价器:

1
2
3
4
5
# ./maskrcnn_benchmark/modeling/rpn/rpn.py

from .loss import make_rpn_loss_evaluator

loss_evaluator = make_rpn_loss_evaluator(cfg, rpn_box_coder)

可以看出, 该语句使用了 ./rpn/loss.py 文件中的 make_rpn_loss_evaluator() 函数来创建 RPN 网络的损失函数评价器, 下面我们就来看看该函数的代码实现是怎样的, 代码解析如下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# ./maskrcnn_benchmark/modeling/rpn/loss.py

from ..balanced_positive_negative_sampler import BalancedPositiveNegativeSampler

from maskrcnn_benchmark.modeling.matcher import Matcher

# ...

def make_rpn_loss_evaluator(cfg, box_coder):

# 根据配置信息创建 Matcher 实例
matcher = Matcher(
cfg.MODEL.RPN.FG_IOU_THRESHOLD, # 0.7
cfg.MODEL.RPN.BG_IOU_THRESHOLD, # 0.3
allow_low_quality_matches=True,
)

# 根据配置信息创建一个 BalancedPositiveNegativeSampler 实例
fg_bg_sampler = BalancedPositiveNegativeSampler(
cfg.MODEL.RPN.BATCH_SIZE_PER_IMAGE, cfg.MODEL.RPN.POSITIVE_FRACTION
)

# 利用上面创建的实例对象进一步创建 RPNLossComputation 实例
loss_evaluator = RPNLossComputation(matcher, fg_bg_sampler, box_coder)

从上面的代码我们可以看出, 在 make_rpn_loss_evaluator() 函数中, 创建了 ./modeling/matcher.py 中的 Matcher 实例, 同时还创建了 ./modeling/balanced_positive_negative_sampler.py 文件中的 BalancedPositiveNegativeSampler, 最后, 利用这两个实例创建了本文件中定义的 RPNLossComputation 实例, 前两个类的解析我们已经介绍过, 下来, 我们就来详细介绍一下本文件的 RPNLossComputation 类的代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# ./maskrcnn_benchmark/modeling/loss.py

import torch
from torch.nn import functional as F

from ..balanced_positive_negative_sampler import BalancedPositiveNegativeSampler
from ..utils import cat

from maskrcnn_benchmark.layers import smooth_l1_loss
from maskrcnn_benchmark.modeling.matcher import Matcher
from maskrcnn_benchmark.structures.boxlist_ops import boxlist_iou
from maskrcnn_benchmark.structures.boxlist_ops import cat_boxlist

class RPNLossComputation(object):
# 该类用于计算 RPN 的损失函数结果

def __init__(self, proposal_matcher, fg_bg_sampler, box_coder):