【Python】運行情報をSlack API を使って通知するデーモンを作ってみた
前にPythonでYahoo路線情報から運行情報をスクレイピングで取得するアプリを作った
なので今回は運行情報が変わったときに、Slack APIで通知するデーモンを作ることにした
ざっくり機能まとめ
- 10分間隔でYahoo路線情報にアクセスして、スクレイピング
- 情報を監視する路線は複数対応可能
- 以下の状態でSlackに通知する
- トラブルを検知したとき
- トラブルが無くなったとき
- 情報が更新されたとき(掲載日時が変わったとき)
Pythonスクリプトの作成
普段Pythonを触らないので、命名規則やら標準関数・クラスに苦しみながら作ってみた
まずは、Yahoo路線情報からスクレイピングする機能を作成
#!/ust/bin/env python # -*- coding: utf-8 -*- from abc import ABCMeta, abstractmethod from bs4 import BeautifulSoup from datetime import datetime from urllib import request from my_util import logger, dict_to_str class Scraper(metaclass=ABCMeta): def __init__(self, url): self._url = url @abstractmethod def get_data(self): pass def __make_proxy_url(self): pass class JRService(Scraper): def __init__(self, url): super().__init__(url) def get_data(self): html = request.urlopen(self._url).read() soup = BeautifulSoup(html, 'lxml') # 路線情報 route_element = soup.find('div', {'id': 'main'}).find('div', {'class': 'mainWrp'}).find('div', {'class': 'labelLarge'}) route_name = route_element.find('h1', {'class': 'title'}).find(text=True, recursive=False) update_time_str = route_element.find('span', {'class': 'subText'}).find(text=True, recursive=False) update_time = self.__get_update_datetime(update_time_str) # 運行情報 status_element = soup.find('div', {'id': 'mdServiceStatus'}) information = status_element.find('p').find(text=True, recursive=False) posting_element = status_element.find('p').find('span') posting_data = self.__get_posting_datetime(posting_element) message = information + posting_data['datetime_str'] trouble_class = status_element.find('dd', {'class': 'trouble'}) in_trouble = False if trouble_class is not None: in_trouble = True data = { 'route_name': route_name, 'update_datetime': update_time, 'posting_datetime': posting_data['datetime'], 'message': message, 'in_trouble': in_trouble } logger.info(dict_to_str(data)) return data @staticmethod def __get_update_datetime(update_time_str): year = datetime.today().year convert_str = '%d年%s' % (year, update_time_str) update_time = datetime.strptime(convert_str, '%Y年%m月%d日 %H時%M分更新') return update_time @staticmethod def __get_posting_datetime(element): posting_date_str = '' posting_date = None if element is not None: posting_date_str = element.find(text=True, recursive=False) year = datetime.today().year convert_str = '%d年%s' % (year, posting_date_str) posting_date = datetime.strptime(convert_str, '%Y年(%m月%d日 %H時%M分掲載)') return { 'datetime_str': posting_date_str, 'datetime': posting_date }
次に周期的にスクレイピング処理を実行し、情報が変わっていたら、通知する処理を作成
#!/ust/bin/env python # -*- coding: utf-8 -*- from configparser import ConfigParser from datetime import datetime from scraping import JRService from threading import Timer from time import sleep from sys import exit from my_util import logger import requests config = ConfigParser() config.read('./.env') thread_time_min = config.getint('settings', 'watch_interval_min') thread_time_sec = thread_time_min * 60 services = config.items('jr_services') def push_slack_chat(name, message): url = config.get('settings', 'post_url') token = config.get('settings', 'token') channel = config.get('settings', 'channel') data = { 'token': token, 'channel': channel, 'username': name, 'text': message } requests.post(url, data=data) def scrape_service(): try: for key, url in services: scraper = JRService(url) data = scraper.get_data() last_posting_datetime_str = config.get('posting_datetime', key) last_posting_datetime = datetime.strptime(last_posting_datetime_str, '%Y-%m-%d %H:%M') last_in_trouble = config.getboolean('in_trouble', key) if data['in_trouble'] and (data['posting_datetime'] <= last_posting_datetime): continue elif (data['in_trouble'] and not last_in_trouble)\ or (last_in_trouble and not data['in_trouble'])\ or data['in_trouble'] and (data['posting_datetime'] > last_posting_datetime): push_slack_chat(data['route_name'], data['message']) in_trouble_str = 'True' if data['in_trouble'] else 'False' config.set('in_trouble', key, in_trouble_str) if data['in_trouble']: posting_datetime_str = data['posting_datetime'].strftime('%Y-%m-%d %H:%M') config.set('posting_datetime', key, posting_datetime_str) config.write(open('./.env', 'w')) except Exception as e: logger.error(e.with_traceback()) finally: thread = Timer(thread_time_sec, scrape_service) thread.start() if __name__ == '__main__': scrape_service() try: while True: sleep(0.1) except KeyboardInterrupt: exit(0)
全体的に日時の文字列をdatetime型に変換する方法がなんか冗長な気がする・・・
↓参考にならないかもしれないソースコードはコチラ
supervisorの設定
作成したPythonスクリプトをデーモン化するため、supervisorの設定を追記
ここで使用しているsupervisorのバージョンは2.1
ちなみに、Pythonスクリプトの配置場所は「/usr/local/bin/Train-Service-Information」
[program:train_service_information] command=bash -c '. /etc/profile; cd /usr/local/bin/Train-Service-Information; python -u main.py' autostart=true autorestart=true startsecs=10 startretries=3 log_stdout=true log_stderr=true logfile=/var/log/supervisor/train_service_information.log
commandで「. /etc/profile」や「cd /usr/local/bin/Train-Service-Information」をしているのは、環境変数がなかったり、カレントがsupervisor2.1で設定出来ないため
あと「python -u main.py」の【-u】オプションは、スクリプトのデバッグログが全然出力されないのを防ぐため、バッファリングを無効化するオプション
↓参考サイト
pythonで標準出力のバッファリングを無効にする - blog.mouten.info
実行結果
ちゃんと遅延が発生した時と遅延が収まった時にSlackに通知することが出来た。
次の機能について
Yahoo路線情報は情報の更新に大きめなタイムラグがある
ツイッターのつぶやきを解析して通知する機能を入れたほうがリアルタイムな感じになるかも