2022-12-03

将棋AIを作る

Written by: @ekusiadadus

将棋AIを作る

はじめに

この記事は、将棋 AI を作ってみようという記事です。 目標は、棋譜データを学習させて、実際に対局できる(ルールに違反しない)ようにさせることとします。

20201208_1.gif

環境:

| 環境 | バージョン | | ------------ | -------------------- | --- | | OS | Windows Pro 64bit | | CPU | Ryzen 3900X | | | GPU | Geforce RTX2060Super | | | Python | 3.8.5 | | | CUDA | 10.1 | | | cuDNN | 7.6.5 | | | Chainer | 7.7.0 | | | python-shogi | 1.0.10 | |

*PC 構成を詳しく知りたい方は、以前の記事[『自作 PC 構成』]をご覧ください。 *NVIDIA 製品の NVIDIA Compute Capability 3.0 以上を使用してください。 *GCP, Colaboratory, AWS とかでも実現できると思います(たぶん)

また、この記事では Pytorch ではなく Chainer を使用します。 Pytoch でも、構築可能です。 また、別の機会があれば Pytorch で実装したものもご紹介できればと思います。

§0. 環境構築

この章では、環境構築をします。

0.1 Visual Studio ビルドツール 2015 のインストール

現状の最新の Visual Studio は、2019 ですが、2019 では正常に動作しない報告が多いので 2015 をインストールします。 image.png Microsoft Build Tools 2015 Update 3  から、インストーラをダウンロードできます。

0.2 CUDA のインストール

NVIDIAからインストーラをダウンロードできます。 2020.11.24 時点で、最新バージョンは 11.1 です。 しかし、使用するソフトウェアに対応したバージョンをインストールします。 現状 TensorFlow や Chainer などが対応したバージョンは、10.0, 10.1, (10.2)です。 私は、10.1 をインストールしました。

0.3 cuDNN のインストール

NVIDIAから、zip ファイルをダウンロードできます。 2020.11.24 時点で、最新バージョンは 8.0.5 です。 しかし、使用するソフトウェアに対応したバージョンをダウンロードします。 現状 TensorFlow や Chainer などが対応したバージョンは、7.6 以前(8.0 以降もたぶん大丈夫)です。 私は、7.6.5 で動かしています。

ファイルを解凍したら、NVIDIA GPU Computing Toolkit にファイルをコピーします 私の場合は、 C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v10.1 にファイルをコピーしました。 image.png

画像にある、ファイル、フォルダをコピーして、上のフォルダに移します。

0.4 Chainer のインストール

Chainerをインストールします。 Chainer は、機械学習の Python フレームワークです。日本企業の株式会社 Preferred Networks が、研究、開発をしました。 現在は、FaceBook の Python フレームワークである、Pytorch に吸収されました。

0.4.0 Python のインストール

Python のインストール 今回は、Python は省略します。 Python は、3.6 以降、3.8 までであれば正常に動作します。 Anaconda でも大丈夫です。

0.4.1 その他モジュール

pip install -U pip setuptools
pip install -U jupyterlab jupyter jupyter-console jupytext spyder matplotlib numpy

0.4.2 Cupy のインストール

Chainer で、GPU を使用するためには、Cupy をインストールする必要があります。

pip install cupy-cuda101

image.png 確認: image.png エラーが出なければ大丈夫です。 エラーが出た場合、CuPy のインストールがうまくできていないです。 東京大学金子研究室に詳しく書いてあるので、エラーが出て進めない場合は参考にしてください。

0.4.3 Chainer のインストール

pip install chainer==7.7

image.png

0.4.4 Chainer の動作確認

Chainer の Githubから、サンプルを落としてきます。 image.png

cd chainer-7.7.0
python examples\mniost\train_mnist.py -g 0

オプションは、gpu の番号です。-1 で、gpu を使わずに計算できます。

20201208_2.gif

0.5 学習データ

image.png

東京大学内のサーバー、将棋コンピュータ対局場(Flood gate)のデータを使います。 後のニューラルネットワークを学習させるための棋譜を用意します。 今回は、『Flood gate』から最新の 2020 年の全棋譜データを csa 形式で落とします。 http://wdoor.c.u-tokyo.ac.jp/shogi/x/wdoor2017.7z 上は、2017 年の棋譜データですが、(web サイト更新されていない),2017→2020 にすれば落とせます。(書いていいのか…?) http://wdoor.c.u-tokyo.ac.jp/shogi/x/wdoor2020.7z 7zip で解凍してください。

全棋譜: 13 万件くらい(現状なので、今も増えています)

そのあと、学習データとして適切でないデータもたくさんあるので、手数が 50 手以上で、レーティングが 3000 以上のものに限定して、学習データとします。 ここら辺は、どんな方法でもいいです。

§1. ニューラルネットワーク

ここからは、思考部分を実装します。 具体的に、局面から指し手を予測するようにします。

コード構成

\<policynetwork>(root dir)
|    setup.py
|    train_policy.py
|    kifulist_train.txt
|    kifulist_test.txt
|    kifulist_train_1000.txt
|    kifulist_test_100.txt
|-  <model>
|          |  model_policy
|
|-  <pydlshogi>
|          |  common.py
|          |  features.py
|          |  read_kifu_.py
|          |
|          |-  <network>
|                   |  policy.py
|
|-  <utils>
|          |  fileter_csa.py
|          |  make_kifu_list.py
|          |  plot_log.py

1.1 モジュールインストール

import setuptools

setuptools.setup(
    name = 'python-dlshogi',
    version = '0.0.1',
    author = 'SudaDaisuke', # 名前
    packages = ['pydlshogi'],
    scripts = [],
)

スクリプトを別のスクリプトから、import できるように登録します。 プロジェクトのルートディレクトリで下のコマンドを打ちます。

pip install --no-cache-dir -e .

1.2 Policy Network

将棋の指し手を予測するための、ニューラルネットワークを構成します。 Alpha Goでは、打ち手を探索する「Policy Network」と局面を評価する「Value Network」という2つの深層ニューラルネットワークで構成されています。 将棋の指し手を予測するために、Alpha Goで採用された方法を改良して、13 層の畳み込みニューラルネットワークを構成します。

ニューラルネットワークの仕様

項目
フィルターサイズ3x3
中間層のフィルターサイズ192
ストライド1
パディング層1
ブーリング層1
活性化関数ReLU

1.3 Policy Network 実装

上の設計をもとに実装していきます。

pydlshogi\network\policy.py

from chainer import Chain
import chainer.functions as F
import chainer.links as L

from pydlshogi.common import *

ch = 192
class PolicyNetwork(Chain):
    def __init__(self):
        super(PolicyNetwork, self).__init__()
        with self.init_scope():
            self.l1=L.Convolution2D(in_channels = 104, out_channels = ch, ksize = 3, pad = 1)
            self.l2=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l3=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l4=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l5=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l6=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l7=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l8=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l9=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l10=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l11=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l12=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
            self.l13=L.Convolution2D(in_channels = ch, out_channels = MOVE_DIRECTION_LABEL_NUM, ksize = 1, nobias = True)
            self.l13_bias=L.Bias(shape=(9*9*MOVE_DIRECTION_LABEL_NUM))

    def __call__(self, x):
        h1 = F.relu(self.l1(x))
        h2 = F.relu(self.l2(h1))
        h3 = F.relu(self.l3(h2))
        h4 = F.relu(self.l4(h3))
        h5 = F.relu(self.l5(h4))
        h6 = F.relu(self.l6(h5))
        h7 = F.relu(self.l7(h6))
        h8 = F.relu(self.l8(h7))
        h9 = F.relu(self.l9(h8))
        h10 = F.relu(self.l10(h9))
        h11 = F.relu(self.l11(h10))
        h12 = F.relu(self.l12(h11))
        h13 = self.l13(h12)
        return self.l13_bias(F.reshape(h13, (-1, 9*9*MOVE_DIRECTION_LABEL_NUM)))

1.2 学習処理

1.2.1 実装

train_policy.py

import numpy as np
import chainer
from chainer import cuda, Variable
from chainer import optimizers, serializers
import chainer.functions as F

from pydlshogi.common import *
from pydlshogi.network.policy import PolicyNetwork
from pydlshogi.features import *
from pydlshogi.read_kifu import *

import argparse
import random
import pickle
import os
import re

import logging

parser = argparse.ArgumentParser()
parser.add_argument('kifulist_train', type=str, help='train kifu list')
parser.add_argument('kifulist_test', type=str, help='test kifu list')
parser.add_argument('--batchsize', '-b', type=int, default=32, help='Number of positions in each mini-batch')
parser.add_argument('--test_batchsize', type=int, default=512, help='Number of positions in each test mini-batch')
parser.add_argument('--epoch', '-e', type=int, default=1, help='Number of epoch times')
parser.add_argument('--model', type=str, default='model/model_policy', help='model file name')
parser.add_argument('--state', type=str, default='model/state_policy', help='state file name')
parser.add_argument('--initmodel', '-m', default='', help='Initialize the model from given file')
parser.add_argument('--resume', '-r', default='', help='Resume the optimization from snapshot')
parser.add_argument('--log', default=None, help='log file path')
parser.add_argument('--lr', type=float, default=0.01, help='learning rate')
parser.add_argument('--eval_interval', '-i', type=int, default=1000, help='eval interval')
args = parser.parse_args()

logging.basicConfig(format='%(asctime)s\t%(levelname)s\t%(message)s', datefmt='%Y/%m/%d %H:%M:%S', filename=args.log, level=logging.DEBUG)

model = PolicyNetwork()
model.to_gpu()

optimizer = optimizers.SGD(lr=args.lr)
optimizer.setup(model)

# Init/Resume
if args.initmodel:
    logging.info('Load model from {}'.format(args.initmodel))
    serializers.load_npz(args.initmodel, model)
if args.resume:
    logging.info('Load optimizer state from {}'.format(args.resume))
    serializers.load_npz(args.resume, optimizer)

logging.info('read kifu start')
# 保存済みのpickleファイルがある場合、pickleファイルを読み込む
# train date
train_pickle_filename = re.sub(r'\..*?$', '', args.kifulist_train) + '.pickle'
if os.path.exists(train_pickle_filename):
    with open(train_pickle_filename, 'rb') as f:
        positions_train = pickle.load(f)
    logging.info('load train pickle')
else:
    positions_train = read_kifu(args.kifulist_train)

# test data
test_pickle_filename = re.sub(r'\..*?$', '', args.kifulist_test) + '.pickle'
if os.path.exists(test_pickle_filename):
    with open(test_pickle_filename, 'rb') as f:
        positions_test = pickle.load(f)
    logging.info('load test pickle')
else:
    positions_test = read_kifu(args.kifulist_test)

# 保存済みのpickleがない場合、pickleファイルを保存する
if not os.path.exists(train_pickle_filename):
    with open(train_pickle_filename, 'wb') as f:
        pickle.dump(positions_train, f, pickle.HIGHEST_PROTOCOL)
    logging.info('save train pickle')
if not os.path.exists(test_pickle_filename):
    with open(test_pickle_filename, 'wb') as f:
        pickle.dump(positions_test, f, pickle.HIGHEST_PROTOCOL)
    logging.info('save test pickle')
logging.info('read kifu end')

logging.info('train position num = {}'.format(len(positions_train)))
logging.info('test position num = {}'.format(len(positions_test)))

# mini batch
def mini_batch(positions, i, batchsize):
    mini_batch_data = []
    mini_batch_move = []
    for b in range(batchsize):
        features, move, win = make_features(positions[i + b])
        mini_batch_data.append(features)
        mini_batch_move.append(move)

    return (Variable(cuda.to_gpu(np.array(mini_batch_data, dtype=np.float32))),
            Variable(cuda.to_gpu(np.array(mini_batch_move, dtype=np.int32))))

def mini_batch_for_test(positions, batchsize):
    mini_batch_data = []
    mini_batch_move = []
    for b in range(batchsize):
        features, move, win = make_features(random.choice(positions))
        mini_batch_data.append(features)
        mini_batch_move.append(move)

    return (Variable(cuda.to_gpu(np.array(mini_batch_data, dtype=np.float32))),
            Variable(cuda.to_gpu(np.array(mini_batch_move, dtype=np.int32))))

# train
logging.info('start training')
itr = 0
sum_loss = 0
for e in range(args.epoch):
    positions_train_shuffled = random.sample(positions_train, len(positions_train))

    itr_epoch = 0
    sum_loss_epoch = 0
    for i in range(0, len(positions_train_shuffled) - args.batchsize, args.batchsize):
        x, t = mini_batch(positions_train_shuffled, i, args.batchsize)
        y = model(x)

        model.cleargrads()
        loss = F.softmax_cross_entropy(y, t)
        loss.backward()
        optimizer.update()

        itr += 1
        sum_loss += loss.data
        itr_epoch += 1
        sum_loss_epoch += loss.data

        # print train loss and test accuracy
        if optimizer.t % args.eval_interval == 0:
            x, t = mini_batch_for_test(positions_test, args.test_batchsize)
            y = model(x)
            logging.info('epoch = {}, iteration = {}, loss = {}, accuracy = {}'.format(optimizer.epoch + 1, optimizer.t, sum_loss / itr, F.accuracy(y, t).data))
            itr = 0
            sum_loss = 0

    # validate test data
    logging.info('validate test data')
    itr_test = 0
    sum_test_accuracy = 0
    for i in range(0, len(positions_test) - args.batchsize, args.batchsize):
        x, t = mini_batch(positions_test, i, args.batchsize)
        y = model(x)
        itr_test += 1
        sum_test_accuracy += F.accuracy(y, t).data
    logging.info('epoch = {}, iteration = {}, train loss avr = {}, test accuracy = {}'.format(optimizer.epoch + 1, optimizer.t, sum_loss_epoch / itr_epoch, sum_test_accuracy / itr_test))

    optimizer.new_epoch()

logging.info('save the model')
serializers.save_npz(args.model, model)
logging.info('save the optimizer')
serializers.save_npz(args.state, optimizer)

上のコードは、学習部分を実装しています。 具体的に、

image.png

こんな感じに実装されています。

1.2.2 学習実行

実際に学習を実行すると、こんな感じで、損失計算と、学習データから得られた結果との精度です。

image.png

§2. 将棋 AI 実装

ニューラルネットワークで、学習を終えたモデルを使って対局できるように、USI エンジンにします。

USI(Universal Shogi Interface)プロトコルとは、将棋 GUI ソフトと思考エンジンが通信をするために、Tord Romstad 氏によって考案された通信プロトコルです。 http://shogidokoro.starfree.jp/usi.html

image.png 上のように、USI プロトコルをもとに通信することで、将棋 AI を GUI 上で動かします。

フォルダ構成

\<policynetwork>(root dir)
|-  <bat>
|          |  Docbase.bat
|
|-  <pydlshogi>
|          |-  <player>
|          |  base_player.py
|          |  Docbase_player.py
|          |
|          |-  <usi>
|          |  usi.py
|          |  usi_Docbase_player.py
import numpy as np
import chainer
from chainer import serializers
from chainer import cuda, Variable
import chainer.functions as F

import shogi

from pydlshogi.common import *
from pydlshogi.features import *
from pydlshogi.network.policy import *
from pydlshogi.player.base_player import *

def greedy(logits):
    return logits.index(max(logits))

def boltzmann(logits, temperature):
    logits /= temperature
    logits -= logits.max()
    probabilities = np.exp(logits)
    probabilities /= probabilities.sum()
    return np.random.choice(len(logits), p=probabilities)

class PolicyPlayer(BasePlayer):
    def __init__(self):
        super().__init__()
        self.modelfile = r'学習したモデルのパス'
        self.model = None

    def usi(self):
        print('id name DocBase ShogiAI')
        print('option name modelfile type string default ' + self.modelfile)
        print('usiok')

    def setoption(self, option):
        if option[1] == 'modelfile':
            self.modelfile = option[3]

    def isready(self):
        if self.model is None:
            self.model = PolicyNetwork()
            self.model.to_gpu()
        serializers.load_npz(self.modelfile, self.model)
        print('readyok')

    def go(self):
        if self.board.is_game_over():
            print('bestmove resign')
            return

        features = make_input_features_from_board(self.board)
        x = Variable(cuda.to_gpu(np.array([features], dtype=np.float32)))

        with chainer.no_backprop_mode():
            y = self.model(x)

            logits = cuda.to_cpu(y.data)[0]
            probabilities = cuda.to_cpu(F.softmax(y).data)[0]

        # 全ての合法手について
        legal_moves = []
        legal_logits = []
        for move in self.board.legal_moves:
            # ラベルに変換
            label = make_output_label(move, self.board.turn)
            # 合法手とその指し手の確率(logits)を格納
            legal_moves.append(move)
            legal_logits.append(logits[label])
            # 確率を表示
            print('info string {:5} : {:.5f}'.format(move.usi(), probabilities[label]))

        # 確率が最大の手を選ぶ(グリーディー戦略)
        selected_index = greedy(legal_logits)
        # 確率に応じて手を選ぶ(ソフトマックス戦略)
        selected_index = boltzmann(np.array(legal_logits, dtype=np.float32), 0.5)
        bestmove = legal_moves[selected_index]

        print('bestmove', bestmove.usi())

上のように、AI の情報や局面ごとの最善手を計算していきます。

§3. GUI ソフトで動かす

上で作ったモデルと実際に対局します。 GUI ソフトはいくつかありますが、将棋所を使用します。 20201208_3.gif

上のように、登録することができます。

§4. 参考

参考URL
将棋 AI で学ぶディープラーニングhttps://book.mynavi.jp/ec/products/detail/id=88752
Alpha Go の論文https://www.nature.com/articles/nature16961

§5. 終わりに

いかがでしょうか? 私は、ボードゲームが趣味で将棋、囲碁、チェス等ももちろん好きです。

チェスは、1997 年に、当時のチャンピオン Гaрри Каспaров さんが、IBM 社のディープブルーに敗れました。 私がまだ生まれていない時代から、AI vs 人間の戦いが始まっていたことを知ったときは、驚愕しました。 そして、あと 10 年は人間に勝てないといわれていた、囲碁も 2016 年に李 世乭 さんが、Google 傘下の Demis Hassabis さんが率いる DeepMind 社の Alpha Go に敗れました。

今回の、将棋 AI も Alpha Go を参考にしています。

そして、将棋も 2010 年代からずっと、Bonanza といわれる将棋ソフトの強さは注目されていました。 Bonanza には、モンテカルロ法が採用されていました。 当時から、将棋プロでさえ、Bonanza を一目おいていて、当時の竜王である渡辺明さんと Bonanza の公開対局が催されたりしました。(渡辺明竜王(当時)の逆転勝ち)

2016 年,Alpha Go の論文が公開されると、すぐに Alpha Zero と呼ばれる将棋 AI が誕生しました。 現在の、将棋界は藤井聡太 2 冠をはじめとして、若年世代を筆頭に、世代を問わず将棋 AI を用いた研究が盛んになっています。

コンピュータ将棋大会も、開かれていて、将棋 AI の強さを競う大会もあります。

ぜひ、AI、機械学習等に興味を持ってもらえると嬉しいです。