Elasticsearch Deep Pagination Solutions


The common deep-pagination approach: from + size

By default, Elasticsearch paginates with from + size, which becomes very inefficient for deep pages. With from = 5000 and size = 10, every shard has to match, sort, and keep its top 5000 + 10 = 5010 documents; the coordinating node then merges all shard results and returns only the final 10. This is similar to MongoDB's skip + limit.

Besides the efficiency problem, there is also a hard limit: the maximum "skip" Elasticsearch currently allows is controlled by max_result_window, which defaults to 10000. When from + size > max_result_window, Elasticsearch returns an error.

Here is a simple test that reproduces the error:

import java.io.IOException;

import org.apache.commons.codec.binary.Base64;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.junit.jupiter.api.Test;

@Test
public void testEsSearch() throws IOException
{
    // Basic auth header for the cluster
    String authorization = "Basic " + Base64.encodeBase64String("root:111111".getBytes());

    String url = "http://192.190.20.5:9200/flint_ip2location_v6/_search";

    // from + size = 11000 > max_result_window (10000), so the request is rejected
    Document document = Jsoup.connect(url)
            .header("Authorization", authorization)
            .header("Accept", "application/json; charset=UTF-8")
            .header("Content-Type", "application/json")
            .requestBody("{\"from\": 10000, \"size\": 1000}")
            .ignoreContentType(true)
            .post();
    System.out.println(document.body().text());
}

The client request fails with an HTTP 500:

org.jsoup.HttpStatusException: HTTP error fetching URL. Status=500, URL=http://192.190.20.5:9200/flint_ip2location_v6/_search
	at org.jsoup.helper.HttpConnection$Response.execute(HttpConnection.java:776)
	at org.jsoup.helper.HttpConnection$Response.execute(HttpConnection.java:722)
	at org.jsoup.helper.HttpConnection.execute(HttpConnection.java:306)
	at org.jsoup.helper.HttpConnection.post(HttpConnection.java:301)
	at com.spinfo.soup.JsoupTest.testEsSearch(JsoupTest.java:204)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:389)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:167)
	at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:163)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:110)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:83)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$null$2(HierarchicalTestExecutor.java:92)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:92)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$null$2(HierarchicalTestExecutor.java:92)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.lambda$execute$3(HierarchicalTestExecutor.java:92)
	at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:77)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:51)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:43)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
	at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)

And the Elasticsearch server log shows:

[2018-11-12T08:15:37,328][DEBUG][o.e.a.s.TransportSearchAction] [esnode-1] [flint_ip2location_v6][0], node[e4kCsoe0R8-Bq3ZmOFyEGQ], [P], s[STARTED], a[id=SfhVgfmJTzGlyUhWfgV5_g]: Failed to execute [SearchRequest{searchType=QUERY_THEN_FETCH, indices=[flint_ip2location_v6], indicesOptions=IndicesOptions[id=38, ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false], types=[], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=5, batchedReduceSize=512, preFilterShardSize=128, allowPartialSearchResults=true, source={"from":10000,"size":1000}}]
org.elasticsearch.transport.RemoteTransportException: [esnode-1][192.190.20.5:9300][indices:data/read/search[phase/query]]
Caused by: org.elasticsearch.search.query.QueryPhaseExecutionException: Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
	at org.elasticsearch.search.DefaultSearchContext.preProcess(DefaultSearchContext.java:206) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:89) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.createContext(SearchService.java:587) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:551) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:347) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:333) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:329) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1019) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
[2018-11-12T08:15:37,329][DEBUG][o.e.a.s.TransportSearchAction] [esnode-1] All shards failed for phase: [query]
org.elasticsearch.search.query.QueryPhaseExecutionException: Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
	at org.elasticsearch.search.DefaultSearchContext.preProcess(DefaultSearchContext.java:206) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:89) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.createContext(SearchService.java:587) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:551) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:347) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:333) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:329) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1019) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
[2018-11-12T08:15:37,329][WARN ][r.suppressed             ] path: /flint_ip2location_v6/_search, params: {index=flint_ip2location_v6}
org.elasticsearch.action.search.SearchPhaseExecutionException: all shards failed
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:288) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:128) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:249) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.InitialSearchPhase.onShardFailure(InitialSearchPhase.java:101) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.InitialSearchPhase.access$100(InitialSearchPhase.java:48) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.InitialSearchPhase$2.lambda$onFailure$1(InitialSearchPhase.java:222) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.InitialSearchPhase.maybeFork(InitialSearchPhase.java:176) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.InitialSearchPhase.access$000(InitialSearchPhase.java:48) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.InitialSearchPhase$2.onFailure(InitialSearchPhase.java:222) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.SearchExecutionStatsCollector.onFailure(SearchExecutionStatsCollector.java:73) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:51) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.SearchTransportService$ConnectionCountingHandler.handleException(SearchTransportService.java:527) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:1095) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.transport.TransportService$DirectResponseChannel.processException(TransportService.java:1188) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1172) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:66) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.action.search.SearchTransportService$6$1.onFailure(SearchTransportService.java:385) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onFailure(SearchService.java:341) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:335) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:329) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1019) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.3.0.jar:6.3.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_171]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_171]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: org.elasticsearch.search.query.QueryPhaseExecutionException: Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
	at org.elasticsearch.search.DefaultSearchContext.preProcess(DefaultSearchContext.java:206) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:89) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.createContext(SearchService.java:587) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:551) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:347) ~[elasticsearch-6.3.0.jar:6.3.0]
	at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:333) ~[elasticsearch-6.3.0.jar:6.3.0]
	... 9 more

The cause and the suggested fix are spelled out right in the log:

Result window is too large, from + size must be less than or equal to: [10000] but was [11000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.

Check the index settings:

[Hadoop@Hadoop0 ~]$ curl -XGET 192.190.20.5:9200/flint_ip2location_v6/_settings?pretty
{
  "flint_ip2location_v6" : {
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "provided_name" : "flint_ip2location_v6",
        "max_result_window" : "10000",
        "creation_date" : "1535653837486",
        "number_of_replicas" : "1",
        "uuid" : "iq4bBu2jQ2KIzB5B4Wqk9w",
        "version" : {
          "created" : "6030099"
        }
      }
    }
  }
}
[Hadoop@Hadoop0 ~]$

If this hits a production cluster, users paging a few hundred pages deep suddenly get no data back. To restore service quickly, the only emergency workaround is to raise max_result_window:

[Hadoop@Hadoop0 ~]$ curl -XPUT "192.190.20.5:9200/flint_ip2location_v6/_settings" \
  -H 'Content-Type: application/json' -d '
{
    "index" : {
        "max_result_window" : 50000
    }
}'

However, this only buys time. As Elasticsearch usage grows, data volumes increase, and deep-pagination scenarios become more complex, how do we solve the problem properly?

Another pagination approach: scroll

For deep-pagination scenarios, Elasticsearch provides the scroll API. The idea is that a query produces a cursor, the scroll_id, and subsequent requests simply fetch data with that cursor until the hits array in the response comes back empty, which marks the end of the iteration. Generating a scroll_id can be thought of as taking a temporary snapshot of the data at that moment; later writes (adds, updates, deletes) do not affect the snapshot's results.

Paging through the data with curl works as follows (a Java sketch that ties the steps together comes after the walkthrough):

  1. First obtain the initial scroll_id. The URL includes /index/_type/ plus the scroll parameter; scroll sets how long the scroll_id stays alive (here in minutes), after which Elasticsearch cleans it up automatically. If you do not need a particular sort order, sorting by _doc (index order) makes iteration more efficient.
[root@dnsserver ~]# curl -XGET '200.200.107.232:9200/product/info/_search?pretty&scroll=2m' -d '{"query":{"match_all":{}}, "sort": ["_doc"]}'

# Response
{
  "_scroll_id": "cXVlcnlBbmRGZXRjaDsxOzg3OTA4NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7",
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "hits": {...}
}
  2. Subsequent reads pass the scroll_id returned by the previous query to keep fetching the next page. If the scroll_id's lifetime is long enough, every response carries the same scroll_id; only once it expires will a new scroll_id be returned. When querying by scroll_id you no longer need the /index/_type part of the URL. Every page read resets the scroll_id's lifetime, so it only has to cover processing the current page, not the whole data set; 1 minute is plenty.
[root@dnsserver ~]# curl -XGET '200.200.107.232:9200/_search/scroll?scroll=1m&scroll_id=cXVlcnlBbmRGZXRjaDsxOzg4NDg2OTpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7'

# Response
{
    "_scroll_id": "cXVlcnlBbmRGZXRjaDsxOzk1ODg3NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7",
    "took": 106,
    "_shards": {
        "total": 1,
        "successful": 1,
        "failed": 0
    },
    "hits": {
        "total": 22424,
        "max_score": 1.0,
        "hits": [{
                "_index": "product",
                "_type": "info",
                "_id": "did-519392_pdid-2010",
                "_score": 1.0,
                "_routing": "519392",
                "_source": {
                    ....
                }
            }
        ]
    }
}
  3. Once all documents have been fetched, clear the scroll_id manually. Elasticsearch does expire scroll contexts automatically, but while a scroll_id lives it holds substantial resources to keep a snapshot of the query's result set and ties up file descriptors, so release it as soon as you are done. Use the clear-scroll API to delete specific scroll_ids:
## Delete one or more specific scroll_ids
[root@dnsserver ~]# curl -XDELETE 127.0.0.1:9200/_search/scroll -d '{"scroll_id" : ["cXVlcnlBbmRGZXRjaDsxOzg3OTA4NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7"]}'

## Delete the scroll_ids on all indices
[root@dnsserver ~]# curl -XDELETE 127.0.0.1:9200/_search/scroll/_all

## Check the current scroll statistics
[root@dnsserver ~]# curl -XGET 127.0.0.1:9200/_nodes/stats/indices/search?pretty
{
  "cluster_name" : "200.200.107.232",
  "nodes" : {
    "SC4fYi0CT5mIp274ZgH_fg" : {
      "timestamp" : 1514346295736,
      "name" : "200.200.107.232",
      "transport_address" : "200.200.107.232:9300",
      "host" : "200.200.107.232",
      "ip" : [ "200.200.107.232:9300", "NONE" ],
      "indices" : {
        "search" : {
          "open_contexts" : 0,
          "query_total" : 975758,
          "query_time_in_millis" : 329850,
          "query_current" : 0,
          "fetch_total" : 217069,
          "fetch_time_in_millis" : 84699,
          "fetch_current" : 0,
          "scroll_total" : 5348,
          "scroll_time_in_millis" : 92712468,
          "scroll_current" : 0
        }
      }
    }
  }
}
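
To tie the three steps together, here is a minimal Java sketch in the same Jsoup style as the test above. It is only an illustration under assumptions: the host, index/type, and page size are placeholders, Jackson's ObjectMapper is assumed for JSON parsing, and error handling is omitted.

import java.io.IOException;

import org.jsoup.Connection;
import org.jsoup.Jsoup;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ScrollDemo {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws IOException {
        String esBase = "http://127.0.0.1:9200";   // placeholder host
        String index = "product/info";             // placeholder index/type

        // 1. Open the scroll: keep the context alive for 2 minutes, sort by _doc for efficiency.
        JsonNode page = MAPPER.readTree(post(esBase + "/" + index + "/_search?scroll=2m",
                "{\"size\":1000,\"query\":{\"match_all\":{}},\"sort\":[\"_doc\"]}"));
        String scrollId = page.path("_scroll_id").asText();

        // 2. Keep fetching with the scroll_id until hits comes back empty.
        while (page.path("hits").path("hits").size() > 0) {
            System.out.println("got " + page.path("hits").path("hits").size() + " docs");
            page = MAPPER.readTree(post(esBase + "/_search/scroll",
                    "{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId + "\"}"));
            scrollId = page.path("_scroll_id").asText();
        }

        // 3. Clean up. This clears ALL scroll contexts (like the curl example above);
        //    production code should clear only its own scroll_id.
        Jsoup.connect(esBase + "/_search/scroll/_all")
                .ignoreContentType(true)
                .method(Connection.Method.DELETE)
                .execute();
    }

    // Minimal helper: POST a JSON body and return the raw response text.
    private static String post(String url, String body) throws IOException {
        return Jsoup.connect(url)
                .header("Content-Type", "application/json")
                .requestBody(body)
                .ignoreContentType(true)
                .maxBodySize(0)   // scroll pages can exceed Jsoup's default body-size limit
                .post()
                .body()
                .text();
    }
}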

In an actual test on a single index with 10,000,000 documents (about 3 GB on disk), extraction with from + size got through only about 4,000,000 documents in an hour, while scroll finished the whole index in 15 minutes. The performance gap is huge.

scroll + scan

When the scrolled documents do not need to be sorted, Elasticsearch offered a scroll + scan mode (as of the 2.0 line) to make retrieval more efficient. The scan search type was then deprecated in 2.1.0 and its optimization was folded directly into scroll.

I have not used it myself, so only a brief mention: scan mode is enabled by specifying search_type=scan.

# 2.0-beta: disable scroll sorting to make iteration more efficient
[root@dnsserver ~]# curl '127.0.0.1:9200/order/info/_search?scroll=1m&search_type=scan' -d '{"query":{"match_all":{}}}'

The search_after approach

The official recommendation is not to use the scroll search described above for real-time requests: each scroll_id not only ties up a lot of resources (especially for sorted requests), it is also a point-in-time snapshot, so later data changes are not reflected in it. Scroll is therefore typically used for non-real-time processing of large amounts of data, such as data migration or reindexing. So how do we handle deep pagination in real time? Elasticsearch provides search_after, available from version 5.0 onward.

search_after pagination differs from scroll in some important ways: it locates the next page based on the last document of the previous page, and while you are paging, any additions, updates, or deletions in the index are reflected in the results in real time.

To identify the last document of each page, every document must carry a globally unique value in the sort; the official docs recommend _uid, but a business-level id works just as well. A Java sketch of the full loop follows the two example requests below.

  1. The first-page request looks like any normal search:
curl -XGET 127.0.0.1:9200/order/info/_search -d '
{
    "size": 10,
    "query": {
        "term" : {
            "did" : 519390
        }
    },
    "sort": [
        {"date": "asc"},
        {"_uid": "desc"}
    ]
}'
  2. For the second page, take the sort values of the last document from the first page's results and pass them in the search_after field to fetch the next page. Note that from must be set to 0 or -1 when using search_after.
curl -XGET 127.0.0.1:9200/order/info/_search -d '
{
    "size": 10,
    "query": {
        "term" : {
            "did" : 519390
        }
    },
    "search_after": [1463538857, "tweet#654323"],
    "sort": [
        {"date": "asc"},
        {"_uid": "desc"}
    ]
}'
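
And here is a minimal Java sketch of the full search_after loop, again in the assumed Jsoup + Jackson style used earlier; the endpoint, query, page size, and field names are placeholders taken from the curl examples above.

import java.io.IOException;

import org.jsoup.Jsoup;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SearchAfterDemo {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws IOException {
        String url = "http://127.0.0.1:9200/order/info/_search";   // placeholder endpoint
        String searchAfter = null;                                  // no cursor for the first page

        while (true) {
            // Sort by date plus _uid as the unique tiebreaker; from stays at 0.
            StringBuilder body = new StringBuilder()
                    .append("{\"size\":10,")
                    .append("\"query\":{\"term\":{\"did\":519390}},")
                    .append("\"sort\":[{\"date\":\"asc\"},{\"_uid\":\"desc\"}]");
            if (searchAfter != null) {
                body.append(",\"search_after\":").append(searchAfter);
            }
            body.append("}");

            JsonNode page = MAPPER.readTree(Jsoup.connect(url)
                    .header("Content-Type", "application/json")
                    .requestBody(body.toString())
                    .ignoreContentType(true)
                    .post()
                    .body()
                    .text());

            JsonNode hits = page.path("hits").path("hits");
            if (hits.size() == 0) {
                break;   // no more pages
            }
            for (JsonNode hit : hits) {
                System.out.println(hit.path("_id").asText());
            }
            // The sort values of the last hit become the search_after of the next request.
            searchAfter = hits.get(hits.size() - 1).path("sort").toString();
        }
    }
}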

Summary: search_after fits deep pagination with sorting, but because each page depends on the last document of the previous page, you cannot jump to an arbitrary page.

It also always returns the latest data, so a document's position may shift while you are paging.


