如何打印计划

spark plan

首先在本地部署thriftserver

然后创建临时视图 lineitem(注意将 path 替换为本地 TPC-H lineitem 数据所在的目录):

-- Register the local TPC-H lineitem parquet data as a session-scoped temporary view.
-- `parquet` is the canonical short name of Spark's built-in Parquet data source;
-- `org.apache.spark.sql.parquet` is only a legacy alias that resolves to the same format.
-- NOTE: the path below is machine-specific; point it at your own lineitem directory.
CREATE TEMPORARY VIEW lineitem
USING parquet
OPTIONS (
    path "/data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem"
);

然后准备 TPC-H Q1 查询:

-- TPC-H Q1 over the lineitem view registered above.
-- The cutoff expression constant-folds to 1998-09-01, which is the value that
-- shows up in PushedFilters / FilterExecTransformer of the EXPLAIN output.
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity)                                       AS sum_qty,
    SUM(l_extendedprice)                                  AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount))               AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    AVG(l_quantity)                                       AS avg_qty,
    AVG(l_extendedprice)                                  AS avg_price,
    AVG(l_discount)                                       AS avg_disc,
    COUNT(*)                                              AS count_order
FROM lineitem
WHERE l_shipdate <= DATE '1998-09-02' - INTERVAL 1 DAY
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;

执行 EXPLAIN FORMATTED <query>,即可看到对应的 Spark 优化后的物理计划(physical plan):

+----------------------------------------------------+  
|                        plan                        |  
+----------------------------------------------------+  
| == Physical Plan ==  
CHNativeColumnarToRow (12)  
+- * SortExecTransformer (10)  
   +- ColumnarExchangeAdaptor (9)  
      +- * CHHashAggregateExecTransformer (7)  
         +- ColumnarExchangeAdaptor (6)  
            +- * CHHashAggregateExecTransformer (4)  
               +- * ProjectExecTransformer (3)  
                  +- * FilterExecTransformer (2)  
                     +- * Scan parquet  (1)  
   
   
(1) Scan parquet  
Output [7]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369, l_shipdate#370]  
Batched: true  
Location: InMemoryFileIndex [file:/data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem]  
PushedFilters: [IsNotNull(l_shipdate), LessThanOrEqual(l_shipdate,1998-09-01)]  
ReadSchema: struct<l_quantity:double,l_extendedprice:double,l_discount:double,l_tax:double,l_returnflag:string,l_linestatus:string,l_shipdate:date>  
   
(2) FilterExecTransformer  
Input [7]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369, l_shipdate#370]  
Arguments: (isnotnull(l_shipdate#370) AND (l_shipdate#370 <= 1998-09-01))  
   
(3) ProjectExecTransformer  
Input [7]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369, l_shipdate#370]  
Arguments: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369]  
   
(4) CHHashAggregateExecTransformer  
Input [6]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369]  
Keys [2]: [l_returnflag#368, l_linestatus#369]  
Functions [8]: [partial_sum(l_quantity#364), partial_sum(l_extendedprice#365), partial_sum((l_extendedprice#365 * (1.0 - l_discount#366))), partial_sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367))), partial_avg(l_quantity#364), partial_avg(l_extendedprice#365), partial_avg(l_discount#366), partial_count(1)]  
Aggregate Attributes [11]: [sum#414, sum#415, sum#416, sum#417, sum#418, count#419L, sum#420, count#421L, sum#422, count#423L, count#424L]  
Results [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]  
   
(5) WholeStageCodegenTransformer (13)  
Input [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]  
   
(6) ColumnarExchangeAdaptor  
Input [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]  
Arguments: hashpartitioning(l_returnflag#368, l_linestatus#369, 5), ENSURE_REQUIREMENTS, false, [plan_id=891], [id=#891]  
   
(7) CHHashAggregateExecTransformer  
Input [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]  
Keys [2]: [l_returnflag#368, l_linestatus#369]  
Functions [8]: [sum(l_quantity#364), sum(l_extendedprice#365), sum((l_extendedprice#365 * (1.0 - l_discount#366))), sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367))), avg(l_quantity#364), avg(l_extendedprice#365), avg(l_discount#366), count(1)]  
Aggregate Attributes [8]: [sum(l_quantity#364)#407, sum(l_extendedprice#365)#408, sum((l_extendedprice#365 * (1.0 - l_discount#366)))#412, sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367)))#413, avg(l_quantity#364)#409, avg(l_extendedprice#365)#410, avg(l_discount#366)#411, count(1)#406L]  
Results [10]: [l_returnflag#368, l_linestatus#369, sum(l_quantity#364)#407 AS sum_qty#393, sum(l_extendedprice#365)#408 AS sum_base_price#394, sum((l_extendedprice#365 * (1.0 - l_discount#366)))#412 AS sum_disc_price#395, sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367)))#413 AS sum_charge#396, avg(l_quantity#364)#409 AS avg_qty#397, avg(l_extendedprice#365)#410 AS avg_price#398, avg(l_discount#366)#411 AS avg_disc#399, count(1)#406L AS count_order#400L]  
   
(8) WholeStageCodegenTransformer (14)  
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]  
   
(9) ColumnarExchangeAdaptor  
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]  
Arguments: rangepartitioning(l_returnflag#368 ASC NULLS FIRST, l_linestatus#369 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, false, [plan_id=895], [id=#895]  
   
(10) SortExecTransformer  
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]  
Arguments: [l_returnflag#368 ASC NULLS FIRST, l_linestatus#369 ASC NULLS FIRST], true, 0  
   
(11) WholeStageCodegenTransformer (15)  
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]  
   
(12) CHNativeColumnarToRow  
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]  

substrait plan

Spark log 中的 Substrait plan 默认使用 pb.DebugString() 输出,不便于阅读。
为了在调试时更清楚地打印 Substrait plan,首先将 utils/local-engine/Common/common.cpp 中默认的 log level 从 "error" 改为 "trace":

// Fallback log level used when "logger.level" is not configured.
// Changed from "error" to "trace" so that the Substrait and ClickHouse plans are written to the Spark log.
auto level = config->getString("logger.level", "trace");  

重新编译后执行 TPC-H Q1,可在 Spark log 中看到三个不同的 Substrait plan,分别对应三个阶段。(注:以下日志与上文 EXPLAIN 的输出来自不同的运行,因此属性编号不一致,例如 l_returnflag#8 与 l_returnflag#368。)

第一阶段:读取、过滤、投影,以及聚合的 Partial 阶段

{"extensions":[{"extensionFunction":{"functionAnchor":7,"name":"avg:req_fp64"}},{"extensionFunction":{"name":"is_not_null:opt_bool_date"}},{"extensionFunction":{"functionAnchor":1,"name":"lte:opt_date_date"}},{"extensionFunction":{"functionAnchor":2,"name":"and:opt_bool_bool"}},{"extensionFunction":{"functionAnchor":3,"name":"subtract:opt_fp64_fp64"}},{"extensionFunction":{"functionAnchor":6,"name":"sum:req_fp64"}},{"extensionFunction":{"functionAnchor":5,"name":"add:opt_fp64_fp64"}},{"extensionFunction":{"functionAnchor":8,"name":"count:req_i32"}},{"extensionFunction":{"functionAnchor":4,"name":"multiply:opt_fp64_fp64"}}],"relations":[{"root":{"input":{"aggregate":{"common":{"direct":{}},"input":{"project":{"common":{"direct":{}},"input":{"project":{"common":{"direct":{}},"input":{"filter":{"common":{"direct":{}},"input":{"read":{"common":{"direct":{}},"baseSchema":{"names":["l_quantity","l_extendedprice","l_discount","l_tax","l_returnflag","l_linestatus","l_shipdate"],"struct":{"types":[{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"date":{"nullability":"NULLABILITY_NULLABLE"}}]},"partitionColumns":{"columnType":["NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL"]}},"filter":{"scalarFunction":{"functionReference":2,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}}},{"value":{"scalarFunction":{"functionReference":1,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}},{"value":{"literal":{"date":
10470}}}]}}}]}},"localFiles":{"items":[{"uriFile":"file:///data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet","length":"17777735","parquet":{}}]}}},"condition":{"scalarFunction":{"functionReference":2,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}}},{"value":{"scalarFunction":{"functionReference":1,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}},{"value":{"literal":{"date":10470}}}]}}}]}}}},"expressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"selection":{"directReference":{"structField":{"field":2}}}},{"selection":{"directReference":{"structField":{"field":3}}}},{"selection":{"directReference":{"structField":{"field":4}}}},{"selection":{"directReference":{"structField":{"field":5}}}}]}},"expressions":[{"selection":{"directReference":{"structField":{"field":4}}}},{"selection":{"directReference":{"structField":{"field":5}}}},{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"scalarFunction":{"functionReference":4,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":1}}}}},{"value":{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"literal":{"fp64":1}}},{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}}}]}},{"scalarFunction":{"functionReference":4,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"scalarFunction":{"func
tionReference":4,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":1}}}}},{"value":{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"literal":{"fp64":1}}},{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}}}]}}},{"value":{"scalarFunction":{"functionReference":5,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"literal":{"fp64":1}}},{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}}}]}},{"selection":{"directReference":{"structField":{"field":2}}}},{"literal":{"i32":1}}]}},"groupings":[{"groupingExpressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}}]}],"measures":[{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"measure":{"functionReference":7,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{
"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"functionReference":7,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"functionReference":7,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"measure":{"functionReference":8,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}}]}},"names":["l_returnflag#8","l_linestatus#9","sum#42","sum#43","sum#47","sum#48","avg#44","avg#45","avg#46","count#41"]}}]}  

第二阶段:对应聚合的Final阶段

{"extensions":[{"extensionFunction":{"functionAnchor":1,"name":"avg:req_fp64"}},{"extensionFunction":{"functionAnchor":4,"name":"alias:req_i64"}},{"extensionFunction":{"name":"sum:req_fp64"}},{"extensionFunction":{"functionAnchor":2,"name":"count:req_i32"}},{"extensionFunction":{"functionAnchor":3,"name":"alias:req_fp64"}}],"relations":[{"root":{"input":{"project":{"common":{"direct":{}},"input":{"aggregate":{"common":{"direct":{}},"input":{"read":{"common":{"direct":{}},"baseSchema":{"names":["l_returnflag#8","l_linestatus#9","sum#42#Partial#sum","sum#43#Partial#sum","sum#47#Partial#sum","sum#48#Partial#sum","avg#44#Partial#avg","avg#45#Partial#avg","avg#46#Partial#avg","count#41#Partial#count"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_REQUIRED"}}]}},"localFiles":{"items":[{"uriFile":"iterator:0"}]}}},"groupings":[{"groupingExpressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}}]}],"measures":[{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":
{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"measure":{"functionReference":2,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"expressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLAB
ILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"scalarFunction":{"functionReference":4,"outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"names":["l_returnflag#8","l_linestatus#9","sum_qty#33","sum_base_price#34","sum_disc_price#35","sum_charge#36","avg_qty#37","avg_price#38","avg_disc#39","count_order#40"]}}]}  

第三阶段:基于聚合结果进行排序(注意:下面贴出的 JSON 与第二阶段的 plan 完全相同,应为复制错误,需替换为排序阶段实际打印的 Substrait plan)

{"extensions":[{"extensionFunction":{"functionAnchor":1,"name":"avg:req_fp64"}},{"extensionFunction":{"functionAnchor":4,"name":"alias:req_i64"}},{"extensionFunction":{"name":"sum:req_fp64"}},{"extensionFunction":{"functionAnchor":2,"name":"count:req_i32"}},{"extensionFunction":{"functionAnchor":3,"name":"alias:req_fp64"}}],"relations":[{"root":{"input":{"project":{"common":{"direct":{}},"input":{"aggregate":{"common":{"direct":{}},"input":{"read":{"common":{"direct":{}},"baseSchema":{"names":["l_returnflag#8","l_linestatus#9","sum#42#Partial#sum","sum#43#Partial#sum","sum#47#Partial#sum","sum#48#Partial#sum","avg#44#Partial#avg","avg#45#Partial#avg","avg#46#Partial#avg","count#41#Partial#count"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_REQUIRED"}}]}},"localFiles":{"items":[{"uriFile":"iterator:0"}]}}},"groupings":[{"groupingExpressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}}]}],"measures":[{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":
{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"measure":{"functionReference":2,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"expressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLAB
ILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"scalarFunction":{"functionReference":4,"outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"names":["l_returnflag#8","l_linestatus#9","sum_qty#33","sum_base_price#34","sum_disc_price#35","sum_charge#36","avg_qty#37","avg_price#38","avg_disc#39","count_order#40"]}}]}  

clickhouse plan

同样地,在将 log level 设为 "trace" 后,也能在 Spark log 中找到 ClickHouse 在各个阶段的执行计划:

第一阶段

Expression (Rename Output)  
Header: l_returnflag#8 Nullable(String)  
        l_linestatus#9 Nullable(String)  
        sum#42 AggregateFunction(sum, Nullable(Float64))  
        sum#43 AggregateFunction(sum, Nullable(Float64))  
        sum#47 AggregateFunction(sum, Nullable(Float64))  
        sum#48 AggregateFunction(sum, Nullable(Float64))  
        avg#44 AggregateFunction(avg, Nullable(Float64))  
        avg#45 AggregateFunction(avg, Nullable(Float64))  
        avg#46 AggregateFunction(avg, Nullable(Float64))  
        count#41 AggregateFunction(count, Int32)  
Actions: INPUT : 0 -> l_returnflag Nullable(String) : 0  
         INPUT : 1 -> l_linestatus Nullable(String) : 1  
         INPUT : 2 -> sum(l_quantity) AggregateFunction(sum, Nullable(Float64)) : 2  
         INPUT : 3 -> sum(l_extendedprice) AggregateFunction(sum, Nullable(Float64)) : 3  
         INPUT : 4 -> sum(multiply(l_extendedprice,minus(1_1,l_discount))) AggregateFunction(sum, Nullable(Float64)) : 4  
         INPUT : 5 -> sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) AggregateFunction(sum, Nullable(Float64)) : 5  
         INPUT : 6 -> avg(l_quantity) AggregateFunction(avg, Nullable(Float64)) : 6  
         INPUT : 7 -> avg(l_extendedprice) AggregateFunction(avg, Nullable(Float64)) : 7  
         INPUT : 8 -> avg(l_discount) AggregateFunction(avg, Nullable(Float64)) : 8  
         INPUT : 9 -> count(1_4) AggregateFunction(count, Int32) : 9  
         ALIAS l_returnflag :: 0 -> l_returnflag#8 Nullable(String) : 10  
         ALIAS l_linestatus :: 1 -> l_linestatus#9 Nullable(String) : 0  
         ALIAS sum(l_quantity) :: 2 -> sum#42 AggregateFunction(sum, Nullable(Float64)) : 1  
         ALIAS sum(l_extendedprice) :: 3 -> sum#43 AggregateFunction(sum, Nullable(Float64)) : 2  
         ALIAS sum(multiply(l_extendedprice,minus(1_1,l_discount))) :: 4 -> sum#47 AggregateFunction(sum, Nullable(Float64)) : 3  
         ALIAS sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) :: 5 -> sum#48 AggregateFunction(sum, Nullable(Float64)) : 4  
         ALIAS avg(l_quantity) :: 6 -> avg#44 AggregateFunction(avg, Nullable(Float64)) : 5  
         ALIAS avg(l_extendedprice) :: 7 -> avg#45 AggregateFunction(avg, Nullable(Float64)) : 6  
         ALIAS avg(l_discount) :: 8 -> avg#46 AggregateFunction(avg, Nullable(Float64)) : 7  
         ALIAS count(1_4) :: 9 -> count#41 AggregateFunction(count, Int32) : 8  
Positions: 10 0 1 2 3 4 5 6 7 8  
  Aggregating  
  Header: l_returnflag Nullable(String)  
          l_linestatus Nullable(String)  
          sum(l_quantity) AggregateFunction(sum, Nullable(Float64))  
          sum(l_extendedprice) AggregateFunction(sum, Nullable(Float64))  
          sum(multiply(l_extendedprice,minus(1_1,l_discount))) AggregateFunction(sum, Nullable(Float64))  
          sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) AggregateFunction(sum, Nullable(Float64))  
          avg(l_quantity) AggregateFunction(avg, Nullable(Float64))  
          avg(l_extendedprice) AggregateFunction(avg, Nullable(Float64))  
          avg(l_discount) AggregateFunction(avg, Nullable(Float64))  
          count(1_4) AggregateFunction(count, Int32)  
  Keys: l_returnflag, l_linestatus  
  Aggregating  
  Header: l_returnflag Nullable(String)  
          l_linestatus Nullable(String)  
          sum(l_quantity) AggregateFunction(sum, Nullable(Float64))  
          sum(l_extendedprice) AggregateFunction(sum, Nullable(Float64))  
          sum(multiply(l_extendedprice,minus(1_1,l_discount))) AggregateFunction(sum, Nullable(Float64))  
          sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) AggregateFunction(sum, Nullable(Float64))  
          avg(l_quantity) AggregateFunction(avg, Nullable(Float64))  
          avg(l_extendedprice) AggregateFunction(avg, Nullable(Float64))  
          avg(l_discount) AggregateFunction(avg, Nullable(Float64))  
          count(1_4) AggregateFunction(count, Int32)  
  Keys: l_returnflag, l_linestatus  
  Aggregates:  
      sum(l_quantity)  
        Function: sum(Nullable(Float64)) → Nullable(Float64)  
        Arguments: l_quantity  
      sum(l_extendedprice)  
        Function: sum(Nullable(Float64)) → Nullable(Float64)  
        Arguments: l_extendedprice  
      sum(multiply(l_extendedprice,minus(1_1,l_discount)))  
        Function: sum(Nullable(Float64)) → Nullable(Float64)  
        Arguments: multiply(l_extendedprice,minus(1_1,l_discount))  
      sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)))  
        Function: sum(Nullable(Float64)) → Nullable(Float64)  
        Arguments: multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))  
      avg(l_quantity)  
        Function: avg(Nullable(Float64)) → Nullable(Float64)  
        Arguments: l_quantity  
      avg(l_extendedprice)  
        Function: avg(Nullable(Float64)) → Nullable(Float64)  
        Arguments: l_extendedprice  
      avg(l_discount)  
        Function: avg(Nullable(Float64)) → Nullable(Float64)  
        Arguments: l_discount  
      count(1_4)  
        Function: count(Int32) → UInt64  
        Arguments: 1_4  
    Expression (Project)  
    Header: l_returnflag Nullable(String)  
            l_linestatus Nullable(String)  
            l_quantity Nullable(Float64)  
            l_extendedprice Nullable(Float64)  
            multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64)  
            multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64)  
            l_discount Nullable(Float64)  
            1_4 Int32  
    Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0  
             INPUT : 1 -> l_extendedprice Nullable(Float64) : 1  
             INPUT : 2 -> l_discount Nullable(Float64) : 2  
             INPUT : 3 -> l_tax Nullable(Float64) : 3  
             INPUT :: 4 -> l_returnflag Nullable(String) : 4  
             INPUT :: 5 -> l_linestatus Nullable(String) : 5  
             COLUMN Const(Float64) -> 1_1 Float64 : 6  
             COLUMN Const(Float64) -> 1_2 Float64 : 7  
             COLUMN Const(Float64) -> 1_3 Float64 : 8  
             COLUMN Const(Int32) -> 1_4 Int32 : 9  
             FUNCTION minus(1_1 :: 6, l_discount : 2) -> minus(1_1,l_discount) Nullable(Float64) : 10  
             FUNCTION minus(1_2 :: 7, l_discount : 2) -> minus(1_2,l_discount) Nullable(Float64) : 6  
             FUNCTION plus(1_3 :: 8, l_tax :: 3) -> plus(1_3,l_tax) Nullable(Float64) : 7  
             FUNCTION multiply(l_extendedprice : 1, minus(1_1,l_discount) :: 10) -> multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64) : 3  
             FUNCTION multiply(l_extendedprice : 1, minus(1_2,l_discount) :: 6) -> multiply(l_extendedprice,minus(1_2,l_discount)) Nullable(Float64) : 10  
             FUNCTION multiply(multiply(l_extendedprice,minus(1_2,l_discount)) :: 10, plus(1_3,l_tax) :: 7) -> multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64) : 6  
    Positions: 4 5 0 1 3 6 2 9  
      Expression (Project)  
      Header: l_quantity Nullable(Float64)  
              l_extendedprice Nullable(Float64)  
              l_discount Nullable(Float64)  
              l_tax Nullable(Float64)  
              l_returnflag Nullable(String)  
              l_linestatus Nullable(String)  
      Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0  
               INPUT :: 1 -> l_extendedprice Nullable(Float64) : 1  
               INPUT :: 2 -> l_discount Nullable(Float64) : 2  
               INPUT :: 3 -> l_tax Nullable(Float64) : 3  
               INPUT :: 4 -> l_returnflag Nullable(String) : 4  
               INPUT :: 5 -> l_linestatus Nullable(String) : 5  
      Positions: 0 1 2 3 4 5  
        Expression (Remove nullable properties)  
        Header: l_quantity Nullable(Float64)  
                l_extendedprice Nullable(Float64)  
                l_discount Nullable(Float64)  
                l_tax Nullable(Float64)  
                l_returnflag Nullable(String)  
                l_linestatus Nullable(String)  
                l_shipdate Date32  
        Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0  
                 INPUT :: 1 -> l_extendedprice Nullable(Float64) : 1  
                 INPUT :: 2 -> l_discount Nullable(Float64) : 2  
                 INPUT :: 3 -> l_tax Nullable(Float64) : 3  
                 INPUT :: 4 -> l_returnflag Nullable(String) : 4  
                 INPUT :: 5 -> l_linestatus Nullable(String) : 5  
                 INPUT : 6 -> l_shipdate Nullable(Date32) : 6  
                 FUNCTION assumeNotNull(l_shipdate :: 6) -> l_shipdate Date32 : 7  
        Positions: 0 1 2 3 4 5 7  
          Filter  
          Header: l_quantity Nullable(Float64)  
                  l_extendedprice Nullable(Float64)  
                  l_discount Nullable(Float64)  
                  l_tax Nullable(Float64)  
                  l_returnflag Nullable(String)  
                  l_linestatus Nullable(String)  
                  l_shipdate Nullable(Date32)  
          Filter column: and(isNotNull(l_shipdate),lessOrEquals(l_shipdate,10470_0)) (removed)  
          Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0  
                   INPUT :: 1 -> l_extendedprice Nullable(Float64) : 1  
                   INPUT :: 2 -> l_discount Nullable(Float64) : 2  
                   INPUT :: 3 -> l_tax Nullable(Float64) : 3  
                   INPUT :: 4 -> l_returnflag Nullable(String) : 4  
                   INPUT :: 5 -> l_linestatus Nullable(String) : 5  
                   INPUT : 6 -> l_shipdate Nullable(Date32) : 6  
                   COLUMN Const(Int32) -> 10470_0 Date32 : 7  
                   FUNCTION isNotNull(l_shipdate : 6) -> isNotNull(l_shipdate) UInt8 : 8  
                   FUNCTION lessOrEquals(l_shipdate : 6, 10470_0 :: 7) -> lessOrEquals(l_shipdate,10470_0) Nullable(UInt8) : 9  
                   FUNCTION and(isNotNull(l_shipdate) :: 8, lessOrEquals(l_shipdate,10470_0) :: 9) -> and(isNotNull(l_shipdate),lessOrEquals(l_shipdate,10470_0)) Nullable(UInt8) : 7  
          Positions: 0 1 2 3 4 5 6 7  
            ReadFromStorage (read local files)  
            Header: l_quantity Nullable(Float64)  
                    l_extendedprice Nullable(Float64)  
                    l_discount Nullable(Float64)  
                    l_tax Nullable(Float64)  
                    l_returnflag Nullable(String)  
                    l_linestatus Nullable(String)  
                    l_shipdate Nullable(Date32)  

对应的pipeline

alt text

第二阶段

Expression (Rename Output)  
Header: l_returnflag#8 Nullable(String)  
        l_linestatus#9 Nullable(String)  
        sum_qty#33 Nullable(Float64)  
        sum_base_price#34 Nullable(Float64)  
        sum_disc_price#35 Nullable(Float64)  
        sum_charge#36 Nullable(Float64)  
        avg_qty#37 Nullable(Float64)  
        avg_price#38 Nullable(Float64)  
        avg_disc#39 Nullable(Float64)  
        count_order#40 Int64  
Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0  
         INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1  
         INPUT : 2 -> sum#42#Partial#sum Nullable(Float64) : 2  
         INPUT : 3 -> sum#43#Partial#sum Nullable(Float64) : 3  
         INPUT : 4 -> sum#47#Partial#sum Nullable(Float64) : 4  
         INPUT : 5 -> sum#48#Partial#sum Nullable(Float64) : 5  
         INPUT : 6 -> avg#44#Partial#avg Nullable(Float64) : 6  
         INPUT : 7 -> avg#45#Partial#avg Nullable(Float64) : 7  
         INPUT : 8 -> avg#46#Partial#avg Nullable(Float64) : 8  
         INPUT : 9 -> count#41#Partial#count Int64 : 9  
         ALIAS sum#42#Partial#sum :: 2 -> sum_qty#33 Nullable(Float64) : 10  
         ALIAS sum#43#Partial#sum :: 3 -> sum_base_price#34 Nullable(Float64) : 2  
         ALIAS sum#47#Partial#sum :: 4 -> sum_disc_price#35 Nullable(Float64) : 3  
         ALIAS sum#48#Partial#sum :: 5 -> sum_charge#36 Nullable(Float64) : 4  
         ALIAS avg#44#Partial#avg :: 6 -> avg_qty#37 Nullable(Float64) : 5  
         ALIAS avg#45#Partial#avg :: 7 -> avg_price#38 Nullable(Float64) : 6  
         ALIAS avg#46#Partial#avg :: 8 -> avg_disc#39 Nullable(Float64) : 7  
         ALIAS count#41#Partial#count :: 9 -> count_order#40 Int64 : 8  
Positions: 0 1 10 2 3 4 5 6 7 8  
  Expression (Project)  
  Header: l_returnflag#8 Nullable(String)  
          l_linestatus#9 Nullable(String)  
          sum#42#Partial#sum Nullable(Float64)  
          sum#43#Partial#sum Nullable(Float64)  
          sum#47#Partial#sum Nullable(Float64)  
          sum#48#Partial#sum Nullable(Float64)  
          avg#44#Partial#avg Nullable(Float64)  
          avg#45#Partial#avg Nullable(Float64)  
          avg#46#Partial#avg Nullable(Float64)  
          count#41#Partial#count Int64  
  Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0  
           INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1  
           INPUT :: 2 -> sum#42#Partial#sum Nullable(Float64) : 2  
           INPUT :: 3 -> sum#43#Partial#sum Nullable(Float64) : 3  
           INPUT :: 4 -> sum#47#Partial#sum Nullable(Float64) : 4  
           INPUT :: 5 -> sum#48#Partial#sum Nullable(Float64) : 5  
           INPUT :: 6 -> avg#44#Partial#avg Nullable(Float64) : 6  
           INPUT :: 7 -> avg#45#Partial#avg Nullable(Float64) : 7  
           INPUT :: 8 -> avg#46#Partial#avg Nullable(Float64) : 8  
           INPUT :: 9 -> count#41#Partial#count Int64 : 9  
  Positions: 0 1 2 3 4 5 6 7 8 9  
    Expression (Convert Aggregate Output)  
    Header: l_returnflag#8 Nullable(String)  
            l_linestatus#9 Nullable(String)  
            sum#42#Partial#sum Nullable(Float64)  
            sum#43#Partial#sum Nullable(Float64)  
            sum#47#Partial#sum Nullable(Float64)  
            sum#48#Partial#sum Nullable(Float64)  
            avg#44#Partial#avg Nullable(Float64)  
            avg#45#Partial#avg Nullable(Float64)  
            avg#46#Partial#avg Nullable(Float64)  
            count#41#Partial#count Int64  
    Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0  
             INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1  
             INPUT :: 2 -> sum#42#Partial#sum Nullable(Float64) : 2  
             INPUT :: 3 -> sum#43#Partial#sum Nullable(Float64) : 3  
             INPUT :: 4 -> sum#47#Partial#sum Nullable(Float64) : 4  
             INPUT :: 5 -> sum#48#Partial#sum Nullable(Float64) : 5  
             INPUT :: 6 -> avg#44#Partial#avg Nullable(Float64) : 6  
             INPUT :: 7 -> avg#45#Partial#avg Nullable(Float64) : 7  
             INPUT :: 8 -> avg#46#Partial#avg Nullable(Float64) : 8  
             INPUT : 9 -> count#41#Partial#count UInt64 : 9  
             COLUMN Const(String) -> Int64 String : 10  
             FUNCTION _CAST(count#41#Partial#count :: 9, Int64 :: 10) -> _CAST(count#41#Partial#count, Int64) Int64 : 11  
             ALIAS _CAST(count#41#Partial#count, Int64) :: 11 -> count#41#Partial#count Int64 : 10  
    Positions: 0 1 2 3 4 5 6 7 8 10  
      MergingAggregated  
      Header: l_returnflag#8 Nullable(String)  
              l_linestatus#9 Nullable(String)  
              sum#42#Partial#sum Nullable(Float64)  
              sum#43#Partial#sum Nullable(Float64)  
              sum#47#Partial#sum Nullable(Float64)  
              sum#48#Partial#sum Nullable(Float64)  
              avg#44#Partial#avg Nullable(Float64)  
              avg#45#Partial#avg Nullable(Float64)  
              avg#46#Partial#avg Nullable(Float64)  
              count#41#Partial#count UInt64  
      Keys: l_returnflag#8, l_linestatus#9  
      Aggregates:  
          sum#42#Partial#sum  
            Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)  
            Arguments: sum#42#Partial#sum  
          sum#43#Partial#sum  
            Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)  
            Arguments: sum#43#Partial#sum  
          sum#47#Partial#sum  
            Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)  
            Arguments: sum#47#Partial#sum  
          sum#48#Partial#sum  
            Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)  
            Arguments: sum#48#Partial#sum  
          avg#44#Partial#avg  
            Function: avgPartialMerge(AggregateFunction(avg, Nullable(Float64))) → Nullable(Float64)  
            Arguments: avg#44#Partial#avg  
          avg#45#Partial#avg  
            Function: avgPartialMerge(AggregateFunction(avg, Nullable(Float64))) → Nullable(Float64)  
            Arguments: avg#45#Partial#avg  
          avg#46#Partial#avg  
            Function: avgPartialMerge(AggregateFunction(avg, Nullable(Float64))) → Nullable(Float64)  
            Arguments: avg#46#Partial#avg  
          count#41#Partial#count  
            Function: countPartialMerge(AggregateFunction(count, Int64)) → UInt64  
            Arguments: count#41#Partial#count  
        ReadFromPreparedSource (Read From Java Iter)  
        Header: l_returnflag#8 Nullable(String)  
                l_linestatus#9 Nullable(String)  
                sum#42#Partial#sum AggregateFunction(sum, Nullable(Float64))  
                sum#43#Partial#sum AggregateFunction(sum, Nullable(Float64))  
                sum#47#Partial#sum AggregateFunction(sum, Nullable(Float64))  
                sum#48#Partial#sum AggregateFunction(sum, Nullable(Float64))  
                avg#44#Partial#avg AggregateFunction(avg, Nullable(Float64))  
                avg#45#Partial#avg AggregateFunction(avg, Nullable(Float64))  
                avg#46#Partial#avg AggregateFunction(avg, Nullable(Float64))  
                count#41#Partial#count AggregateFunction(count, Int64)  

对应的pipeline

alt text

第三阶段

Expression (Rename Output)  
Header: l_returnflag#8 Nullable(String)  
        l_linestatus#9 Nullable(String)  
        sum_qty#33 Nullable(Float64)  
        sum_base_price#34 Nullable(Float64)  
        sum_disc_price#35 Nullable(Float64)  
        sum_charge#36 Nullable(Float64)  
        avg_qty#37 Nullable(Float64)  
        avg_price#38 Nullable(Float64)  
        avg_disc#39 Nullable(Float64)  
        count_order#40 Int64  
Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0  
         INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1  
         INPUT :: 2 -> sum_qty#33 Nullable(Float64) : 2  
         INPUT :: 3 -> sum_base_price#34 Nullable(Float64) : 3  
         INPUT :: 4 -> sum_disc_price#35 Nullable(Float64) : 4  
         INPUT :: 5 -> sum_charge#36 Nullable(Float64) : 5  
         INPUT :: 6 -> avg_qty#37 Nullable(Float64) : 6  
         INPUT :: 7 -> avg_price#38 Nullable(Float64) : 7  
         INPUT :: 8 -> avg_disc#39 Nullable(Float64) : 8  
         INPUT :: 9 -> count_order#40 Int64 : 9  
Positions: 0 1 2 3 4 5 6 7 8 9  
  Sorting (Sorting step)  
  Header: l_returnflag#8 Nullable(String)  
          l_linestatus#9 Nullable(String)  
          sum_qty#33 Nullable(Float64)  
          sum_base_price#34 Nullable(Float64)  
          sum_disc_price#35 Nullable(Float64)  
          sum_charge#36 Nullable(Float64)  
          avg_qty#37 Nullable(Float64)  
          avg_price#38 Nullable(Float64)  
          avg_disc#39 Nullable(Float64)  
          count_order#40 Int64  
  Sort description: l_returnflag#8 ASC, l_linestatus#9 ASC  
    ReadFromPreparedSource (Read From Java Iter)  
    Header: l_returnflag#8 Nullable(String)  
            l_linestatus#9 Nullable(String)  
            sum_qty#33 Nullable(Float64)  
            sum_base_price#34 Nullable(Float64)  
            sum_disc_price#35 Nullable(Float64)  
            sum_charge#36 Nullable(Float64)  
            avg_qty#37 Nullable(Float64)  
            avg_price#38 Nullable(Float64)  
            avg_disc#39 Nullable(Float64)  
            count_order#40 Int64  

对应的Pipeline

alt text

其他

如何在clickhouse-client中打印AST,Plan,Pipeline呢?
到测试环境找到一条query:

SELECT  
    (intDiv(toUInt32(rtime), 10) * 10) * 1000 AS t,  
    count()  
FROM default.test_ranger_0  
WHERE ((day >= toDate(1678356215)) AND (day <= toDate(1678356515))) AND ((rtime >= toDateTime(1678356215)) AND (rtime <= toDateTime(1678356515)))  
GROUP BY t  
ORDER BY t ASC  

打印AST: explain ast ,这一步会打印出query对应的语法树。但是输出结果不太便于阅读。我们可以添加选项explain ast graph = 1 ,该命令会输出dot代码,dot代码可通过在线工具转化为矢量图。

alt text

规整化query: explain syntax , 该命令会格式化query, 并且显示经过AST优化之后的query

EXPLAIN SYNTAX  
SELECT sumIf(1, (number % 2) > 2)  
FROM numbers(100)  
   
Query id: 3fb3b332-a988-43c5-9677-9fff8c44ff3d  
   
┌─explain──────────────────────────┐  
│ SELECT countIf((number % 2) > 2) │  
│ FROM numbers(100)                │  
└──────────────────────────────────┘  

打印Plan: explain plan header = 1, actions = 1

EXPLAIN header = 1, actions = 1  
SELECT  
    (intDiv(toUInt32(rtime), 10) * 10) * 1000 AS t,  
    count()  
FROM default.test_ranger_0  
WHERE ((day >= toDate(1678356215)) AND (day <= toDate(1678356515))) AND ((rtime >= toDateTime(1678356215)) AND (rtime <= toDateTime(1678356515)))  
GROUP BY t  
ORDER BY t ASC  
   
Query id: f395457f-a684-4fc3-b6c9-6e9eb7ece5fe  
   
┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐  
│ Expression (Projection)                                                                                                                                                                                                                                    │  
│ Header: t UInt64                                                                                                                                                                                                                                           │  
│         count() UInt64                                                                                                                                                                                                                                     │  
│ Actions: INPUT : 0 -> multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 : 0                                                                                                                                                                 │  
│          INPUT :: 1 -> count() UInt64 : 1                                                                                                                                                                                                                  │  
│          ALIAS multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) :: 0 -> t UInt64 : 2                                                                                                                                                              │  
│ Positions: 2 1                                                                                                                                                                                                                                             │  
│   Sorting (Sorting for ORDER BY)                                                                                                                                                                                                                           │  
│   Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64                                                                                                                                                                                 │  
│           count() UInt64                                                                                                                                                                                                                                   │  
│   Sort description: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) ASC                                                                                                                                                                          │  
│     Expression (Before ORDER BY)                                                                                                                                                                                                                           │  
│     Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64                                                                                                                                                                               │  
│             count() UInt64                                                                                                                                                                                                                                 │  
│     Actions: INPUT :: 0 -> multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 : 0                                                                                                                                                            │  
│              INPUT :: 1 -> count() UInt64 : 1                                                                                                                                                                                                              │  
│     Positions: 0 1                                                                                                                                                                                                                                         │  
│       Aggregating                                                                                                                                                                                                                                          │  
│       Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64                                                                                                                                                                             │  
│               count() UInt64                                                                                                                                                                                                                               │  
│       Keys: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000)                                                                                                                                                                                      │  
│       Aggregates:                                                                                                                                                                                                                                          │  
│           count()                                                                                                                                                                                                                                          │  
│             Function: count() → UInt64                                                                                                                                                                                                                     │  
│             Arguments: none                                                                                                                                                                                                                                │  
│       Skip merging: 0                                                                                                                                                                                                                                      │  
│         Expression (Before GROUP BY)                                                                                                                                                                                                                       │  
│         Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64                                                                                                                                                                           │  
│         Actions: INPUT : 0 -> rtime DateTime : 0                                                                                                                                                                                                           │  
│                  COLUMN Const(UInt8) -> 10 UInt8 : 1                                                                                                                                                                                                       │  
│                  COLUMN Const(UInt16) -> 1000 UInt16 : 2                                                                                                                                                                                                   │  
│                  FUNCTION toUInt32(rtime :: 0) -> toUInt32(rtime) UInt32 : 3                                                                                                                                                                               │  
│                  FUNCTION intDiv(toUInt32(rtime) :: 3, 10 : 1) -> intDiv(toUInt32(rtime), 10) UInt32 : 0                                                                                                                                                   │  
│                  FUNCTION multiply(intDiv(toUInt32(rtime), 10) :: 0, 10 :: 1) -> multiply(intDiv(toUInt32(rtime), 10), 10) UInt64 : 3                                                                                                                      │  
│                  FUNCTION multiply(multiply(intDiv(toUInt32(rtime), 10), 10) :: 3, 1000 :: 2) -> multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 : 1                                                                                      │  
│         Positions: 1                                                                                                                                                                                                                                       │  
│           Filter (WHERE)                                                                                                                                                                                                                                   │  
│           Header: rtime DateTime                                                                                                                                                                                                                           │  
│           Filter column: and(and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))), and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515)))) (removed)                             │  
│           Actions: INPUT : 0 -> rtime DateTime : 0                                                                                                                                                                                                         │  
│                    INPUT : 1 -> day Date : 1                                                                                                                                                                                                               │  
│                    COLUMN Const(UInt16) -> toDate(1678356215) Date : 2                                                                                                                                                                                     │  
│                    COLUMN Const(UInt16) -> toDate(1678356515) Date : 3                                                                                                                                                                                     │  
│                    COLUMN Const(UInt32) -> toDateTime(1678356215) DateTime : 4                                                                                                                                                                             │  
│                    COLUMN Const(UInt32) -> toDateTime(1678356515) DateTime : 5                                                                                                                                                                             │  
│                    FUNCTION greaterOrEquals(day : 1, toDate(1678356215) :: 2) -> greaterOrEquals(day, toDate(1678356215)) UInt8 : 6                                                                                                                        │  
│                    FUNCTION lessOrEquals(day :: 1, toDate(1678356515) :: 3) -> lessOrEquals(day, toDate(1678356515)) UInt8 : 2                                                                                                                             │  
│                    FUNCTION greaterOrEquals(rtime : 0, toDateTime(1678356215) :: 4) -> greaterOrEquals(rtime, toDateTime(1678356215)) UInt8 : 3                                                                                                            │  
│                    FUNCTION lessOrEquals(rtime : 0, toDateTime(1678356515) :: 5) -> lessOrEquals(rtime, toDateTime(1678356515)) UInt8 : 4                                                                                                                  │  
│                    FUNCTION and(greaterOrEquals(day, toDate(1678356215)) :: 6, lessOrEquals(day, toDate(1678356515)) :: 2) -> and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))) UInt8 : 5                               │  
│                    FUNCTION and(greaterOrEquals(rtime, toDateTime(1678356215)) :: 3, lessOrEquals(rtime, toDateTime(1678356515)) :: 4) -> and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515))) UInt8 : 2       │  
│                    FUNCTION and(and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))) :: 5, and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515))) :: 2) -> and(and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))), and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515)))) UInt8 : 4 │  
│           Positions: 0 4                                                                                                                                                                                                                                   │  
│             ReadFromPreparedSource (Read from NullSource)                                                                                                                                                                                                  │  
│             Header: rtime DateTime                                                                                                                                                                                                                         │  
│                     day Date     

打印pipeline: explain pipeline header = 1, graph = 1 ,同explain ast graph = 1类似,也会输出dot代码,可在线转化成图形。
alt text

计划解读

表达式

CH表达式

我们截取TPC-H Q1的CH计划片段

Expression (Project)  
    Header: l_returnflag Nullable(String)  
            l_linestatus Nullable(String)  
            l_quantity Nullable(Float64)  
            l_extendedprice Nullable(Float64)  
            multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64)  
            multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64)  
            l_discount Nullable(Float64)  
            1_4 Int32  
    Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0  
             INPUT : 1 -> l_extendedprice Nullable(Float64) : 1  
             INPUT : 2 -> l_discount Nullable(Float64) : 2  
             INPUT : 3 -> l_tax Nullable(Float64) : 3  
             INPUT :: 4 -> l_returnflag Nullable(String) : 4  
             INPUT :: 5 -> l_linestatus Nullable(String) : 5  
             COLUMN Const(Float64) -> 1_1 Float64 : 6  
             COLUMN Const(Float64) -> 1_2 Float64 : 7  
             COLUMN Const(Float64) -> 1_3 Float64 : 8  
             COLUMN Const(Int32) -> 1_4 Int32 : 9  
             FUNCTION minus(1_1 :: 6, l_discount : 2) -> minus(1_1,l_discount) Nullable(Float64) : 10  
             FUNCTION minus(1_2 :: 7, l_discount : 2) -> minus(1_2,l_discount) Nullable(Float64) : 6  
             FUNCTION plus(1_3 :: 8, l_tax :: 3) -> plus(1_3,l_tax) Nullable(Float64) : 7  
             FUNCTION multiply(l_extendedprice : 1, minus(1_1,l_discount) :: 10) -> multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64) : 3  
             FUNCTION multiply(l_extendedprice : 1, minus(1_2,l_discount) :: 6) -> multiply(l_extendedprice,minus(1_2,l_discount)) Nullable(Float64) : 10  
             FUNCTION multiply(multiply(l_extendedprice,minus(1_2,l_discount)) :: 10, plus(1_3,l_tax) :: 7) -> multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64) : 6  
    Positions: 4 5 0 1 3 6 2 9  

表达式计算一般出现在Project算子里

  • Header: Project算子的输出schema
  • Actions: Project算子的计算内容
  • Positions: Actions到Header的映射关系(指明哪些action的结果按什么顺序输出到Header中)

Actions分成五种类型:

  • Input: 由前置算子输入的column
  • Column: Literal值,const column
  • Alias: 为输出增加alias name
  • Function: 普通函数计算
  • ArrayJoin: 与普通函数不同,CH中针对array join特殊处理。对应着Spark中的lateral view explode

相关源码:

ActionsDAG: 用于表示某个算子中的一组表达式,这些表达式共同组成了一个DAG.

  • addInput: 增加输入column
  • addColumn: 增加literal column
  • addAlias: 增加alias映射
  • addArrayJoin: 增加array join计算
  • addFunction: 增加普通函数计算
  • addCast: 本质上还是增加普通函数cast, 只是方便调用
  • findInOutputs: 从actions的输出中找到指定列
  • addOrReplaceInOutputs: 从actions的输出中找到指定列并取而代之。如果不存在则直接添加。
  • project:规整化ActionsDAG, 批量添加必要的输出列和ALIAS, 删除无用的action(没有被任何输出所依赖的计算)

ActionsVisitor: 用于在ExpressionAnalyzer中,将输入的AST解析成不同阶段的ActionsDAG

ExpressionActionsChain: 表示多组ActionsDAG, 例如SELECT和WHERE中的expression对应两个独立的ActionsDAG。用于ExpressionAnalyzer中,将解析AST得到的ActionsDAG追加到其中。

ExpressionActions: 基于输入的Block执行ActionsDAG。用于Expression算子执行(ExpressionStep/ExpressionTransform)

ArrayJoinAction: 基于输入的Block执行Array Join Action,注意与ExpressionActions是并列关系。用于ArrayJoin算子执行(ArrayJoinStep/ArrayJoinTransform)

substrait表达式

message Expression {  
  oneof rex_type {  
    /// query中的常量,分为不同的类型,除了interval/fixedxxx/userdefined外都已支持  
    Literal literal = 1;  
    /// 对字段的引用,例如select a from table,select a.b from table  
    FieldReference selection = 2;  
    /// 普通函数  
    ScalarFunction scalar_function = 3;  
    /// 窗口函数  
    WindowFunction window_function = 5;  
    /// 对应case when或if clause  
    IfThen if_then = 6;  
    /// 目前没用到,其实是IfThen的子集  
    SwitchExpression switch_expression = 7;  
    /// 用于表示 a in (b, c, d)  
    SingularOrList singular_or_list = 8;  
    /// 没用到  
    MultiOrList multi_or_list = 9;  
    /// 将input的类型强转成指定类型,failure_behavior表示失败后如何处理  
    Cast cast = 11;  
    /// 没用到  
    Subquery subquery = 12;  
    /// 没用到  
    Nested nested = 13;  
   
   
    // deprecated: enum literals are only sensible in the context of  
    // function arguments, for which FunctionArgument should now be  
    // used  
    Enum enum = 10 [deprecated = true];  
  }  
}

substrait到CH表达式的转化

代码入口:expressionsToActionsDAG,将substrait::Expression转化为CH中的ActionsDAG

  • 如果Expression类型是FieldReference, 根据input schema获取column name, 到ActionsDAG的output column中搜索,加入到required_columns中
  • 如果Expression类型是ScalarFunction
    • 递归解析function arguments(也是substrait::Expression类型),并将其加入到ActionsDAG中
    • 根据映射关系(SCALAR_FUNCTIONS)将substrait function name转化为ch function name,生成FunctionOverloadResolverPtr
    • 执行ActionsDAG::addFunction将该函数加入其中。例外:alias函数用ActionsDAG::addAlias, array join函数用ActionsDAG::addArrayJoin
  • 如果Expression类型是Literal, 将Literal对应的const column通过ActionsDAG::addColumn加入其中
  • 如果Expression类型是Cast, 处理类似ScalarFunction,对应ch function: toInt/toUInt/toString/toDecimal/toDate32
  • 如果Expression类型是IfThen, 处理类似ScalarFunction, 对应ch function: multiIf
  • 其他:略

算子

substrait算子

// A relation with output field names.  
//  
// This is for use at the root of a `Rel` tree.  
message RelRoot {  
  // A relation  
  Rel input = 1;  
  // Field names in depth-first order  
  repeated string names = 2;  
}  
   
   
// A relation (used internally in a plan)  
message Rel {  
  oneof rel_type {  
    /// source算子,ReadFromPreparedSource  
    ReadRel read = 1;  
    /// filter算子,对应CH FilterStep/FilterTransform  
    FilterRel filter = 2;  
    /// Limit算子,对应CH LimitStep/LimitTransform  
    FetchRel fetch = 3;  
    /// aggregate算子,partial阶段对应AggregatingStep/AggregatingTransform, final阶段对应MergingAggregated/MergingAggregatedTransform  
    AggregateRel aggregate = 4;  
    /// sort算子,对应CH SortingStep/SortingTransform  
    SortRel sort = 5;  
    /// join算子,对应CH JoinStep/JoiningTransform  
    JoinRel join = 6;  
    /// project算子, 对应CH ExpressionStep/ExpressionTransform  
    ProjectRel project = 7;  
    SetRel set = 8;  
    ExtensionSingleRel extension_single = 9;  
    ExtensionMultiRel extension_multi = 10;  
    ExtensionLeafRel extension_leaf = 11;  
    /// cross join算子,暂时没用  
    CrossRel cross = 12;  
    /// grouping sets算子,对应CH ExpandStep/ExpandTransform  
    ExpandRel expand = 13;  
    /// window算子,对应CH WindowStep/WindowTransform  
    WindowRel window = 14;  
    /// generate算子,对应spark explode lateral view,对应CH ArrayJoinStep/ArrayJoinTransform  
    GenerateRel generate = 15;  
  }  
}  
  

CH算子

注意这里说的算子是物理算子

接口:IQueryPlanStep
代码:src/Processors/QueryPlan/*Step.h
CH中算子按照拓扑结构分成三类

  • Source算子:输入为空。读取data source的算子,例如读取MergeTree、Parquet、ORC、Java Iterator
  • Transform算子:输入和输出非空。如project, filter, join, aggregate等
  • Sink算子:输出为空。将数据写入到MergeTree、Parquet、ORC文件中。

CH中包含算子的QueryPlan会构建QueryPipeline,然后由PipelineExecutor执行
(示意图:QueryPlan → 构建QueryPipeline → 由PipelineExecutor执行)

Source算子的实现

ReadFromStorage (read local files)  
Header: l_quantity Nullable(Float64)  
        l_extendedprice Nullable(Float64)  
        l_discount Nullable(Float64)  
        l_tax Nullable(Float64)  
        l_returnflag Nullable(String)  
        l_linestatus Nullable(String)  
        l_shipdate Nullable(Date32)  

ReadFromStorageStep算子用于将现成的Pipe加入到QueryPlan中作为Source算子。这个设计的精妙之处在于:

  • 通过ReadBuffer的抽象,我们可通过ReadFromStorageStep读取CH已支持的各种压缩格式(zstd, lz4, snappy等)、各种文件系统(local, s3, hdfs)
  • 通过InputFormat的抽象,我们可通过ReadFromStorageStep读取任意CH已支持的Format(CSV, Parquet, Arrow, ORC等)

用户通过ReadBuffer和InputFormat的排列组合即可生成ReadFromStorageStep,实现各类功能

  • 首先根据压缩格式、文件系统构造ReadBuffer对象
  • 其次根据ReadBuffer对象构造InputFormat对象
  • InputFormat对象本质上是一个Processor, 通过它可构造Pipe
  • 通过Pipe构造ReadFromStorageStep,加入到QueryPlan中
  • 通过QueryPlan构造QueryPipeline, 通过PipelineExecutor执行之

扩展阅读:

  • ReadBufferFromHDFS: 从hdfs filesystem读取数据
  • ReadBufferFromFile:从local filesystem读取数据
  • ReadBufferFromS3: 从s3 filesystem读取数据
  • Lz4InflatingReadBuffer:在上游ReadBuffer上级联Lz4解压
  • ZstdInflatingReadBuffer:在上游ReadBuffer上级联zstd解压
  • SnappyReadBuffer: 在上游ReadBuffer上级联snappy解压
  • CSVRowInputFormat:从ReadBuffer读取csv数据并解析后输出Block
  • ParquetBlockInputFormat: 从ReadBuffer读取parquet数据并解析后输出Block
  • ORCBlockInputFormat: 从ReadBuffer中读取orc数据并解析后输出Block

Project算子的实现

流程如下:

  • 首先构造Pipeline: 调用ExpressionStep::transformPipeline
    • 由ActionsDAG构造ExpressionActions,继而构造ExpressionTransform,追加到pipeline中。
    • 如果发现pipeline的输出header与ExpressionStep的输出header有差异,生成一个新的ActionsDAG实现类型强转,追加到pipeline中
  • 然后执行Pipeline: 调用ExpressionTransform::transform → ExpressionActions::execute
    • 计算required_columns到input block中字段的映射关系
    • 遍历actions, 调用executeAction执行。生成新的Block并替换回输入Block
      • 如果action是FUNCTION类型:若is_lazy_executed = true(一般用于and/or/multiIf的短路执行优化),result column为ColumnFunction,此时不会执行function;若is_function_compiled = true(用于JIT等优化场景),则执行编译好的CompiledFunction;否则执行普通函数
      • 如果action是ARRAY_JOIN类型,根据array join key column的offset对其他column执行replicate操作,生成result column
      • 如果action是COLUMN类型,修改const column的num_rows
      • 如果action是ALIAS类型,将source column复制到result column,并修改result column的name
      • 如果action是INPUT类型,将inputs中的column move到result position

Aggregate算子的实现

第一阶段: AggregatingStep,完成初步聚合

  • 首先构造pipeline: 调用AggregatingStep::transformPipeline
    • 如果pipeline的输出并行度>1, 构造parallel的AggregatingTransform,加入到pipeline中
    • 如果pipeline的输出并行度 = 1, 构造串行的AggregatingTransform加入pipeline
  • 然后执行pipeline: 调用AggregatingTransform::work
    • 首先从上游算子pull一个chunk
    • 不断调用Aggregator::executeOnBlock对输入的block进行聚合,创建AggregateFunction columns存储聚合的中间状态并不断更新。Aggregator::executeOnBlock的流程:
      • 首先根据aggregate method初始化AggregatedDataVariants中的聚合函数指针
      • 如果是without_key method, 初始化AggregatedDataVariants中的without_key聚合结构,并执行executeWithoutKeyImpl
        • 遍历每一个aggregate function, 执行对应的addBatchSinglePlace方法
      • 如果是其他method, 执行executeImplBatch
        • 遍历每一行,判断该行的key是否已存在:若不存在则新建method对应的aggregate data,否则复用已有data。places[row_idx]对应着该行的aggregate data;然后遍历每一个aggregate function,批量执行add,将数据聚合到对应的aggregate data中。
    • 当上游无输入或达到max_rows_to_group_by上限时,AggregatingTransform::consume结束, is_consume_finished = true
    • 判断当前AggregatingTransform是否为同一批并行聚合算子中最后一个完成consume的。如果不是则当前算子结束。否则:
      • 对所有并行算子产出的ManyAggregatedDataVariants结构进行预处理:去掉空值,按照size大小逆序排列,预分配内存池,都是为了性能,见Aggregator::prepareVariantsToMerge
      • 在pipeline中后置ConvertingAggregatedToChunksTransform算子,将聚合中间状态进行合并,流程如下:
        • 首先初始化:分配arena,针对without key聚合有特殊处理
        • 开始合并:Aggregator::mergeSingleLevelDataImpl, 对每一项aggregate data, 调用mergeDataImpl将它们合并到一处,再调用convertToBlockImpl将汇总好的aggregate data转化成Block。如果final = true, 则以最终聚合结果输出,否则以中间结果的形式输出

第二阶段:MergingAggregatedStep,完成最终聚合

  • 首先构造MergingAggregatedTransform加入到pipeline中,并行度为1
  • 然后执行pipeline: 调用MergingAggregatedTransform::work
  • consume阶段:不断调用MergingAggregatedTransform::consume消费输入的Block。该阶段会将输入的column追加到内存中。
  • generate阶段:调用Aggregator::mergeBlocks将之前输入的blocks进行合并
    • 如果是aggregate method = without_key,用mergeBlockWithoutKeyStreamsImpl
    • 否则用mergeStreamsImpl → mergeStreamsImplCase
      • 遍历每一行,判断key是否重复,指定该行的aggregate data
      • 遍历每个aggregate function, 执行mergeBatch,将输入block中的aggregate data合并到目标aggregate data中。

注意:CH Aggregator中针对不同key的类型进行了深度优化,例如无key、不同长度的整型key、string类型的key、LowCardinality类型的key、nullable类型的key,用于聚合的算法与数据结构都是不一样的,体现了CH极致的性能优化策略。

从代码中可以看到,IAggregateFunction有不同的add接口用于处理输入:

  • add: 一次处理一行数据,性能不如addBatch
  • addBatchSinglePlace: 一次处理一批数据,支持if combinator(可选),如果当前行的if条件不成立,则该行不参与聚合
  • addBatchSinglePlaceNotNull: 一次处理一批数据,支持if/null combinator,前者可选。如果当前行为null值,则该行不参与聚合
  • addManyDefaults: 一次处理一批default value
  • addBatchSparse: 一次处理一批ColumnSparse类型的输入,针对ColumnSparse结构做了优化
  • merge: 一次merge ColumnAggregateFunction中的一行数据
  • mergeBatch: 一次merge ColumnAggregateFunction中的一批数据

Sort算子的实现
代码入口:SortingStep
流程(以full sort为例):

  • 构造pipeline:
    • 如果有limit clause, 则将PartialSortingTransform加入pipeline
    • 追加LimitsCheckingTransform(为了检查是否超出local rows/bytes限制)
    • 追加MergeSortingTransform
    • 如果pipeline并发度>1, 追加MergingSortedTransform
  • 执行pipeline:
    • PartialSortingTransform: 不断地对input Block进行topn排序,然后输出。这一步是粗排。这里有个性能优化:维护一组topn相关的阈值,高于该阈值的行全部过滤掉,低于该阈值的行才执行下一步的sortBlock。随着迭代不断调整该阈值。
    • MergeSortingTransform: 将上游PartialSortingTransform输出的多个局部有序chunk进行归并排序,输出全局排序数据
      • consume阶段:小优化(去掉输入Block中的const column),加入到chunks队列。随着chunks中数据积累到一定阈值,执行remerge(MergeSorter)。
      • generate阶段:首先创建MergeSorter, 从中读取chunk, 然后将之前去掉的const column重新加入到Chunk中。
    • MergingSortedTransform: 将上游多路MergeSortingTransform的有序输入进行归并排序。具体算法参考MergingSortedAlgorithm

substrait到CH算子的转化

substrait read算子的转化

// The scan operator of base data (physical or virtual), including filtering and projection.  
message ReadRel {  
  RelCommon common = 1;  
  /// 读取数据源的schema, 包括column name和type  
  NamedStruct base_schema = 2;  
  Expression filter = 3;  
  Expression best_effort_filter = 11;  
  Expression.MaskExpression projection = 4;  
  substrait.extensions.AdvancedExtension advanced_extension = 10;  
   
  // Definition of which type of scan operation is to be performed  
  oneof read_type {  
    VirtualTable virtual_table = 5;  
    /// gluten实际运行中,local_files分为两种来源,一种来自iterator, 一种来自本地或远程的Parquet/ORC文件  
    LocalFiles local_files = 6;  
    NamedTable named_table = 7;  
    ExtensionTable extension_table = 8;  
  }  
   
   
  // Represents a list of files in input of a scan operation  
  message LocalFiles {  
    /// 一个或多个本地或远程文件。  
    repeated FileOrFiles items = 1;  
    substrait.extensions.AdvancedExtension advanced_extension = 10;  
   
   
    // Many files consist of indivisible chunks (e.g. parquet row groups  
    // or CSV rows).  If a slice partially selects an indivisible chunk  
    // then the consumer should employ some rule to decide which slice to  
    // include the chunk in (e.g. include it in the slice that contains  
    // the midpoint of the chunk)  
    message FileOrFiles {  
      oneof path_type {  
        // A URI that can refer to either a single folder or a single file  
        string uri_path = 1;  
        // A URI where the path portion is a glob expression that can  
        // identify zero or more paths.  
        // Consumers should support the POSIX syntax.  The recursive  
        // globstar (**) may not be supported.  
        string uri_path_glob = 2;  
        // A URI that refers to a single file  
        string uri_file = 3;  
        // A URI that refers to a single folder  
        string uri_folder = 4;  
      }  

代码入口:SerializedPlanParser.h parseOP函数

  • 如果local file来自于java iterator, 则先用SourceFromJavaIter组装Pipe, 再用Pipe生成ReadFromPreparedSource算子,再把算子加入到QueryPlan中。
  • 如果local file来自于本地或远程的Parquet/ORC文件,先用SubstraitFileSource组装Pipe, 再生成ReadFromStorageStep算子,加入到QueryPlan中

SourceFromJavaIter: CH算子,通过jni从gluten中的io/glutenproject/execution/ColumnarNativeIterator中批量读取Block。实现见SourceFromJavaIter::generate

SubstraitFileSource: CH算子, 支持从local/s3/hdfs等不同数据源读取Parquet/ORC等格式的数据。相关类:

  • NormalFileReader: 从单个文件中读取数据,返回Chunk
  • FormatFile: 对不同格式的文件的抽象, 目前已经实现了Text, ORC, Parquet三种格式

substrait project算子的转化

// This operator allows to represent calculated expressions of fields (e.g., a+b). Direct/Emit are used to represent classical relational projections  
message ProjectRel {  
  RelCommon common = 1;  
  /// 输入算子  
  Rel input = 2;  
  /// project算子中包含的多组表达式  
  repeated Expression expressions = 3;  
  substrait.extensions.AdvancedExtension advanced_extension = 10;  
}  

转化流程相对简单:

  • 执行expressionsToActionsDAG,将多组substrait::Expression转化为ActionsDAG
  • 用ActionsDAG构造ExpressionStep,加入到QueryPlan中

substrait aggregate算子的转化

// The relational operator representing a GROUP BY Aggregate  
message AggregateRel {  
  RelCommon common = 1;  
   
  // Input of the aggregation  
  Rel input = 2;  
   
  /// group by expressions,对应维度,可能有多个,可能是表达式  
  repeated Grouping groupings = 3;  
   
  /// aggregate expressions, 对应指标,可能有多个  
  repeated Measure measures = 4;  
   
  substrait.extensions.AdvancedExtension advanced_extension = 10;  
   
  message Grouping {  
    repeated Expression grouping_expressions = 1;  
  }  
   
  message Measure {  
    AggregateFunction measure = 1;  
   
    // An optional boolean expression that acts to filter which records are  
    // included in the measure. True means include this record for calculation  
    // within the measure.  
    // Helps to support SUM(<c>) FILTER(WHERE...) syntax without masking opportunities for optimization  
    Expression filter = 2;  
  }  
}  

转化流程:

  • 执行SerializedPlanParser::parseAggregate,将substrait aggregate算子转化为QueryPlanStep
    • 判断是否需要在CH计划中前置一个ExpressionStep, 满足以下两个条件之一即成立。如果是,则在CH聚合算子之前加入ExpressionStep
      • aggregate function的参数为Literal
      • aggregate function的参数为非nullable, 但aggregate function的output type为nullable, 且当前聚合算子处于Partial阶段。
    • 将多组measures转化为AggregateDescriptions
    • 判断当前aggregate算子是否为final阶段
      • 如果是,则在QueryPlan加入MergingAggregatedStep算子
      • 否则加入AggregatingStep算子

substrait sort算子的转化

// The ORDERY BY (or sorting) relational operator. Beside describing a base relation, it includes a list of fields to sort on  
message SortRel {  
  RelCommon common = 1;  
  Rel input = 2;  
  /// 多组sort expressions: order by expr1, expr2, expr3  
  repeated SortField sorts = 3;  
  substrait.extensions.AdvancedExtension advanced_extension = 10;  
}  
   
// The description of a field to sort on (including the direction of sorting and null semantics)  
message SortField {  
  Expression expr = 1;  
     
  oneof sort_kind {  
    /// sort排序方向,这里分的比较细  
    SortDirection direction = 2;  
    uint32 comparison_function_reference = 3;  
  }  
     
  enum SortDirection {  
    SORT_DIRECTION_UNSPECIFIED = 0;  
    SORT_DIRECTION_ASC_NULLS_FIRST = 1;  
    SORT_DIRECTION_ASC_NULLS_LAST = 2;  
    SORT_DIRECTION_DESC_NULLS_FIRST = 3;  
    SORT_DIRECTION_DESC_NULLS_LAST = 4;  
    SORT_DIRECTION_CLUSTERED = 5;  
  }  
}  

入口:SortRelParser::parse
转化流程:

  • 首先判断order by后是否有limit clause, 没有则limit = 0
  • 将substrait SortField转化为CH中的SortDescription
  • 构造SortingStep,加入到QueryPlan中。