正如sort的含義,go的sort包提供排序的能力,其內(nèi)部實現(xiàn)了堆排、快排、插入排序、希爾排序和歸并排序,而且針對某些排序比如快排和歸并排序進行了優(yōu)化,做到了性能的極致
接口
// An implementation of Interface can be sorted by the routines in this package.
// The methods refer to elements of the underlying collection by integer index.
// 所有實現(xiàn)了該interface的都可以使用sort包進行排序
type Interface interface {
// Len is the number of elements in the collection.
// 待排序元素個數(shù)
Len() int
// Less reports whether the element with index i
// must sort before the element with index j.
//
// If both Less(i, j) and Less(j, i) are false,
// then the elements at index i and j are considered equal.
// Sort may place equal elements in any order in the final result,
// while Stable preserves the original input order of equal elements.
//
// Less must describe a transitive ordering:
// - if both Less(i, j) and Less(j, k) are true, then Less(i, k) must be true as well.
// - if both Less(i, j) and Less(j, k) are false, then Less(i, k) must be false as well.
//
// Note that floating-point comparison (the < operator on float32 or float64 values)
// is not a transitive ordering when not-a-number (NaN) values are involved.
// See Float64Slice.Less for a correct implementation for floating-point values.
// 比較第i個元素和第j個元素,具體如何比較自己按需實現(xiàn),返回一個bool類型變量
// 這里提一嘴,你可以通過實現(xiàn)Less來控制從大到小排序和從小到大排序
Less(i, j int) bool
// Swap swaps the elements with indexes i and j.
// 交換第i和第j個元素
Swap(i, j int)
}
插入排序
// insertionSort sorts data[a:b] using insertion sort.
// 中規(guī)中矩的插入排序?qū)崿F(xiàn)
// 時間復雜度n^2
func insertionSort(data Interface, a, b int) {
// 從第a+1遍歷到第b-1個元素
for i := a + 1; i < b; i++ {
// 依次插入元素并進行向前比較、調(diào)換
for j := i; j > a && data.Less(j, j-1); j-- {
data.Swap(j, j-1)
}
}
}
堆排序
// siftDown implements the heap property on data[lo:hi].
// first is an offset into the array where the root of the heap lies.
// 向下追溯的過程,這個之前在講解container包的文章中已經(jīng)說過了,這里不再贅述
func siftDown(data Interface, lo, hi, first int) {
root := lo
for {
child := 2*root + 1
if child >= hi {
break
}
if child+1 < hi && data.Less(first+child, first+child+1) {
child++
}
if !data.Less(first+root, first+child) {
return
}
data.Swap(first+root, first+child)
root = child
}
}
// 中規(guī)中矩的堆排
func heapSort(data Interface, a, b int) {
first := a
lo := 0
hi := b - a
// 找到最后一個葉子節(jié)點,然后向前遍歷所有葉子節(jié)點到根節(jié)點
// 從下往上構(gòu)造最大/小堆
// Build heap with greatest element at top.
for i := (hi - 1) / 2; i >= 0; i-- {
siftDown(data, i, hi, first)
}
// Pop elements, largest first, into end of data.
// 構(gòu)造完最大/小堆之后,就開始將堆頂元素與最后一個元素互換
// 這樣最大/小的元素就到最后了,然后繼續(xù)向下追溯找下一個正確的堆頂元素
// 重復該動作,最后就是遞增/減有序的了
for i := hi - 1; i >= 0; i-- {
data.Swap(first, first+i)
siftDown(data, lo, i, first)
}
}
快排
// Quicksort, loosely following Bentley and McIlroy,
// ``Engineering a Sort Function,'' SP&E November 1993.
// medianOfThree moves the median of the three values data[m0], data[m1], data[m2] into data[m1].
// 該方法就是找三數(shù)中值,即保證最終m0 <= m1 <= m2,m1是中間值
func medianOfThree(data Interface, m1, m0, m2 int) {
// sort 3 elements
if data.Less(m1, m0) {
data.Swap(m1, m0)
}
// data[m0] <= data[m1]
if data.Less(m2, m1) {
data.Swap(m2, m1)
// data[m0] <= data[m2] && data[m1] < data[m2]
if data.Less(m1, m0) {
data.Swap(m1, m0)
}
}
// now data[m0] <= data[m1] <= data[m2]
}
// Sort sorts data.
// It makes one call to data.Len to determine n and O(n*log(n)) calls to
// data.Less and data.Swap. The sort is not guaranteed to be stable.
// 對外提供的非穩(wěn)定排序的方法
func Sort(data Interface) {
// 獲取待排序元素個數(shù)
n := data.Len()
// 排序
// 注意這里會通過maxDepth計算出數(shù)據(jù)量級
// quickSort會根據(jù)數(shù)據(jù)量級來選擇合適的排序算法
quickSort(data, 0, n, maxDepth(n))
}
// maxDepth returns a threshold at which quicksort should switch
// to heapsort. It returns 2*ceil(lg(n+1)).
// 計算結(jié)構(gòu)就是 2*ceil(lg(n+1))
// 即使用二叉樹來裝載數(shù)據(jù),對應深度的2倍就是計算出的量級
// 通過這個來控制遞歸的深度
// 至于為什么是2*ceil(lg(n+1)),這個暫時不清楚,可能是實驗的結(jié)果吧
func maxDepth(n int) int {
var depth int
for i := n; i > 0; i >>= 1 {
depth++
}
return depth * 2
}
// 看函數(shù)名是快排,但實際實現(xiàn)并不是單純的快排算法
func quickSort(data Interface, a, b, maxDepth int) {
// 如果待排元素數(shù)量 <=12 就會使用后面的希爾+插入排序
for b-a > 12 { // Use ShellSort for slices <= 12 elements
// 如果超過了遞歸深度,則退化為堆排序
// 這里沒有直接用堆排序來實現(xiàn)主要還是因為快排比堆排綜合性能高一些(兩者的平均復雜度都是O(n*logn))
// 主要表現(xiàn)在兩點,一個是快排的數(shù)據(jù)交換少點,另外一個是快排的數(shù)據(jù)訪問遵從局部性原則,而堆排則是跳躍訪問的,對cpu緩存不友好
if maxDepth == 0 {
heapSort(data, a, b)
return
}
// 每次遞歸數(shù)減一
maxDepth--
// doPivot返回兩個位置,一個是低位一個是高位
// 這里有些人可能有些困惑,因為傳統(tǒng)的快排算法只需要算出中間位置即可
// 原因能是因為對傳統(tǒng)快排算法進行了優(yōu)化,具體怎么優(yōu)化,doPivot里面會說
// 這里你就認為mlo及其左邊的元素都是 <= 某個值的
// mhi及其右邊的元素都是 > 某個值的
mlo, mhi := doPivot(data, a, b)
// Avoiding recursion on the larger subproblem guarantees
// a stack depth of at most lg(b-a).
// 通過上面的操作,待排序的數(shù)據(jù)分兩段,即a - mlo 和 mhi - b
// 選取元素更少的那段進行遞歸
// 元素更多的繼續(xù)循環(huán)切分成小段
// 這樣可以減少遞歸深度
if mlo-a < b-mhi {
quickSort(data, a, mlo, maxDepth)
a = mhi // i.e., quickSort(data, mhi, b)
} else {
quickSort(data, mhi, b, maxDepth)
b = mlo // i.e., quickSort(data, a, mlo)
}
}
// 如果待排元素數(shù)量 <=12 && >= 2
// 直接選用希爾+插入排序
if b-a > 1 {
// Do ShellSort pass with gap 6
// It could be written in this simplified form cause b-a <= 12
// 希爾本省就是插入的優(yōu)化,核心思路就是通過局部有序來達到整體大致有序,從而減少插入過程中的swap次數(shù)
// 這里因為數(shù)據(jù)量很少,所以只對間隔為6的數(shù)據(jù)進行了兩兩比較替換
for i := a + 6; i < b; i++ {
if data.Less(i, i-6) {
data.Swap(i, i-6)
}
}
// 最后插入排序
insertionSort(data, a, b)
}
}
// 終于到了核心部分了
func doPivot(data Interface, lo, hi int) (midlo, midhi int) {
// lo+hi/2
// 位操作寫法學起來
m := int(uint(lo+hi) >> 1) // Written like this to avoid integer overflow.
// 這是第一個優(yōu)化點
// 常規(guī)的快排一般選取第一個/最后一個元素作為哨兵
// 這種方式在某些有序的情況下會導致切分后的兩段長度非常不均勻,導致時間復雜度急劇下降到O(n^2)
// Tukey提出一種解決思路,就是以中位數(shù)的中位數(shù)的方式來選取哨兵,具體可見https://www.johndcook.com/blog/2009/06/23/tukey-median-ninther/
// 其中也給了個例子,比如數(shù)據(jù)集3, 1, 4, 4, 5, 9, 9, 8, 2
// yA = median( 3, 1, 4 ) = 3
// yB = median( 4, 5, 9 ) = 5
// yC = median( 9, 8, 2 ) = 8
// 最后再求(3, 5, 8)的中位數(shù)得到5,選取5作為哨兵
// 具體能提升多少效率以及為什么能提升,這里不贅述了,自己看論文去
// 言歸正傳,當元素數(shù)量 > 40的時候,使用3+3+3的方式計算最后的中位數(shù)
if hi-lo > 40 {
// Tukey's ``Ninther,'' median of three medians of three.
// 計算跨度,為啥除以8呢,因為切成8份,就能平均分出9個位點,剛好就是3+3+3
// 不信你假設hi-ho=80,80/8 = 10,切成8份
// lo lo+s lo+2*s m m-s m+s hi-1 hi-1-s hi-1-2*s剛好對應兩個邊界點和7個中間分割點
s := (hi - lo) / 8
// lo是中位數(shù)
medianOfThree(data, lo, lo+s, lo+2*s)
// m是中位數(shù)
medianOfThree(data, m, m-s, m+s)
// hi-1是中位數(shù)
medianOfThree(data, hi-1, hi-1-s, hi-1-2*s)
}
// 將三個中位數(shù)再算一次中位數(shù)
// 當元素數(shù)量 <= 40的時候,直接計算(lo, m, hi-1)的中位數(shù)
// 這里提下,medianOfThree的實現(xiàn)方式會對數(shù)據(jù)進行調(diào)換,也就是說最終中位數(shù)就是lo,lo就是哨兵
medianOfThree(data, lo, m, hi-1)
// Invariants are:
// data[lo] = pivot (set up by ChoosePivot)
// data[lo < i < a] < pivot
// data[a <= i < b] <= pivot
// data[b <= i < c] unexamined
// data[c <= i < hi-1] > pivot
// data[hi-1] >= pivot
// 哨兵pivot = lo
pivot := lo
// a是第二個元素(以lo為基準),c是最后一個元素(這里hi可以去追溯一下,最開始傳進來的就是元素的數(shù)量即長度,索引下標減1)
// 這么一頓操作之后,我們可以得出以下結(jié)論
// 第hi-1個元素即data[hi-1](后續(xù)都這么表示)肯定是 >= pivot的
// 因為第二輪取中位數(shù)data[hi-1]就是最大的
a, c := lo+1, hi-1
// 從a遍歷到c,直到第一個 >=pivot的元素
for ; a < c && data.Less(a, pivot); a++ {
}
// 將這個位置記錄下來
// 注意,這個位置很重要,后面第二次優(yōu)化會用到
// 也就是說(lo,a)(注意這是個開區(qū)間)之間的數(shù)據(jù)都是 < pivot的
b := a
for {
// 這里就正式兩頭遍歷對比哨兵并進行替換了
// 注意!data.Less(pivot, b)代表所有 <=pivot的元素都在b的左邊,注意=
for ; b < c && !data.Less(pivot, b); b++ { // data[b] <= pivot
}
// 這里為啥從c-1開始呢,因為上面說了data[hi-1]肯定是 > pivot的
for ; b < c && data.Less(pivot, c-1); c-- { // data[c-1] > pivot
}
// 循環(huán)臨界條件
// 一旦b和c碰上了肯定就結(jié)束了
if b >= c {
break
}
// data[b] > pivot; data[c-1] <= pivot
// 交換
data.Swap(b, c-1)
// 繼續(xù)向后和向前
b++
c--
}
// 執(zhí)行到這里,我們又可以得出以下結(jié)論
// b要么=c,要么>c
// data[c, hi-1) > pivot 為啥c是閉區(qū)間,雖然是拿c-1去比較,但是最后c--,所以c-1+1=c
// If hi-c<3 then there are duplicates (by property of median of nine).
// Let's be a bit more conservative, and set border to 5.
// 當當當當,到了第二個優(yōu)化的位置了
// 先看看某個場景,假設待排序結(jié)合中有很多的元素=pivot
// 正常我們希望所有跟pivot相等的元素都集中到一個點,然后再對兩邊的 <pivot和>pivot段進行排序
// 然而事實就是傳統(tǒng)的快排算法并沒有做到這點,所有跟pivot相等的元素都會被隨機的分配到左邊或者右邊
// 所以第二個優(yōu)化就是要講所有與pivot相等的元素都集中到pivot周圍,然后計算出左右區(qū)間的位置,即要返回的mlo和mhi
// 言歸正傳,上面說過 data[c, hi-1) > pivot
// protect就是>pivot的數(shù)量,如果這個數(shù) < 5,就可以認為=pivot的元素比較多
// 至于什么叫多,為啥是5,我也不太清楚,這里只說思路
protect := hi-c < 5
// 如果protect >= 5
// 但是> pivot的元素數(shù)量還不及總數(shù)量的1/4,我們還需要抽樣來進一步確認
if !protect && hi-c < (hi-lo)/4 {
// Lets test some points for equality to pivot
dups := 0
// 抽樣data[hi-1]
// 判斷是否=pivot
if !data.Less(pivot, hi-1) { // data[hi-1] = pivot
// 調(diào)換,還記得吧 data[c, hi-1) > pivot
data.Swap(c, hi-1)
// c自增,即往右邊挪
c++
// 計數(shù)加1
dups++
}
// 抽樣data[b-1],因為上面最后b++了
// 判斷是否=pivot
if !data.Less(b-1, pivot) { // data[b-1] = pivot
// b往左邊挪
b--
dups++
}
// m-lo = (hi-lo)/2 > 6
// b-lo > (hi-lo)*3/4-1 > 8
// ==> m < b ==> data[m] <= pivot
// 抽樣data[m],通過上線計算可知m在b的左邊
if !data.Less(m, pivot) { // data[m] = pivot
// 所以交換b和m
data.Swap(m, b-1)
// b往左挪
b--
dups++
}
// if at least 2 points are equal to pivot, assume skewed distribution
// 如果有兩個抽樣點=pivot,就可以認為=pivot的元素比較多
protect = dups > 1
}
// 如果判定需要優(yōu)化
if protect {
// Protect against a lot of duplicates
// Add invariant:
// 到這里經(jīng)過上面的b左挪,c右挪可以知道如下
// data[a <= i < b] unexamined
// data[b <= i < c] = pivot (主要看這個)
// 也就是說[b,c)區(qū)間都是 =pivot的元素
// 接下來就判斷(a,b)區(qū)間就行了,這里終于發(fā)現(xiàn)了a的作用了,就是為了盡量減少這里的區(qū)間長度
for {
// 做右往左找<pivot的元素
for ; a < b && !data.Less(b-1, pivot); b-- { // data[b] == pivot
}
// 從左往右找=pivot的元素
for ; a < b && data.Less(a, pivot); a++ { // data[a] < pivot
}
// 臨界條件
if a >= b {
break
}
// data[a] == pivot; data[b-1] < pivot
// 替換
data.Swap(a, b-1)
a++
b--
}
}
// Swap pivot into middle
// 最終將哨兵和b-1替換,這里為啥是b-1呢
// 看上面data.Swap(a, b-1)后data[b-1]肯定是=pivot的,然后b--那么data[b] = pivot
// 替換哨兵pivot肯定是要跟離b最近的<pivot的元素替換,所以是b-1咯
data.Swap(pivot, b-1)
// 最后返回低位和高位
return b - 1, c
}
堆排
// Stable用于穩(wěn)定排序
func Stable(data Interface) {
stable(data, data.Len())
}
// 主要是用歸并排序+插入排序來實現(xiàn)
func stable(data Interface, n int) {
// 定義塊大小,這里是20
blockSize := 20 // must be > 0
a, b := 0, blockSize
// 以20個元素為一組,并對每組使用插入排序達到組內(nèi)有序
// 這里分組可以減少遞歸深度,至于為啥是20個,可能是20個插入排序和歸并排序性能差不多吧,實驗結(jié)果,不必糾結(jié)
for b <= n {
insertionSort(data, a, b)
a = b
b += blockSize
}
// 最后一組可能不滿20個
insertionSort(data, a, n)
// 組分好了,也排好了,就該合并了
for blockSize < n {
a, b = 0, 2*blockSize
// 從左到右兩兩合并
for b <= n {
symMerge(data, a, a+blockSize, b)
a = b
b += 2 * blockSize
}
// 如果最后兩組不夠2*blockSize
if m := a + blockSize; m < n {
symMerge(data, a, m, n)
}
// 每次兩兩合并之后,blockSize肯定翻倍的
blockSize *= 2
}
}
// symMerge merges the two sorted subsequences data[a:m] and data[m:b] using
// the SymMerge algorithm from Pok-Son Kim and Arne Kutzner, "Stable Minimum
// Storage Merging by Symmetric Comparisons", in Susanne Albers and Tomasz
// Radzik, editors, Algorithms - ESA 2004, volume 3221 of Lecture Notes in
// Computer Science, pages 714-723. Springer, 2004.
//
// Let M = m-a and N = b-n. Wolog M < N.
// The recursion depth is bound by ceil(log(N+M)).
// The algorithm needs O(M*log(N/M + 1)) calls to data.Less.
// The algorithm needs O((M+N)*log(M)) calls to data.Swap.
//
// The paper gives O((M+N)*log(M)) as the number of assignments assuming a
// rotation algorithm which uses O(M+N+gcd(M+N)) assignments. The argumentation
// in the paper carries through for Swap operations, especially as the block
// swapping rotate uses only O(M+N) Swaps.
//
// symMerge assumes non-degenerate arguments: a < m && m < b.
// Having the caller check this condition eliminates many leaf recursion calls,
// which improves performance.
// symMerge就是合并的具體實現(xiàn)了
// 具體實現(xiàn)吧是基于論文 Stable Minimum Storage Merging by Symmetric Comparisons
// 這里寫的實在不知道咋用語言表達了,所以自己去看論文吧
func symMerge(data Interface, a, m, b int) {
// Avoid unnecessary recursions of symMerge
// by direct insertion of data[a] into data[m:b]
// if data[a:m] only contains one element.
if m-a == 1 {
// Use binary search to find the lowest index i
// such that data[i] >= data[a] for m <= i < b.
// Exit the search loop with i == b in case no such index exists.
i := m
j := b
for i < j {
h := int(uint(i+j) >> 1)
if data.Less(h, a) {
i = h + 1
} else {
j = h
}
}
// Swap values until data[a] reaches the position before i.
for k := a; k < i-1; k++ {
data.Swap(k, k+1)
}
return
}
// Avoid unnecessary recursions of symMerge
// by direct insertion of data[m] into data[a:m]
// if data[m:b] only contains one element.
if b-m == 1 {
// Use binary search to find the lowest index i
// such that data[i] > data[m] for a <= i < m.
// Exit the search loop with i == m in case no such index exists.
i := a
j := m
for i < j {
h := int(uint(i+j) >> 1)
if !data.Less(m, h) {
i = h + 1
} else {
j = h
}
}
// Swap values until data[m] reaches the position i.
for k := m; k > i; k-- {
data.Swap(k, k-1)
}
return
}
mid := int(uint(a+b) >> 1)
n := mid + m
var start, r int
if m > mid {
start = n - b
r = mid
} else {
start = a
r = m
}
p := n - 1
for start < r {
c := int(uint(start+r) >> 1)
if !data.Less(p-c, c) {
start = c + 1
} else {
r = c
}
}
end := n - start
if start < m && m < end {
rotate(data, start, m, end)
}
if a < start && start < mid {
symMerge(data, a, start, mid)
}
if mid < end && end < b {
symMerge(data, mid, end, b)
}
}
一些常用的函數(shù)
// Ints sorts a slice of ints in increasing order.
func Ints(x []int) { Sort(IntSlice(x)) }
// Float64s sorts a slice of float64s in increasing order.
// Not-a-number (NaN) values are ordered before other values.
func Float64s(x []float64) { Sort(Float64Slice(x)) }
// Strings sorts a slice of strings in increasing order.
func Strings(x []string) { Sort(StringSlice(x)) }
// IntsAreSorted reports whether the slice x is sorted in increasing order.
func IntsAreSorted(x []int) bool { return IsSorted(IntSlice(x)) }
// Float64sAreSorted reports whether the slice x is sorted in increasing order,
// with not-a-number (NaN) values before any other values.
func Float64sAreSorted(x []float64) bool { return IsSorted(Float64Slice(x)) }
// StringsAreSorted reports whether the slice x is sorted in increasing order.
func StringsAreSorted(x []string) bool { return IsSorted(StringSlice(x)) }
// 舉個栗子
func main() {
s := []int{1,2,1}
sort.Ints(s)
fmt.Println(s) // 1,1,2
}
// 注意上面幾個方法都是非穩(wěn)定排序的
// 那如果要用穩(wěn)定排序怎么辦呢
// sort也提供了幾個自定義類型,如下
type IntSlice []int
func (x IntSlice) Len() int { return len(x) }
func (x IntSlice) Less(i, j int) bool { return x[i] < x[j] }
func (x IntSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// Sort is a convenience method: x.Sort() calls Sort(x).
func (x IntSlice) Sort() { Sort(x) }
// Float64Slice implements Interface for a []float64, sorting in increasing order,
// with not-a-number (NaN) values ordered before other values.
type Float64Slice []float64
func (x Float64Slice) Len() int { return len(x) }
// Less reports whether x[i] should be ordered before x[j], as required by the sort Interface.
// Note that floating-point comparison by itself is not a transitive relation: it does not
// report a consistent ordering for not-a-number (NaN) values.
// This implementation of Less places NaN values before any others, by using:
//
// x[i] < x[j] || (math.IsNaN(x[i]) && !math.IsNaN(x[j]))
//
func (x Float64Slice) Less(i, j int) bool { return x[i] < x[j] || (isNaN(x[i]) && !isNaN(x[j])) }
func (x Float64Slice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// isNaN is a copy of math.IsNaN to avoid a dependency on the math package.
func isNaN(f float64) bool {
return f != f
}
// Sort is a convenience method: x.Sort() calls Sort(x).
func (x Float64Slice) Sort() { Sort(x) }
// StringSlice attaches the methods of Interface to []string, sorting in increasing order.
type StringSlice []string
func (x StringSlice) Len() int { return len(x) }
func (x StringSlice) Less(i, j int) bool { return x[i] < x[j] }
func (x StringSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// Sort is a convenience method: x.Sort() calls Sort(x).
func (x StringSlice) Sort() { Sort(x) }
// 再舉個栗子(穩(wěn)定排序)
func main() {
s := []int{1,2,1}
sort.Stable(sort.IntSlice(s))
fmt.Println(s) // 1,1,2
}
總結(jié)
不得不說,代碼性能極致和可讀性真是不可兼得,這sort包寫的,啃得我牙都碎了~