前回の記事から1ヶ月以上経ちましたが、続きです。
↓前回の記事
前回、「株価の時系列データは無料で公開しているところはほぼ無い」って書いてしまったけれども、Tiingoというサイトで無料会員登録すれば、NYSE, NASDAQの株価の時系列データがダウンロード可能です。
で、各所で"pandas.datareader"を使ってTiingoからデータを取得する方法が紹介されていますが、それらには落とし穴があります。
Tiingoの無料会員には使用回数の制限があって、その回数を超えるとエラーが返ってくるのですが、"pandas.datareader"ではそのエラーをハンドリングできないのです。
使用回数の残量は、Tiingoにログインした状態でhttps://www.tiingo.com/account/api/usageにアクセスすれば下記のように表示されます。
import csv from datetime import datetime import os import time import requests def nengatsupi(filetime): getsu = str(filetime[1]) if len(getsu) == 1: getsu = '0' + getsu nichi = str(filetime[2]) if len(nichi) == 1: nichi = '0' + nichi nengappi = str(filetime[0]) + '-' + getsu + '-' + nichi return nengappi headers = { 'Content-Type': 'application/json', 'Authorization' : 'Token xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' } kabu_codes = [] with open('e:/data/code.csv',"r") as codecsv: code_lists = csv.reader(codecsv) for row in code_lists: kabu_codes.append(row[1]) codecsv.close() now_time = str(datetime.now()) print(now_time) for kabu_code in kabu_codes: filepath = 'e:/data/his_data/' + kabu_code + '.csv' if os.path.isfile(filepath): filetime = time.localtime(os.path.getmtime(filepath)) nengappi = nengatsupi(filetime) if now_time[0:10] == nengappi: if (filetime[3] >2) and (os.path.getsize(filepath) >0): print(kabu_code + ': ' + nengappi + ' ' + str(filetime[3]) + ':' + str(filetime[4])) continue Response = requests.get('https://api.tiingo.com/tiingo/daily/' + kabu_code + '/prices?startDate=2021-01-02',headers=headers) de_Res = Response.json() if len(de_Res) >1: with open(filepath,'w') as savefile: for datedata in de_Res: savefile.write(datedata["date"][0:10] + ',' + str(datedata["adjOpen"]) + ',' + str(datedata["adjHigh"]) + ',' + str(datedata["adjLow"]) + ',' + str(datedata["adjClose"]) + "\n") savefile.close() filetime = time.localtime(os.path.getmtime(filepath)) nengappi = nengatsupi(filetime) print(kabu_code + ': ' + nengappi + ' ' + str(filetime[3]) + ':' + str(filetime[4])) else: print(de_Res) break
"Token"の後の"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"には、あなたのTiinagoのアクセスキーをコピペしてください。
規定のアクセス回数を超えた場合、エラーが返ってきます。
{'detail': 'Error: You have run over your hourly request allocation. Please upgrade at https://api.tiingo.com/pricing to have your limits increased.'}
時間が経って使用回数が回復したらこのscriptを再度実行することで、前回までにデータをダウンロードして保存した証券コードをskipして、続きの銘柄のデータをダウンロードします。
並列処理を行う
さて、取得したデータを使ってデータ処理をしたりするわけですが、少しでも早く処理したいですよね。
そんなときは並列処理ですよ。
import csv from concurrent.futures import ThreadPoolExecutor def keisan(kabu_code): filepath = 'e:/data/his_data/' + kabu_code + '.csv' with open(filepath,'r') as datafile: (省略) with open('e:/data/code.csv',"r") as codecsv: code_lists = csv.reader(codecsv) with ThreadPoolExecutor(max_workers=10) as executor: for row in code_lists: executor.submit(keisan(row[1])) codecsv.close()
データ処理を行う"keisan"の中身は省略します。
"ThreadPoolExecutor"を使って、そのwithのブロック内で"submit"で起動された関数は、パラメータ"max_workers"で定められた数の範囲内で並列処理が行われます。そして、起動された並列処理が全て完了するまで withの中のブロックから次の処理へ進みません。ちゃんと終了するまで待ちます。
max_workersの数はご使用されるPCのCPUのスレッド数の範囲内がよろしいかと思います。私の場合、Ryzen 5なので6コアで計12スレッドなので10ぐらいを設定しています。
非同期処理でスクレイピングを行う
私の場合、計算処理よりもスクレイピングの方が時間が掛かります。
ならば、先の例のように並列処理をすれば良いのではないかと思いがちですが、それではhttps通信でレスポンスを待っている間、CPUが専有されてしまうためCPU使用効率が悪いのです。実はレスポンスを待っている間は休むという方法がpythonには具備されています。
で、非同期処理の方法は他所様の記事にもあるのですが、それらのコードをコピペすると、爆速過ぎて、対象のwebサイトからDoS攻撃認定され、結局、データを取得できなくなるということになります。
程よいアクセスをするにはどうすれば良いのか。
aiohttpのオプションに「aiohttp.TCPConnector(limit_per_host=n)」という項目があるので試しに設定してみたところ、接続数をlimit以下に制限してくれる設定かと思いきや、「limitを超えました」というエラーが出るだけの、思っていたのと違う挙動でした。
https://stackoverflow.com/questions/48483348/how-to-limit-concurrency-with-python-asyncioにナイスな回答があったので流用しました。
from bs4 import BeautifulSoup import csv import aiohttp import asyncio import sqlite3 def html_kaiseki(kabu_code, http_data): bs4ele = BeautifulSoup(http_data, 'html.parser') (省略) sqlcur.execute(sql_com) sqlcon.commit() async def web_get(kabu_code): async with aiohttp.ClientSession() as session: get_url = 'https://hogehoge.com/hogehoge.php?&code=' + kabu_code async with session.get(get_url) as response: http_data = await response.read() return html_kaiseki(kabu_code, http_data) async def run_def(kabu_codes): no_concurrent = 3 dltasks = set() for i in range(len(kabu_codes)): if len(dltasks) >= no_concurrent: # Wait for some download to finish before adding a new one _done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED) dltasks.add(asyncio.create_task(web_get(kabu_codes[i]))) # Wait for the remaining downloads to finish await asyncio.wait(dltasks) kabu_codes =[] sqlcon = sqlite3.connect('e:/data/db/kabu.sqlite') sqlcur = sqlcon.cursor() with open('e:/data/code.csv',"r") as codecsv: code_lists = csv.reader(codecsv) for row in code_lists: kabu_codes.append(row[1]) codecsv.close() asyncio.run(run_def(kabu_codes))
"async def rundef()"の部分が流用させてもらった部分です。同時接続数(no_concurrent)は元のママの"3"を設定していますが、私の環境ではこのscriptを使って同時接続数"4"で動かしたところDoS攻撃認定されたサイトがあるので、意外にこの程度が無難だと思います。それでも3倍速くなるわけですから。
最初の"html_kaiseki()"はsqlite3のデータベースに結果を書き込むので戻り値はありません。どういう順番でデータが得られるか分からないので、データベースを使っています。なお、株価の時系列データは、データベースよりCSVファイルのほうが読み込むのも書き込むのも確実に速いです。
わざわざ"html_kaiseki()"と"web_get()"を分けたのは、"web_get()"が"html_kaiseki()"を起動した時点で"web_get()"自体は終了したことになるため、run_def()が次の"web_get()"を起動するタイミングが早くなるのではないかと思ったからですが、実際にどれぐらい効果があるのかは不明です(ぉぃ)。
"html_kaiseki()"自体は非同期処理ではありません。しかし、この関数を呼び出す "web_get()"が非同期であるため、この関数は暗黙的に並列処理がなされます。それがコールバック関数の本来的な使い方だと思うのですが、各所のpythonのコールバック関数の解説では「returnで呼び出される関数」ぐらいの解説しかなく、そういう視点が欠けているものが散見されます。