Description
Currently, a remote enrich operation can't come after a reduction operator such as an aggregation or a limit (including TopN). Nothing can be done for aggregations, because the originating cluster information is lost once rows are aggregated. LIMIT and TopN, however, only drop or reorder rows, so every remaining row still comes from a known cluster, and we should support remote enrich after them.
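To illustrate why aggregation and LIMIT/TopN differ, here is a toy model, not the ES|QL implementation. It assumes each row carries a hypothetical _cluster tag and that every cluster has its own hosts enrich index; all names and data below are made up for illustration. A TopN keeps a subset of the original rows, so the per-cluster lookup still works, while an aggregation builds new rows that no longer know their cluster.

```python
# Hypothetical per-cluster "hosts" enrich indices: ip -> os (toy data).
HOSTS = {
    "local": {"10.0.0.1": "linux"},
    "remote-cluster": {"10.0.0.2": "windows"},
}

# Each row is tagged with the cluster that produced it: a toy stand-in
# for the originating-cluster information a remote enrich depends on.
ROWS = [
    {"_cluster": "local", "ip": "10.0.0.1", "@timestamp": 3},
    {"_cluster": "remote-cluster", "ip": "10.0.0.2", "@timestamp": 5},
    {"_cluster": "local", "ip": "10.0.0.1", "@timestamp": 1},
]


def top_n(rows, n):
    """Like SORT @timestamp DESC | LIMIT n: keeps a subset of the original rows."""
    return sorted(rows, key=lambda r: r["@timestamp"], reverse=True)[:n]


def count_by_ip(rows):
    """Like STATS count = COUNT(*) BY ip: builds new rows without a cluster tag."""
    counts = {}
    for r in rows:
        counts[r["ip"]] = counts.get(r["ip"], 0) + 1
    return [{"ip": ip, "count": c} for ip, c in counts.items()]


def remote_enrich(rows):
    """Look each row up in the hosts index of the cluster it came from."""
    return [{**r, "os": HOSTS[r["_cluster"]].get(r["ip"])} for r in rows]


if __name__ == "__main__":
    # Works: every row surviving the TopN still has its _cluster tag.
    print(remote_enrich(top_n(ROWS, 2)))
    try:
        remote_enrich(count_by_ip(ROWS))
    except KeyError as missing:
        # Fails: aggregated rows no longer know which cluster they came from.
        print("cannot enrich after aggregation, missing", missing)
```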
For example, the following query is currently not supported:
FROM logs, remote-cluster:logs
| SORT @timestamp DESC
| LIMIT 10
| ENRICH[ccq.mode=remote] hosts
This is because it would generate an invalid plan in which the remote enrich sits above the exchange, so it would run on the coordinator after the results have been gathered:
EnrichExec[REMOTE,ip{f}#4,hosts,ip,{=idx},[ip{r}#9, os{r}#10, location{r}#11]]
\_TopNExec[[Order[@timestamp{f}#5,ASC,LAST]],10[INTEGER],null]
\_ExchangeExec[[],false]
\_FragmentExec[filter=null, estimatedRowSize=0, fragment=[<>
TopN[[Order[@timestamp{f}#5,ASC,LAST]],10[INTEGER]]
\_EsRelation[events][@timestamp{f}#5, ip{f}#4]<>]]
Users want to write such queries because they maintain a different dataset for the enrich index on each cluster. In this example, the hosts enrich index on one cluster might contain only machines in Europe, while on another cluster it contains only machines in the USA.
However, if we move the remote enrich into the fragment, the physical plan becomes valid, because the remote enrich is then executed on the remote cluster, before the exchange:
TopNExec[[Order[@timestamp{f}#5,ASC,LAST]],10[INTEGER],null]
\_ExchangeExec[[],false]
\_FragmentExec[filter=null, estimatedRowSize=0, fragment=[<>
\_Enrich[REMOTE,ip{f}#4,hosts,ip,{=idx},[ip{r}#9, os{r}#10, location{r}#11]]
\_TopN[[Order[@timestamp{f}#5,ASC,LAST]],10[INTEGER]]
\_EsRelation[events][@timestamp{f}#5, ip{f}#4]<>]]
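The rewrite described above can be sketched on a toy plan representation. This is a minimal sketch under stated assumptions, not the Elasticsearch planner: the Plan class, the operator name strings, and push_remote_enrich_into_fragment are all hypothetical. It assumes the remote enrich is the root of the coordinator plan, and it only moves the enrich past operators that drop or reorder rows (TopN and LIMIT), refusing anything else such as an aggregation.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical toy plan, not the Elasticsearch classes:
# `coordinator` lists operators from the root down to the FragmentExec;
# `fragment` lists the logical operators shipped to each cluster, root first.
@dataclass
class Plan:
    coordinator: List[str]
    fragment: List[str] = field(default_factory=list)


# Operators that only drop or reorder rows, so each surviving row still
# belongs to the cluster that produced it.
ROW_PRESERVING = {"TopNExec", "LimitExec"}


def push_remote_enrich_into_fragment(plan: Plan) -> Plan:
    """Move a root remote enrich from the coordinator into the fragment."""
    coord = list(plan.coordinator)
    if not coord or coord[0] != "EnrichExec[REMOTE]":
        return plan  # assumption: only a root-level remote enrich is handled
    # Operators between the enrich and the exchange (raises ValueError if
    # the plan has no ExchangeExec, which this sketch does not expect).
    between = coord[1:coord.index("ExchangeExec")]
    if not all(op in ROW_PRESERVING for op in between):
        # For example an aggregation: the originating cluster is lost.
        raise ValueError("remote ENRICH cannot be moved past " + ", ".join(between))
    # Drop the enrich from the coordinator and put it on top of the fragment,
    # so each cluster enriches its rows after its local TopN/Limit. The
    # coordinator keeps its own TopN/Limit to combine the per-cluster results.
    return Plan(coordinator=coord[1:], fragment=["Enrich[REMOTE]"] + plan.fragment)


if __name__ == "__main__":
    topn = Plan(["EnrichExec[REMOTE]", "TopNExec", "ExchangeExec", "FragmentExec"],
                ["TopN", "EsRelation"])
    print(push_remote_enrich_into_fragment(topn))
```

The sketch treats LIMIT exactly like TopN, since both keep a subset of the original rows; the same move would apply to the LIMIT plan shown further below.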
While writing up this issue, I also realized that I had overlooked assertions in the tests for remote enrich combined with LIMIT (sorry). This query currently generates an invalid physical plan:
FROM logs, remote-cluster:logs
| ENRICH[ccq.mode=remote] hosts
| LIMIT 10
EnrichExec[REMOTE,ip{f}#20,hosts,ip,{=idx},[ip{r}#24, os{r}#25, location{r}#26]]
\_LimitExec[10[INTEGER]]
\_ExchangeExec[[],false]
\_FragmentExec[filter=null, estimatedRowSize=0, fragment=[<>Limit[10[INTEGER]]
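Both invalid plans share the same shape: a remote EnrichExec above an ExchangeExec in the coordinator plan. The kind of assertion the tests could make is sketched below on a hypothetical toy representation (a list of operator names from the root down); the function name and the string labels are assumptions for illustration, not the Elasticsearch test utilities.

```python
from typing import List


def remote_enrich_is_valid(coordinator_plan: List[str]) -> bool:
    """Return False if a remote enrich sits above an exchange.

    coordinator_plan lists operator names from the root down, e.g.
    ["EnrichExec[REMOTE]", "LimitExec", "ExchangeExec", "FragmentExec"].
    """
    seen_remote_enrich = False
    for op in coordinator_plan:
        if op == "EnrichExec[REMOTE]":
            seen_remote_enrich = True
        elif op == "ExchangeExec" and seen_remote_enrich:
            # A remote enrich must run where the data lives, below the
            # exchange; above it, rows are already gathered on the coordinator.
            return False
    return True


if __name__ == "__main__":
    for plan in (
        ["EnrichExec[REMOTE]", "TopNExec", "ExchangeExec", "FragmentExec"],
        ["EnrichExec[REMOTE]", "LimitExec", "ExchangeExec", "FragmentExec"],
        ["TopNExec", "ExchangeExec", "FragmentExec"],
    ):
        print(plan, remote_enrich_is_valid(plan))
```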
So we have two related issues in the planning of remote enrich. I am happy to keep working on both, but I am equally happy if someone else takes them on. Either way works for me.