首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RadixSort实现设计与性能

RadixSort实现设计与性能
EN

Code Review用户
提问于 2017-02-25 08:56:50
回答 2查看 806关注 0票数 8

最初实现的学分--我的代码基于:

Quora - 对百万32位整数进行排序的最有效方法是什么?

Implementation

代码语言:javascript
运行
复制
// RadixSort - works for values up to unsigned_int_max (32-bit)
template<typename Iter>
void radix_sort(Iter __first, Iter __last){
    typedef typename iterator_traits<Iter>::value_type value_type;
    vector<value_type> out(__last - __first);

    // calculate most-significant-digit (256-base)
    value_type __mx = *max_element(__first, __last); //O(n)
    int __msb = 0;
    do {
        __msb += 8;
        __mx = __mx >> 8;
    } while(__mx);

    Iter __i, __j, __s;
    bool __swapped = false;
    for (int __shift = 0; __shift < __msb; __shift += 8) {

        // cycle input/auxiliar vectors
        if (__swapped) {
            __i = out.begin();
            __j = out.end();
            __s = __first;
        }
        else {
            __i = __first;
            __j = __last;
            __s = out.begin();
        }

        // counting_sort
        size_t count[0x100] = {};
        for (Iter __p = __i; __p != __j; __p++)
            count[(*__p >> __shift) & 0xFF]++;

        // prefix-sum
        size_t __m, __q = 0;
        for (int i = 0; i < 0x100; i++) {
            __m = count[i];
            count[i] = __q;
            __q += __m;
        }

        // filling result
        Iter __v;
        for (Iter __p = __i; __p != __j; __p++) {
            __v = __s + count[(*__p >> __shift) & 0xFF]++;
            *__v = *__p;
        }
        __swapped = !__swapped;
    }

    // if ended on auxiliar vector, copy to input vector
    if (__swapped) copy(out.begin(), out.end(), __first); //O(n)
}

讨论

为了实现比参考算法更通用的算法,我尝试使用templateiterators重构代码。这是我的代码和原始代码之间的主要区别。

在使用迭代器时,我不能简单地交换引用,所以我通过循环每个计数排序循环中使用的迭代器来解决这个问题。这样做可以防止复制每个循环上的输出数组。

我的代码的另一个不同方面是,我找到了max元素,并在基256上计算了最重要的数字。我使用这些信息来确定需要多少计数排序循环,而不是硬编码32 (unsigned_max),并且总是运行4次循环,即使所有的值都小于255。这实际上增加了不必要的开销,如果4个循环是必要的,但它应该增加执行时间,否则。

使用的临时容器是一个vector<value_type>,这是我不太确定的事情,我觉得这是我可以改进代码的地方。我想听听大家对此的看法。

问题

  • 我的执行工作可以做哪些改进?
  • 您将如何重新考虑我使用的迭代器循环?(我讨厌它)
  • 我应该使用哪个容器作为临时数组?
  • 我应该使用std::copystd::move将数据从临时数组移动到输入容器中吗?

相关信息

我决定运行一些基准来测试计算最大数字是否会产生巨大的影响。element_values定义了每个元素可以拥有的值的范围。如果element_values可以用24位或更少的方式表示,则最大数字仅会影响循环数。以下是n = 1E7的结果:

代码语言:javascript
运行
复制
     element_values < 256 (8-bits):  
          radix_sort_msd - Average time:   51.59 ms  
          radix_sort_32  - Average time:  109.03 ms

     element_values < UINT_MAX(32-bits):  
          radix_sort_msd - Average time:  107.38 ms  
          radix_sort_32  - Average time:   89.75 ms

虽然radix_sort_msdelement_values很小的时候运行得很好,但它实际上取决于数据集。因此,实施是优先考虑的问题。

EN

回答 2

Code Review用户

回答已采纳

发布于 2017-03-02 21:36:48

对radix_sort_32

的改进

您想知道的事情之一是radix_sort_msd还是radix_sort_32更好。前者调用max_element()以确定值的最大宽度,而后者在不需要时总是执行4次传递。

在生成计数之后,我使用了radix_sort_32并添加了以下代码:

代码语言:javascript
运行
复制
// If this byte is zero for every value, skip the byte entirely.
if (count[0] == (size_t)(__last - __first))
    continue;

这是一个快速检查,以确定整个输入是否包含当前字节的零。如果是这样的话,它将转移到下一个字节,而不是浪费时间将输入精确复制到临时区域。在“值<256个”的情况下,它仍然比radix_sort_msd花费更多的时间,但速度要快得多,因为3次传递可以跳过50%以上的工作。

在我的机器上,在“值< 256”的情况下,radix_sort_32比更改前快了40%。这可能会让您重新考虑哪个版本更好。

混合基排序

你可以做的另一件事,以改善你的基数排序是使用所谓的“混合”基排序,这是一个混合的MSD和LSD基排序。其思想是在最重要的字节上使用基数排序的一次传递。这会将输入分成256个“回收箱”。然后在256个垃圾箱中的每一个上运行LSD基排序(当前排序)。

与普通LSD基排序相比,这种混合排序的优点是混合版本对缓存更友好。在普通LSD排序中,如果您的输入不适合缓存,那么第二次计数传递将无法从缓存中受益,因为到第一次计数传递结束时,数组的开始将被从缓存中删除。

使用混合排序,输入首先被分解为256个部分(希望这些部分适合高速缓存)。因此,由于缓存,对每个bin进行排序应该会更快。但是,有额外的开销,因为256个较小的类别中的每一个都需要做一些固定数量的工作来处理计数桶。因此,只有当输入大于缓存大小时,我们才应该进行混合排序,否则我们只会有更多的开销而没有任何好处。

我修改了您的代码来执行混合排序,并且它比随机32位输入的原始版本运行得更快。我没有尝试为“值< 256”的情况优化它,尽管那里肯定有改进的可能性,类似于我在上一节中提到的那样。以下是代码:

代码语言:javascript
运行
复制
// LSD RadixSort helper function for radix_sort() below.
template<typename Iter>
static void radix_sort_lsd(Iter __first, Iter __last, Iter __out, Iter __outEnd,
        int __msb, bool needsSwap)
{
    Iter __i, __j, __s;
    bool __swapped = false;

    for (int __shift = 0; __shift < __msb; __shift += 8) {

        // cycle input/auxiliar vectors
        if (__swapped) {
            __i = __out;
            __j = __outEnd;
            __s = __first;
        }
        else {
            __i = __first;
            __j = __last;
            __s = __out;
        }

        // counting_sort
        size_t count[0x100] = {};
        for (Iter __p = __i; __p != __j; __p++)
            count[(*__p >> __shift) & 0xFF]++;

        if (count[0] == (size_t)(__last - __first))
            continue;

        // prefix-sum
        size_t __m, __q = 0;
        for (int i = 0; i < 0x100; i++) {
            __m = count[i];
            count[i] = __q;
            __q += __m;
        }

        // filling result
        for (Iter __p = __i; __p != __j; __p++) {
            *(__s + count[(*__p >> __shift) & 0xFF]++) = *__p;
        }
        __swapped = !__swapped;
    }

    // if ended on auxiliar vector, copy to input vector
    if (__swapped != needsSwap) copy(__out, __outEnd, __first); //O(n)
}

// If the input exceeds this threshold (in bytes), we do one pass of MSD
// followed by the rest of the passes done by LSD.  This number should be
// an estimate of the cache size.
#define THRESHOLD 8000000

// RadixSort - works for values up to unsigned_int_max (32-bit)
template<typename Iter>
void radix_sort(Iter __first, Iter __last)
{
    typedef typename iterator_traits<Iter>::value_type value_type;
    size_t len = (size_t) (__last - __first);
    vector<value_type> out(len);

    // First, test if the input exceeds the caching threshold.  If the input
    // is smaller than the threshold, just do a straight LSD radix sort.
    if (len * sizeof(value_type) < THRESHOLD) {
        radix_sort_lsd(__first, __last, out.begin(), out.end(),
                sizeof(value_type) * 8, false);
        return;
    }

    // Set __shift to the most significant byte.
    int  __shift = (sizeof(value_type)-1) * 8;
    Iter __s     = out.begin();
    Iter __p     = __first;

    // counting_sort
    size_t count[0x100] = {};

    for (size_t i = 0; i < len; i++) {
        count[(*__p++ >> __shift) & 0xFF]++;
    }

    // prefix-sum
    size_t __m, __q = 0;
    for (int i = 0; i < 0x100; i++) {
        __m = count[i];
        count[i] = __q;
        __q += __m;
    }

    // filling result
    __p = __first;
    for (size_t i = 0; i < len; i++) {
        *(__s + count[(*__p >> __shift) & 0xFF]++) = *__p++;
    }

    // For each of the 256 bins, do a LSD radix sort on the bin.  The input
    // and auxiliary vectors have been swapped, so we pass needsSwap = true to
    // indicate that the LSD sort should end on the auxiliary vector instead.
    int startIndex = 0;
    for (int i = 0; i < 0x100; i++) {
        int endIndex = count[i];
        radix_sort_lsd(__s + startIndex, __s + endIndex, __first + startIndex,
                __first + endIndex, __shift, true);
        startIndex = endIndex;
    }
}

其他事情

我认为vector<value_type>是辅助数组的适当容器。我想不出比这更好的事了。

就复制vs移动而言,它们对于数字类型应该是等效的。有关原因的一些好解释,请参见这个堆栈溢出问题及其答案

票数 3
EN

Code Review用户

发布于 2017-02-28 17:17:42

我想知道使用了什么级别的优化,因为我在radix_sort_32上的时间平均为22.8ms,而平均为89.75ms,大约是速度的4倍。从使用迭代器到指针到向量的第一个元素的转换,然后使用指针索引,将这个值减少到14.8ms。

如前所述,在1百万32位整数的情况下,原始数组和工作数组都适用于许多处理器上的8MB L3/L4缓存。这大大减少了每次基数排序传递期间随机访问写入的开销。在随机访问写入成为一个更重要的问题之前,向量/数组大小需要更大,超出处理器缓存的范围。

代码语言:javascript
运行
复制
radix sort times for 1048576 32 bit unsgined integers
Intel 3770K, 3.5ghz, 32 bit mode (Windows XP), Visual Studio 2005

method                                       average time

radix_sort_32 (set msb = 32)                 0.0228 seconds
radix_sort_32 using pointer[index]           0.0148 seconds
array                                        0.0132 seconds
array with matrix for counts/indices         0.0127 seconds
array with 4 threads and matrix              0.0115 seconds

正如最初的海报所要求的那样,这里是多线程基排序的示例代码。多线程部分是Windows特定的.RadixSort()是以前发布的面向矩阵的函数(除了工作数组b[]作为参数传递以避免对多线程进行多次分配),而不是特定于Windows的函数。QP定义是用于Windows性能计数器。

代码语言:javascript
运行
复制
//------------------------------------------------------------------//
//  mtrsrt.cpp      mult-threaded radix sort                        //
//------------------------------------------------------------------//
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <windows.h>

// COUNT must be multiple of 4    
#define COUNT (1024*1024)

typedef unsigned int uint32_t;

#define QP 0        // if != 0, use queryperformance for timer

#if QP
#include <math.h>
#include <windows.h>
#pragma comment(lib, "winmm.lib")
typedef LARGE_INTEGER LI64;
#endif

#if QP
static LI64     liQPFrequency;  // cpu counter values
static LI64     liStartTime;
static LI64     liStopTime;
static double   dQPFrequency;
static double   dStartTime;
static double   dStopTime;
static double   dElapsedTime;
#else
clock_t ctTimeStart;            // clock values
clock_t ctTimeStop;
#endif

static HANDLE hs0;                      // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
static HANDLE ht1;                      // thread handles
static HANDLE ht2;
static HANDLE ht3;

static DWORD WINAPI Thread0(LPVOID);    // thread functions
static DWORD WINAPI Thread1(LPVOID);
static DWORD WINAPI Thread2(LPVOID);
static DWORD WINAPI Thread3(LPVOID);

static uint32_t *pa;                    // pointers to buffers
static uint32_t *pb;

void RadixSort(uint32_t a[], uint32_t b[], size_t n);
void Merge(uint32_t a[], uint32_t b[], size_t ll, size_t rr, size_t ee);

uint32_t main()
{
uint32_t *array  = new uint32_t[COUNT];
uint32_t *buffer = new uint32_t[COUNT];
    pa = array;
    pb = buffer;
    for(uint32_t i = 0; i < COUNT; i++){    // generate pseudo random data
        uint32_t r;
        r  = (((uint32_t)((rand()>>4) & 0xff))<< 0);
        r += (((uint32_t)((rand()>>4) & 0xff))<< 8);
        r += (((uint32_t)((rand()>>4) & 0xff))<<16);
        r += (((uint32_t)((rand()>>4) & 0xff))<<24);
        array[i] = r;
    }

    hs0 = CreateSemaphore(NULL,0,1,NULL);
    hs1 = CreateSemaphore(NULL,0,1,NULL);
    hs2 = CreateSemaphore(NULL,0,1,NULL);
    hs3 = CreateSemaphore(NULL,0,1,NULL);
    ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);
    ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
    ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);

#if QP
    QueryPerformanceFrequency(&liQPFrequency);
    dQPFrequency = (double)liQPFrequency.QuadPart;
    timeBeginPeriod(1);                     // set ticker to 1000 hz
    Sleep(128);                             // wait for it to settle
    QueryPerformanceCounter(&liStartTime);
#else
    ctTimeStart = clock();
#endif

    ReleaseSemaphore(hs0, 1, NULL);     // start sorts
    ReleaseSemaphore(hs1, 1, NULL);
    ReleaseSemaphore(hs2, 1, NULL);
    ReleaseSemaphore(hs3, 1, NULL);
    Thread0((LPVOID)NULL);              // run thread 0
    WaitForSingleObject(ht2, INFINITE); // wait for thead 2
    // merge 1st and 2nd halves
    Merge(pb, pa, 0, COUNT>>1, COUNT);

#if QP
    QueryPerformanceCounter(&liStopTime);
    dStartTime = (double)liStartTime.QuadPart;
    dStopTime  = (double)liStopTime.QuadPart;
    dElapsedTime = (dStopTime - dStartTime) / dQPFrequency;
    timeEndPeriod(1);                       // restore ticker to default
    std::cout << "# of seconds " << dElapsedTime << std::endl;
#else
    ctTimeStop = clock();
    std::cout << "# of ticks " << ctTimeStop - ctTimeStart << std::endl;
#endif

    for(uint32_t i = 1; i < COUNT; i++){        // check result 
        if(array[i-1] > array[i]){
            std::cout << "failed" << std::endl;
            break;
        }
    }
    CloseHandle(ht3);
    CloseHandle(ht2);
    CloseHandle(ht1);
    CloseHandle(hs3);
    CloseHandle(hs2);
    CloseHandle(hs1);
    CloseHandle(hs0);
    delete[] buffer;
    delete[] array;
    return 0;
}

static DWORD WINAPI Thread0(LPVOID lpvoid)
{
    WaitForSingleObject(hs0, INFINITE); // wait for semaphore
    // sort 1st quarter
    RadixSort(pa + 0*(COUNT>>2), pb + 0*(COUNT>>2), COUNT>>2);
    WaitForSingleObject(ht1, INFINITE); // wait for thead 1
    // merge 1st and 2nd quarter
    Merge(pa + 0*(COUNT>>1), pb + 0*(COUNT>>1), 0, COUNT>>2, COUNT>>1);
    return 0;
}

static DWORD WINAPI Thread1(LPVOID lpvoid)
{
    WaitForSingleObject(hs1, INFINITE); // wait for semaphore
    // sort 2nd quarter
    RadixSort(pa + 1*(COUNT>>2), pb + 1*(COUNT>>2), COUNT>>2);
    return 0;
}

static DWORD WINAPI Thread2(LPVOID lpvoid)
{
    WaitForSingleObject(hs2, INFINITE); // wait for semaphore
    // sort 3rd quarter
    RadixSort(pa + 2*(COUNT>>2), pb + 2*(COUNT>>2), COUNT>>2);
    WaitForSingleObject(ht3, INFINITE); // wait for thread 3
    // merge 3rd and 4th quarter
    Merge(pa + 1*(COUNT>>1), pb + 1*(COUNT>>1), 0, COUNT>>2, COUNT>>1);
    return 0;
}

static DWORD WINAPI Thread3(LPVOID lpvoid)
{
    WaitForSingleObject(hs3, INFINITE); // wait for semaphore
    // sort 4th quarter
    RadixSort(pa + 3*(COUNT>>2), pb + 3*(COUNT>>2), COUNT>>2);
    return 0;
}

void RadixSort(uint32_t a[], uint32_t b[], size_t count)
{
size_t mIndex[4][256] = {0};            // count / index matrix
size_t i,j,m,n;
uint32_t u;
    for(i = 0; i < count; i++){         // generate histograms
        u = a[i];
        for(j = 0; j < 4; j++){
            mIndex[j][(size_t)(u & 0xff)]++;
            u >>= 8;
        }       
    }
    for(j = 0; j < 4; j++){             // convert to indices
        m = 0;
        for(i = 0; i < 256; i++){
            n = mIndex[j][i];
            mIndex[j][i] = m;
            m += n;
        }       
    }
    for(j = 0; j < 4; j++){             // radix sort
        for(i = 0; i < count; i++){     //  sort by current lsb
            u = a[i];
            m = (size_t)(u>>(j<<3))&0xff;
            b[mIndex[j][m]++] = u;
        }
        std::swap(a, b);                //  swap ptrs
    }
}

void Merge(uint32_t a[], uint32_t b[], size_t ll, size_t rr, size_t ee)
{
    size_t o = ll;                      // b[]       index
    size_t l = ll;                      // a[] left  index
    size_t r = rr;                      // a[] right index
    while(1){                           // merge data
        if(a[l] <= a[r]){               // if a[l] <= a[r]
            b[o++] = a[l++];            //   copy a[l]
            if(l < rr)                  //   if not end of left run
                continue;               //     continue (back to while)
            do                          //   else copy rest of right run
                b[o++] = a[r++];
            while(r < ee);
            break;                      //     and return
        } else {                        // else a[l] > a[r]
            b[o++] = a[r++];            //   copy a[r]
            if(r < ee)                  //   if not end of right run
                continue;               //     continue (back to while)
            do                          //   else copy rest of left run
                b[o++] = a[l++];
            while(l < rr);
            break;                      //     and return
        }
    }
}
票数 0
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/156275

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档