0%

A Guide for Vectorization with NumPy

PS. 本文环境为Python 3.5.4, NumPy 1.18.3. 行文中往往不区分“数组”与“向量”.

How far Python may go

我们以一个简单的例子开头:计算 1 1000000 的和。(我知道用求和公式自然可以瞬间出结果,只是做一个简单的性能对比)
以下是一个简单的实现和结果:

1
2
3
4
s = 0
for i in range(1000000):
s += i
# 109 ms ± 1.47 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

来看看Python做了什么(涉及编译原理的一些知识,如果不想看可以跳过)。
Python是一门解释性语言。它内部由底层解释器在一个stack machine(主要临时空间是一个栈)上直接执行一系列由源代码生成的字节码。
我们可以用dis模块获取这一段代码的字节码,我对其逐行作了注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 0 LOAD_CONST               0 (0)        # 将常量0入栈
3 STORE_NAME 0 (s) # 把栈顶的值(即先前入栈的0)赋值给变量s,并出栈,现在栈是空的

6 SETUP_LOOP 30 (to 39) # 创建一个循环结构,这个循环结束位置是标号为39的字节码。循环结构入栈。
# PS. 这一动作是为了支持诸如break等指令和for...else...的特殊语法。
9 LOAD_NAME 1 (range) # 将range对象入栈(栈中存放着一个函数)
12 LOAD_CONST 1 (1000000) # 将常量1000000入栈
15 CALL_FUNCTION 1 (1 positional, 0 keyword pair)
# 函数调用,将1个参数(1000000)和函数(range)出栈,将调用结果入栈
# 现在栈顶存放着一个range对象
18 GET_ITER # 获取迭代器,range对象出栈,现在栈顶存放着用于遍历range的迭代器
19 FOR_ITER 16 (to 38) # for循环正式开始,并指定了for循环结束位置标号为38
# 如果迭代器里没有更多值了,将跳到标号38的地方
# 这一步不将任何东西出栈,并将迭代器的下一个值入栈
22 STORE_NAME 2 (i) # 把栈顶值赋值给变量i,并出栈

25 LOAD_NAME 0 (s) # 把变量s的值入栈
28 LOAD_NAME 2 (i) # 把变量i的值入栈
31 INPLACE_ADD # 出栈两次、将栈顶两个值相加,把结果入栈
32 STORE_NAME 0 (s) # 把栈顶值赋值给s,出栈
# 这四步完成了s += i的操作
35 JUMP_ABSOLUTE 19 # 无条件跳回for循环开始的地方
38 POP_BLOCK # 将之前提到过入栈的循环结构出栈
39 LOAD_CONST 2 (None) # 因为Python规定没有返回值的表达式和函数返回值为None
42 RETURN_VALUE # 因而将None入栈,并出栈返回

如果你对计算机体系结构有基本的认识,你会意识到:Python居然是直接用名称来定位变量的!(…_NAME)这意味着,在你做一次变量赋值或取值的时候,Python就要在变量表里查找一次你所要操作的变量。这之中存在大量多余的字符串哈希和比较操作,消耗了大量时间。

那么消灭变量就足够快了吗?我们知道,Python的sum函数可以完成求和操作,因此我们可以这么写

1
2
sum(range(1000000))
# 37.3 ms ± 144 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

挖藕,提升了3倍的性能呢!真快呢~
如果你这么想的话再看看这个

1
2
3
import numpy
numpy.sum(numpy.arange(1000000))
# 2.08 ms ± 101 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

近20倍的性能提升,似乎只是加了numpy.世界就变快了。其中原因有若干,罪魁祸首是Python的动态类型系统。准备作加法的对象,Python并不能事先知道其类型。因而每次作加法时都要动态地获取对象的类型,确认两个对象可加,并根据实际的类型选择相加的方式。而NumPy中的数组中的数其实都有类型(可以用some_var.dtype获得),因而对于C++实现的NumPy而言,一次加法只对应一条CPU指令,甚至在向量化环境下多次加法只对应一条CPU指令(关于具体可以搜索SIMDAVX2相关指令)。
那么Python能否引入类型信息而借此进行效率优化呢?很遗憾,官方实现还没有这么做。也就是说,在数值计算问题中,Pure-Python的效率极限距离C++实现约20倍。(博主当前正在开发一个对Python进行静态类型推导方面的摸鱼项目 Github

总结(跳过了上述部分的读者可以从这里开始看):

  1. Python中循环很慢。
  2. Python中循环操作慢的一个主要原因是对变量的赋值和读取需要进行对变量名字符串的处理。
  3. Python中利用内建函数进行的数值计算快于循环计算,但是仍然比较慢,原因是Python中缺少相关类型信息,需要动态获取。
  4. Python实现效率的极限大约为向量化操作效率的1/20。通常来说这一数值可以达到约1/50甚至更低。这意味着当你在Python中通过拆开向量化操作的精细实现进行少量的算法复杂度优化时,往往需要考虑进这一常数的差异。

Hello, World of Vectorization

这里的基础指直接利用NumPy的向量化函数进行计算。在需要对一组数据进行处理时往往可以将缩写打到有提示的IDE里面看看NumPy是不是帮你实现好了:)
NumPy的核心便是它的向量类,ndarray。下面主要讲解numpy.ndarrayshape(形状,大小,whatever)。本文将以一系列小概念的方式来引入这些内容。

维数的个数(向量的阶)

这个名词可能读起来比较奇怪,维数已经是个个数了,维数的个数又是什么?
举一些基本的例子:某向量的维数是 x ,那么这个向量被称为 x 维向量。相对应的,维数的个数是1,那么就是向量;维数的个数是2,那么就是矩阵;不小于3那么就是张量;0那么就是标量。
在代码中这一阶数可由len(some_arr.shape)表示(注:对于NumPy中标量而言这一语句仍然合法;对于Python原生数值这一语句并不合法)。阶数是一个ndarray最基本的属性。

Shape

上述的每一个向量的各个维数按顺序组成的元组就是shape。为了避免这一描述过于抽象我们仍然举一些例子。例如三维向量的shape(3,)(其中多余的,在Python中用来区分标量和一元组)一个 m \times n 矩阵的shape(m, n),一幅图片由RGB三个通道的色彩矩阵决定,因而shape可以是(m, n, 3)(3, m, n),其中mn分别代表高宽,这两种记法分别被称为HWCCHW;一组同样大小的图片数据则再在这之前添加一个维数表示图片数量,例如(x, m, n, 3),这种记法被称为NHWC

Axis

想象把一个具有n个维数的向量每个元素对应一个方格,放在一个n维空间中;axis实际就是在Shape中的位置对应的轴。NumPy大多数函数可以在某个轴上操作,操作完成后例如sum等函数结果会将shape对应位置变成1或直接删去。通过指定axis操作的组合可以编写出较为多样的操作。
通常情况下,数据组数对应axis0索引,其他axis用负数索引。这样操作的好处是操作往往可以拓展到中间有一些其他维数的数据上。

例子:空间中有n个点,求均值点和这些点距离原点的方差。

1
2
3
4
5
def mean_and_var(data):
# data: (n, 3)
m = numpy.mean(data, axis=0) # Mean among axis 0. m: (3,)
dist = numpy.sum(data ** 2, axis=-1) ** 0.5 # Sum of squares among last axis and sqrt, dist: (n,)
return m, numpy.var(dist)

Gather操作

Gather操作指以一个数组shape = (n, d1, d2, ...)为数据源,另一个数组shape = (m,), dtype = int为选择的下标对源数组的第一维进行访问,得到新的数组shape = (m, d1, d2, ...)的操作。
广义上的Gather操作可以支持其他维甚至多个不连续维同时访问的操作,但是NumPy没有内建函数支持(TensorFlow、PyTorch等所带的张量操作库均具有此操作的支持,NumPy需要手动调换轴的顺序拼凑出该操作)。
NumPy中进行这一操作非常简单。正如下标访问那样,用a[b]的语法即可。

例子:One-hot Encoding

1
2
3
def one_hot(labels):
# labels: (n,), int starting from 0
return numpy.eye(labels.max() + 1)[labels]

由于第 i 个标签的编码结果等价于单位阵的第 i 行,可以利用Gather操作进行简洁的运算表达。
当下标为一组Slice时,可以对一个范围内的元素进行访问。下面这个例子对于一组概率分布返回每个概率分步中最高的k个。
Slice的定义与range类似,a:b:c表示start:end:step,省略表示从数组的开始或到数组的最后(step默认为1),且下标值不包括end自身。

1
2
3
def top_k(prob, k):
# prob: (n, s)
return numpy.argsort(prob, axis=-1)[:, ::-1][:, :k]

注:用这种方式访问的元素是通过储存原数组和对应地址实现的,按元素改变返回结果将改变原数组的值。但这也意味着空间占用得到了最简化,例如One-hot无需每个数据点储存一个大小为n的向量,而只需储存一个对于单位阵的引用。

让Shape帮助你提高编写代码的效率和正确率

在进行复杂的操作时,往往将元素之间映射关系对应到向量操作时并不是非常直观,容易犯细节错误。
这时可以考虑shapeshape在向量化的世界中就如同其他语言中的变量类型,利用shape,可以通过向量之间操作的可行性和结果中元素的数量来检查代码的正确性。当你每一步之间的逻辑没有问题(在这里往往是一个简单函数调用,不易出问题),而shape(或其他语言中变量类型)又是符合预期的时候,其运算结果一般情况下都是正确的。
因而:在执行复杂操作时,建议分步进行计算而不要堆积在一个表达式解决。在执行复杂操作时推荐在每步(或关键步骤)后注释上预期的shape。出现结果错误或运行时错误时首先检查shape是否符合预期。

One Step Further - Einstein Sum

爱因斯坦求和约定是爱因斯坦提出的一种记号,用来简化多个/多维数据进行乘积求和的运算的标记。这种标记可以有效地将我们从实现复杂求和操作的时候从拼凑summatmul的苦海中解救出来。另外,在处理多个(3个及以上)数组时,可以通过预处理求和约定得到一个最优的求和路线来减少计算量。例如三个矩阵的乘法ABC,其中A(20, 20) B(20, 20) C(20, 1),那么先求BC可以大幅减少计算量。
(爱因斯坦永远滴神)
对于数学上的记号我们不作解释。感兴趣的读者可以参阅Wolfram的解释页面
在NumPy中对爱因斯坦求和的实现位于numpy.einsum函数中。调用时第一个变量是求和约定,随后是待求和的数组。
对于求和约定的简单描述如下:由->符号划分数据源侧和结果侧。数据源侧每个参与求和的向量之间由,隔开。对于源侧每个相同的维度,对应数组的对应维上的元素将会作乘积。如出现在结果侧,则将乘积放在对应位置;如不出现则求和。对于其他的维度必须同时出现在结果侧,将会以结果侧的顺序直接“复制”对应的维度。
这个记号有一个约束是:乘积求和的标记只能出现在两个源侧元素中,而不能出现在更多元素中。
不难发现每个元素的下标个数需要与输入变量维数的个数一致。

例子:矩阵乘法

1
2
3
4
5
A = numpy.random.normal(size=[10, 20])
B = numpy.random.normal(size=[20, 40])
print(numpy.max(numpy.abs(numpy.matmul(A, B) - numpy.einsum("ij,jk->ik", A, B))))
# Sum over 'j' and gather into result[i, k]
# 2.6645352591003757e-15

(这一小差异是浮点精度误差。如果你不知道浮点精度误差是什么,你可以暂时认为这是一种由于你邻居的猫叫了一声产生的随机误差。)
例子:向量内积和逐元素相乘

1
2
3
4
5
6
A = numpy.random.normal(size=[5])
B = numpy.random.normal(size=[5])
print(numpy.max(numpy.abs(numpy.dot(A, B) - numpy.einsum("i,i->", A, B))))
# 0.0
print(numpy.max(numpy.abs(numpy.multiply(A, B) - numpy.einsum("i,i->i", A, B))))
# 0.0

One Step Further - Stride Tricks

Stride Trick指根据某数组通过每隔一定间隔取数的方式构造新的数组的操作。
常用来利用元素的规律性分布进行对较为复杂的运算的简化(和向量化)。
需要注意的是,广义上的Stride Trick是不安全的。
如果你使用了错误的操作,可能会导致不可预测的结果和Python核心崩溃(kernel killed)。
BroadcastStride Trick的一个安全的子集。

什么是Stride

在NumPy中除特别指定,一个向量数据总是占用连续的空间。(如果你对高速缓存和数据局部性的作用有基本的认识,你会知道为什么NumPy总是为数组申请连续的空间。)Stride,步进量,在这里表示每个元素或每个维度的下标增加 1 对应内存地址的增量。
我们定义一个 10 \times 20 的矩阵,并来看一下它的strides属性。

1
2
3
4
5
A = numpy.random.uniform(size=[10, 20])
print(A.strides)
# (160, 8)
print(A.dtype)
# float64

每个元素是64位的浮点,因而占用8字节。每列是20个元素,因而占用160字节。这就是stride

as_strided函数

这个函数可以实现所有的Stride Tricks
它位于一个不太好找的地方:np.lib.stride_tricks.as_strided
它接受三个参数,原数组,新数组shape和新数组strides。到这里应该已经比较明晰了。在不同维度上,通过不同步进量的访问,得到新的数组(要求len(shape) = len(strides))。
与Gather操作同样,这个新数组也是不具有自己的内存空间的,它只是对于原数组的一个引用。

例子:向量化二维卷积实现 - Part I
假设我们有卷积核,shape = (ker_size, ker_size, n_in, n_out)和输入数据shape = (b, h, w, n_in),我们希望进行卷积操作,就可以通过Stride Trick来生成待卷积的内容。
在指定完成需要的shape时便正好卷积到边界,无需担心溢出问题。

1
2
3
4
5
6
7
8
9
def submatrices(inputs, kernel_size, output_shape):
core_shape = [kernel_size, kernel_size, inputs.shape[-1]]
return numpy.lib.stride_tricks.as_strided(
inputs,
[inputs.shape[0]] + output_shape[:2] + core_shape,
inputs.strides[:3] + inputs.strides[-3:]
)
# submatrices(numpy.random.uniform(size=[16, 24, 24, 6]), 5, [20, 20, 32]).shape
# (16, 20, 20, 5, 5, 6)

(随后将每个(5, 5, 6)区块卷积得到(32,)个数即可,相关实现将在Broadcast中介绍。)

Broadcast

Broadcast操作将某个维数为 1 的维度复制扩展为维数为 n 的维度,来和其他操作数对齐。某种意义上说,将向量与标量相加的操作是在全维度上应用Broadcast的结果。Broadcast作为Stride Trick的子集,在as_strided函数对应位置的strides传入 0 就实现了Broadcast
另外,在as_strided相同位置有broadcast_to函数可以用来执行该操作。
在NumPy常用运算中,若操作向量之间大小不相同会自动尝试Broadcast操作。因而这一函数常常不会被显式调用。
利用Broadcast实现的操作往往需要配合往数据中增加维数为 1 的维度。用numpy.expand_dims可以实现这一点,该函数传入的axis是新数组中增加的维数为 1 的维度的索引。

例子:求第二组点在第一组点上的k-近邻

1
2
3
4
5
6
7
8
9
def knn_idx(x, y, k):
# x (n, d)
# y (m, d)
x_e = numpy.expand_dims(x, axis=1) # (n, 1, d)
y_e = numpy.expand_dims(y, axis=0) # (1, m, d)
# x_e - y_e Broadcasts to (n, m, d)
dist = numpy.sum((x_e - y_e) ** 2, axis=-1) # (n, m)
dist_sort = numpy.argsort(dist, axis=0)[:k, :] # (k, m)
return x[dist_sort] # (k, m, d)

例子:向量化二维卷积实现 - Part II
最后我们来利用上文的submatricesBroadcast以及爱因斯坦求和约定综合实现向量化的二维卷积。这将比Python循环实现的卷积快许多。

1
2
3
4
5
6
7
8
9
10
11
def conv_2d(inputs, kernel, output_shape):
subm = submatrices(inputs, kernel.shape[0], output_shape)
expand_kernel = numpy.expand_dims(kernel, 0)
broadcast_kernel = numpy.lib.stride_tricks.broadcast_to(
expand_kernel,
[inputs.shape[0]] + list(kernel.shape)
)
conv = numpy.einsum("nhwijk,nijko->nhwo", subm, broadcast_kernel)
return conv
# conv_2d(numpy.random.normal(size=[16, 24, 24, 6]), numpy.random.normal(size=[5, 5, 6, 32]), [20, 20, 32]).shape
# (16, 20, 20, 32)

End.