PHPからElasticsearchへbulkインポートする

PHPからElasticsearchへデータを投入するサンプルがググってもサクっと見つからなかったので ドキュメントなどを見ながら実装してみたメモ。

ライブラリは本家のPHP用クライアントライブラリを使用。

http://www.elasticsearch.org/guide/en/elasticsearch/client/php-api/current/

composer.jsonにこんな感じで追加。

{
    "require": {
        "elasticsearch/elasticsearch": "1.2.*"
    }
}

Elasticsearch\Clientクラスのbulkメソッドには以下のようなコメントが記述してあるのですが、

    /**
     * $params['index']       = (string) Default index for items which don't provide one
     *        ['type']        = (string) Default document type for items which don't provide one
     *        ['consistency'] = (enum) Explicit write consistency setting for the operation
     *        ['refresh']     = (boolean) Refresh the index after performing the operation
     *        ['replication'] = (enum) Explicitly set the replication type
     *        ['body']        = (string) Default document type for items which don't provide one
     *
     * @param $params array Associative array of parameters
     *
     * @return array
     */
    public function bulk($params = array())

ElasticsearchのbulkAPIは、ほかのAPIとは違い、送信データ本体に以下のような配列を与える必要があるので、
[インデックス、タイプ、ID],
[データ本体],
.....

$es = new \Elasticsearch\Client([
    'hosts' => ['localhost:9200'],
]);
$params = ['index' => '', 'type' => '', 'body' => []];
for(....){
    $index = 'logstash-' . $date->format('Y.m.d');
    $type = ....;
    $id = ....;
    $params['body'][] = ['index' => ['_index'=>$index, '_type' => $type, '_id' => $id]];
    $params['body'][] = $log;
}
$res = $es->bulk($params);
if($res['errors']){
    throw new Exception('elasticsearch import error');
}

こんな感じで、bodyにデータをセットしてbulkAPIを呼ぶ必要があるようです。

こんな感じでデータベース上に保持してあるログデータをcronでelasticsearchに投入してカジュアルに集計、可視化できる環境を構築中。