Substrait Operator Conversion in Gluten and the Implementation of CH Operators
How to print the plans
spark plan
First, deploy a Spark Thrift Server locally.
Then create the table:
CREATE TEMPORARY VIEW lineitem
USING org.apache.spark.sql.parquet
OPTIONS (
path "/data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem"
);
Next, take TPC-H Q1:
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
count(*) AS count_order
FROM
lineitem
WHERE
l_shipdate <= date'1998-09-02' - interval 1 day
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus;
Run explain formatted followed by the query to see the corresponding optimized Spark physical plan:
+----------------------------------------------------+
| plan |
+----------------------------------------------------+
| == Physical Plan ==
CHNativeColumnarToRow (12)
+- * SortExecTransformer (10)
   +- ColumnarExchangeAdaptor (9)
      +- * CHHashAggregateExecTransformer (7)
         +- ColumnarExchangeAdaptor (6)
            +- * CHHashAggregateExecTransformer (4)
               +- * ProjectExecTransformer (3)
                  +- * FilterExecTransformer (2)
                     +- * Scan parquet (1)
(1) Scan parquet
Output [7]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369, l_shipdate#370]
Batched: true
Location: InMemoryFileIndex [file:/data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem]
PushedFilters: [IsNotNull(l_shipdate), LessThanOrEqual(l_shipdate,1998-09-01)]
ReadSchema: struct<l_quantity:double,l_extendedprice:double,l_discount:double,l_tax:double,l_returnflag:string,l_linestatus:string,l_shipdate:date>
(2) FilterExecTransformer
Input [7]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369, l_shipdate#370]
Arguments: (isnotnull(l_shipdate#370) AND (l_shipdate#370 <= 1998-09-01))
(3) ProjectExecTransformer
Input [7]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369, l_shipdate#370]
Arguments: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369]
(4) CHHashAggregateExecTransformer
Input [6]: [l_quantity#364, l_extendedprice#365, l_discount#366, l_tax#367, l_returnflag#368, l_linestatus#369]
Keys [2]: [l_returnflag#368, l_linestatus#369]
Functions [8]: [partial_sum(l_quantity#364), partial_sum(l_extendedprice#365), partial_sum((l_extendedprice#365 * (1.0 - l_discount#366))), partial_sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367))), partial_avg(l_quantity#364), partial_avg(l_extendedprice#365), partial_avg(l_discount#366), partial_count(1)]
Aggregate Attributes [11]: [sum#414, sum#415, sum#416, sum#417, sum#418, count#419L, sum#420, count#421L, sum#422, count#423L, count#424L]
Results [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]
(5) WholeStageCodegenTransformer (13)
Input [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]
(6) ColumnarExchangeAdaptor
Input [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]
Arguments: hashpartitioning(l_returnflag#368, l_linestatus#369, 5), ENSURE_REQUIREMENTS, false, [plan_id=891], [id=#891]
(7) CHHashAggregateExecTransformer
Input [13]: [l_returnflag#368, l_linestatus#369, sum#425, sum#426, sum#427, sum#428, sum#429, count#430L, sum#431, count#432L, sum#433, count#434L, count#435L]
Keys [2]: [l_returnflag#368, l_linestatus#369]
Functions [8]: [sum(l_quantity#364), sum(l_extendedprice#365), sum((l_extendedprice#365 * (1.0 - l_discount#366))), sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367))), avg(l_quantity#364), avg(l_extendedprice#365), avg(l_discount#366), count(1)]
Aggregate Attributes [8]: [sum(l_quantity#364)#407, sum(l_extendedprice#365)#408, sum((l_extendedprice#365 * (1.0 - l_discount#366)))#412, sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367)))#413, avg(l_quantity#364)#409, avg(l_extendedprice#365)#410, avg(l_discount#366)#411, count(1)#406L]
Results [10]: [l_returnflag#368, l_linestatus#369, sum(l_quantity#364)#407 AS sum_qty#393, sum(l_extendedprice#365)#408 AS sum_base_price#394, sum((l_extendedprice#365 * (1.0 - l_discount#366)))#412 AS sum_disc_price#395, sum(((l_extendedprice#365 * (1.0 - l_discount#366)) * (1.0 + l_tax#367)))#413 AS sum_charge#396, avg(l_quantity#364)#409 AS avg_qty#397, avg(l_extendedprice#365)#410 AS avg_price#398, avg(l_discount#366)#411 AS avg_disc#399, count(1)#406L AS count_order#400L]
(8) WholeStageCodegenTransformer (14)
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]
(9) ColumnarExchangeAdaptor
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]
Arguments: rangepartitioning(l_returnflag#368 ASC NULLS FIRST, l_linestatus#369 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, false, [plan_id=895], [id=#895]
(10) SortExecTransformer
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]
Arguments: [l_returnflag#368 ASC NULLS FIRST, l_linestatus#369 ASC NULLS FIRST], true, 0
(11) WholeStageCodegenTransformer (15)
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]
(12) CHNativeColumnarToRow
Input [10]: [l_returnflag#368, l_linestatus#369, sum_qty#393, sum_base_price#394, sum_disc_price#395, sum_charge#396, avg_qty#397, avg_price#398, avg_disc#399, count_order#400L]
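The plan splits the aggregation into a partial phase (4) and a final phase (7). The split is visible in the Aggregate Attributes of (4): each avg contributes a sum and a count, so the 8 functions produce 11 intermediate attributes. hashpartitioning(l_returnflag#368, l_linestatus#369, 5) then routes every partial state of one group to the same partition, where (7) merges them. The following is a minimal C++17 sketch of that two-phase avg. AvgState, partialAvg, targetPartition and finalAvg are hypothetical names, and std::hash is only a stand-in for whatever hash function the real shuffle uses.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Group key (l_returnflag, l_linestatus).
using GroupKey = std::pair<std::string, std::string>;

struct Row {
    std::string returnflag;
    std::string linestatus;
    double quantity;
};

// Intermediate state of avg: a (sum, count) pair, matching the sum#/count#L
// attributes produced by the partial aggregate.
struct AvgState {
    double sum = 0.0;
    long long count = 0;
};

// Partial phase: runs independently on each input partition.
std::map<GroupKey, AvgState> partialAvg(const std::vector<Row>& rows) {
    std::map<GroupKey, AvgState> states;
    for (const Row& r : rows) {
        AvgState& s = states[{r.returnflag, r.linestatus}];
        s.sum += r.quantity;
        s.count += 1;
    }
    return states;
}

// Shuffle: choose the target partition from the group key only, so every
// partial state of one group reaches the same final aggregate task.
// std::hash is a stand-in for the hash used by the real shuffle.
std::size_t targetPartition(const GroupKey& key, std::size_t numPartitions) {
    const std::size_t h1 = std::hash<std::string>{}(key.first);
    const std::size_t h2 = std::hash<std::string>{}(key.second);
    return (h1 * 31 + h2) % numPartitions;
}

// Final phase: merge the partial states of each group, then finalize sum / count.
std::map<GroupKey, double> finalAvg(const std::vector<std::map<GroupKey, AvgState>>& partials) {
    std::map<GroupKey, AvgState> merged;
    for (const auto& part : partials) {
        for (const auto& [key, s] : part) {
            AvgState& m = merged[key];
            m.sum += s.sum;
            m.count += s.count;
        }
    }
    std::map<GroupKey, double> result;
    for (const auto& [key, s] : merged) {
        assert(s.count > 0);  // a group exists only if at least one row was seen
        result[key] = s.sum / static_cast<double>(s.count);
    }
    return result;
}
```

Keeping (sum, count) instead of a running average is what makes the merge correct: two partial averages of 10 and 20 cannot be combined without knowing how many rows each one covers.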
substrait plan
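By default, the Substrait plan in the Spark log is printed with pb.DebugString(), which is hard to read.
When debugging, to print the Substrait plan more clearly, first change the default log level in utils/local-engine/Common/common.cpp from "error" to "trace":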
auto level = config->getString("logger.level", "trace");
After recompiling, run TPC-H Q1. The Spark log then contains three different Substrait plans, one for each of the three stages:
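Why three plans: in the physical plan, the two ColumnarExchangeAdaptor nodes (6) and (9) are shuffle boundaries, and the operators between them are grouped under WholeStageCodegenTransformer (13), (14) and (15). A downstream stage does not read Parquet files; the Stage 2 plan, for example, reads its input from "iterator:0". The sketch below is a simplification, not Gluten's actual planning code: it splits the bottom-up operator list at each exchange, and splitIntoStages is a hypothetical name.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Split a bottom-up list of operators into stages at each shuffle boundary.
// Hypothetical illustration: in Gluten the grouping comes from the
// WholeStageCodegenTransformer nodes, each of which yields one Substrait plan.
std::vector<std::vector<std::string>> splitIntoStages(const std::vector<std::string>& ops) {
    assert(!ops.empty());
    std::vector<std::vector<std::string>> stages(1);
    for (const std::string& op : ops) {
        if (op == "ColumnarExchangeAdaptor") {
            stages.emplace_back();  // the next stage reads the shuffle output
        } else {
            stages.back().push_back(op);
        }
    }
    return stages;
}
```

Feeding it nodes (1) through (10) yields [Scan, Filter, Project, partial aggregate], [final aggregate] and [sort], which lines up with the three plans below.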
Stage 1: the Partial phase of the aggregation
{"extensions":[{"extensionFunction":{"functionAnchor":7,"name":"avg:req_fp64"}},{"extensionFunction":{"name":"is_not_null:opt_bool_date"}},{"extensionFunction":{"functionAnchor":1,"name":"lte:opt_date_date"}},{"extensionFunction":{"functionAnchor":2,"name":"and:opt_bool_bool"}},{"extensionFunction":{"functionAnchor":3,"name":"subtract:opt_fp64_fp64"}},{"extensionFunction":{"functionAnchor":6,"name":"sum:req_fp64"}},{"extensionFunction":{"functionAnchor":5,"name":"add:opt_fp64_fp64"}},{"extensionFunction":{"functionAnchor":8,"name":"count:req_i32"}},{"extensionFunction":{"functionAnchor":4,"name":"multiply:opt_fp64_fp64"}}],"relations":[{"root":{"input":{"aggregate":{"common":{"direct":{}},"input":{"project":{"common":{"direct":{}},"input":{"project":{"common":{"direct":{}},"input":{"filter":{"common":{"direct":{}},"input":{"read":{"common":{"direct":{}},"baseSchema":{"names":["l_quantity","l_extendedprice","l_discount","l_tax","l_returnflag","l_linestatus","l_shipdate"],"struct":{"types":[{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"date":{"nullability":"NULLABILITY_NULLABLE"}}]},"partitionColumns":{"columnType":["NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL","NORMAL_COL"]}},"filter":{"scalarFunction":{"functionReference":2,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}}},{"value":{"scalarFunction":{"functionReference":1,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}},{"value":{"literal":{"date":
10470}}}]}}}]}},"localFiles":{"items":[{"uriFile":"file:///data1/liyang/cppproject/gluten/gluten-core/src/test/resources/tpch-data/lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet","length":"17777735","parquet":{}}]}}},"condition":{"scalarFunction":{"functionReference":2,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"scalarFunction":{"outputType":{"bool":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}}},{"value":{"scalarFunction":{"functionReference":1,"outputType":{"bool":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}},{"value":{"literal":{"date":10470}}}]}}}]}}}},"expressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"selection":{"directReference":{"structField":{"field":2}}}},{"selection":{"directReference":{"structField":{"field":3}}}},{"selection":{"directReference":{"structField":{"field":4}}}},{"selection":{"directReference":{"structField":{"field":5}}}}]}},"expressions":[{"selection":{"directReference":{"structField":{"field":4}}}},{"selection":{"directReference":{"structField":{"field":5}}}},{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"scalarFunction":{"functionReference":4,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":1}}}}},{"value":{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"literal":{"fp64":1}}},{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}}}]}},{"scalarFunction":{"functionReference":4,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"scalarFunction":{"func
tionReference":4,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":1}}}}},{"value":{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"literal":{"fp64":1}}},{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}}}]}}},{"value":{"scalarFunction":{"functionReference":5,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"literal":{"fp64":1}}},{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}}}]}},{"selection":{"directReference":{"structField":{"field":2}}}},{"literal":{"i32":1}}]}},"groupings":[{"groupingExpressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}}]}],"measures":[{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"measure":{"functionReference":6,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"measure":{"functionReference":7,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{
"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"functionReference":7,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"functionReference":7,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"measure":{"functionReference":8,"phase":"AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE","outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}}]}},"names":["l_returnflag#8","l_linestatus#9","sum#42","sum#43","sum#47","sum#48","avg#44","avg#45","avg#46","count#41"]}}]}
Stage 2: the Final phase of the aggregation
{"extensions":[{"extensionFunction":{"functionAnchor":1,"name":"avg:req_fp64"}},{"extensionFunction":{"functionAnchor":4,"name":"alias:req_i64"}},{"extensionFunction":{"name":"sum:req_fp64"}},{"extensionFunction":{"functionAnchor":2,"name":"count:req_i32"}},{"extensionFunction":{"functionAnchor":3,"name":"alias:req_fp64"}}],"relations":[{"root":{"input":{"project":{"common":{"direct":{}},"input":{"aggregate":{"common":{"direct":{}},"input":{"read":{"common":{"direct":{}},"baseSchema":{"names":["l_returnflag#8","l_linestatus#9","sum#42#Partial#sum","sum#43#Partial#sum","sum#47#Partial#sum","sum#48#Partial#sum","avg#44#Partial#avg","avg#45#Partial#avg","avg#46#Partial#avg","count#41#Partial#count"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_REQUIRED"}}]}},"localFiles":{"items":[{"uriFile":"iterator:0"}]}}},"groupings":[{"groupingExpressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}}]}],"measures":[{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":
{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"measure":{"functionReference":2,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"expressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLAB
ILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"scalarFunction":{"functionReference":4,"outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"names":["l_returnflag#8","l_linestatus#9","sum_qty#33","sum_base_price#34","sum_disc_price#35","sum_charge#36","avg_qty#37","avg_price#38","avg_disc#39","count_order#40"]}}]}
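Two protobuf JSON conventions make these plans easier to read. Fields holding their default value are omitted, so an extensionFunction printed without "functionAnchor" has anchor 0, a measure or scalarFunction printed without "functionReference" refers to anchor 0, and "structField":{} selects field 0. In Stage 2, the first four measures therefore resolve to "sum:req_fp64". The sketch below resolves references against the Stage 2 extension table; stage2Extensions and resolveFunction are hypothetical helpers, not Gluten APIs.

```cpp
#include <cassert>
#include <map>
#include <string>

// Extension table of the Stage 2 plan (functionAnchor -> function signature).
// "sum:req_fp64" is printed without "functionAnchor", so its anchor is 0.
std::map<unsigned, std::string> stage2Extensions() {
    return {
        {0, "sum:req_fp64"},
        {1, "avg:req_fp64"},
        {2, "count:req_i32"},
        {3, "alias:req_fp64"},
        {4, "alias:req_i64"},
    };
}

// Resolve a functionReference to the function's base name,
// e.g. "sum:req_fp64" -> "sum". A reference printed without
// "functionReference" is 0. map::at throws std::out_of_range
// for an anchor that was never declared.
std::string resolveFunction(const std::map<unsigned, std::string>& extensions,
                            unsigned functionReference) {
    const std::string& signature = extensions.at(functionReference);
    assert(!signature.empty());
    return signature.substr(0, signature.find(':'));
}
```

The same reading applies to Stage 1, where "is_not_null:opt_bool_date" has no anchor and is referenced by the scalarFunction that carries no functionReference.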
Stage 3: sort the aggregation results
{"extensions":[{"extensionFunction":{"functionAnchor":1,"name":"avg:req_fp64"}},{"extensionFunction":{"functionAnchor":4,"name":"alias:req_i64"}},{"extensionFunction":{"name":"sum:req_fp64"}},{"extensionFunction":{"functionAnchor":2,"name":"count:req_i32"}},{"extensionFunction":{"functionAnchor":3,"name":"alias:req_fp64"}}],"relations":[{"root":{"input":{"project":{"common":{"direct":{}},"input":{"aggregate":{"common":{"direct":{}},"input":{"read":{"common":{"direct":{}},"baseSchema":{"names":["l_returnflag#8","l_linestatus#9","sum#42#Partial#sum","sum#43#Partial#sum","sum#47#Partial#sum","sum#48#Partial#sum","avg#44#Partial#avg","avg#45#Partial#avg","avg#46#Partial#avg","count#41#Partial#count"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_REQUIRED"}}]}},"localFiles":{"items":[{"uriFile":"iterator:0"}]}}},"groupings":[{"groupingExpressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}}]}],"measures":[{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":
{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"measure":{"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"measure":{"functionReference":1,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"measure":{"functionReference":2,"phase":"AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT","outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"expressions":[{"selection":{"directReference":{"structField":{}}}},{"selection":{"directReference":{"structField":{"field":1}}}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":2}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":3}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":4}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLAB
ILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":5}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":6}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":7}}}}}]}},{"scalarFunction":{"functionReference":3,"outputType":{"fp64":{"nullability":"NULLABILITY_NULLABLE"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":8}}}}}]}},{"scalarFunction":{"functionReference":4,"outputType":{"i64":{"nullability":"NULLABILITY_REQUIRED"}},"arguments":[{"value":{"selection":{"directReference":{"structField":{"field":9}}}}}]}}]}},"names":["l_returnflag#8","l_linestatus#9","sum_qty#33","sum_base_price#34","sum_disc_price#35","sum_charge#36","avg_qty#37","avg_price#38","avg_disc#39","count_order#40"]}}]}
clickhouse plan
Likewise, with the log level set to "trace", the Spark log also shows the ClickHouse plan for each stage.
Stage 1
Expression (Rename Output)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum#42 AggregateFunction(sum, Nullable(Float64))
sum#43 AggregateFunction(sum, Nullable(Float64))
sum#47 AggregateFunction(sum, Nullable(Float64))
sum#48 AggregateFunction(sum, Nullable(Float64))
avg#44 AggregateFunction(avg, Nullable(Float64))
avg#45 AggregateFunction(avg, Nullable(Float64))
avg#46 AggregateFunction(avg, Nullable(Float64))
count#41 AggregateFunction(count, Int32)
Actions: INPUT : 0 -> l_returnflag Nullable(String) : 0
INPUT : 1 -> l_linestatus Nullable(String) : 1
INPUT : 2 -> sum(l_quantity) AggregateFunction(sum, Nullable(Float64)) : 2
INPUT : 3 -> sum(l_extendedprice) AggregateFunction(sum, Nullable(Float64)) : 3
INPUT : 4 -> sum(multiply(l_extendedprice,minus(1_1,l_discount))) AggregateFunction(sum, Nullable(Float64)) : 4
INPUT : 5 -> sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) AggregateFunction(sum, Nullable(Float64)) : 5
INPUT : 6 -> avg(l_quantity) AggregateFunction(avg, Nullable(Float64)) : 6
INPUT : 7 -> avg(l_extendedprice) AggregateFunction(avg, Nullable(Float64)) : 7
INPUT : 8 -> avg(l_discount) AggregateFunction(avg, Nullable(Float64)) : 8
INPUT : 9 -> count(1_4) AggregateFunction(count, Int32) : 9
ALIAS l_returnflag :: 0 -> l_returnflag#8 Nullable(String) : 10
ALIAS l_linestatus :: 1 -> l_linestatus#9 Nullable(String) : 0
ALIAS sum(l_quantity) :: 2 -> sum#42 AggregateFunction(sum, Nullable(Float64)) : 1
ALIAS sum(l_extendedprice) :: 3 -> sum#43 AggregateFunction(sum, Nullable(Float64)) : 2
ALIAS sum(multiply(l_extendedprice,minus(1_1,l_discount))) :: 4 -> sum#47 AggregateFunction(sum, Nullable(Float64)) : 3
ALIAS sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) :: 5 -> sum#48 AggregateFunction(sum, Nullable(Float64)) : 4
ALIAS avg(l_quantity) :: 6 -> avg#44 AggregateFunction(avg, Nullable(Float64)) : 5
ALIAS avg(l_extendedprice) :: 7 -> avg#45 AggregateFunction(avg, Nullable(Float64)) : 6
ALIAS avg(l_discount) :: 8 -> avg#46 AggregateFunction(avg, Nullable(Float64)) : 7
ALIAS count(1_4) :: 9 -> count#41 AggregateFunction(count, Int32) : 8
Positions: 10 0 1 2 3 4 5 6 7 8
Aggregating
Header: l_returnflag Nullable(String)
l_linestatus Nullable(String)
sum(l_quantity) AggregateFunction(sum, Nullable(Float64))
sum(l_extendedprice) AggregateFunction(sum, Nullable(Float64))
sum(multiply(l_extendedprice,minus(1_1,l_discount))) AggregateFunction(sum, Nullable(Float64))
sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) AggregateFunction(sum, Nullable(Float64))
avg(l_quantity) AggregateFunction(avg, Nullable(Float64))
avg(l_extendedprice) AggregateFunction(avg, Nullable(Float64))
avg(l_discount) AggregateFunction(avg, Nullable(Float64))
count(1_4) AggregateFunction(count, Int32)
Keys: l_returnflag, l_linestatus
Aggregating
Header: l_returnflag Nullable(String)
l_linestatus Nullable(String)
sum(l_quantity) AggregateFunction(sum, Nullable(Float64))
sum(l_extendedprice) AggregateFunction(sum, Nullable(Float64))
sum(multiply(l_extendedprice,minus(1_1,l_discount))) AggregateFunction(sum, Nullable(Float64))
sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))) AggregateFunction(sum, Nullable(Float64))
avg(l_quantity) AggregateFunction(avg, Nullable(Float64))
avg(l_extendedprice) AggregateFunction(avg, Nullable(Float64))
avg(l_discount) AggregateFunction(avg, Nullable(Float64))
count(1_4) AggregateFunction(count, Int32)
Keys: l_returnflag, l_linestatus
Aggregates:
sum(l_quantity)
Function: sum(Nullable(Float64)) → Nullable(Float64)
Arguments: l_quantity
sum(l_extendedprice)
Function: sum(Nullable(Float64)) → Nullable(Float64)
Arguments: l_extendedprice
sum(multiply(l_extendedprice,minus(1_1,l_discount)))
Function: sum(Nullable(Float64)) → Nullable(Float64)
Arguments: multiply(l_extendedprice,minus(1_1,l_discount))
sum(multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)))
Function: sum(Nullable(Float64)) → Nullable(Float64)
Arguments: multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax))
avg(l_quantity)
Function: avg(Nullable(Float64)) → Nullable(Float64)
Arguments: l_quantity
avg(l_extendedprice)
Function: avg(Nullable(Float64)) → Nullable(Float64)
Arguments: l_extendedprice
avg(l_discount)
Function: avg(Nullable(Float64)) → Nullable(Float64)
Arguments: l_discount
count(1_4)
Function: count(Int32) → UInt64
Arguments: 1_4
Expression (Project)
Header: l_returnflag Nullable(String)
l_linestatus Nullable(String)
l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64)
multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64)
l_discount Nullable(Float64)
1_4 Int32
Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0
INPUT : 1 -> l_extendedprice Nullable(Float64) : 1
INPUT : 2 -> l_discount Nullable(Float64) : 2
INPUT : 3 -> l_tax Nullable(Float64) : 3
INPUT :: 4 -> l_returnflag Nullable(String) : 4
INPUT :: 5 -> l_linestatus Nullable(String) : 5
COLUMN Const(Float64) -> 1_1 Float64 : 6
COLUMN Const(Float64) -> 1_2 Float64 : 7
COLUMN Const(Float64) -> 1_3 Float64 : 8
COLUMN Const(Int32) -> 1_4 Int32 : 9
FUNCTION minus(1_1 :: 6, l_discount : 2) -> minus(1_1,l_discount) Nullable(Float64) : 10
FUNCTION minus(1_2 :: 7, l_discount : 2) -> minus(1_2,l_discount) Nullable(Float64) : 6
FUNCTION plus(1_3 :: 8, l_tax :: 3) -> plus(1_3,l_tax) Nullable(Float64) : 7
FUNCTION multiply(l_extendedprice : 1, minus(1_1,l_discount) :: 10) -> multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64) : 3
FUNCTION multiply(l_extendedprice : 1, minus(1_2,l_discount) :: 6) -> multiply(l_extendedprice,minus(1_2,l_discount)) Nullable(Float64) : 10
FUNCTION multiply(multiply(l_extendedprice,minus(1_2,l_discount)) :: 10, plus(1_3,l_tax) :: 7) -> multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64) : 6
Positions: 4 5 0 1 3 6 2 9
Expression (Project)
Header: l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
l_discount Nullable(Float64)
l_tax Nullable(Float64)
l_returnflag Nullable(String)
l_linestatus Nullable(String)
Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0
INPUT :: 1 -> l_extendedprice Nullable(Float64) : 1
INPUT :: 2 -> l_discount Nullable(Float64) : 2
INPUT :: 3 -> l_tax Nullable(Float64) : 3
INPUT :: 4 -> l_returnflag Nullable(String) : 4
INPUT :: 5 -> l_linestatus Nullable(String) : 5
Positions: 0 1 2 3 4 5
Expression (Remove nullable properties)
Header: l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
l_discount Nullable(Float64)
l_tax Nullable(Float64)
l_returnflag Nullable(String)
l_linestatus Nullable(String)
l_shipdate Date32
Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0
INPUT :: 1 -> l_extendedprice Nullable(Float64) : 1
INPUT :: 2 -> l_discount Nullable(Float64) : 2
INPUT :: 3 -> l_tax Nullable(Float64) : 3
INPUT :: 4 -> l_returnflag Nullable(String) : 4
INPUT :: 5 -> l_linestatus Nullable(String) : 5
INPUT : 6 -> l_shipdate Nullable(Date32) : 6
FUNCTION assumeNotNull(l_shipdate :: 6) -> l_shipdate Date32 : 7
Positions: 0 1 2 3 4 5 7
Filter
Header: l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
l_discount Nullable(Float64)
l_tax Nullable(Float64)
l_returnflag Nullable(String)
l_linestatus Nullable(String)
l_shipdate Nullable(Date32)
Filter column: and(isNotNull(l_shipdate),lessOrEquals(l_shipdate,10470_0)) (removed)
Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0
INPUT :: 1 -> l_extendedprice Nullable(Float64) : 1
INPUT :: 2 -> l_discount Nullable(Float64) : 2
INPUT :: 3 -> l_tax Nullable(Float64) : 3
INPUT :: 4 -> l_returnflag Nullable(String) : 4
INPUT :: 5 -> l_linestatus Nullable(String) : 5
INPUT : 6 -> l_shipdate Nullable(Date32) : 6
COLUMN Const(Int32) -> 10470_0 Date32 : 7
FUNCTION isNotNull(l_shipdate : 6) -> isNotNull(l_shipdate) UInt8 : 8
FUNCTION lessOrEquals(l_shipdate : 6, 10470_0 :: 7) -> lessOrEquals(l_shipdate,10470_0) Nullable(UInt8) : 9
FUNCTION and(isNotNull(l_shipdate) :: 8, lessOrEquals(l_shipdate,10470_0) :: 9) -> and(isNotNull(l_shipdate),lessOrEquals(l_shipdate,10470_0)) Nullable(UInt8) : 7
Positions: 0 1 2 3 4 5 6 7
ReadFromStorage (read local files)
Header: l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
l_discount Nullable(Float64)
l_tax Nullable(Float64)
l_returnflag Nullable(String)
l_linestatus Nullable(String)
l_shipdate Nullable(Date32)
(Figure: the corresponding pipeline)
Stage 2
Expression (Rename Output)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum_qty#33 Nullable(Float64)
sum_base_price#34 Nullable(Float64)
sum_disc_price#35 Nullable(Float64)
sum_charge#36 Nullable(Float64)
avg_qty#37 Nullable(Float64)
avg_price#38 Nullable(Float64)
avg_disc#39 Nullable(Float64)
count_order#40 Int64
Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0
INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1
INPUT : 2 -> sum#42#Partial#sum Nullable(Float64) : 2
INPUT : 3 -> sum#43#Partial#sum Nullable(Float64) : 3
INPUT : 4 -> sum#47#Partial#sum Nullable(Float64) : 4
INPUT : 5 -> sum#48#Partial#sum Nullable(Float64) : 5
INPUT : 6 -> avg#44#Partial#avg Nullable(Float64) : 6
INPUT : 7 -> avg#45#Partial#avg Nullable(Float64) : 7
INPUT : 8 -> avg#46#Partial#avg Nullable(Float64) : 8
INPUT : 9 -> count#41#Partial#count Int64 : 9
ALIAS sum#42#Partial#sum :: 2 -> sum_qty#33 Nullable(Float64) : 10
ALIAS sum#43#Partial#sum :: 3 -> sum_base_price#34 Nullable(Float64) : 2
ALIAS sum#47#Partial#sum :: 4 -> sum_disc_price#35 Nullable(Float64) : 3
ALIAS sum#48#Partial#sum :: 5 -> sum_charge#36 Nullable(Float64) : 4
ALIAS avg#44#Partial#avg :: 6 -> avg_qty#37 Nullable(Float64) : 5
ALIAS avg#45#Partial#avg :: 7 -> avg_price#38 Nullable(Float64) : 6
ALIAS avg#46#Partial#avg :: 8 -> avg_disc#39 Nullable(Float64) : 7
ALIAS count#41#Partial#count :: 9 -> count_order#40 Int64 : 8
Positions: 0 1 10 2 3 4 5 6 7 8
Expression (Project)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum#42#Partial#sum Nullable(Float64)
sum#43#Partial#sum Nullable(Float64)
sum#47#Partial#sum Nullable(Float64)
sum#48#Partial#sum Nullable(Float64)
avg#44#Partial#avg Nullable(Float64)
avg#45#Partial#avg Nullable(Float64)
avg#46#Partial#avg Nullable(Float64)
count#41#Partial#count Int64
Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0
INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1
INPUT :: 2 -> sum#42#Partial#sum Nullable(Float64) : 2
INPUT :: 3 -> sum#43#Partial#sum Nullable(Float64) : 3
INPUT :: 4 -> sum#47#Partial#sum Nullable(Float64) : 4
INPUT :: 5 -> sum#48#Partial#sum Nullable(Float64) : 5
INPUT :: 6 -> avg#44#Partial#avg Nullable(Float64) : 6
INPUT :: 7 -> avg#45#Partial#avg Nullable(Float64) : 7
INPUT :: 8 -> avg#46#Partial#avg Nullable(Float64) : 8
INPUT :: 9 -> count#41#Partial#count Int64 : 9
Positions: 0 1 2 3 4 5 6 7 8 9
Expression (Convert Aggregate Output)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum#42#Partial#sum Nullable(Float64)
sum#43#Partial#sum Nullable(Float64)
sum#47#Partial#sum Nullable(Float64)
sum#48#Partial#sum Nullable(Float64)
avg#44#Partial#avg Nullable(Float64)
avg#45#Partial#avg Nullable(Float64)
avg#46#Partial#avg Nullable(Float64)
count#41#Partial#count Int64
Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0
INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1
INPUT :: 2 -> sum#42#Partial#sum Nullable(Float64) : 2
INPUT :: 3 -> sum#43#Partial#sum Nullable(Float64) : 3
INPUT :: 4 -> sum#47#Partial#sum Nullable(Float64) : 4
INPUT :: 5 -> sum#48#Partial#sum Nullable(Float64) : 5
INPUT :: 6 -> avg#44#Partial#avg Nullable(Float64) : 6
INPUT :: 7 -> avg#45#Partial#avg Nullable(Float64) : 7
INPUT :: 8 -> avg#46#Partial#avg Nullable(Float64) : 8
INPUT : 9 -> count#41#Partial#count UInt64 : 9
COLUMN Const(String) -> Int64 String : 10
FUNCTION _CAST(count#41#Partial#count :: 9, Int64 :: 10) -> _CAST(count#41#Partial#count, Int64) Int64 : 11
ALIAS _CAST(count#41#Partial#count, Int64) :: 11 -> count#41#Partial#count Int64 : 10
Positions: 0 1 2 3 4 5 6 7 8 10
MergingAggregated
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum#42#Partial#sum Nullable(Float64)
sum#43#Partial#sum Nullable(Float64)
sum#47#Partial#sum Nullable(Float64)
sum#48#Partial#sum Nullable(Float64)
avg#44#Partial#avg Nullable(Float64)
avg#45#Partial#avg Nullable(Float64)
avg#46#Partial#avg Nullable(Float64)
count#41#Partial#count UInt64
Keys: l_returnflag#8, l_linestatus#9
Aggregates:
sum#42#Partial#sum
Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)
Arguments: sum#42#Partial#sum
sum#43#Partial#sum
Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)
Arguments: sum#43#Partial#sum
sum#47#Partial#sum
Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)
Arguments: sum#47#Partial#sum
sum#48#Partial#sum
Function: sumPartialMerge(AggregateFunction(sum, Nullable(Float64))) → Nullable(Float64)
Arguments: sum#48#Partial#sum
avg#44#Partial#avg
Function: avgPartialMerge(AggregateFunction(avg, Nullable(Float64))) → Nullable(Float64)
Arguments: avg#44#Partial#avg
avg#45#Partial#avg
Function: avgPartialMerge(AggregateFunction(avg, Nullable(Float64))) → Nullable(Float64)
Arguments: avg#45#Partial#avg
avg#46#Partial#avg
Function: avgPartialMerge(AggregateFunction(avg, Nullable(Float64))) → Nullable(Float64)
Arguments: avg#46#Partial#avg
count#41#Partial#count
Function: countPartialMerge(AggregateFunction(count, Int64)) → UInt64
Arguments: count#41#Partial#count
ReadFromPreparedSource (Read From Java Iter)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum#42#Partial#sum AggregateFunction(sum, Nullable(Float64))
sum#43#Partial#sum AggregateFunction(sum, Nullable(Float64))
sum#47#Partial#sum AggregateFunction(sum, Nullable(Float64))
sum#48#Partial#sum AggregateFunction(sum, Nullable(Float64))
avg#44#Partial#avg AggregateFunction(avg, Nullable(Float64))
avg#45#Partial#avg AggregateFunction(avg, Nullable(Float64))
avg#46#Partial#avg AggregateFunction(avg, Nullable(Float64))
count#41#Partial#count AggregateFunction(count, Int64)
(Figure: the corresponding pipeline)
Stage 3
Expression (Rename Output)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum_qty#33 Nullable(Float64)
sum_base_price#34 Nullable(Float64)
sum_disc_price#35 Nullable(Float64)
sum_charge#36 Nullable(Float64)
avg_qty#37 Nullable(Float64)
avg_price#38 Nullable(Float64)
avg_disc#39 Nullable(Float64)
count_order#40 Int64
Actions: INPUT :: 0 -> l_returnflag#8 Nullable(String) : 0
INPUT :: 1 -> l_linestatus#9 Nullable(String) : 1
INPUT :: 2 -> sum_qty#33 Nullable(Float64) : 2
INPUT :: 3 -> sum_base_price#34 Nullable(Float64) : 3
INPUT :: 4 -> sum_disc_price#35 Nullable(Float64) : 4
INPUT :: 5 -> sum_charge#36 Nullable(Float64) : 5
INPUT :: 6 -> avg_qty#37 Nullable(Float64) : 6
INPUT :: 7 -> avg_price#38 Nullable(Float64) : 7
INPUT :: 8 -> avg_disc#39 Nullable(Float64) : 8
INPUT :: 9 -> count_order#40 Int64 : 9
Positions: 0 1 2 3 4 5 6 7 8 9
Sorting (Sorting step)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum_qty#33 Nullable(Float64)
sum_base_price#34 Nullable(Float64)
sum_disc_price#35 Nullable(Float64)
sum_charge#36 Nullable(Float64)
avg_qty#37 Nullable(Float64)
avg_price#38 Nullable(Float64)
avg_disc#39 Nullable(Float64)
count_order#40 Int64
Sort description: l_returnflag#8 ASC, l_linestatus#9 ASC
ReadFromPreparedSource (Read From Java Iter)
Header: l_returnflag#8 Nullable(String)
l_linestatus#9 Nullable(String)
sum_qty#33 Nullable(Float64)
sum_base_price#34 Nullable(Float64)
sum_disc_price#35 Nullable(Float64)
sum_charge#36 Nullable(Float64)
avg_qty#37 Nullable(Float64)
avg_price#38 Nullable(Float64)
avg_disc#39 Nullable(Float64)
count_order#40 Int64
(Figure: the corresponding pipeline)
Other
How do you print the AST, plan, and pipeline in clickhouse-client?
Take a query from the test environment:
SELECT
(intDiv(toUInt32(rtime), 10) * 10) * 1000 AS t,
count()
FROM default.test_ranger_0
WHERE ((day >= toDate(1678356215)) AND (day <= toDate(1678356515))) AND ((rtime >= toDateTime(1678356215)) AND (rtime <= toDateTime(1678356515)))
GROUP BY t
ORDER BY t ASC
Print the AST: explain ast
Normalize the query: explain syntax
EXPLAIN SYNTAX
SELECT sumIf(1, (number % 2) > 2)
FROM numbers(100)
Query id: 3fb3b332-a988-43c5-9677-9fff8c44ff3d
┌─explain──────────────────────────┐
│ SELECT countIf((number % 2) > 2) │
│ FROM numbers(100) │
└──────────────────────────────────┘
Print the plan: explain plan header = 1, actions = 1
EXPLAIN header = 1, actions = 1
SELECT
(intDiv(toUInt32(rtime), 10) * 10) * 1000 AS t,
count()
FROM default.test_ranger_0
WHERE ((day >= toDate(1678356215)) AND (day <= toDate(1678356515))) AND ((rtime >= toDateTime(1678356215)) AND (rtime <= toDateTime(1678356515)))
GROUP BY t
ORDER BY t ASC
Query id: f395457f-a684-4fc3-b6c9-6e9eb7ece5fe
┌─explain────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Expression (Projection) │
│ Header: t UInt64 │
│ count() UInt64 │
│ Actions: INPUT : 0 -> multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 : 0 │
│ INPUT :: 1 -> count() UInt64 : 1 │
│ ALIAS multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) :: 0 -> t UInt64 : 2 │
│ Positions: 2 1 │
│ Sorting (Sorting for ORDER BY) │
│ Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 │
│ count() UInt64 │
│ Sort description: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) ASC │
│ Expression (Before ORDER BY) │
│ Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 │
│ count() UInt64 │
│ Actions: INPUT :: 0 -> multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 : 0 │
│ INPUT :: 1 -> count() UInt64 : 1 │
│ Positions: 0 1 │
│ Aggregating │
│ Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 │
│ count() UInt64 │
│ Keys: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) │
│ Aggregates: │
│ count() │
│ Function: count() → UInt64 │
│ Arguments: none │
│ Skip merging: 0 │
│ Expression (Before GROUP BY) │
│ Header: multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 │
│ Actions: INPUT : 0 -> rtime DateTime : 0 │
│ COLUMN Const(UInt8) -> 10 UInt8 : 1 │
│ COLUMN Const(UInt16) -> 1000 UInt16 : 2 │
│ FUNCTION toUInt32(rtime :: 0) -> toUInt32(rtime) UInt32 : 3 │
│ FUNCTION intDiv(toUInt32(rtime) :: 3, 10 : 1) -> intDiv(toUInt32(rtime), 10) UInt32 : 0 │
│ FUNCTION multiply(intDiv(toUInt32(rtime), 10) :: 0, 10 :: 1) -> multiply(intDiv(toUInt32(rtime), 10), 10) UInt64 : 3 │
│ FUNCTION multiply(multiply(intDiv(toUInt32(rtime), 10), 10) :: 3, 1000 :: 2) -> multiply(multiply(intDiv(toUInt32(rtime), 10), 10), 1000) UInt64 : 1 │
│ Positions: 1 │
│ Filter (WHERE) │
│ Header: rtime DateTime │
│ Filter column: and(and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))), and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515)))) (removed) │
│ Actions: INPUT : 0 -> rtime DateTime : 0 │
│ INPUT : 1 -> day Date : 1 │
│ COLUMN Const(UInt16) -> toDate(1678356215) Date : 2 │
│ COLUMN Const(UInt16) -> toDate(1678356515) Date : 3 │
│ COLUMN Const(UInt32) -> toDateTime(1678356215) DateTime : 4 │
│ COLUMN Const(UInt32) -> toDateTime(1678356515) DateTime : 5 │
│ FUNCTION greaterOrEquals(day : 1, toDate(1678356215) :: 2) -> greaterOrEquals(day, toDate(1678356215)) UInt8 : 6 │
│ FUNCTION lessOrEquals(day :: 1, toDate(1678356515) :: 3) -> lessOrEquals(day, toDate(1678356515)) UInt8 : 2 │
│ FUNCTION greaterOrEquals(rtime : 0, toDateTime(1678356215) :: 4) -> greaterOrEquals(rtime, toDateTime(1678356215)) UInt8 : 3 │
│ FUNCTION lessOrEquals(rtime : 0, toDateTime(1678356515) :: 5) -> lessOrEquals(rtime, toDateTime(1678356515)) UInt8 : 4 │
│ FUNCTION and(greaterOrEquals(day, toDate(1678356215)) :: 6, lessOrEquals(day, toDate(1678356515)) :: 2) -> and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))) UInt8 : 5 │
│ FUNCTION and(greaterOrEquals(rtime, toDateTime(1678356215)) :: 3, lessOrEquals(rtime, toDateTime(1678356515)) :: 4) -> and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515))) UInt8 : 2 │
│ FUNCTION and(and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))) :: 5, and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515))) :: 2) -> and(and(greaterOrEquals(day, toDate(1678356215)), lessOrEquals(day, toDate(1678356515))), and(greaterOrEquals(rtime, toDateTime(1678356215)), lessOrEquals(rtime, toDateTime(1678356515)))) UInt8 : 4 │
│ Positions: 0 4 │
│ ReadFromPreparedSource (Read from NullSource) │
│ Header: rtime DateTime │
│ day Date
Print the pipeline: explain pipeline header = 1, graph = 1
Interpreting the plan
Expressions
CH expressions
Here is a fragment of the CH plan for TPC-H Q1:
Expression (Project)
Header: l_returnflag Nullable(String)
l_linestatus Nullable(String)
l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64)
multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64)
l_discount Nullable(Float64)
1_4 Int32
Actions: INPUT :: 0 -> l_quantity Nullable(Float64) : 0
INPUT : 1 -> l_extendedprice Nullable(Float64) : 1
INPUT : 2 -> l_discount Nullable(Float64) : 2
INPUT : 3 -> l_tax Nullable(Float64) : 3
INPUT :: 4 -> l_returnflag Nullable(String) : 4
INPUT :: 5 -> l_linestatus Nullable(String) : 5
COLUMN Const(Float64) -> 1_1 Float64 : 6
COLUMN Const(Float64) -> 1_2 Float64 : 7
COLUMN Const(Float64) -> 1_3 Float64 : 8
COLUMN Const(Int32) -> 1_4 Int32 : 9
FUNCTION minus(1_1 :: 6, l_discount : 2) -> minus(1_1,l_discount) Nullable(Float64) : 10
FUNCTION minus(1_2 :: 7, l_discount : 2) -> minus(1_2,l_discount) Nullable(Float64) : 6
FUNCTION plus(1_3 :: 8, l_tax :: 3) -> plus(1_3,l_tax) Nullable(Float64) : 7
FUNCTION multiply(l_extendedprice : 1, minus(1_1,l_discount) :: 10) -> multiply(l_extendedprice,minus(1_1,l_discount)) Nullable(Float64) : 3
FUNCTION multiply(l_extendedprice : 1, minus(1_2,l_discount) :: 6) -> multiply(l_extendedprice,minus(1_2,l_discount)) Nullable(Float64) : 10
FUNCTION multiply(multiply(l_extendedprice,minus(1_2,l_discount)) :: 10, plus(1_3,l_tax) :: 7) -> multiply(multiply(l_extendedprice,minus(1_2,l_discount)),plus(1_3,l_tax)) Nullable(Float64) : 6
Positions: 4 5 0 1 3 6 2 9
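Expression evaluation usually happens in the Project operator. Its plan output has three parts:
- Header: the output schema of the Project operator
- Actions: the computations the Project operator performs
- Positions: the mapping from the Actions' results to the Header columns
Actions come in five types:
- Input: a column provided by the upstream operator
- Column: a literal value, i.e. a const column
- Alias: adds an alias name to an output
- Function: an ordinary function call
- ArrayJoin: unlike ordinary functions, array join gets special handling in CH; it corresponds to Spark's lateral view explode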
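The following toy C++ model makes Header/Actions/Positions concrete. It evaluates `l_extendedprice * (1 - l_discount)` in the same way the plan fragment above reads. The types `Action`, `NamedColumn` and the function `execute` are hypothetical and are not ClickHouse's API. INPUT and COLUMN actions create columns, FUNCTION actions combine earlier results, ALIAS renames, and Positions selects which results form the Header. One simplification: every action gets its own slot. CH reuses slots, which is why the slot numbers in the real plan repeat.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model of an "Actions" list (hypothetical; NOT ActionsDAG/ExpressionActions).
// Each action writes its result into its own slot: slot index == action index.
using ColumnData = std::vector<double>;

enum class ActionType { Input, Column, Function, Alias };

struct Action {
    ActionType type;
    std::string result_name;
    std::vector<size_t> args;                  // slots of earlier actions used as arguments
    size_t input_pos = 0;                      // Input: position in the input block
    double constant = 0.0;                     // Column: the literal value
    std::function<double(double, double)> fn;  // Function: a binary scalar function
};

struct NamedColumn {
    std::string name;
    ColumnData data;
};

// Runs the actions in order, then returns the slots listed in `positions`,
// i.e. the columns that make up the output Header.
std::vector<NamedColumn> execute(const std::vector<Action>& actions,
                                 const std::vector<ColumnData>& input,
                                 const std::vector<size_t>& positions) {
    const size_t rows = input.empty() ? 0 : input.front().size();
    std::vector<NamedColumn> slots;
    for (const Action& a : actions) {
        NamedColumn out{a.result_name, {}};
        switch (a.type) {
            case ActionType::Input:  // column handed over by the upstream operator
                out.data = input.at(a.input_pos);
                break;
            case ActionType::Column:  // the toy materializes the const column to `rows` rows
                out.data.assign(rows, a.constant);
                break;
            case ActionType::Function: {  // evaluate row by row over the argument slots
                const ColumnData& lhs = slots.at(a.args.at(0)).data;
                const ColumnData& rhs = slots.at(a.args.at(1)).data;
                for (size_t i = 0; i < rows; ++i) out.data.push_back(a.fn(lhs[i], rhs[i]));
                break;
            }
            case ActionType::Alias:  // same data under a new name
                out.data = slots.at(a.args.at(0)).data;
                break;
        }
        slots.push_back(std::move(out));
    }
    std::vector<NamedColumn> header;
    for (size_t p : positions) header.push_back(slots.at(p));
    return header;
}
```

With inputs `{100, 200}` and `{0.25, 0.5}` and Positions `{5}`, the ALIAS slot `disc_price` comes out as `{75, 100}`.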
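Relevant source code:
ActionsDAG: represents the set of expressions inside one operator; together these expressions form a DAG.
- addInput: adds an input column
- addColumn: adds a literal column
- addAlias: adds an alias mapping
- addArrayJoin: adds an array join computation
- addFunction: adds an ordinary function call
- addCast: under the hood still adds an ordinary function (cast); it exists only for convenience
- findInOutputs: finds the specified column among the DAG's outputs
- addOrReplaceInOutputs: finds the specified column among the outputs and replaces it; if it is not there, adds it
- project: normalizes the ActionsDAG: adds the required output columns and ALIASes in bulk, and removes useless actions (computations that no output depends on)
ActionsVisitor: used in ExpressionAnalyzer to parse the input AST into the ActionsDAGs of the different query stages
ExpressionActionsChain: holds multiple ActionsDAGs; for example, the expressions in SELECT and in WHERE correspond to two independent ActionsDAGs. ExpressionAnalyzer appends the ActionsDAGs it builds from the AST to this chain.
ExpressionActions: executes an ActionsDAG on an input Block. Used to execute the Expression operator (ExpressionStep/ExpressionTransform).
ArrayJoinAction: executes an Array Join action on an input Block; note that it is a peer of ExpressionActions, not part of it. Used to execute the ArrayJoin operator (ArrayJoinStep/ArrayJoinTransform).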
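The dead-action removal done by `project` boils down to a reachability walk from the requested outputs. Here is a minimal sketch using a hypothetical `Node` type, which is not the real `ActionsDAG::Node`. The sketch assumes every node no output depends on can be dropped, including inputs. The real method has further rules that this toy ignores.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy DAG node (hypothetical): a result name plus the indices of the nodes it reads.
struct Node {
    std::string name;
    std::vector<size_t> children;
};

// Walks backwards from the requested outputs and keeps only the nodes reachable
// from them; every computation no output depends on is dropped.
// Returns the names of the kept nodes in their original DAG order.
std::vector<std::string> prune(const std::vector<Node>& nodes,
                               const std::vector<size_t>& outputs) {
    std::vector<bool> used(nodes.size(), false);
    std::vector<size_t> stack(outputs.begin(), outputs.end());
    while (!stack.empty()) {
        const size_t id = stack.back();
        stack.pop_back();
        if (used.at(id)) continue;  // already reached through another path
        used[id] = true;
        for (size_t child : nodes[id].children) stack.push_back(child);
    }
    std::vector<std::string> kept;
    for (size_t i = 0; i < nodes.size(); ++i)
        if (used[i]) kept.push_back(nodes[i].name);
    return kept;
}
```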
Substrait expressions
message Expression {
oneof rex_type {
/// Constants in the query. Literals come in several types; all are supported except interval, fixed-length (fixed_*), and user-defined types
Literal literal = 1;
/// A reference to a field, e.g. select a from table, select a.b from table
FieldReference selection = 2;
/// An ordinary (scalar) function
ScalarFunction scalar_function = 3;
/// A window function
WindowFunction window_function = 5;
/// Corresponds to a CASE WHEN or IF clause
IfThen if_then = 6;
/// Not used yet; effectively a subset of IfThen
SwitchExpression switch_expression = 7;
/// Represents a IN (b, c, d)
SingularOrList singular_or_list = 8;
/// Not used
MultiOrList multi_or_list = 9;
/// Casts the input to the specified type; failure_behavior specifies what happens if the cast fails
Cast cast = 11;
/// Not used
Subquery subquery = 12;
/// Not used
Nested nested = 13;
// deprecated: enum literals are only sensible in the context of
// function arguments, for which FunctionArgument should now be
// used
Enum enum = 10 [deprecated = true];
}
// ... nested message definitions omitted
}
Converting Substrait expressions to CH expressions
Entry point: expressionsToActionsDAG, which converts substrait::Expression into a CH ActionsDAG
- If the Expression is a FieldReference: look up the column name in the input schema, search for it among the ActionsDAG's output columns, and add it to required_columns
- If the Expression is a ScalarFunction:
  - Recursively parse the function arguments (also of type substrait::Expression) and add them to the ActionsDAG
  - Map the substrait function name to a CH function name via the SCALAR_FUNCTIONS table and create a FunctionOverloadResolverPtr
  - Call ActionsDAG::addFunction to add the function. Exceptions: the alias function uses ActionsDAG::addAlias, and the array join function uses ActionsDAG::addArrayJoin
- If the Expression is a Literal: add the corresponding const column via ActionsDAG::addColumn
- If the Expression is a Cast: handled like a ScalarFunction; the corresponding CH functions are toInt/toUInt/toString/toDecimal/toDate32
- If the Expression is an IfThen: handled like a ScalarFunction; the corresponding CH function is multiIf
- Other types: omitted
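The recursion in `expressionsToActionsDAG` can be sketched as follows. `Expr`, `Dag`, `toActions` and the entries in `kScalarFunctions` are hypothetical stand-ins for substrait::Expression, ActionsDAG and the real SCALAR_FUNCTIONS table. The function-name pairs are assumed for illustration only. IfThen handling and required_columns bookkeeping are omitted. Arguments are converted first, so every node is appended after the nodes it reads, as in an ActionsDAG.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for substrait::Expression.
struct Expr {
    enum Kind { FieldReference, Literal, ScalarFunction, Cast } kind;
    size_t field = 0;        // FieldReference: index into the input schema
    std::string literal;     // Literal: textual value
    std::string function;    // ScalarFunction: substrait name; Cast: target CH type
    std::vector<Expr> args;  // ScalarFunction arguments, or the single Cast input
};

// Toy DAG: node names rendered CH-style, e.g. "minus(1,l_discount)".
struct Dag {
    std::vector<std::string> inputs;  // input columns (from the input schema)
    std::vector<std::string> nodes;   // nodes appended by the conversion, in order
};

// Assumed subset of a substrait -> CH function-name table (illustrative entries).
const std::map<std::string, std::string> kScalarFunctions = {
    {"add", "plus"}, {"subtract", "minus"}, {"multiply", "multiply"}, {"lte", "lessOrEquals"}};

// Converts `e` recursively and returns the name of its result column.
std::string toActions(const Expr& e, Dag& dag) {
    switch (e.kind) {
        case Expr::FieldReference:
            return dag.inputs.at(e.field);  // reuse the existing input column
        case Expr::Literal:
            dag.nodes.push_back(e.literal);  // const column
            return e.literal;
        case Expr::ScalarFunction: {
            auto it = kScalarFunctions.find(e.function);
            if (it == kScalarFunctions.end())
                throw std::runtime_error("unsupported function: " + e.function);
            std::string name = it->second + "(";
            for (size_t i = 0; i < e.args.size(); ++i) {
                if (i > 0) name += ",";
                name += toActions(e.args[i], dag);  // arguments are added first
            }
            name += ")";
            dag.nodes.push_back(name);
            return name;
        }
        case Expr::Cast: {  // handled like a function: toInt32, toString, ...
            std::string name = "to" + e.function + "(" + toActions(e.args.at(0), dag) + ")";
            dag.nodes.push_back(name);
            return name;
        }
    }
    throw std::logic_error("unknown expression kind");
}
```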
Operators
Substrait operators
// A relation with output field names.
//
// This is for use at the root of a `Rel` tree.
message RelRoot {
// A relation
Rel input = 1;
// Field names in depth-first order
repeated string names = 2;
}
// A relation (used internally in a plan)
message Rel {
oneof rel_type {
/// Source operator; corresponds to CH ReadFromPreparedSource / ReadFromStorageStep
ReadRel read = 1;
/// Filter operator; corresponds to CH FilterStep/FilterTransform
FilterRel filter = 2;
/// Limit operator; corresponds to CH LimitStep/LimitTransform
FetchRel fetch = 3;
/// Aggregate operator; the partial phase corresponds to AggregatingStep/AggregatingTransform, the final phase to MergingAggregated/MergingAggregatedTransform
AggregateRel aggregate = 4;
/// Sort operator; corresponds to CH SortingStep/SortingTransform
SortRel sort = 5;
/// Join operator; corresponds to CH JoinStep/JoiningTransform
JoinRel join = 6;
/// Project operator; corresponds to CH ExpressionStep/ExpressionTransform
ProjectRel project = 7;
SetRel set = 8;
ExtensionSingleRel extension_single = 9;
ExtensionMultiRel extension_multi = 10;
ExtensionLeafRel extension_leaf = 11;
/// Cross join operator; not used for now
CrossRel cross = 12;
/// Grouping sets operator; corresponds to CH ExpandStep/ExpandTransform
ExpandRel expand = 13;
/// Window operator; corresponds to CH WindowStep/WindowTransform
WindowRel window = 14;
/// Generate operator; corresponds to Spark's lateral view explode and to CH ArrayJoinStep/ArrayJoinTransform
GenerateRel generate = 15;
}
}
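CH operators
Note: "operator" here means a physical operator.
Interface: IQueryPlanStep
Code: src/Processors/QueryPlan/*Step.h
By topology, CH operators fall into three categories:
- Source operators: no input. They read from a data source, e.g. MergeTree, Parquet, ORC, or a Java iterator
- Transform operators: both input and output, e.g. project, filter, join, aggregate
- Sink operators: no output. They write data to MergeTree, Parquet, or ORC files
In CH, a QueryPlan made of these operators builds a QueryPipeline, which is then executed by PipelineExecutor.
Implementation of the Source operator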
ReadFromStorage (read local files)
Header: l_quantity Nullable(Float64)
l_extendedprice Nullable(Float64)
l_discount Nullable(Float64)
l_tax Nullable(Float64)
l_returnflag Nullable(String)
l_linestatus Nullable(String)
l_shipdate Nullable(Date32)
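ReadFromStorageStep adds a ready-made Pipe to the QueryPlan as a Source operator. What makes this design elegant:
- Through the ReadBuffer abstraction, ReadFromStorageStep can read every compression format CH supports (zstd, lz4, snappy, etc.) on every supported file system (local, S3, HDFS)
- Through the InputFormat abstraction, ReadFromStorageStep can read every format CH supports (CSV, Parquet, Arrow, ORC, etc.)
By combining ReadBuffers and InputFormats, a ReadFromStorageStep can be assembled for all kinds of use cases:
- First, build a ReadBuffer for the file system and compression format
- Then, build an InputFormat on top of that ReadBuffer
- An InputFormat is essentially a Processor, so a Pipe can be built from it
- Build a ReadFromStorageStep from the Pipe and add it to the QueryPlan
- Build a QueryPipeline from the QueryPlan and run it with PipelineExecutor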
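The ReadBuffer/InputFormat combination is essentially a decorator chain. Here is a minimal sketch with hypothetical classes (`ToyReadBuffer`, `MemoryReadBuffer`, `RleInflatingReadBuffer`, `readCsv`), which are not the ClickHouse classes. Run-length decoding stands in for zstd/lz4/snappy so the example has no dependencies. The CSV reader plays the role of an InputFormat and does not care which buffers are stacked beneath it.

```cpp
#include <cassert>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal byte-source interface in the role of ReadBuffer.
class ToyReadBuffer {
public:
    virtual ~ToyReadBuffer() = default;
    virtual std::string readAll() = 0;  // real ReadBuffers stream in chunks instead
};

// Role of ReadBufferFromFile/HDFS/S3: the raw bytes.
class MemoryReadBuffer : public ToyReadBuffer {
public:
    explicit MemoryReadBuffer(std::string data) : data_(std::move(data)) {}
    std::string readAll() override { return data_; }
private:
    std::string data_;
};

// Role of an inflating buffer layered on an upstream buffer. Run-length decoding
// ("3x" -> "xxx", single-digit counts assumed) stands in for real decompression.
class RleInflatingReadBuffer : public ToyReadBuffer {
public:
    explicit RleInflatingReadBuffer(std::unique_ptr<ToyReadBuffer> in) : in_(std::move(in)) {}
    std::string readAll() override {
        std::string src = in_->readAll();
        std::string out;
        for (size_t i = 0; i + 1 < src.size(); i += 2)
            out.append(static_cast<size_t>(src[i] - '0'), src[i + 1]);
        return out;
    }
private:
    std::unique_ptr<ToyReadBuffer> in_;
};

// Role of CSVRowInputFormat: parses whatever bytes the buffer chain yields into rows.
std::vector<std::vector<std::string>> readCsv(ToyReadBuffer& buf) {
    std::vector<std::vector<std::string>> rows;
    std::istringstream lines(buf.readAll());
    std::string line;
    while (std::getline(lines, line)) {
        std::vector<std::string> row;
        std::istringstream fields(line);
        std::string field;
        while (std::getline(fields, field, ',')) row.push_back(field);
        rows.push_back(row);
    }
    return rows;
}
```

Swapping `MemoryReadBuffer` for another source, or removing the inflating layer, leaves `readCsv` unchanged. That independence is the property the text attributes to the real abstractions.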
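Further reading:
- ReadBufferFromHDFS: reads data from HDFS
- ReadBufferFromFile: reads data from the local file system
- ReadBufferFromS3: reads data from S3
- Lz4InflatingReadBuffer: adds LZ4 decompression on top of an upstream ReadBuffer
- ZstdInflatingReadBuffer: adds zstd decompression on top of an upstream ReadBuffer
- SnappyReadBuffer: adds snappy decompression on top of an upstream ReadBuffer
- CSVRowInputFormat: reads CSV data from a ReadBuffer, parses it, and outputs Blocks
- ParquetBlockInputFormat: reads Parquet data from a ReadBuffer, parses it, and outputs Blocks
- ORCBlockInputFormat: reads ORC data from a ReadBuffer, parses it, and outputs Blocks
Implementation of the Project operator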
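The flow is as follows:
- First, build the pipeline by calling ExpressionStep::transformPipeline:
  - Build ExpressionActions from the ActionsDAG, then an ExpressionTransform, and append it to the pipeline.
  - If the pipeline's output header differs from the ExpressionStep's output header, generate a new ActionsDAG that casts the types and append it to the pipeline as well.
- Then execute the pipeline: ExpressionTransform::transform → ExpressionActions::execute
  - Compute the mapping from required_columns to the columns of the input block.
  - Iterate over the actions and call executeAction on each one, producing a new Block that replaces the input Block:
    - FUNCTION: if is_lazy_executed = true (typically used for short-circuit evaluation of and/or/multiIf), the result column is a ColumnFunction and the function is not executed yet; if is_function_compiled = true (used for JIT and similar optimizations), the compiled CompiledFunction is executed; otherwise the ordinary function is executed.
    - ARRAY_JOIN: replicates the other columns according to the offsets of the array join key column, producing the result columns.
    - COLUMN: adjusts num_rows of the const column.
    - ALIAS: copies the source column to the result column and renames it.
    - INPUT: moves the column from the inputs to the result position.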
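Of these action types, ARRAY_JOIN is the least obvious. The sketch below assumes the ColumnArray layout used by CH, where an array column is a flat data vector plus cumulative end offsets. `replicate` is a hypothetical helper that shows the idea and does not reproduce a real ClickHouse signature. A row whose array is empty disappears, matching the inner semantics of explode.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Repeats row i of `column` (offsets[i] - offsets[i-1]) times, so every other
// column lines up with the flattened array data after ARRAY JOIN.
template <typename T>
std::vector<T> replicate(const std::vector<T>& column, const std::vector<uint64_t>& offsets) {
    assert(column.size() == offsets.size());
    std::vector<T> result;
    result.reserve(offsets.empty() ? 0 : offsets.back());
    uint64_t prev = 0;  // end offset of the previous row's array
    for (size_t i = 0; i < column.size(); ++i) {
        for (uint64_t k = prev; k < offsets[i]; ++k) result.push_back(column[i]);
        prev = offsets[i];
    }
    return result;
}
```

For arrays `[10, 20]`, `[]`, `[30]`, the flat data is `{10, 20, 30}` and the offsets are `{2, 2, 3}`. A sibling column `{"a", "b", "c"}` becomes `{"a", "a", "c"}`.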
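Implementation of the Aggregate operator
Stage 1: AggregatingStep performs the initial (partial) aggregation
- First, build the pipeline by calling AggregatingStep::transformPipeline:
  - If the pipeline's output parallelism is > 1, build parallel AggregatingTransforms and add them to the pipeline.
  - If the output parallelism is 1, build a single serial AggregatingTransform and add it to the pipeline.
- Then execute the pipeline by calling AggregatingTransform::work:
  - First, pull a chunk from the upstream operator.
  - Repeatedly call Aggregator::executeOnBlock on the input blocks, creating AggregateFunction columns that hold the intermediate aggregation state and keep updating them. Aggregator::executeOnBlock works as follows:
    - First, initialize the aggregate function pointers in AggregatedDataVariants according to the aggregation method.
    - For the without_key method, initialize the without_key aggregation structure in AggregatedDataVariants and run executeWithoutKeyImpl:
      - For each aggregate function, call its addBatchSinglePlace.
    - For any other method, run executeImplBatch:
      - For each row, check whether its key has been seen before: if not, create new aggregate data for that method, otherwise reuse the existing data. places[row_idx] points to the row's aggregate data. Then, for each aggregate function, call add in batch to accumulate the rows into their aggregate data.
  - When the upstream has no more input or max_rows_to_group_by is reached, AggregatingTransform::consume finishes and is_consume_finished = true.
  - Check whether this AggregatingTransform is the last one in its group of parallel aggregation operators to finish consuming. If not, this operator is done. Otherwise:
    - Preprocess the ManyAggregatedDataVariants produced by all the parallel operators: drop the empty ones, sort them by size in descending order, and preallocate memory pools, all for performance (see Aggregator::prepareVariantsToMerge).
    - Append a ConvertingAggregatedToChunksTransform to the pipeline to merge the intermediate states:
      - Initialization: allocate the arena; aggregation without keys is handled specially.
      - Merging: Aggregator::mergeSingleLevelDataImpl calls mergeDataImpl to merge every piece of aggregate data into one place, then convertToBlockImpl to turn the merged aggregate data into a Block. If final = true, the output is the final aggregation result; otherwise it is output as intermediate state.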
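The two-step shape of executeImplBatch can be sketched with a standard hash map: first resolve a place per row, then add the whole batch. The same sketch covers the final merge of several variants. `SumCountState`, `executeOnBlock` and `mergeVariants` are hypothetical names. CH uses its own hash tables, arenas and per-key-type methods rather than `std::unordered_map`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy aggregate state: sum and count (enough for sum, count and avg).
struct SumCountState {
    double sum = 0;
    uint64_t count = 0;
};

using AggregatedData = std::unordered_map<std::string, SumCountState>;

void executeOnBlock(AggregatedData& data, const std::vector<std::string>& keys,
                    const std::vector<double>& values) {
    assert(keys.size() == values.size());
    // Step 1: one place per row; unseen keys get a fresh state. Pointers to
    // unordered_map elements stay valid across inserts and rehashes.
    std::vector<SumCountState*> places(keys.size());
    for (size_t row = 0; row < keys.size(); ++row) places[row] = &data[keys[row]];
    // Step 2: feed the whole batch into the places.
    for (size_t row = 0; row < keys.size(); ++row) {
        places[row]->sum += values[row];
        places[row]->count += 1;
    }
}

// Drops empty variants, sorts by size (largest first) and merges the rest into
// the largest one, mirroring the preparation described above.
AggregatedData mergeVariants(std::vector<AggregatedData> variants) {
    variants.erase(std::remove_if(variants.begin(), variants.end(),
                                  [](const AggregatedData& v) { return v.empty(); }),
                   variants.end());
    if (variants.empty()) return {};
    std::sort(variants.begin(), variants.end(),
              [](const AggregatedData& a, const AggregatedData& b) { return a.size() > b.size(); });
    AggregatedData& target = variants.front();
    for (size_t i = 1; i < variants.size(); ++i)
        for (const auto& [key, state] : variants[i]) {
            SumCountState& dst = target[key];
            dst.sum += state.sum;
            dst.count += state.count;
        }
    return std::move(target);
}
```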
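Stage 2: MergingAggregatedStep performs the final aggregation
- First, build a MergingAggregatedTransform with parallelism 1 and add it to the pipeline.
- Then execute the pipeline by calling MergingAggregatedTransform::work:
  - consume phase: repeatedly calls MergingAggregatedTransform::consume on the input Blocks, appending the input columns to memory.
  - generate phase: calls Aggregator::mergeBlocks to merge the buffered blocks:
    - If the aggregation method is without_key, use mergeBlockWithoutKeyStreamsImpl.
    - Otherwise use mergeStreamsImpl → mergeStreamsImplCase:
      - For each row, check whether its key has been seen and determine the row's aggregate data.
      - For each aggregate function, call mergeBatch to merge the aggregate data from the input block into the target aggregate data.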
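The second stage consumes partial states rather than raw rows. Below is a sketch for avg. It assumes a (sum, count) state, roughly what `avgPartialMerge` in the stage-2 plan above merges. The struct and function names are hypothetical. Blocks of key/state pairs are buffered, merged row by row, and each state is finalized only at the end.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Toy partial state of avg(x): a numerator and a denominator.
struct AvgState {
    double sum = 0;
    uint64_t count = 0;
};

// One buffered block from the exchange: a key column plus a state column.
struct PartialBlock {
    std::vector<std::string> keys;
    std::vector<AvgState> states;
};

// Merge phase: fold each incoming state into the state for its key (the mergeBatch
// idea); finalize phase: turn every merged state into the avg value.
std::map<std::string, double> mergeAndFinalize(const std::vector<PartialBlock>& blocks) {
    std::map<std::string, AvgState> merged;
    for (const PartialBlock& block : blocks)
        for (size_t row = 0; row < block.keys.size(); ++row) {
            AvgState& dst = merged[block.keys[row]];
            dst.sum += block.states[row].sum;
            dst.count += block.states[row].count;
        }
    std::map<std::string, double> result;
    for (const auto& [key, state] : merged)
        result[key] = state.sum / static_cast<double>(state.count);  // count > 0 for real states
    return result;
}
```

This also shows why partial avg cannot just ship averages. The average of 1.5 and 9.0 is not the average of the three underlying rows, but merging (3, 2) with (9, 1) gives (12, 3), whose average is 4.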
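Note: CH's Aggregator is deeply optimized per key type. No key, fixed-width integer keys of different lengths, String keys, LowCardinality keys, Nullable keys, and so on each use different aggregation algorithms and data structures, which reflects CH's relentless focus on performance.
As the code shows, IAggregateFunction has several add interfaces for consuming input:
- add: processes one row at a time; slower than the batch variants such as addBatch
- addBatchSinglePlace: processes a batch at once, with optional If combinator support: rows whose If condition is false are excluded from aggregation
- addBatchSinglePlaceNotNull: processes a batch at once, with support for the If and Null combinators (the former optional); rows whose value is NULL are excluded from aggregation
- addManyDefaults: processes a batch of default values at once
- addBatchSparse: processes a batch of ColumnSparse input at once, optimized for the ColumnSparse layout
- merge: merges one row of a ColumnAggregateFunction
- mergeBatch: merges a batch of rows of a ColumnAggregateFunction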
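Implementation of the Sort operator
Entry point: SortingStep
Flow (using full sort as an example):
- Build the pipeline:
  - If there is a LIMIT clause, add PartialSortingTransform to the pipeline
  - Append LimitsCheckingTransform (checks whether the local rows/bytes limits are exceeded)
  - Append MergeSortingTransform
  - If the pipeline's parallelism is > 1, append MergingSortedTransform
- Execute the pipeline:
  - PartialSortingTransform: repeatedly top-N sorts each input Block and emits it; this is the coarse sort. A performance optimization here: it maintains a top-N threshold; rows that sort after the threshold are filtered out, and only the remaining rows go through sortBlock. The threshold is adjusted as iteration proceeds.
  - MergeSortingTransform: merge-sorts the locally sorted chunks emitted by the upstream PartialSortingTransform and outputs globally sorted data
    - consume phase: applies a small optimization (removing const columns from the input Block) and appends the chunk to the chunks queue. Once the data accumulated in chunks reaches a threshold, a remerge (MergeSorter) is performed.
    - generate phase: first creates a MergeSorter and reads chunks from it, then adds back the const columns removed earlier.
  - MergingSortedTransform: merge-sorts the sorted inputs from the multiple upstream MergeSortingTransform streams. See MergingSortedAlgorithm for the algorithm.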
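The full-sort flow rests on two ideas: a per-chunk top-N with a shrinking threshold, and a k-way merge of sorted runs. This minimal sketch shows both over ascending int keys. The function names are hypothetical. The threshold rule is a simplified reading of the description above, not CH's exact code. `limit == 0` means no LIMIT, matching the convention used when converting Substrait sorts.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <optional>
#include <queue>
#include <tuple>
#include <vector>

// PartialSortingTransform idea: filter by the threshold, sort, cut to `limit`.
std::vector<int> partialSortChunk(std::vector<int> chunk, size_t limit,
                                  std::optional<int>& threshold) {
    if (threshold)  // rows that sort after the threshold cannot reach the top `limit`
        chunk.erase(std::remove_if(chunk.begin(), chunk.end(),
                                   [&](int v) { return v > *threshold; }),
                    chunk.end());
    std::sort(chunk.begin(), chunk.end());
    if (limit > 0 && chunk.size() > limit) chunk.resize(limit);
    // A full chunk proves `limit` rows <= its last value exist: tighten the threshold.
    if (limit > 0 && chunk.size() == limit) threshold = chunk.back();
    return chunk;
}

// Merge idea behind MergeSortingTransform/MergingSortedTransform: a min-heap
// holding the head of every sorted run.
std::vector<int> mergeSorted(const std::vector<std::vector<int>>& runs, size_t limit) {
    using Item = std::tuple<int, size_t, size_t>;  // (value, run index, position in run)
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;
    for (size_t r = 0; r < runs.size(); ++r)
        if (!runs[r].empty()) heap.emplace(runs[r][0], r, 0);
    std::vector<int> out;
    while (!heap.empty() && (limit == 0 || out.size() < limit)) {
        auto [value, r, pos] = heap.top();
        heap.pop();
        out.push_back(value);
        if (pos + 1 < runs[r].size()) heap.emplace(runs[r][pos + 1], r, pos + 1);
    }
    return out;
}
```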
Converting Substrait operators to CH operators
Converting the Substrait read operator
// The scan operator of base data (physical or virtual), including filtering and projection.
message ReadRel {
RelCommon common = 1;
/// Schema of the data source being read, including column names and types
NamedStruct base_schema = 2;
Expression filter = 3;
Expression best_effort_filter = 11;
Expression.MaskExpression projection = 4;
substrait.extensions.AdvancedExtension advanced_extension = 10;
// Definition of which type of scan operation is to be performed
oneof read_type {
VirtualTable virtual_table = 5;
/// In practice, Gluten's local_files come from two sources: a Java iterator, or local/remote Parquet/ORC files
LocalFiles local_files = 6;
NamedTable named_table = 7;
ExtensionTable extension_table = 8;
}
// Represents a list of files in input of a scan operation
message LocalFiles {
/// One or more local or remote files.
repeated FileOrFiles items = 1;
substrait.extensions.AdvancedExtension advanced_extension = 10;
// Many files consist of indivisible chunks (e.g. parquet row groups
// or CSV rows). If a slice partially selects an indivisible chunk
// then the consumer should employ some rule to decide which slice to
// include the chunk in (e.g. include it in the slice that contains
// the midpoint of the chunk)
message FileOrFiles {
oneof path_type {
// A URI that can refer to either a single folder or a single file
string uri_path = 1;
// A URI where the path portion is a glob expression that can
// identify zero or more paths.
// Consumers should support the POSIX syntax. The recursive
// globstar (**) may not be supported.
string uri_path_glob = 2;
// A URI that refers to a single file
string uri_file = 3;
// A URI that refers to a single folder
string uri_folder = 4;
}
// ... remaining fields omitted
}
}
}
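Entry point: the parseOP function in SerializedPlanParser.h
- If the local files come from a Java iterator, first assemble a Pipe with SourceFromJavaIter, then create a ReadFromPreparedSource operator from the Pipe and add it to the QueryPlan.
- If the local files are local or remote Parquet/ORC files, first assemble a Pipe with SubstraitFileSource, then create a ReadFromStorageStep operator and add it to the QueryPlan.
SourceFromJavaIter: a CH operator that reads Blocks in batches via JNI from io/glutenproject/execution/ColumnarNativeIterator in Gluten. See SourceFromJavaIter::generate for the implementation.
SubstraitFileSource: a CH operator that reads Parquet/ORC data from different sources (local/S3/HDFS). Related classes:
- NormalFileReader: reads a single file and returns Chunks
- FormatFile: an abstraction over file formats; Text, ORC, and Parquet are implemented so far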
Converting the Substrait project operator
// This operator allows to represent calculated expressions of fields (e.g., a+b). Direct/Emit are used to represent classical relational projections
message ProjectRel {
RelCommon common = 1;
/// Input operator
Rel input = 2;
/// The expressions contained in the project operator
repeated Expression expressions = 3;
substrait.extensions.AdvancedExtension advanced_extension = 10;
}
The conversion is fairly simple:
- Call expressionsToActionsDAG to convert the substrait::Expressions into an ActionsDAG
- Build an ExpressionStep from the ActionsDAG and add it to the QueryPlan
Converting the Substrait aggregate operator
// The relational operator representing a GROUP BY Aggregate
message AggregateRel {
RelCommon common = 1;
// Input of the aggregation
Rel input = 2;
/// GROUP BY expressions, i.e. the dimensions; there may be several, and each may be an expression
repeated Grouping groupings = 3;
/// Aggregate expressions, i.e. the measures; there may be several
repeated Measure measures = 4;
substrait.extensions.AdvancedExtension advanced_extension = 10;
message Grouping {
repeated Expression grouping_expressions = 1;
}
message Measure {
AggregateFunction measure = 1;
// An optional boolean expression that acts to filter which records are
// included in the measure. True means include this record for calculation
// within the measure.
// Helps to support SUM(<c>) FILTER(WHERE...) syntax without masking opportunities for optimization
Expression filter = 2;
}
}
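Conversion flow:
- Call SerializedPlanParser::parseAggregate to convert the Substrait aggregate operator into QueryPlanSteps:
  - Decide whether an ExpressionStep must be placed in front of the CH aggregation. This holds if either of the following two conditions is met, in which case an ExpressionStep is inserted before the CH aggregation operator:
    - an aggregate function argument is a Literal
    - an aggregate function argument is non-nullable, but the aggregate function's output type is nullable, and the aggregation is in the Partial phase
  - Convert the measures into AggregateDescriptions
  - Decide whether the aggregate is in the final phase:
    - if so, add a MergingAggregatedStep to the QueryPlan
    - otherwise, add an AggregatingStep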
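The lowering decision fits in a small predicate. `MeasureInfo`, `Phase`, `needsPreProjection` and `aggregateStepName` are hypothetical names that do not come from the Gluten code. Only the two conditions and the final/partial split are taken from the list above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, flattened view of what the parser inspects for one measure.
struct MeasureInfo {
    bool arg_is_literal;
    bool arg_nullable;
    bool output_nullable;
};

enum class Phase { Partial, Final };

// True if an ExpressionStep must precede the CH aggregation (either condition suffices).
bool needsPreProjection(const std::vector<MeasureInfo>& measures, Phase phase) {
    for (const MeasureInfo& m : measures) {
        if (m.arg_is_literal) return true;
        if (!m.arg_nullable && m.output_nullable && phase == Phase::Partial) return true;
    }
    return false;
}

// Final phase merges partial states; any other phase aggregates raw rows.
std::string aggregateStepName(Phase phase) {
    return phase == Phase::Final ? "MergingAggregatedStep" : "AggregatingStep";
}
```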
Converting the Substrait sort operator
// The ORDER BY (or sorting) relational operator. Besides describing a base relation, it includes a list of fields to sort on
message SortRel {
RelCommon common = 1;
Rel input = 2;
/// Multiple sort expressions: order by expr1, expr2, expr3
repeated SortField sorts = 3;
substrait.extensions.AdvancedExtension advanced_extension = 10;
}
// The description of a field to sort on (including the direction of sorting and null semantics)
message SortField {
Expression expr = 1;
oneof sort_kind {
/// Sort direction; fairly fine-grained (ASC/DESC combined with NULLS FIRST/LAST)
SortDirection direction = 2;
uint32 comparison_function_reference = 3;
}
enum SortDirection {
SORT_DIRECTION_UNSPECIFIED = 0;
SORT_DIRECTION_ASC_NULLS_FIRST = 1;
SORT_DIRECTION_ASC_NULLS_LAST = 2;
SORT_DIRECTION_DESC_NULLS_FIRST = 3;
SORT_DIRECTION_DESC_NULLS_LAST = 4;
SORT_DIRECTION_CLUSTERED = 5;
}
}
Entry point: SortRelParser::parse
Conversion flow:
- First, check whether ORDER BY is followed by a LIMIT clause; if not, limit = 0
- Convert each Substrait SortField into CH's SortDescription
- Build a SortingStep and add it to the QueryPlan
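Most of the SortField-to-SortDescription step maps each SortDirection to two integers in CH's SortColumnDescription. `direction` is 1 for ascending and -1 for descending. `nulls_direction` is 1 when NULLs compare greater than other values and -1 when they compare less. As we understand it, the comparison result is multiplied by `direction`, so DESC NULLS FIRST needs `nulls_direction = 1`. Below is a sketch with the hypothetical `SortColumn`/`toSortColumn`. Rejecting UNSPECIFIED and CLUSTERED is an assumption and may not match what SortRelParser actually does.

```cpp
#include <cassert>
#include <stdexcept>

// Values of substrait SortField.SortDirection, as in the proto above.
enum SortDirection {
    SORT_DIRECTION_UNSPECIFIED = 0,
    SORT_DIRECTION_ASC_NULLS_FIRST = 1,
    SORT_DIRECTION_ASC_NULLS_LAST = 2,
    SORT_DIRECTION_DESC_NULLS_FIRST = 3,
    SORT_DIRECTION_DESC_NULLS_LAST = 4,
    SORT_DIRECTION_CLUSTERED = 5,
};

// The two integers of interest: direction (1 ASC, -1 DESC) and
// nulls_direction (1: NULLs compare greater, -1: NULLs compare less).
struct SortColumn {
    int direction;
    int nulls_direction;
};

SortColumn toSortColumn(SortDirection d) {
    switch (d) {
        case SORT_DIRECTION_ASC_NULLS_FIRST:  return {1, -1};  // NULLs smallest -> first
        case SORT_DIRECTION_ASC_NULLS_LAST:   return {1, 1};   // NULLs largest -> last
        case SORT_DIRECTION_DESC_NULLS_FIRST: return {-1, 1};  // NULLs largest -> first in DESC
        case SORT_DIRECTION_DESC_NULLS_LAST:  return {-1, -1}; // NULLs smallest -> last in DESC
        default: throw std::invalid_argument("unsupported sort direction");
    }
}
```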
Author: 后端侠
Last updated: 2025-05-21