假设我们有一个具有形状(N,)的numpy数组A,一个具有形状(M,3)的矩阵D (M,3),以及另一个具有形状(M,3)的矩阵I(M,3),它具有D中每个数据元素的相应索引,我们如何构造一个给定的D和I,使重复的元素索引被添加?
示例:
############# A[I] := D ###################################
A = [0.5, 0.6] # Final Reduced Data Vector
D = [[0.1, 0.1 0.2], [0.2, 0.4, 0.1]] # Data
I = [[0, 1, 0], [0, 1, 1]] # Indices
例如:
A[0] = D[0][0] + D[0][2] + D[1][0] # 0.5 = 0.1 + 0.2 + 0.2
因为在索引矩阵中我们有:
I[0][0] = I[0][2] = I[1][0] = 0
目标是避免对所有元素进行循环,以便对大N,M (10^6-10^9)有效。
发布于 2020-12-06 21:08:33
我怀疑你能比np.bincount
快得多--注意到正式文件是如何提供这个精确的使用程序的。
# Your example
A = [0.5, 0.6]
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]
# Solution
import numpy as np
D, I = np.array(D).flatten(), np.array(I).flatten()
print(np.bincount(I, D)) #[0.5 0.6]
发布于 2020-12-02 08:10:48
I
和D
的形状并不重要:您可以在不更改结果的情况下清晰地排列数组:
index = np.ravel(I)
data = np.ravel(D)
现在,您可以根据I
对两个数组进行排序。
sorter = np.argsort(index)
index = index[sorter]
data = data[sorter]
这很有帮助,因为现在index
看起来如下所示:
0, 0, 0, 1, 1, 1
data
是这样的:
0.1, 0.2, 0.2, 0.1, 0.4, 0.1
与处理随机位置相比,将连续数的运行相加应该更容易。让我们从寻找开始运行的索引开始:
runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
现在,您可以使用这样的事实:像np.add
这样的运行函数有一个名为reduceat
的部分reduce
操作。这允许您对数组的区域进行求和:
a = np.add.reduceat(data, runs)
如果保证I
至少包含一次[0,A.size
)中的所有索引,那么您就完成了:只分配给A
而不是a
。如果不是,可以使用以下事实进行映射:index
中每次运行的开始都是目标索引:
A = np.zeros(n)
A[index[runs]] = a
算法复杂度分析:
ravel
在时间和空间上为O(1)。如果它是一个列表,它在时间和空间上是O(MN)argsort
在时间上是O(MN log ),在空间上是O(MN)
sorter
的索引在时间和空间上是O(MN)runs
的时间为O(MN),空间上为O(MN + M) = O(MN)。reduceat
是一个单程:时间上的O(MN),空间中的O(M)A
在时间和空间上是O(M)总计: O(MN log MN)时间,O(MN)空间
TL;博士
def make_A(D, I, M):
index = np.ravel(I)
data = np.ravel(D)
sorter = np.argsort(index)
index = index[sorter]
if index[0] < 0 or index[-1] >= M:
raise ValueError('Bad indices')
data = data[sorter]
runs = np.r_[0, np.flatnonzero(np.diff(index)) + 1]
a = np.add.reduceat(data, runs)
if a.size == M:
return a
A = np.zeros(M)
A[index[runs]] = a
return A
发布于 2020-12-10 07:47:50
如果你事先知道A的大小,就像你看上去的那样,你可以简单地使用add.at
import numpy as np
D = [[0.1, 0.1, 0.2], [0.2, 0.4, 0.1]]
I = [[0, 1, 0], [0, 1, 1]]
arr_D = np.array(D)
arr_I = np.array(I)
A = np.zeros(2)
np.add.at(A, arr_I, arr_D)
print(A)
输出
[0.5 0.6]
如果您不知道A的大小,可以使用最大值计算它:
A = np.zeros(arr_I.max() + 1)
np.add.at(A, arr_I, arr_D)
print(A)
输出
[0.5 0.6]
该算法的时间复杂度为O(N),空间复杂度为O(N)。
以下方面:
arr_I.max() + 1
是bincount在引擎盖下所做的,来自文档。
绑定输入数组的结果。out的长度等于np.amax(x)+1。
话虽如此,二进制数至少快一个数量级:
I = np.random.choice(1000, size=(1000, 3), replace=True)
D = np.random.random((1000, 3))
%timeit make_A_with_at(I, D, 1000)
213 µs ± 25 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit make_A_with_bincount(I, D)
11 µs ± 15.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
https://stackoverflow.com/questions/65103114
复制相似问题