Elasticsearch:如何“拯救”无法通过映射解析的文档?

Elasticsearch:如何“拯救”无法通过映射解析的文档?

我们使用 ElasticSearch 来存储和检查来自我们基础设施的日志。其中一些日志是法律要求的,我们不能丢失任何日志。

我们已经解析日志很长时间了,但没有任何映射。这使得它们几乎无法用于搜索和/或绘图。例如,一些整数字段已被自动识别为文本,因此我们无法将它们聚合到直方图中。

我们希望引入模板和映射,这将解决新索引的问题。

然而,我们注意到映射也会导致解析失败。如果某个字段被定义为整数,但突然获得非整数值,则解析将失败,文档将被拒绝。

这些文件有地方放吗?或者有什么方法可以保存它们以供日后检查?

下面的 Python 脚本与本地 ES 实例一起工作。

#!/usr/bin/env python3

import requests
import JSON
from typing import Any, Dict


ES_HOST = "http://localhost:9200"


def es_request(method: str, path: str, data: Dict[str, Any]) -> None:
    response = requests.request(method, f"{ES_HOST}{path}", json=data)

    if response.status_code != 200:
        print(response.content)


es_request('put', '/_template/my_template', {
    "index_patterns": ["my_index"],
    "mappings": {
        "properties": {
            "some_integer": { "type": "integer" }
        }
    }
})

# This is fine
es_request('put', '/my_index/_doc/1', {
    'some_integer': 42
})

# This will be rejected by ES, as it doesn't match the mapping.
# But how can I save it?
es_request('put', '/my_index/_doc/2', {
    'some_integer': 'hello world'
})

运行脚本出现以下错误:

{
    "error": {
        "root_cause": [
            {
                "type": "mapper_parsing_exception",
                "reason":"failed to parse field [some_integer] of type [integer] in document with id '2'. Preview of field's value: 'hello world'"
            }
        ],
        "type": "mapper_parsing_exception",
        "reason":"failed to parse field [some_integer] of type [integer] in document with id '2'. Preview of field's value: 'hello world'",
        "caused_by": {
            "type": "number_format_exception",
            "reason": "For input string: \"hello world\""
        }
    },
    "status": 400
}

然后文档就丢失了,至少看起来是这样。我可以在某处设置一个选项,让文档自动保存到其他地方,就像某种死信队列一样?

总结:我们需要映射,但又不能因为解析错误而丢失日志行。我们能自动将不符合映射的文档保存到其他地方吗?

答案1

事实证明,这很简单,只要允许“格式错误”的属性即可。有两种方法可以做到这一点。要么在整个索引上:

PUT /_template/ignore_malformed_attributes
{
  "index_patterns": ["my_index"],
  "settings": {
      "index.mapping.ignore_malformed": true
  }
}

或者按属性(参见此处的示例:https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-malformed.html

PUT my_index
{
  "mappings": {
    "properties": {
      "number_one": {
        "type": "integer",
        "ignore_malformed": true
      },
      "number_two": {
        "type": "integer"
      }
    }
  }
}

# Will work
PUT my_index/_doc/1
{
  "text":       "Some text value",
  "number_one": "foo" 
}

# Will be rejected
PUT my_index/_doc/2
{
  "text":       "Some text value",
  "number_two": "foo" 
}

请注意,您也可以更改现有索引的属性,但您需要先关闭它:

POST my_existing_index/_close
PUT my_existing_index/_settings
{
  "index.mapping.ignore_malformed": false
}
POST my_existing_index/_open

笔记:除非您刷新索引模式,否则类型更改不会在 kibana 中显示。然后您将遇到类型冲突,这需要您重新索引数据以再次搜索它……真麻烦。

POST _reindex
{
  "source": {
    "index": "my_index"
  },
  "dest": {
    "index": "my_new_index"
  }
}

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html

答案2

对于相当多的用例来说,可能更可取的替代方法是将 logstash 放在生产者和 Elasticseaech 之间。logstash 可以重新格式化和/或检查并路由到特定索引。
或者当然,如果您有本地生产者,请让它们验证和路由。

相关内容