我想同步将一些 MySQL 表添加到 ElasticSearch 索引中,以便进行搜索。
但是几乎我能找到的每个例子都显示了一个大而粗的“SELECT * FROM”,作为例子来说这没问题,但我在生产中,我的一个表中接近 10M 行,所以我甚至不想每隔几分钟尝试一次“SELECT *”。
我已经习惯了 Sphinx,而且
UPDATE ... SET current_id = last_id, last_id = MAX(id);
SELECT * WHERE id > (SELECT current_id...);
这种策略。
我几乎可以,但仅仅是“几乎”,因为一个项目在其生命的最初几分钟内通常有很大的概率被修改,它将被索引为“新生儿”并保持原样。
因此,我可以想象更好的策略,比如在更新和创建时将主键存储在“索引”表中的触发器,河流变成
SELECT * FROM ... WHERE id IN (SELECT id ... FROM to_index)
从未尝试过,但看起来更好,至少是一个更好的起点。
还有删除行的问题......
但可能存在一些众所周知的策略,经过充分讨论和测试,我没有找到它们,我是不是错过了什么重要的东西?还是我是唯一一个试图避免SELECT *
数百万原始数据的人?
答案1
这篇文章readme
有点长,但其中有一节是关于这个的:
https://github.com/jprante/elasticsearch-river-jdbc#how-to-select-incremental-data-from-a-table
这个想法是保留每行变化的微秒级时间戳,并要求 ES 自上次河流运行以来进行查询:
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : [
{
"statement" : "select * from \"products\" where \"mytimestamp\" > ?",
"parameter" : [ "$river.state.last_active_begin" ]
}
],
"index" : "my_jdbc_river_index",
"type" : "my_jdbc_river_type"
}
}