浅野直樹の学習日記

この画面は、簡易表示です

WikipediaのAPIを使ってランダムに記事本文を取得する

テキストの類似度を求める機械学習の練習をするために、Wikipediaの記事をランダムに集めようと思い立ちました。

これが簡単そうで意外に手こずりました。

今後同じようなことをする人のために手順をまとめておきます。

 

1.手動でWikipediaのランダム記事を取得する

これは簡単です。

https://ja.wikipedia.org/wiki/Special:Randompage

上記リンクをクリックするだけです。

Wikipediaのページ内にある「おまかせ表示」をクリックしても同じです。

 

2.WikipediaのAPIの概要

プログラムでランダムな記事を自動で取得する際にはAPIを使います。

そのAPIが微妙にわかりづらいので、ランダムにWikipediaの記事本文を取得するという用途に限って概要を先にお伝えします。

公式ドキュメントは以下のリンクです。

API:Main page – MediaWiki

情報が多すぎて圧迫されます。

日本語版Wikipediaのエンドポイントは「https://ja.wikipedia.org/w/api.php」です。

パラメータがずらずら書かれていますが、上記公式ドキュメントのAPI:Main pageの中で今回使うのは「action=query」と「format=json」だけです。

次はqueryについて詳しく見てみましょう。

API:Query – MediaWiki

ここにもパラメータがずらっと並べられていますが、使うのは基本的に「prop」と「list」だけです。

さらに「prop」や「list」として設定する値に応じて使えるパラメータが増えてきます。

パラメータが階層別にたくさんあるけれども実際に使うのは少しだけというのがポイントです。

 

3.APIを使ってWikipediaのランダム記事リストを取得する

それではAPIを使ってWikipediaのランダム記事リストを取得してみましょう。

API:Random – MediaWiki

この例をそのまま使います。

#!/usr/bin/python3

"""
    get_random.py

    MediaWiki API Demos
    Demo of `Random` module: Get request to list 5 random pages.

    MIT License
"""

import requests

S = requests.Session()

URL = "https://en.wikipedia.org/w/api.php"

PARAMS = {
    "action": "query",
    "format": "json",
    "list": "random",
    "rnlimit": "5"
}

R = S.get(url=URL, params=PARAMS)
DATA = R.json()

RANDOMS = DATA["query"]["random"]

for r in RANDOMS:
    print(r["title"])

例えば以下のように出力されます(毎回結果は異なります)。

Yaxham Light Railway
User talk:174.93.33.221
User talk:50.47.80.102
User talk:220.253.107.147
User talk:Antonioyuff

エンドポイントを日本語版にするのを忘れていました。

URL = “https://en.wikipedia.org/w/api.php”

URL = “https://ja.wikipedia.org/w/api.php”

に書き換えて実行します。

例えば次のように出力されます。

謝覧
利用者‐会話:Quanpuser
1対1 (データモデル)
利用者‐会話:アルカセット
Template‐ノート:Wide image

「利用者‐会話」や「Template‐ノート」といった通常の記事ではないものが表示されていますね。

通常の記事に限定しましょう。

パラメータに「rnnamespace=0」を加えます。

#!/usr/bin/python3

"""
    get_random.py

    MediaWiki API Demos
    Demo of `Random` module: Get request to list 5 random pages.

    MIT License
"""

import requests

S = requests.Session()

URL = "https://ja.wikipedia.org/w/api.php"

PARAMS = {
    "action": "query",
    "format": "json",
    "list": "random",
    "rnlimit": "5",
    "rnnamespace": "0",
}

R = S.get(url=URL, params=PARAMS)
DATA = R.json()

RANDOMS = DATA["query"]["random"]

for r in RANDOMS:
    print(r["title"])

結果の表示例です。

第5航空軍 (日本軍)
モラル・ハザード
レオミトレス
マッチウェンロックオリンピック
化石爬虫類の一覧

Wikipediaのランダム記事リストを取得するという目標は達成できました。

 

4.Wikipediaの記事本文のプレーンテキストを取得する

次に、Wikipediaの記事本文のプレーンテキストを取得することを考えます。

機械学習 – Wikipediaのページを例にとってやってみましょう。

結論を書きます。

import requests

#記事タイトルの設定
title = "機械学習"

#wikipediaに接続するための基本設定
S = requests.Session()
URL = "https://ja.wikipedia.org/w/api.php"

#記事本体を取得するためのパラメータの設定
ARTICLE_PARAMS = {
    "action": "query",
    "format": "json",
    "prop": "extracts",
    "explaintext": True,
    "exsectionformat": "plain",
    "titles": title
}

#記事タイトルから記事情報の取得
ARTICLE_R = S.get(url=URL, params=ARTICLE_PARAMS)
ARTICLE_DATA = ARTICLE_R.json()
pages = ARTICLE_DATA["query"]["pages"]
page_id = next(iter(pages))
print(pages[page_id]["extract"])

これで記事本文が出力されます。

ページIDを指定して取得するほうがスマートです。

「機械学習」の情報 – Wikipediaによると、機械学習の記事のページIDは185375です。

import requests

#記事ページIDの設定
page_id = "185375"

#wikipediaに接続するための基本設定
S = requests.Session()
URL = "https://ja.wikipedia.org/w/api.php"

#記事本体を取得するためのパラメータの設定
ARTICLE_PARAMS = {
    "action": "query",
    "format": "json",
    "prop": "extracts",
    "explaintext": True,
    "exsectionformat": "plain",
    "pageids": page_id
}

#ページIDから記事情報の取得
ARTICLE_R = S.get(url=URL, params=ARTICLE_PARAMS)
ARTICLE_DATA = ARTICLE_R.json()
print(ARTICLE_DATA["query"]["pages"][page_id]["extract"])

これでWikipediaの記事本文をプレーンテキストで取得する方法がわかりました。

 

5.WikipediaのAPIを使ってランダムに記事本文を取得する

先ほど記事本文を取得したときに使ったパラメータは「titles」や「pageids」のように複数形になっていることからもわかるように、複数の記事を一気に取得できます。

さらに、generatorを使えば、リクエストが1回で済むので、より望ましいです。

ということでgeneratorを使ってみましょう。

import requests

#wikipediaに接続するための基本設定
S = requests.Session()
URL = "https://ja.wikipedia.org/w/api.php"

#ランダム記事リストから記事本体を取得するためのパラメータの設定
GENERATOR_PARAMS = {
    #共通パラメータ
    "action": "query",
    "format": "json",
    #generatorパラメータ
    "generator": "random",
    "grnlimit": "5",
    "grnnamespace": "0",
    #記事取得本体パラメータ
    "prop": "extracts",
    "exintro": True,
    "explaintext": True,
    "exsectionformat": "plain",
}

#ランダム記事リストから記事情報の取得
GENERATOR_R = S.get(url=URL, params=GENERATOR_PARAMS)
GENERATOR_DATA = GENERATOR_R.json()
for k, v in GENERATOR_DATA["query"]["pages"].items():
    print(k, v)

API:Query – MediaWikiに書いてありますように、generatorのパラメータには名前の先頭にgをつけます。

記事取得本体パラメータのほうは先ほどと同じです。

ただ、「exintro」という先ほどはなかったパラメータが増えていることに注意してください。これは記事本文の最初の部分だけを取得するためのパラメータです。ここでは値をTrueとしていますが、値は何であっても(Falseでも0でも1でも何でも)パラメータが設定されているだけで有効になります。

「exintro」パラメータを設定しないと、”exlimit” was too large for a whole article extracts request, lowered to 1.エラーが発生して、一気にextract(記事本文)を取得することができませんでした。記事本文を一気に取得すると膨大なデータ量になるおそれがあるために運営側で制限しているのでしょう。

仕方がないのでランダムなページIDから一つずつ記事本体を取得します。

import requests
import time

#wikipediaに接続するための基本設定
S = requests.Session()
URL = "https://ja.wikipedia.org/w/api.php"

#wikipediaのランダム記事リスト(idとタイトル)を取得するためのパラメータの設定
RANDOM_PARAMS = {
    "action": "query",
    "format": "json",
    "list": "random",
    "rnlimit": "5",
    "rnnamespace": "0"
}

#記事本体を取得するためのパラメータの設定
ARTICLE_PARAMS = {
    "action": "query",
    "format": "json",
    "prop": "extracts",
    "explaintext": True,
    "exsectionformat": "plain",
}

#ランダム記事リストの取得
RANDOM_R = S.get(url=URL, params=RANDOM_PARAMS)
RANDOM_DATA = RANDOM_R.json()
RANDOMS = RANDOM_DATA["query"]["random"]

#ランダム記事リストから各記事本体を取得
for r in RANDOMS:
    #ページIDと記事タイトルを変数に格納
    page_id = r["id"]
    title = r["title"]

    #ページIDから記事情報の取得
    ARTICLE_PARAMS["pageids"] = str(page_id)
    ARTICLE_R = S.get(url=URL, params=ARTICLE_PARAMS)
    ARTICLE_DATA = ARTICLE_R.json()

    #結果の出力
    print(title)
    print(ARTICLE_DATA["query"]["pages"][str(page_id)]["extract"])
    print("-"*40)

    #高速でリクエストを繰り返すことで負担をかけないように1秒待つ
    time.sleep(1)

print("終了しました")

これでやりたいことができました。



Python3エンジニア認定データ分析試験を受験します

突然ですが、2021年12月末までにPython3エンジニア認定データ分析試験を受験します。

受験宣言して教科書のPython本をもらおう!(応募は2021年11月末日まで) | 一般社団法人Pythonエンジニア育成推進協会につられて宣言してみました。

pythonを使った機械学習に手を出していたところに上記キャンペーンの存在を知り、一念発起しました。

より基礎的なPython3エンジニア認定基礎試験も気になっています。

試験内容をざっと確認したところ、両試験とも調べればすぐにわかるような事柄を記憶しているかが問われます。

個人的にそうした暗記物は好きではないのですが、プログラムを書くときに初歩的な文法などでいちいち検索していたら生産が阻害されるのもまた事実であり、pythonを流暢に操れるようになるためにこの試験を活用してみたいです。

 



Twitterで好きな画像を収集して自動でリツイートするbotを作る

夏休みの課題としてTwitterで好きな画像を収集して自動でリツイートするbotを作ろうと思い立ちました。

機械学習やTwitterのAPIに習熟するいい機会になりました。

一応の完成形にたどり着いたのでそのやり方を共有します。

 

1.画像を取得する対象ツイートの開始時刻と終了時刻を設定する

例えば毎日午前6時から過去24時間分の画像を収集するなら次のようにします。

from datetime import date, time, datetime, timezone, timedelta

#開始時刻と終了時刻の設定
today = date.today()
time = time(6)
end_time = datetime.combine(today, time, tzinfo=timezone(timedelta(hours=9)))
start_time = end_time - timedelta(hours=24)

#開始時刻と終了時刻の確認
print('start_time:', start_time.isoformat())
print('end_time:', end_time.isoformat())

午前6時を10分ほど過ぎてから実行しないとエラーになります。

start_time: 2021-08-30T06:00:00+09:00
end_time: 2021-08-31T06:00:00+09:00

このような表示になれば成功です。日付の部分は実行した日によって変わります。

 

2.Twitter API v2で画像を一括取得する

開始時刻から終了時刻までのツイートの中で検索ワードにマッチして画像が含まれるオリジナルツイート(リツイートではないツイート)を網羅的に取得します。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {
    'query': 'テスト has:images -is:retweet',
    'expansions': 'attachments.media_keys',
    'media.fields': 'url',
    'max_results': 100,
    'start_time': start_time.isoformat(),
    'end_time': end_time.isoformat(),
}

#画像URLをキー、そのツイートIDを値として格納するディクショナリを作成
results = {}

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続して必要なデータを取得する関数
def connect_to_endpoint(url, params):
    has_next = True
    while has_next:
        #APIを叩いて結果を取得
        response = requests.get(url, auth=bearer_oauth, params=params)

        #ステータスコードが200以外ならエラー処理
        print(response.status_code)
        if response.status_code != 200:
            raise Exception(response.status_code, response.text)

        #responseからJSONを取得
        json_response = response.json()

        #ツイートをループしてmedia_keyをキー、ツイートIDを値とするディクショナリを作成
        media_tweet_dict = {}
        for tweet in json_response['data']:
            if 'attachments' in tweet.keys():
                for media_key in tweet['attachments']['media_keys']:
                    media_tweet_dict[media_key] = tweet['id']

        #メディアのループ
        for image in json_response['includes']['media']:
            try:
                results[image['url']] = media_tweet_dict[image['media_key']]
            except:
                pass

        #次のページがあるかどうかを確かめ、あればquery_paramsにnext_tokenを追加
        has_next = 'next_token' in json_response['meta'].keys()
        if has_next:
            query_params['next_token'] = json_response['meta']['next_token']

#実行
connect_to_endpoint(search_url, query_params)

#結果の確認
print(results)

YOUR_BEARER_TOKENという部分を自分のBearer Tokenに書き換えて、「テスト」という部分を検索したいワードに書き換えます。

うまく動けば、200という成功を表わすステータスコードと、画像URLをキー、そのツイートIDを値とするディクショナリの内容が表示されます。

Twitter API v2で画像を一括取得する – 浅野直樹の学習日記と大枠は同じです。

画像URLとツイートIDを直接紐付けることはできないので、media_keyを中間に介在させなければならないのがやや面倒です。

 

3.収集した画像にターゲットが写っているかを機械学習で判定

これも結論を載せます。

import pickle
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#画像データとツイートIDを保存するリストをそれぞれ作成
data = []
tweet_ids = []

#事前に保存したパイプラインとしきい値をロードする
filename = 'finalized_pipe_and_threshold.sav'
loaded = pickle.load(open(filename, 'rb'))

#画像の読み込み
for k, v in results.items():
    image = io.imread(k)
    if image.ndim == 2:
        continue
    if image.shape[2] == 4:
        image = color.rgba2rgb(image)
    resized_image = resize(image, (64, 64))
    data.append(resized_image)
    tweet_ids.append(v)

#画像データを変形して機械学習の適用
X = np.array(data).reshape(len(data), -1)
y = loaded['pipe'].decision_function(X) >= loaded['threshold']

#Trueと判定されたデータとツイートIDだけを取得
true_data = [data[i] for i in range(len(data)) if y[i] == True]
ids_to_retweet = [tweet_ids[i] for i in range(len(data)) if y[i] == True]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Trueと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(true_data[i])
    if i+1 >= len(true_data): break

機械学習は事前に済ませてpickleで保存しているという前提です。

詳細はscikit-learnで機械学習をして好きな画像を自動で分類する – 浅野直樹の学習日記をご参照ください。

ここではTrueと判定された画像を表示させて目で見て確認していますが、実際に運用する際はその必要はありません。

#Falseと判定されたデータだけを取得
false_data = [data[i] for i in range(len(data)) if y[i] == False]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Falseと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(false_data[i])
    if i+1 >= len(false_data): break

こうすればFalseと判定された画像だけを表示することもできます。

 

4.見つけた画像の自動リツイート

いよいよ最後のプロセスです。

from requests_oauthlib import OAuth1Session
import os
import json

#定数の設定
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
user_id = "YOUR_USER_ID"

#リクエストの作成
oauth = OAuth1Session(
    consumer_key,
    client_secret=consumer_secret,
    resource_owner_key=access_token,
    resource_owner_secret=access_token_secret,
)

#リツイートの実行
for tweet_id in ids_to_retweet:
    payload = {"tweet_id": tweet_id}
    response = oauth.post("https://api.twitter.com/2/users/{}/retweets".format(user_id), json=payload)
    if response.status_code != 200:
        raise Exception("Request returned an error: {} {}".format(response.status_code, response.text))
    print("Response code: {}".format(response.status_code))

Twitter-API-v2-sample-code/retweet_a_tweet.py at main · twitterdev/Twitter-API-v2-sample-codeを大幅に簡略化しました。

定数の部分は自分の値に設定してください。

これでリツイートした回数分だけ「Response code: 200」と表示されるはずです。

 

5.さいごに

ここまで来る道のりは長かったです。何度も挫折しそうになりました。

この記事では解説のためにコードを分割しましたが、本番では一つのコードにまとめて、cronで定期的に自動実行します。

一歩ずつ進めば好きな画像を自動で収集してリツイートするbotを作ることができます。



Twitter API v2で画像を一括取得する

Twitterで画像を一括取得する方法です。

pythonでTwitter API v2を直接触ります。

 

1.前提

pythonの実行環境は各自で用意してください。私はjupyter notebookを使っています。

Twitter Developerの登録を済ませてBearer Tokenを取得していることを前提とします。

v1.1を使っていた人は、以下のリンクを参考にしてv2が使えるようになっているかを確認してください。

Twitter API V2を利用しようとしたらハマったこと | 日々機械的に考える

 

2.Recent searchの簡単なテスト

フリーで使えるRecent searchを使います。

きちんと認証して検索できているかを簡単にテストします。

Twitter-API-v2-sample-code/recent_search.py at main · twitterdev/Twitter-API-v2-sample-codeをアレンジしました。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {'query': '(from:twitterdev -is:retweet) OR #twitterdev','tweet.fields': 'author_id'}

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続してJSONを取得する関数
def connect_to_endpoint(url, params):
    response = requests.get(url, auth=bearer_oauth, params=params)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

#JSON取得の実行
json_response = connect_to_endpoint(search_url, query_params)
print(json_response)

YOUR_BEARER_TOKENのところに自分のBearer Tokenをそのまま記載(ハードコード)するのが手軽です。セキュリティが気になる人は適当に処理してください。

うまくいっていれば、200というステータスコードと、JSON形式のauthor_id, id, textのデータが表示されます。

 

3.画像URLの取得

画像のURLを取得するためには、query_paramsを次のように設定します。

query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url'}

expansionsとmedia.fieldsの両方を指定しなければなりません。リレーショナルデータベースのように、まずexpansionsでメディアのキー(ID)を取得して、そのキー(ID)からURLを取得しているのでしょう。この仕組みを理解するまでに数時間かかりました。

他の部分は上でテストしたときと同じです。

これで「テスト」というキーワードで検索して表示される直近のツイートのうちで画像があるものを検索し、そのURLが取得できます。「テスト」という部分を好きなワードに置き換えればそのワードで検索した結果を取得できます。

json_response['includes']['media'][0]['url']

のようにすればURLだけが得られます。

 

4.一括取得(ページネーション)

デフォルトでは1回の検索で最大10件の取得になります。’max_results’: 100を設定することでこれを100件にしましょう。

query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url', 'max_results': 100}

結果が100件を超える場合は、JSONのmetaという部分に含まれるnext_tokenを利用します。

query_paramsにnext_tokenを設定すれば、その続きから結果を取得できます。

import requests
import os
import json

#定数の設定
bearer_token = "YOUR_BEARER_TOKEN"
search_url = "https://api.twitter.com/2/tweets/search/recent"
query_params = {'query': 'テスト has:images', 'expansions': 'attachments.media_keys', 'media.fields': 'url', 'max_results': 100}

#画像URLを格納するリストを作成
image_urls = []

#認証用の関数
def bearer_oauth(r):
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2RecentSearchPython"
    return r

#検索エンドポイントに接続してJSONを取得する関数
def connect_to_endpoint(url, params):
    has_next = True
    while has_next:
        #APIを叩いて結果を取得
        response = requests.get(url, auth=bearer_oauth, params=params)

        #ステータスコードが200以外ならエラー処理
        if response.status_code != 200:
            raise Exception(response.status_code, response.text)

        #responseからJSONを取得してループを回し、URLを追加していく
        json_response = response.json()
        for image in json_response['includes']['media']:
            try:
                image_urls.append(image['url'])
            except:
                pass

        #次のページがあるかどうかを確かめ、あればquery_paramsにnext_tokenを追加
        has_next = 'next_token' in json_response['meta'].keys()
        if has_next:
            query_params['next_token'] = json_response['meta']['next_token']

#実行
connect_to_endpoint(search_url, query_params)

#結果の確認
print(image_urls)

【Python】Twitter API V2 でツイートを取得する | SEのプログラミングと英語の勉強ブログを参考にさせてもらいました。

rate limitsの処理はしていません。

URLを取得できないというエラーが発生したことがあったので、その部分をtry節にしました。

うまくいけば画像のURLがずらっと表示されます。

あとはそれを好きなように使ってください。

 



scikit-learnで機械学習をして好きな画像を自動で分類する

scikit-learnで機械学習をして好きな画像を自動で分類することに挑戦します。

 

例えば、Twitterで検索して表示される画像から、ターゲットが写っている画像とそうでない画像とを分類するときに使えます。

 

1.訓練用の画像データの読み込み(訓練データの用意)

何からの手段で訓練用の画像を入手し、ターゲットが写っている画像とそうでない画像とでフォルダ分けします。ここではそれぞれpositive、negativeという名前のフォルダにしてプログラムを実行しているファイルと同じ階層に入れています。

import os
from skimage import io, color
from skimage.transform import resize

#定数の設定
POS_DIRECTORY = './positive/'
NEG_DIRECTORY = './negative/'

#データを格納するディクショナリを作成
materials = {'data': [], 'target': []}

#画像の読み込みとリサイズをする関数の作成
def read_and_resize_images(directory, target_value):
    list_files = os.listdir(directory)
    for filename in list_files:
        image = io.imread(directory + filename)
        if image.ndim == 2:
            continue
        if image.shape[2] == 4:
            image = color.rgba2rgb(image)
        resized_image = resize(image, (64, 64))
        materials['data'].append(resized_image)
        materials['target'].append(target_value)

#イメージの読み込み
read_and_resize_images(POS_DIRECTORY, 1)
read_and_resize_images(NEG_DIRECTORY, 0)

#データ数の確認
print(len(materials['data']))

時間をかけて画像ファイルを読み込んだ後、そのファイルの数が表示されます。

読み込んだ画像の一部を確認してみましょう。

import matplotlib.pyplot as plt

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#確保した描画領域に読み込んだ画像の最初の50枚と最後の50枚を表示
for i, ax in enumerate(axes.flat):
    if i < 50:
        ax.imshow(materials['data'][i])
    else:
        ax.imshow(materials['data'][-i+49])

これで最初の50枚(ターゲットが写っている画像)と最後の50枚(ターゲットが写っていない画像)が表示されるはずです。

 

2.次元削減の程度の決定

上で読み込んだ画像の特徴量の数は64×64×3=12288です。PCAで次元削減をしましょう。

import numpy as np
from sklearn.decomposition import PCA

#データの整形
reshaped_data = np.array(materials['data']).reshape(len(materials['data']), -1)
print (reshaped_data.shape)

#PCAによって削減される特徴量数と失われるデータの分散との関係を表示
pca = PCA().fit(reshaped_data)
plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('number of components')
plt.ylabel('cumulative explained variance')
plt.show()

まず、データを整形することにより、サンプル数×特徴量数(12288)の2次元配列が得られます。

次に、特徴量数と説明できる分散の割合のグラフが表示されます。このグラフを参考にして、分散の値を決めます。

今度はその分散の値でPCAを実行し、特徴量数を確かめます。分散の値を0.95にするなら次のようなコードです。

#PCAで次元削減
pca = PCA(0.95).fit(reshaped_data)
print('pca.n_components_:', pca.n_components_)

これで特徴量数が表示されます。私が試してみたデータでは306まで特徴量数を削減できました。

次元削減してから復元した画像を目で見て確かめてみます。

#次元を削減してから元に戻し、0~1の範囲にクリップする
reduced_data = pca.transform(reshaped_data)
restored_data = pca.inverse_transform(reduced_data)
clipped_data = np.clip(restored_data, 0, 1)

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#確保した描画領域に読み込んだ画像の最初の50枚と最後の50枚を表示
for i, ax in enumerate(axes.flat):
    if i < 50:
        ax.imshow(clipped_data[i].reshape(64, 64, 3), vmin=0, vmax=1)
    else:
        ax.imshow(clipped_data[-i+49].reshape(64, 64, 3), vmin=0, vmax=1)

クリップしないとClipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers)という警告メッセージが表示されますが、画像は表示されます。ここではそのエラーを防ぐために数値が0〜1の範囲に収まるようにクリップしました。

 

3.しきい値の調節

いくらか余計な画像が混じってもいいからターゲット画像を見逃したくないということが多いでしょう。

そこでしきい値を調節して再現率(recall)を上げます。

まずは普通に学習した結果の情報を確認します。

SVCをデフォルトパラメータで使うとします。

import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import plot_precision_recall_curve

#訓練データとテストデータに分割
reshaped_data = np.array(materials['data']).reshape(len(materials['data']), -1)
X_train, X_test, y_train, y_test = train_test_split(reshaped_data, materials['target'])

#パイプラインの構築
steps = [('pca', PCA()), ('model', SVC())]
pipe = Pipeline(steps)

#作成したモデルで学習
pipe.fit(X_train, y_train)

#学習結果のテスト
y_test_pred = pipe.predict(X_test)
print('accuracy_score:', accuracy_score(y_test, y_test_pred))

#テストの混同行列
plot_confusion_matrix(pipe, X_test, y_test)
plt.show()

#訓練の混同行列
plot_confusion_matrix(pipe, X_train, y_train)
plt.show()

#適合率(precision)・再現率(recall)・F1値の一括計算
y_train_pred = pipe.predict(X_train)
print(classification_report(y_train, y_train_pred))

#適合率―再現率曲線
plot_precision_recall_curve(pipe, X_train, y_train)
plt.show()

その結果から、目指すべき再現率(recall)を決めます。そして、その再現率に合うように、訓練データの中で見逃してもよい個数を算出します。

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

#見逃す個数の設定
NUM_TO_MISS = 3

#訓練データの中からfalse_negativeだけをリスト内包表記で抽出
false_negative = [X_train[i] for i in range(len(X_train)) if (y_train_pred[i] == False and y_train[i] == True)]

#false_negativeに決定関数を適用した値を取得後、ソートして表示
false_negative_value = pipe.decision_function(false_negative)
sorted_false_negative_value = np.sort(false_negative_value)
print(sorted_false_negative_value, '\n\n')

#決定関数のしきい値を見逃す個数に応じて設定
y_train_pred_lower_threshold = (pipe.decision_function(X_train) >= sorted_false_negative_value[NUM_TO_MISS - 1])

#新しく設定したしきい値での適合率(precision)・再現率(recall)・F1値の一括計算
print(classification_report(y_train, y_train_pred_lower_threshold))

#新しく設定したしきい値での混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_train, y_train_pred_lower_threshold))
disp.plot()
plt.show()

#新しく設定したしきい値での学習結果のテスト
y_test_pred_lower_threshold = (pipe.decision_function(X_test) >= sorted_false_negative_value[NUM_TO_MISS - 1])
accuracy_score(y_test, y_test_pred_lower_threshold)

#新しく設定したしきい値で学習したテストデータの混同行列
disp = ConfusionMatrixDisplay(confusion_matrix(y_test, y_test_pred_lower_threshold))
disp.plot()
plt.show()

望む結果が得られたら学習後のモデルとしきい値を保存しておきましょう。

import pickle

pipe_and_threshold = {'pipe':pipe, 'threshold':sorted_false_negative_value[NUM_TO_MISS - 1]}
filename = 'finalized_pipe_and_threshold.sav'
pickle.dump(pipe_and_threshold, open(filename, 'wb'))

 

4.未知の画像への適用

これでやりたいことができます。未知の画像に先ほど学習したモデルを適用します。

import pickle
import os
from skimage import io, color
from skimage.transform import resize
import numpy as np
import matplotlib.pyplot as plt

#定数の設定
APPLY_DIRECTORY = './apply/'

#画像データを保存するリストを作成
data = []

# 保存したパイプラインとしきい値をロードする
filename = 'finalized_pipe_and_threshold.sav'
loaded = pickle.load(open(filename, 'rb'))

#適用する画像ファイルの読み込み
list_files = os.listdir(APPLY_DIRECTORY)
for filename in list_files:
    image = io.imread(APPLY_DIRECTORY + filename)
    if image.ndim == 2:
        continue
    if image.shape[2] == 4:
        image = color.rgba2rgb(image)
    resized_image = resize(image, (64, 64))
    data.append(resized_image)

#画像データを変形して適用
X = np.array(data).reshape(len(data), -1)
y = loaded['pipe'].decision_function(X) >= loaded['threshold']

#Trueと判定されたデータだけを取得
true_data = [data[i] for i in range(len(data)) if y[i] == True]

#描画領域の確保
fig, axes = plt.subplots(10, 10, figsize=(15, 15), subplot_kw={'xticks':[], 'yticks':[]})

#Trueと判定された画像の表示
for i, ax in enumerate(axes.flat):
    ax.imshow(true_data[i])
    if i+1 >= len(true_data): break

できました。

 




top