首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何加速numpy.unique并同时提供计数和重复行索引

如何加速numpy.unique并同时提供计数和重复行索引
EN

Stack Overflow用户
提问于 2022-06-29 16:15:11
回答 4查看 285关注 0票数 6

我试图在numpy数组中找到重复行。下面的代码复制我的数组的结构,其中每一行有n行、m列和nz非零条目:

代码语言:javascript
运行
复制
import numpy as np
import random
import datetime


def create_mat(n, m, nz):
    sample_mat = np.zeros((n, m), dtype='uint8')
    random.seed(42)
    for row in range(0, n):
        counter = 0
        while counter < nz:
            random_col = random.randrange(0, m-1, 1)
            if sample_mat[row, random_col] == 0:
                sample_mat[row, random_col] = 1
                counter += 1
    test = np.all(np.sum(sample_mat, axis=1) == nz)
    print(f'All rows have {nz} elements: {test}')
    return sample_mat

我试图优化的代码如下:

代码语言:javascript
运行
复制
if __name__ == '__main__':
    threshold = 2
    mat = create_mat(1800000, 108, 8)

    print(f'Time: {datetime.datetime.now()}')
    unique_rows, _, duplicate_counts = np.unique(mat, axis=0, return_counts=True, return_index=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Unique rows: {len(unique_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unique_rows[0:5])

我的产出如下:

代码语言:javascript
运行
复制
All rows have 8 elements: True
Time: 2022-06-29 12:08:07.320834
Time: 2022-06-29 12:08:23.281633
Unique rows: 1799994 Sample inds: [508991, 553136, 930379, 1128637, 1290356] Sample counts: [1 1 1 1 1]
Sample rows:
[[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 1 0 0 1 1 0 0 0 1 0 0 0 0 0 0 1 0 1 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 1 0 0 0 0 0 1 1 1 1 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 1 0 1 0 0 1 0 0 0 1 0 1 0 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 0 0 0 1 1 0 0 0 0 0 0 1 1 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 0 0 1 0 1 1 0 0 0 0 0 1 0 0 0 0 1 0]]

我考虑过使用numba,但挑战是它不使用轴参数。类似地,将集合转换为list和the是一个选项,但是循环执行重复计数似乎是"unpythonic“。

考虑到我需要多次运行这段代码(因为我正在修改numpy数组,然后需要重新搜索副本),那么时间是非常关键的。我也尝试过针对这个步骤使用多进程,但是np.unique似乎是阻塞的(也就是说,即使我试图运行多个版本的多个版本,我也会被限制在一个线程上运行6%的CPU容量,而其他线程处于空闲状态)。

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2022-07-02 05:14:30

第一步:比特包装

因为您的矩阵只包含二进制值,所以您可以在中积极地将比特打包到uint64值中,以便执行更有效的排序。下面是Numba的实现:

代码语言:javascript
运行
复制
import numpy as np
import numba as nb

@nb.njit('(uint8[:,::1],)', parallel=True)
def pack_bits(mat):
    n, m = mat.shape
    res = np.zeros((n, (m+63)//64), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(0)
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            res[i, bj//64] = val
    return res

@nb.njit('(uint64[:,::1], int_)', parallel=True)
def unpack_bits(mat, m):
    n = mat.shape[0]
    assert mat.shape[1] == (m+63)//64
    res = np.zeros((n, m), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(mat[i, bj//64])
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
    return res

可以在小得多的打包数组上调用np.unique函数,如初始代码中的那样(除了得到的排序数组是打包的数组,需要解压缩)。因为你不需要指数,所以最好不要计算它。因此,可以删除return_index=True。此外,只有所需的值可以被解压缩(解压缩比打包要昂贵一些,因为编写一个大的矩阵比读取一个现有的矩阵要昂贵)。

代码语言:javascript
运行
复制
if __name__ == '__main__':
    threshold = 2
    n, m = 1800000, 108
    mat = create_mat(n, m, 8)

    print(f'Time: {datetime.datetime.now()}')
    packed_mat = pack_bits(mat)
    duplicate_packed_rows, duplicate_counts = np.unique(packed_mat, axis=0, return_counts=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Duplicate rows: {len(duplicate_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unpack_bits(duplicate_packed_rows[0:5], m))

步骤2:np.unique优化

np.unique调用是次优的,因为它执行多个昂贵的内部排序步骤。在您的具体情况下,并不需要所有这些步骤,并且可以对某些步骤进行优化。

更有效的实现是在第一步对最后一列进行排序,然后对前一列进行排序,以此类推,直到第一列被排序,类似于基排序。请注意,可以使用非稳定算法(通常更快)对最后一列进行排序,但其他列需要一个稳定的算法。由于argsort调用速度慢,而且当前的实现还没有使用多线程,此方法仍然是次优的。不幸的是,Numpy还没有证明对2D数组行进行排序的任何有效方法。虽然可以在Numba中重新实现这一点,但这很麻烦,有点棘手,而且容易出错。更不用说Numba介绍了一些与本地C/C++代码相比的开销。一旦排序,就可以跟踪和计数唯一/重复的行。以下是一个实现:

代码语言:javascript
运行
复制
def sort_lines(mat):
    n, m = mat.shape

    for i in range(m):
        kind = 'stable' if i > 0 else None
        mat = mat[np.argsort(mat[:,m-1-i], kind=kind)]

    return mat

@nb.njit('(uint64[:,::1],)', parallel=True)
def find_duplicates(sorted_mat):
    n, m = sorted_mat.shape
    assert m >= 0

    isUnique = np.zeros(n, np.bool_)
    uniqueCount = 1
    if n > 0:
        isUnique[0] = True
    for i in nb.prange(1, n):
        isUniqueVal = False
        for j in range(m):
            isUniqueVal |= sorted_mat[i, j] != sorted_mat[i-1, j]
        isUnique[i] = isUniqueVal
        uniqueCount += isUniqueVal

    uniqueValues = np.empty((uniqueCount, m), np.uint64)
    duplicateCounts = np.zeros(len(uniqueValues), np.uint64)

    cursor = 0
    for i in range(n):
        cursor += isUnique[i]
        for j in range(m):
            uniqueValues[cursor-1, j] = sorted_mat[i, j]
        duplicateCounts[cursor-1] += 1

    return uniqueValues, duplicateCounts

以前的np.unique调用可以由find_duplicates(sort_lines(packed_mat))替换。

步骤3:基于GPU的np.unique

虽然使用Numba和Numpy在CPU上实现行排序的快速算法并不容易,但假设Nvidia可用并安装了CUDA (以及CuPy),可以简单地在GPU上使用CuPy来实现。这个解决方案的好处是简单,而且大大提高了效率。下面是一个示例:

代码语言:javascript
运行
复制
import cupy as cp

def cupy_sort_lines(mat):
    cupy_mat = cp.array(mat)
    return cupy_mat[cp.lexsort(cupy_mat.T[::-1,:])].get()

以前的sort_lines调用可以由cupy_sort_lines替换。

结果

下面是我的机器上有6核i5-9600KF CPU和Nvidia 1660超级GPU的计时:

代码语言:javascript
运行
复制
Initial version:        15.541 s
Optimized packing:       0.982 s
Optimized np.unique:     0.634 s
GPU-based sorting:       0.143 s   (require a Nvidia GPU)

因此,基于CPU的优化版本大约是快25倍的,而基于GPU的则是109倍快的。注意,在所有版本中,排序都需要很长的时间。此外,请注意,解压缩不包括在基准(如所提供的代码)。它所需的时间可以忽略不计,只要只有少数行被解压缩,而不是所有的完整数组(在我的机器上大约需要200 ms )。最后一个操作可以进一步优化,而代价是一个更复杂的实现。

票数 4
EN

Stack Overflow用户

发布于 2022-07-03 14:42:33

只是为了共享一个简单的解决方案作为基准:

第1版:

代码语言:javascript
运行
复制
def get_duplicate_indexes(data):
    signature_to_indexes = {}

    index = 0
    for row in data:
        key = row.data.tobytes()
        if key not in signature_to_indexes:
            signature_to_indexes[key] = [index]
        else:
            signature_to_indexes[key].append(index)
        index = index + 1

    return [
        indexes
        for indexes in signature_to_indexes.values()
        if len(indexes) > 1
    ]

In [122]: %timeit get_duplicate_indexes(mat)
833 ms ± 5.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [125]: %time get_duplicate_indexes(mat)
CPU times: user 1.5 s, sys: 87.1 ms, total: 1.59 s
Wall time: 1.59 s

In [123]: get_duplicate_indexes(mat)
Out[123]:
[[1396, 402980],
 [769782, 1421364],
 [875866, 1476693],
 [892483, 1194500],
 [1230863, 1478688],
 [1311189, 1426136]]

第2版(与@Kelly Bundy讨论后):

代码语言:javascript
运行
复制
def get_duplicate_indexes(data):
    signature_to_indexes = {}
    duplicates = {}

    for index, row in enumerate(data):
        key = row.data.tobytes()
        if key not in signature_to_indexes:
            signature_to_indexes[key] = index
        else:
            indexes = signature_to_indexes[key]
            if isinstance(indexes, int):
                duplicates[key] = signature_to_indexes[key] = [indexes, index]
            else:
                indexes.append(index)
    return list(duplicates.values())

In [146]: %timeit get_duplicate_indexes(mat)
672 ms ± 3.67 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [147]: %time get_duplicate_indexes(mat)
CPU times: user 652 ms, sys: 46.6 ms, total: 699 ms
Wall time: 698 ms

JFYI:初始版本在我的机器上使用了~14。

票数 2
EN

Stack Overflow用户

发布于 2022-07-02 06:44:30

如果不想在多维数组上使用位打包或调用np.unique,也可以创建视图。它比任何其他优化方法都慢,但更有教育意义:

代码语言:javascript
运行
复制
def create_view(arr): # a, b are arrays
    arr = np.ascontiguousarray(arr)
    void_dt = np.dtype((np.void, arr.dtype.itemsize * arr.shape[1]))
    return arr.view(void_dt).ravel()

def create_mat(n, m, nz):
    rng = np.random.default_rng()
    data = rng.permuted(np.full((n, m), [1] * nz + [0] * (m - nz), dtype=np.uint8), axis=1)
    return data

if __name__ == '__main__':
    mat = create_mat(1800000, 108, 8)
    mat_view = create_view(mat)
    u, idx, counts = np.unique(mat_view, axis=0, return_index=True, return_counts=True)
    duplicate_indices = np.flatnonzero(counts >= 2)

    print(f'Duplicate inds: {duplicate_indices}')
    print(f'Duplicate counts: {counts[duplicate_indices]}')
    print(f'Unique rows: {len(u)}')

请注意,我也修复了一些代码。create_mat可以以一种更有效的方式完成。运行时为~3秒:)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72804712

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档