Skip to content

Spark query cancellation #267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### Added

- Superset updated to v0.34.0rc1 which brings many improvements and bug fixes ([#250](https://github.com/src-d/sourced-ui/issues/250))
- Add support for Spark query cancellation ([#223](https://github.com/src-d/sourced-ui/issues/223))

</details>

Expand Down
2 changes: 1 addition & 1 deletion superset/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-e git+https://github.com/src-d/[email protected]srcd1#egg=PyHive
-e git+https://github.com/src-d/[email protected]srcd2#egg=PyHive
black==19.3b0
coverage==4.5.3
flake8-import-order==0.18.1
Expand Down
82 changes: 82 additions & 0 deletions superset/superset/db_engine_specs/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,85 @@ def select_star(
latest_partition=latest_partition,
cols=cols,
)

@classmethod
def fetch_data(cls, cursor, limit):
    """Fetch rows from ``cursor``, treating a cancelled query as empty.

    Delegates to the parent engine spec's ``fetch_data``. If that call
    raises ``pyhive.exc.OperationalError`` whose first argument is a
    ``TFetchResultsResp`` carrying the cancellation status message, a
    warning is logged and an empty list is returned instead.

    :param cursor: cursor passed through to the parent ``fetch_data``
    :param limit: limit passed through to the parent ``fetch_data``
    :returns: whatever the parent ``fetch_data`` returns, or ``[]`` when
        the cancellation error is detected
    :raises pyhive.exc.OperationalError: for any operational error other
        than the cancellation one
    """
    # Import the submodule explicitly: a bare `import pyhive` only exposes
    # `pyhive.exc` when some other module has already imported it.
    from pyhive import exc
    from TCLIService import ttypes

    # FIXME: The `pyhive.exc.OperationalError` exception is raised when
    # a query is cancelled. This seems to happen because `gsc` expects
    # the state to be `FINISHED` but got `CANCELED`. This has to be
    # fixed once `gsc` correctly handles cancelation.
    cancelation_error_msg = "Expected state FINISHED, but found CANCELED"

    try:
        return super(SparkSQLEngineSpec, cls).fetch_data(cursor, limit)
    except exc.OperationalError as op_err:
        # The exception may carry no arguments at all; guard before indexing.
        response = op_err.args[0] if op_err.args else None
        if (
            isinstance(response, ttypes.TFetchResultsResp)
            # pylint: disable=no-member
            and response.status.errorMessage == cancelation_error_msg
        ):
            logging.warning("Query has been cancelled, returning empty result")
            return []

        # Bare `raise` re-raises the active exception as-is.
        raise

@classmethod
def _dumps_operation_handle(cls, op_handle):
    """Turn an operation handle into a plain dict of its attributes.

    All attributes found in ``op_handle.__dict__`` are copied. The
    ``operationId`` entry is replaced by a dict holding its ``guid`` and
    ``secret`` byte strings decoded as ISO-8859-1 text.
    """
    identifier = op_handle.operationId
    serialized = dict(op_handle.__dict__)
    serialized["operationId"] = {
        "guid": identifier.guid.decode("ISO-8859-1"),
        "secret": identifier.secret.decode("ISO-8859-1"),
    }
    return serialized

@classmethod
def _loads_operation_handle(cls, op_handle):
    """Rebuild a ``TOperationHandle`` from its dict representation.

    :param op_handle: mapping of ``TOperationHandle`` keyword arguments
        whose ``operationId`` entry is itself a mapping of text values;
        each of those values is encoded to bytes as ISO-8859-1
    :returns: a ``pyhive.hive.ttypes.TOperationHandle`` built from the
        mapping, with ``operationId`` as a ``THandleIdentifier``
    """
    from pyhive import hive

    identifier = hive.ttypes.THandleIdentifier(
        **{
            field: text.encode("ISO-8859-1")
            for field, text in op_handle["operationId"].items()
        }
    )

    # Build a new mapping instead of assigning into `op_handle`: mutating
    # the caller's dict would make a second call on the same dict fail,
    # because `operationId` would no longer be a mapping with `.items()`.
    return hive.ttypes.TOperationHandle(**dict(op_handle, operationId=identifier))

@classmethod
def get_connection_id(cls, cursor):
    """Return the connection id to associate with ``cursor``.

    A hard-coded dummy id is returned, because queries whose cursor has
    no connection id are interpreted as non-cancellable.

    The session id of the cursor's underlying connection would be a more
    suitable value, but that's not an integer, and an integer is
    required here.
    """
    dummy_connection_id = 1
    return dummy_connection_id

@classmethod
def handle_cursor(cls, cursor, query, session):
    """Record the cursor's operation handle on the query, then delegate.

    Called on a live cursor between the execute and fetchall calls. The
    dict form of the cursor's current operation handle is appended to the
    ``operation_handles`` list kept in the query's extra data, the
    session is committed, and the parent ``handle_cursor`` is invoked.
    """
    recorded_handles = query.extra.get("operation_handles", [])
    current_handle = cls._dumps_operation_handle(cursor._operationHandle)
    recorded_handles.append(current_handle)

    # Store the updated list and commit the session before delegating.
    query.set_extra_json_key("operation_handles", recorded_handles)
    session.commit()
    logging.info("Current operation handles: %s", recorded_handles)

    super(SparkSQLEngineSpec, cls).handle_cursor(cursor, query, session)

@classmethod
def cancel_query(cls, cursor, query):
    """Cancel every operation recorded for ``query`` using ``cursor``.

    Each entry of the ``operation_handles`` list in the query's extra
    data is converted back to an operation handle and passed to
    ``cursor.cancel``. Nothing is cancelled when the list is absent.
    """
    logging.info("Cancelling query with id: `%s`", query.id)

    stored_handles = query.extra.get("operation_handles", [])
    for serialized_handle in stored_handles:
        logging.info("Cancelling operation handle: `%s`", serialized_handle)
        restored_handle = cls._loads_operation_handle(serialized_handle)
        cursor.cancel(operation_handle=restored_handle)
6 changes: 6 additions & 0 deletions superset/superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def execute_sql_statements(
with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
query.connection_id = db_engine_spec.get_connection_id(cursor)
session.commit()
statement_count = len(statements)
for i, statement in enumerate(statements):
# check if the query was stopped
Expand Down Expand Up @@ -368,7 +369,11 @@ def execute_sql_statements(


def cancel_query(query, user_name):
logging.info(
"Query with id `%s` has connection id `%s`", query.id, query.connection_id
)
if not query.connection_id:
logging.info("No connection id found, query cancellation skipped")
return

database = query.database
Expand All @@ -382,4 +387,5 @@ def cancel_query(query, user_name):

with closing(engine.raw_connection()) as conn:
with closing(conn.cursor()) as cursor:
logging.info("Calling `cancel_query` on db engine")
db_engine_spec.cancel_query(cursor, query)
7 changes: 5 additions & 2 deletions superset/superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2428,11 +2428,14 @@ def stop_query(self):
client_id = request.form.get("client_id")
try:
query = db.session.query(Query).filter_by(client_id=client_id).one()
logging.info("Query retrieved with id `%s`", query.id)
query.status = QueryStatus.STOPPED

db.session.commit()
logging.info("Committed status change for query with id `%s`", query.id)
sql_lab.cancel_query(query, g.user.username if g.user else None)
except Exception:
pass
except Exception as e:
return json_error_response("{}".format(e))
return self.json_response("OK")

@has_access_api
Expand Down