我已经写了一个代码,并尝试使用numba加速代码。代码的主要目标是根据条件对某些值进行分组。在这方面,iter_
用于收敛代码以满足条件。我准备了下面的一个小例子来重现示例代码:
import numpy as np
import numba as nb
rng = np.random.default_rng(85)
# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
# 0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
# 'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
# 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
# 'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
# 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
# 'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
# 'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------
# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]
Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------
# values_ = [*values_.values()]
# params = [*params.values()]
# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):
final_dict = {}
for i, j in enumerate(values_.keys()):
Rand_vals = []
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
while not np.allclose(goal_sum, final_sum, atol=tel):
iter_ += 1
vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
# final_sum = 0.0016 * np.sum(vals_group) # -----> For small data volume
final_sum = np.sum(vals_group ** 3) # -----> UPDATED For med or large data volume
if iter_ == upd_:
t += 1
tel = t * tel
values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
Rand_vals.append(vals_group)
else:
Rand_vals = [np.array([])] * Sno
final_dict["R" + str(i)] = Rand_vals
# return final_dict
# test(values_, params, Sno, dec_, upd_)
首先,在此代码上应用numba,使用了@nb.jit
(forceobj=True
用于避免警告和…)。,这将对演出产生不利影响。nopython
也会使用@nb.njit
进行检查,因为输入的不支持 (如1,2中提到的)字典类型导致了以下错误:
无法确定的Numba类型
我不知道Dict
是否可以(如何)从numba.typed
(通过将创建的python字典转换为numba )处理它,或者将字典转换为数组列表是否有什么好处。我认为,并行化可能是可能的,如果一些代码行,如Rand_vals.append(vals_group)
或其他部分或…从函数中删除或修改以获得与以前相同的结果,但我不知道如何这样做。
我将非常感谢在这段代码中帮助使用numba。如果可以的话,numba parallelization
将是最理想的解决方案(可能是性能方面最适用的方法)。
数据:
发布于 2022-04-13 17:05:12
此代码可以转换为Numba,但并不简单。
首先,必须定义字典和列表类型,因为Numba njit
函数不能直接在反射列表(又名)上操作。纯python列表)。在Numba中这样做有点乏味,其结果代码有点冗长:
String = nb.types.unicode_type
ValueArray = nb.float64[::1]
ValueDict = nb.types.DictType(String, ValueArray)
ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
ParamDict = nb.types.DictType(String, ParamDictValue)
FinalDictValue = nb.types.ListType(ValueArray)
FinalDict = nb.types.DictType(String, FinalDictValue)
然后,您需要转换输入字典:
nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
for key,value in values_.items():
nbValues[key] = value.copy()
nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
for key,value in params.items():
nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))
然后,您需要编写核心函数。np.allclose
和np.isin
没有在Numba中实现,因此应该手动重新实现。但要点是Numba不支持rng
Numpy对象。我认为政府肯定不会在短期内支持这项建议。注意,Numba有一个随机数实现,试图模仿Numpy的行为,但是种子的管理有点不同。还请注意,如果种子设置为相同的值(Numpy和Numba有不同的种子变量而不是同步的),那么np.random.xxx
Numpy函数的结果应该是相同的。
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
vals_group = np.empty(0, dtype=nb.float64)
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
final_sum = 0.0016 * np.sum(vals_group)
# final_sum = 0.0016 * np.sum(vals_group) # (for small data volume)
final_sum = np.sum(vals_group ** 3) # (for med or large data volume)
if iter_ == upd_:
t += 1
tel = t * tel
# Perform an in-place deletion
vals, gr = values_[j], vals_group
cur = 0
for l in range(vals.size):
found = False
for m in range(gr.size):
found |= vals[l] == gr[m]
if not found:
# Keep the value (delete it otherwise)
vals[cur] = vals[l]
cur += 1
values_[j] = vals[:cur]
Rand_vals.append(vals_group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
请注意,np.isin
的替换实现非常幼稚,但在您的输入示例中,它在实践中运行得很好。
可以使用以下方式调用该函数:
nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)
最后,应该将字典转换回基本Python对象:
finalDict = dict()
for key,value in nbFinalDict.items():
finalDict[key] = list(value)
这个实现对于小的输入是快速的,但不是大的,因为np.random.choice
几乎所有的时间都是 (>96%)。问题是,当请求项的数量很小时,这个函数显然不是最优的(这就是您的情况)。实际上,它在输入数组的线性时间中运行,而不是在请求项数的线性时间中运行。
进一步优化
该算法可以完全重写,只提取12个随机项,并以更有效的方式从主醋栗数组中丢弃它们。其思想是将数组末尾的n
项(小目标示例)与任意位置的其他项交换,然后检查和,重复此过程直到满足条件,最后在调整视图大小之前将视图提取到最后一个n
项,从而丢弃最后一个项。所有这些都可以在O(n)
时间完成,而不是O(m)
时间,其中m
是带有n << m
的主电流数组的大小(例如。12对20_000)。它也可以在不需要任何昂贵分配的情况下进行计算。以下是生成的代码:
@nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
def swap(arr, i, j):
arr[i], arr[j] = arr[j], arr[i]
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
values = values_[j]
n = params[j][0]
if n != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
m = values.size
assert n <= m
group = values[-n:]
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
# Swap the group view with other random items
for pos in range(m - n, m):
swap(values, pos, np.random.randint(0, m))
# For small data volume:
# final_sum = 0.0016 * np.sum(group)
# For med/large data volume
final_sum = 0.0
for v in group:
final_sum += v ** 3
if iter_ == upd_:
t += 1
tel *= t
assert iter_ > 0
values = values[:m-n]
Rand_vals.append(group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
除了速度更快以外,这种实现作为更简单的好处也是如此。结果看起来与以前的实现非常相似,尽管具有随机性,这使得对结果的检查变得困难(特别是因为这个函数没有使用相同的方法来选择随机样本)。请注意,与前一个实现相比,此实现没有删除values
中group
中的项(这可能并不需要)。
基准测试
以下是我的机器上最后一个实现的结果(不包括编译和转换时间):
Provided small input (embedded in the question):
- Initial code: 42.71 ms
- Numba code: 0.11 ms
Medium input:
- Initial code: 3481 ms
- Numba code: 11 ms
Large input:
- Initial code: 6728 ms
- Numba code: 20 ms
请注意,转换时间与计算时间大致相同。
最后一个实现是316~388倍,比小输入上的初始代码快一倍。
备注
注意,由于dict和列出类型,编译时间需要几秒钟。
请注意,虽然可以并行化实现,但只有最包容的循环才能并行化。问题是,需要计算的项目很少,而且时间已经很短了(这不是多线程的最佳情况)。此外,创建许多临时数组(由rng.choice
创建)肯定会导致并行循环不能很好地扩展。此外,Numba并行化倾向于显著增加已经很重要的编译时间。最后,由于每个Numba线程都有自己的随机数生成器种子,并且不能用prange
(取决于在目标平台上选择的并行运行时)来预测线程计算的项,所以结果不那么确定。请注意,在Numpy中,默认情况下有一个全局种子被通常的随机函数使用(不推荐的方式),而RNG对象有自己的种子(新的首选方式)。
https://stackoverflow.com/questions/71814612
复制相似问题