SparkSQL SHOW Statement Execution Process


In SparkSQL, the SHOW statement lets us list tables, databases, namespaces, and catalogs.

In day-to-day SparkSQL usage, statements such as SHOW TABLES, SHOW DATABASES, SHOW NAMESPACES, and SHOW CATALOGS come up constantly.
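
For a quick sense of these statements, here is a minimal sketch of issuing them from a SparkSession; the application name and the default database are only illustrative.

import org.apache.spark.sql.SparkSession

// Minimal sketch: the names below are placeholders.
val spark = SparkSession.builder()
  .appName("show-statements-demo")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW CATALOGS").show()        // available on newer Spark 3.x releases
spark.sql("SHOW NAMESPACES").show()
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW TABLES IN default").show()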

1. SHOW Statement Execution Process

For SHOW-type SQL the execution path is fairly simple and breaks down into the following four steps:

1.1. Syntax Parsing

After the statement is submitted, a SparkSession is created to encapsulate the SparkSQL execution context and runtime environment.

Below is the stack trace of a SparkSQL job submitted through Kyuubi, captured with Arthas:

[arthas@43]$ stack org.apache.spark.sql.execution.datasources.v2.ShowTablesExec run
Press Q or Ctrl+C to abort.
Affect(class count: 1 , method count: 1) cost in 185 ms, listenerId: 35
ts=2024-07-31 14:02:23;thread_name=SparkSQLSessionManager-exec-pool: Thread-520;id=520;is_daemon=true;priority=5;TCCL=org.apache.spark.sql.internal.NonClosableMutableURLClassLoader@5cca5ff3
    @org.apache.spark.sql.execution.datasources.v2.ShowTablesExec.run()
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1249)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1248)
        at org.apache.kyuubi.plugin.spark.authz.rule.rowfilter.ObjectFilterPlaceHolder.mapChildren(ObjectFilterPlaceHolder.scala:25)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:517)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:218)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:95)
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:99) 
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
        at org.apache.kyuubi.engine.spark.operation.SparkOperation.$anonfun$withLocalProperties$1(SparkOperation.scala:192) 
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) 
        at org.apache.kyuubi.engine.spark.operation.SparkOperation.withLocalProperties(SparkOperation.scala:158) 
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.executeStatement(ExecuteStatement.scala:94) 
        at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:116) 
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
        at java.lang.Thread.run(Thread.java:829) 

The stack trace clearly shows the path that leads to the ShowTablesExec task.

1.2. Generating the ShowTablesExec Task

ShowTablesExec is the physical plan node generated for the SHOW TABLES statement. Its run method is defined as follows:

  override protected def run(): Seq[InternalRow] = {
    val rows = new ArrayBuffer[InternalRow]()

    val tables = catalog.listTables(namespace.toArray)
    tables.map { table =>
      if (pattern.map(StringUtils.filterPattern(Seq(table.name()), _).nonEmpty).getOrElse(true)) {
        rows += toCatalystRow(table.namespace().quoted, table.name(), isTempView(table))
      }
    }

    rows.toSeq
  }

The listing itself goes through catalog.listTables, which returns all table identifiers in the given namespace; catalog here is of type TableCatalog, and the optional LIKE pattern is applied on top via StringUtils.filterPattern.
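
As a rough illustration of the same call outside the planner, the sketch below obtains the current catalog as a TableCatalog and lists a namespace directly. It relies on Spark's internal sessionState/catalogManager accessors, which are unstable and may differ across versions, and assumes the current catalog (normally spark_catalog, i.e. V2SessionCatalog) implements TableCatalog; the default namespace is just an example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

// Sketch only: sessionState and catalogManager are internal, version-dependent APIs.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val tableCatalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]

// The same call ShowTablesExec#run makes: list table identifiers under a namespace.
val identifiers: Array[Identifier] = tableCatalog.listTables(Array("default"))
identifiers.foreach(id => println((id.namespace() :+ id.name()).mkString(".")))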

1.3. Invoking the Catalog Manager

As discussed in the "Spark Catalog Design" article, V2SessionCatalog is an implementation of TableCatalog, and it performs DDL operations through the corresponding metadata manager, such as HiveExternalCatalog.

Once resolution reaches the SessionCatalog, the rest of the SHOW execution proceeds through the call chain below:

org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog#apply
org.apache.spark.sql.execution.command.ShowTablesCommand#run
org.apache.spark.sql.hive.HiveSessionCatalog
org.apache.spark.sql.hive.HiveExternalCatalog#listTables
org.apache.spark.sql.hive.client.HiveClient#listTables
org.apache.hadoop.hive.ql.metadata.Hive#getAllTables
org.apache.hadoop.hive.metastore.IMetaStoreClient#getTables (thrift API)

In other words, listing all tables is ultimately carried out through a thrift RPC call to the HiveMetaStore.
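
To make that last hop concrete, here is a rough sketch of calling the HiveMetaStore thrift API directly via HiveMetaStoreClient; the metastore URI is a placeholder and the database name is only an example.

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

// Sketch only: hive.metastore.uris must point at a reachable metastore.
val hiveConf = new HiveConf()
hiveConf.set("hive.metastore.uris", "thrift://metastore-host:9083") // placeholder endpoint

val client = new HiveMetaStoreClient(hiveConf)
try {
  // Roughly what Hive#getAllTables ends up doing: a get_all_tables thrift RPC.
  client.getAllTables("default").forEach(t => println(t))
} finally {
  client.close()
}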

1.4. Result Set Processing

For SHOW-type SQL, kyuubi-spark-authz performs authorization on the rows after the result set has been obtained. This is implemented by FilterDataSourceV2Strategy, which is registered through the SparkSessionExtensions API:

class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
  SparkRangerAdminPlugin.initialize()

  override def apply(v1: SparkSessionExtensions): Unit = {
    v1.injectCheckRule(AuthzConfigurationChecker)
    v1.injectResolutionRule(_ => RuleReplaceShowObjectCommands)
    v1.injectResolutionRule(_ => RuleApplyPermanentViewMarker)
    v1.injectResolutionRule(_ => RuleApplyTypeOfMarker)
    v1.injectResolutionRule(RuleApplyRowFilter)
    v1.injectResolutionRule(RuleApplyDataMaskingStage0)
    v1.injectResolutionRule(RuleApplyDataMaskingStage1)
    v1.injectOptimizerRule(_ => RuleEliminateMarker)
    v1.injectOptimizerRule(RuleAuthorization)
    v1.injectOptimizerRule(RuleEliminatePermanentViewMarker)
    v1.injectOptimizerRule(_ => RuleEliminateTypeOf)
    v1.injectPlannerStrategy(FilterDataSourceV2Strategy)
  }
}
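
These rules and the strategy only take effect if the extension is registered on the session. A common way to do that, per the Kyuubi authz documentation, is through spark.sql.extensions; a sketch:

import org.apache.spark.sql.SparkSession

// Sketch: register the Kyuubi authz extension so FilterDataSourceV2Strategy gets injected.
// Equivalently, set spark.sql.extensions in spark-defaults.conf.
val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension")
  .enableHiveSupport()
  .getOrCreate()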

FilterDataSourceV2Strategy applies a different result-set filter depending on whether the statement is SHOW TABLES, SHOW DATABASES, or SHOW NAMESPACES. Its source is as follows:

case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // For Spark 3.1 and below, `ColumnPruning` rule will set `ObjectFilterPlaceHolder#child` to
    // `Project`
    case ObjectFilterPlaceHolder(Project(_, child)) if child.nodeName == "ShowNamespaces" =>
      spark.sessionState.planner.plan(child)
        .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq

    // For Spark 3.2 and above
    case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
      spark.sessionState.planner.plan(child)
        .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq

    case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowTables" =>
      spark.sessionState.planner.plan(child)
        .map(FilteredShowTablesExec(_, spark.sparkContext)).toSeq

    case _ => Nil
  }
}

As shown above, for SHOW TABLES the concrete result-set filtering is done by FilteredShowTablesExec, whose source is:

object FilteredShowTablesExec extends FilteredShowObjectsCheck {
  def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowTablesExec = {
    val result = delegated.executeCollect()
      .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
    new FilteredShowTablesExec(result, delegated.output)
  }

  override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
    val database = r.getString(0)
    val table = r.getString(1)
    val isTemp = r.getBoolean(2)
    val objectType = if (isTemp) ObjectType.VIEW else ObjectType.TABLE
    val resource = AccessResource(objectType, database, table, null)
    val request = AccessRequest(resource, ugi, OperationType.SHOWTABLES, AccessType.USE)
    val result = SparkRangerAdminPlugin.isAccessAllowed(request)
    result != null && result.getIsAllowed
  }
}

The actual check is SparkRangerAdminPlugin.isAccessAllowed: if the user does not have the SHOWTABLES and USE permissions on the object, authorization fails and the row is filtered out.

2. Potential Issues with SHOW Statements

In our production practice, SHOW statements can run into the following issues.

2.1. Result Set Not Visible When Authorization Fails

When authorization is denied, the catalog RPC returns a non-empty table list, yet the client receives an empty result set, because FilteredShowTablesExec silently drops every row the user is not allowed to see. What the metastore reports and what the client sees are therefore inconsistent.
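
A minimal sketch of the symptom, assuming a session with the Ranger extension enabled and a user who lacks the SHOWTABLES/USE privileges; the default database is illustrative.

import org.apache.spark.sql.SparkSession

// With kyuubi-spark-authz active, rows are dropped by FilteredShowTablesExec after
// collection, so the count can be 0 even though the metastore RPC (section 1.3)
// returned a non-empty table list.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val visibleTables = spark.sql("SHOW TABLES IN default").count()
println(s"tables visible to the current user: $visibleTables")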