在最近的大数据任务优化工作中,业务分析同学手里有上千个复杂的 Spark SQL,如果全靠人工逐个调优,不仅耗时耗力,还容易遗漏大量低级问题,如分区设计不合理、存储格式落后、缺少 Bucket、压缩策略不当等。
为了解决这个痛点,先对底层 Hive 表进行了系统梳理,维护了一套表特征库。特征库里记录了每张表的结构化信息,包括但不限于:
1.存储格式:算法团队使用的表统一为 TEXT 格式,SQL 分析型表统一为 ORC 格式(或 Parquet)。
2.数据规模:表总行数、占用存储大小(GB/TB 级别)。
3.分区信息:是否分区、分区字段(日期/业务维度)、分区粒度、分区数量、是否存在数据倾斜。
4.其他关键特征:Bucket 数量与排序字段、压缩方式(ZLIB/SNAPPY)、表类型(外部表/内部表)、最近更新时间、血缘依赖关系等。
工作流核心优势
1.自动化闭环:从 SQL 解析 → 特征库查询 → Hive 元数据拉取 → 获取explain执行计划 → LLM 智能分析 → 优化报告,全程无人值守。
2.知识驱动:不再是“裸 LLM 瞎猜”,而是把公司沉淀的表特征库 + 真实元数据作为 Prompt 上下文,让 AI 给出精准、可落地的优化建议。
3.批量处理:一次上传包含上百张表的 SQL 文件也能秒级处理。
工作流详细拆解
整个流程采用 Dify Workflow 可视化拖拽搭建
1.起始节点:上传 SQL 文件
支持直接拖拽 .sql 文件,兼容多表 Spark SQL(SELECT / CREATE TABLE / INSERT OVERWRITE 等)。
2.提取 SQL 文件内容
将文件转为纯文本,供后续解析使用。
3.解析 SQL:提取表名
自动识别所有涉及的 Hive 表名。
import re
from typing import Dict, List, Set, Tuple
XXX_DB_PREFIX = "xxx_data_"
def _strip_comments(sql: str) -> str:
# 去掉 /* ... */ 块注释
sql = re.sub(r"/\*.*?\*/", " ", sql, flags=re.S)
# 去掉 -- 行注释
sql = re.sub(r"--.*?$", " ", sql, flags=re.M)
return sql
def _normalize_space(sql: str) -> str:
return re.sub(r"\s+", " ", sql).strip()
def _find_default_db(sql: str) -> str:
m = re.search(r"(?i)\buse\s+([a-zA-Z0-9_]+)\b", sql)
return m.group(1) if m else ""
def _extract_targets(sql: str, default_db: str) -> Set[str]:
targets = set()
# insert overwrite table xxx / insert into table xxx
for m in re.finditer(r"(?i)\binsert\s+(?:overwrite|into)\s+table\s+([a-zA-Z0-9_\.]+)", sql):
t = m.group(1)
targets.add(t)
# create table xxx as select ...
for m in re.finditer(r"(?i)\bcreate\s+table\s+([a-zA-Z0-9_\.]+)\s+as\b", sql):
targets.add(m.group(1))
# 规范化:补 default_db
norm = set()
for t in targets:
if "." not in t and default_db:
norm.add(f"{default_db}.{t}")
else:
norm.add(t)
return norm
def _is_valid_table_token(tok: str) -> bool:
# 过滤掉 from (select ...) 之类
if tok.startswith("("):
return False
low = tok.lower()
if low in ("select", "values"):
return False
return True
def _extract_sources(sql: str, default_db: str) -> Set[str]:
sources = set()
# from/join 后面的第一个 token 通常是表名(忽略 lateral/view 等复杂结构,先覆盖你主要场景)
for m in re.finditer(r"(?i)\b(from|join)\s+([a-zA-Z0-9_\.]+)", sql):
tok = m.group(2)
if not _is_valid_table_token(tok):
continue
# 去掉可能的反引号
tok = tok.replace("`", "")
# 补 default_db
if "." not in tok and default_db:
tok = f"{default_db}.{tok}"
sources.add(tok)
return sources
def _filter_xxx_tables(tables: Set[str]) -> Set[str]:
out = set()
for t in tables:
t = t.replace("`", "")
if "." not in t:
continue
db, tb = t.split(".", 1)
if db.startswith(XXX_DB_PREFIX) and tb:
out.add(f"{db}.{tb}")
return out
def _extract_column_tokens(sql: str) -> Set[str]:
# 粗粒度抓 where/on 里出现过的字段 token,用于判断分区是否有被限制
cols = set()
for m in re.finditer(r"(?i)\b(where|on)\b(.*?)(?=\b(group|order|having|limit|union|join|where|insert|create)\b|$)", sql):
segment = m.group(2)
# 抓类似 a.b、b、`b` 这种 token
for tok in re.findall(r"`?[a-zA-Z_][a-zA-Z0-9_]*`?(?:\.`?[a-zA-Z_][a-zA-Z0-9_]*`?)?", segment):
tok = tok.replace("`", "")
# 只保留列名部分(a.b -> b)
if "." in tok:
tok = tok.split(".", 1)[1]
cols.add(tok)
return cols
def main(sql_raw: str) -> dict:
sql_raw = sql_raw or ""
sql1 = _strip_comments(sql_raw)
sql2 = _normalize_space(sql1)
default_db = _find_default_db(sql2)
targets = _extract_targets(sql2, default_db)
sources = _extract_sources(sql2, default_db)
# 只保留 xxx_data_ 开头库
targets = _filter_xxx_tables(targets)
sources = _filter_xxx_tables(sources)
# 源表排除目标表
source_tables = sorted(list(sources - targets))
target_tables = sorted(list(targets))
# 粗粒度过滤字段:全局 where/on 出现的列 tokens
global_cols = _extract_column_tokens(sql2)
partition_filters: Dict[str, List[str]] = {}
for t in source_tables:
# 先把全局列 tokens 记上,后面 DDL 分区列会跟它对比
partition_filters[t] = sorted(list(global_cols))
return {
"default_db": default_db,
"source_tables": source_tables,
"target_tables": target_tables,
"partition_filters": partition_filters,
}4.迭代提取真实 Hive 元数据 && 查询表的特征库(核心节点)
根据提取出的表名,批量查询我们预先维护的表特征库(支持失败自动重试 3 次,保证稳定性),获取每张表的完整画像(存储格式、数据量、分区策略、Bucket 信息、使用场景等),这步相当于给 AI 提前“喂”了公司级最佳实践知识。

工作流输出结果
Prompt核心逻辑包括, 分区优化建议(粒度、字段、动态分区), 存储格式升级(TEXT → ORC/Parquet + 合适压缩), Bucket / Sort / Bloom Filter 推荐, 字段类型规范、注释补全、ACID 化建议, 结合表数据量和使用场景给出个性化优化, 最终输出结构化优化报告 + 重写后的完整 Spark SQL DDL。

评论