浅野直樹の学習日記

この画面は、簡易表示です

2021 / 8月

Twitterで好きな画像を収集して自動でリツイートするbotを作る

夏休みの課題としてTwitterで好きな画像を収集して自動でリツイートするbotを作ろうと思い立ちました。

機械学習やTwitterのAPIに習熟するいい機会になりました。

一応の完成形にたどり着いたのでそのやり方を共有します。

 

1.画像を取得する対象ツイートの開始時刻と終了時刻を設定する

例えば毎日午前6時から過去24時間分の画像を収集するなら次のようにします。

from datetime import date, time, datetime, timezone, timedelta

#開始時刻と終了時刻の設定
today = date.today()
time = time(6)
end_time = datetime.combine(today, time, tzinfo=timezone(timedelta(hours=9)))
start_time = end_time - timedelta(hours=24)

#開始時刻と終了時刻の確認
print('start_time:', start_time.isoformat())
print('end_time:', end_time.isoformat())

午前6時を10分ほど過ぎてから実行しないとエラーになります。

start_time: 2021-08-30T06:00:00+09:00
end_time: 2021-08-31T06:00:00+09:00

このような表示になれば成功です。日付の部分は実行した日によって変わります。

 

2.Twitter API v2で画像を一括取得する

開始時刻から終了時刻までのツイートの中で検索ワードにマッチして画像が含まれるオリジナルツイート(リツイートではないツイート)を網羅的に取得します。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {
    'query': 'テスト has:images -is:retweet',
    'expansions': 'attachments.media_keys',
    'media.fields': 'url',
    'max_results': 100,
    'start_time': start_time.isoformat(),
    'end_time': end_time.isoformat(),
}

#画像URLをキー、そのツイートIDを値として格納するディクショナリを作成
results = {}

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続して必要なデータを取得する関数
def connect_to_endpoint(url, params):
    has_next = True
    while has_next:
        #APIを叩いて結果を取得
        response = requests.get(url, auth=bearer_oauth, params=params)

        #ステータスコードが200以外ならエラー処理
        print(response.status_code)
        if response.status_code != 200:
            raise Exception(response.status_code, response.text)

        #responseからJSONを取得
        json_response = response.json()

        #ツイートをループしてmedia_keyをキー、ツイートIDを値とするディクショナリを作成
        media_tweet_dict = {}
        for tweet in json_response['data']:
            if 'attachments' in tweet.keys():
                for media_key in tweet['attachments']['media_keys']:
                    media_tweet_dict[media_key] = tweet['id']

        #メディアのループ
        for image in json_response['includes']['media']:
            try:
                results[image['url']] = media_tweet_dict[image['media_key']]
            except:
                pass

        #次のページがあるかどうかを確かめ、あればquery_paramsにnext_tokenを追加
        has_next = 'next_token' in json_response['meta'].keys()
        if has_next:
            query_params['next_token'] = json_response['meta']['next_token']

#実行
connect_to_endpoint(search_url, query_params)

#結果の確認
print(results)

YOUR_BEARER_TOKENという部分を自分のBearer Tokenに書き換えて、「テスト」という部分を検索したいワードに書き換えます。

うまく動けば、200という成功を表わすステータスコードと、画像URLをキー、そのツイートIDを値とするディクショナリの内容が表示されます。

Twitter API v2で画像を一括取得する – 浅野直樹の学習日記と大枠は同じです。

画像URLとツイートIDを直接紐付けることはできないので、media_keyを中間に介在させなければならないのがやや面倒です。

 

3.収集した画像にターゲットが写っているかを機械学習で判定

これも結論を載せます。

import pickle
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#画像データとツイートIDを保存するリストをそれぞれ作成
data = []
tweet_ids = []

#事前に保存したパイプラインとしきい値をロードする
filename = 'finalized_pipe_and_threshold.sav'
loaded = pickle.load(open(filename, 'rb'))

#画像の読み込み
for k, v in results.items():
    image = io.imread(k)
    if image.ndim == 2:
        continue
    if image.shape[2] == 4:
        image = color.rgba2rgb(image)
    resized_image = resize(image, (64, 64))
    data.append(resized_image)
    tweet_ids.append(v)

#画像データを変形して機械学習の適用
X = np.array(data).reshape(len(data), -1)
y = loaded['pipe'].decision_function(X) >= loaded['threshold']

#Trueと判定されたデータとツイートIDだけを取得
true_data = [data[i] for i in range(len(data)) if y[i] == True]
ids_to_retweet = [tweet_ids[i] for i in range(len(data)) if y[i] == True]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Trueと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(true_data[i])
    if i+1 >= len(true_data): break

機械学習は事前に済ませてpickleで保存しているという前提です。

詳細はscikit-learnで機械学習をして好きな画像を自動で分類する – 浅野直樹の学習日記をご参照ください。

ここではTrueと判定された画像を表示させて目で見て確認していますが、実際に運用する際はその必要はありません。

#Falseと判定されたデータだけを取得
false_data = [data[i] for i in range(len(data)) if y[i] == False]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Falseと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(false_data[i])
    if i+1 >= len(false_data): break

こうすればFalseと判定された画像だけを表示することもできます。

 

4.見つけた画像の自動リツイート

いよいよ最後のプロセスです。

from requests_oauthlib import OAuth1Session
import os
import json

#定数の設定
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
user_id = "YOUR_USER_ID"

#リクエストの作成
oauth = OAuth1Session(
    consumer_key,
    client_secret=consumer_secret,
    resource_owner_key=access_token,
    resource_owner_secret=access_token_secret,
)

#リツイートの実行
for tweet_id in ids_to_retweet:
    payload = {"tweet_id": tweet_id}
    response = oauth.post("https://api.twitter.com/2/users/{}/retweets".format(user_id), json=payload)
    if response.status_code != 200:
        raise Exception("Request returned an error: {} {}".format(response.status_code, response.text))
    print("Response code: {}".format(response.status_code))

Twitter-API-v2-sample-code/retweet_a_tweet.py at main · twitterdev/Twitter-API-v2-sample-codeを大幅に簡略化しました。

定数の部分は自分の値に設定してください。

これでリツイートした回数分だけ「Response code: 200」と表示されるはずです。

 

5.さいごに

ここまで来る道のりは長かったです。何度も挫折しそうになりました。

この記事では解説のためにコードを分割しましたが、本番では一つのコードにまとめて、cronで定期的に自動実行します。

一歩ずつ進めば好きな画像を自動で収集してリツイートするbotを作ることができます。



Twitter API v2で画像を一括取得する

Twitterで画像を一括取得する方法です。

pythonでTwitter API v2を直接触ります。

 

1.前提

pythonの実行環境は各自で用意してください。私はjupyter notebookを使っています。

Twitter Developerの登録を済ませてBearer Tokenを取得していることを前提とします。

v1.1を使っていた人は、以下のリンクを参考にしてv2が使えるようになっているかを確認してください。

Twitter API V2を利用しようとしたらハマったこと | 日々機械的に考える

 

2.Recent searchの簡単なテスト

フリーで使えるRecent searchを使います。

きちんと認証して検索できているかを簡単にテストします。

Twitter-API-v2-sample-code/recent_search.py at main · twitterdev/Twitter-API-v2-sample-codeをアレンジしました。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {'query': '(from:twitterdev -is:retweet) OR #twitterdev','tweet.fields': 'author_id'}

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続してJSONを取得する関数
def connect_to_endpoint(url, params):
    response = requests.get(url, auth=bearer_oauth, params=params)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

#JSON取得の実行
json_response = connect_to_endpoint(search_url, query_params)
print(json_response)

YOUR_BEARER_TOKENのところに自分のBearer Tokenをそのまま記載(ハードコード)するのが手軽です。セキュリティが気になる人は適当に処理してください。

うまくいっていれば、200というステータスコードと、JSON形式のauthor_id, id, textのデータが表示されます。

 

3.画像URLの取得

画像のURLを取得するためには、query_paramsを次のように設定します。

query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url'}

expansionsとmedia.fieldsの両方を指定しなければなりません。リレーショナルデータベースのように、まずexpansionsでメディアのキー(ID)を取得して、そのキー(ID)からURLを取得しているのでしょう。この仕組みを理解するまでに数時間かかりました。

他の部分は上でテストしたときと同じです。

これで「テスト」というキーワードで検索して表示される直近のツイートのうちで画像があるものを検索し、そのURLが取得できます。「テスト」という部分を好きなワードに置き換えればそのワードで検索した結果を取得できます。

json_response['includes']['media'][0]['url']

のようにすればURLだけが得られます。

 

4.一括取得(ページネーション)

デフォルトでは1回の検索で最大10件の取得になります。’max_results’: 100を設定することでこれを100件にしましょう。

query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url', 'max_results': 100}

結果が100件を超える場合は、JSONのmetaという部分に含まれるnext_tokenを利用します。

query_paramsにnext_tokenを設定すれば、その続きから結果を取得できます。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url', 'max_results': 100}

#画像URLを格納するリストを作成
image_urls = []

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続してJSONを取得する関数
def connect_to_endpoint(url, params):
    has_next = True
    while has_next:
        #APIを叩いて結果を取得
        response = requests.get(url, auth=bearer_oauth, params=params)

        #ステータスコードが200以外ならエラー処理
        if response.status_code != 200:
            raise Exception(response.status_code, response.text)

        #responseからJSONを取得してループを回し、URLを追加していく
        json_response = response.json()
        for image in json_response['includes']['media']:
            try:
                image_urls.append(image['url'])
            except:
                pass

        #次のページがあるかどうかを確かめ、あればquery_paramsにnext_tokenを追加
        has_next = 'next_token' in json_response['meta'].keys()
        if has_next:
            query_params['next_token'] = json_response['meta']['next_token']

#実行
connect_to_endpoint(search_url, query_params)

#結果の確認
print(image_urls)

【Python】Twitter API V2 でツイートを取得する | SEのプログラミングと英語の勉強ブログを参考にさせてもらいました。

rate limitsの処理はしていません。

URLを取得できないというエラーが発生したことがあったので、その部分をtry節にしました。

うまくいけば画像のURLがずらっと表示されます。

あとはそれを好きなように使ってください。

 



scikit-learnで機械学習をして好きな画像を自動で分類する

scikit-learnで機械学習をして好きな画像を自動で分類することに挑戦します。

 

例えば、Twitterで検索して表示される画像から、ターゲットが写っている画像とそうでない画像とを分類するときに使えます。

 

1.訓練用の画像データの読み込み(訓練データの用意)

何からの手段で訓練用の画像を入手し、ターゲットが写っている画像とそうでない画像とでフォルダ分けします。ここではそれぞれpositive、negativeという名前のフォルダにしてプログラムを実行しているファイルと同じ階層に入れています。

import os
from skimage import io, color
from skimage.transform import resize

#定数の設定
POS_DIRECTORY = './positive/'
NEG_DIRECTORY = './negative/'

#データを格納するディクショナリを作成
materials = {'data': [], 'target': []}

#画像の読み込みとリサイズをする関数の作成
def read_and_resize_images(directory, target_value):
    list_files = os.listdir(directory)
    for filename in list_files:
        image = io.imread(directory + filename)
        if image.ndim == 2:
            continue
        if image.shape[2] == 4:
            image = color.rgba2rgb(image)
        resized_image = resize(image, (64, 64))
        materials['data'].append(resized_image)
        materials['target'].append(target_value)

#イメージの読み込み
read_and_resize_images(POS_DIRECTORY, 1)
read_and_resize_images(NEG_DIRECTORY, 0)

#データ数の確認
print(len(materials['data']))

時間をかけて画像ファイルを読み込んだ後、そのファイルの数が表示されます。

読み込んだ画像の一部を確認してみましょう。

import matplotlib.pyplot as plt

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#確保した描画領域に読み込んだ画像の最初の50枚と最後の50枚を表示
for i, ax in enumerate(axes.flat):
    if i < 50:
        ax.imshow(materials['data'][i])
    else:
        ax.imshow(materials['data'][-i+49])

これで最初の50枚(ターゲットが写っている画像)と最後の50枚(ターゲットが写っていない画像)が表示されるはずです。

 

2.次元削減の程度の決定

上で読み込んだ画像の特徴量の数は64×64×3=12288です。PCAで次元削減をしましょう。

import numpy as np
from sklearn.decomposition import PCA

#データの整形
reshaped_data = np.array(materials['data']).reshape(len(materials['data']), -1)
print (reshaped_data.shape)

#PCAによって削減される特徴量数と失われるデータの分散との関係を表示
pca = PCA().fit(reshaped_data)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('number of components')
plt.ylabel('cumulative explained variance')
plt.show()

まず、データを整形することにより、サンプル数×特徴量数(12288)の2次元配列が得られます。

次に、特徴量数と説明できる分散の割合のグラフが表示されます。このグラフを参考にして、分散の値を決めます。

今度はその分散の値でPCAを実行し、特徴量数を確かめます。分散の値を0.95にするなら次のようなコードです。

#PCAで次元削減
pca = PCA(0.95).fit(reshaped_data)
print('pca.n_components_:', pca.n_components_)

これで特徴量数が表示されます。私が試してみたデータでは306まで特徴量数を削減できました。

次元削減してから復元した画像を目で見て確かめてみます。

#次元を削減してから元に戻し、0~1の範囲にクリップする
reduced_data = pca.transform(reshaped_data)
restored_data = pca.inverse_transform(reduced_data)
clipped_data = np.clip(restored_data, 0, 1)

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#確保した描画領域に読み込んだ画像の最初の50枚と最後の50枚を表示
for i, ax in enumerate(axes.flat):
    if i < 50:
        ax.imshow(clipped_data[i].reshape(64, 64, 3), vmin=0, vmax=1)
    else:
        ax.imshow(clipped_data[-i+49].reshape(64, 64, 3), vmin=0, vmax=1)

クリップしないとClipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers)という警告メッセージが表示されますが、画像は表示されます。ここではそのエラーを防ぐために数値が0〜1の範囲に収まるようにクリップしました。

 

3.しきい値の調節

いくらか余計な画像が混じってもいいからターゲット画像を見逃したくないということが多いでしょう。

そこでしきい値を調節して再現率(recall)を上げます。

まずは普通に学習した結果の情報を確認します。

SVCをデフォルトパラメータで使うとします。

import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import plot_precision_recall_curve

#訓練データとテストデータに分割
reshaped_data = np.array(materials['data']).reshape(len(materials['data']), -1)
X_train, X_test, y_train, y_test = train_test_split(reshaped_data, materials['target'])

#パイプラインの構築
steps = [('pca', PCA()), ('model', SVC())]
pipe = Pipeline(steps)

#作成したモデルで学習
pipe.fit(X_train, y_train)

#学習結果のテスト
y_test_pred = pipe.predict(X_test)
print('accuracy_score:', accuracy_score(y_test, y_test_pred))

#テストの混同行列
plot_confusion_matrix(pipe, X_test, y_test)
plt.show()

#訓練の混同行列
plot_confusion_matrix(pipe, X_train, y_train)
plt.show()

#適合率(precision)・再現率(recall)・F1値の一括計算
y_train_pred = pipe.predict(X_train)
print(classification_report(y_train, y_train_pred))

#適合率―再現率曲線
plot_precision_recall_curve(pipe, X_train, y_train)
plt.show()

その結果から、目指すべき再現率(recall)を決めます。そして、その再現率に合うように、訓練データの中で見逃してもよい個数を算出します。

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

#見逃す個数の設定
NUM_TO_MISS = 3

#訓練データの中からfalse_negativeだけをリスト内包表記で抽出
false_negative = [X_train[i] for i in range(len(X_train)) if (y_train_pred[i] == False and y_train[i] == True)]

#false_negativeに決定関数を適用した値を取得後、ソートして表示
false_negative_value = pipe.decision_function(false_negative)
sorted_false_negative_value = np.sort(false_negative_value)
print(sorted_false_negative_value, '\n\n')

#決定関数のしきい値を見逃す個数に応じて設定
y_train_pred_lower_threshold = (pipe.decision_function(X_train) >= sorted_false_negative_value[NUM_TO_MISS - 1])

#新しく設定したしきい値での適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train, y_train_pred_lower_threshold))

#新しく設定したしきい値での混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_train, y_train_pred_lower_threshold))
disp.plot()
plt.show()

#新しく設定したしきい値での学習結果のテスト
y_test_pred_lower_threshold = (pipe.decision_function(X_test) >= sorted_false_negative_value[NUM_TO_MISS - 1])
accuracy_score(y_test, y_test_pred_lower_threshold)

#新しく設定したしきい値で学習したテストデータの混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_test, y_test_pred_lower_threshold))
disp.plot()
plt.show()

望む結果が得られたら学習後のモデルとしきい値を保存しておきましょう。

import pickle

pipe_and_threshold = {'pipe':pipe, 'threshold':sorted_false_negative_value[NUM_TO_MISS - 1]}
filename = 'finalized_pipe_and_threshold.sav'
pickle.dump(pipe_and_threshold, open(filename, 'wb'))

 

4.未知の画像への適用

これでやりたいことができます。未知の画像に先ほど学習したモデルを適用します。

import pickle
import os
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#定数の設定
APPLY_DIRECTORY = './apply/'

#画像データを保存するリストを作成
data = []

# 保存したパイプラインとしきい値をロードする
filename = 'finalized_pipe_and_threshold.sav'
loaded = pickle.load(open(filename, 'rb'))

#適用する画像ファイルの読み込み
list_files = os.listdir(APPLY_DIRECTORY)
for filename in list_files:
    image = io.imread(APPLY_DIRECTORY + filename)
    if image.ndim == 2:
        continue
    if image.shape[2] == 4:
        image = color.rgba2rgb(image)
    resized_image = resize(image, (64, 64))
    data.append(resized_image)

#画像データを変形して適用
X = np.array(data).reshape(len(data), -1)
y = loaded['pipe'].decision_function(X) >= loaded['threshold']

#Trueと判定されたデータだけを取得
true_data = [data[i] for i in range(len(data)) if y[i] == True]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Trueと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(true_data[i])
    if i+1 >= len(true_data): break

できました。

 



scikit-learnでMNIST手書き数字の分類機械学習(3)

scikit-learnでMNIST手書き数字の分類機械学習 – 浅野直樹の学習日記scikit-learnでMNIST手書き数字の分類機械学習(2) – 浅野直樹の学習日記の続きです。

今回は、しきい値の調節、ニューラルネットワーク、アンサンブルに取り組みます。

 

1.しきい値の調節

機械学習の目的によっては、単純に正解率のスコアを上げるのではなく、正解率は下がってもよいからできるだけ漏れなく検出してほしいという場合があります。そのような場合にはしきい値を調節します。

0〜9までの10種類の数字を分類するのはややこしいので、8か8以外の数字かを分類するという二項分類の例で考えます。

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
import matplotlib.pyplot as plt

#データの読み込み
digits = datasets.load_digits()

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#8か8以外の数字かでnumpy配列のマスク処理
y_train_8 = (y_train == 8)
y_test_8 = (y_test == 8)

#機械学習モデルの作成
model = svm.SVC()

#作成したモデルで学習
model.fit(X_train, y_train_8)

#学習結果のテスト
y_test_pred = model.predict(X_test)
print('accuracy_score:', accuracy_score(y_test_8, y_test_pred))

#混同行列
plot_confusion_matrix(model, X_test, y_test_8)
plt.show()

いつもの要領で学習をして、結果を表示させます。

0.9755555555555555

正解率のスコアが0.975…とはなかなかいいですね。

次に、訓練データの混同行列と適合率(precision)・再現率(recall)・F1値を確認してみましょう。

import matplotlib.pyplot as plt
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import precision_score, recall_score
from sklearn.metrics import classification_report

#混同行列
plot_confusion_matrix(model, X_train, y_train_8)
plt.show()

#適合率(precision)・再現率(recall)・F1値の手計算
y_train_pred = model.predict(X_train)
precision_score = precision_score(y_train_8, y_train_pred)
recall_score = recall_score(y_train_8, y_train_pred)
f1_score = (2 * precision_score * recall_score) / (precision_score + recall_score)
print('precision_score:', precision_score)
print('recall_score:', recall_score)
print('f1_score:', f1_score, '\n\n')

#適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train_8, y_train_pred))

以下の図と文字列が表示されます。

precision_score: 1.0
recall_score: 0.8872180451127819
f1_score: 0.9402390438247012


precision recall f1-score support

False 0.99 1.00 0.99 1214
True 1.00 0.89 0.94 133

accuracy 0.99 1347
macro avg 0.99 0.94 0.97 1347
weighted avg 0.99 0.99 0.99 1347

適合率は1.0と完璧ですが、再現率は0.88程度とやや低く、本当は8なのに見逃しているものが15個とけっこうたくさんあります。

適合率が下がることは覚悟で再現率を上げてみましょう。

その前に、適合率と再現率の関係をグラフ化します。

from sklearn.metrics import plot_precision_recall_curve

#適合率―再現率曲線
plot_precision_recall_curve(model, X_train, y_train_8)
plt.show()

これだけで簡単きれいに表示されます。

ROC曲線も見てみましょう。

from sklearn.metrics import plot_roc_curve

#ROC曲線
plot_roc_curve(model, X_train, y_train_8)
plt.show()

再現率(recall)0.99を狙ってみます。訓練データの中にある133個の8のうち、先ほどは15個を見逃していましたが、見逃してもよいのは1個だけということです。

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np

#訓練データの中で実際には8なのに8ではないと予測したデータ(false_negative)だけをリスト内包表記で抽出
false_negative = [X_train[i] for i in range(len(X_train)) if (y_train_pred[i] == False and y_train_8[i] == True)]

#false_negativeに決定関数を適用した値を取得後、ソートして表示
false_negative_value = model.decision_function(false_negative)
sorted_false_negative_value = np.sort(false_negative_value)
print(sorted_false_negative_value, '\n\n')

#決定関数のしきい値をsorted_false_negative_valueの2番目に小さい数に設定
y_train_pred_lower_threshold = (model.decision_function(X_train) >= sorted_false_negative_value[1])

#新しく設定したしきい値での適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train_8, y_train_pred_lower_threshold))

#新しく設定したしきい値での混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_train_8, y_train_pred_lower_threshold))
disp.plot()
plt.show()

#新しく設定したしきい値での学習結果のテスト
y_test_pred_lower_threshold = (model.decision_function(X_test) >= sorted_false_negative_value[1])
accuracy_score(y_test_8, y_test_pred_lower_threshold)

#新しく設定したしきい値で学習したテストデータの混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_test_8, y_test_pred_lower_threshold))
disp.plot()
plt.show()

結果は以下です。

[-0.56471249 -0.5230137 -0.4710125 -0.47010167 -0.46740437 -0.35965516
-0.34701922 -0.21615345 -0.192735 -0.11733131 -0.10970646 -0.06225593
-0.04509709 -0.03724317 -0.02381575]


precision recall f1-score support

False 1.00 1.00 1.00 1214
True 0.99 0.99 0.99 133

accuracy 1.00 1347
macro avg 0.99 1.00 0.99 1347
weighted avg 1.00 1.00 1.00 1347

プログラム内のコメントに書いたとおり、false_negative(偽陰性)だけを取り出し、その決定関数の値を参考にして、しきい値を-0.523…に設定しています。

当然ながら、目論見通りに再現率(recall)0.99を達成しています。

テストデータでの結果はそこまでよくありませんが、それでも当初の結果と比べると、見逃しは11個から4個に減少しています。

plot_confusion_matrixはXとyを引数に取るため、しきい値を変更した場合の混同行列はconfusion_matrixとConfusionMatrixDisplayを併用する形で表現しています。

やりたいことはできました。

Aurélien Géron 著,下田倫大 監訳,長尾高弘 訳『scikit-learnとTensorFlowによる実践機械学習』(オライリー・ジャパン, 2018)の3章を主に参照しました。

 

2.ニューラルネットワーク

scikit-learnでお手軽にニューラルネットワークを試してみます。

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
import matplotlib.pyplot as plt

#データの読み込み
digits = datasets.load_digits()

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#機械学習モデルの作成
model = MLPClassifier(verbose=True, random_state=0)

#学習
model.fit(X_train, y_train)

#学習結果のテスト
y_pred = model.predict(X_test)
print('accuracy_score:', accuracy_score(y_test, y_pred))

#混同行列
plot_confusion_matrix(model, X_test, y_test)
plt.show()

いつもの手順でモデルにMLPClassifierを指定しただけです。ニューラルネットワークの雰囲気を醸し出すためにverbose=Trueにしています。よって以下のように出力されます。

Iteration 1, loss = 11.97505127
Iteration 2, loss = 5.72047413
Iteration 3, loss = 3.28455794
(中略)
Iteration 133, loss = 0.00351769
Iteration 134, loss = 0.00345282
Iteration 135, loss = 0.00341083
Training loss did not improve more than tol=0.000100 for 10 consecutive epochs. Stopping.
accuracy_score: 0.9244444444444444

サンプル数が少ないためか、それほどよくない結果ですね。

 

3.アンサンブル

アンサンブルも試してみます。

#データ関係
from sklearn import datasets
from sklearn.model_selection import train_test_split
#学習モデル関係
from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegressionCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
#結果の表示
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
import matplotlib.pyplot as plt

#データの読み込み
digits = datasets.load_digits()

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#スケーリング
X_train = X_train / 16
X_test = X_test / 16

#機械学習モデルの作成
svc = SVC(probability=True)
kn = KNeighborsClassifier()
lr = LogisticRegressionCV(max_iter=500)
rf = RandomForestClassifier()
mlp = MLPClassifier(max_iter=500)

#アンサンブルモデルの作成
hard_voting = VotingClassifier(
estimators=[('svc', svc), ('kn', kn), ('lr', lr), ('rf', rf), ('mlp', mlp)],
voting='hard')

soft_voting = VotingClassifier(
estimators=[('svc', svc), ('kn', kn), ('lr', lr), ('rf', rf), ('mlp', mlp)],
voting='soft')

#学習
hard_voting.fit(X_train, y_train)
soft_voting.fit(X_train, y_train)

#学習結果のテスト
y_hard_pred = hard_voting.predict(X_test)
y_soft_pred = soft_voting.predict(X_test)
print('hard_voting accuracy_score:', accuracy_score(y_test, y_hard_pred))
print('soft_voting accuracy_score:', accuracy_score(y_test, y_soft_pred))

#混同行列
plot_confusion_matrix(hard_voting, X_test, y_test)
plt.show()
plot_confusion_matrix(soft_voting, X_test, y_test)
plt.show()

#個別のモデル
estimators = [svc, kn, lr, rf, mlp]
for estimator in estimators:
estimator.fit(X_train, y_train)
y_individual_pred = estimator.predict(X_test)
print(estimator.__class__.__name__, 'accuracy_score:', accuracy_score(y_test, y_individual_pred))

soft_votingのためにSVC(probability=True)としています。また、警告表示を見て、LogisticRegressionCVとMLPClassifierにはmax_iter=500と設定しました。

結果は以下です。

hard_voting accuracy_score: 0.9488888888888889
soft_voting accuracy_score: 0.9444444444444444

SVC accuracy_score: 0.9488888888888889
KNeighborsClassifier accuracy_score: 0.9644444444444444
LogisticRegressionCV accuracy_score: 0.9288888888888889
RandomForestClassifier accuracy_score: 0.9333333333333333
MLPClassifier accuracy_score: 0.9244444444444444

アンサンブルにしたからといって劇的にスコアがよくなるというわけではなさそうです。

 

 



scikit-learnでMNIST手書き数字の分類機械学習(2)

scikit-learnでMNIST手書き数字の分類機械学習 – 浅野直樹の学習日記の続きです。

今回はモデルやパラメータの選択に挑戦してみます。

その前に、次元削減、パイプライン、交差検証の練習をしておきます。

 

1.次元削減

今回は学習を何度も実行するので、その学習にかかる時間を短くできるように特徴量の次元削減をします。

import matplotlib.pyplot as plt
import numpy as np
from sklearn import datasets
from sklearn.decomposition import PCA

#データの読み込み
digits = datasets.load_digits()

#PCAによって削減される特徴量数と失われるデータの分散との関係を表示
pca = PCA().fit(digits.data)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('number of components')
plt.ylabel('cumulative explained variance')

以下のような図が表示されます。

コードはIn Depth: Principal Component Analysis | Python Data Science Handbookから流用させてもらいました。

この図を見ると、元のデータの分散の0.95を説明できるようにしても特徴量の数を半分以下にできそうです。

実際に試してみましょう。

#PCAで次元削減
pca = PCA(0.95).fit(digits.data)
reduced_data = pca.transform(digits.data)
restored_data = pca.inverse_transform(reduced_data)
print('pca.n_components_:', pca.n_components_)
print('digits.data.shape:', digits.data.shape)
print('reduced_data.shape:', reduced_data.shape)
print('restored_data.shape:', restored_data.shape)

####次元削減をしてから元に戻した手書き数字データの画像表示####
#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]}, gridspec_kw=dict(hspace=0.5, wspace=0.5))

#最初の100枚の確保した描画領域に表示
for i, ax in enumerate(axes.flat):
    ax.imshow(restored_data[i].reshape(8, 8), cmap=plt.cm.gray_r, interpolation='nearest')
    ax.set_title(digits.target[i])

次のような文字列と画像が出力されます。

pca.n_components_: 29
digits.data.shape: (1797, 64)
reduced_data.shape: (1797, 29)
restored_data.shape: (1797, 64)

特徴量の次元数を64から29に減少させたたのに、画像を目視で確認すると次元削減をしなかった前回と比べても変化に気づかないくらいです。

なお、ここで言っている次元数は特徴量の次元数のことであって、配列の次元数ではないことにご注意ください。機械学習をするための配列の次元数は常に2です。

 

2.パイプライン

これから特徴量の次元数を削減してからいろいろなモデルで学習させることになるので、特徴量の次元削減とモデルとをパイプラインでくっつけます。

最初に一度だけデータ全体の特徴量の次元削減をしてからいろいろなモデルで学習させればよいのではないかと思う人もいるかもしれませんが、次元削減は訓練データにだけするようにしないと情報がリークして学習に悪影響を及ぼしてしまうので、そうしてはいけません。

from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#パイプラインの構築
steps = [('pca', PCA()), ('model', svm.SVC())]
pipe = Pipeline(steps)

#構築したパイプラインで学習
pipe.fit(X_train, y_train)

#学習結果のテスト
y_model = pipe.predict(X_test)
accuracy_score(y_test, y_model)

結果は以下です。

0.9622222222222222

前回と似たような値になりました。

 

3.交差検証

これまでは1回のテストでスコア(正解率)を計算してきましたが、訓練データとテストデータの分け方によってその値が変わります。

そこで、交差検証を導入して複数回のテストを行い、より信頼できる値を出してみます。

from sklearn.model_selection import cross_val_score

#交差検証
scores = cross_val_score(pipe, digits.data, digits.target, cv=3)
print('scores:', scores)
print('average_score:', scores.mean())

出力は以下です。

scores: [0.96994992 0.98330551 0.96994992]
average_score: 0.9744017807456872

scikit-learnのおかげで簡単にできました。

 

4.モデル選択

いよいよ本題のモデル選択です。scikit-learnには全てのモデルを返してくれるall_estimators()関数があるのでこれを使いましょう。

もう一度データの読み込みからやり直して一気に行きます。

import warnings
import pandas as pd
from sklearn import datasets
from sklearn.utils import all_estimators
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score

#警告の非表示
warnings.filterwarnings('ignore')

#結果を格納するデータフレームの作成
df = pd.DataFrame(columns=['model_name', 'score1', 'score2', 'score3', 'average_score'])

#データの読み込み
digits = datasets.load_digits()

#全てのモデルで学習
for (name, algorithm) in all_estimators(type_filter="classifier"):
    try:
        steps = [('pca', PCA()), ('model', algorithm())]
        pipe = Pipeline(steps)
        scores = cross_val_score(pipe, digits.data, digits.target, cv=3)
        df.loc[len(df)+1] = [name, *scores, scores.mean()]
    except:
        pass

#スコアのいい順に並べ替えて表示
sorted_df = df.sort_values('average_score', ascending=False)
sorted_df.set_axis(range(1, (len(df)+1) ), axis='index')

気になる結果を表示します。

model_name score1 score2 score3 average_score
1 SVC 0.969950 0.983306 0.969950 0.974402
2 KNeighborsClassifier 0.958264 0.963272 0.966611 0.962716
3 ExtraTreesClassifier 0.949917 0.956594 0.943239 0.949917
4 NuSVC 0.943239 0.956594 0.936561 0.945465
5 MLPClassifier 0.921536 0.951586 0.933222 0.935448
6 LogisticRegressionCV 0.923205 0.951586 0.928214 0.934335
7 RandomForestClassifier 0.924875 0.929883 0.933222 0.929327
8 LogisticRegression 0.924875 0.934891 0.923205 0.927657
9 SGDClassifier 0.918197 0.934891 0.901503 0.918197
10 CalibratedClassifierCV 0.914858 0.934891 0.903172 0.917641
11 HistGradientBoostingClassifier 0.911519 0.928214 0.908180 0.915971
12 LinearDiscriminantAnalysis 0.926544 0.911519 0.906511 0.914858
13 PassiveAggressiveClassifier 0.899833 0.934891 0.903172 0.912632
14 LinearSVC 0.903172 0.929883 0.888147 0.907067
15 RidgeClassifierCV 0.921536 0.911519 0.879800 0.904285
16 RidgeClassifier 0.923205 0.906511 0.879800 0.903172
17 GradientBoostingClassifier 0.898164 0.881469 0.906511 0.895381
18 NearestCentroid 0.891486 0.881469 0.881469 0.884808
19 BaggingClassifier 0.894825 0.871452 0.846411 0.870896
20 GaussianNB 0.881469 0.851419 0.863105 0.865331
21 Perceptron 0.869783 0.874791 0.789649 0.844741
22 QuadraticDiscriminantAnalysis 0.881469 0.816361 0.813022 0.836950
23 DecisionTreeClassifier 0.833055 0.766277 0.756260 0.785198
24 BernoulliNB 0.757930 0.777963 0.786311 0.774068
25 ExtraTreeClassifier 0.621035 0.535893 0.580968 0.579299
26 AdaBoostClassifier 0.292154 0.345576 0.230384 0.289371
27 GaussianProcessClassifier 0.100167 0.101836 0.101836 0.101280
28 LabelPropagation 0.100167 0.098497 0.098497 0.099054
29 LabelSpreading 0.100167 0.098497 0.098497 0.099054
30 DummyClassifier 0.091820 0.088481 0.091820 0.090707
31 CategoricalNB NaN NaN NaN NaN
32 ComplementNB NaN NaN NaN NaN
33 MultinomialNB NaN NaN NaN NaN

うまく表示されました。

このようにランキング形式で表示させるのに苦労しました。pandasを使ういい練習になりました。

jupyter notebook上には先ほどのコードを実行するだけで結果がきれいに表示されますが、このブログ記事に載せるためにはto_html()メソッドを使ってhtmlにしました。

all_estimators()関数で取得できるモデルの中にはエラーが発生するものもあるのでtry節の中に入れています。

エラーまでは発生しなくても警告が表示されるものもたくさんあるので警告の表示をしないようにもしました。

本来はこのようにしてテストデータを見てしまうのではなく、Choosing the right estimator — scikit-learn 0.24.2 documentationなどを参考にしながらモデルを選ぶべきなのでしょう。

 

5.グリッドサーチで最適なパラメータの選択

次は一番成績のよいモデルだったSVCの最適なパラメータを探りましょう。

グリッドサーチで交差検証をしてくれるGridSearchCVに頼ります。

from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score
import seaborn as sns

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#パイプラインの構築
steps = [('pca', PCA()), ('model', svm.SVC())]
pipe = Pipeline(steps)

#グリッドサーチのパラメータの設定
param_grid = {
    'model__C': [0.1, 1, 10, 100, 1000],
    'model__gamma': [0.00001, 0.0001, 0.001, 0.01, 0.1]
}

#グリッドサーチの実行
grid_search = GridSearchCV(pipe, param_grid, cv=3)
grid_search.fit(X_train, y_train)

#結果の表示
print('cv_score:', grid_search.best_score_)
print('best parameters:', grid_search.best_params_)

#テストデータへの適用と結果の表示
y_best_model = grid_search.predict(X_test)
print('test_score:', accuracy_score(y_test, y_best_model))

#結果の可視化
cv_result = pd.DataFrame(grid_search.cv_results_)
cv_result = cv_result[['param_model__gamma', 'param_model__C', 'mean_test_score']]
cv_result_pivot = cv_result.pivot_table('mean_test_score', 'param_model__gamma', 'param_model__C')
heat_map = sns.heatmap(cv_result_pivot, cmap='viridis', annot=True)

結果は以下の通りです。

cv_score: 0.96362286562732
best parameters: {'model__C': 10, 'model__gamma': 0.001}
test_score: 0.9688888888888889

このあたりはAndreas C.Müller, Sarah Guido 著,中田秀基 訳『Pythonではじめる機械学習 : scikit-learnで学ぶ特徴量エンジニアリングと機械学習の基礎』(オライリー・ジャパン, 2017)を大いに参照しました。

 

6.自分で書いたオリジナルの手書き数字で実験

前回同様自分で書いたオリジナルの手書き数字で実験してみます。

import os
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#オリジナルデータを格納するリストの作成
X_original = []
y_original = []

#ディレクトリから画像の読み込み
image_files = os.listdir('./original_images/')
for filename in image_files:
    image = io.imread('./original_images/' + filename)
    inverted_image = np.invert(image)
    gray_image = color.rgb2gray(inverted_image)
    resized_image = resize(gray_image, (8, 8))
    scaled_image = resized_image * 16
    X_original.append(scaled_image)
    y_original.append(int(filename[0]))

#画像データを機械学習に適した形にする
X_original_data = np.array(X_original).reshape(10, -1)
y_original_target = np.array(y_original)

#以前に作成したモデルでオリジナルデータの予測
y_original_model = grid_search.predict(X_original_data)

#描画領域の確保
fig, axes = plt.subplots(1, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]}, gridspec_kw=dict(hspace=0.5, wspace=0.5))

#オリジナルデータを「予測値→真の値」というラベルとともに表示
for i, ax in enumerate(axes.flat):
    ax.imshow(X_original_data[i].reshape(8, 8), cmap=plt.cm.gray_r, interpolation='nearest')
    label = str(y_original_model[i]) + '→' + str(y_original_target[i])
    title_color = "black" if y_original_model[i] == y_original_target[i] else "red"
    ax.set_title(label, color=title_color)

前回「model.predict(X_original_data)」と書いたところを「grid_search.predict(X_original_data)」に変更しただけです。

前回よりも結果が悪くなってしまいました。

MNISTデータにより適合しすぎた(オリジナルの手書き数字をターゲットだと考えるなら訓練データに過学習した)せいかもしれません。

 




top