もなかアイスの試食品

「とりあえずやってみたい」そんな気持ちが先走りすぎて挫折が多い私のメモ書きみたいなものです.

【Python】運行情報をSlack API を使って通知するデーモンを作ってみた

前にPythonでYahoo路線情報から運行情報をスクレイピングで取得するアプリを作った

monakaice88.hatenablog.com

あとPythonスクリプトをデーモン化させる方法もわかった

monakaice88.hatenablog.com

なので今回は運行情報が変わったときに、Slack APIで通知するデーモンを作ることにした

ざっくり機能まとめ

  • 10分間隔でYahoo路線情報にアクセスして、スクレイピング
  • 情報を監視する路線は複数対応可能
  • 以下の状態でSlackに通知する
    • トラブルを検知したとき
    • トラブルが無くなったとき
    • 情報が更新されたとき(掲載日時が変わったとき)

Pythonスクリプトの作成

普段Pythonを触らないので、命名規則やら標準関数・クラスに苦しみながら作ってみた

まずは、Yahoo路線情報からスクレイピングする機能を作成

#!/ust/bin/env python
# -*- coding: utf-8 -*-

from abc import ABCMeta, abstractmethod
from bs4 import BeautifulSoup
from datetime import datetime
from urllib import request
from my_util import logger, dict_to_str


class Scraper(metaclass=ABCMeta):
    def __init__(self, url):
        self._url = url

    @abstractmethod
    def get_data(self):
        pass

    def __make_proxy_url(self):
        pass


class JRService(Scraper):
    def __init__(self, url):
        super().__init__(url)

    def get_data(self):
        html = request.urlopen(self._url).read()
        soup = BeautifulSoup(html, 'lxml')

        # 路線情報
        route_element = soup.find('div', {'id': 'main'}).find('div', {'class': 'mainWrp'}).find('div', {'class': 'labelLarge'})
        route_name = route_element.find('h1', {'class': 'title'}).find(text=True, recursive=False)
        update_time_str = route_element.find('span', {'class': 'subText'}).find(text=True, recursive=False)

        update_time = self.__get_update_datetime(update_time_str)

        # 運行情報
        status_element = soup.find('div', {'id': 'mdServiceStatus'})
        information = status_element.find('p').find(text=True, recursive=False)

        posting_element = status_element.find('p').find('span')
        posting_data = self.__get_posting_datetime(posting_element)

        message = information + posting_data['datetime_str']

        trouble_class = status_element.find('dd', {'class': 'trouble'})
        in_trouble = False
        if trouble_class is not None:
            in_trouble = True

        data = {
            'route_name': route_name,
            'update_datetime': update_time,
            'posting_datetime': posting_data['datetime'],
            'message': message,
            'in_trouble': in_trouble
        }

        logger.info(dict_to_str(data))

        return data

    @staticmethod
    def __get_update_datetime(update_time_str):
        year = datetime.today().year
        convert_str = '%d年%s' % (year, update_time_str)
        update_time = datetime.strptime(convert_str, '%Y年%m月%d日 %H時%M分更新')
        return update_time

    @staticmethod
    def __get_posting_datetime(element):
        posting_date_str = ''
        posting_date = None
        if element is not None:
            posting_date_str = element.find(text=True, recursive=False)
            year = datetime.today().year
            convert_str = '%d年%s' % (year, posting_date_str)
            posting_date = datetime.strptime(convert_str, '%Y年(%m月%d日 %H時%M分掲載)')

        return {
            'datetime_str': posting_date_str,
            'datetime': posting_date
        }


次に周期的にスクレイピング処理を実行し、情報が変わっていたら、通知する処理を作成

#!/ust/bin/env python
# -*- coding: utf-8 -*-


from configparser import ConfigParser
from datetime import datetime
from scraping import JRService
from threading import Timer
from time import sleep
from sys import exit
from my_util import logger
import requests


config = ConfigParser()
config.read('./.env')
thread_time_min = config.getint('settings', 'watch_interval_min')
thread_time_sec = thread_time_min * 60
services = config.items('jr_services')


def push_slack_chat(name, message):
    url = config.get('settings', 'post_url')
    token = config.get('settings', 'token')
    channel = config.get('settings', 'channel')
    data = {
        'token': token,
        'channel': channel,
        'username': name,
        'text': message
    }
    requests.post(url, data=data)


def scrape_service():
    try:
        for key, url in services:
            scraper = JRService(url)
            data = scraper.get_data()

            last_posting_datetime_str = config.get('posting_datetime', key)
            last_posting_datetime = datetime.strptime(last_posting_datetime_str, '%Y-%m-%d %H:%M')

            last_in_trouble = config.getboolean('in_trouble', key)
            if data['in_trouble'] and (data['posting_datetime'] <= last_posting_datetime):
                continue
            elif (data['in_trouble'] and not last_in_trouble)\
                    or (last_in_trouble and not data['in_trouble'])\
                    or data['in_trouble'] and (data['posting_datetime'] > last_posting_datetime):

                push_slack_chat(data['route_name'], data['message'])

                in_trouble_str = 'True' if data['in_trouble'] else 'False'
                config.set('in_trouble', key, in_trouble_str)
                if data['in_trouble']:
                    posting_datetime_str = data['posting_datetime'].strftime('%Y-%m-%d %H:%M')
                    config.set('posting_datetime', key, posting_datetime_str)

                config.write(open('./.env', 'w'))

    except Exception as e:
        logger.error(e.with_traceback())
    finally:
        thread = Timer(thread_time_sec, scrape_service)
        thread.start()


if __name__ == '__main__':

    scrape_service()

    try:
        while True:
            sleep(0.1)
    except KeyboardInterrupt:
        exit(0)

全体的に日時の文字列をdatetime型に変換する方法がなんか冗長な気がする・・・

↓参考にならないかもしれないソースコードはコチラ

github.com

supervisorの設定

作成したPythonスクリプトをデーモン化するため、supervisorの設定を追記

ここで使用しているsupervisorのバージョンは2.1

ちなみに、Pythonスクリプトの配置場所は「/usr/local/bin/Train-Service-Information」

[program:train_service_information]
command=bash -c '. /etc/profile; cd /usr/local/bin/Train-Service-Information; python -u main.py'
autostart=true
autorestart=true
startsecs=10
startretries=3
log_stdout=true
log_stderr=true
logfile=/var/log/supervisor/train_service_information.log

commandで「. /etc/profile」や「cd /usr/local/bin/Train-Service-Information」をしているのは、環境変数がなかったり、カレントがsupervisor2.1で設定出来ないため

あと「python -u main.py」の【-u】オプションは、スクリプトデバッグログが全然出力されないのを防ぐため、バッファリングを無効化するオプション

↓参考サイト

pythonで標準出力のバッファリングを無効にする - blog.mouten.info

実行結果

f:id:monakaice88:20170207065231p:plain

ちゃんと遅延が発生した時と遅延が収まった時にSlackに通知することが出来た。

次の機能について

Yahoo路線情報は情報の更新に大きめなタイムラグがある

ツイッターのつぶやきを解析して通知する機能を入れたほうがリアルタイムな感じになるかも

あと気象庁XMLを受信し始めてからかなり時間が経っているので、いい加減利用しなければ・・・