从 MySQL 到 ElasticSearch 的嵌套对象

从 MySQL 到 ElasticSearch 的嵌套对象

我是 ES 新手,正在尝试使用 logstash jdbc 将数据从 MYSQL 加载到 Elasticsearch。

在我的情况下,我想使用列值作为字段名称,请查看输出数据中的新值和十六进制值,我想要“id”值作为字段名称。

Mysql 数据

cid    id       color      new     hex      create            modified
1      101     100 euro    abcd   #86c67c  5/5/2016 15:48   5/13/2016 14:15
1      102     100 euro    1234   #fdf8ff  5/5/2016 15:48   5/13/2016 14:15

需要输出

{
  "_index": "colors_hexa",
  "_type": "colors",
  "_id": "1",
  "_version": 218,
  "found": true,
  "_source": {
    "cid": 1,
    "color": "100 euro",
    "new" : {
            "101": "abcd",
            "102": "1234",
        }
    "hex" : {
            "101": "#86c67c",
            "102": "#fdf8ff",
        }
    "created": "2016-05-05T10:18:51.000Z",
    "modified": "2016-05-13T08:45:30.000Z",
    "@version": "1",
    "@timestamp": "2016-05-14T01:30:00.059Z"
  }
}

Logstash 配置:

input {
 jdbc {
   jdbc_driver_library => "/etc/logstash/mysql/mysql-connector-java-5.1.39-bin.jar"
   jdbc_driver_class => "com.mysql.jdbc.Driver"
   jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
   jdbc_user => "root"
   jdbc_password => "*****"
   schedule => "* * * * *"

   statement => "select cid,id,color, new ,hexa_value ,created,modified from colors_hex_test order by cid"
   jdbc_paging_enabled => "true"
   jdbc_page_size => "50000"
}
}

   output {
    elasticsearch {
        index => "colors_hexa"
        document_type => "colors"
        document_id => "%{cid}"
        hosts => "localhost:9200"
    }
}

有人能帮忙过滤这些数据吗?这里的问题在于“新”和“十六进制”字段。我正在尝试将两个记录转换为单个文档。

答案1

您正在寻找聚合过滤器。他们的一个示例明确针对您在此处寻找的 JDBC 用例(参见示例 4)。

由于 JDBC 输入是计划操作,因此您可以设置聚合过滤器以合并在短时间内到达的所有事件。比如说,10 秒。JDBC 输入提取的所有行将以非常紧密的分组到达,并且最终应该合并。

此过滤器比其他过滤器更复杂,因为您必须编写 ruby​​ 代码来处理您要查找的字段连接。但它应该能够做到这一点。

相关内容