浅野直樹の学習日記

この画面は、簡易表示です

Python3エンジニア認定データ分析試験を受験します

突然ですが、2021年12月末までにPython3エンジニア認定データ分析試験を受験します。

受験宣言して教科書のPython本をもらおう!(応募は2021年11月末日まで) | 一般社団法人Pythonエンジニア育成推進協会につられて宣言してみました。

pythonを使った機械学習に手を出していたところに上記キャンペーンの存在を知り、一念発起しました。

より基礎的なPython3エンジニア認定基礎試験も気になっています。

試験内容をざっと確認したところ、両試験とも調べればすぐにわかるような事柄を記憶しているかが問われます。

個人的にそうした暗記物は好きではないのですが、プログラムを書くときに初歩的な文法などでいちいち検索していたら生産が阻害されるのもまた事実であり、pythonを流暢に操れるようになるためにこの試験を活用してみたいです。

 



Twitterで好きな画像を収集して自動でリツイートするbotを作る

夏休みの課題としてTwitterで好きな画像を収集して自動でリツイートするbotを作ろうと思い立ちました。

機械学習やTwitterのAPIに習熟するいい機会になりました。

一応の完成形にたどり着いたのでそのやり方を共有します。

 

1.画像を取得する対象ツイートの開始時刻と終了時刻を設定する

例えば毎日午前6時から過去24時間分の画像を収集するなら次のようにします。

from datetime import date, time, datetime, timezone, timedelta

#開始時刻と終了時刻の設定
today = date.today()
time = time(6)
end_time = datetime.combine(today, time, tzinfo=timezone(timedelta(hours=9)))
start_time = end_time - timedelta(hours=24)

#開始時刻と終了時刻の確認
print('start_time:', start_time.isoformat())
print('end_time:', end_time.isoformat())

午前6時を10分ほど過ぎてから実行しないとエラーになります。

start_time: 2021-08-30T06:00:00+09:00
end_time: 2021-08-31T06:00:00+09:00

このような表示になれば成功です。日付の部分は実行した日によって変わります。

 

2.Twitter API v2で画像を一括取得する

開始時刻から終了時刻までのツイートの中で検索ワードにマッチして画像が含まれるオリジナルツイート(リツイートではないツイート)を網羅的に取得します。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {
    'query': 'テスト has:images -is:retweet',
    'expansions': 'attachments.media_keys',
    'media.fields': 'url',
    'max_results': 100,
    'start_time': start_time.isoformat(),
    'end_time': end_time.isoformat(),
}

#画像URLをキー、そのツイートIDを値として格納するディクショナリを作成
results = {}

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続して必要なデータを取得する関数
def connect_to_endpoint(url, params):
    has_next = True
    while has_next:
        #APIを叩いて結果を取得
        response = requests.get(url, auth=bearer_oauth, params=params)

        #ステータスコードが200以外ならエラー処理
        print(response.status_code)
        if response.status_code != 200:
            raise Exception(response.status_code, response.text)

        #responseからJSONを取得
        json_response = response.json()

        #ツイートをループしてmedia_keyをキー、ツイートIDを値とするディクショナリを作成
        media_tweet_dict = {}
        for tweet in json_response['data']:
            if 'attachments' in tweet.keys():
                for media_key in tweet['attachments']['media_keys']:
                    media_tweet_dict[media_key] = tweet['id']

        #メディアのループ
        for image in json_response['includes']['media']:
            try:
                results[image['url']] = media_tweet_dict[image['media_key']]
            except:
                pass

        #次のページがあるかどうかを確かめ、あればquery_paramsにnext_tokenを追加
        has_next = 'next_token' in json_response['meta'].keys()
        if has_next:
            query_params['next_token'] = json_response['meta']['next_token']

#実行
connect_to_endpoint(search_url, query_params)

#結果の確認
print(results)

YOUR_BEARER_TOKENという部分を自分のBearer Tokenに書き換えて、「テスト」という部分を検索したいワードに書き換えます。

うまく動けば、200という成功を表わすステータスコードと、画像URLをキー、そのツイートIDを値とするディクショナリの内容が表示されます。

Twitter API v2で画像を一括取得する – 浅野直樹の学習日記と大枠は同じです。

画像URLとツイートIDを直接紐付けることはできないので、media_keyを中間に介在させなければならないのがやや面倒です。

 

3.収集した画像にターゲットが写っているかを機械学習で判定

これも結論を載せます。

import pickle
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#画像データとツイートIDを保存するリストをそれぞれ作成
data = []
tweet_ids = []

#事前に保存したパイプラインとしきい値をロードする
filename = 'finalized_pipe_and_threshold.sav'
loaded = pickle.load(open(filename, 'rb'))

#画像の読み込み
for k, v in results.items():
    image = io.imread(k)
    if image.ndim == 2:
        continue
    if image.shape[2] == 4:
        image = color.rgba2rgb(image)
    resized_image = resize(image, (64, 64))
    data.append(resized_image)
    tweet_ids.append(v)

#画像データを変形して機械学習の適用
X = np.array(data).reshape(len(data), -1)
y = loaded['pipe'].decision_function(X) >= loaded['threshold']

#Trueと判定されたデータとツイートIDだけを取得
true_data = [data[i] for i in range(len(data)) if y[i] == True]
ids_to_retweet = [tweet_ids[i] for i in range(len(data)) if y[i] == True]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Trueと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(true_data[i])
    if i+1 >= len(true_data): break

機械学習は事前に済ませてpickleで保存しているという前提です。

詳細はscikit-learnで機械学習をして好きな画像を自動で分類する – 浅野直樹の学習日記をご参照ください。

ここではTrueと判定された画像を表示させて目で見て確認していますが、実際に運用する際はその必要はありません。

#Falseと判定されたデータだけを取得
false_data = [data[i] for i in range(len(data)) if y[i] == False]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Falseと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(false_data[i])
    if i+1 >= len(false_data): break

こうすればFalseと判定された画像だけを表示することもできます。

 

4.見つけた画像の自動リツイート

いよいよ最後のプロセスです。

from requests_oauthlib import OAuth1Session
import os
import json

#定数の設定
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
user_id = "YOUR_USER_ID"

#リクエストの作成
oauth = OAuth1Session(
    consumer_key,
    client_secret=consumer_secret,
    resource_owner_key=access_token,
    resource_owner_secret=access_token_secret,
)

#リツイートの実行
for tweet_id in ids_to_retweet:
    payload = {"tweet_id": tweet_id}
    response = oauth.post("https://api.twitter.com/2/users/{}/retweets".format(user_id), json=payload)
    if response.status_code != 200:
        raise Exception("Request returned an error: {} {}".format(response.status_code, response.text))
    print("Response code: {}".format(response.status_code))

Twitter-API-v2-sample-code/retweet_a_tweet.py at main · twitterdev/Twitter-API-v2-sample-codeを大幅に簡略化しました。

定数の部分は自分の値に設定してください。

これでリツイートした回数分だけ「Response code: 200」と表示されるはずです。

 

5.さいごに

ここまで来る道のりは長かったです。何度も挫折しそうになりました。

この記事では解説のためにコードを分割しましたが、本番では一つのコードにまとめて、cronで定期的に自動実行します。

一歩ずつ進めば好きな画像を自動で収集してリツイートするbotを作ることができます。



Twitter API v2で画像を一括取得する

Twitterで画像を一括取得する方法です。

pythonでTwitter API v2を直接触ります。

 

1.前提

pythonの実行環境は各自で用意してください。私はjupyter notebookを使っています。

Twitter Developerの登録を済ませてBearer Tokenを取得していることを前提とします。

v1.1を使っていた人は、以下のリンクを参考にしてv2が使えるようになっているかを確認してください。

Twitter API V2を利用しようとしたらハマったこと | 日々機械的に考える

 

2.Recent searchの簡単なテスト

フリーで使えるRecent searchを使います。

きちんと認証して検索できているかを簡単にテストします。

Twitter-API-v2-sample-code/recent_search.py at main · twitterdev/Twitter-API-v2-sample-codeをアレンジしました。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {'query': '(from:twitterdev -is:retweet) OR #twitterdev','tweet.fields': 'author_id'}

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続してJSONを取得する関数
def connect_to_endpoint(url, params):
    response = requests.get(url, auth=bearer_oauth, params=params)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

#JSON取得の実行
json_response = connect_to_endpoint(search_url, query_params)
print(json_response)

YOUR_BEARER_TOKENのところに自分のBearer Tokenをそのまま記載(ハードコード)するのが手軽です。セキュリティが気になる人は適当に処理してください。

うまくいっていれば、200というステータスコードと、JSON形式のauthor_id, id, textのデータが表示されます。

 

3.画像URLの取得

画像のURLを取得するためには、query_paramsを次のように設定します。

query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url'}

expansionsとmedia.fieldsの両方を指定しなければなりません。リレーショナルデータベースのように、まずexpansionsでメディアのキー(ID)を取得して、そのキー(ID)からURLを取得しているのでしょう。この仕組みを理解するまでに数時間かかりました。

他の部分は上でテストしたときと同じです。

これで「テスト」というキーワードで検索して表示される直近のツイートのうちで画像があるものを検索し、そのURLが取得できます。「テスト」という部分を好きなワードに置き換えればそのワードで検索した結果を取得できます。

json_response['includes']['media'][0]['url']

のようにすればURLだけが得られます。

 

4.一括取得(ページネーション)

デフォルトでは1回の検索で最大10件の取得になります。’max_results’: 100を設定することでこれを100件にしましょう。

query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url', 'max_results': 100}

結果が100件を超える場合は、JSONのmetaという部分に含まれるnext_tokenを利用します。

query_paramsにnext_tokenを設定すれば、その続きから結果を取得できます。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url', 'max_results': 100}

#画像URLを格納するリストを作成
image_urls = []

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続してJSONを取得する関数
def connect_to_endpoint(url, params):
    has_next = True
    while has_next:
        #APIを叩いて結果を取得
        response = requests.get(url, auth=bearer_oauth, params=params)

        #ステータスコードが200以外ならエラー処理
        if response.status_code != 200:
            raise Exception(response.status_code, response.text)

        #responseからJSONを取得してループを回し、URLを追加していく
        json_response = response.json()
        for image in json_response['includes']['media']:
            try:
                image_urls.append(image['url'])
            except:
                pass

        #次のページがあるかどうかを確かめ、あればquery_paramsにnext_tokenを追加
        has_next = 'next_token' in json_response['meta'].keys()
        if has_next:
            query_params['next_token'] = json_response['meta']['next_token']

#実行
connect_to_endpoint(search_url, query_params)

#結果の確認
print(image_urls)

【Python】Twitter API V2 でツイートを取得する | SEのプログラミングと英語の勉強ブログを参考にさせてもらいました。

rate limitsの処理はしていません。

URLを取得できないというエラーが発生したことがあったので、その部分をtry節にしました。

うまくいけば画像のURLがずらっと表示されます。

あとはそれを好きなように使ってください。

 



scikit-learnで機械学習をして好きな画像を自動で分類する

scikit-learnで機械学習をして好きな画像を自動で分類することに挑戦します。

 

例えば、Twitterで検索して表示される画像から、ターゲットが写っている画像とそうでない画像とを分類するときに使えます。

 

1.訓練用の画像データの読み込み(訓練データの用意)

何からの手段で訓練用の画像を入手し、ターゲットが写っている画像とそうでない画像とでフォルダ分けします。ここではそれぞれpositive、negativeという名前のフォルダにしてプログラムを実行しているファイルと同じ階層に入れています。

import os
from skimage import io, color
from skimage.transform import resize

#定数の設定
POS_DIRECTORY = './positive/'
NEG_DIRECTORY = './negative/'

#データを格納するディクショナリを作成
materials = {'data': [], 'target': []}

#画像の読み込みとリサイズをする関数の作成
def read_and_resize_images(directory, target_value):
    list_files = os.listdir(directory)
    for filename in list_files:
        image = io.imread(directory + filename)
        if image.ndim == 2:
            continue
        if image.shape[2] == 4:
            image = color.rgba2rgb(image)
        resized_image = resize(image, (64, 64))
        materials['data'].append(resized_image)
        materials['target'].append(target_value)

#イメージの読み込み
read_and_resize_images(POS_DIRECTORY, 1)
read_and_resize_images(NEG_DIRECTORY, 0)

#データ数の確認
print(len(materials['data']))

時間をかけて画像ファイルを読み込んだ後、そのファイルの数が表示されます。

読み込んだ画像の一部を確認してみましょう。

import matplotlib.pyplot as plt

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#確保した描画領域に読み込んだ画像の最初の50枚と最後の50枚を表示
for i, ax in enumerate(axes.flat):
    if i < 50:
        ax.imshow(materials['data'][i])
    else:
        ax.imshow(materials['data'][-i+49])

これで最初の50枚(ターゲットが写っている画像)と最後の50枚(ターゲットが写っていない画像)が表示されるはずです。

 

2.次元削減の程度の決定

上で読み込んだ画像の特徴量の数は64×64×3=12288です。PCAで次元削減をしましょう。

import numpy as np
from sklearn.decomposition import PCA

#データの整形
reshaped_data = np.array(materials['data']).reshape(len(materials['data']), -1)
print (reshaped_data.shape)

#PCAによって削減される特徴量数と失われるデータの分散との関係を表示
pca = PCA().fit(reshaped_data)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('number of components')
plt.ylabel('cumulative explained variance')
plt.show()

まず、データを整形することにより、サンプル数×特徴量数(12288)の2次元配列が得られます。

次に、特徴量数と説明できる分散の割合のグラフが表示されます。このグラフを参考にして、分散の値を決めます。

今度はその分散の値でPCAを実行し、特徴量数を確かめます。分散の値を0.95にするなら次のようなコードです。

#PCAで次元削減
pca = PCA(0.95).fit(reshaped_data)
print('pca.n_components_:', pca.n_components_)

これで特徴量数が表示されます。私が試してみたデータでは306まで特徴量数を削減できました。

次元削減してから復元した画像を目で見て確かめてみます。

#次元を削減してから元に戻し、0~1の範囲にクリップする
reduced_data = pca.transform(reshaped_data)
restored_data = pca.inverse_transform(reduced_data)
clipped_data = np.clip(restored_data, 0, 1)

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#確保した描画領域に読み込んだ画像の最初の50枚と最後の50枚を表示
for i, ax in enumerate(axes.flat):
    if i < 50:
        ax.imshow(clipped_data[i].reshape(64, 64, 3), vmin=0, vmax=1)
    else:
        ax.imshow(clipped_data[-i+49].reshape(64, 64, 3), vmin=0, vmax=1)

クリップしないとClipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers)という警告メッセージが表示されますが、画像は表示されます。ここではそのエラーを防ぐために数値が0〜1の範囲に収まるようにクリップしました。

 

3.しきい値の調節

いくらか余計な画像が混じってもいいからターゲット画像を見逃したくないということが多いでしょう。

そこでしきい値を調節して再現率(recall)を上げます。

まずは普通に学習した結果の情報を確認します。

SVCをデフォルトパラメータで使うとします。

import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import plot_precision_recall_curve

#訓練データとテストデータに分割
reshaped_data = np.array(materials['data']).reshape(len(materials['data']), -1)
X_train, X_test, y_train, y_test = train_test_split(reshaped_data, materials['target'])

#パイプラインの構築
steps = [('pca', PCA()), ('model', SVC())]
pipe = Pipeline(steps)

#作成したモデルで学習
pipe.fit(X_train, y_train)

#学習結果のテスト
y_test_pred = pipe.predict(X_test)
print('accuracy_score:', accuracy_score(y_test, y_test_pred))

#テストの混同行列
plot_confusion_matrix(pipe, X_test, y_test)
plt.show()

#訓練の混同行列
plot_confusion_matrix(pipe, X_train, y_train)
plt.show()

#適合率(precision)・再現率(recall)・F1値の一括計算
y_train_pred = pipe.predict(X_train)
print(classification_report(y_train, y_train_pred))

#適合率―再現率曲線
plot_precision_recall_curve(pipe, X_train, y_train)
plt.show()

その結果から、目指すべき再現率(recall)を決めます。そして、その再現率に合うように、訓練データの中で見逃してもよい個数を算出します。

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

#見逃す個数の設定
NUM_TO_MISS = 3

#訓練データの中からfalse_negativeだけをリスト内包表記で抽出
false_negative = [X_train[i] for i in range(len(X_train)) if (y_train_pred[i] == False and y_train[i] == True)]

#false_negativeに決定関数を適用した値を取得後、ソートして表示
false_negative_value = pipe.decision_function(false_negative)
sorted_false_negative_value = np.sort(false_negative_value)
print(sorted_false_negative_value, '\n\n')

#決定関数のしきい値を見逃す個数に応じて設定
y_train_pred_lower_threshold = (pipe.decision_function(X_train) >= sorted_false_negative_value[NUM_TO_MISS - 1])

#新しく設定したしきい値での適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train, y_train_pred_lower_threshold))

#新しく設定したしきい値での混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_train, y_train_pred_lower_threshold))
disp.plot()
plt.show()

#新しく設定したしきい値での学習結果のテスト
y_test_pred_lower_threshold = (pipe.decision_function(X_test) >= sorted_false_negative_value[NUM_TO_MISS - 1])
accuracy_score(y_test, y_test_pred_lower_threshold)

#新しく設定したしきい値で学習したテストデータの混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_test, y_test_pred_lower_threshold))
disp.plot()
plt.show()

望む結果が得られたら学習後のモデルとしきい値を保存しておきましょう。

import pickle

pipe_and_threshold = {'pipe':pipe, 'threshold':sorted_false_negative_value[NUM_TO_MISS - 1]}
filename = 'finalized_pipe_and_threshold.sav'
pickle.dump(pipe_and_threshold, open(filename, 'wb'))

 

4.未知の画像への適用

これでやりたいことができます。未知の画像に先ほど学習したモデルを適用します。

import pickle
import os
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#定数の設定
APPLY_DIRECTORY = './apply/'

#画像データを保存するリストを作成
data = []

# 保存したパイプラインとしきい値をロードする
filename = 'finalized_pipe_and_threshold.sav'
loaded = pickle.load(open(filename, 'rb'))

#適用する画像ファイルの読み込み
list_files = os.listdir(APPLY_DIRECTORY)
for filename in list_files:
    image = io.imread(APPLY_DIRECTORY + filename)
    if image.ndim == 2:
        continue
    if image.shape[2] == 4:
        image = color.rgba2rgb(image)
    resized_image = resize(image, (64, 64))
    data.append(resized_image)

#画像データを変形して適用
X = np.array(data).reshape(len(data), -1)
y = loaded['pipe'].decision_function(X) >= loaded['threshold']

#Trueと判定されたデータだけを取得
true_data = [data[i] for i in range(len(data)) if y[i] == True]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Trueと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(true_data[i])
    if i+1 >= len(true_data): break

できました。

 



scikit-learnでMNIST手書き数字の分類機械学習(3)

scikit-learnでMNIST手書き数字の分類機械学習 – 浅野直樹の学習日記scikit-learnでMNIST手書き数字の分類機械学習(2) – 浅野直樹の学習日記の続きです。

今回は、しきい値の調節、ニューラルネットワーク、アンサンブルに取り組みます。

 

1.しきい値の調節

機械学習の目的によっては、単純に正解率のスコアを上げるのではなく、正解率は下がってもよいからできるだけ漏れなく検出してほしいという場合があります。そのような場合にはしきい値を調節します。

0〜9までの10種類の数字を分類するのはややこしいので、8か8以外の数字かを分類するという二項分類の例で考えます。

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
import matplotlib.pyplot as plt

#データの読み込み
digits = datasets.load_digits()

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#8か8以外の数字かでnumpy配列のマスク処理
y_train_8 = (y_train == 8)
y_test_8 = (y_test == 8)

#機械学習モデルの作成
model = svm.SVC()

#作成したモデルで学習
model.fit(X_train, y_train_8)

#学習結果のテスト
y_test_pred = model.predict(X_test)
print('accuracy_score:', accuracy_score(y_test_8, y_test_pred))

#混同行列
plot_confusion_matrix(model, X_test, y_test_8)
plt.show()

いつもの要領で学習をして、結果を表示させます。

0.9755555555555555

正解率のスコアが0.975…とはなかなかいいですね。

次に、訓練データの混同行列と適合率(precision)・再現率(recall)・F1値を確認してみましょう。

import matplotlib.pyplot as plt
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import precision_score, recall_score
from sklearn.metrics import classification_report

#混同行列
plot_confusion_matrix(model, X_train, y_train_8)
plt.show()

#適合率(precision)・再現率(recall)・F1値の手計算
y_train_pred = model.predict(X_train)
precision_score = precision_score(y_train_8, y_train_pred)
recall_score = recall_score(y_train_8, y_train_pred)
f1_score = (2 * precision_score * recall_score) / (precision_score + recall_score)
print('precision_score:', precision_score)
print('recall_score:', recall_score)
print('f1_score:', f1_score, '\n\n')

#適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train_8, y_train_pred))

以下の図と文字列が表示されます。

precision_score: 1.0
recall_score: 0.8872180451127819
f1_score: 0.9402390438247012


precision recall f1-score support

False 0.99 1.00 0.99 1214
True 1.00 0.89 0.94 133

accuracy 0.99 1347
macro avg 0.99 0.94 0.97 1347
weighted avg 0.99 0.99 0.99 1347

適合率は1.0と完璧ですが、再現率は0.88程度とやや低く、本当は8なのに見逃しているものが15個とけっこうたくさんあります。

適合率が下がることは覚悟で再現率を上げてみましょう。

その前に、適合率と再現率の関係をグラフ化します。

from sklearn.metrics import plot_precision_recall_curve

#適合率―再現率曲線
plot_precision_recall_curve(model, X_train, y_train_8)
plt.show()

これだけで簡単きれいに表示されます。

ROC曲線も見てみましょう。

from sklearn.metrics import plot_roc_curve

#ROC曲線
plot_roc_curve(model, X_train, y_train_8)
plt.show()

再現率(recall)0.99を狙ってみます。訓練データの中にある133個の8のうち、先ほどは15個を見逃していましたが、見逃してもよいのは1個だけということです。

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np

#訓練データの中で実際には8なのに8ではないと予測したデータ(false_negative)だけをリスト内包表記で抽出
false_negative = [X_train[i] for i in range(len(X_train)) if (y_train_pred[i] == False and y_train_8[i] == True)]

#false_negativeに決定関数を適用した値を取得後、ソートして表示
false_negative_value = model.decision_function(false_negative)
sorted_false_negative_value = np.sort(false_negative_value)
print(sorted_false_negative_value, '\n\n')

#決定関数のしきい値をsorted_false_negative_valueの2番目に小さい数に設定
y_train_pred_lower_threshold = (model.decision_function(X_train) >= sorted_false_negative_value[1])

#新しく設定したしきい値での適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train_8, y_train_pred_lower_threshold))

#新しく設定したしきい値での混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_train_8, y_train_pred_lower_threshold))
disp.plot()
plt.show()

#新しく設定したしきい値での学習結果のテスト
y_test_pred_lower_threshold = (model.decision_function(X_test) >= sorted_false_negative_value[1])
accuracy_score(y_test_8, y_test_pred_lower_threshold)

#新しく設定したしきい値で学習したテストデータの混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_test_8, y_test_pred_lower_threshold))
disp.plot()
plt.show()

結果は以下です。

[-0.56471249 -0.5230137 -0.4710125 -0.47010167 -0.46740437 -0.35965516
-0.34701922 -0.21615345 -0.192735 -0.11733131 -0.10970646 -0.06225593
-0.04509709 -0.03724317 -0.02381575]


precision recall f1-score support

False 1.00 1.00 1.00 1214
True 0.99 0.99 0.99 133

accuracy 1.00 1347
macro avg 0.99 1.00 0.99 1347
weighted avg 1.00 1.00 1.00 1347

プログラム内のコメントに書いたとおり、false_negative(偽陰性)だけを取り出し、その決定関数の値を参考にして、しきい値を-0.523…に設定しています。

当然ながら、目論見通りに再現率(recall)0.99を達成しています。

テストデータでの結果はそこまでよくありませんが、それでも当初の結果と比べると、見逃しは11個から4個に減少しています。

plot_confusion_matrixはXとyを引数に取るため、しきい値を変更した場合の混同行列はconfusion_matrixとConfusionMatrixDisplayを併用する形で表現しています。

やりたいことはできました。

Aurélien Géron 著,下田倫大 監訳,長尾高弘 訳『scikit-learnとTensorFlowによる実践機械学習』(オライリー・ジャパン, 2018)の3章を主に参照しました。

 

2.ニューラルネットワーク

scikit-learnでお手軽にニューラルネットワークを試してみます。

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
import matplotlib.pyplot as plt

#データの読み込み
digits = datasets.load_digits()

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#機械学習モデルの作成
model = MLPClassifier(verbose=True, random_state=0)

#学習
model.fit(X_train, y_train)

#学習結果のテスト
y_pred = model.predict(X_test)
print('accuracy_score:', accuracy_score(y_test, y_pred))

#混同行列
plot_confusion_matrix(model, X_test, y_test)
plt.show()

いつもの手順でモデルにMLPClassifierを指定しただけです。ニューラルネットワークの雰囲気を醸し出すためにverbose=Trueにしています。よって以下のように出力されます。

Iteration 1, loss = 11.97505127
Iteration 2, loss = 5.72047413
Iteration 3, loss = 3.28455794
(中略)
Iteration 133, loss = 0.00351769
Iteration 134, loss = 0.00345282
Iteration 135, loss = 0.00341083
Training loss did not improve more than tol=0.000100 for 10 consecutive epochs. Stopping.
accuracy_score: 0.9244444444444444

サンプル数が少ないためか、それほどよくない結果ですね。

 

3.アンサンブル

アンサンブルも試してみます。

#データ関係
from sklearn import datasets
from sklearn.model_selection import train_test_split
#学習モデル関係
from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegressionCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
#結果の表示
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
import matplotlib.pyplot as plt

#データの読み込み
digits = datasets.load_digits()

#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, shuffle=False)

#スケーリング
X_train = X_train / 16
X_test = X_test / 16

#機械学習モデルの作成
svc = SVC(probability=True)
kn = KNeighborsClassifier()
lr = LogisticRegressionCV(max_iter=500)
rf = RandomForestClassifier()
mlp = MLPClassifier(max_iter=500)

#アンサンブルモデルの作成
hard_voting = VotingClassifier(
estimators=[('svc', svc), ('kn', kn), ('lr', lr), ('rf', rf), ('mlp', mlp)],
voting='hard')

soft_voting = VotingClassifier(
estimators=[('svc', svc), ('kn', kn), ('lr', lr), ('rf', rf), ('mlp', mlp)],
voting='soft')

#学習
hard_voting.fit(X_train, y_train)
soft_voting.fit(X_train, y_train)

#学習結果のテスト
y_hard_pred = hard_voting.predict(X_test)
y_soft_pred = soft_voting.predict(X_test)
print('hard_voting accuracy_score:', accuracy_score(y_test, y_hard_pred))
print('soft_voting accuracy_score:', accuracy_score(y_test, y_soft_pred))

#混同行列
plot_confusion_matrix(hard_voting, X_test, y_test)
plt.show()
plot_confusion_matrix(soft_voting, X_test, y_test)
plt.show()

#個別のモデル
estimators = [svc, kn, lr, rf, mlp]
for estimator in estimators:
estimator.fit(X_train, y_train)
y_individual_pred = estimator.predict(X_test)
print(estimator.__class__.__name__, 'accuracy_score:', accuracy_score(y_test, y_individual_pred))

soft_votingのためにSVC(probability=True)としています。また、警告表示を見て、LogisticRegressionCVとMLPClassifierにはmax_iter=500と設定しました。

結果は以下です。

hard_voting accuracy_score: 0.9488888888888889
soft_voting accuracy_score: 0.9444444444444444

SVC accuracy_score: 0.9488888888888889
KNeighborsClassifier accuracy_score: 0.9644444444444444
LogisticRegressionCV accuracy_score: 0.9288888888888889
RandomForestClassifier accuracy_score: 0.9333333333333333
MLPClassifier accuracy_score: 0.9244444444444444

アンサンブルにしたからといって劇的にスコアがよくなるというわけではなさそうです。

 

 




top