当前位置 : 首页 » 文章分类 :  开发  »  Elasticsearch-Api-文档操作

Elasticsearch-Api-文档操作

Elasticsearch-Api-文档操作

Index 里面单条的记录称为 Document(文档)。许多条 Document 构成了一个 Index。


读写文档

Elasticsearch Guide [7.17] » REST APIs » Document APIs » Reading and Writing documents
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-replication.html

基本写模型

1、协调阶段(coordinating):根据路由规则将文档路由到主分片
2、主分片处理阶段(primary):验证文档,在主分片执行操作,转发到 in-sync 副本


/index/_doc 创建文档

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html

API

POST /<target>/_doc/           自动生成_id创建文档

PUT /<target>/_doc/<_id>       指定_id创建文档
PUT /<target>/_create/<_id>    只有当_id对应的文档不存在时才创建,否则报错
POST /<target>/_create/<_id>   只有当_id对应的文档不存在时才创建,否则报错

PUT /index/_doc/_id 指定_id创建文档

向指定的 /Index/Type/ID 发送 PUT 请求,就可以在 Index 里面新增一条记录。
ID 是调用方指定的唯一ID,如果已存在,则会完全替换更新文档并增加其版本 version
**在 ElasticSearch 7.0 及以上的版本中已经把 type 这个概念了,统一用 “_doc” 这个占位符来表示 “_type”**,你可以把 _type 看作是文档就行了,相当于 ElasticSearch 7.0 及以上版本只有索引和文档这两个概念了。

curl -X PUT 'http://localhost:9200/article/_doc/1' \
-H 'Content-Type: application/json' \
-d '{
    "title":"文章的标题",
    "pathname":"/article/postlink",
    "content":"美国留给伊拉克的是个烂摊子吗"
}'

返回

{
    "_index": "article",
    "_type": "_doc",
    "_id": "1",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "_seq_no": 0,
    "_primary_term": 1
}

POST /index/_doc 自动生成_id创建文档

新增记录的时候,也可以不指定 id,这时要改成 POST 请求。
向指定的 /Index/Type 发送 POST 请求,可以在 Index 里面新增一条记录,系统会自动生成唯一ID。

curl --location --request POST 'http://localhost:9200/article/_doc' \
--header 'Content-Type: application/json' \
--data-raw '{
    "title":"es的使用",
    "pathname":"/article/es",
    "content":"新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。"
}'

返回

{
    "_index": "article",
    "_type": "_doc",
    "_id": "uv_ZjXEBrN9oq5tgVMuj",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "_seq_no": 0,
    "_primary_term": 1
}

返回的 _id 是自动生成的唯一 id


refresh 刷新操作

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html

Elasticsearch 为了提高写入性能,会将文档写入操作(创建、更新、删除)的相关改变暂存到内存中的一个缓冲区里,然后在后台周期性地将这些改变刷新(refresh)到硬盘上的索引文件中。只有刷新操作完成后,这些改变才对搜索操作可见。

refresh 参数就是用来控制这个刷新操作的:

  • false 默认值,不执行刷新操作,这次写入的改变会在下次周期性刷新时被应用(1秒钟间隔)。
  • true 立即执行刷新操作,使得这次写入的改变对搜索操作立即可见。
  • wait_for 等待直到这次写入的改变被刷新并对搜索操作可见,es内部自动刷新默认是1秒钟间隔

频繁执行刷新操作会对 Elasticsearch 的性能产生影响,因此在大量写入操作时,通常是使用默认设置(false),即在后台周期性地执行刷新。
只有在某些需要改变立即对搜索可见的场景下,才会设置 refresh 参数为 true 或者 wait_for。


version 外部版本号

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#index-versioning
ES 允许不使用内置的 version 进行版本控制,可以自定义使用外部的 version,此时将 version_type 设为 external,此时可传入一个大于 0 小于 9.2e+18 的 long 型 version 参数。
使用外部版本号时,只有当你提供的 version 比当前文档的 _version 大的时候,才能完成修改(包括删除)。

例如常见的双写方案,MySQL 和 ES 各存一份数据,ES 用于加速查询,此时可以将 version 维护在 MySQL 中。
例如:

PUT my-index-000001/_doc/1?version=2&version_type=external
{
  "user": {
    "id": "elkbee"
  }
}

POST /index/_update/_id 指定id更新文档

Elasticsearch Guide [7.16] » REST APIs » Document APIs » Update API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html

POST /<index>/_update/<_id>
之前版本的api是 POST /index/_doc/_id/_update

通过脚本更新文档。脚本可更新、删除或跳过文档。更新 API 还支持传入一部分文档内容,最终合并到已存在文档中。如果想完全替换更新已存在文档,使用带 ID 的文档创建 API 即可。

update API 实现的逻辑中,其实可以理解为三步操作:
1、qeury:通过文档 ID 去 GET 文档,此时可获取文档的 _version 版本
2、update:根据 script 脚本来更新 document;
3、reindex:将更新后的 document 重新写回到索引,

如果在 GET 和 Reindex 期间,文档被更新,_version 值发生变化,则更新失败。可以使用 retry_on_conflict 参数来设置当发生更新上述情况更新失败时,自动重试的次数。retry_on_conflict 的默认值为0,即不重试。

因此,ES 的 update API 依然是需要对文档做一次完全的 reindex 操作,而不是直接去修改原始document。但 update API 所能做的是减少了网络交互次数,当然这比起我们自己通过index获取数据并在业务代码中更新再写回到ES来实现,大大的减少了版本冲突的概率。

在遇到版本冲突问题时,ES 将会返回 409 Conflict HTTP 错误码。因此,当遇到 409 后,为了保证数据的最终插入,我们就必须要考虑到 retry 机制。为了实现冲突后的retry,有两种方案来实现:
1、业务代码自定义
通过识别 409 错误,在业务代码中,跟据自己的需求来进行 retry。因为是自定义的逻辑,所以我们可以任意的操作 retry 的回退策略,以及 retry 的内容等;
2、retry_on_conflict
通过在参数中指定来实现 retry_on_conflict 来实现

script 脚本更新

将文档 1 的 counter 值加4

POST test/_update/1
{
  "script" : {
    "source": "ctx._source.counter += params.count",
    "lang": "painless",
    "params" : {
      "count" : 4
    }
  }
}

doc 文档部分值更新

更新文档 1 的 name 值:

  • 如果原来 name 没有值或者没有 name 字段,会新增 name 字段
  • 如果 name 字段有值,会更新 name 字段的值
    POST test/_update/1
    {
    "doc": {
      "name": "new_name"
    }
    }
    

POST /index/_update_by_query 根据查询更新

Elasticsearch Guide [7.16] » REST APIs » Document APIs » Update By Query API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html

conflicts=proceed 版本冲突时继续处理

提交 _update_by_query 请求后,Elasticsearch 先获取索引数据的当前快照,然后使用 内部版本 _version 更新和 query 匹配的文档:

  • 如果 _version 能匹配则更新文档,然后增加 _version 版本号。
  • 如果在建立快照和更新当前文档之间文档被更新,会出现 _version 不匹配,导致版本冲突,更新操作失败。可以将 conflicts 设为 proceed 在冲突时继续处理,response 返回冲突个数。

处理 _update_by_query 请求时,Elasticsearch 内部进行多批次 search 查询请求来匹配满足条件的文档,然后在每一批匹配的文档上执行一次 bulk 更新请求。

conflicts 参数,遇到版本冲突时如何处理:

  • abort 默认值,报错
  • proceed 继续处理,返回 response 中有冲突文档个数。

ignore_unavailable=true 索引不存在时不报错

POST /my_index/_update_by_query?ignore_unavailable=true

Java 代码中:

UpdateByQueryRequest request = new UpdateByQueryRequest(index);
request.setIndicesOptions(IndicesOptions.lenientExpandOpen()); // 索引不存在时不报错

/_update_by_query 可用于热更新ik词库后重建索引

POST /my_index/_update_by_query?conflicts=proceed
可用于 ik 词库热更新后,重建索引,使词库中新加的单词生效


script 脚本条件更新

删除 status=published,且发布时间在指定范围的数据

POST /my_index/_update_by_query
{
  "query": {
    "bool": {
      "must": [{
        "term": {
          "status": "published"
        }
      },
      {
        "range": {
          "publish_time": {
            "lt": "2023-10-01 00:00:00",
            "gte": "2023-09-20 00:00:00"
          }
        }
      }]
    }
  },
  "script": {
    "lang": "painless",
    "source": "ctx._source.key1 = ctx._source.key2; ctx._source.key3.subkey31 = ctx._source.key4.subkey41;"
  }
}

删除 dense_vector 向量字段

删除 title_vector 和 content_vector 两个 dense_vector 向量字段

POST /my_index/_update_by_query
{
  "script": {
    "source": "ctx._source.remove('title_vector'); ctx._source.remove('content_vector');",
    "lang": "painless"
  },
  "query": {
    "term": {
      "id": "e76dc6ec97ad415882658dd62bcf69e3"
    }
  }
}

GET /index/_doc/_id 根据id查询文档

Elasticsearch Guide [7.16] » REST APIs » Document APIs » Get API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html

GET <index>/_doc/<_id> 根据 ID 查询文档

ES 文档上的每一次写操作,包括删除,都会使文档的 _version 递增,已删除文档的 version 会在一小段时间内保持可见,时间由配置项 index.gc_deletes 决定,默认是 60 秒。

例如 GET 'http://localhost:9200/article/_doc/uv_ZjXEBrN9oq5tgVMuj' 返回

{
    "_index": "article",
    "_type": "_doc",
    "_id": "uv_ZjXEBrN9oq5tgVMuj",
    "_version": 1,
    "_seq_no": 0,
    "_primary_term": 1,
    "found": true,
    "_source": {
        "title": "es的使用",
        "pathname": "/article/es",
        "content": "新增记录的时候,也可以不指定 Id,这时要改成 POST 请求。"
    }
}

ID 不存在时,返回 "found": false

{
    "_index": "article",
    "_type": "_doc",
    "_id": "1",
    "found": false
}

DELETE /index/_doc/_id 删除文档

Elasticsearch Guide [8.1] » REST APIs » Document APIs » Delete API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html

DELETE /<index>/_doc/<_id>

通过修改 version 进行删除,异步合并 Segment 时才真正删除。


POST /index/_delete_by_query 根据条件删除

https://www.elastic.co/guide/en/elasticsearch/reference/7.6/docs-delete-by-query.html

POST /<target>/_delete_by_query 根据条件删除文档,使用和 search 接口相同的查询条件语法,可使用 URI 条件或 body 条件。

例如

POST /my-index-000001/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "elkbee"
    }
  }
}

删除索引中的全部文档(清空索引)

使用空条件删除即可

POST /my-index-000001/_delete_by_query
{
    "query": {
        "match_all": {}
    }
}

wait_for_completion=false 异步删除

Elasticsearch 的 /_delete_by_query 接口默认是同步执行的,它会等待所有的匹配文档被处理后才返回。这意味着如果查询命中的数据量很大,那么删除操作可能会花费相当长的时间。
可以通过在请求中设置 wait_for_completion=false 参数来将其变为异步操作。在这种情况下,Elasticsearch 会先做一些请求合法性检查,然后立即返回一个任务ID,你可以使用这个ID来获取删除操作的进度或取消任务。内部是在 .tasks/task/${taskId} 索引中插入了一个文档来记录删除任务。

ignore_unavailable=true 索引不存在时不报错

默认情况下,如果索引不存在,返回 index_not_found_exception 错误
如果希望在索引不存在时不报错,你可以使用 ignore_unavailable=true 选项。这个选项会让 Elasticsearch 忽略那些在执行操作时不存在的索引。

POST /your_index/_delete_by_query?ignore_unavailable=true
{
    "query": {
        "match_all": {}
    }
}

java中:

DeleteByQueryRequest request = new DeleteByQueryRequest("your_index");
request.setQuery(QueryBuilders.matchAllQuery());
request.setIndicesOptions(IndicesOptions.lenientExpandOpen()); // 索引不存在时不报错
BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT);

slices 划分子任务

_delete_by_query 接口的 slices 参数用于将删除操作划分为多个并行任务以提高性能。这个参数的值可以是一个整数或者是 auto。
默认值是 1,表示不会划分子任务。
当你设定 slices 参数为一个大于1的整数,比如设定为5,那么 Elasticsearch 将把需要删除的文档划分为5个分片,每个分片都会并行地进行删除操作。这种方式可以有效地提高大规模删除操作的效率。
如果你设定 slices 参数为 auto,那么 Elasticsearch 会自动选择合适的分片数量,这个数量通常是索引的分片(shards)数。
需要注意的是,虽然增加 slices 的数量可以提高删除操作的速度,但是也会增加 Elasticsearch 集群的负载。因此,你需要根据你的集群的性能和负载情况来合理设定这个参数。
此外,slices 参数并不是越大越好。如果你的删除操作涉及的文档数量本身就不大,那么增加 slices 的数量并不会带来显著的性能提升,反而可能会因为创建过多的并行任务而浪费资源。
总的来说,slices 参数是一个用于优化删除操作性能的工具,你需要根据你的实际情况来合理使用它。

scroll 和 scroll_size 批次间隔与数量

由于删除的数据量可能很大,无法一次性处理删除,_delete_by_query 内部使用 scroll 进行批量查询并删除。

  • scroll 参数的值为一个时间值,表示上下文的保持时间,默认值 5 分钟。例如,如果设置scroll=1m,则表示上下文将保持1分钟。
  • scroll_size 参数用于控制每个批次检索和删除的文档数量,默认值 1000

scroll 是 Elasticsearch 中用于处理大量数据的机制,它可以在多个请求之间保留搜索上下文,以便能够获取到所有符合查询条件的文档,而不仅仅是返回的第一批文档。
scroll 参数的保持时间不宜设置得过长,因为保持搜索上下文会占用资源。
scroll_size 参数决定了每次批量操作的文档数量。例如,如果设置 scroll_size 为 1000,那么每个批次将处理 1000 个文档,然后再处理下一个批次。
这个参数可以根据你的需求和 Elasticsearch 集群的能力来调整。如果你设置的 scroll_size 太大,可能会消耗大量的内存和CPU资源,从而导致性能问题。反之,如果 scroll_size 太小,那么每次处理的文档数量就会很少,从而需要更多的批次来完成所有文档的处理,这可能会导致整个操作的效率较低。
因此,你需要根据你的情况来适当地设置 scroll_size 参数,以在性能和效率之间找到一个平衡。

requests_per_second 限制删除速度

requests_per_second 参数在 Elasticsearch 的删除查询接口中用于控制删除操作的吞吐量。这个参数的目标是防止删除操作过于频繁或过快地执行,从而影响 Elasticsearch 集群的性能。
这个参数是一个可选的参数,它的默认值是-1,表示删除操作将尽可能快地执行。你可以设置这个参数为任意浮点数,比如1.5,这表示 Elasticsearch 将每秒执行1.5次删除操作。

条件删除原理

_delete_by_query 并不是真正意义上物理文档删除,而是只是版本变化并且对文档增加了删除标记。当我们再次搜索的时候,会搜索全部然后过滤掉有删除标记的文档。因此,该索引所占的空间并不会随着该 API 的操作磁盘空间会马上释放掉,只有等到下一次段合并的时候才真正被物理删除,这个时候磁盘空间才会释放。相反,在被查询到的文档标记删除过程同样需要占用磁盘空间,这个时候,你会发现触发该 API 操作的时候磁盘不但没有被释放,反而磁盘使用率上升了。


POST /_bulk 批量操作

Elasticsearch Guide [7.17] » REST APIs » Document APIs » Bulk API
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

POST /_bulk
POST /<target>/_bulk

注意:

  • _bulk 批量操作操作使用 NDJSON(Newline Delimited JSON) 格式的请求体,批量操作的 body 必须是一行 action 紧接着一行数据(delete 不需要数据),数据必须在一行中且中间不能换行,一行数据结束后必须换行才能接下一个 action,且最后必须以一个空行结束
  • _bulk 批量操作的 HTTP 请求 Content-Type 可以使用 application/jsonapplication/x-ndjson

例如

POST _bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "test"} }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

{ "index": { "_index": "mytest", "_id": "1" } }
{ "content": "美国留给伊拉克的是个烂摊子吗" }
{ "index": { "_index": "mytest", "_id": "2" } }
{ "content": "公安部:各地校车将享最高路权" }
{ "index": { "_index": "mytest", "_id": "3" } }
{ "content": "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船" }
{ "index": { "_index": "mytest", "_id": "4" } }
{ "content": "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首" }

返回体中,每个操作有一个单独的结果,按入参请求顺序排列,各个操作是否成功互不影响
例如

{
  "took": 131,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "index_ik",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 0,
        "_primary_term": 1,
        "status": 201
      }
    },
    {
      "index": {
        "_index": "index_ik",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 1,
        "_primary_term": 1,
        "status": 201
      }
    }
  ]
}

refresh=true 操作后立即对搜索可见

默认情况下, /bulk 操作后如果立即执行 /_search 或 /_count 可能无法检索到操作的数据,因为索引分片还未刷新。

refresh 参数可控制索引刷新:

  • true 在该批次操作完成后,立即刷新所有相关的分片。这意味着,在该批次操作后,所有的更改都将立即对搜索可见。
  • false 默认值,在该批次操作完成后,不立即刷新相关的分片。Elasticsearch 将按照其默认的刷新间隔进行刷新。这也是默认行为。
  • wait_for 在该批次操作完成后,等待自动刷新使得更改对搜索可见,之后再返回。这意味着,请求将在刷新完成后才返回。

所以,refresh=true 或 wait_for 时,请求返回后都立即对搜索可见,只不过 refresh=true 内部会触发立即刷新,而 refresh=wait_for 内部是等自动刷新完成。
注意,refresh=true 或 wait_for 时,只会等待批量操作文档路由到的相关的 shards 刷新,而不是等待全部 shards 刷新

Java 代码中设置 refresh

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

enum RefreshPolicy implements Writeable {
    /**
     * Don't refresh after this request. The default.
     */
    NONE("false"),
    /**
     * Force a refresh as part of this request. This refresh policy does not scale for high indexing or search throughput but is useful
     * to present a consistent view to for indices with very low traffic. And it is wonderful for tests!
     */
    IMMEDIATE("true"),
    /**
     * Leave this request open until a refresh has made the contents of this request visible to search. This refresh policy is
     * compatible with high indexing and search throughput but it causes the request to wait to reply until a refresh occurs.
     */
    WAIT_UNTIL("wait_for");
}

Malformed action/metadata line

原因:批量操作的 body 必须是一行 action 一行数据(delete 不需要数据),数据必须在一行中且中间不能换行,一行数据结束后必须换行才能接下一个 action,且最后必须以一个空行结束
比如

{ "index": {"_index": "user_profile", "_type": "base_info", "_id": 1234567 } }
{ "user_id": 1234567 }

是正确的,但如果 改为

{ "index": {"_index": "user_profile", "_type": "base_info", "_id": 1234567 } }
{
  "user_id": 1234567
}

就会报下面的错误

{
    "error": {
        "root_cause": [
            {
                "type": "illegal_argument_exception",
                "reason": "Malformed action/metadata line [3], expected START_OBJECT but found [VALUE_STRING]"
            }
        ],
        "type": "illegal_argument_exception",
        "reason": "Malformed action/metadata line [3], expected START_OBJECT but found [VALUE_STRING]"
    },
    "status": 400
}

如果请求 body 的最后没有换行 \n,就会报下面的错误:

{
    "error":{
        "root_cause":[
            {
                "type":"illegal_argument_exception",
                "reason":"The bulk request must be terminated by a newline [
]"
            }
        ],
        "type":"illegal_argument_exception",
        "reason":"The bulk request must be terminated by a newline [
]"
    },
    "status":400
}

BULK API : Malformed action/metadata line [3], expected START_OBJECT but found [VALUE_STRING]
https://stackoverflow.com/questions/45792309/bulk-api-malformed-action-metadata-line-3-expected-start-object-but-found


乐观并发控制

Elasticsearch Guide [7.17] » REST APIs » Document APIs » Optimistic concurrency control
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/optimistic-concurrency-control.html

每个文档都有一个 _version 版本号,当文档被修改时版本号递增。 Elasticsearch 使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

为了避免丢失数据, 更新 API 会在获取步骤中获取当前文档中的 _version,然后将其传递给重新索引步骤中的 索引 请求。如果其他的进程在这两步之间修改了这个文档,那么 _version 就会不同,这样更新就会失败。

409/Conflict

2 个请求并发对同一个 id 的文档进行更新:
请求 1 获取文档版本号是 1
请求 2 获取文档版本号是 1
请求 2 重新索引文档,写入成功,版本号更新为 2
请求 1 重新索引文档时,发现已有的文档 版本号是 2,索引失败,返回 409 Conflict


上一篇 Java-线程与线程池

下一篇 Elasticsearch-Api-索引操作

阅读
评论
5.7k
阅读预计24分钟
创建日期 2025-04-15
修改日期 2025-04-15
类别

页面信息

location:
protocol:
host:
hostname:
origin:
pathname:
href:
document:
referrer:
navigator:
platform:
userAgent:

评论