关于排序,我想没有多少程序员会感觉陌生,信手拈来的可能就有不少:冒泡、选择或者归并等等;可在实际开发中,排序又让许多程序员感到很不熟悉,因为在大多数情况下,自己实现一个排序算法都不是一个好主意,相反的,改而使用语言框架内建提供的排序功能才是明智之举,毕竟她又方便又高效……
久而久之,“排序”便与许多程序员成了“最熟悉的陌生人”,成了我们生活中司空见惯的“工具”:想要实现数组排序?简单,一个 sort(array) 搞定,便捷又高效!什么?它内部是怎么完成排序的?这个……不是特别清楚了……不过……管他呢……嘿嘿,不知你是否属于这种情况,反正我是 :)
这些天自己正在看些书籍,上面便提到了不少排序的算法,本以为自己对这些算法早已深谙于心,没想实际实现之时便窘态毕现,我想大抵可能便如上所述,“娇惯纵容”多了,以前只要简单的调调 sort,而今真刀实枪起来便不胜招架了,也罢,有了些许教训,也算进一步认识到“知其然不知其所以然”的道理,在此简单笔记一番,引以为戒吧 ~
而“快排”(快速排序)便是这次笔记的主题,话说在各类排序算法中,“快排”应该算是“明星”算法了,因其时间、空间复杂度俱佳,而被广泛运用于实际程序开发中(也许上面那个 sort 便是 :)),网上已有非常多优秀的教程说明(譬如这里和这里),在此我也无需过多费舌,主要的内容还是实现源码以及心得体会,仅此而已 :)
快速排序的基本思想便是分而治之,大体上分为三个主要步骤:
1. 在数据集中选取一个pivot元素
2. 根据选定的pivot,将数据集划分为左右两部分,左部皆小于pivot,右部皆大于pivot
3. 循环1、2两步于上述所划分的两部分数据之上,直到部分只剩下一个数据元素为止
根据上述的算法步骤,一个典型的快排程序,大抵便是这个样子:
/*!
\param array number array pointer
\param left left number range([left, right])
\param right right number range([left, right])
\return pivot index
*/
int GetPivot(int* array, int left, int right)
{
// simple pivot get
return left;
}
/*!
\param array number array pointer
\param left left number range([left, right])
\param right right number range([left, right])
\return pivot index after partition
*/
int Partition(int* array, int left, int right)
{
int pivot = GetPivot(array, left, right);
swap(array[left], array[pivot]);
// now left have the pivot value
int val = array[left];
int lp = left;
int rp = right + 1;
// preprocessing for avoiding some boder problems
do { ++lp; } while (array[lp] <= val && lp < rp);
do { --rp; } while (array[rp] > val);
// do loop partition
while (lp < rp)
{
swap(array[lp], array[rp]);
do { ++lp; } while (array[lp] <= val);
do { --rp; } while (array[rp] > val);
}
// swap back pivot value
swap(array[left], array[rp]);
return rp;
}
/*!
\param array number array pointer
\param left left number range([left, right])
\param right right number range([left, right])
*/
void QuickSort(int* array, int left, int right)
{
if (left < right)
{
int pivot = Partition(array, left, right);
QuickSort(array, left, pivot - 1);
QuickSort(array, pivot + 1, right);
}
}
当然,针对上述的三个步骤还有其他的实现方法,譬如这样:
/*!
\param array number array pointer
\param left left number range([left, right])
\param right right number range([left, right])
\return pivot index
*/
int GetPivot(int* array, int left, int right)
{
// random chioce pivot
int count = right - left + 1;
return left + rand() % count;
}
/*!
\param array number array pointer
\param left left number range([left, right])
\param right right number range([left, right])
\return pivot index after partition
*/
int Partition(int* array, int left, int right)
{
int pivot = GetPivot(array, left, right);
swap(array[left], array[pivot]);
// now left have the pivot value
int val = array[left];
int searchIndex = left + 1;
int sortedIndex = searchIndex;
while (searchIndex <= right)
{
if (array[searchIndex] <= val)
{
swap(array[sortedIndex], array[searchIndex]);
++sortedIndex;
}
++searchIndex;
}
// swap back pivot value
swap(array[left], array[sortedIndex - 1]);
return sortedIndex - 1;
}
/*!
\param array number array pointer
\param left left number range([left, right])
\param right right number range([left, right])
*/
void QuickSort(int* array, int left, int right)
{
int pivot = Partition(array, left, right);
if (pivot - 1 > left)
{
QuickSort(array, left, pivot - 1);
}
if (pivot + 1 < right)
{
QuickSort(array, pivot + 1, right);
}
}
虽说上面的两种实现细节上有所差异,但是基本都属于递归形式,并且递归形式也是快排算法(或者说对于很多二分(甚至多分)算法)实现的一般方法,有趣的是,上面提到的书籍中也说到了另一种实现快排算法的“循环”方式,颇有趣味:
//! sort range structure
struct SortRange
{
int l;
int r;
SortRange(int l_, int r_):l(l_),r(r_) {};
SortRange(const SortRange& si):l(si.l),r(si.r) {};
SortRange& operator = (const SortRange& si)
{
l = si.l;
r = si.r;
return *this;
}
};
/*!
\param array number array pointer
\param l left number range([left, right])
\param r right number range([left, right])
*/
void QuickSortIter(int* array, int l, int r)
{
std::queue<SortRange> q;
q.push(SortRange(l, r));
while (!q.empty())
{
// get front sort index and pop it
SortRange si = q.front();
q.pop();
int left = si.l;
int right = si.r;
if (left < right)
{
// do partition
int pivot = Partition(array, left, right);
// push back sub sort range
if (pivot - 1 > left)
{
q.push(SortRange(left, pivot - 1));
}
if (pivot + 1 < right)
{
q.push(SortRange(pivot + 1, right));
}
}
}
}
代码的思路其实非常简单,大体上便是模拟原先递归实现的调用过程,上面的源码中使用 std::queue 模拟了这个过程,当然,使用 std::stack(或者其他类似结构)同样也可以,两者只在子问题的解决顺序上存在差异,并不影响快速排序的正确性。
接着,书中又顺势提到了快排的各类并行实现方法,其中最直接的一个想法可能就是延承上面的递归算法,为每一次的Partition操作都生成一个线程,由于各个线程之间所操作的数据基本独立,数据竞争问题并不存在(暂不考虑“伪共享”之类的问题),但是由于后期数据粒度过细(基本上每一个数据项都产生了一个线程),往往导致线程调度的开销过大,而对于一些线程数量受限的平台,以上的方法甚至都无法实现,所以总的来看,简单延伸递归来实现并行的想法并不十分“靠谱”……
但是如果我们扩展思路,并不通过数据分解,而是通过任务分解来看待快排问题的话,那么快排的并行实现就会变的相对明晰,而这个任务分解,其实就是上面快排“循环”实现的一个延伸:
struct SortParam
{
int* a;
int l;
int r;
SortParam(int* a_, int l_, int r_):a(a_),l(l_),r(r_) {};
SortParam(const SortParam& sp):a(sp.a),l(sp.l),r(sp.r) {};
SortParam& operator = (const SortParam& sp)
{
a = sp.a;
l = sp.l;
r = sp.r;
return *this;
}
};
queue<SortParam> g_queue;
LONG g_count = 0;
bool g_done = false;
HANDLE g_sem = NULL;
HANDLE g_signal = NULL;
HANDLE g_mutex = NULL;
//! quick sort thread function, param is array number
unsigned __stdcall QuickSort(LPVOID pArg)
{
int N = *((int*)pArg);
while (true)
{
WaitForSingleObject(g_sem, INFINITE);
if (g_done) break;
WaitForSingleObject(g_mutex, INFINITE);
SortParam sp = g_queue.front();
g_queue.pop();
ReleaseMutex(g_mutex);
int* a = sp.a;
int l = sp.l;
int r = sp.r;
if (l < r)
{
int p = Partition(a, l, r);
// NOTE: since every partiton will cause pivot set, so we increment sort count here
InterlockedIncrement(&g_count);
WaitForSingleObject(g_mutex, INFINITE);
g_queue.push(SortParam(a, l, p - 1));
g_queue.push(SortParam(a, p + 1, r));
ReleaseMutex(g_mutex);
ReleaseSemaphore(g_sem, 2, NULL);
}
else if (l == r) // single element, already sorted
{
LONG t = InterlockedIncrement(&g_count);
if (t == N) SetEvent(g_signal); // all sorted
}
}
return 0;
}
int main()
{
int array[] = { 5, 4, 3, 2, 1, 7, 8, 9, 13, 0, 0, 2 };
int arrayNum = sizeof(array) / sizeof(arrayNum);
g_queue.push(SortParam(array, 0, arrayNum - 1));
g_sem = CreateSemaphore(NULL, 1, arrayNum, NULL);
g_signal = CreateEvent(NULL, true, false, NULL);
g_mutex = CreateMutex(NULL, false, NULL);
const int NUM_THREADS = 4;
for (int i = 0; i < NUM_THREADS; ++i)
{
_beginthreadex(NULL, 0, QuickSort, &arrayNum, 0, NULL);
}
WaitForSingleObject(g_signal, INFINITE);
g_done = true;
ReleaseSemaphore(g_sem, NUM_THREADS, NULL);
// NOTE: now we just sleep for a while to wait for threads quit
// TODO: join all
Sleep(100);
CloseHandle(g_sem);
CloseHandle(g_signal);
CloseHandle(g_mutex);
return 0;
}
基本思路便是通过信号量来获取可能存在的待排序的数据分段(即任务),并通过一个原子操作控制的计数器(g_count)来检测是否排序完毕,并退出相应排序线程,实际开发时甚至可以引入线程池来优化实现,当然,其中涉及的细节还有不少,有兴趣的朋友可以找来原书来看 :)
网上同样也有很多优秀的代码范例,虽说其中所列的大部分语言我本人都未曾听说过(稍囧),但是对于那些自己稍有了解的语言,范例所给的源码还是非常不错的,其中的一个Python实现令人印象深刻:
def qsort(L):
return (qsort([y for y in L[1:] if y < L[0]]) + L[:1] + qsort([y for y in L[1:] if y >= L[0]])) if len(L) > 1 else L
OK,That's It :)
附录:
近日在这里又看到了几个快排的实现示例,觉得也很有意思,一并摘录如下:(代码中用到了不少C++11特性,gcc4.8.1编译应该没有问题,至于VS2013和Clang就没有测试了:))
#include <iostream>
#include <list>
#include <algorithm>
#include <future>
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input) {
if (input.size() <= 1) {
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
const T& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(), [&pivot](const T& t){ return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.begin(), input, input.begin(), divide_point);
auto new_lower = sequential_quick_sort(std::move(lower_part));
auto new_higher = sequential_quick_sort(std::move(input));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower);
return result;
}
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.size() <= 1) {
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
const T& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(), [&pivot](const T& t){ return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.begin(), input, input.begin(), divide_point);
std::future<std::list<T>> new_lower = std::async(¶llel_quick_sort<T>, std::move(lower_part));
auto new_higher = parallel_quick_sort(std::move(input));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}
int main() {
std::list<int> list1 = { 6, 7, 8, 9, 1, 2, 3, 4, 5 };
std::list<int> list2 = { 6, 7, 8, 9, 1, 2, 3, 4, 5 };
auto print = [](const std::list<int>& l) {
for (const auto& e : l) {
std::cout << e << " ";
}
std::cout << "\n";
};
print(list1);
list1 = sequential_quick_sort(std::move(list1));
print(list1);
print(list2);
list2 = parallel_quick_sort(std::move(list2));
print(list2);
return 0;
}