I'm trying to understand physical plans in Spark, but some parts confuse me because they look different from those of a traditional RDBMS. For example, the plan below is for a query over a Hive table. The query is this:
select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus;
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedprice#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Final,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
            +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
               +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_extendedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedprice#32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedprice#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_returnflag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
                  +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
                     +- Filter (l_shipdate#37 <= 1998-09-16)
                        +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], MetastoreRelation default, lineitem, None
What I understand from the plan is:
1. It starts with a Hive table scan
2. Then it filters using the where condition
3. Then it projects to get the columns we want
4. Then TungstenAggregate?
5. Then TungstenExchange?
6. Then TungstenAggregate again?
7. Then ConvertToSafe?
8. Then it sorts the final result
But I don't understand steps 4, 5, 6 and 7. Do you know what they are? I'm looking for information about this so I can understand the plan, but I'm not finding anything concrete.
In Spark SQL the physical plan provides the fundamental information about the execution of the query. The objective of this talk is to convey understanding and familiarity of query plans in Spark SQL, and use that knowledge to achieve better performance of Apache Spark queries.
The Logical Plan just depicts what I expect as output after applying a series of transformations (join, filter, where, groupBy, etc.) to a particular table. The Physical Plan is responsible for deciding the type of join and the order in which the filter, where, groupBy and other clauses are executed.
The EXPLAIN statement is used to provide logical/physical plans for an input statement. By default, this clause provides information about a physical plan only.
We call it an Unresolved Logical Plan because the column or table names may be inaccurate or may not even exist, even when the code is valid and the syntax is correct. So, it can be concluded that Spark creates a blank Logical Plan at this step, where there are no checks for the column name, table name, etc.
Let's look at the structure of the SQL query you use:
SELECT
    ...  -- not aggregated columns  #1
    ...  -- aggregated columns      #2
FROM
    ...                          -- #3
WHERE
    ...                          -- #4
GROUP BY
    ...                          -- #5
ORDER BY
    ...                          -- #6
As you already suspect:
Filter (...) corresponds to predicates in the WHERE clause (#4)
Project ... limits the number of columns to those required by a union of (#1 and #2, and #4 / #6 if not present in SELECT)
HiveTableScan corresponds to the FROM clause (#3)
Remaining parts can be attributed as follows:
#2 from the SELECT clause - functions field in TungstenAggregates
GROUP BY clause (#5):
    TungstenExchange / hash partitioning
    key field in TungstenAggregates
#6 - ORDER BY clause: Exchange rangepartitioning followed by Sort.
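To make the ORDER BY part of the plan (Exchange rangepartitioning followed by Sort) more concrete, here is a minimal plain-Python sketch of the idea, not Spark's actual implementation. Rows are first routed to partitions by key range, then each partition is sorted on its own; reading the partitions in order gives a globally sorted result. The sample rows and the split point are made up for illustration, and Spark picks its boundaries by sampling the data rather than taking them by hand.

```python
from bisect import bisect_left

def range_partition(rows, boundaries):
    """Send each row to the partition whose key range contains it."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        # Rows <= boundaries[i] (and > boundaries[i - 1]) land in partition i.
        parts[bisect_left(boundaries, row)].append(row)
    return parts

# Hypothetical (l_returnflag, l_linestatus) keys, in arbitrary order.
rows = [("R", "F"), ("A", "F"), ("N", "O"), ("N", "F")]

# Hypothetical split point; hand-picked here, sampled from the data in Spark.
boundaries = [("N", "F")]

parts = range_partition(rows, boundaries)      # the "Exchange rangepartitioning" step
sorted_parts = [sorted(p) for p in parts]      # the per-partition "Sort" step
result = [row for part in sorted_parts for row in part]
print(result)
```

Because every key in partition 0 is smaller than every key in partition 1, no merge step across partitions is needed after the local sorts.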
Project Tungsten in general describes a set of optimizations used by Spark DataFrames (and Datasets), including:
memory management with sun.misc.Unsafe. It means "native" (off-heap) memory usage and explicit memory allocation / freeing outside GC management. The conversions between regular rows and this unsafe representation correspond to the ConvertToUnsafe / ConvertToSafe steps in the execution plan. You can learn some interesting details about unsafe from Understanding sun.misc.Unsafe.
You can learn more about Tungsten in general from Project Tungsten: Bringing Apache Spark Closer to Bare Metal. Apache Spark 2.0: Faster, Easier, and Smarter provides some examples of code generation.
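As a rough analogy for what ConvertToUnsafe / ConvertToSafe do, the sketch below packs a row of ordinary Python objects into a compact byte buffer and unpacks it again. This is only an illustration in plain Python: the layout in ROW_FORMAT and the helper names are hypothetical and do not reproduce Spark's real binary row format.

```python
import struct

# Hypothetical fixed layout: two 1-byte flags followed by three 8-byte doubles.
ROW_FORMAT = "<ccddd"

def to_binary(row):
    """Analogy for ConvertToUnsafe: objects -> raw bytes."""
    flag, status, qty, price, discount = row
    return struct.pack(ROW_FORMAT, flag.encode("ascii"), status.encode("ascii"),
                       qty, price, discount)

def to_objects(buf):
    """Analogy for ConvertToSafe: raw bytes -> objects."""
    flag, status, qty, price, discount = struct.unpack(ROW_FORMAT, buf)
    return (flag.decode("ascii"), status.decode("ascii"), qty, price, discount)

# Made-up row: (l_returnflag, l_linestatus, l_quantity, l_extendedprice, l_discount)
row = ("A", "F", 17.0, 21168.23, 0.04)
buf = to_binary(row)
restored = to_objects(buf)
print(len(buf), restored)
```

The point is only that these plan steps move data between two representations of the same row; they do not change the query result.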
TungstenAggregate occurs twice because data is first aggregated locally on each partition, then shuffled, and finally merged. If you are familiar with the RDD API, this process is roughly equivalent to reduceByKey.
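The partial / shuffle / final pattern can be sketched in plain Python. This is a simplified illustration rather than Spark code: the function names and sample rows are hypothetical, and only one measure (l_quantity) is aggregated. Note that the partial state for avg is a (sum, count) pair, which matches the sum#.../count#... columns in the output of the Partial TungstenAggregate.

```python
from collections import defaultdict

def partial_aggregate(rows):
    """Aggregate one partition locally: key -> [sum, count] (like mode=Partial)."""
    acc = defaultdict(lambda: [0.0, 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return dict(acc)

def shuffle(partials, num_partitions):
    """Route every key's partial state to one target partition by hash (the exchange)."""
    targets = [defaultdict(list) for _ in range(num_partitions)]
    for partial in partials:
        for key, state in partial.items():
            targets[hash(key) % num_partitions][key].append(state)
    return targets

def final_aggregate(target):
    """Merge the partial states per key and finish sum / avg / count (like mode=Final)."""
    result = {}
    for key, states in target.items():
        total = sum(s for s, _ in states)
        count = sum(c for _, c in states)
        result[key] = {"sum": total, "avg": total / count, "count": count}
    return result

# Two hypothetical input partitions of ((l_returnflag, l_linestatus), l_quantity) rows.
partitions = [
    [(("A", "F"), 10.0), (("N", "O"), 5.0), (("A", "F"), 20.0)],
    [(("A", "F"), 30.0), (("N", "O"), 15.0)],
]

partials = [partial_aggregate(p) for p in partitions]
merged = {}
for target in shuffle(partials, num_partitions=2):
    merged.update(final_aggregate(target))
print(merged)
```

Only one small state per key and partition crosses the shuffle, instead of every input row, which is why the aggregation is split into two steps.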
If the execution plan is not clear, you can also try converting the resulting DataFrame to an RDD and analyzing the output of toDebugString.
Tungsten is the new memory engine in Spark since 1.4, which manages data outside the JVM to save some GC overhead. As you can imagine, that involves copying data to and from the JVM. That's it. In Spark 1.5 you can turn Tungsten off through spark.sql.tungsten.enabled, and then you will see the "old" plan. In Spark 1.6 I think you can't turn it off any more.