qsort より速い 「多重分割ソート(mps)」 のソースプログラムです。 身近な計算機で調べた、 mps の性能は以下の通りです。 実行時間の中にはデータの準備とソート後の検査が含まれています。 ソートの実行時間(単位:秒) 要素数4万 要素サイズ200バイト キーの種類| 乱数 | 乱数%1000 | 関数の種類| qsort qs6 mps | qsort qs6 mps | 機種名 |-----------------|-----------------| NEWS5000 | 3.6 1.5 1.4 | 3.1 1.3 1.0 | FreeBSD2 | 2.1 1.6 1.3 | 1.9 1.3 0.9 | POWER520 | 19.5 4.9 3.4 | 15.1 3.2 2.2 | HP9000 | 7.0 2.6 2.1 | 5.5 2.0 1.4 | 比較関数の呼び出し回数(単位:回) キーの種類| 乱数 | 乱数%1000 | 関数の種類| qsort qs6 mps | qsort qs6 mps | 機種名 |----------------------|-------------------------| 乱数関数 NEWS5000 | 673296 598763 599986 | 1064581 466331 391469 | random FreeBSD2 | 631870 598763 599986 | 392303 466331 391469 | random POWER520 | 696959 598763 599986 | 465540 466331 391469 | random HP9000 | 703122 605522 583968 | 490942 463712 390785 | rand 要素の代入回数(単位:回) キーの種類| 乱数 | 乱数%1000 | 関数の種類| qsort qs6 mps | qsort qs6 mps | 機種名 |----------------------|-------------------------| NEWS5000 | - 297280 159144 | - 194563 51827 | FreeBSD2 | - 297280 159144 | - 194563 51827 | POWER520 | - 297280 159144 | - 194563 51827 | HP9000 | - 285801 144486 | - 194879 52048 | 機種名とOSの対応 機種名 |会社名| 正式機種名  |   CPU |   OS | ---------|------|-----------------|---------------|----------------| NEWS5000 |SONY |NEWS-5000VI | R4000 | NEWS-OS-4.2.1R | FreeBSD2 |-- |IBM-PC互換機 | Pentium(90MHz)| FreeBSD-2.0R | POWER520 |IBM |Power-station-520| RS-6000 | AIX 3.0 | HP9000 |HP |HP9000/720GRX | PA-RISC | HP-UX-A.09.01 | 以下のファイル(bat main.c mm.c qs6.c mps.c)を作って bat を走らせてみて下さい。 mps が qsort や qs6 より遅い場合はお知らせ下さい。 ------- bat -------------------------------------------------------------- #!/bin/csh echo `hostname` cc -O -c qs6.c cc -O -c mps.c cc -O -o qsort -DSORT=qsort main.c cc -O -o qs6 -DSORT=qsort main.c qs6.o cc -O -o mps -DSORT=msort main.c qs6.o mps.o time ./qsort 1 100000 80 # key=乱数 個数=100000 要素=80byte time ./qs6 1 100000 80 time ./mps 1 100000 80 ; echo " " time ./qsort 1 40000 200 # key=乱数 個数=40000 要素=200byte time ./qs6 1 40000 200 time ./mps 1 40000 200 ; echo " " time ./qsort 1 8000 1000 # key=乱数 個数=8000 要素=1000byte time ./qs6 1 8000 1000 time ./mps 1 8000 1000 ; echo " " time ./qsort 1000 40000 200 # key=乱数%1000 個数=40000 要素=200byte time ./qs6 1000 40000 200 time ./mps 1000 40000 200 ; echo " " time ./qsort 100 40000 200 # key=乱数%100 個数=40000 要素=200byte time ./qs6 100 40000 200 time ./mps 100 40000 200 ; echo " " time ./qsort 10 40000 200 # key=乱数%10 個数=40000 要素=200byte time ./qs6 10 40000 200 time ./mps 10 40000 200 ; echo " " time ./qsort -1 40000 200 # key=昇順 個数=40000 要素=200byte time ./qs6 -1 40000 200 time ./mps -1 40000 200 ; echo " " time ./qsort -2 40000 200 # key=降順 個数=40000 要素=200byte time ./qs6 -2 40000 200 time ./mps -2 40000 200 ; echo " " ------- main.c ----------------------------------------------------------- #include unsigned int cmp_cnt, ass_cnt; die( s ) char *s; { fprintf(stderr, "***** %s *****\n", s); exit(1); } typedef struct { int key; int data; } el_t; int cmpfnc( xp, yp ) el_t *xp, *yp; {cmp_cnt++; return xp->key - yp->key;} #define KEY(i) (((el_t*)(vec+(i)*rec_siz))->key) #define DATA(i) (((el_t*)(vec+(i)*rec_siz))->data) main( argc, argv ) int argc; char **argv; { int arr_max, div_val, rec_siz, i; char *vec, *chk; if (argc != 4) die("Usage: a.out div_val arr_max rec_siz"); div_val = atoi(argv[1]); /*テストデータの種類を指定する*/ arr_max = atoi(argv[2]); /*要素の個数*/ rec_siz = atoi(argv[3]); /*要素の大きさ*/ printf("[%s] div_val=%d arr_max=%d rec_siz=%d arr=%dkB\n", argv[0],div_val, arr_max, rec_siz, arr_max*rec_siz/1000); if (rec_siz < sizeof(el_t) || 11000 < rec_siz) die("ill rec_siz"); if (rec_siz % 4) die("recsiz must be multiplier of 4"); if ((vec = (char*)malloc((arr_max+5)*rec_siz)) == NULL) die("large arr_max"); srandom( arr_max ); /*乱数の初期化*/ if (div_val == 0 ) for (i = 0; i < arr_max; i++) KEY(i)= 5; /*一定*/ if (div_val == -1) for (i = 0; i < arr_max; i++) KEY(i)= i + 1; /*昇順*/ if (div_val == -2) for (i = 0; i < arr_max; i++) KEY(i)= arr_max - i; /*降順*/ if (div_val == 1 ) for (i = 0; i < arr_max; i++) KEY(i)= random(); /*乱数*/ if (div_val >= 2 ) for (i = 0; i < arr_max; i++) KEY(i)= random()%div_val; for (i = 0; i < arr_max; i++) DATA(i) = i; /*下の検査のための準備*/ SORT( (char*)vec, arr_max, rec_siz, cmpfnc ); printf("ass=%u cmp=%u OK ", ass_cnt, cmp_cnt); /*以下でソートできたことを検査する*/ for (i = 1; i < arr_max; i++) if (KEY(i-1) > KEY(i)) die("not sorted"); chk = (char*)calloc(arr_max,1); for (i = 0; i < arr_max; i++) chk[DATA(i)] = 1; for (i = 0; i < arr_max; i++) if (chk[i] != 1) die("chk err"); } ------- mm.c ------------------------------------------------------------- /*8バイト整数があるときは、さらに高速化できる*/ static int mmkind, mmsize, high, low; extern int ass_cnt; #define A ((int*)a) #define B ((int*)b) #define C ((int*)c) #define D ((int*)d) static void mmprepare( void *base, int size ) { #ifdef DEBUG if (sizeof(int) != 4) die("sizeof(int) != 4"); if (size <= 0) die("mmsize <= 0"); #endif if ( ((int)base & (4-1)) == 0 && (size & (4-1)) == 0 ) if (size >= 16) mmkind = 1; else mmkind = 0; else mmkind = -1; mmsize = size; high = (size & (-16)); low = (size & 0x0C ); } static void mmcopy( register char *a, register char *b ) { if (mmkind >= 0) { if (mmkind > 0) { register char *t = a + high; do { A[0] = B[0]; A[1] = B[1]; A[2] = B[2]; A[3] = B[3]; a += 16; b += 16; }while (a < t); } if (low != 0) {A[0] = B[0]; if (low >= 8) {A[1] = B[1]; if (low == 12) {A[2] = B[2]; }}} }else{ register char *t = a + mmsize; do *a++ = *b++; while (a < t); } ass_cnt++; } static void mmswap( register char *a, register char *b ) { register int s; if (mmkind >= 0) { if (mmkind > 0) { register char *t = a + high; do { s = A[0]; A[0] = B[0]; B[0] = s; s = A[1]; A[1] = B[1]; B[1] = s; s = A[2]; A[2] = B[2]; B[2] = s; s = A[3]; A[3] = B[3]; B[3] = s; a += 16; b += 16; }while (a < t); } if (low != 0) { s = A[0]; A[0] = B[0]; B[0] = s; if (low >= 8) { s = A[1]; A[1] = B[1]; B[1] = s; if (low == 12) {s = A[2]; A[2] = B[2]; B[2] = s;}}} }else{ register char *t = a + mmsize; do {s = *a; *a++ = *b; *b++ = s;} while (a < t); } ass_cnt += 2; } static void mmrot3( register char *a, register char *b, register char *c ) { register int s; if (mmkind >= 0) { if (mmkind > 0) { register char *t = a + high; do { s = A[0]; A[0] = B[0]; B[0] = C[0]; C[0] = s; s = A[1]; A[1] = B[1]; B[1] = C[1]; C[1] = s; s = A[2]; A[2] = B[2]; B[2] = C[2]; C[2] = s; s = A[3]; A[3] = B[3]; B[3] = C[3]; C[3] = s; a += 16; b += 16; c += 16; }while (a < t); } if (low != 0) { s = A[0]; A[0] = B[0]; B[0] = C[0]; C[0] = s; if (low >= 8) { s = A[1]; A[1] = B[1]; B[1] = C[1]; C[1] = s; if (low == 12) {s = A[2]; A[2] = B[2]; B[2] = C[2]; C[2] = s;}}} }else{ register char *t = a + mmsize; do {s = *a; *a++ = *b; *b++ = *c; *c++ = s;} while (a < t); } ass_cnt += 3; } ------- qs6.c ------------------------------------------------------------ /*****************************************************/ /* */ /* qs6 (Quick sort function) */ /* */ /* by 河村 知行 1995.4.21 */ /* 〒745 山口県徳山市久米 徳山高専 情報電子工学科 */ /* kawamura@tokuyama.ac.jp */ /*****************************************************/ #include "mm.c" typedef struct { char *LL, *RR; } stack_node; /*L,l,R,rを積むスタックの構造体*/ #define PUSH(ll,rr) {top->LL = (ll); top->RR = (rr); ++top;} /*L,l,R,rを積む*/ #define POP(ll,rr) {--top; ll = top->LL; rr = top->RR;} /*L,l,R,rを戻す*/ #define med3(a,b,c) ((*cmp)(a,b)<0 ? \ ((*cmp)(b,c)<0 ? b : ((*cmp)(a,c)<0 ? c : a)) : \ ((*cmp)(b,c)>0 ? b : ((*cmp)(a,c)<0 ? a : c)) ) void qsort (base, nel, size, cmp) char* base; int nel; int size; int (*cmp)(); { register char *l, *r, *m; /*l,r:左右の集団の端 m:配列の中央の位置*/ register int t, eq_l, eq_r; /*eq_l:左の集団が全てsに等しいことを示す*/ char *L = base; /*現在分割している区間の左端の要素の先頭 */ char *R = &base[size * (nel - 1)]; /*現在分割している区間の右端の要素の先頭 */ int chklim = 63; /*昇(降)順検査をする要素数の下限*/ stack_node stack[32], *top = stack; /*32ビットマシンでは32で十分*/ if (nel <= 1) return; /*要素数が1以下のときは即終了する*/ mmprepare( base, size ); goto start; nxt: if (stack == top) return; /*スタックが空になったとき終了する*/ POP(L,R); /*スタックに積んである値を取り出す*/ for (;;) { start: if (L + size == R) {if ((*cmp)(L,R) > 0) mmswap(L,R); goto nxt;}/*要素数2*/ l = L; r = R; t = (r - l + size) / size; /*要素数*/ m = l + size * (t >> 1); /*配列の中央を計算*/ if (t >= 60) { register char *m1; register char *m3; if (t >= 200) { t = size*(t>>3); /*8等分したときのバイト数*/ { register char *p1 = l + t; register char *p2 = p1 + t; register char *p3 = p2 + t; m1 = med3( p1, p2, p3 ); p1 = m + t; p2 = p1 + t; p3 = p2 + t; m3 = med3( p1, p2, p3 ); } }else{ t = size*(t>>2); /*4等分したときのバイト数*/ m1 = l + t; m3 = m + t; } m = med3( m1, m, m3 ); } if ((t = (*cmp)(l,m)) < 0) { /*3-5-?*/ if ((t = (*cmp)(m,r)) < 0) { /*3-5-7の場合*/ if (chklim && nel >= chklim) { /*既に昇順か検査する*/ char *p; chklim = 0; for (p=l; p 0) goto fail; goto nxt; } fail: goto loopA; /*3-5-7*/ } if (t > 0) { if ((*cmp)(l,r) <= 0) {mmswap(m,r); goto loopA;} /*3-5-4*/ mmrot3(r,m,l); goto loopA; /*3-5-2*/ } goto loopB; /*3-5-5*/ } if (t > 0) { /*7-5-?*/ if ((t = (*cmp)(m,r)) > 0) { /*7-5-3の場合*/ if (chklim && nel >= chklim) { /*既に降順か検査する*/ char *p; chklim = 0; for (p=l; p 0) {mmswap(l,r); goto loopB;} /*5-5-3*/ /*5-5-5 のときに分割の種類を決める*/ /*5-5-5*/ for (;;) { if ((l += size) == r) goto nxt; /*5-5-5*/ if (l == m) continue; if ((t = (*cmp)(l,m)) > 0) {mmswap(l,r); l = L; goto loopA;} /*575-5*/ if (t < 0) {mmswap(L,l); l = L; goto loopB;} /*535-5*/ } loopA: eq_l = 1; eq_r = 1; /*タイプAの分割*/ /*左 <= 分割値 < 右*/ for (;;) { for (;;) { if ((l += size) == r) {l -= size; if (l != m) mmswap(m,l); l -= size; goto fin;} if (l == m) continue; if ((t = (*cmp)(l,m)) > 0) {eq_r = 0; break;} if (t < 0) eq_l = 0; } for (;;) { if (l == (r -= size)) {l -= size; if (l != m) mmswap(m,l); l -= size; goto fin;} if (r == m) {m = l; break;} if ((t = (*cmp)(r,m)) < 0) {eq_l = 0; break;} if (t == 0) break; } mmswap(l,r); /*左と右を交換する*/ } loopB: eq_l = 1; eq_r = 1; /*タイプBの分割*/ /*左 < 分割値 <= 右*/ for (;;) { for (;;) { if (l == (r -= size)) {r += size; if (r != m) mmswap(r,m); r += size; goto fin;} if (r == m) continue; if ((t = (*cmp)(r,m)) < 0) {eq_l = 0; break;} if (t > 0) eq_r = 0; } for (;;) { if ((l += size) == r) {r += size; if (r != m) mmswap(r,m); r += size; goto fin;} if (l == m) {m = r; break;} if ((t = (*cmp)(l,m)) > 0) {eq_r = 0; break;} if (t == 0) break; } mmswap(l,r); /*左と右を交換する*/ } fin: if (eq_l == 0) /*左側をソートする必要がある*/ if (eq_r == 0) /*両側をソートする必要がある*/ if (l-L < R-r) {PUSH(r,R); R = l;} /*左から先にソートする*/ else {PUSH(L,l); L = r;} /*右から先にソートする*/ else R = l; /*左側のみソートする必要がある*/ else if (eq_r == 0) L = r; /*右側のみソートする必要がある*/ else goto nxt; /*両側ともソートする必要がない*/ } } /* 次のような改良を試してみたが処理速度の向上はほとんど得られなかった。 1。要素数3のときは別処理を行う 2。要素数3のときは loopの直前で goto nxt; する 3。要素数2の区間はスタックに積まないようにする 4。要素数4を検出してloopを回避する 5。要素数4を検出してloopを回避する(4つに場合分けして処理を減らす) 6。中央に集まった連続した分割値を検出して、再ソートの要素数を減らす*/ ------- mps.c ------------------------------------------------------------ /*****************************************************/ /* */ /* mps (Multi Partition Sort) */ /* */ /* by 河村 知行 1995.5.15 */ /* 〒745 山口県徳山市久米 徳山高専 情報電子工学科 */ /* kawamura@tokuyama.ac.jp */ /*****************************************************/ #include #include "mm.c" extern unsigned int cmp_cnt; /*比較回数を数える 昇順などの判定に使う*/ #define M(x,y) {mmcopy(x,y);} /*yをxへ複写する {memcpy(x,y,size);}も可*/ #define ISZ (sizeof(int)) typedef short grp_t; /*区間番号を格納する変数の型*/ int msort( arr, nel, size, cmp ) char *arr; int nel, size; int (*cmp)(); /* arr : ソートしようとする配列へのポインタ */ /* nel : 配列arrの要素数 */ /* size: 配列arrの要素の大きさ(バイト単位) */ /* cmp : 要素の大小比較をする関数へのポインタ */ { int bunkatu; /*分割数*/ int beki; /*区間の個数(bunkatu*2) 1〜beki-1が区間番号 0は使用しない*/ int delta; /*サンプルを取り出すときの要素間の距離*/ register int i; /*iはarr中の要素の位置*/ int g; /*gは区間番号*/ int init_grp; /*要素の存在する最初の区間の番号*/ int sp; /*区間番号と要素をsave[]に保存するときのスタックポインタ*/ int Lsize = ISZ + ((size+(ISZ-1)) & ~(ISZ-1)); /*区間番号と要素を合せた大きさ*/ char *save; /*前半ではサンプルを格納する 後半では区間番号と要素を保存する*/ grp_t *arr2grp; /*arr2grp[i]は要素iの区間番号*/ int *g_icnt; /*g_icnt[g]は区間gの要素数*/ int *g_cnt; /*g_cnt[g]は区間gの処理をしていない残りの要素数*/ int *g_head; /*g_head[g]は区間gの未確定要素の先頭位置*/ cmp_cnt = 0; /*サンプルのキーの昇順/降順/同一を調べるための準備*/ ass_cnt = 0; /*mpsは要素の代入回数を返り値とする エラー時は-1を返すべき*/ if (nel < 1000 || size < 100) /*サイズがmpsに適さないときはqsortを呼び出す*/ { qsort0: qsort( arr, nel, size, cmp ); return ass_cnt; } /*分割数を決定する*/ if (nel >= 40000) bunkatu = 1024; else if (size < 200) goto qsort0; else if (nel >= 20000) bunkatu = 512; else if (nel >= 10000) bunkatu = 256; else if (nel >= 4000) bunkatu = 128; else if (nel >= 2000) bunkatu = 64; else if (nel >= 1000) bunkatu = 32; else goto qsort0; /*区間の個数を決定する bekiが32の時1〜31が区間番号となる 0は使用しない*/ beki = bunkatu << 1; if ((save = (char*)malloc(beki*Lsize ))==NULL) die("malloc save"); if ((arr2grp=(grp_t*)malloc(nel *sizeof(grp_t)))==NULL) die("malloc arr2grp"); if ((g_icnt = (int*)malloc(beki*sizeof(int) ))==NULL) die("malloc g_icnt"); if ((g_cnt = (int*)malloc(beki*sizeof(int) ))==NULL) die("malloc g_cnt"); if ((g_head = (int*)malloc(beki*sizeof(int) ))==NULL) die("malloc g_head"); mmprepare( arr, size ); delta = nel / bunkatu; /*サンプルを取り出すときの要素間の距離を決定する*/ { /*サンプルを集める*/ int s; /*sはサンプル用配列の添え字*/ for (s = 1; s < bunkatu; s++) /*サンプルを集める*/ M( save+s*size, arr+s*size*delta ); /*ここではsave[]はサンプル用の配列*/ } /*サンプルをソートする mpsを再帰呼出ししても速くならないことは実験済み*/ qsort( save+size, bunkatu-1, size, cmp ); if (cmp_cnt <= bunkatu-1+8) goto qsort0; /*キーが昇順/降順/同一*/ for (g = 1; g < beki; g++) g_icnt[g] = 0; /*要素数カウントするための初期化*/ for (i = 0; i < nel; i++) /* arr中の各要素がどの区間に入るかを調べる*/ { register int g, n, t; /*高速化のためにレジスタ宣言する*/ register char *p = arr+i*size; /*pは調べようとする要素へのポインタ*/ /************************* beki=16 の場合 *********************************/ /* 4            (2分木の根) */ /* 2 6 サンプルの添え字(g>>1の値) */ /* 1 3 5 7   (2分木の葉) */ /* ------------------------------------------- */ /* 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 区間番号(添え字)(gの値) */ /***************************************************************************/ for (g = beki>>1, n = g>>1; n >= 2; n >>= 1) if ((t = cmp( p, save+(g>>1)*size )) > 0) g += n; /*木構造を右へ進む*/ else if (t < 0) g -= n; /*木構造を左へ進む*/ else goto xx; /*==時はその区間へ*/ if ((t = cmp( p, save+(g>>1)*size )) > 0) g++; /*2分木の葉の右へ*/ else if (t < 0) g--; /*2分木の葉の左へ*/ xx: arr2grp[i] = g; /*要素iの区間番号を記憶する*/ g_icnt[g]++; /*区間gの要素数をカウントアップする*/ } /*要素の存在する最初の区間を探す*/ for (g = 1; g < beki; g++) if (g_icnt[g] != 0) break; init_grp = g; /*それを記憶する*/ { int cnt = g_icnt[g]; /*gは、要素の存在する最初の区間の番号(init_grp)*/ g_cnt[g] = cnt; /*この行では cnt は必ず != 0 である*/ i = 0; /*左端からg_headの位置を決めていく iはarr中の位置*/ for (sp = 0, g++; g < beki; g++) /*gより右の区間に空きを作る*/ { i += cnt; /*cntは直前の区間の残りの要素数*/ cnt = g_icnt[g]; /*g_icnt[g]はこの区間の要素数*/ if (cnt == 0) continue; while (cnt >= 1 && arr2grp[i] == g) cnt--,i++; /*よそ者を探す*/ g_cnt[g] = cnt; /*g_cnt[g]はこの区間の残りの要素数*/ if (cnt > 0) /*cntはよそ者などの数*/ { g_head[g] = i; /*未確定要素の塊の先頭位置を記憶する*/ *(int*)(save+sp*Lsize) = arr2grp[i]; /*退避する要素の grpid を記憶する*/ M( save+sp*Lsize+4, arr+i*size ); /*空きを作る save[]に記憶する*/ sp++; } } } if (sp == 0) goto fin; /*save[](スタック)にもう残ってないのでループ終了*/ { register int src, src_grp; /*srcは移動元のarr中の位置 src_grpはその区間番号*/ register int dst, dst_grp; /*dst_grpは移動先の区間番号 dstはそのarr中の位置*/ src = 0; src_grp = init_grp; for (;;) /*繰り返しにより要素を正しい位置に移動する*/ { dst_grp = arr2grp[src]; if (dst_grp != src_grp) { dst = g_head[dst_grp]; /*ここは必ず既に空きなっている*/ M( arr+dst*size, arr+src*size ); /*移動する*/ g_head[src_grp] = src; /*移動元が新しく空き要素となる*/ } else { dst = src; } loop: /*ここで dst と dst_grp の値が確定していなければならない*/ if (--g_cnt[dst_grp] != 0) /*未確定の要素の数を1減らす*/ { src = dst+1; /*次のソースと区間番号をセットする*/ src_grp = dst_grp; continue; } sp--; dst_grp = *(int*)(save+sp*Lsize); /*save[sp]の grp id を求める*/ dst = g_head[dst_grp]; /*必ず空きがあるはず*/ M( arr+dst*size, save+sp*Lsize+4 ); if (sp > 0) goto loop; break; } } fin: /*奇数番号の区間を一つ一つソートする*/ g = init_grp; if (g % 2 == 1) i = 0; else i = g_icnt[g++]; for (;;) { int icnt = g_icnt[g]; /*この時点でgは必ず奇数*/ if (icnt >= 2) qsort( arr+i*size, icnt, size, cmp ); if (++g >= beki) break; i += icnt + g_icnt[g++]; /* g_icnt[g] は、しきい値自身の要素数*/ } free( g_head ); free( g_cnt ); free( g_icnt ); free( arr2grp ); free( save ); return ass_cnt; } --------------------------------------------------------------------------