The basic principle of an Ingest node is this: when the node receives data, it looks up the registered pipeline whose id is specified in the request parameters, runs the data through that pipeline, and then passes the processed data on through Elasticsearch's standard indexing flow. A pipeline is registered through the _ingest/pipeline API:
curl -XPUT http://localhost:9200/_ingest/pipeline/my-pipeline-id -H 'Content-Type: application/json' -d '
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "convert" : {
        "field": "foo",
        "type": "integer"
      }
    }
  ]
}'
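Once registered, the pipeline is applied by naming its id in the pipeline request parameter at indexing time. A minimal sketch (the index, type, and document id here are placeholders):
curl -XPUT 'http://localhost:9200/index/type/1?pipeline=my-pipeline-id' -H 'Content-Type: application/json' -d '
{
  "foo": "123"
}'
With the convert processor registered above, the string "123" is stored as the integer 123.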
You can also test a pipeline without registering it, by sending both the pipeline definition and some sample documents to the _simulate endpoint:
curl -XPOST http://localhost:9200/_ingest/pipeline/_simulate -H 'Content-Type: application/json' -d '
{
  "pipeline" : {
    "description" : "describe pipeline",
    "processors" : [
      {
        "set" : {
          "field": "foo",
          "value": "bar"
        }
      }
    ]
  },
  "docs" : [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "foo" : "bar"
      }
    }
  ]
}'
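The response shows each document as it would look after the whole pipeline has run; appending ?verbose to the URL additionally shows the intermediate document after every individual processor.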
An Ingest node's processors are the counterpart of Logstash's filter plugins. In fact, the main processors are direct Java ports of the corresponding Logstash filter code. The most important processors at present are the following:
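The convert processor changes the data type of a field, for example parsing the string "123" into the integer 123: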
{
  "convert": {
    "field" : "foo",
    "type": "integer"
  }
}
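The grok processor extracts structured fields out of a text field using named regular-expression patterns; pattern_definitions declares custom patterns inline, in addition to the patterns shipped with Elasticsearch: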
{
  "grok": {
    "field": "message",
    "patterns": ["my %{FAVORITE_DOG:dog} is colored %{RGB:color}"],
    "pattern_definitions" : {
      "FAVORITE_DOG" : "beagle",
      "RGB" : "RED|GREEN|BLUE"
    }
  }
}
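The gsub processor runs a regular-expression search-and-replace over a string field; here every literal dot in field1 becomes a hyphen (note the double backslash, since the regex escape itself has to be escaped in JSON):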
{
  "gsub": {
    "field": "field1",
    "pattern": "\\.",
    "replacement": "-"
  }
}
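The date processor parses a date out of a field and writes the result to target_field (which defaults to @timestamp), using the given formats and timezone: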
{
  "date" : {
    "field" : "initial_date",
    "target_field" : "timestamp",
    "formats" : ["dd/MM/yyyy hh:mm:ss"],
    "timezone" : "Europe/Amsterdam"
  }
}
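As a sketch of how these compose (the pipeline id, field names, and log format below are made up for illustration), a single pipeline can chain grok, date, and convert; processors run in the order they are listed, so grok has to create the ts and bytes fields before date and convert can use them:
curl -XPUT http://localhost:9200/_ingest/pipeline/access-log-sketch -H 'Content-Type: application/json' -d '
{
  "description" : "hypothetical pipeline chaining grok, date and convert",
  "processors" : [
    {
      "grok" : {
        "field": "message",
        "patterns": ["%{IP:client} \\[%{HTTPDATE:ts}\\] %{NUMBER:bytes}"]
      }
    },
    {
      "date" : {
        "field" : "ts",
        "formats" : ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    },
    {
      "convert" : {
        "field": "bytes",
        "type": "integer"
      }
    }
  ]
}'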