この記事はQiitaに投稿されたものの転載です。


行列に興味がある。しかし数Cは廃止され、行列の勉強はしないとのことなので少しづつ自主学習。
とりあえず行列の積について理解できたのでプログラムにしてみる。ついでに前からやってみたかった並列処理とかを試して速度をはかる。

テンプレート

100次の正方行列を1000回かける。

//import略

alias int[][] matrix;

matrix matrix_product(matrix A,matrix B)
in{
	assert(A[0].length == B.length);
	}
body{
	//ここに処理を書く
	}

void init_matrix(ref matrix A){
	foreach(ref row;A){
		foreach(ref n;row){
			n = uniform(-10,10);
		}
	}
}

void main(){
	matrix A = [
		[2,-1],
		[-3,4]];
	matrix B = [
		[1],
		[2]];
	matrix C = [
		[1,-1],
		[-2,3]];
	matrix D = [
		[1,2],
		[3,4]];
	matrix E = [
		[1,2]];
	matrix F = [
		[2,-1],
		[-3,4]];
	assert(matrix_product(A,B) == [[0],[5]]);
	assert(matrix_product(C,D) == [[-2,-2],[7,8]]);
	assert(matrix_product(E,F) == [[-4,7]]);

	StopWatch sw;
	int n = 1000;
	matrix[][] testdata = new matrix[][](n,2);
	foreach(i;0..n){
		testdata[i][0] = new matrix(100,100);
		testdata[i][1] = new matrix(100,100);
		init_matrix(testdata[i][0]);
		init_matrix(testdata[i][1]);
	}
	sw.start();
	foreach(i; 0..n) matrix_product(testdata[i][0],testdata[i][1]);
	sw.stop();
	writeln(n," times done. time: ", sw.peek().msecs, "[ms]");
	}

1. 単純なやり方

prod_1.d

matrix matrix_product(matrix A,matrix B)
in{
	assert(A[0].length == B.length);
	}
body{
	matrix result = new matrix(A.length,B[0].length);
	foreach(k,row;A){
		foreach(i;0..B[0].length){
			int sum;
			foreach(j;0..row.length){
				sum += B[j][i] * row[j];
			}
			result[k][i] = sum;
		}
	}
	return result;
	}
$ dmd prod_1
$ ./prod_1
1000 times done. time: 6411[ms]
$ ./prod_1
1000 times done. time: 6389[ms]
$ ./prod_1
1000 times done. time: 6402[ms]
$ dmd prod_1 -O
$ ./prod_1
1000 times done. time: 3570[ms]
$ ./prod_1
1000 times done. time: 3511[ms]
$ ./prod_1
1000 times done. time: 3502[ms]

Optimizeつけるだけで倍くらい速くなる。

2. parallelをつける

行を並列で処理する。こんな書き方でいいのだろうか?

prod_2.d

matrix matrix_product(matrix A,matrix B)
in{
	assert(A[0].length == B.length);
	}
body{
	matrix result = new matrix(A.length,B[0].length);
	foreach(k,row;A.parallel){
		foreach(i;0..B[0].length){
			int sum;
			foreach(j;0..row.length){
				sum += B[j][i] * row[j];
			}
			result[k][i] = sum;
		}
	}
	return result;
	}
dmd prod_2
$ ./prod_2
1000 times done. time: 4143[ms]
$ ./prod_2
1000 times done. time: 4164[ms]
$ ./prod_2
1000 times done. time: 4187[ms]
$ dmd prod_2 -O
$ ./prod_2
1000 times done. time: 2420[ms]
$ ./prod_2
1000 times done. time: 2440[ms]
$ ./prod_2
1000 times done. time: 2437[ms]

CPUは4つあるのだが、高速化は半分くらいに。

3. SIMD

prod_3.d

matrix matrix_product(matrix A,matrix B)
in{
	assert(A[0].length == B.length);
	}
body{
	ulong blen = B[0].length;
	matrix result = new matrix(A.length,blen);
	foreach(k,row;A){
		int4 v;
		int[] tmp = new int[](blen);
		foreach(i,a;row){
			foreach(j;0..blen/4+1){
				v.array[0] = (j<blen)?B[i][j*4]:0;
				v.array[1] = (j+1<blen)?B[i][j*4+1]:0;
				v.array[2] = (j+2<blen)?B[i][j*4+2]:0;
				v.array[3] = (j+3<blen)?B[i][j*4+3]:0;
				v = a*v;
				tmp[j*4] += v.array[0];
				tmp[j*4+1] += v.array[1];
				tmp[j*4+2] += v.array[2];
				tmp[j*4+3] += v.array[3];
			}
		}
		result[k] = tmp;
	}
	return result;
	}

しかしこのコード動かない。

$ dmd prod_3
prod_3.d(28): Error: incompatible types for ((cast(__vector(int[4]))a) * (v)): '__vector(int[4])' and '__vector(int[4])'

そもそもSIMDに触れたのが初めてだし、この使い方が正しいのかもわからない。
OpenCLとかも使ってみたかったが今回は何もできていない。そのうち書けるようになればいいな。

まとめ

3回の計測の平均をとった。

1. 単純なやり方 1-A. -Oをつける 2. parallelをつける 2-A. -Oをつける
6401 3528 4165 2432

parallelできるところを見つけるよりも最適化オプションをつけるのを忘れないほうが速くなるのは意外だった。