【Python】アクセスログ確認ツール作成

Python

 HPのアクセスログは、定期的に目視で確認していたのですが、DXするべきだと今頃気付き、確認ツールを作成しました。
 作った後でエクスプロイトコードを調べていたところ、アクセスログ確認ツールがたくさんあることに気づき…ました。せっかく作ったので、自戒の為に載せておきます。
 情報処理推進機構が公開している「iLogScanner」のリンクは、悔しいので一番下に貼ります。

● 環境

 OS Windows11 Home  
 Python 3.1.2
 確認ログファイル Apache 2.4のアクセスログ(標準設定)
(NginXのログも読めました)

● 実行結果

実行方法は次の通りです

■使い方
c:\ > py aclog.py [ファイル名] [オプション] [オプション引数]
実行するたびにカレントフォルダにアプリケーションログ(JSON)を作成します

例)オプションを指定しないで実行
c:\ > py aclog.py access_log_20240630
ログの先頭5行を表示します
[13/Jun/2024:01:11:27 +0900], 66.??.??.??, 304, /wordpress/wp-content/??
[13/Jun/2024:01:19:16 +0900], 85.??.??.??, 404, /????
[13/Jun/2024:01:51:18 +0900], 20.??.??.??, 404, /????
[13/Jun/2024:02:06:15 +0900], 66.??.??.??, 304, /wordpress/wp-content/??
[13/Jun/2024:04:34:31 +0900], crawl-66.??.??.??.????, 404, /wordpress/??

例)-allオプション
c:\ > py aclog.py access_log_20240630 -all
全てのログを表示します

例)-fオプション
c:\ > py aclog.py access_log_20240630 -f 66.??.??.??
対象のIPからのアクセスだけをフィルタリングして表示します
[13/Jun/2024:01:11:27 +0900], 66.??.??.??, 304, /wordpress/wp-content/??

例)-hostオプション
c:\ > py aclog.py access_log_20240630 -host
66.??.??.?? (1)
85.??.??.?? (1)
20.??.??.?? (4)
:
アクセスログ中のHOST一覧と、Host毎の接続回数を表示します

例)-eオプション
c:\ > py aclog.py access_log_20240630 -e
注意を要するアクセスログを表示します(一部のSQLインジェクション,悪用されやすいURL等を登録)
[13/Jun/2024:02:44:18 +0900], 19.??.??.??, 404, /????

● ソース

①設定ファイル(aclog.json)

{
    "SAFE_REFERER_URLS": [
        "https://tansunohazama.sakura.ne.jp/??/??"
    ],
    "SAFE_URLS": [
        "/??/??"
    ],
    "SAFE_IP": [
        "??.??.??.??" 
    ],
    "SAFE_STATUS": [
        "200"
    ],
    "EXPLOIT_CODES":[
        "%ad",
        "test.php",
        "admin.php",
        "up.php",
        "error_log.php",
        "info.php",
        "' OR "
    ],
    "LOG_PATTERN": "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\""
}

パラメータ補足
 SAFE_REFERER_URLS ログから除外するリファラー
 SAFE_URLS ログから除外するURL
 SAFE_IP ログから除外するIP(自身のIP,信頼性のあるBOT,問題ないと認めたIP等)
 SAFE_STATUS “200” ログから成功したアクセスを除外する場合に使用
 EXPLOIT_CODES 悪用されやすいものを記述、調べたいエクスプロイトがあれば追加してください

②実行ファイル(aclog.py

import argparse
import apache_log_parser
import pprint
import json

# コンフィグファイル名
CONFIG_FILE ='aclog.json'

# アプリログファイル名
APPLI_LOG ="aclog.log"

# JSON ファイルを読み込む
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
    config = json.load(f)

SAFE_REFERER_URLS = config['SAFE_REFERER_URLS']
SAFE_URLS = config['SAFE_URLS']
SAFE_IP = config['SAFE_IP']
SAFE_STATUS = config['SAFE_STATUS']
LOG_PATTERN = config['LOG_PATTERN']
EXPLOIT_CODES = config['EXPLOIT_CODES']

# 除外対象リファラー
def is_safe_referer_url(url):
    return any(safe_url in url for safe_url in SAFE_REFERER_URLS)

# 除外対象URL
def is_safe_url(url):
    return any(safe_url in url for safe_url in SAFE_URLS)

# 除外対象URL
def is_safe_ip(ip_address):
    return any(safe_ip in ip_address for safe_ip in SAFE_IP)

# 除外対象ステータスコード
def is_safe_status(status):
    return any(safe_status in status for safe_status in SAFE_STATUS)

# エクスプロイトコード疑いリスト
def contains_exploit_code(url):
    return any(exploit_code in url for exploit_code in EXPLOIT_CODES)

def parse_logs(file_name, show_all, filter_host, list_hosts, count_only, check_exploits):
    with open(file_name, 'r', encoding='shift-jis') as f:
        lines = f.readlines()

    line_parser = apache_log_parser.make_parser(LOG_PATTERN)
    res_list = []
    host_list = []
    host_count = {}

    try:
        for line in lines:
            try:
                parsed_line = line_parser(line)
                if filter_host and parsed_line['remote_host'] != filter_host:
                    continue
                if is_safe_referer_url(parsed_line['request_header_referer']):
                    continue
                if is_safe_ip(parsed_line['remote_host']):
                    continue
                if is_safe_status(parsed_line['status']):
                    continue
                if is_safe_url(parsed_line['request_url']):
                    continue
                if check_exploits and not contains_exploit_code(parsed_line['request_url']):
                    continue
                res_list.append(parsed_line)
                host_list.append(parsed_line['remote_host'])
                
                # ホスト毎のアクセス回数をカウント
                if parsed_line['remote_host'] in host_count:
                    host_count[parsed_line['remote_host']] += 1
                else:
                    host_count[parsed_line['remote_host']] = 1
            except apache_log_parser.LineDoesntMatchException:
                continue

        if count_only:
            print(f"表示されるログの件数: {len(res_list)}")
        
        elif list_hosts:
            unique_hosts = sorted(set(host_list))
            # host 一覧
            print("host の一覧(アクセス回数)\n")
            
            for host, count in sorted(host_count.items()):
                print(f"{host} ({count})")
        else:
            if show_all or filter_host or check_exploits:
                # すべてのログを表示
                print("ログ(全て)を表示します。\n")
                for item in res_list:
                	print(f"{item['time_received']}, {item['remote_host']}, {item['status']}, {item['request_url']}")
            else:
                # 最初の5件だけ表示
                print("ログ(先頭5件分)を表示します。\n")
                for item in res_list[:5]:
                	print(f"{item['time_received']}, {item['remote_host']}, {item['status']}, {item['request_url']}")
            
            for item in res_list:
                item['time_received'] = item['time_received_isoformat']
                item['time_received_datetimeobj'] = item['time_received_datetimeobj'].isoformat()
                item['time_received_tz_datetimeobj'] = item['time_received_tz_datetimeobj'].isoformat()
                item['time_received_utc_datetimeobj'] = item['time_received_utc_datetimeobj'].isoformat()

            # 結果をファイルに書き出す
            with open("parsed_log.json", 'w', encoding='utf-8') as f:
                json.dump(res_list, f, ensure_ascii=False, indent=4)
            print("\n-- -- -- -- -- -- -- -- -- --\n出力結果をファイルに出力しました parsed_log.json")

    except Exception as e:
        print(f"エラーが発生しました: {e}")

def main():
    # 'aclog.log' ファイルは毎回空にする
    with open(APPLI_LOG, 'w', encoding='utf-8') as f:
        f.write("")

    parser = argparse.ArgumentParser(description='aplog [Access Log チェッカー] v0.01')
    
    parser.add_argument('file_name', type=str, help='アクセスLogのファイル名を指定してください')
    parser.add_argument('-all', action='store_true', help='パーサしたログを全て表示します(表示に時間がかかります)')
    parser.add_argument('-f', type=str, help='対象のリモートホストで表示を絞り込みます')
    parser.add_argument('-host', action='store_true', help='リモートホストだけの一覧を表示します')
    parser.add_argument('-c', action='store_true', help='表示されるログの件数を表示します')
    parser.add_argument('-e', action='store_true', help='注意を要するリクエストの一覧を表示します')

    args = parser.parse_args()
    parse_logs(args.file_name, args.all, args.f, args.host, args.c, args.e)

if __name__ == '__main__':
    main()

久しぶりにPythonのプログラムを作成しましたが、ソースはPythonが一番書きやすいですね。
最近はPython書く人が多いようで、DOCやライブラリも充実しているので助かります。
Jupyter Notebook で骨格を作っておいて、コピペでソースプログラムにしています。

●実行方法

 ソースとconfigファイルを同じフォルダに置けば実行できます。
 apache_log_parserを使用したので、入っていなければ次のコマンドでインストールしてください

apache_log_parser インストール方法

■Jupyter Notebook
 !pip install apache_log_parser
 でインストールできます

■コマンドプロンプト
 pip install apache_log_parser
 でインストールできます

● 業務で使うならiLogScanner

 ここまで作って、エクスプロイトコードを収集していたところ、情報処理推進機構が攻撃兆候検出ツール iLogScannerを作っていることに気づきました。(ハゥ
 せっかく作ったので私はaclog.pyの方を推しますが、実際仕事で使うならiLogScannerにはかないません。
 悔しいので、IPAのリンクは一番下に貼ります。

 情報処理推進機構 iLogScanner https://www.ipa.go.jp/security/vuln/ilogscanner/index.html
   ※実行にはJavaランタイムが必要です

  javaで作っているんですね、しかもApplet.classって・・・
  懐かしい、久しぶりに見ました。

iLogScanner 実行画面