最初实现的学分--我的代码基于:
Quora - 对百万32位整数进行排序的最有效方法是什么?
// RadixSort - works for values up to unsigned_int_max (32-bit)
template<typename Iter>
void radix_sort(Iter __first, Iter __last){
typedef typename iterator_traits<Iter>::value_type value_type;
vector<value_type> out(__last - __first);
// calculate most-significant-digit (256-base)
value_type __mx = *max_element(__first, __last); //O(n)
int __msb = 0;
do {
__msb += 8;
__mx = __mx >> 8;
} while(__mx);
Iter __i, __j, __s;
bool __swapped = false;
for (int __shift = 0; __shift < __msb; __shift += 8) {
// cycle input/auxiliar vectors
if (__swapped) {
__i = out.begin();
__j = out.end();
__s = __first;
}
else {
__i = __first;
__j = __last;
__s = out.begin();
}
// counting_sort
size_t count[0x100] = {};
for (Iter __p = __i; __p != __j; __p++)
count[(*__p >> __shift) & 0xFF]++;
// prefix-sum
size_t __m, __q = 0;
for (int i = 0; i < 0x100; i++) {
__m = count[i];
count[i] = __q;
__q += __m;
}
// filling result
Iter __v;
for (Iter __p = __i; __p != __j; __p++) {
__v = __s + count[(*__p >> __shift) & 0xFF]++;
*__v = *__p;
}
__swapped = !__swapped;
}
// if ended on auxiliar vector, copy to input vector
if (__swapped) copy(out.begin(), out.end(), __first); //O(n)
}
为了实现比参考算法更通用的算法,我尝试使用template
和iterators
重构代码。这是我的代码和原始代码之间的主要区别。
在使用迭代器时,我不能简单地交换引用,所以我通过循环每个计数排序循环中使用的迭代器来解决这个问题。这样做可以防止复制每个循环上的输出数组。
我的代码的另一个不同方面是,我找到了max元素,并在基256上计算了最重要的数字。我使用这些信息来确定需要多少计数排序循环,而不是硬编码32 (unsigned_max),并且总是运行4次循环,即使所有的值都小于255。这实际上增加了不必要的开销,如果4个循环是必要的,但它应该增加执行时间,否则。
使用的临时容器是一个vector<value_type>
,这是我不太确定的事情,我觉得这是我可以改进代码的地方。我想听听大家对此的看法。
std::copy
或std::move
将数据从临时数组移动到输入容器中吗?我决定运行一些基准来测试计算最大数字是否会产生巨大的影响。element_values
定义了每个元素可以拥有的值的范围。如果element_values
可以用24位或更少的方式表示,则最大数字仅会影响循环数。以下是n = 1E7
的结果:
element_values < 256 (8-bits):
radix_sort_msd - Average time: 51.59 ms
radix_sort_32 - Average time: 109.03 ms
element_values < UINT_MAX(32-bits):
radix_sort_msd - Average time: 107.38 ms
radix_sort_32 - Average time: 89.75 ms
虽然radix_sort_msd
在element_values
很小的时候运行得很好,但它实际上取决于数据集。因此,实施是优先考虑的问题。
发布于 2017-03-02 21:36:48
的改进
您想知道的事情之一是radix_sort_msd
还是radix_sort_32
更好。前者调用max_element()
以确定值的最大宽度,而后者在不需要时总是执行4次传递。
在生成计数之后,我使用了radix_sort_32
并添加了以下代码:
// If this byte is zero for every value, skip the byte entirely.
if (count[0] == (size_t)(__last - __first))
continue;
这是一个快速检查,以确定整个输入是否包含当前字节的零。如果是这样的话,它将转移到下一个字节,而不是浪费时间将输入精确复制到临时区域。在“值<256个”的情况下,它仍然比radix_sort_msd
花费更多的时间,但速度要快得多,因为3次传递可以跳过50%以上的工作。
在我的机器上,在“值< 256”的情况下,radix_sort_32
比更改前快了40%。这可能会让您重新考虑哪个版本更好。
你可以做的另一件事,以改善你的基数排序是使用所谓的“混合”基排序,这是一个混合的MSD和LSD基排序。其思想是在最重要的字节上使用基数排序的一次传递。这会将输入分成256个“回收箱”。然后在256个垃圾箱中的每一个上运行LSD基排序(当前排序)。
与普通LSD基排序相比,这种混合排序的优点是混合版本对缓存更友好。在普通LSD排序中,如果您的输入不适合缓存,那么第二次计数传递将无法从缓存中受益,因为到第一次计数传递结束时,数组的开始将被从缓存中删除。
使用混合排序,输入首先被分解为256个部分(希望这些部分适合高速缓存)。因此,由于缓存,对每个bin进行排序应该会更快。但是,有额外的开销,因为256个较小的类别中的每一个都需要做一些固定数量的工作来处理计数桶。因此,只有当输入大于缓存大小时,我们才应该进行混合排序,否则我们只会有更多的开销而没有任何好处。
我修改了您的代码来执行混合排序,并且它比随机32位输入的原始版本运行得更快。我没有尝试为“值< 256”的情况优化它,尽管那里肯定有改进的可能性,类似于我在上一节中提到的那样。以下是代码:
// LSD RadixSort helper function for radix_sort() below.
template<typename Iter>
static void radix_sort_lsd(Iter __first, Iter __last, Iter __out, Iter __outEnd,
int __msb, bool needsSwap)
{
Iter __i, __j, __s;
bool __swapped = false;
for (int __shift = 0; __shift < __msb; __shift += 8) {
// cycle input/auxiliar vectors
if (__swapped) {
__i = __out;
__j = __outEnd;
__s = __first;
}
else {
__i = __first;
__j = __last;
__s = __out;
}
// counting_sort
size_t count[0x100] = {};
for (Iter __p = __i; __p != __j; __p++)
count[(*__p >> __shift) & 0xFF]++;
if (count[0] == (size_t)(__last - __first))
continue;
// prefix-sum
size_t __m, __q = 0;
for (int i = 0; i < 0x100; i++) {
__m = count[i];
count[i] = __q;
__q += __m;
}
// filling result
for (Iter __p = __i; __p != __j; __p++) {
*(__s + count[(*__p >> __shift) & 0xFF]++) = *__p;
}
__swapped = !__swapped;
}
// if ended on auxiliar vector, copy to input vector
if (__swapped != needsSwap) copy(__out, __outEnd, __first); //O(n)
}
// If the input exceeds this threshold (in bytes), we do one pass of MSD
// followed by the rest of the passes done by LSD. This number should be
// an estimate of the cache size.
#define THRESHOLD 8000000
// RadixSort - works for values up to unsigned_int_max (32-bit)
template<typename Iter>
void radix_sort(Iter __first, Iter __last)
{
typedef typename iterator_traits<Iter>::value_type value_type;
size_t len = (size_t) (__last - __first);
vector<value_type> out(len);
// First, test if the input exceeds the caching threshold. If the input
// is smaller than the threshold, just do a straight LSD radix sort.
if (len * sizeof(value_type) < THRESHOLD) {
radix_sort_lsd(__first, __last, out.begin(), out.end(),
sizeof(value_type) * 8, false);
return;
}
// Set __shift to the most significant byte.
int __shift = (sizeof(value_type)-1) * 8;
Iter __s = out.begin();
Iter __p = __first;
// counting_sort
size_t count[0x100] = {};
for (size_t i = 0; i < len; i++) {
count[(*__p++ >> __shift) & 0xFF]++;
}
// prefix-sum
size_t __m, __q = 0;
for (int i = 0; i < 0x100; i++) {
__m = count[i];
count[i] = __q;
__q += __m;
}
// filling result
__p = __first;
for (size_t i = 0; i < len; i++) {
*(__s + count[(*__p >> __shift) & 0xFF]++) = *__p++;
}
// For each of the 256 bins, do a LSD radix sort on the bin. The input
// and auxiliary vectors have been swapped, so we pass needsSwap = true to
// indicate that the LSD sort should end on the auxiliary vector instead.
int startIndex = 0;
for (int i = 0; i < 0x100; i++) {
int endIndex = count[i];
radix_sort_lsd(__s + startIndex, __s + endIndex, __first + startIndex,
__first + endIndex, __shift, true);
startIndex = endIndex;
}
}
我认为vector<value_type>
是辅助数组的适当容器。我想不出比这更好的事了。
就复制vs移动而言,它们对于数字类型应该是等效的。有关原因的一些好解释,请参见这个堆栈溢出问题及其答案。
发布于 2017-02-28 17:17:42
我想知道使用了什么级别的优化,因为我在radix_sort_32上的时间平均为22.8ms,而平均为89.75ms,大约是速度的4倍。从使用迭代器到指针到向量的第一个元素的转换,然后使用指针索引,将这个值减少到14.8ms。
如前所述,在1百万32位整数的情况下,原始数组和工作数组都适用于许多处理器上的8MB L3/L4缓存。这大大减少了每次基数排序传递期间随机访问写入的开销。在随机访问写入成为一个更重要的问题之前,向量/数组大小需要更大,超出处理器缓存的范围。
radix sort times for 1048576 32 bit unsgined integers
Intel 3770K, 3.5ghz, 32 bit mode (Windows XP), Visual Studio 2005
method average time
radix_sort_32 (set msb = 32) 0.0228 seconds
radix_sort_32 using pointer[index] 0.0148 seconds
array 0.0132 seconds
array with matrix for counts/indices 0.0127 seconds
array with 4 threads and matrix 0.0115 seconds
正如最初的海报所要求的那样,这里是多线程基排序的示例代码。多线程部分是Windows特定的.RadixSort()是以前发布的面向矩阵的函数(除了工作数组b[]作为参数传递以避免对多线程进行多次分配),而不是特定于Windows的函数。QP定义是用于Windows性能计数器。
//------------------------------------------------------------------//
// mtrsrt.cpp mult-threaded radix sort //
//------------------------------------------------------------------//
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <windows.h>
// COUNT must be multiple of 4
#define COUNT (1024*1024)
typedef unsigned int uint32_t;
#define QP 0 // if != 0, use queryperformance for timer
#if QP
#include <math.h>
#include <windows.h>
#pragma comment(lib, "winmm.lib")
typedef LARGE_INTEGER LI64;
#endif
#if QP
static LI64 liQPFrequency; // cpu counter values
static LI64 liStartTime;
static LI64 liStopTime;
static double dQPFrequency;
static double dStartTime;
static double dStopTime;
static double dElapsedTime;
#else
clock_t ctTimeStart; // clock values
clock_t ctTimeStop;
#endif
static HANDLE hs0; // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
static HANDLE ht1; // thread handles
static HANDLE ht2;
static HANDLE ht3;
static DWORD WINAPI Thread0(LPVOID); // thread functions
static DWORD WINAPI Thread1(LPVOID);
static DWORD WINAPI Thread2(LPVOID);
static DWORD WINAPI Thread3(LPVOID);
static uint32_t *pa; // pointers to buffers
static uint32_t *pb;
void RadixSort(uint32_t a[], uint32_t b[], size_t n);
void Merge(uint32_t a[], uint32_t b[], size_t ll, size_t rr, size_t ee);
uint32_t main()
{
uint32_t *array = new uint32_t[COUNT];
uint32_t *buffer = new uint32_t[COUNT];
pa = array;
pb = buffer;
for(uint32_t i = 0; i < COUNT; i++){ // generate pseudo random data
uint32_t r;
r = (((uint32_t)((rand()>>4) & 0xff))<< 0);
r += (((uint32_t)((rand()>>4) & 0xff))<< 8);
r += (((uint32_t)((rand()>>4) & 0xff))<<16);
r += (((uint32_t)((rand()>>4) & 0xff))<<24);
array[i] = r;
}
hs0 = CreateSemaphore(NULL,0,1,NULL);
hs1 = CreateSemaphore(NULL,0,1,NULL);
hs2 = CreateSemaphore(NULL,0,1,NULL);
hs3 = CreateSemaphore(NULL,0,1,NULL);
ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);
ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);
#if QP
QueryPerformanceFrequency(&liQPFrequency);
dQPFrequency = (double)liQPFrequency.QuadPart;
timeBeginPeriod(1); // set ticker to 1000 hz
Sleep(128); // wait for it to settle
QueryPerformanceCounter(&liStartTime);
#else
ctTimeStart = clock();
#endif
ReleaseSemaphore(hs0, 1, NULL); // start sorts
ReleaseSemaphore(hs1, 1, NULL);
ReleaseSemaphore(hs2, 1, NULL);
ReleaseSemaphore(hs3, 1, NULL);
Thread0((LPVOID)NULL); // run thread 0
WaitForSingleObject(ht2, INFINITE); // wait for thead 2
// merge 1st and 2nd halves
Merge(pb, pa, 0, COUNT>>1, COUNT);
#if QP
QueryPerformanceCounter(&liStopTime);
dStartTime = (double)liStartTime.QuadPart;
dStopTime = (double)liStopTime.QuadPart;
dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
timeEndPeriod(1); // restore ticker to default
std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
ctTimeStop = clock();
std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif
for(uint32_t i = 1; i < COUNT; i++){ // check result
if(array[i-1] > array[i]){
std::cout << "failed" << std::endl;
break;
}
}
CloseHandle(ht3);
CloseHandle(ht2);
CloseHandle(ht1);
CloseHandle(hs3);
CloseHandle(hs2);
CloseHandle(hs1);
CloseHandle(hs0);
delete[] buffer;
delete[] array;
return 0;
}
static DWORD WINAPI Thread0(LPVOID lpvoid)
{
WaitForSingleObject(hs0, INFINITE); // wait for semaphore
// sort 1st quarter
RadixSort(pa + 0*(COUNT>>2), pb + 0*(COUNT>>2), COUNT>>2);
WaitForSingleObject(ht1, INFINITE); // wait for thead 1
// merge 1st and 2nd quarter
Merge(pa + 0*(COUNT>>1), pb + 0*(COUNT>>1), 0, COUNT>>2, COUNT>>1);
return 0;
}
static DWORD WINAPI Thread1(LPVOID lpvoid)
{
WaitForSingleObject(hs1, INFINITE); // wait for semaphore
// sort 2nd quarter
RadixSort(pa + 1*(COUNT>>2), pb + 1*(COUNT>>2), COUNT>>2);
return 0;
}
static DWORD WINAPI Thread2(LPVOID lpvoid)
{
WaitForSingleObject(hs2, INFINITE); // wait for semaphore
// sort 3rd quarter
RadixSort(pa + 2*(COUNT>>2), pb + 2*(COUNT>>2), COUNT>>2);
WaitForSingleObject(ht3, INFINITE); // wait for thread 3
// merge 3rd and 4th quarter
Merge(pa + 1*(COUNT>>1), pb + 1*(COUNT>>1), 0, COUNT>>2, COUNT>>1);
return 0;
}
static DWORD WINAPI Thread3(LPVOID lpvoid)
{
WaitForSingleObject(hs3, INFINITE); // wait for semaphore
// sort 4th quarter
RadixSort(pa + 3*(COUNT>>2), pb + 3*(COUNT>>2), COUNT>>2);
return 0;
}
void RadixSort(uint32_t a[], uint32_t b[], size_t count)
{
size_t mIndex[4][256] = {0}; // count / index matrix
size_t i,j,m,n;
uint32_t u;
for(i = 0; i < count; i++){ // generate histograms
u = a[i];
for(j = 0; j < 4; j++){
mIndex[j][(size_t)(u & 0xff)]++;
u >>= 8;
}
}
for(j = 0; j < 4; j++){ // convert to indices
m = 0;
for(i = 0; i < 256; i++){
n = mIndex[j][i];
mIndex[j][i] = m;
m += n;
}
}
for(j = 0; j < 4; j++){ // radix sort
for(i = 0; i < count; i++){ // sort by current lsb
u = a[i];
m = (size_t)(u>>(j<<3))&0xff;
b[mIndex[j][m]++] = u;
}
std::swap(a, b); // swap ptrs
}
}
void Merge(uint32_t a[], uint32_t b[], size_t ll, size_t rr, size_t ee)
{
size_t o = ll; // b[] index
size_t l = ll; // a[] left index
size_t r = rr; // a[] right index
while(1){ // merge data
if(a[l] <= a[r]){ // if a[l] <= a[r]
b[o++] = a[l++]; // copy a[l]
if(l < rr) // if not end of left run
continue; // continue (back to while)
do // else copy rest of right run
b[o++] = a[r++];
while(r < ee);
break; // and return
} else { // else a[l] > a[r]
b[o++] = a[r++]; // copy a[r]
if(r < ee) // if not end of right run
continue; // continue (back to while)
do // else copy rest of left run
b[o++] = a[l++];
while(l < rr);
break; // and return
}
}
}
https://codereview.stackexchange.com/questions/156275
复制相似问题