Skip to content

使用并行性

原文: http://docs.cython.org/en/latest/src/userguide/parallelism.html

Cython 通过 cython.parallel模块支持原生并行。要使用这种并行机制,必须释放 GIL(参见 释放 GIL)。它目前支持 OpenMP,但以后可能会支持更多后端。

注意: 由于 OpenMP 的限制,此模块的功能仅可用于主线程或并行区域。

cython.parallel.prange([start,] stop[, step][, nogil=False][, schedule=None[, chunksize=None]][, num_threads=None])

此函数可用于并行循环。 OpenMP 自动启动线程池并根据 schedule 分配工作。

变量的线程局部性和归约将会自动推断。

如果分配给 prange 块中的变量,它将变为 lastprivate,这意味着变量将包含上次迭代的值。如果对变量使用 原位运算符,它将变为归约,这意味着变量的线程局部副本的值将随操作符一起减少,并在循环后分配给原始变量。索引变量始终是 lastprivate。与块并行分配的变量在块之后将是私有的并且不可用,因为没有顺序最后值的概念。

参数:

  • start - 指示循环开始的索引(与范围中的起始参数相同)。
  • stop - 指示何时停止循环的索引(与范围中的停止参数相同)。
  • step - 给出步长的整数(与range中的step参数相同)。它不能为 0。
  • nogil - 此函数只能与释放的GIL一起使用。如果 nogil 为真,则循环代码将被包裹在 nogil 代码块中。
  • schedule

    schedule传递给 OpenMP的值,可以是以下之一:

    1. static:

    2. 静态调度

      如果chunksize(块大小参数)已知,循环会事前给定chunksize的块的形式分配给所有线程。如果没有提供chunksize,那么循环会被划分为大致相等的大小,每个线程最多只会事先分配一个块。

      最适合:调度规划的额外开销很重要,并且可以将问题简化为已知具有大致相同运行时间的相同大小的块。

    3. dynamic:

    4. 动态调度

      当线程请求循环的时候才分配给他们,默认分配的chunsize为1。

      最适合每个块的运行时间不同并且事前未知的情况。会使用更多数量的较小块来保持所有线程处于工作状态。

    5. guided:

    6. 规划过的动态调度

      像动态一样,循环也是在线程请求的时候分配,但是分配块大小是递减的。每个块的大小正比于未被分配到循环数除以正在参与的线程数,递减到最小为1(或者到给定的chunksize)。

      这比纯动态调度有一个优势,当事实证明最后一个块比预期花费更多时间或者其他方式被错误调度时,所以大多数线程开始运行空闲而最后一个块只由较少数量的线程处理。

    7. runtime:

    8. 调度和块大小从运行期间的scheduling变量获取。可以通过 openmp.omp_set_schedule() 函数调度、或者设置 OMP_SCHEDULE 环境变量来设置scheduling变量。请注意:这基本上禁止了任何静态编译期间对调度代码本身对优化举措,因此可能会产生比编译时就静态的确定调度策略来说更慢的速度。默认的调度策略就是定义的实现方式。更详细的信息可以查看 OpenMP说明

  • num_threads - num_threads参数表示团队应该包含多少个线程。如果没有给出,OpenMP 将决定使用多少线程。通常,这是计算机上可用的核心数。但是,这可以通过omp_set_num_threads()功能或OMP_NUM_THREADS环境变量来控制。

  • chunksize - chunksize参数指示用于在线程之间划分迭代的块大小。这仅对staticdynamicguided调度有效,并且是可选的。根据日程安排,它提供的负载平衡,调度开销和错误共享的数量(如果有的话),不同的组块可以提供显着不同的性能结果。

归约的示例:

# 经典风格
from cython.parallel import prange

cdef int i
cdef int n = 30
cdef int sum = 0

for i in prange(n, nogil=True):
    sum += i

print(sum)

# 纯Python风格
from cython.parallel import prange

i = cython.declare(cython.int)
n = cython.declare(cython.int, 30)
sum = cython.declare(cython.int, 0)

for i in prange(n, nogil=True):
    sum += i

print(sum)

类型化内存视图的示例(例如 NumPy 数组):

# 经典风格
from cython.parallel import prange

def func(double[:] x, double alpha):
    cdef Py_ssize_t i

    for i in prange(x.shape[0]):
        x[i] = alpha * x[i]

# 纯Python风格
from cython.parallel import prange

def func(x: cython.double[:], alpha: cython.double):
    i: cython.Py_ssize_t

    for i in prange(x.shape[0], nogil=True):
        x[i] = alpha * x[i]

cython.parallel.parallel(num_threads=None)

该指令可用作 with 语句的一部分,以并行执行代码串。这对于设置 prange 使用的线程局部缓冲区非常有用。包含的 prange 将是一个非平行的共享工作的循环,因此在并行部分中分配给的任何变量对于 prange 也是私有的。在并行块之后,并行块中的私有变量不可用。

线程局部缓冲区的示例:

# 经典风格
from cython.parallel import parallel, prange
from libc.stdlib cimport abort, malloc, free

cdef Py_ssize_t idx, i, n = 100
cdef int * local_buf
cdef size_t size = 10

with nogil, parallel():
    local_buf = <int *> malloc(sizeof(int) * size)
    if local_buf is NULL:
        abort()

    # populate our local buffer in a sequential loop
    for i in xrange(size):
        local_buf[i] = i * 2

    # share the work using the thread-local buffer(s)
    for i in prange(n, schedule='guided'):
        func(local_buf)

    free(local_buf)

# 纯Python风格
from cython.parallel import parallel, prange
from cython.cimports.libc.stdlib import abort, malloc, free

@cython.nogil
@cython.cfunc
def func(buf: cython.p_int) -> cython.void:
    pass
    # ...

idx = cython.declare(cython.Py_ssize_t)
i = cython.declare(cython.Py_ssize_t)
j = cython.declare(cython.Py_ssize_t)
n = cython.declare(cython.Py_ssize_t, 100)
local_buf = cython.declare(p_int)
size = cython.declare(cython.size_t, 10)

with cython.nogil, parallel():
    local_buf: cython.p_int = cython.cast(cython.p_int, malloc(cython.sizeof(cython.int) * size))
    if local_buf is cython.NULL:
        abort()

    # populate our local buffer in a sequential loop
    for i in range(size):
        local_buf[i] = i * 2

    # share the work using the thread-local buffer(s)
    for j in prange(n, schedule='guided'):
        func(local_buf)

    free(local_buf)

以后(的版本)可能会在并行块中支持在线程之间分配工作的代码块。

cython.parallel.threadid()

返回线程的 id。对于 n 个线程,id 的范围为从 0 到 n-1。

编译

要实际使用 OpenMP 支持,您需要告诉 C 或 C ++编译器启用 OpenMP。对于 gcc,这可以在 setup.py 中完成如下:

# 经典风格
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize

ext_modules = [
    Extension(
        "hello",
        ["hello.pyx"],
        extra_compile_args=['-fopenmp'],
        extra_link_args=['-fopenmp'],
    )
]

setup(
    name='hello-parallel-world',
    ext_modules=cythonize(ext_modules),
)

# 纯Python风格
from setuptools import Extension, setup
from Cython.Build import cythonize

ext_modules = [
    Extension(
        "hello",
        ["hello.py"],
        extra_compile_args=['-fopenmp'],
        extra_link_args=['-fopenmp'],
    )
]

setup(
    name='hello-parallel-world',
    ext_modules=cythonize(ext_modules),
)

对于 Microsoft Visual C ++编译器,请使用'/openmp'而不是'-fopenmp'

主动打破循环

并行和 prange 块在nogill模式下支持“break”(中断语句)、“continue”(继续语句)和“return”(返回语句)。此外,在这些块中使用 with gil 块是有效的,并且可以从中传播异常。但是,因为块使用 OpenMP,所以不能只留下异常(必须处理),因此最好写好退出的过程。对于 prange(),这意味着在第一次break、continue、return之后任何后续线程中的后续循环的循环主体都会被跳过。如果可能返回多个不同的值,则返回哪个值是未定义的,因为迭代没有特定的顺序:

# 经典风格
from cython.parallel import prange

cdef int func(Py_ssize_t n):
    cdef Py_ssize_t i

    for i in prange(n, nogil=True):
        if i == 8:
            with gil:
                raise Exception()
        elif i == 4:
            break
        elif i == 2:
            return i

# 纯Python风格
from cython.parallel import prange

@cython.exceptval(-1)
@cython.cfunc
def func(n: cython.Py_ssize_t) -> cython.int:
    i: cython.Py_ssize_t

    for i in prange(n, nogil=True):
        if i == 8:
            with cython.gil:
                raise Exception()
        elif i == 4:
            break
        elif i == 2:
            return i

在上面的例子中,是否会简单地中断、是否会返回 2、是否应该引发异常都是未知的

使用 OpenMP 函数

可以通过 cimporting openmp 使用 OpenMP 函数:

# 经典风格
# tag: openmp
# You can ignore the previous line.
# It's for internal testing of the Cython documentation.

from cython.parallel cimport parallel
cimport openmp

cdef int num_threads

openmp.omp_set_dynamic(1)
with nogil, parallel():
    num_threads = openmp.omp_get_num_threads()
    # ...

# 纯Python风格
from cython.parallel import parallel
from cython.cimports.openmp import omp_set_dynamic, omp_get_num_threads

num_threads = cython.declare(cython.int)

omp_set_dynamic(1)
with cython.nogil, parallel():
    num_threads = omp_get_num_threads()
    # ...


回到顶部