我试图在numpy数组中找到重复行。下面的代码复制我的数组的结构,其中每一行有n行、m列和nz非零条目:
import numpy as np
import random
import datetime
def create_mat(n, m, nz):
sample_mat = np.zeros((n, m), dtype='uint8')
random.seed(42)
for row in range(0, n):
counter = 0
while counter < nz:
random_col = random.randrange(0, m-1, 1)
if sample_mat[row, random_col] == 0:
sample_mat[row, random_col] = 1
counter += 1
test = np.all(np.sum(sample_mat, axis=1) == nz)
print(f'All rows have {nz} elements: {test}')
return sample_mat
我试图优化的代码如下:
if __name__ == '__main__':
threshold = 2
mat = create_mat(1800000, 108, 8)
print(f'Time: {datetime.datetime.now()}')
unique_rows, _, duplicate_counts = np.unique(mat, axis=0, return_counts=True, return_index=True)
duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
print(f'Time: {datetime.datetime.now()}')
print(f'Unique rows: {len(unique_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
print(f'Sample rows:')
print(unique_rows[0:5])
我的产出如下:
All rows have 8 elements: True
Time: 2022-06-29 12:08:07.320834
Time: 2022-06-29 12:08:23.281633
Unique rows: 1799994 Sample inds: [508991, 553136, 930379, 1128637, 1290356] Sample counts: [1 1 1 1 1]
Sample rows:
[[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 1 0 0 1 1 0 0 0 1 0 0 0 0 0 0 1 0 1 0]
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 1 0 0 0 0 0 1 1 1 1 0 0 0 0 1 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 1 0 1 0 0 1 0 0 0 1 0 1 0 1 0 0 0]
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 0 0 0 1 1 0 0 0 0 0 0 1 1 0 1 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 0 0 1 0 1 1 0 0 0 0 0 1 0 0 0 0 1 0]]
我考虑过使用numba,但挑战是它不使用轴参数。类似地,将集合转换为list和the是一个选项,但是循环执行重复计数似乎是"unpythonic“。
考虑到我需要多次运行这段代码(因为我正在修改numpy数组,然后需要重新搜索副本),那么时间是非常关键的。我也尝试过针对这个步骤使用多进程,但是np.unique似乎是阻塞的(也就是说,即使我试图运行多个版本的多个版本,我也会被限制在一个线程上运行6%的CPU容量,而其他线程处于空闲状态)。
发布于 2022-07-02 05:14:30
第一步:比特包装
因为您的矩阵只包含二进制值,所以您可以在中积极地将比特打包到uint64值中,以便执行更有效的排序。下面是Numba的实现:
import numpy as np
import numba as nb
@nb.njit('(uint8[:,::1],)', parallel=True)
def pack_bits(mat):
n, m = mat.shape
res = np.zeros((n, (m+63)//64), np.uint64)
for i in nb.prange(n):
for bj in range(0, m, 64):
val = np.uint64(0)
if bj + 64 <= m:
# Fast case
for j in range(64):
val += np.uint64(mat[i, bj+j]) << (63 - j)
else:
# Slow case (boundary)
for j in range(m - bj):
val += np.uint64(mat[i, bj+j]) << (63 - j)
res[i, bj//64] = val
return res
@nb.njit('(uint64[:,::1], int_)', parallel=True)
def unpack_bits(mat, m):
n = mat.shape[0]
assert mat.shape[1] == (m+63)//64
res = np.zeros((n, m), np.uint64)
for i in nb.prange(n):
for bj in range(0, m, 64):
val = np.uint64(mat[i, bj//64])
if bj + 64 <= m:
# Fast case
for j in range(64):
res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
else:
# Slow case (boundary)
for j in range(m - bj):
res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
return res
可以在小得多的打包数组上调用np.unique
函数,如初始代码中的那样(除了得到的排序数组是打包的数组,需要解压缩)。因为你不需要指数,所以最好不要计算它。因此,可以删除return_index=True
。此外,只有所需的值可以被解压缩(解压缩比打包要昂贵一些,因为编写一个大的矩阵比读取一个现有的矩阵要昂贵)。
if __name__ == '__main__':
threshold = 2
n, m = 1800000, 108
mat = create_mat(n, m, 8)
print(f'Time: {datetime.datetime.now()}')
packed_mat = pack_bits(mat)
duplicate_packed_rows, duplicate_counts = np.unique(packed_mat, axis=0, return_counts=True)
duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
print(f'Time: {datetime.datetime.now()}')
print(f'Duplicate rows: {len(duplicate_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
print(f'Sample rows:')
print(unpack_bits(duplicate_packed_rows[0:5], m))
步骤2:np.unique
优化
np.unique
调用是次优的,因为它执行多个昂贵的内部排序步骤。在您的具体情况下,并不需要所有这些步骤,并且可以对某些步骤进行优化。
更有效的实现是在第一步对最后一列进行排序,然后对前一列进行排序,以此类推,直到第一列被排序,类似于基排序。请注意,可以使用非稳定算法(通常更快)对最后一列进行排序,但其他列需要一个稳定的算法。由于argsort
调用速度慢,而且当前的实现还没有使用多线程,此方法仍然是次优的。不幸的是,Numpy还没有证明对2D数组行进行排序的任何有效方法。虽然可以在Numba中重新实现这一点,但这很麻烦,有点棘手,而且容易出错。更不用说Numba介绍了一些与本地C/C++代码相比的开销。一旦排序,就可以跟踪和计数唯一/重复的行。以下是一个实现:
def sort_lines(mat):
n, m = mat.shape
for i in range(m):
kind = 'stable' if i > 0 else None
mat = mat[np.argsort(mat[:,m-1-i], kind=kind)]
return mat
@nb.njit('(uint64[:,::1],)', parallel=True)
def find_duplicates(sorted_mat):
n, m = sorted_mat.shape
assert m >= 0
isUnique = np.zeros(n, np.bool_)
uniqueCount = 1
if n > 0:
isUnique[0] = True
for i in nb.prange(1, n):
isUniqueVal = False
for j in range(m):
isUniqueVal |= sorted_mat[i, j] != sorted_mat[i-1, j]
isUnique[i] = isUniqueVal
uniqueCount += isUniqueVal
uniqueValues = np.empty((uniqueCount, m), np.uint64)
duplicateCounts = np.zeros(len(uniqueValues), np.uint64)
cursor = 0
for i in range(n):
cursor += isUnique[i]
for j in range(m):
uniqueValues[cursor-1, j] = sorted_mat[i, j]
duplicateCounts[cursor-1] += 1
return uniqueValues, duplicateCounts
以前的np.unique
调用可以由find_duplicates(sort_lines(packed_mat))
替换。
步骤3:基于GPU的np.unique
虽然使用Numba和Numpy在CPU上实现行排序的快速算法并不容易,但假设Nvidia可用并安装了CUDA (以及CuPy),可以简单地在GPU上使用CuPy来实现。这个解决方案的好处是简单,而且大大提高了效率。下面是一个示例:
import cupy as cp
def cupy_sort_lines(mat):
cupy_mat = cp.array(mat)
return cupy_mat[cp.lexsort(cupy_mat.T[::-1,:])].get()
以前的sort_lines
调用可以由cupy_sort_lines
替换。
结果
下面是我的机器上有6核i5-9600KF CPU和Nvidia 1660超级GPU的计时:
Initial version: 15.541 s
Optimized packing: 0.982 s
Optimized np.unique: 0.634 s
GPU-based sorting: 0.143 s (require a Nvidia GPU)
因此,基于CPU的优化版本大约是快25倍的,而基于GPU的则是109倍快的。注意,在所有版本中,排序都需要很长的时间。此外,请注意,解压缩不包括在基准(如所提供的代码)。它所需的时间可以忽略不计,只要只有少数行被解压缩,而不是所有的完整数组(在我的机器上大约需要200 ms )。最后一个操作可以进一步优化,而代价是一个更复杂的实现。
发布于 2022-07-03 14:42:33
只是为了共享一个简单的解决方案作为基准:
第1版:
def get_duplicate_indexes(data):
signature_to_indexes = {}
index = 0
for row in data:
key = row.data.tobytes()
if key not in signature_to_indexes:
signature_to_indexes[key] = [index]
else:
signature_to_indexes[key].append(index)
index = index + 1
return [
indexes
for indexes in signature_to_indexes.values()
if len(indexes) > 1
]
In [122]: %timeit get_duplicate_indexes(mat)
833 ms ± 5.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [125]: %time get_duplicate_indexes(mat)
CPU times: user 1.5 s, sys: 87.1 ms, total: 1.59 s
Wall time: 1.59 s
In [123]: get_duplicate_indexes(mat)
Out[123]:
[[1396, 402980],
[769782, 1421364],
[875866, 1476693],
[892483, 1194500],
[1230863, 1478688],
[1311189, 1426136]]
第2版(与@Kelly Bundy讨论后):
def get_duplicate_indexes(data):
signature_to_indexes = {}
duplicates = {}
for index, row in enumerate(data):
key = row.data.tobytes()
if key not in signature_to_indexes:
signature_to_indexes[key] = index
else:
indexes = signature_to_indexes[key]
if isinstance(indexes, int):
duplicates[key] = signature_to_indexes[key] = [indexes, index]
else:
indexes.append(index)
return list(duplicates.values())
In [146]: %timeit get_duplicate_indexes(mat)
672 ms ± 3.67 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [147]: %time get_duplicate_indexes(mat)
CPU times: user 652 ms, sys: 46.6 ms, total: 699 ms
Wall time: 698 ms
JFYI:初始版本在我的机器上使用了~14。
发布于 2022-07-02 06:44:30
如果不想在多维数组上使用位打包或调用np.unique
,也可以创建视图。它比任何其他优化方法都慢,但更有教育意义:
def create_view(arr): # a, b are arrays
arr = np.ascontiguousarray(arr)
void_dt = np.dtype((np.void, arr.dtype.itemsize * arr.shape[1]))
return arr.view(void_dt).ravel()
def create_mat(n, m, nz):
rng = np.random.default_rng()
data = rng.permuted(np.full((n, m), [1] * nz + [0] * (m - nz), dtype=np.uint8), axis=1)
return data
if __name__ == '__main__':
mat = create_mat(1800000, 108, 8)
mat_view = create_view(mat)
u, idx, counts = np.unique(mat_view, axis=0, return_index=True, return_counts=True)
duplicate_indices = np.flatnonzero(counts >= 2)
print(f'Duplicate inds: {duplicate_indices}')
print(f'Duplicate counts: {counts[duplicate_indices]}')
print(f'Unique rows: {len(u)}')
请注意,我也修复了一些代码。create_mat
可以以一种更有效的方式完成。运行时为~3秒:)
https://stackoverflow.com/questions/72804712
复制相似问题