Skip to content

Commit 88029b1

Browse files
pdabre12Pratik Joseph Dabre
authored andcommitted
Add a new router plugin : presto-plan-checker-router-plugin
1 parent 5bbdf93 commit 88029b1

28 files changed

Lines changed: 781 additions & 69 deletions

File tree

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@
215215
<module>presto-base-arrow-flight</module>
216216
<module>presto-function-server</module>
217217
<module>presto-router-example-plugin-scheduler</module>
218+
<module>presto-plan-checker-router-plugin</module>
218219
</modules>
219220

220221
<dependencyManagement>
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<parent>
6+
<groupId>com.facebook.presto</groupId>
7+
<artifactId>presto-root</artifactId>
8+
<version>0.293-SNAPSHOT</version>
9+
</parent>
10+
11+
<groupId>com.facebook.presto.router</groupId>
12+
<artifactId>presto-plan-checker-router-plugin</artifactId>
13+
<version>0.293-SNAPSHOT</version>
14+
<packaging>jar</packaging>
15+
<name>presto-plan-checker-router-plugin</name>
16+
<description>Presto plan checker router plugin</description>
17+
18+
<properties>
19+
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.facebook.presto</groupId>
25+
<artifactId>presto-spi</artifactId>
26+
<scope>provided</scope>
27+
</dependency>
28+
29+
<dependency>
30+
<groupId>com.google.guava</groupId>
31+
<artifactId>guava</artifactId>
32+
</dependency>
33+
34+
<!-- test dependencies-->
35+
<dependency>
36+
<groupId>org.testng</groupId>
37+
<artifactId>testng</artifactId>
38+
<scope>test</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>javax.servlet</groupId>
42+
<artifactId>javax.servlet-api</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>com.facebook.airlift</groupId>
46+
<artifactId>configuration</artifactId>
47+
</dependency>
48+
49+
<dependency>
50+
<groupId>com.facebook.presto</groupId>
51+
<artifactId>presto-common</artifactId>
52+
<scope>provided</scope>
53+
</dependency>
54+
55+
<dependency>
56+
<groupId>io.airlift</groupId>
57+
<artifactId>slice</artifactId>
58+
<scope>provided</scope>
59+
</dependency>
60+
61+
<dependency>
62+
<groupId>com.fasterxml.jackson.core</groupId>
63+
<artifactId>jackson-annotations</artifactId>
64+
<scope>provided</scope>
65+
</dependency>
66+
67+
<dependency>
68+
<groupId>io.airlift</groupId>
69+
<artifactId>units</artifactId>
70+
<scope>provided</scope>
71+
</dependency>
72+
73+
<dependency>
74+
<groupId>org.openjdk.jol</groupId>
75+
<artifactId>jol-core</artifactId>
76+
<scope>provided</scope>
77+
</dependency>
78+
79+
<dependency>
80+
<groupId>com.facebook.airlift</groupId>
81+
<artifactId>bootstrap</artifactId>
82+
</dependency>
83+
84+
<dependency>
85+
<groupId>com.facebook.airlift</groupId>
86+
<artifactId>json</artifactId>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>com.facebook.presto</groupId>
91+
<artifactId>presto-parser</artifactId>
92+
</dependency>
93+
94+
<dependency>
95+
<groupId>com.facebook.presto</groupId>
96+
<artifactId>presto-main-base</artifactId>
97+
</dependency>
98+
99+
<dependency>
100+
<groupId>com.facebook.presto</groupId>
101+
<artifactId>presto-client</artifactId>
102+
</dependency>
103+
104+
<dependency>
105+
<groupId>com.facebook.presto</groupId>
106+
<artifactId>presto-main</artifactId>
107+
</dependency>
108+
109+
<dependency>
110+
<groupId>com.squareup.okhttp3</groupId>
111+
<artifactId>okhttp</artifactId>
112+
</dependency>
113+
114+
<dependency>
115+
<groupId>com.google.inject</groupId>
116+
<artifactId>guice</artifactId>
117+
</dependency>
118+
119+
<dependency>
120+
<groupId>javax.inject</groupId>
121+
<artifactId>javax.inject</artifactId>
122+
</dependency>
123+
124+
<dependency>
125+
<groupId>com.facebook.drift</groupId>
126+
<artifactId>drift-api</artifactId>
127+
<scope>provided</scope>
128+
</dependency>
129+
130+
<dependency>
131+
<groupId>com.facebook.airlift</groupId>
132+
<artifactId>log</artifactId>
133+
</dependency>
134+
135+
<dependency>
136+
<groupId>com.facebook.presto</groupId>
137+
<artifactId>presto-router</artifactId>
138+
<version>0.293-SNAPSHOT</version>
139+
<scope>compile</scope>
140+
</dependency>
141+
</dependencies>
142+
143+
<!-- Build configurations -->
144+
<build>
145+
<plugins>
146+
<plugin>
147+
<groupId>com.facebook.presto</groupId>
148+
<artifactId>presto-maven-plugin</artifactId>
149+
<executions>
150+
<execution>
151+
<phase>validate</phase>
152+
<goals>
153+
<goal>check-spi-dependencies</goal>
154+
</goals>
155+
</execution>
156+
</executions>
157+
</plugin>
158+
<plugin>
159+
<groupId>ca.vanzyl.provisio.maven.plugins</groupId>
160+
<artifactId>provisio-maven-plugin</artifactId>
161+
<executions>
162+
<execution>
163+
<phase>package</phase>
164+
<goals>
165+
<goal>provision</goal>
166+
</goals>
167+
</execution>
168+
</executions>
169+
</plugin>
170+
</plugins>
171+
</build>
172+
</project>
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.router.scheduler;
15+
16+
import com.facebook.airlift.json.JsonCodec;
17+
import com.facebook.airlift.log.Logger;
18+
import com.facebook.airlift.stats.CounterStat;
19+
import com.facebook.presto.client.ClientSession;
20+
import com.facebook.presto.client.ErrorLocation;
21+
import com.facebook.presto.client.FailureInfo;
22+
import com.facebook.presto.client.QueryError;
23+
import com.facebook.presto.client.QueryResults;
24+
import com.facebook.presto.client.QueryStatusInfo;
25+
import com.facebook.presto.client.StatementClient;
26+
import com.facebook.presto.router.RouterResource;
27+
import com.facebook.presto.server.HttpRequestSessionContext;
28+
import com.facebook.presto.server.SessionContext;
29+
import com.facebook.presto.sql.parser.SqlParserOptions;
30+
import com.google.common.collect.ImmutableMap;
31+
import io.airlift.units.Duration;
32+
import okhttp3.OkHttpClient;
33+
import org.weakref.jmx.Managed;
34+
import org.weakref.jmx.Nested;
35+
36+
import javax.servlet.http.HttpServletRequest;
37+
38+
import java.net.URI;
39+
import java.util.Locale;
40+
import java.util.Optional;
41+
42+
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
43+
import static com.facebook.presto.client.PrestoHeaders.PRESTO_TRANSACTION_ID;
44+
import static com.facebook.presto.client.StatementClientFactory.newStatementClient;
45+
import static com.google.common.base.Verify.verify;
46+
import static java.util.concurrent.TimeUnit.MINUTES;
47+
48+
public class PlanCheckerPrestoClient
49+
{
50+
private static final Logger log = Logger.get(RouterResource.class);
51+
private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
52+
private static final String ANALYZE_CALL = "EXPLAIN (TYPE DISTRIBUTED) ";
53+
private static final CounterStat javaClusterRedirectRequests = new CounterStat();
54+
private static final CounterStat nativeClusterRedirectRequests = new CounterStat();
55+
56+
private final OkHttpClient httpClient = new OkHttpClient();
57+
private final URI preonClusterURI;
58+
private final URI javaRouterURI;
59+
private final URI nativeRouterURI;
60+
61+
public PlanCheckerPrestoClient(URI preonClusterURI, URI javaRouterURI, URI nativeRouterURI)
62+
{
63+
this.preonClusterURI = preonClusterURI;
64+
this.javaRouterURI = javaRouterURI;
65+
this.nativeRouterURI = nativeRouterURI;
66+
}
67+
68+
public Optional<URI> getCompatibleClusterURI(HttpServletRequest httpServletRequest, String statement)
69+
{
70+
String newSql = ANALYZE_CALL + statement;
71+
ClientSession clientSession = parseHeadersToClientSession(httpServletRequest);
72+
boolean isNativeCompatible = true;
73+
// submit initial query
74+
try (StatementClient client = newStatementClient(httpClient, clientSession, newSql)) {
75+
// read query output
76+
while (client.isRunning()) {
77+
log.info((client.currentData().toString()));
78+
79+
if (!client.advance()) {
80+
break;
81+
}
82+
}
83+
84+
// verify final state
85+
if (client.isClientAborted()) {
86+
throw new IllegalStateException("Query aborted by user");
87+
}
88+
89+
if (client.isClientError()) {
90+
throw new IllegalStateException("Query is gone (server restarted?)");
91+
}
92+
93+
verify(client.isFinished());
94+
QueryError resultsError = client.finalStatusInfo().getError();
95+
if (resultsError != null) {
96+
if (resultsError.getErrorType().equalsIgnoreCase("USER_ERROR")) {
97+
throw new RuntimeException(newQueryResults(client));
98+
}
99+
isNativeCompatible = false;
100+
log.info(resultsError.getMessage());
101+
}
102+
}
103+
104+
if (isNativeCompatible) {
105+
log.info("Native compatible, routing to native-clusters router: ");
106+
nativeClusterRedirectRequests.update(1);
107+
return Optional.of(nativeRouterURI);
108+
}
109+
log.info("Native incompatible, routing to java-clusters router: ");
110+
javaClusterRedirectRequests.update(1);
111+
return Optional.of(javaRouterURI);
112+
}
113+
114+
@Managed
115+
@Nested
116+
public CounterStat getJavaClusterRedirectRequests()
117+
{
118+
return javaClusterRedirectRequests;
119+
}
120+
121+
@Managed
122+
@Nested
123+
public CounterStat getNativeClusterRedirectRequests()
124+
{
125+
return nativeClusterRedirectRequests;
126+
}
127+
128+
private String newQueryResults(StatementClient client)
129+
{
130+
QueryStatusInfo queryStatusInfo = client.finalStatusInfo();
131+
QueryResults queryResults = new QueryResults(
132+
queryStatusInfo.getId(),
133+
queryStatusInfo.getInfoUri(),
134+
queryStatusInfo.getPartialCancelUri(),
135+
queryStatusInfo.getNextUri(),
136+
queryStatusInfo.getColumns(),
137+
null, // if error thrown no data is returned.
138+
null,
139+
queryStatusInfo.getStats(),
140+
newQueryError(queryStatusInfo.getError()),
141+
queryStatusInfo.getWarnings(),
142+
queryStatusInfo.getUpdateType(),
143+
queryStatusInfo.getUpdateCount());
144+
145+
return QUERY_RESULTS_CODEC.toJson(queryResults);
146+
}
147+
148+
private QueryError newQueryError(QueryError error)
149+
{
150+
// todo: the getMessage() calls will still return the previous error location
151+
ErrorLocation errorLocation = error.getErrorLocation();
152+
ErrorLocation newErrorLocation = null;
153+
FailureInfo failureInfo = error.getFailureInfo();
154+
if (errorLocation != null) {
155+
newErrorLocation = new ErrorLocation(
156+
errorLocation.getLineNumber(),
157+
errorLocation.getColumnNumber() - ANALYZE_CALL.length());
158+
}
159+
return new QueryError(
160+
error.getMessage(),
161+
error.getSqlState(),
162+
error.getErrorCode(),
163+
error.getErrorName(),
164+
error.getErrorType(),
165+
error.isRetriable(),
166+
newErrorLocation,
167+
new FailureInfo(
168+
failureInfo.getType(),
169+
failureInfo.getMessage(),
170+
failureInfo.getCause(),
171+
failureInfo.getSuppressed(),
172+
failureInfo.getStack(),
173+
newErrorLocation));
174+
}
175+
176+
private ClientSession parseHeadersToClientSession(HttpServletRequest httpServletRequest)
177+
{
178+
// todo: How to parse headers into a ClientSession object?
179+
SessionContext sessionContext = new HttpRequestSessionContext(
180+
httpServletRequest,
181+
new SqlParserOptions());
182+
return new ClientSession(
183+
preonClusterURI,
184+
sessionContext.getIdentity().getUser(),
185+
sessionContext.getSource(),
186+
sessionContext.getTraceToken(),
187+
sessionContext.getClientTags(),
188+
sessionContext.getClientInfo(),
189+
sessionContext.getCatalog(),
190+
sessionContext.getSchema(),
191+
sessionContext.getTimeZoneId(),
192+
Locale.ENGLISH,
193+
ImmutableMap.of(),
194+
sessionContext.getSystemProperties(),
195+
sessionContext.getPreparedStatements(),
196+
sessionContext.getIdentity().getRoles(),
197+
sessionContext.getIdentity().getExtraCredentials(),
198+
httpServletRequest.getHeader(PRESTO_TRANSACTION_ID),
199+
new Duration(2, MINUTES),
200+
true,
201+
ImmutableMap.of(),
202+
ImmutableMap.of(),
203+
true);
204+
}
205+
}

0 commit comments

Comments
 (0)