Skip to content

用 Cython 写并行代码

原文: http://docs.cython.org/en/latest/src/tutorial/parallelization.html

一种提升你的Cython代码速度的办法是使用并行:你可以编写在你CPU多个核心上同时运行的代码。使用并行的代码能够产生夸张的速度提升,倍数和你CPU的核心数量相等(比如CPU有4核,就会有4倍的速度提升)。

这个教程假设你已经对Cython的类型化内存视图非常熟悉了(因为使用类型化内存视图的代码常常是最容易在Cython中并行的那种代码。除此之外,还假设你已经对写一般并行代码会产生的那些问题比较熟悉(因为 这是Cython教程,而不是并行编程的教程)。

在开始之前,有一些事情需要注意:

  • 不是所有代码都可以并行化 -- 比如某些算法的代码依赖于顺序执行,这时你不应该尝试对其并行化。一个好的例子比如计算累计总值。
  • 不是所有的代码都值得被并行化 -- 启动一个并行模块总是有一个不小的额外开销的,所以你必须保证你操作的数据量足够大来保证这个额外开销是值得的。另外,请确保你确实对你的数据做了处理!使用多线程来读取同样的 数据并不能在并行的情况下表现太好,如果你不信,可以自己测一下时间。
  • Cython要求并行模块的内容是nogil的。如果你的算法要求访问Python对象,那它可能不适合并行化。
  • Cython的内置并行化使用 OpenMP 来构建 omp parallel foromp parallel。这些比较适合用来并行化一些相对较小的、独立的代码模块(比如循环)。但是,如果你需要使用其他并行模型比如 衍生并等待任务或者将一些其他工作交给一个连续运行的线程,那你最好还是使用其他的模块(比如Python的 threading 模块)。
  • 实际去实施你的并行Cython代码可能是你优化策略的最后一步。你应该从写线性的代码开始。尽管如此,早做并行的打算也是值得的,因为这可能会影响你对算法的选择。

这个教程并不准备探索所有定制并行化的可选项。更多内容可以查看使用并行性文档内容。除此之外,你还应该了解到Cython对于将你的代码并行化所做出的许多的选择都是固定的,因此如果你希望使用 特定的Cython没有默认提供的OpenMP行为,你最好还是自己写C语言代码。

编译

OpenMP需要你的C/C ++编译器的支持。通常可以通过一行特殊的命令行参数开启支持:如果是GCC的话是 -fopenmp, 如果是MSVC的话是 /openmp。如果你的编译器不支持OpenMP(或者你忘记传参数了),那么你的代码 仍然会编译,只是不会在并行状态运行。

在这个教程中,下面这个setup.py可以用来编译案例。

from setuptools import Extension, setup
from Cython.Build import cythonize
import sys

if sys.platform.startswith("win"):
    openmp_arg = '/openmp'
else:
    openmp_arg = '-fopenmp'


ext_modules = [
    Extension(
        "*",
        ["*.pyx"],
        extra_compile_args=[openmp_arg],
        extra_link_args=[openmp_arg],
    ),
    Extension(
        "*",
        ["*.pyx"],
        extra_compile_args=[openmp_arg],
        extra_link_args=[openmp_arg],
    )
]

setup(
    name='parallel-tutorial',
    ext_modules=cythonize(ext_modules),
)

逐个元素的并行操作

在Cython中,最简单且最常见的并行操作就是在一个数组中逐个元素的遍历,对数组中每一个元素进同样的操作。下面这个简单的案例中,我们计算了数组中每个元素的 sin值。

from cython.parallel cimport prange
cimport cython
from libc.math cimport sin

import numpy as np

@cython.boundscheck(False)
@cython.wraparound(False)
def do_sine(double[:,:] input):
    cdef double[:,:] output = np.empty_like(input)
    cdef Py_ssize_t i, j

    for i in prange(input.shape[0], nogil=True):
        for j in range(input.shape[1]):
            output[i, j] = sin(input[i, j])
    return np.asarray(output)

我们将最外层的循环并行化了。这通常是一个好的主意,毕竟进入和离开一个并发模块总是有额外开销的。但是,你也应该考虑你的数组可能的大小。如果input通常的大小为(2,10000000), 那么对长度为2的维度进行并行化不是一个好主意。

循环的主体本身是 nogil 的,比如你不能做任何"Python"的操作。这是一个非常强的限制,如果你发现你必须使用GIL,那Cython的并行化特性可能不适合你。在循环内可以抛出异常,但是 Cython只会重新获取GIL并引发这个异常,然后终结所有线程上的循环。

需要对循环变量i显式的定义为C整数类型。对于非并行的循环Cython能够推理出这一点,但是对于并行的循环变量目前不进行推理,所以不给i限定类型会引发编译错误,因为它会是一个Python 对象,因此没有GIL的情况下无法使用。

如果有经验的OpenMP使用者想看,那么生成的C语言代码如下所示。为了可读性这里做了一点点简化

#pragma omp parallel
{
    #pragma omp for firstprivate(i) lastprivate(i) lastprivate(j)
    for (__pyx_t_8 = 0; __pyx_t_8 < __pyx_t_9; __pyx_t_8++){
        i = __pyx_t_8;
        /* body goes here */
    }
}

私有变量

上面这段生成的C语言代码有一点值得注意 - 循环内使用变量比如 ij 会被标记为 firstprivatelastprivate。在循环中,每个线程都有属于自己的一份数据的副本,数据会根据循环前 的值进行初始化,且在循环结束后,"全局的"副本的值会被设置为与最后一次循环相等(就好像循环是线性执行的一样)。

Cython采用的基本规则是: - 在 prange 块内的 C 标量设为 firstprivatelastprivate, - 在一个循环块内赋值的C标量是 private,这代表它们不能被用于在块内和块外传递数据。 - 数组变量(比如内存视图)不是私有的。相反,Cython假设:你已经给循环分了层次,也就是说每次循环都是处理不同的数据。 - Python 对象也不会被设为私有,尽管对于它们的访问被Python的GIL控制。

Cython不允许覆盖这些选择。

归约(Reductions)操作

在Cython中第二常见的并行操作是归约(Reductions)操作。一个常见的例子是在整个数组上计算累加总和,比如下面这段计算向量范数的程序:

from cython.parallel cimport prange
cimport cython
from libc.math cimport sqrt

@cython.boundscheck(False)
@cython.wraparound(False)
def l2norm(double[:] x):
    cdef double total = 0
    cdef Py_ssize_t i
    for i in prange(x.shape[0], nogil=True):
        total += x[i]*x[i]
    return sqrt(total)

Cython支持对 += , *= , &= , |= , ^= 这些运算符进行推理。这些只能应用于C标量,因此,举个例子来说,你并不能很轻松的将2维内存视图归约成1维内存视图。

生成的C语言代码大概是这样的:

#pragma omp parallel reduction(+:total)
{
    #pragma omp for firstprivate(i) lastprivate(i)
    for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
        i = __pyx_t_2;
        total = total + /* some indexing code */;

    }
}

并行代码块

Cython的 parallel 运算符比 prange 更少的被使用。parallel 会产生在多个线程下同时运行的代码块。而 prange 则不然,线程间工作并不是自动划分的。

这里我们展示三种 parallel 代码块的 常见用途。

将prange块写在一起

进入和离开一个并行区域总是有一些额外开销的。因此,如果你有许多并行区域且中间夹着者线性的操作,写一个很大的并行代码块可能是更为高效的做法。虽然微小的线性操作的 部分重复了,但是总体开销减少了。

在下面这个案例中我们完成了一个向量的原地正则化。第一个并行循环体计算范数,第二个并行循环体将范数应用到向量上,因此我们避免进入并离开夹杂在中间的线性区域。

from cython.parallel cimport parallel, prange
cimport cython
from libc.math cimport sqrt

@cython.boundscheck(False)
@cython.wraparound(False)
def normalize(double[:] x):
    cdef Py_ssize_t i
    cdef double total = 0
    cdef double norm
    with nogil, parallel():
        for i in prange(x.shape[0]):
            total += x[i]*x[i]
        norm = sqrt(total)
        for i in prange(x.shape[0]):
            x[i] /= norm

C代码大致是这样:

#pragma omp parallel private(norm) reduction(+:total)
{
    /* some calculations of array size... */
    #pragma omp for firstprivate(i) lastprivate(i)
    for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
        /* ... */
    }
    norm = sqrt(total);
    #pragma omp for firstprivate(i) lastprivate(i)
    for (__pyx_t_2 = 0; __pyx_t_2 < __pyx_t_3; __pyx_t_2++){
        /* ... */
    }
}

为每个线程分配暂存空间

假设每个线程都需要一个小的暂存空间来工作。它们彼此不可以共享暂存空间,不然会引发数据竞争。在这种场景下,分配和回收内存是在包裹着循环都并发区域中完成(因此是每个线程都执行,完成后该循环体可以使用)。

这里我们的案例使用C ++来计算2维数组中每一列的中间值(只是 numpy.median(x,axis=0) 的并行版本)。我们需要给每一列重新排序以寻找中间值,但是我们不希望修改原始的输入数组。因此,我们给每个线程 分配了一个C ++ 向量作为暂存空间,并且在暂存空间内工作。为了性能考虑,向量在 prange 循环外部分配。

# distutils: language = c++

from cython.parallel cimport parallel, prange
from libcpp.vector cimport vector
from libcpp.algorithm cimport nth_element
cimport cython
from cython.operator cimport dereference

import numpy as np

@cython.boundscheck(False)
@cython.wraparound(False)
def median_along_axis0(const double[:,:] x):
    cdef double[::1] out = np.empty(x.shape[1])
    cdef Py_ssize_t i, j

    cdef vector[double] *scratch
    cdef vector[double].iterator median_it
    with nogil, parallel():
        # allocate scratch space per loop
        scratch = new vector[double](x.shape[0])
        try:
            for i in prange(x.shape[1]):
                # copy row into scratch space
                for j in range(x.shape[0]):
                    dereference(scratch)[j] = x[j, i]
                median_it = scratch.begin() + scratch.size()//2
                nth_element(scratch.begin(), median_it, scratch.end())
                # for the sake of a simple example, don't handle even lengths...
                out[i] = dereference(median_it)
        finally:
            del scratch
    return np.asarray(out)

注意:如果这里不用经典Cython写法,而是用纯Python风格的话,代码不完全相同。因为纯Python风格的语法下目前还不支持 C ++ 的 "new",因此在分配暂存空间的代码上会有些许不同。

在生成的代码中,scratch 变量被声明为 最外层循环体的 private。大概的轮廓是这样的:

#pragma omp parallel private(scratch)
{
    scratch = new std::vector<double> ((x.shape[0]))
    #pragma omp for firstprivate(i) lastprivate(i) lastprivate(j) lastprivate(median_it)
    for (__pyx_t_9 = 0; __pyx_t_9 < __pyx_t_10; __pyx_t_9++){
        i = __pyx_t_9;
        /* implementation goes here */
    }
    /* some exception handling detail omitted */
    delete scratch;
}

每个线程执行不同的任务

最后,如果你手动设置线程数然后用 omp.get_thread_num() 区分每一个线程,你可以手动划分线程的工作。这在Cython中是一个比较少见的场景,并且大概意味着使用 threading 模块 可能更适合你想做的事。但是无论如何这是一个选择:

from cython.parallel cimport parallel
from openmp cimport omp_get_thread_num




cdef void long_running_task1() nogil:
    pass



cdef void long_running_task2() nogil:
    pass

def do_two_tasks():
    cdef int thread_num
    with nogil, parallel(num_threads=2):
        thread_num = omp_get_thread_num()
        if thread_num == 0:
            long_running_task1()
        elif thread_num == 1:
            long_running_task2()

这种代码块的作用是受限的,因为块内分配的变量是对于每个线程 private 的, 因此在后面的线程区域中不能被访问。

上面这个案例产生的C代码比较简单:

#pragma omp parallel private(thread_num)
{
    thread_num = omp_get_thread_num();
    switch (thread_num) {
        /* ... */
    }
}


回到顶部