Coordinator 类
由于 RoIDataLoader 类将 Coordinator 类对象作为成员变量, 因此我们先看一下这个类的作用和底层实现, 该类位于detectron/utils/coordinator.py
文件中, 定义如下:
1 | #detectron/utils/coordinator.py |
RoIDataLoader 类
在之前分析的tools/train_net.py
文件中, 关于数据载入的部分被封装在了detectron/roi_data/loader.py
文件中的RoIDataLoader
类中, 而数据载入对于任何模型和工程来说, 都是非常重要的一步, 下面, 我们就具体看看这个类的底层实现是怎么样的.
文件开头, 有一段非常详细的注释:
1 | # detectron/roi_data/loader.py |
从上面的注释我们可以看出, 这个文件定义了Detectron
的数据载入器data loader
, 这个类的设计是一种抽象的一般化的设计, 并且会与所有minibatch的细节隔离开来. 在这个类中, minibatch被记录为一个字典结构, 它的key值为blob name, 其value值为对应的numpy ndarray.
每一个GPU都具有一个enqueue线程, 可以从minibatch queue中获取数据, 然后会将minibatch blobs喂到workspace中去, 之后运行 EnqueueBlobsOp 来将minibatch blobs 放置到 GPU的blob queue中.
在每一次前向传播过程中, 模型最先做的事情就是运行 DequeueBlobsOp 来构建工作空间.
下面, 看一下RoIDataLoader
类的具体实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58# detectron/roi_data/loader.py
class RoIDataLoader(object):
def __init__(
self,
roidb,
num_loaders = 4,
minibatch_queue_size=64,
blobs_queue_capacity=8
):
self._roidb = roidb
self._lock = threading.Lock()
self._perm = deque(range(len(self._roidb)))
self._cur = 0 # _perm cursor
# minibatch队列会在CPU内存当中持有准备好的训练数据
# 当训练N>1个GPUs时, 在minibatch队列中的每一个元素
# 实际上是只是一部分minibatch, 对整个minibatch贡献了
# 1/N的样例
# from six.moves import queue as Queue
self._minibatch_queue = Queue.Queue(maxsize=minibatch_queue_size)
# TODO, 其他参数的初始化
# from detectron.utils.coordinator import Coordinator
self.coordinator = Coordinator()
# 加载mini-batches, 并且将它们放进mini-batch 队列中.
def minibatch_loader_thread(slef):
# coordinator的上下文管理器, 当有异常出现时会调用coordinator.request_stop()方法
with self.coordinator.stop_on_exception():
while not self.coordinator.should_stop():
# RoIDataLoader的成员函数, 返回用于下一个minibatch的blobs,
# 函数内部调用了另一个成员函数_get_next_minibatch_inds()
# 该函数返回下一个minibatch的roidb的下标
# 还调用了detectron/roi_data/minibatch.py文件中的get_minibatch方法
# 该方法会在给定roidb的情况下, 从中构造一个minibatch
blobs = self.get_next_minibatch()
# from collections import OrderedDict
# Blobs必须根据self.get_output_names()在队列中进行排序
ordered_blobs = OrderedDict()
for key in self.get_output_names():
assert blobs[key].dtype in (np.int32, np.float32), \
"Blob {} of dtype {} must have dtype of" \
"np.int32 or np.float32".format(key, blobs[key].dtype)
ordered_blobs[key] = blobs[key]
# from detectron.utils.coordinator import coordianted_put
# 此处是将minibatch中数据blobs放入队列的关键代码
coordinated_put(self.coordinator, self._minibatch_queue, ordered_blobs)
logger.info("Stopping mini-batch loading thread")
# 将mini-batches从mini-batch队列中转移到BlobsQueue中.
def enqueue_blobs_thread(self, gpu_id, blob_names):
with self.coordinator.stop_on_exception():
while not self.coordinator.should_stop():
if self._minibatch_queue.qsize == 0:
logger.warning("Mini-batch queue is empty")
blobs = coordinated_get(self.coordinate, self._minibatch_queue)