OSS+ Compass

【OSSチャレンジ】第4回 Elasticsearch紹介

2017.05.11

OSS

はじめに

毎年40%以上のペースで増加し、2020年までには50倍にまで増加すると予想されている世界のデータ量。検索エンジンは増え続ける膨大で多様なデータから、ユーザが有益で理解しやすい情報に素早くアクセスする手助けをしており、今後ますますその重要性を増すことと思います。

今回は、そのような検索エンジンの中でも、FacebookやGitHub、Softbankといった著名企業が導入し、商用を含めた検索エンジン業界で最も人気を集めているオープンソースの全文検索エンジン「Elasticsearch(エラスティックサーチ)」についてご紹介します。

Elasticsearchとは

Elasticsearchはドキュメント指向型検索エンジンであり、投入されるデータはJSON形式なのでデータ構造の柔軟性に富んでいます。

また拡張性に優れた設計となっており、マシン1台の場合でも1台構成のクラスタとして起動し、Elasticsearch が起動しているサーバを追加すれば、既存データの分散、レプリカなどが自動で実行されます。

そんなElasticsearchの有用性を少しでも体験するために、今回は弊社が管理しているOpenStackサーバのsshアクセスログを使って、sshによる不正アクセス数やアクセス元の地域情報の分析、集計を行い、ダッシュボードで可視化するところまで行ってみたいと思います。

また本稿はその前編として、Elasticsearchの利用環境を簡単に準備できるElastic Cloudの無料トライアル版を使って、まずはElasticsearchがどのようなものか体験してみることに注力したいと思います。

全体の構成としては以下のとおりとなります。

・Kibana・・・検索結果の可視化、ダッシュボードの作成
・Elasticsearch・・・データの蓄積、検索
・Logstash・・・データの収集、加工、転送

Elastic Cloudの概要

Elastic Cloudとは、AWS(Amazon Web Services)で利用可能な、マネージド型Elasticsearchのことで、GUI上で簡単にクラスタの作成やサイズ変更などが出来る他、Elasticsearchに格納されているデータをグラフや表で可視化できるツールであるKibanaと連携してデータの可視化も行うことができます。つまり、上記のイメージで示すElasticsearchとKibanaの部分をAWS上で実現するマネージドサービスなのです。

Elastic Cloudの登録

それではさっそくこちらからElastic Cloudの登録をはじめます。

①「トライアル」をクリックします。

②メールアドレスを入力し、「トライアルを始める」をクリックします。

③下記のメールが来るので、「Verify Email and Accept TOS」をクリックします。

④ログインパスワードを設定して、「Set」をクリックします。

⑤クラスタを新規作成します。

⑥クラスタのサイズやデータセンターの数、バージョンなどを設定します。今回はトライアル版なので、デフォルトの設定のまま、画面最下部のName欄にクラスタの名前を入力し、「Create」をクリックします。

⑦下記の画面が出てきてパスワードが表示されるので、コピーして控えておきます。

⑧サイドメニュー欄、「Configuration」をクリックします。

⑨「Kibana」をクリックして有効化し、画面最下部にある「Update」をクリックします。

⑩サイドメニューの「Overview」からElasticsearchのURLを確認します。(後ほどLogstashの設定ファイル作成の際に使用します。)

これでElastic Cloudの登録は完了です。

Logstashの準備

さきほどElastic Cloudで作成したElasticsearchに転送するデータを作成するために、ログ収集管理ツールであるLogstashを用いてデータの収集、加工を行います。

まず、こちらからLogstashをダウンロードしてみます。今回はVersion 5.3.0を使います。

設定ファイルの作成

それではLogstashを動かすための設定ファイルを作成していきます。設定ファイルは大きく分けて下記の3つの部分に分けられます。

① input・・・Logstashで加工するデータの入力元を指定する

② filter・・・①で入力されたデータの加工方法を指定する

③ output・・・②で加工されたデータの出力先を指定する

まずは①のinput部分を作成します。今回Logstashに入力するデータはOpnestackサーバのアクセスログなので、アクセスログファイル(input_file.txt)を指定します。

file{
  path => "C:\logstash-5.3.0\input_file.txt"
 }
}

次に②のfilter部分を作成します。今回用いるのはログに記述されているIPアドレスやタイムスタンプといった特定部分を取り出すgrokフィルタと、grokフィルタによって取り出したIPアドレスからアクセス元の地域情報を取得するgeoipフィルタです。

filter{
 grok{
  match =>{
   "message" =>'%{HTTPDATE:date} %{IP:ip}'
  }
 }
 geoip{
  source => "ip"
}
}

なお、filter設定時に用いたgrokフィルタで記述しているパターンに合わせるために、pythonで変換コードを作成してsshのアクセスログから不正なアクセスを示すログだけを抜き出して上記のパターンに整形しています。

【整形前(抜粋)】

Feb 23 06:48:30 ML330_1 sshd[16952]: Invalid user tomcat from 61.31.xx.xx
Feb 23 06:48:30 ML330_1 sshd[16952]: input_userauth_request: invalid user tomcat [preauth]
Feb 23 06:48:30 ML330_1 sshd[16952]: Received disconnect from 61.31.xx.xx: 11: Bye Bye [preauth]
Feb 23 06:48:31 ML330_1 sshd[16989]: Received disconnect from 61.31.xx.xx: 11: Bye Bye [preauth]
Feb 23 06:48:35 ML330_1 sudo:  cinder : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/bin/cinder-rootwrap /etc/cinder/rootwrap.conf env LC_ALL=C vgs --noheadings --unit=g -o name,size,free,lv_count,uuid --separator : --nosuffix cinder-volumes
Feb 23 06:48:35 ML330_1 sudo:  cinder : TTY=unknown ; PWD=/ ; USER=root ; COMMAND=/bin/cinder-rootwrap /etc/cinder/rootw

【整形後(抜粋)】

23/Feb/2017:06:48:30 -0700 61.31.xx.xx
23/Feb/2017:06:48:49 -0700 61.31.xx.xx
23/Feb/2017:06:49:00 -0700 61.31.xx.xx
23/Feb/2017:06:49:08 -0700 61.31.xx.xx
23/Feb/2017:06:49:08 -0700 61.31.xx.xx

また、grokフィルタにはさまざまなパターンが用意されており、それらを自由に組み合わせてさまざまなデータを取得することができます。詳細はこちらをご確認ください。

次にタイムスタンプの設定を行います。Logstashでは特に指定がない限りLogstashにデータを入力した日時がタイムスタンプに設定されます。リアルタイムに入力する場合はそれでもよいのですが、今回は過去のデータを入力するため、filter設定部分にdateフィルタの設定を追加し、アクセスログに記載されているアクセス日時がタイムスタンプに設定されるようにします。

filter{
 grok{
  match =>{
   "message" =>'%{HTTPDATE:date} %{IP:ip}'
  }
 }
 date {
  match => ["date","dd/MMM/YYYY:HH:mm:ss Z"]
  locale => jp

 }
 geoip{
  source => "ip"
}
}

最後に③output部分を作成します。今回はElasticsearchに出力するので、Elastic Cloudで作成したElasticsearchのURLやID、パスワードを設定します。

output {
    stdout { codec => dots }
    elasticsearch {
        action => "index"
        hosts => ["ElasticsearchのURL"]
        index => "sshlog-%{+YYYY.MM.dd}"
        user => "elastic"
        password => "password"
    }
}

Logstashの動作確認

output部分に標準出力を設定してfilter設定の動作確認を行ってみます。Logstash-5.3.0に移動して下記のコマンドを実行します。作成した設定ファイル名をlogstash.confとしています。

【Logstash実行例】

bin\logstash -f logstash.conf

【Logstash実行結果(抜粋)】

{
          "path" => "C:\\logstash-5.3.0\\input_file.txt",
    "@timestamp" => 2017-02-22T16:28:09.000Z,
         "geoip" => {
             "city_name" => "Hanoi",
              "timezone" => "Asia/Ho_Chi_Minh",
                    "ip" => "123.31.xx.xx",
              "latitude" => 21.0333,
         "country_code2" => "VN",
          "country_name" => "Vietnam",
        "continent_code" => "AS",
         "country_code3" => "VN",
           "region_name" => "Thanh Pho Ha Noi",
              "location" => [
            [0] 105.85,
            [1] 21.0333
        ],
             "longitude" => 105.85,
           "region_code" => "64"
    },
}

期待どおり、入力したアクセスログからアクセス日時とアクセス元の地域情報が取得出来ています。これは、ベトナムの首都ハノイからの不正アクセスのようですね。

マッピング定義

マッピング定義とは、Elasticsearchに投入されるデータのフィールドやデータ型を定義することです。(リレーショナルDBのテーブル定義にあたります)

またその他にも検索精度(部分一致/完全一致など)の各種設定も行うことができます。Elasticsearchはスキーマレスであることが特徴の一つであり、基本的には自動でデータ型や解析設定をしてくれるのですが、データ型が誤っていたり、最適な検索精度ではなかったりする場合があるので、今回はマッピング定義することによって明示的に上記の設定を行います。

まずは「Overview」を選択後、KibanaのURLをクリックしてKibanaにログインします。

user: elastic
pw: Elastic Cloudの登録の⑦で控えたパスワード
次に、「Dev Tools」を選択し画面左側の欄にマッピング定義を記述します。
今回は下記のマッピング定義を行っています。まず、フィールドに対してテンプレート化を定義するダイナミックテンプレートを用いて、string型のフィールドをまとめて形態素解析を行わない設定にしています。

また、今回は不正アクセス元の地域情報を分析するので、Kibanaにおいて地図上でデータを可視化するための特殊なデータ型であるgeo_point型を”geoip.location”フィールドに定義します。

PUT _template/sshlog-template
{
  "template" : "sshlog-*",
  "mappings": {
    "logs": {
      "dynamic_templates" : [
        {
          "my_multi_strings" : {
            "match_mapping_type" : "string",
            "mapping" : {
              "type" : "keyword"
            }
          }
        }
      ],
      "properties": {
        "geoip.location": {
          "type": "geo_point"
        },
        "timeRange":{
          "type": "integer"  
        }
      }
    }
  }
}

最後にマッピング定義記述部分の右上にある実行ボタンをクリックします。画面右側に”acknowledged”:trueと表示されればマッピング定義は完了です。なお、マッピング定義内にシンタックスエラー等があるとこの時点でエラー内容が表示されます。

データの転送

それではいよいよLogstashを実行してElasticsearchにデータを転送します。logstash.confのoutput部分にElasticsearchを設定しているので、下記のコマンドを実行することでElasticsearchにデータが転送されます。

【Logstash実行例】

bin\logstash -f logstash.conf

Kibanaの設定

データの転送が完了したところでKibanaの設定を行います。まずインデックスパターンの設定を行います。「Management」を選択後、「Index name or pattern」にlogstash.confのoutput部分で記述したインデックス名、もしくは正規表現を含むインデックスパターンを入力します。

次に、「Time-field name」に時間の参照先となるフィールドを指定します。date型のフィールドがプルダウンで選択できるようになっているので、今回は@timestampを選択し、最後に「Create」ボタンをクリックします。

インデックスのフィールドごとの設定一覧が表示されます。”geoip.location”フィールドのデータ型がgeo_pointになっていることが確認できます。

次に可視化するデータの期間を設定します。「Discover」を選択して、画面右上の時計のマークをクリックします。

今回は2017年2月20日から2017年3月19日までの期間のアクセスログなので、設定方法として「Absolute」を選択後、日付設定部分で上記の期間を設定し「Go」をクリックします。

設定した期間の不正アクセス数が棒グラフで表示されました。

Visualizationの作成

ここまでのKibanaの設定でデータの形式や検索、分析対象期間の設定が完了しましたので、あとは検索、分析した結果をどのような表現形式で可視化するのかを設定します。

「Visualize」を選択後、「Create a visualization」をクリックします。

 

まずは、時系列で不正アクセス数の推移を確認したいので、「Line chart」を選択します。

 

「Name」で対象のインデックス名、あるいはインデックスパターンを選択します。

 

不正アクセス数を縦軸に、日付を横軸にしたグラフを作成したいので、縦軸を設定する「Y-Axis」のAggregation欄に「Count」を選択し、横軸を設定する「X-Axis」のAggregation欄に「Date Histogram」、Field欄に「@timestamp」、Interval欄は「Daily」を選択します。

最後に右矢印ボタンをクリックして設定を反映させます。

 

画面右側に不正アクセス数を示す時系列折れ線グラフが表示されました。また、画面下中央の矢印ボタンをクリックすると横軸で指定した単位毎(今回の設定の場合は日毎)のカウント数がリスト形式で追加表示されます。

 

最後に、画面上部の「Save」を選択し、名前を付けてSaveボタンをクリックすると作成したVisualizeの保存が完了です。

 

それでは続いて不正アクセス元の地域情報を分析するために、「Vertical bar chart」を選択します。

 

「Y-Axis」のAggregation欄に「Count」を選択し、「X-Axis」のAggregation欄に「Terms」(Field欄で指定する単語ごと)、Field欄に「geoip.country_name」(国名)、Order欄は「Descendig」(降順)、Size欄は「20」(20番目まで)とします。

 

今回使用したアクセスログでは、2位のベトナムにダブルスコアの差をつけて中国からの不正アクセスが最も多いという結果になりました。なお、Field欄を「geoip.city_name」(都市名)に変更すると、都市ごとのグラフを作成することもできます。

それでは今度は不正アクセス元の地域やアクセス数をさらに視覚的に分かりやすくするために、地図上に表示してみましょう。「Tile map」を選択します。

 

「Geo Coordinates」を選択して、Aggregation欄には「Geohash」、Field欄には「geoip.location」が選択されていることを確認して矢印ボタンをクリックします。

地図上に不正アクセス元の地域とアクセス数が表示されました。やはり中国、ベトナムあたりからのアクセスが特に多いことが分かります。

ダッシュボードの作成

最後に、作成したVisualizationの配置を行いダッシュボードを作成します。「Dashboard」を選択し、「Create a dashboard」をクリックします。

 

下記の画面が表示されるので、文中の「Add」をクリックします。

 

選択したVisualizationがダッシュボードに追加されるので、サイズや表示位置を自由に変更しながらレイアウトを調整します。なお、各Visualizationの設定を変更した場合、ダッシュボード上に配置したVisualizationにも結果が自動的に反映されます。

 

おわりに

今回はElastic Cloudを使ってElasticsearchをどのようにして使うのかという点を中心に紹介しました。

次回は実際にElasticsearchのサーバ環境を一から構築する過程を紹介しながら、どのようにして環境を構築していくか、という点を中心に紹介したいと思います。

ここまでお付き合いいただきありがとうございました。

文:坂部公彦

参考サイト

http://blog.johtani.info/blog/2014/11/21/import-apache-accesslog-using-logstash/

https://medium.com/hello-elasticsearch/elasticsearch-9a8743746467

https://www.skyarch.net/blog/?p=9709

最新記事

Category

Old Blogs