社内AIプログラミングコンテストで優勝したゲームプレイAIの紹介
こんにちは、応用人工知能チームの辻本です。
最近は計算リソース、データ量、アルゴリズムの改善によって簡単に精度の高いAIが利用できるようになりつつあります。しかし、現状では全てのタスクにおいてAIを利用すればいいわけでもありませんし、リソースの制限もあるため、特性を理解して上手に応用することが重要です。そこで、社内ではAI応用のための知見や環境を積み上げていく機会を増やす取り組みを行っています。
先日、取り組みの一環でゲームプレイAIのプログラミングコンテストが開催され、約20チームが参加して盛り上がりました。このコンテストで優勝したゲームプレイAIについて紹介します。
AIによるゲームプレイ動画です。
コンテスト概要
AIにアクションゲームなどのデバッグの一部を任せられるかどうか検証したいという考えもあったので、ゲームプレイAIが対象として選ばれました。ゲームをAI用に変更せずに画面情報だけを使ってプレイさせたかったので、ゲームプレイのためのフレームワークにUniverseを利用しました。NeonRaceというFlashゲームのステージ6のスコアで順位を決定します。
Universeは様々なゲームやアプリケーションをAIにプレイさせるためのフレームワークです。ゲームはdockerコンテナで実行され、AIはゲーム毎に用意された報酬サーバとVNCを通じて、画面情報および報酬の取得とマウスやキーボードによる操作ができます。利用者は、取得した画面や報酬から次のアクションを決定するAIを作ることになります。 似たようなプロジェクトとしてSerpent AIもあります。
ゲームの起動
まずはゲームを起動してみます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import sys import gym import universe def main(): env_id = 'flashgames.NeonRaceLvl6-v0' env = gym.make(env_id) env.configure(remotes=1) # automatically creates a local docker container observation_n = env.reset() while True: action_n = [[('KeyEvent', 'ArrowUp', False)]] observation_n, reward_n, done_n, info = env.step(action_n) return 0 if __name__ == '__main__': sys.exit(main()) |
上記コードを実行するとNeonRaceのステージ6が開始され、VNCクライアントでlocalhost:5900に接続すると実際にゲームをプレイできます。5900番ポートで接続できなかった場合は、docker psなどで待ち受けポートを調べてください。
初めてのプレイでも70000点前後は出せると思うので、70000点を最初の目標にしました。
サンプルコードの実行
A3Cのサンプルコードが提供されているので、これをベースにして改良してくことにしました。16並列で実行すると12時間程度で収束するようです。
AIが選択可能な操作から無駄なものを除いて学習させた結果は10000点程度、壁との衝突回数を減らすためにコース外を走り続けるようになりました。
性能改善の取り組み
ここからは性能改善のために取り組んだ方法をいくつか紹介します。
罰則の追加
コース内を走らせるために、コース外に出た時に罰則を与えました。
コース外に出たことを検出するために、ゲームを手動でプレイして画面のキャプチャを用意して、CNNでコース内・コース外判定を行います。98%程度の精度でコース内・コース外の判定ができるようになり、AIがコース内を走り続けるようになりました。スコアも改善して20000点です。
キャプチャは以下のコードで用意できます。同じような画像だらけになってしまうので画面のキャプチャは5フレームに1回にして、コース内・コース外それぞれの画像を2000枚ずつ用意しました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
import sys import gym import universe from PIL import Image def main(): env_id = 'flashgames.NeonRaceLvl6-v0' env = gym.make(env_id) env.configure(remotes=1) # automatically creates a local docker container observation_n = env.reset() i = 0 j = 0 while True: action_n = [[('KeyEvent', 'ArrowUp', False)]] observation_n, reward_n, done_n, info = env.step(action_n) if i == 5: i = 0 if observation_n[0] is not None : pil_img = Image.fromarray(observation_n[0]) pil_img.save('./%d.jpg' % j) j += 1 i += 1 return 0 if __name__ == '__main__': sys.exit(main()) |
CNNは最後に掲載するコードと同じです。
状態空間の単純化
サンプルコードのモデルでは画面情報から特徴を抽出できていないように思えたので、画面情報から役に立ちそうな特徴を抽出して状態空間を単純化してみました。事前に特徴を抽出するので、サンプルコードをCNNから全結合ネットワークに変更しています。
OpenCVによる画像処理やCNNを利用して以下の特徴を抽出しました。
- コース内の自機の位置
- 妨害車両の位置
- ポイントアップアイテムの位置
これらの特徴を利用することで、壁との衝突の回避、妨害車両の回避、ポイントアップアイテムの取得を学習することを期待しましたが特に改善は見られませんでした。ネットワークを改良すると性能が向上するとは思いますが、学習に時間がかかるので後回しにした結果、コンテスト期間中にネットワークの改良には着手できませんでした。
行動を一定時間固定
AIはフレームごとに操作を決定するので、曲がっている途中に1回だけ直進してからまた曲がり始めるなどの無駄な行動が多いことが分かりました。NeonRaceではフレームごとにキー入力を変化させる必要がなかったので、操作を一定時間固定することによってカーブが曲がれるようになり、スコアが40000点に改善しました。
ルールベースAI化
AIの学習を待っている間に、自機位置を判定して壁に近づくと反対側に曲がるだけの単純なルールベースAIを作ってみると50000点が出るようになりました。
ここからは強化学習を捨てて、AIの行動ルールのチューニングに注力しました。以下のルールを調整すると、最終的に初心者と同程度の80000点を出すAIが完成しました。
- コースの曲がり具合によって、早めに曲がり始める
- ポイントアップアイテムを取得できるようにブレーキをかける
所感
今回対象としたゲームを強化学習で学習させるためには報酬の設計、特徴エンジニアリング、行動を一定時間固定するなどの対応が必要でした。しかし、手間をかけた割には学習が進みません。画面情報からの学習のためにはゲーム毎のチューニングが必要で、1つのAIで様々なゲームに対応させることは難しそうです。また、AIの学習のためにAWSのc4.4xlargeインスタンスを利用したのですが、パラメータの変更やモデルの変更のたびに12時間ほど実行させる必要があり、人間と同等の性能のAIを作るまでには大量の計算リソースが必要だと思われます。現状では性能面でもコスト面でもAIにプレイさせるデバッグよりも人間によるデバッグのほうが良さそうでした。単純なゲームであれば、簡単なルールベースAIでもそれなりの性能がでることが実感できたので、まずはルールベースAIで要件が満たせるのか検討するのも良いと思います。
コンテスト課題の選択としては反省点がありました。今回のテーマである強化学習に馴染みのない人が多く、参加者の半分くらいはサンプルコードを動かすだけで終わってしまっているようでした。幅広い人にAIを利用する機会を与えるためには少しずつステップアップしていくような課題を選ぶべきかもしれませんが、AIを利用できるタスクの知見を貯めるためには同じような課題ばかりを選ぶわけにもいきません。これからも試行錯誤しながらちょうどいい課題設定を考えていきたいと思います。
おまけ
単純なルールベースAIのコードを掲載します。本文中に掲載したキャプチャ用コードで用意した画像を自機のコース中の位置で左・中央・右に分類してleft.txt
center.txt
right.txt
にファイルパスを保存してから、python sample.py train
で学習させてください。学習後、python sample.py
を実行すればルールベースのAIがゲームプレイを開始します。
このAIはそれほど性能を要求しないので、手元のCore i7 2.8GHz、16GBのメモリを搭載したMacBook Proで動作させていました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
import sys import gym import universe from universe import spaces from universe import vectorized from universe.wrappers import Vision import numpy as np import cv2 import tensorflow as tf INPUT_SHAPE=(22, 82, 1) def flatten(x): return tf.reshape(x, [-1, np.prod(x.get_shape().as_list()[1:])]) def linear(x, size, name, initializer=None, bias_init=0): w = tf.get_variable(name + "/w", [x.get_shape()[1], size], initializer=initializer) b = tf.get_variable(name + "/b", [size], initializer=tf.constant_initializer(bias_init)) return tf.matmul(x, w) + b def conv2d(x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME", dtype=tf.float32, collections=None): with tf.variable_scope(name): stride_shape = [1, stride[0], stride[1], 1] filter_shape = [filter_size[0], filter_size[1], int(x.get_shape()[3]), num_filters] fan_in = np.prod(filter_shape[:3]) fan_out = np.prod(filter_shape[:2]) * num_filters w_bound = np.sqrt(6. / (fan_in + fan_out)) w = tf.get_variable("W", filter_shape, dtype, tf.random_uniform_initializer(-w_bound, w_bound), collections=collections) b = tf.get_variable("b", [1, 1, 1, num_filters], initializer=tf.constant_initializer(0.0), collections=collections) return tf.nn.conv2d(x, w, stride_shape, pad) + b def make_graph(): x = tf.placeholder(tf.float32, shape=[None] + list(INPUT_SHAPE)) labels = tf.placeholder(tf.float32, shape=[None, 3]) l = tf.nn.relu(conv2d(x, 16, "conv1")) l = tf.nn.relu(conv2d(l, 16, "conv2")) l = tf.nn.batch_normalization(l, mean=0, variance=1.0, offset=0.0, scale=True, variance_epsilon=0.001) l = tf.nn.max_pool(l, ksize=(1, 2, 2, 1), strides=(1, 2, 2, 1), padding="SAME") l = tf.nn.relu(linear(flatten(l), 256, "linear1")) l = linear(l, 3, "linear2") softmax = tf.nn.softmax(l) cross_entropy = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=l)) train_step = tf.train.RMSPropOptimizer(0.001).minimize(cross_entropy) correct_prediction = tf.equal(tf.argmax(l, 1), tf.argmax(labels, 1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) return x, labels, softmax, cross_entropy, train_step, accuracy def load_img(path): img = cv2.imread(path, 0) img = cv2.resize(img, (INPUT_SHAPE[1], INPUT_SHAPE[0])) return np.reshape(img, list(INPUT_SHAPE)) def train(): input_img, labels, softmax, cross_entropy, train_step, accuracy = make_graph() data = [] label = [] print("loading data") with open("./left.txt", "r") as f: for fname in f: path = './left/%s' % fname.strip() img = load_img(path) data.append(img) label.append([1, 0, 0]) with open("./center.txt", "r") as f: for fname in f: path = './center/%s' % fname.strip() img = load_img(path) data.append(img) label.append([0, 1, 0]) with open("./right.txt", "r") as f: for fname in f: path = './right/%s' % fname.strip() img = load_img(path) data.append(img) label.append([0, 0, 1]) ids = np.random.permutation(len(data)) data = np.array(data) label = np.array(label) train_x = data[ids[:-50],:,:,:] train_y = label[ids[:-50],:] test_x = data[ids[-50:],:,:,:] test_y = label[ids[-50:],:] EPOCH = 10 BATCH_SIZE = 128 print("start training process") saver = tf.train.Saver() with tf.Session() as sess: sess.run(tf.global_variables_initializer()) for i in range(EPOCH): print("epoch %d" % (i + 1)) x = np.array_split(train_x, train_x.shape[0] / BATCH_SIZE) y = np.array_split(train_y, train_y.shape[0] / BATCH_SIZE) processed_sample = 0 for j in range(len(x)): loss, train_acc, _ = sess.run([cross_entropy, accuracy, train_step], {input_img: x[j], labels: y[j]}) processed_sample += len(x[j]) print("[%d / %d] loss = %.4f, acc = %.3f" % (processed_sample , len(train_x), loss, train_acc)) ids = np.random.permutation(len(train_x)) train_x = train_x[ids] train_y = train_y[ids] saver.save(sess, "./model/model.ckpt") pred_y = sess.run(accuracy, {input_img: test_x, labels: test_y}) print("test acc = %.3f" % pred_y) class CropScreen(vectorized.ObservationWrapper): def __init__(self, env, height, width, top=0, left=0): super(CropScreen, self).__init__(env) self.height = height self.width = width self.top = top self.left = left def _observation(self, observation): return [ob[self.top:self.top+self.height, self.left:self.left+self.width, :] if ob is not None else None for ob in observation] def main(): input_img, labels, softmax, _, _, _ = make_graph() saver = tf.train.Saver() env_id = 'flashgames.NeonRaceLvl6-v0' env = gym.make(env_id) env = Vision(env) env = CropScreen(env, 260, 640, 260, 18) env.configure(remotes=1) observation = env.reset() with tf.Session() as sess: saver.restore(sess, "./model/model.ckpt") left = right = False while True: action = [racing_vnc(left, right) for ob in observation] observation, _, _, _ = env.step(action) if observation[0] is not None : img = cv2.cvtColor(observation[0], cv2.COLOR_RGB2GRAY) img = cv2.resize(img, (INPUT_SHAPE[1], INPUT_SHAPE[0])) img = np.reshape(img, list(INPUT_SHAPE)) position = sess.run(softmax, {input_img: np.array([img])}) # コースの左、中央、右のどこにいるか判定 if np.argmax(position[0]) == 0: # 左端にいる場合 left = False right = True elif np.argmax(position[0]) == 1: # 中央にいる場合 left = right = False elif np.argmax(position[0]) == 2: # 右端にいる場合は左に曲がる left = True right = False return 0 def racing_vnc(left=False, right=False): return [spaces.KeyEvent.by_name('up', down=True), spaces.KeyEvent.by_name('left', down=left), spaces.KeyEvent.by_name('right', down=right)] if __name__ == '__main__': args = sys.argv if len(args) == 2 and args[1] == "train": train() else: main() |