本文介绍如何使用Tensorflow进行多线程预处理。首先介绍TFRecord格式,并介绍如何利用队列框架进行多线程数据预处理,最后介绍Tensorflow 1.3版之后推荐使用的数据集(Dataset)API。
TFRecord文件
介绍
TFRecord文件中的数据都是通过tf.train.Example Protocol Buffer的格式存储的。下面是tf.train.Example的定义
1 | message Example { |
从上面的定义可以看出,tf.train.Example的数据结构比较简单,包含一个从属性名称到取值的字典,其中属性名称为字符串,取值可以为字符串、实数列表、整数列表。
样例
1 | import tensorflow as tf |
1 | def _int64_feature(value): |
Extracting /media/seisinv/Data/04_data/MNIST_data/train-images-idx3-ubyte.gz
Extracting /media/seisinv/Data/04_data/MNIST_data/train-labels-idx1-ubyte.gz
Extracting /media/seisinv/Data/04_data/MNIST_data/t10k-images-idx3-ubyte.gz
Extracting /media/seisinv/Data/04_data/MNIST_data/t10k-labels-idx1-ubyte.gz
1 | # 创建reader读取TFRecord文件 |
[array([ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 97,
96, 77, 118, 61, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 90, 138, 235, 235, 235, 235, 235,
235, 251, 251, 248, 254, 245, 235, 190, 21, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 140, 251, 254, 254, 254, 254,
254, 254, 254, 254, 254, 254, 254, 254, 254, 254, 189, 23, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 226, 254, 208, 199,
199, 199, 199, 139, 61, 61, 61, 61, 61, 128, 222, 254, 254,
189, 21, 0, 0, 0, 0, 0, 0, 0, 0, 0, 38, 82,
13, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 34,
213, 254, 254, 115, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 84, 254, 254, 234, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 84, 254, 254, 234, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 106, 157, 254, 254, 243, 51, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 25, 117, 228, 228, 228, 253, 254, 254, 254, 254, 240,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 68, 119, 220, 254, 254, 254, 254, 254, 254, 254, 254,
254, 142, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 37, 187, 253, 254, 254, 254, 223, 206, 206, 75, 68,
215, 254, 254, 117, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 113, 219, 254, 242, 227, 115, 89, 31, 0, 0,
0, 0, 200, 254, 241, 41, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 169, 254, 176, 62, 0, 0, 0, 0,
0, 0, 0, 48, 231, 254, 234, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 18, 124, 0, 0, 0, 0,
0, 0, 0, 0, 0, 84, 254, 254, 166, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 139, 254, 238, 57, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 210, 250, 254, 168, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 242, 254, 239,
57, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 89, 251,
241, 86, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5,
206, 246, 157, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 4, 117, 69, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0], dtype=uint8), 7, 784]
[array([ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 31, 132, 254,
253, 254, 213, 82, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 142, 233,
252, 253, 252, 253, 252, 223, 20, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 123, 254,
253, 254, 253, 224, 203, 203, 223, 255, 213, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
203, 253, 252, 253, 212, 20, 0, 0, 61, 253, 252, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 41, 243, 224, 203, 183, 41, 152, 30, 0, 0, 255, 253,
102, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 40, 20, 0, 0, 102, 253, 50, 0, 82,
253, 252, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 82, 214, 31,
113, 233, 254, 233, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 62, 102, 82, 41,
253, 232, 253, 252, 233, 50, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 152, 253,
254, 253, 254, 253, 254, 233, 123, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
152, 252, 253, 252, 253, 252, 192, 50, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 62, 183, 203, 243, 254, 253, 62, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 40, 172, 252, 203, 20, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 21, 0, 0, 0, 0, 0, 0, 0, 0, 0, 183, 254,
112, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 62, 203, 163, 0, 0, 0, 0, 0, 0, 0, 0,
61, 253, 151, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 21, 214, 192, 0, 0, 0, 0, 0, 0, 0,
0, 11, 213, 254, 151, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 102, 253, 151, 0, 0, 0, 0, 0,
0, 0, 41, 213, 252, 253, 111, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 41, 255, 213, 92, 51, 0,
0, 31, 92, 173, 253, 254, 253, 142, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 172, 252, 253,
252, 203, 203, 233, 252, 253, 252, 253, 130, 20, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21,
203, 255, 253, 254, 253, 254, 253, 244, 203, 82, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 20, 151, 151, 253, 171, 151, 151, 40, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0], dtype=uint8), 3, 784]
多线程数据处理框架
队列和多线程
在Tensorflow中,队列和变量类似,都是计算图上有状态的节点,其他的计算节点可以修改它们的状态。
- 对于变量,可以通过赋值操作修改变量的取值。
- 对于队列,修改队列的操作主要有Enqueue, EnqueueMany和Dequeue
1 | # 创建一个先进先出的队列,指定最多可以保存两个元素,类型为整型 |
0
10
1
11
2
Tensorflow中还提供了两种队列,FIFOQueue和RandomShufferQueue,第一种实现的是先进先出队列,后一种会将队列中的元素打乱,每次出队列操作得到的是从当前队列所有元素中随机选择的一个。
在Tensorflow中,队列不仅是一种数据结构,而且是异步计算张量取值的一个重要机制。比如多个线程同事向一个队列写元素或者读元素。
Tensorflow中提供了tf.Coordinator和tf.QueueRunner两个类实现多线程协同的功能。tf.Coordinator类主要用于协同多个线程一起停止,并提供了should_stop, request_stop和join三个函数。用法如下:
- 在启动线程之前,首先需要声明一个tf.Coordinator类
- 将这个类传入每个创建的线程,启动的线程需要一直查询should_stophanshu,当该函数返回为True时,当前线程需要退出
- 每个启动的线程都可以通过request_stop函数通知其他的线程退出
- 当某个线程调用request_stop函数之后,should_stop函数的返回值将被设置为True,这样其他的线程就会同时终止。
1 | import threading |
Working on id:0
Working on id:1
Working on id:0
Working on id:1
Working on id:0
Stoping from id: 1
tf.QueueRunner主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。
1 | # 声明一个队列,共100个元素,类型为实数 |
1.43469
-0.598175
输入文件队列
虽然可以将多个训练样本放入一个TFRecord中,但是当训练数据量很大时,将数据分成多个TFRecord文件可以提高处理效率。Tensorflow提供了tf.train.match_filenames_once函数获取符合正则化表达式的所有文件列表,并通过tf.train.string_input_producer函数进行管理。
1 | # 模拟海量数据情况下,将数据写入不同的文件 |
1 | # 获取文件列表 |
[b'/media/seisinv/Data/04_data/ai/test/data.tfrecords_00000-of-00002'
b'/media/seisinv/Data/04_data/ai/test/data.tfrecords_00001-of-00002']
[0, 0]
[0, 1]
[1, 0]
[1, 1]
[0, 0]
[0, 1]
在上面的例子中,由于没有打乱文件列表的顺序,因为会依次读取样本数据中每个样本的信息,而且当所有样本都被读取之后,程序会自动从头开始。如果限制num_epochs为1,那么程序将会报错。
组合训练数据
当得到单个样本之后,可以通过tf.train.batch和tf.train.shuffle_batch函数以队列的形式生成一个batch。入队操作是生成一个样本,出队操作得到一个batch的样本。两者唯一的区别在于是否会将数据顺序打乱。
1 | tf.reset_default_graph() |
[0 0 1] [0 1 0]
[1 0 0] [1 0 1]
tf.train.shuffle_batch的用法和tf.train.batch类似,区别有二:
- 输出的样本顺序会被打乱
- 增加一个参数min_after_dequeue,限制出队时样本的最少个数。这是因为当队列中元素太少时,随机打乱顺序的作用不大,只有当样本多于一定数量时,才开始随机出队
tf.train.batch、tf.train.shuffle_batch、tf.train.shuffle_batch_join都可以通过num_threads指定多个线程执行入队操作(包括数据读取和预处理)。区别在于:
- tf.train.shuffle_batch函数多个线程会同时读取一个文件的不同样本并进行预处理。如果一个文件中的样例比较相似(比如属于同一类),那么神经网络的训练效果可能会受到影响,因此尽量将同一个TFRecord文件中的样本随机打乱。
- tf.train.shuffle_batch_join函数多个线程处理不同文件中的不同样本,不同线程会读取不同文件,具体来说,是将tf.train.string_input_producer函数生成的文件队列平均分配到不同的线程上。但是,如果读取数据的线程数比总文件数还大,那么多个线程可能会读取同一个文件中相近部分的数据。而且多个线程读取多个文件可能导致过多的硬盘寻址,从而使得读取效率降低。
总结具体的多线程文件读入和预处理步骤包括:
- 输入文件列表,通过tf.train.string_input_producer函数生成输入文件队列(可以随机打乱)
- 通过tf.train.batch或其他2个函数进行多线程入队操作(包括数据读入和预处理)
- 生成样本batch,执行训练或者测试过程
数据集(Dataset)
前面介绍了,通过队列进行多线程输入和预处理。从Tensorflow 1.3开始,数据集被正式推荐为输入数据的首选框架。下面介绍数据集的基本用法。
基本用法
在数据集框架中,每个数据集代表一个数据来源,可能是一个张量、TFRecord文件、或者文本文件。由于训练数据通常都很大,无法全部写入内存中,因此从数据集中读取数据时需要使用一个迭代器按顺序读取,这点和队列相似,并且数据集也是计算图上的一个节点。
1 | input_data = [1, 2, 3, 4, 5] |
1
4
9
16
25
利用数据集读取数据的步骤包括:
- 创建数据集
- 定义遍历器
- 使用get_next读取数据张量
1 | input_files = ["/media/seisinv/Data/04_data/ai/test/test1.txt","/media/seisinv/Data/04_data/ai/test/test2.txt"] |
b'1\t2'
b'3\t4'
b'5\t6'
1 | def parser(record): |
[0, 0]
[0, 1]
[1, 0]
[1, 1]
除了简单地使用one_shot_iterator(需要事先确定所欲的参数)来遍历数据外,还可以使用placeholder来初始化数据集,initializable_iterator初始化迭代器。
1 | input_files = tf.placeholder(tf.string) |
[0, 0]
[0, 1]
[1, 0]
[1, 1]
数据集的高层封装
在上一节介绍的队列框架中,预处理,shuffle,batch等操作有的在队列上进行,有的在图片张量上进行,整个处理流程在处理队列和张量的代码中来回切换。而在数据集中,所有的操作都是在数据集上进行,代码结构简洁、干净。
通过map方法可以封装更加复杂的预处理流程,比如: 1
dataset = dataset.map(lambda x: preprocess(x, image_size, image_size, None))
在数据集框架中,shuffle和batch操作由两个方法独立实现: 1
2dataset = dataset.shuffle(buffer_size) # 和前面的min_after_dequeue相似
dataset = dataset.batch(batch_size)
数据集框架还提供了很多其他的函数,具体的参考Tensorflow相关文档。
结论
本文主要介绍Tensorflow所支持的多线程输入及预处理框架,包括:
- TFRecord格式可以将不同类型的数据统一管理,其数据结构可以简单理解为一种复杂的字典结构
- 队列框架支持从生成文件列表队列到batch组合队列,中间支持多线程并行预处理样本集
- 数据集框架是目前Tensorflow推荐的高层输入和预处理框架,该框架提供了随机打乱、样本batch、数据复制等高层操作。
参考资料
- 郑泽宇、梁博文和顾思宇,Tensorflow: 实战Google深度学习框架(第二版)