Detectron 源码解析-数据加载

Coordinator 类

由于 RoIDataLoader 类将 Coordinator 类对象作为成员变量, 因此我们先看一下这个类的作用和底层实现, 该类位于detectron/utils/coordinator.py文件中, 定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#detectron/utils/coordinator.py

# 从名字可以看出, 该类的作用主要是协调各个数据载入管道之间的信息同步
# 实现上, 该类主要封装了threading多线程模块的一些功能
class Coordinator(object):

def __init__(self):
# import threading
self._event = threading.Event()
def request_stop(self):
log.debug("Coordinator stopping")
self._event.set()

def should_stop(self):
# 当Event()对象使用set()方法后, is_set()方法返回镇
return self._event.is_set()

#...
@contextlib.contextmanager 上下文环境管理器
def stop_on_exception(self):
try:
yield
except Exception:
if not self.should_stop():
traceback.print_exc()
self.request_stop()

def coordinated_get(coordinator, queue):
while not coordinator.should_stop():
try:
# 从队列中获取数据
return queue.get(block=True, timeout=1.0)
except Queue.Empty:
continue
raise Exception("Coordinator stopped during get()")

def coordinated_put(coordinator, queue, element):
while not coordinator.shuold_stop():
try:
queue.put(element, block=True, timeout=1.0)
return
except Queue.Full:
continue
raise Exception("Coordinator stopped during put()")

RoIDataLoader 类

在之前分析的tools/train_net.py 文件中, 关于数据载入的部分被封装在了detectron/roi_data/loader.py文件中的RoIDataLoader类中, 而数据载入对于任何模型和工程来说, 都是非常重要的一步, 下面, 我们就具体看看这个类的底层实现是怎么样的.

文件开头, 有一段非常详细的注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# detectron/roi_data/loader.py

"""Detectron data loader. The design is generic and abstracted away from any
details of the minibatch. A minibatch is a dictionary of blob name keys and
their associated numpy (float32 or int32) ndarray values.

Outline of the data loader design:

loader thread\
loader thread \ / GPU 1 enqueue thread -> feed -> EnqueueOp
... -> minibatch queue -> ...
loader thread / \ GPU N enqueue thread -> feed -> EnqueueOp
loader thread/

<---------------------------- CPU -----------------------------|---- GPU ---->

A pool of loader threads construct minibatches that are put onto the shared
minibatch queue. Each GPU has an enqueue thread that pulls a minibatch off the
minibatch queue, feeds the minibatch blobs into the workspace, and then runs
an EnqueueBlobsOp to place the minibatch blobs into the GPU's blobs queue.
During each fprop the first thing the network does is run a DequeueBlobsOp
in order to populate the workspace with the blobs from a queued minibatch.
"""

从上面的注释我们可以看出, 这个文件定义了Detectron的数据载入器data loader, 这个类的设计是一种抽象的一般化的设计, 并且会与所有minibatch的细节隔离开来. 在这个类中, minibatch被记录为一个字典结构, 它的key值为blob name, 其value值为对应的numpy ndarray.

每一个GPU都具有一个enqueue线程, 可以从minibatch queue中获取数据, 然后会将minibatch blobs喂到workspace中去, 之后运行 EnqueueBlobsOp 来将minibatch blobs 放置到 GPU的blob queue中.

在每一次前向传播过程中, 模型最先做的事情就是运行 DequeueBlobsOp 来构建工作空间.

下面, 看一下RoIDataLoader类的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# detectron/roi_data/loader.py
class RoIDataLoader(object):
def __init__(
self,
roidb,
num_loaders = 4,
minibatch_queue_size=64,
blobs_queue_capacity=8
):
self._roidb = roidb
self._lock = threading.Lock()
self._perm = deque(range(len(self._roidb)))
self._cur = 0 # _perm cursor
# minibatch队列会在CPU内存当中持有准备好的训练数据
# 当训练N>1个GPUs时, 在minibatch队列中的每一个元素
# 实际上是只是一部分minibatch, 对整个minibatch贡献了
# 1/N的样例
# from six.moves import queue as Queue
self._minibatch_queue = Queue.Queue(maxsize=minibatch_queue_size)
# TODO, 其他参数的初始化


# from detectron.utils.coordinator import Coordinator
self.coordinator = Coordinator()

# 加载mini-batches, 并且将它们放进mini-batch 队列中.
def minibatch_loader_thread(slef):
# coordinator的上下文管理器, 当有异常出现时会调用coordinator.request_stop()方法
with self.coordinator.stop_on_exception():
while not self.coordinator.should_stop():
# RoIDataLoader的成员函数, 返回用于下一个minibatch的blobs,
# 函数内部调用了另一个成员函数_get_next_minibatch_inds()
# 该函数返回下一个minibatch的roidb的下标
# 还调用了detectron/roi_data/minibatch.py文件中的get_minibatch方法
# 该方法会在给定roidb的情况下, 从中构造一个minibatch
blobs = self.get_next_minibatch()

# from collections import OrderedDict
# Blobs必须根据self.get_output_names()在队列中进行排序
ordered_blobs = OrderedDict()
for key in self.get_output_names():
assert blobs[key].dtype in (np.int32, np.float32), \
"Blob {} of dtype {} must have dtype of" \
"np.int32 or np.float32".format(key, blobs[key].dtype)

ordered_blobs[key] = blobs[key]
# from detectron.utils.coordinator import coordianted_put
# 此处是将minibatch中数据blobs放入队列的关键代码
coordinated_put(self.coordinator, self._minibatch_queue, ordered_blobs)
logger.info("Stopping mini-batch loading thread")

# 将mini-batches从mini-batch队列中转移到BlobsQueue中.
def enqueue_blobs_thread(self, gpu_id, blob_names):
with self.coordinator.stop_on_exception():
while not self.coordinator.should_stop():
if self._minibatch_queue.qsize == 0:
logger.warning("Mini-batch queue is empty")
blobs = coordinated_get(self.coordinate, self._minibatch_queue)