BlankLin

lazy and boring

0%

背景介绍

  • 什么是StarRocks
    StarRocks是一款极速全场景MPP企业级数据库产品,具备水平在线扩缩容,金融级高可用,兼容MySQL协议和MySQL生态,提供全面向量化引擎与多种数据源联邦查询等重要特性。StarRocks致力于在全场景OLAP业务上为用户提供统一的解决方案,适用于对性能,实时性,并发能力和灵活性有较高要求的各类应用场景。
  • 滴滴olap现状
    当前在滴滴内部,主要大数据olap生态包括clickhouse/druid/starrocks等,而写入olap可以划分为两个方向,分布是实时和离线:
    • 2.1 实时方向,包括网约车/金融等大部分业务侧均是使用kafka/ddmq作为source端,将数据通过flink实时攒批写入到大数据olap存储侧
    • 2.2 离线方向,包括网约车/金融等大部分业务侧则是将数据通过hive/mysql/hdfs这个source端,经过各项加工后流入到olap这个sink存储侧
  • 本文主题
    本篇文章主要是介绍在starrocks这一侧,如果实现将hive数据进行加工后写入到starrock,涉及到starrokc的源码解析、组件介绍、及相关平台架构

架构设计

如下图,是当前滴滴内部hive2sr导入的实现架构图,用户主要是通过访问同步中心,配置hive表和sr表的字段映射及默认值,同步中心会将映射关系通过srm开放的http接口传给srm侧,srm进行相关处理后提交给sr的各个组件,组件之间通过thrift server进行rpc通信,而srm则通过http方式进行任务最终状态监听,当任务完成后返回给数梦的任务调度系统,最终完成整个全链路流程.
架构图

基本原理

  1. 创建目标分区
    同步中心会将hive2sr当前批次需要导入hive的分区列表传入srm,srm会校验当前sr表对应分区是否存在,不存在则按照hive分区值对应创建出sr表分区
  2. 创建目标分区
    上面这一步创建出目标分区后,srm会对应目标分区range范围,在sr目标表中创建出对应的临时分区,临时分区的作用是这样的,srm可以在一张已经定义分区规则的分区表上,创建临时分区,并为这些临时分区设定单独的数据分布策略。将hive数据写入指定的临时分区后,通过原子覆盖写操作(调整分区分桶策略),srm可以将临时分区作为临时可用的数据载体覆盖到对应的目标分区,用以实现覆盖写操作
  3. 创建load任务
    3.1 这个load首先需要配置涉及sr访问hadoop环境的相关参数,这个操作相当于在sr中配置hdfs-site.xml和core-site.xml
    3.2 其实load任务需要组装出提交给spark的driver任务内容,通过cluster模式,以master on yarn的方式在fe节点提交出spark任务
  4. fe进程调度etl阶段
    在etl阶段,fe会开启spark driver等待yarn队列资源充足时,提交给yarn取执行spark任务,Spark集群执行ETL完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。预处理后的数据按parquet数据格式落盘HDFS存储侧。
  5. etl完成后fe会调度load阶段
    ETL 任务完成后,FE获取预处理过的每个分片的hdfs数据路径,并调度相关的 broker 执行 load 任务
  6. broker加载hdfs文件后通知be进行push
    BE 通过 Broker 进程读取 HDFS 数据,转化为 StarRocks 存储格式。此时临时分区完成对应hive分区数据的加载过程
  7. 将临时分区替换到目标分区
    临时分区的数据将完成的替换到目标分区去,而目标分区数据将被删除,查询后将是新的hive分区数据

源码解析

fe进程启动及相关调度处理

  • 入口文件->StarRocksFE.java

    1
    2
    3
    4
    .....
    feServer.start();
    httpServer.start();
    qeService.start();

    主要是初始化配置和启动服务,分别是mysql server端口、thrift server端口、http端口

  • mysq服务启动->QeService.java
    由于我们都是通过tcp协议来连sr,所以主要关注QeService

    1
    2
    3
    4
    5
    6
    7
    public void start() throws IOException {
    if (!mysqlServer.start()) {
    LOG.error("mysql server start failed");
    System.exit(-1);
    }
    LOG.info("QE service start.");
    }
  • MysqlServer.java
    这里主要是开启mysql协议的服务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public boolean start() {
    ......
    // 打开fe的mysql协议的socket管道
    // 开启一个常驻线程用以监听mysql协议
    listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true);
    running = true;
    listenerFuture = listener.submit(new Listener());
    ......
    }
  • 提交本次连接的上下文到连接调度器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    ........
    clientChannel = serverChannel.accept();
    if (clientChannel == null) {
    continue;
    }
    // 初始化本次session的上下文信息到连接调度器
    // submit this context to scheduler
    ConnectContext context = new ConnectContext(clientChannel, sslContext);
    // Set globalStateMgr here.
    context.setGlobalStateMgr(GlobalStateMgr.getCurrentState());
    if (!scheduler.submit(context)) {
    LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
    // clear up context
    context.cleanup();
    }
    ............
    ```

    + 提交本次连接的上下文给线程池

    public boolean submit(ConnectContext context) {
    if (context == null) {

      return false;
    

    }

    context.setConnectionId(nextConnectionId.getAndAdd(1));
    // no necessary for nio.
    if (context instanceof NConnectContext) {

      return true;
    

    }
    // 这里是将Runnable提交到connect-scheduler-pool线程池
    if (executor.submit(new LoopHandler(context)) == null) {

      LOG.warn("Submit one thread failed.");
      return false;
    

    }
    return true;
    }

    1
    2

    + LoopHandler.java (实现Runnable接口)

    public void run() {
    …..
    // 注册本次连接,sr会计算当前fe节点总的连接数,在每次连接超过1024后进行sleep连接的驱逐流程
    if (registerConnection(context)) {

      MysqlProto.sendResponsePacket(context);
    

    } else {

      context.getState().setError("Reach limit of connections");
      MysqlProto.sendResponsePacket(context);
      return;
    

    }
    ………
    // 常驻,进行核心sql的parser-》analyze-》rewrite-》logical plan-》optimizer-》physical plan
    ConnectProcessor processor = new ConnectProcessor(context);
    processor.loop();
    ……….
    }

    1
    2

    + ConnectProcessor.java -> loop

    public void loop() {
    while (!ctx.isKilled()) {

      try {
          processOnce();
      } catch (Exception e) {
          // TODO(zhaochun): something wrong
          LOG.warn("Exception happened in one seesion(" + ctx + ").", e);
          ctx.setKilled();
          break;
      }
    

    }
    }

    1
    2

    + ConnectProcessor.java -> processOnce

    // handle one process
    public void processOnce() throws IOException {
    // 重置上下文的状态
    ctx.getState().reset();
    executor = null;

    // 重置mysql协议的顺序标识符
    final MysqlChannel channel = ctx.getMysqlChannel();
    channel.setSequenceId(0);
    // 从通道里获取数据包
    try {

      packetBuf = channel.fetchOnePacket();
      if (packetBuf == null) {
          throw new IOException("Error happened when receiving packet.");
      }
    

    } catch (AsynchronousCloseException e) {

      // when this happened, timeout checker close this channel
      // killed flag in ctx has been already set, just return
      return;
    

    }

    // 调度,这里主要是上面介绍核心sql的parser-》analyze-》rewrite-》logical plan-》optimizer-》physical plan过程
    dispatch();
    ………….
    }

    1
    2

    + ConnectProcessor.java -> dispatch

    …….
    // 这里主要是实现mysql协议的几种状态
    switch (command) {
    case COM_INIT_DB:

      handleInitDb();
      break;
    

    case COM_QUIT:

      handleQuit();
      break;
    

    case COM_QUERY:
    // 这里是完整的sql处理的总入口

      handleQuery();
      ctx.setStartTime();
      break;
    

    case COM_FIELD_LIST:

      handleFieldList();
      break;
    

    case COM_CHANGE_USER:

      handleChangeUser();
      break;
    

    case COM_RESET_CONNECTION:

      handleResetConnnection();
      break;
    

    case COM_PING:

      handlePing();
      break;
    

    default:

      ctx.getState().setError("Unsupported command(" + command + ")");
      LOG.warn("Unsupported command(" + command + ")");
      break;
    

    }
    ……

    1
    2

    + ConnectProcessor.java -> handleQuery

    ….
    StatementBase parsedStmt = null;
    try {
    ctx.setQueryId(UUIDUtil.genUUID());
    List stmts;
    try {

      //通过antlr4进行sql的解析,获取sql解析ast树列表
      stmts = com.starrocks.sql.parser.SqlParser.parse(originStmt, ctx.getSessionVariable());
    

    } catch (ParsingException parsingException) {

      throw new AnalysisException(parsingException.getMessage());
    

    }
    // 对ast语法树进行analyze分析过程
    for (int i = 0; i < stmts.size(); ++i) {

      ..........
      // Only add the last running stmt for multi statement,
      // because the audit log will only show the last stmt.
      if (i == stmts.size() - 1) {
          addRunningQueryDetail(parsedStmt);
      }
    
      executor = new StmtExecutor(ctx, parsedStmt);
      ctx.setExecutor(executor);
    
      ctx.setIsLastStmt(i == stmts.size() - 1);
    
      executor.execute();
    
      // 如果sql有一条执行失败,后续不再执行
      if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
          break;
      }
      ........
    

    }
    }
    ….

1
2
#### 对sql进行Parser语法解析
+ SqlParser.java -> parser

// 首先,我们需要初始化 StarRocksLexer,即词法解析器。在这里,StarRocksLexer 是根据上文介绍的StarRocksLex.g4 词法文件,使用 Antlr4 自动生成的代码类。
StarRocksLexer lexer = new StarRocksLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
lexer.setSqlMode(sessionVariable.getSqlMode());
// 然后,代码将词法解析器 StarRocksLexer 作为参数,传入语法解析器中。语法解析器类StarRocksParser,同样是根据上文介绍的 StarRocks.g4 语法文件自动生成的代码类。
CommonTokenStream tokenStream = new CommonTokenStream(lexer);
StarRocksParser parser = new StarRocksParser(tokenStream);
// 到这里,我们就完成了语法解析类的构建。之后再调用 parser.addErrorListener(new ErrorHandler()),将 Antlr4 的默认错误处理规则,替换为自定义的错误处理逻辑即可。
parser.removeErrorListeners();
parser.addErrorListener(new ErrorHandler());
parser.removeParseListeners();
parser.addParseListener(new TokenNumberListener(sessionVariable.getParseTokensLimit(),
Math.max(Config.expr_children_limit, sessionVariable.getExprChildrenLimit())));
……
List statements = Lists.newArrayList();
// 调用 parser.sqlStatements() 返回值 StarRocksParser.SqlStatementsContext,这是一套 antlr 自定义的抽象语法树,根据语法文件生成。
List singleStatementContexts = parser.sqlStatements().singleStatement();
for (int idx = 0; idx < singleStatementContexts.size(); ++idx) {
// 将 antlr 的语法树转换为 StarRocks 的抽象语法树
StatementBase statement = (StatementBase) new AstBuilder(sessionVariable.getSqlMode())
.visitSingleStatement(singleStatementContexts.get(idx));
statement.setOrigStmt(new OriginStatement(sql, idx));
statements.add(statement);
}

1
2
3

+ StarRocks.g4 -> loadStatement
因为本文主要介绍的导入流程,所以以LoadStatement来举例

loadStatement
: LOAD LABEL label=labelName
data=dataDescList?
broker=brokerDesc?
(BY system=identifierOrString)?
(PROPERTIES props=propertyList)?
| LOAD LABEL label=labelName
data=dataDescList?
resource=resourceDesc
(PROPERTIES props=propertyList)?
;
1
2
3
antlr 的语法文件采用 BNF 范式,用'|'表示分支选项,'?'表达0次或一次,其他符号可类比正则表达式。
+ AstBuilder.java -> visitLoadStatement
Antlr4 会根据语法文件生成一份 Visitor 模式的代码,这样就可以做到动作代码与文法产生式解耦,利于文法产生式的重用。而自定义的 AstBuilder 文件则继承了 StarRocksBaseVisitor,用于将 antlr 内部的 AST 翻译成 StarRocks 自定义的 AST。

public ParseNode visitLoadStatement(StarRocksParser.LoadStatementContext context) {
// 解析出load里写的label名称
LabelName label = getLabelName(context.labelName());
// 解析出load里的字段映射默认值配置等描述
List dataDescriptions = null;
if (context.data != null) {
dataDescriptions = context.data.dataDesc().stream().map(this::getDataDescription)
.collect(toList());
}
// 解析出load里关于spark任务配置的property属性
Map properties = null;
if (context.props != null) {
properties = Maps.newHashMap();
List propertyList = visit(context.props.property(), Property.class);
for (Property property : propertyList) {
properties.put(property.getKey(), property.getValue());
}
}
// 解析出load里引用的spark外部资源名称
if (context.resource != null) {
ResourceDesc resourceDesc = getResourceDesc(context.resource);
return new LoadStmt(label, dataDescriptions, resourceDesc, properties);
}
// 解析出broker的配置
BrokerDesc brokerDesc = getBrokerDesc(context.broker);
String cluster = null;
if (context.system != null) {
cluster = ((Identifier) visit(context.system)).getValue();
}
// visit 的返回值返回一个 AST 的基类,在StarRocks 中称为 ParseNode
return new LoadStmt(label, dataDescriptions, brokerDesc, cluster, properties);
}
1
2
3
4
5
我们对 Parser 源码进行了重点分析,包括 ANTLR4、SqlParser 和 ASTBuilder。与此同时,我们还通过一个例子,介绍了如何将一条文本类型的 SQL 语句,一步步解析成 StarRocks 内部使用的 AST。
可以看到,Parser 能判断出用户的 SQL 中是否存在明显的语法错误,如 SQL 语句select * from;会在Parser 阶段报错。但如果 SQL 语句select * from foo;没有语法错误,StarRocks 中也没有 foo 这张表,那么 StarRocks 该如何做到错误处理呢?这就需要依赖下一节的 Analyzer 模块去判断了。

#### 对sql进行analyzer语法解析
+ StmtExecutor.java-> executor

……..
// 本文主要介绍load流程,所以这里值关注ddl的词法分析过程
} else if (parsedStmt instanceof DdlStmt) {
handleDdlStmt();
}
……
1
2

+ StmtExecutor.java -> handleDdlStmt

private void handleDdlStmt() {
if (parsedStmt instanceof ShowStmt) {
com.starrocks.sql.analyzer.Analyzer.analyze(parsedStmt, context);
PrivilegeChecker.check(parsedStmt, context);

QueryStatement selectStmt = ((ShowStmt) parsedStmt).toSelectStmt();
if (selectStmt != null) {
    parsedStmt = selectStmt;
    execPlan = StatementPlanner.plan(parsedStmt, context);
}

} else {
// 这里是analyze解析的入口函数
execPlan = StatementPlanner.plan(parsedStmt, context);
}
}

1
2

+ StatementPlanner.java -> plan

// 注意在analyze阶段,也是会进行锁库操作(db.readLock();)
lock(dbs);
try (PlannerProfile.ScopedTimer ignored = PlannerProfile.getScopedTimer(“Analyzer”)) {
Analyzer.analyze(stmt, session);
}

PrivilegeChecker.check(stmt, session);
if (stmt instanceof QueryStatement) {
OptimizerTraceUtil.logQueryStatement(session, “after analyze:\n%s”, (QueryStatement) stmt);
}

1
2
3
Analyzer 类是所有语句的语义解析的主入口,采用了 Visitor 设计模式,会根据处理的语句类型的不同,调用不同语句的 Analyzer。不同语句的处理逻辑会包含在单独的语句 Analyzer 中,交由不同的 Analyzer 处理

+ LoadExecutor.java -> accept

public R accept(AstVisitor visitor, C context) {
return visitor.visitLoadStatement(this, context);
}
1
2
3
![analyze](/images/starrocks/hive2sr/real_analyze.png)

+ LoadStmtAnalyzer.java -> visitLoadStatement

public Void visitLoadStatement(LoadStmt statement, ConnectContext context) {
analyzeLabel(statement, context);
analyzeDataDescriptions(statement);
analyzeProperties(statement);
return null;
}
1
2
3
如上所示,LoadStatement交由LoadStmtAnalyzer来专门处理解析,这样做的优点是,可以将 Statement 的定义文件和 Analyze 的处理逻辑分开,并且同一类型的语句也交由特定的 Analyzer 处理,做到不同功能之间代码的解耦合。

+ StmtExecutor.java -> handleDdlStmt

private void handleDdlStmt() {
……..
// 这里是ddl 执行分析的总入口
ShowResultSet resultSet = DDLStmtExecutor.execute(parsedStmt, context);
……..
}
1
2

+ DDLStmtExecutor.java -> execute

……..
// ddl statement analyze 处理逻辑的基类
return stmt.accept(StmtExecutorVisitor.getInstance(), context);
……..
1
2
3
![analyze](/images/starrocks/hive2sr/analyze.png)

+ DDLStmtExecutor.java -> visitLoadStatement

public ShowResultSet visitLoadStatement(LoadStmt stmt, ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
EtlJobType jobType = stmt.getEtlJobType();
if (jobType == EtlJobType.UNKNOWN) {
throw new DdlException(“Unknown load job type”);
}
if (jobType == EtlJobType.HADOOP && Config.disable_hadoop_load) {
throw new DdlException(“Load job by hadoop cluster is disabled.”

                + " Try using broker load. See 'help broker load;'");
    }
    // 这里进行真正的创建load任务的分析过程
    context.getGlobalStateMgr().getLoadManager().createLoadJobFromStmt(stmt, context);
});
return null;

}

1
2
3
#### 创建load处理pending阶段

+ LoadManager.java -> createLoadJobFromStmt

public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
// 校验库元数据
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob = null;
// 加写锁,防止并发
writeLock();
try {
// 校验lable是否已使用
checkLabelUsed(dbId, stmt.getLabel().getLabelName());
if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) {
throw new DdlException(“LoadManager only support the broker and spark load.”);
}
// 判断queue队列长度是否超过1024,超过就报错
if (loadJobScheduler.isQueueFull()) {
throw new DdlException(
“There are more than “ + Config.desired_max_waiting_jobs + “ load jobs in waiting queue, “

                      + "please retry later.");
  }
  // 按load类型初始化出LoadJob
  loadJob = BulkLoadJob.fromLoadStmt(stmt);
  // 在内存中记录该load元数据
  createLoadJob(loadJob);

} finally {
writeUnlock();
}
// 通知bdbje加入load元数据同步
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);

// loadJob守护进程从load job schedule queue取出任务去执行
loadJobScheduler.submitJob(loadJob);
}

1
2
3
4
5
6
7

通过 `createLoadJobFromStmt` 创建load任务
+ LoadJobScheduler.java -> process
注意:LoadJobScheduler 继承自 MasterDaemon,MasterDaemon 继承自 Daemon,
Daemon继承自Thread,重载了run方法,里面有一个loop,主要执行runOneCycle
MasterDaemon 又重写了 runOneCycle,执行 runAfterCatalogReady 函数
LoadJobScheduler 又重写了 runAfterCatalogReady 主要就是干process处理,里面是一个死循环,不断从LinkedBlockingQueue类型的needScheduleJobs里出栈取要执行的job

while (true) {
// take one load job from queue
LoadJob loadJob = needScheduleJobs.poll();
if (loadJob == null) {
return;
}

// schedule job
try {
loadJob.execute();
}

1
2

+ LoadJob.java -> execute

public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
DuplicatedRequestException, LoadException {
// 加写锁,这里防止多并发提交
writeLock();
try {
unprotectedExecute();
} finally {
writeUnlock();
}
}
// 这里主要关注beginTxn,做了事务处理,目的是防止fe节点突然挂掉再拉起后其他follower节点切换到leader节点时会重新执行这个任务,而这个时候旧leader节点已经执行过一段这个任务,导致fe节点之间执行无法同步,所以直接抛出label已经存在不再继续执行
public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
DuplicatedRequestException, LoadException {
// check if job state is pending
if (state != JobState.PENDING) {
return;
}
// the limit of job will be restrict when begin txn
beginTxn();
unprotectedExecuteJob();
// update spark load job state from PENDING to ETL when pending task is finished
if (jobType != EtlJobType.SPARK) {
unprotectedUpdateState(JobState.LOADING);
}
}
1
2

+ SparkLoadJob.java -> unprotectedExecuteJob

protected void unprotectedExecuteJob() throws LoadException {
// create pending task
LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
sparkResource, brokerDesc);
task.init();
idToTasks.put(task.getSignature(), task);
// 注意这里的线程池,初始化时只有5个核心线程(最大线程数),而队列长度只有1024,这里会影响任务长期处于pending状态,如果当前5个任务在下面的spark driver阶段一直等不到yarn资源,则一直处于这个线程执行状态,其他提交进来的任务执行在queue队列中等待
submitTask(Catalog.getCurrentCatalog().getPendingLoadTaskScheduler(), task);
……….
// this.pendingLoadTaskScheduler = new LeaderTaskExecutor(“pending_load_task_scheduler”, Config.max_broker_load_job_concurrency,Config.desired_max_waiting_jobs, !isCheckpointCatalog);
}
1
2
3

+ SparkLoadPendingTask.java -> init
// 初始化任务的配置参数

public void init() throws LoadException {
createEtlJobConf();
}
1
2
3

+ LoadTask -> exec
// 由于SparkLoadTask继承自LoadTask,而LoadTask实现了PriorityRunnable接口,所以上面的submitTask实际上就是自动执行这里的exec方法

@Override
protected void exec() {
boolean isFinished = false;
try {
// execute pending task
executeTask();
// callback on pending task finished
callback.onTaskFinished(attachment);
isFinished = true;
………
}
1
2
3
4

#### load任务处理etl阶段

+ SparkLoadPendingTask.java -> executeTask

void executeTask() throws LoadException {
LOG.info(“begin to execute spark pending task. load job id: {}”, loadJobId);
submitEtlJob();
}
1
2

+ SparkLoadPendingTask.java -> submitEtlJob

private void submitEtlJob() throws LoadException {
SparkPendingTaskAttachment sparkAttachment = (SparkPendingTaskAttachment) attachment;
// 配置etl阶段清洗出的文件在hdfs上的存放路径
etlJobConfig.outputPath = EtlJobConfig.getOutputPath(resource.getWorkingDir(), dbId, loadLabel, signature);
sparkAttachment.setOutputPath(etlJobConfig.outputPath);

// 提交etl任务
SparkEtlJobHandler handler = new SparkEtlJobHandler();
handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkLoadAppHandle,
        sparkAttachment);
LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment);

}

1
2

+ SparkEtlJobHandler.java -> submitEtlJob

public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource,
BrokerDesc brokerDesc, SparkLoadAppHandle handle, SparkPendingTaskAttachment attachment)
throws LoadException {
// delete outputPath
// init local dir
// prepare dpp archive
SparkLauncher launcher = new SparkLauncher(envs);
// master | deployMode
// ——————|——————-
// yarn | cluster
// spark://xx | client
launcher.setMaster(resource.getMaster())
.setDeployMode(resource.getDeployMode().name().toLowerCase())
.setAppResource(appResourceHdfsPath)
.setMainClass(SparkEtlJob.class.getCanonicalName())
.setAppName(String.format(ETL_JOB_NAME, loadLabel))
.setSparkHome(sparkHome)
.addAppArgs(jobConfigHdfsPath)
.redirectError();

// spark configs

// start app
State state = null;
String appId = null;
String logPath = null;
String errMsg = “start spark app failed. error: “;
try {
// 提交spark任务给yarn
Process process = launcher.launch();
handle.setProcess(process);
if (!FeConstants.runningUnitTest) {
// 监听spark driver输出的日志,直到任务被yarn接收后,退出driver进程
SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
logMonitor.setRedirectLogPath(logFilePath);
logMonitor.start();
try {
logMonitor.join();
} catch (InterruptedException e) {
logMonitor.interrupt();
throw new LoadException(errMsg + e.getMessage());
}
}
appId = handle.getAppId();
state = handle.getState();
logPath = handle.getLogPath();
} catch (IOException e) {
LOG.warn(errMsg, e);
throw new LoadException(errMsg + e.getMessage());
}
……….
// success
attachment.setAppId(appId);
attachment.setHandle(handle);
}

1
2
3
4


#### load任务处理loading阶段
+ 1、spark load 有一个 LoadEtlChecker定时调度任务,每5s执行一次

protected void runAfterCatalogReady() {
try {
loadManager.processEtlStateJobs();
} catch (Throwable e) {
LOG.warn(“Failed to process one round of LoadEtlChecker with error message {}”, e.getMessage(), e);
}
}
1
2

+ 2、这个定时调度就是定时检查load任务状态,有变化就去更新,简单说就是load状态扭转控制器

// only for those jobs which have etl state, like SparkLoadJob
public void processEtlStateJobs() {
idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.ETL))
.forEach(job -> {
try {
((SparkLoadJob) job).updateEtlStatus();
} catch (DataQualityException e) {
LOG.info(“update load job etl status failed. job id: {}”, job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
DataQualityException.QUALITY_FAIL_MSG),
true, true);
} catch (TimeoutException e) {
// timeout, retry next time
LOG.warn(“update load job etl status failed. job id: {}”, job.getId(), e);
} catch (UserException e) {
LOG.warn(“update load job etl status failed. job id: {}”, job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
} catch (Exception e) {
LOG.warn(“update load job etl status failed. job id: {}”, job.getId(), e);
}
});
}
1
2

+ 3、当任务状态走到loading时,会提交load任务,函数是submitPushTasks(),这里简单说就是把表的排序健/分区/be副本等查询出来,后面要把这些元数据通过rpc调用be来处理

if (tablet instanceof LocalTablet) {
for (Replica replica : ((LocalTablet) tablet).getImmutableReplicas()) {
long replicaId = replica.getId();
tabletAllReplicas.add(replicaId);
long backendId = replica.getBackendId();
Backend backend = GlobalStateMgr.getCurrentState().getCurrentSystemInfo()
.getBackend(backendId);

    pushTask(backendId, tableId, partitionId, indexId, tabletId,
            replicaId, schemaHash, params, batchTask, tabletMetaStr,
            backend, replica, tabletFinishedReplicas, TTabletType.TABLET_TYPE_DISK);
}

if (tabletAllReplicas.size() == 0) {
    LOG.error("invalid situation. tablet is empty. id: {}", tabletId);
}

// check tablet push states
if (tabletFinishedReplicas.size() >= quorumReplicaNum) {
    quorumTablets.add(tabletId);
    if (tabletFinishedReplicas.size() == tabletAllReplicas.size()) {
        fullTablets.add(tabletId);
    }
}

}

1
2

+ 4、遍历生成出的每个tablet,再去遍历每个副本replica,都会给每个replica提交一个load任务,那这里要注意的是会构造

PushTask pushTask = new PushTask(backendId, dbId, tableId, partitionId,
indexId, tabletId, replicaId, schemaHash,
0, id, TPushType.LOAD_V2,
TPriority.NORMAL, transactionId, taskSignature,
tBrokerScanRange, params.tDescriptorTable,
params.useVectorized, timezone, tabletType);
if (AgentTaskQueue.addTask(pushTask)) {
batchTask.addTask(pushTask);
if (!tabletToSentReplicaPushTask.containsKey(tabletId)) {
tabletToSentReplicaPushTask.put(tabletId, Maps.newHashMap());
}
tabletToSentReplicaPushTask.get(tabletId).put(replicaId, pushTask);
}
1
2

+ 5、会通过thrift rpc 通知这个副本的be,主要是这个地方client.submit_tasks(agentTaskRequests);

// create AgentClient
address = new TNetworkAddress(backend.getHost(), backend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
List agentTaskRequests = new LinkedList();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
}
client.submit_tasks(agentTaskRequests);
1
2

+ 6、这个submit_tasks方法主要是调用BackendService类,这个类实现在be这一侧

public com.starrocks.thrift.TAgentResult submit_tasks(java.util.List tasks) throws org.apache.thrift.TException
{
send_submit_tasks(tasks);
return recv_submit_tasks();
}
1
2
3
4
5
6
7
8
9
10
11
这个submit_tasks方法是 Thrift 生成的 Java 客户端代码。它和 C++ 服务端的对应关系是:
1) fe发送 Thrift RPC 请求,调用这个 Java submit_tasks方法。
2) Thrift框架会将参数 tasks 序列化成数据包,发送给服务端be。
3) be服务端 Thrift 接收到数据包,反序列化出 tasks 参数。
4) 根据方法名submit_tasks映射到 C++ 的 BackendService::submit_tasks 方法,将 tasks 作为参数调用它。
5) C++ 方法执行完成,将返回值通过 Thrift 序列化后发送给客户端。
6) Java 端 recv_submit_tasks 反序列化出返回结果,赋值给 submit_tasks 方法的返回值。
7) submit_tasks 返回结果,Java 前端获得调用结果。

#### be部分如何处理loading的代码逻辑
+ 1、BackendService的rpc服务在启动be时已开启

// Begin to start services
// 1. Start thrift server with ‘be_port’.
auto thrift_server = BackendService::create(exec_env, starrocks::config::be_port);
if (auto status = thrift_server->start(); !status.ok()) {
LOG(ERROR) << “Fail to start BackendService thrift server on port “ << starrocks::config::be_port << “: “
<< status;
starrocks::shutdown_logging();
exit(1);
}
1
2
3


+ 2、客户端BackendServiceClient::submit_tasks()会序列化tasks参数,然后发送RPC请求到服务端,服务端的RPC框架收到请求,根据方法名映射到BackendService::submit_tasks()

void BackendService::submit_tasks(TAgentResult& return_value, const std::vector& tasks) {
_agent_server->submit_tasks(return_value, tasks);
}
1
2
3


+ 3、而agent_server里处理submit_tasks的核心逻辑是再次调用PushTaskWorkerPool下的submit_tasks

// batch submit push tasks
if (!push_divider.empty()) {
LOG(INFO) << “begin batch submit task: “ << tasks[0].task_type;
for (const auto& push_item : push_divider) {
const auto& push_type = push_item.first;
auto all_push_tasks = push_item.second;
switch (push_type) {
case TPushType::LOAD_V2:
_push_workers->submit_tasks(all_push_tasks);
break;
case TPushType::DELETE:
case TPushType::CANCEL_DELETE:
_delete_workers->submit_tasks(all_push_tasks);
break;
default:
ret_st = Status::InvalidArgument(strings::Substitute(“tasks(type=$0, push_type=$1) has wrong task type”,
TTaskType::PUSH, push_type));
LOG(WARNING) << “fail to batch submit push task. reason: “ << ret_st.get_error_msg();
}
}
}
1
2

+ 4、在TaskWorkPool里处理submit_tasks的,主要是实现批量注册task信息,做去重和幂等校验,成功后进行遍历,核心函数是_convert_task

for (size_t i = 0; i < task_count; i++) {
if (failed_task[i] == 0) {
auto new_task = _convert_task(*tasks[i], recv_time);
_tasks.emplace_back(std::move(new_task));
}
}
1
2

+ 5、_convert_task它会将TAgentTaskRequest转换成PushReqAgentTaskRequest类型的任务请求,这个是通用线程池,构造函数里回调了_worker_thread_callback,构造出的EngineBatchLoadTask对象后取执行task

EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req->signature, &status,
ExecEnv::GetInstance()->load_mem_tracker());
StorageEngine::instance()->execute_task(&engine_task);
1
2

+ 6、在EngineBatchLoadTask::execute()里追踪了内存使用后,会检查tablet是否存在,目录是否达到存储限制等,继续执行load_v2(sparkload/brokerload)的_process,而_process也是做了下校验后提交到_push里

if (status == STARROCKS_SUCCESS) {
uint32_t retry_time = 0;
while (retry_time < PUSH_MAX_RETRY) {
status = _process();

    if (status == STARROCKS_PUSH_HAD_LOADED) {
        LOG(WARNING) << "transaction exists when realtime push, "
                        "but unfinished, do not report to fe, signature: "
                        << _signature;
        break; // not retry anymore
    }
    // Internal error, need retry
    if (status == STARROCKS_ERROR) {
        LOG(WARNING) << "push internal error, need retry.signature: " << _signature;
        retry_time += 1;
    } else {
        break;
    }
}

}

1
2

+ 7、在_push()里初始化了PushHandler类,现在才开始处理数据的摄入过程

vectorized::PushHandler push_handler;
res = push_handler.process_streaming_ingestion(tablet, request, type, tablet_info_vec);
1
2

+ 8、在核心逻辑_do_streaming_ingestion函数里,开始检查这个tablet是否能迁移,能的话,拿回tablet的写锁后做load转换过程

Status st = Status::OK();
if (push_type == PUSH_NORMAL_V2) {
st = _load_convert(tablet_vars->at(0).tablet, &(tablet_vars->at(0).rowset_to_add));
}
1
2

+ 9、而在_load_convert里就是初始化出RowsetWriterContext这个上下文对象和RowsetWriter,RowsetWriter对象主要是需要去读出tablet的每一行,用这个wirter写回

st = reader->init(t_scan_range, _request);
if (!st.ok()) {
LOG(WARNING) << “fail to init reader. res=” << st.to_string() << “, tablet=” << cur_tablet->full_name();
return st;
}
1
2

+ 10、而reader的初始化就是构造PushBrokerReader对象,在PushBrokerReader里会按照文件类型构造出扫描对象,们目前通过sparkload处理的数据,都是使用parquet格式,所以这个初始化就是初始化出ParquetScanner

// init scanner
FileScanner* scanner = nullptr;
switch (t_scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET: {
scanner = new ParquetScanner(_runtime_state.get(), _runtime_profile, t_scan_range, _counter.get());
if (scanner == nullptr) {
return Status::InternalError(“Failed to create scanner”);
}
break;
}
1
2

+ 11、再回到PushHandler里的_load_conver函数的读取tablet过程,也是按行来去读一个个chunk,而这个next_chunk在PushBrokerReader里又是如何处理的呢?

ChunkPtr chunk = ChunkHelper::new_chunk(schema, 0);
while (!reader->eof()) {
st = reader->next_chunk(&chunk);
if (!st.ok()) {
LOG(WARNING) << “fail to read next chunk. err=” << st.to_string() << “, read_rows=” << num_rows;
return st;
} else {
if (reader->eof()) {
break;
}

    st = rowset_writer->add_chunk(*chunk);
    if (!st.ok()) {
        LOG(WARNING) << "fail to add chunk to rowset writer"
                        << ". res=" << st << ", tablet=" << cur_tablet->full_name()
                        << ", read_rows=" << num_rows;
        return st;
    }

    num_rows += chunk->num_rows();
    chunk->reset();
}

}

1
2

+ 12、next_chunk函数会通过scanner对象,去读每行,直到读完为止,再把读出来的结果转换成chunk

auto res = _scanner->get_next();
if (res.status().is_end_of_file()) {
_eof = true;
return Status::OK();
} else if (!res.ok()) {
return res.status();
}

return _convert_chunk(res.value(), chunk);

1
2

+ 13、在parquet_scanner里是如何读每个行的呢?当然是按批来读,把读出的批追加到chunk去,chunk满了就返回,这种方式就是通用的流式数据处理,避免了一次新把整个parquet读到内存里,在这里主要关注到next_batch函数,就是Parquet文件的读取流程

const TBrokerRangeDesc& range_desc = _scan_range.ranges[_next_file];
Status st = create_random_access_file(range_desc, _scan_range.broker_addresses[0], _scan_range.params,
CompressionTypePB::NO_COMPRESSION, &file);

14、而在create_random_access_file函数由于parquet_scanner是继承自file_scanner,统一回来按文件类型去创建RandomAccessFile的逻辑
int64_t timeout_ms = _state->query_options().query_timeout * 1000 / 4;
timeout_ms = std::max(timeout_ms, static_cast(DEFAULT_TIMEOUT_MS));
BrokerFileSystem fs_broker(address, params.properties, timeout_ms);
if (config::use_local_filecache_for_broker_random_access_file) {
ASSIGN_OR_RETURN(auto broker_file, fs_broker.new_sequential_file(range_desc.path));

std::string dest_path = create_tmp_file_path();
LOG(INFO) << "broker load cache file: " << dest_path;
ASSIGN_OR_RETURN(auto dest_file, FileSystem::Default()->new_writable_file(dest_path));

auto res = fs::copy(broker_file.get(), dest_file.get(), 10 * 1024 * 1024);
std::shared_ptr<RandomAccessFile> local_file;
ASSIGN_OR_RETURN(local_file, FileSystem::Default()->new_random_access_file(dest_path));
src_file = std::make_shared<TempRandomAccessFile>(dest_path, local_file);

}

1
2

+ 15、使用BrokerFileSystem通过broker来打开文件(这里的address就是我们在fe节点里创建tBrokerScanRange传入的),用broker把hdfs上的文件下载到本地临时目录下,再打开本地临时目录

Status st = call_method(_broker_addr, &BrokerServiceClient::openReader, request, &response);
if (!st.ok()) {
LOG(WARNING) << “Fail to open “ << path << “: “ << st;
return st;
}
if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
LOG(WARNING) << “Fail to open “ << path << “: “ << response.opStatus.message;
return to_status(response.opStatus);
}

// Get file size.
ASSIGN_OR_RETURN(const uint64_t file_size, get_file_size(path));
auto stream = std::make_shared(_broker_addr, response.fd, file_size);
return std::make_unique(std::move(stream), path);

1
2
3
4

注意,这里又是通过rpc来调用broker了

+ 16、再回来看parquet_scanner里的get_next()函数,就是整个parquet文件的核心逻辑,而核心里的调用就是初始化chunk的逻辑

Status ParquetScanner::initialize_src_chunk(ChunkPtr chunk) {
SCOPED_RAW_TIMER(&_counter->init_chunk_ns);
_pool.clear();
(
chunk) = std::make_shared();
size_t column_pos = 0;
_chunk_filter.clear();
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor slot_desc = _src_slot_descriptors[i];
if (slot_desc == nullptr) {
continue;
}
auto
array = _batch->column(column_pos++).get();
ColumnPtr column;
RETURN_IF_ERROR(new_column(array->type().get(), slot_desc, &column, &_conv_funcs[i], &_cast_exprs[i]));
column->reserve(_max_chunk_size);
(*chunk)->append_column(column, slot_desc->id());
}
return Status::OK();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1) 清空内存池
2) 创建新的空chunk
3) 遍历所有columns:
3.1、如果column在schema中,获取对应的SlotDescriptor
3.2、根据数组类型和SlotDescriptor创建新的Column
3.3、预留chunk大小内存
3.4、将Column添加到chunk
4) 返回OK状态
这样就构造了一个空的、符合schema的chunk。
主要逻辑是:
- 清理内存池重用
- 创建空chunk
- 为每个column创建对应的Column对象
- 预留内存
- 添加到chunk
这是非常标准的按schema构建chunk的过程。
这种实现可以高效构建chunk,同时预留内存减少后续内存分配,并可以重用内存池避免频繁new/delete。
构建一个符合schema并预分配内存的空chunk是vectorized执行的基础,这个初始化实现是非常重要的一步


### 演进之路
> 上面主要解读了整个离线导入过程的核心代码,在离线导入模块集成到同步中心后,也遇到了一些问题,进行了3次升级来支持不同业务侧对离线导入的需求,下面主要介绍下几个大feature的升级

#### 国际化业务需要字符串分区
滴滴内部的国际化业务,由于不同国家有时区问题,所以业务侧在hive建表时总会创建出dt=yyyy-mm-dd/country_code=xx的表结构,但如果要将这种hive表结构导入到sr,在当前官方版本都是不支持的,一直到3.0+版本我们和社区一起共建了string类型分区这个feature,并且在离线导入这一块进行了深度迭代升级,用以支撑国际化业务的快速落地sr,这个feature我们已经和入到官方版本中,当前社区版本需要在3.0+才可以支持string类型分区,而滴滴内部在2.3和2.5都已经支持了string类型分区功能,如下表结构

CREATE TABLE db.tb (
is_passenger varchar(65533),
country_code varchar(65533),
dt varchar(65533),
pid varchar(65533),
resource_stage_1_show_cnt bigint(20)
) ENGINE=OLAP
DUPLICATE KEY(is_passenger, country_code, dt)
COMMENT “xxxx”
PARTITION BY LIST(country_code,dt)(
PARTITION pMX_20231208 VALUES IN ((‘MX’, ‘2023-12-08’))
)
DISTRIBUTED BY HASH(pid) BUCKETS 48
PROPERTIES (
“replication_num” = “3”,
“in_memory” = “false”,
“storage_format” = “DEFAULT”,
“enable_persistent_index” = “false”
);
1
2
3

#### spark任务提交由于yarn资源缺乏只能等待5分钟便终止等待load退出
滴滴内部yarn资源队列core和memory均做了一定限制,在降本增效的前提下,很多业务侧无法再去扩容yarn队列,而sr官方社区版本在spark任务提交给yarn之后,默认只给等待5分钟,如果始终等不到yarn接收任务,则自动退出,针对这种特定场景,我们优化了load任务的执行流程,引入了`"spark_load_submit_timeout" = "7200"`参数,在每个load任务提交后都可以自动化配置这个任务需要等待yarn多少时间,并且如果spark任务确实等待超时后无法提交,也加入了通知yarn去杀死这个等待的spark任务,防止后续等待到yarn资源后又再次提交执行,而sr这一侧load已经退出,白白浪费资源,如下load结构

LOAD LABEL db.lable_name_xxx (
DATA INFILE(
“hdfs://DClusterUS1/xxxxx”
)
INTO TABLE tb
TEMPORARY PARTITION(temppartition)
FORMAT AS “ORC”
(column_1, column_2)
SET (
column_1 = if(column_1 is null or column_1 = “null”, null, column_1),
column_2 = if(column_2 is null or column_2 = “null”, null, column_2),
dt = “2023-12-09”,
country_code = “MX”
)
) WITH RESOURCE ‘external_spark_resource’ (
“spark.yarn.tags” = “xxxxxxx”,
“spark.dynamicAllocation.enabled” = “true”,
“spark.executor.memory” = “3g”,
“spark.executor.memoryOverhead” = “2g”,
“spark.streaming.batchDuration” = “5”,
“spark.executor.cores” = “1”,
“spark.yarn.executor.memoryOverhead” = “2g”,
“spark.speculation” = “false”,
“spark.dynamicAllocation.minExecutors” = “2”,
“spark.dynamicAllocation.maxExecutors” = “100”
) PROPERTIES (
“timeout” = “36000”,
“spark_load_submit_timeout” = “7200”
)
1
2
3
4
5
6
7

#### 由于spark driver进程在fe节点中占用过多内存,会导致cgroup自动杀死fe进程
目前我们的k8s集群对fe节点的配置只有12g,所以大批量spark driver进程在fe节点中,按照之前统计每个dirver进程大约会占用200-300mb,而fe进程的元数据占用内存大约在5g-10g之间,所以留给spark driverfe进程的可用内存其实非常少,但是目前大量业务侧使用离线导入功能,这样就会造成fe进程不定时被kill的困局,针对这个场景,我们开发了load限流功能,将并发提交的任务转入限流队列中,由队列资源来控制当前任务的并行度,在fe节点内存资源达到高峰期时,进行相应的限流策略,用以保障fe节点的稳定性,如下所示
![架构图](/images/starrocks/hive2sr/structure.png)

#### 由于hivesql写入的hive表产出的hdsf文件自动生成的格式是_col1..._coln,在通过字段导入时会造成hive表字段和sr表字段映射失败
在离线导入功能演进过程中,这个场景在大量业务侧被迫切提出需求,业务侧由于业务口径的变更,需要对hive表进行修改,而一修改hive表结构,历史数据回溯时就会出现分区映射的hdfs文件无法正确映射成功,这个场景和通过hive sql导入hive表时一样的,hive和sr之间字段无法映射成功,就造成导入失败,针对这一场景我们升级了离线导入模块,引入了spark sql模式来读取hive表,而不是之前的spark 文件方式读取,这种方式成功解决了用户变更表字段回溯场景和hive sql导入场景,如下load结构:

LOAD LABEL db.label_name (
DATA FROM TABLE hive_tb
INTO TABLE sr_tb
TEMPORARY PARTITION(temp
partition)
SET (
column_1 = if(column_1 is null or column_1 = “null”, null, column_1),
column_2 = if(column_2 is null or column_2 = “null”, null, column_2),
…….
)
WHERE (dt = ‘2023-12-09’)
)WITH RESOURCE ‘spark_external_resource’ (
“spark.yarn.tags” = “h2s_foit_150748320231209914ff11b87c94c85947ab13f84ff4622”,
“spark.dynamicAllocation.enabled” = “true”,
“spark.executor.memory” = “3g”,
“spark.executor.memoryOverhead” = “2g”,
“spark.streaming.batchDuration” = “5”,
“spark.executor.cores” = “1”,
“spark.yarn.executor.memoryOverhead” = “2g”,
“spark.speculation” = “false”,
“spark.dynamicAllocation.minExecutors” = “2”,
“spark.dynamicAllocation.maxExecutors” = “100”
) PROPERTIES (
“timeout” = “36000”,
“spark_load_submit_timeout” = “7200”
)
```
注意:目前spark sql导入只在明细模型(duplicate key)和聚合模型(aggregate key)上支持

未来规划

目前starrocks离线导入功能已经集成在滴滴的同步中心,用户可以通过同步中心按hive表结构自动创建sr表结构,并且配置出hive表字段和sr表字段的映射关系后,构造出导入配置相应参数,调用sr的离线导入模块进行数据导入,而这个导入功能我们在2024年也会有以下规划:

spark sql支持主键模型(primary key)

目前有一部分业务侧会通过主键模型将hive分区数据定时导回给sr,当前还不支持spark sql方式导入,2024年我们会进行这个feature开发

独立sr构建工具

当前的etl构建过程仍然需要在fe进程中进行交互处理,但fe节点我们在k8s初始化时只有12g,虽然在演进之路里我们加入了限流功能来保障fe稳定性,但是这个方式也会将hive导入耗时加大,对于一些需要高优保障的任务仍然需要按时产出,所以我们规划在2024年可以将这个构建工具从sr的fe进程中独立出来,单独部署在构建集群中,在完成etl阶段后,通过rpc方式通知给sr的fe进程来拉取

hive外表联邦查询

当前很多业务侧的hive表分区压缩后也会在100g+,这种case导入到sr后查询性能也不一定会很好,但是导入过程却很浪费资源,我们计划在2024年完成hive外表的集成功能,让用户可以直接通过sr来查询hive表,而不需要再进行分区导入这一部分操作

如何看查占用cpu最多的进程?

  • 方法一
    核心指令:ps
    实际命令:

    1
    ps H -eo pid,pcpu | sort -nk2 | tail

    执行效果如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [work@test01 ~]$ ps H -eo pid,pcpu | sort -nk2 | tail
    31396 0.6
    31396 0.6
    31396 0.6
    31396 0.6
    31396 0.6
    31396 0.6
    31396 0.6
    31396 0.6
    30904 1.0
    30914 1.0

    结果:
    瞧见了吧,最耗cpu的pid=30914。
    画外音:实际上是31396。

  • 方法二
    核心指令:top
    实际命令:

    1
    2
    top
    Shift + t

    找到了最耗CPU的进程ID,对应的服务名是什么呢?

  • 方法一
    核心指令:ps
    实际命令:

    1
    ps aux | fgrep pid

    执行效果如下:

    1
    2
    [work@test01 ~]$ ps aux | fgrep 30914
    work 30914 1.0 0.8 309568 71668 ? Sl Feb02 124:44 ./router2 –conf=rs.conf

    结果:
    瞧见了吧,进程是./router2

  • 方法二
    直接查proc即可。
    实际命令:

    1
    ll /proc/pid

    执行效果如下:

    1
    2
    3
    [work@test01 ~]$ ll /proc/30914
    lrwxrwxrwx 1 work work 0 Feb 10 13:27 cwd -> /home/work/im-env/router2
    lrwxrwxrwx 1 work work 0 Feb 10 13:27 exe -> /home/work/im-env/router2/router2

    画外音:这个好,全路径都出来了。

如何查看某个端口的连接情况?

  • 方法一
    核心指令:netstat
    实际命令:

    1
    netstat -lap | fgrep port

    执行效果如下:

    1
    2
    3
    4
    [work@test01 ~]$ netstat -lap | fgrep 22022
    tcp 0 0 1.2.3.4:22022 *:* LISTEN 31396/imui
    tcp 0 0 1.2.3.4:22022 1.2.3.4:46642 ESTABLISHED 31396/imui
    tcp 0 0 1.2.3.4:22022 1.2.3.4:46640 ESTABLISHED 31396/imui
  • 方法二
    核心指令:lsof
    实际命令:

    1
    lsof -i :port

    执行效果如下:

    1
    2
    3
    4
    5
    [work@test01 ~]$ /usr/sbin/lsof -i :22022
    COMMAND PID USER FD TYPE DEVICE SIZE NODE NAME
    router 30904 work 50u IPv4 69065770 TCP 1.2.3.4:46638->1.2.3.4:22022 (ESTABLISHED)
    router 30904 work 51u IPv4 69065772 TCP 1.2.3.4:46639->1.2.3.4:22022 (ESTABLISHED)
    router 30904 work 52u IPv4 69065774 TCP 1.2.3.4:46640->1.2.3.4:22022 (ESTABLISHED)

    学废了吗?

  • 2022年仓皇而逃,在恍惚间我仍然有一丝怀疑,此时此刻是2019年的冬天,我们在准备着即将到来到2020年,订好了机票,订好了酒店,在跨年夜的云层之上,陌生人在沉睡,而我即将见到相隔数月思念日日夜夜的人儿。

  • 这种怀疑在这3年间时不时的击碎我,重建我,沉溺我,到最后清醒我,时间过得太快了啊,快到迅雷烈风,快到掩耳而逝,我不是还在17/8岁的高中课堂上汗流浃背准备高考吗?怎么这一眨眼之间,我竟已是前额白发丝丝黑眼圈蜡黄脸的中年人了呢?

  • 人世间的痛不知道何时能结束,人世间的离愁不知道何时能消散,虚度30余载光阴,到如今即将到来的第二个本命年,我有何成就吗?我有实现过理想吗?我活着的这些时刻,有给这个世界带来什么美好吗?我不自知只觉惭愧。

  • 麻木的躯壳麻木的灵魂,肤浅的知识肤浅的见识,2022年在核酸、隔离、阳性中结束,这浅浅6个字结束了我的又一个365天,我是谁?我在2022年都做了什么事?有什么可以拿说来细说一二吗?有什么不足和遗憾吗?希望2023年可以改变吗?希望改变什么呢?

  • 没有答案,我甚至连流水账都写不出来,哦,我这个可笑的中年人。

背景介绍

clickhouse是分布式系统,一条查询sql会经过pipeline处理后通过后台的查询线程池开启多线程查询,而经常会收到cpu飙高告警,则是因为这条sql开启了系统所有的cpu资源(多个线程)进行计算

告警图

如下图所示,提示cpu飙高
clickhouse
如下图所示,ganglia上展示了当时的cpu顶着线在跑
clickhouse

system.query_log

地址,该表包含了所有进入到ck的sql语句。
该表包含字段如下:

  • type (Enum8) — 执行查询时的事件类型. 值:
    • ‘QueryStart’ = 1 — 查询成功启动.
    • ‘QueryFinish’ = 2 — 查询成功完成.
  • ‘ExceptionBeforeStart’ = 3 — 查询执行前有异常.
  • ‘ExceptionWhileProcessing’ = 4 — 查询执行期间有异常.
  • event_date (Date) — 查询开始日期.
  • event_time (DateTime) — 查询开始时间.
  • event_time_microseconds (DateTime64) — 查询开始时间(毫秒精度).
  • query_start_time (DateTime) — 查询执行的开始时间.
  • query_start_time_microseconds (DateTime64) — 查询执行的开始时间(毫秒精度).
  • query_duration_ms (UInt64) — 查询消耗的时间(毫秒).
  • read_rows (UInt64) — 从参与了查询的所有表和表函数读取的总行数. 包括:普通的子查询, IN 和 JOIN的子查询. 对于分布式查询 read_rows 包括在所有副本上读取的行总数。 每个副本发送它的 read_rows 值,并且查询的服务器-发起方汇总所有接收到的和本地的值。 缓存卷不会影响此值。
  • read_bytes (UInt64) — 从参与了查询的所有表和表函数读取的总字节数. 包括:普通的子查询, IN 和 JOIN的子查询. 对于分布式查询 read_bytes 包括在所有副本上读取的字节总数。 每个副本发送它的 read_bytes 值,并且查询的服务器-发起方汇总所有接收到的和本地的值。 缓存卷不会影响此值。
  • written_rows (UInt64) — 对于 INSERT 查询,为写入的行数。 对于其他查询,值为0。
  • written_bytes (UInt64) — 对于 INSERT 查询时,为写入的字节数。 对于其他查询,值为0。
  • result_rows (UInt64) — SELECT 查询结果的行数,或INSERT 的行数。
  • result_bytes (UInt64) — 存储查询结果的RAM量.
  • memory_usage (UInt64) — 查询使用的内存.
  • query (String) — 查询语句.
  • exception (String) — 异常信息.
  • exception_code (Int32) — 异常码.
  • stack_trace (String) — Stack Trace. 如果查询成功完成,则为空字符串。
  • is_initial_query (UInt8) — 查询类型. 可能的值:
    • 1 — 客户端发起的查询.
    • 0 — 由另一个查询发起的,作为分布式查询的一部分.
  • user (String) — 发起查询的用户.
  • query_id (String) — 查询ID.
  • address (IPv6) — 发起查询的客户端IP地址.
  • port (UInt16) — 发起查询的客户端端口.
  • initial_user (String) — 初始查询的用户名(用于分布式查询执行).
  • initial_query_id (String) — 运行初始查询的ID(用于分布式查询执行).
  • initial_address (IPv6) — 运行父查询的IP地址.
  • initial_port (UInt16) — 发起父查询的客户端端口.
  • interface (UInt8) — 发起查询的接口. 可能的值:
    • 1 — TCP.
    • 2 — HTTP.
  • os_user (String) — 运行 clickhouse-client的操作系统用户名.
  • client_hostname (String) — 运行clickhouse-client 或其他TCP客户端的机器的主机名。
  • client_name (String) — clickhouse-client 或其他TCP客户端的名称。
  • client_revision (UInt32) — clickhouse-client 或其他TCP客户端的Revision。
  • client_version_major (UInt32) — clickhouse-client 或其他TCP客户端的Major version。
  • client_version_minor (UInt32) — clickhouse-client 或其他TCP客户端的Minor version。
  • client_version_patch (UInt32) — clickhouse-client 或其他TCP客户端的Patch component。
  • http_method (UInt8) — 发起查询的HTTP方法. 可能值:
    • 0 — TCP接口的查询.
    • 1 — GET
    • 2 — POST
  • http_user_agent (String) — http请求的客户端参数
  • quota_key (String) — 在quotas 配置里设置的“quota key” (见 keyed).
  • revision (UInt32) — ClickHouse revision.
  • ProfileEvents (Map(String, UInt64))) — 不同指标的计数器
  • Settings (Map(String, String)) — 当前请求里的setting部分参数
  • thread_ids (Array(UInt64)) — 参与查询的线程数.
  • Settings.Names (Array(String)) — 客户端运行查询时更改的设置的名称。 要启用对设置的日志记录更改,请将log_query_settings参数设置为1。
  • Settings.Values (Array(String)) — Settings.Names 列中列出的设置的值。

解决过程

  • 查询当前时间内耗cpu最高的sql前10条
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    select initial_user, event_time, query,
    read_rows, read_bytes,
    written_rows, written_bytes,
    result_rows, result_bytes,
    memory_usage, length(thread_ids) as thread_count
    from system.query_log
    WHERE event_time > '2022-10-27 18:30:00' AND event_time < '2022-10-27 18:35:00'
    and initial_user<>'default'
    order by thread_count desc
    limit 10;

背景介绍

clickhouse的ReplicatedMergeTree表是通过zookeeper来完成同一个shard之间的副本数据的同步,当*Table is in readonly mode的原因是zookeeper当下压力过大,这个可以从system.replication_queue看到同步队列的数量,或者通过system.part_log看当前part的处理数量

system.replicas

官方地址所解释,该表记录了驻留在本地服务器上的复制表的信息和状态,如下图所示,我们看到每个表的每个副本在zookeeper上的状态

1
select table,zookeeper_path,replica_path from `system`.replicas limit 10

clickhouse
如上图所示,zookeeper_path字段可以查看到在哪个shard上的哪个副本处于readonly状态。

system.part_log

官方地址,该表包含part操作的所有记录,包括drop/merge/download/new等。

1
2
3
4
5
SELECT *    
FROM system.part_log
WHERE concat(database, '.', table) = 'i9xiaoapp_stream.dwd_pope_core_action_diff_di_local'
and event_time >= '2023-01-15 12:00:00'
order by event_time desc

clickhouse

解决副本不同步问题

注意,此时先让用户把写入任务停掉,已方便观察!

  • 可以通过system.part_log查询mergepart状态的写入是否都已经完成
  • 根据system.replicas表给出的readonly状态的zookeeper_path,去到当前所指向的副本机器上,删除这个副本的表
    1
    drop table i9xiaoapp_stream.dwd_pope_core_action_diff_di_local
  • 操作之后,可以通过查询system.replicas里看是否还有处于readonly,发现已经消失
    1
    select table,zookeeper_path,replica_path from `system`.replicas where concat(database, '.', table) = 'i9xiaoapp_stream.dwd_pope_core_action_diff_di_local' limit 10
  • 再在出问题的副本上,重新创建该本,之后可通过system.replicas找到该副本再次出现,并且已恢复
    1
    create table
  • 如果还没恢复,则去对应出错的副本节点,去检查一下zookeeper上的对应路径是否也已经删除
    1
    rmr zookeeper_path
  • 此时再查询system.replicas表readonly的队列应该已经被清理掉了
    可以继续操作元数据修改

背景介绍

clickhouse的Replicated**MergeTree表是通过zookeeper来完成同一个shard之间的副本数据的同步,因为clickhouse本身不是master/slave的架构,我们通过proxy的方式,设定了同一个shard里某个副本是读/写节点角色,另一个节点是读角色,当用户写入时,proxy会路由到读/写节点去完成写入操作,然后通过zk发起同步任务,把日志进入到system.replcation_queue。
今天早上用户和我们反馈说数据查询时每一次结果都不一样,用户查询也是通过连我们的proxy进行读节点的路由,所以用户反馈的结果不一致,其实是第一次路由到读/写节点查询结果是a,第二次路由到读节点,查询结果是b,由于a和b两个副本数据没有同步,导致了查询结果不一致,下面是排查的过程。

system.replication_queue

官方地址所解释,该表记录了当前的副本任务队列的所有信息,如下图所示,我们看到当前副本同步出现大量异常错误

1
select * from system.replication_queue where data_files > 0

clickhouse
如下图所示,type字段可以查看到当前是什么类型的操作导致的,发现是MUTATE_PART操作,last_exception字段显示当前我们在操作ttl时创建的元数据目录下columns.txt无法打开,这个是已知bug。
clickhouse

system.mutations

官方地址,该表包含了所有mutation操作的日志信息,mutation操作包括修改字段类型/修改表ttl操作/按条件删表的数据/按条件更新表的数据等,这些操作都是异步后台线程去处理,都会去回溯该表的所有parts,需要rewrite每个part的信息并且这个操作还不是原子性的,所以如果某个节点操作失败,可能引发该表无法使用。

该表包含字段如下:

  • database (String) — 数据库名称.

  • table (String) — 表名称.

  • data_path (String) — 本地文件的路径.

  • mutation_id (String) — mutation的唯一标识,可通过该标识直接kill mutation.

  • command (String) — mutation命令.

  • create_time (DateTime) — mutation创建时间.

  • block_numbers (Map) — partition_id:需要进行mutation操作的分区id,number:需要进行mutation的分区对应的block序号.

  • parts_to_do_names (Array) — 即将完成的需要进行mutation的数组.

  • parts_to_do (Int64) — 准备进行mutation操作的part序号.

  • is_done (UInt8) — 该操作是否已完成.

  • latest_failed_part (String) — mutation操作最后失败的part名称.

  • latest_fail_time (DateTime) — 最后失败的时间.

  • latest_fail_reason (String) — 最后失败的原因.

1
select * from system.mutations where latest_failed_part != ''

clickhouse

根据异常错误的command字段,我们看到错误是通过MATERIALIZE TTL FAST 16070400引起的,mutation操作在写节点处理完成后,也会通过zookeeper进行副本数据同步

解决副本不同步问题

  • 终止该失败的mutation
    具体操作语句可以查看官方文档
    1
    kill mutation where database = 'xx' and table = 'yy' and mutation_id = 'zz';
  • 操作之后,可以通过查询shard里每个副本的count是否一致来判断数据是否已经进行同步
    1
    select count(dt), dt from xx.yy group by dt;
  • 操作之后,可以通过上面的system.replication_queue表来观察是否开始进行副本数据同步
    1
    select * from system.replication_queue where type='GET_PART' and database = 'xx' and table = 'yy'
  • 如果还没恢复,则去对应出错的副本节点,将本地表删除后重建(出错节点可以从上一步里看出来)
    1
    2
    drop table if exists xx.yy
    create table xx.yy
  • 此时再查询replication_queue表出错的队列应该已经被清理掉了
    可以继续操作元数据修改

背景介绍

我们知道语法分析器的作用是根据给定的形式文法对由词法单元(Token)序列构成的输入进行语法检查、并构建由输入的词法单元(Token)组成的数据结构(一般是语法分析树抽象语法树等层次化的数据结构)。而一提到语法解析目前市面上有很多语法解析器,其中解析sql更是数不胜数,例如最为人所知的antlr和jflex,而本文的主人公ClickHouse却自己去纯手工打造实现了一套sql解析器,本篇文章就来聊聊 ClickHouse 的纯手工解析器,看看它们的底层工作机制。

简单入门

首先来简单入门解决个小问题,那就是我们如何去连接ck,如何将query传递ck呢,如何设置传递给ck的query长度呢?

通过TCP方式请求

通过tcp方式使用clickhouse自己的客户端,连接clickhouse,在会话session里先使用set max_query_size=xx的方式让当前这个会话修改query的长度,如下图:

clickhouse

通过HTTP方式请求

通过http方式请求,http://ip:port/database?user=xx&password=yy&max_query_size=xx,ck会传递这个参数给setting重写
注意chproxy只允许最大max_query_size为512mb,超过此长度会直接报错

通过sql创建setting授权给登陆用户

  1. 创建setting profile
    1
    create settings profile if not exists role_max_query_size SETTINGS max_query_size = 100000000000;
  2. 将profile赋值给某个用户
    1
    grant role_max_query_size to prod_voyager_stats_events;

源码解析ck是如何处理max_query_size的

由于源码较多,只抽出具体实现函数进行源码讲解,本次讲解基于clickhouse v20.6.3.28-stable(该版本与最新版出入较大)。

clickhouse

  1. 如上图所示,在HTTPHandler.cpp下进行各种http的协议处理时,有一个变量叫HTMLForm类型的params,承载的是http请求里的uri,并且在代码的484行进行了此变量的处理,如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    for (const auto & [key, value] : params)
    {
    if (key == "database")
    {
    if (database.empty())
    database = value;
    }
    else if (key == "default_format")
    {
    if (default_format.empty())
    default_format = value;
    }
    else if (param_could_be_skipped(key))
    {
    }
    else
    {
    /// Other than query parameters are treated as settings.
    if (!customizeQueryParam(context, key, value))
    settings_changes.push_back({key, value});
    }
    }
  2. customizeQueryParam会判断该参数param是否等于query,如果是则不会进入setting的设置,再判断是否是param_开头的如果是则会传入context(理解为这次session会话中需要设置的各种上下文内容)则也不会进行setting处理,不是前面2个case则进行setting处理,重载系统默认的setting里的参数,如下图
    clickhouse
  3. 虽然第二步已经设置了setting,但注意代码的512行,这行代码会走向SettingsConstraints.cpp类的checkImpl校验逻辑里,有一些配置是不允许修改的,例如profile,例如配置就是如果已经通过grant授权配置了setting profile了,会去看这个用户的相关权限,如果不符合则会直接抛出exception,不再进行处理,注意这里还有一个问题,抛了异常后,不再将此次请求写入到system.query_log中,之后我们会修复此问题
    1
    2
    3
    /// For external data we also want settings
    context.checkSettingsConstraints(settings_changes);
    context.applySettingsChanges(settings_changes);
  4. 做完setting的约束校验后,都符合条件,则我们已经重载了setting里的max_query_size,之后就走入了executeQuery.cpp执行query逻辑,在它的构造函数里我们就可以看到query是根据max_query_size来读取的,如下图
    clickhouse

深入探知

介绍完了max_query_size的处理逻辑后,其实我们已经大致明白ck在query的处理流程是如何流转的,那么现在问题来了,我们知道可以通过select xx from tb SETTINGS max_query_size=12112这种方式传入自定义的setting参数,但是有些参数有生效,而select语法却对max_query_size不生效,原因是什么呢?好了,别着急现在我们就来解答ck是如何处理setting这层逻辑的。

为什么max_query_size的select中不生效?

clickhouse
原因很简单,只要我们读过了上面的流转过程,就知道max_query_size这个参数的处理系统默认是256kb,那么如果未通过uri方式传入max_query_size,则在截取query长度前,默认都是256kb,注意截取query时是还未进行ck的parser逻辑处理的,我们可以看到query里的setting是需要经过ck的parser解析后,才会重载进去(如下图6),所以呢如果你的select query在256kb范围内,则截取完整query后,经过ck的parser解析出ast树,是会带上新的setting,但此时已经没有意义了,而相反的如果你的query超过了256kb,则只截取到256kb前的query,此时setting也不会走到ParserSelectQuery里,同时因为你的query被不完整截取后,会直接报ast语法错误
图6

源码看解析器

1. HTTPHandler.cpp => processQuery

每一个http请求都在clickhouse都会起一个叫HTTPHandler的线程去处理,根据http请求header和body,初始化请求上下文环境:包括session、用户信息、当前database、响应信息等,另外还处理限流,用户权限,根据配置取到setting信息进行设置,本文重点是调用executeQuery方法处理query

1
2
3
4
5
6
7
8
9
executeQuery(*in, *used_output.out_maybe_delayed_and_compressed, /* allow_into_outfile = */ false, context,
[&response] (const String & current_query_id, const String & content_type, const String & format, const String & timezone)
{
response.setContentType(content_type);
response.add("X-ClickHouse-Query-Id", current_query_id);
response.add("X-ClickHouse-Format", format);
response.add("X-ClickHouse-Timezone", timezone);
}
);

2. executeQuery.cpp => executeQuery

从流中读出字节到buffer里,根据设置的max_query_size判断buffer是否已满,复制到LimitReadBuffer里,重点是执行executeQueryImpl,返回tuple类型的(ast, stream),从stream里提取出pipeline(流水线),根据ast构造出IBlockInputStream或者IBlockOutputStream,传给pipeline后执行pipeline的execute方法

1
std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr);

2. executeQuery.cpp => executeQueryImpl

按照解析出的ast,构造出Interpreter,调用Interpreter的exec方法去执行后返回pipeline,执行结果记录到query_log里,最后把构造出对应的ast和pipeline返回

1
2
3
4
5
6
7
8
9
10
11
// 这里是实现了ParserQuery对象,继承了IParserBase,IParserBase继承自IParser,等下走到6时,才知道虚函数parseImpl会通过ParserQuery对象实现
ParserQuery parser(end, settings.enable_debug_queries);
.........
ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);
.........
auto interpreter = InterpreterFactory::get(ast, context, stage);
.........
res = interpreter->execute();
QueryPipeline & pipeline = res.pipeline;
.........
return std::make_tuple(ast, std::move(res));

3. parseQuery.cpp => parseQueryAndMovePosition

1
ASTPtr res = tryParseQuery(parser, pos, end, error_message, false, query_description, allow_multi_statements, max_query_size, max_parser_depth);

4. parseQuery.cpp => tryParseQuery

尝试解析SQL,将sql通过语法树规则装入TokenIterator,返回ASTPtr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ClickHouse词法分析器是由Tokens和Lexer类来实现,token是最基础的元祖,之间是没有任何关联的,只是一堆词组和符号,通过lexer语法进行解析后,把元祖里的token建立起关系。
Tokens tokens(pos, end, max_query_size);
IParser::Pos token_iterator(tokens, max_parser_depth);
// 注意这里,TokenIterator对->使用了重载,在重载函数里去初始化TOKEN,主要是从第一个字符开始使用pos++的方式进行判断,可以进入Token Lexer::nextTokenImpl()进行查看
if (token_iterator->isEnd() || token_iterator->type == TokenType::Semicolon) {
out_error_message = "Empty query";
pos = token_iterator->begin;
return nullptr;
}
.....
Expected expected;
......
ASTPtr res;
bool parse_res = parser.parse(token_iterator, res, expected);

注意:IParserparse方法是virtual虚函数,IParser作为接口角色,被IParserBase继承,在IParserBase里实现了parse方法。

5. IParserBase.cpp => parse

在解每个token时都会根据当前的token进行预判(parseImpl返回的结果),返回true才会进入下一个子token

1
2
3
4
5
6
7
8
9
10
11
12
bool IParserBase::parse(Pos & pos, ASTPtr & node, Expected & expected)
{
expected.add(pos, getName());

return wrapParseImpl(pos, IncreaseDepthTag{}, [&]
{
bool res = parseImpl(pos, node, expected);
if (!res)
node = nullptr;
return res;
});
}

注意到parseImpl在IParserBase中是一个虚函数,将被继承自IParserBase类的子类实现,而在 第2步 中我们定义的子类是ParserQuery,所以此时是直接调用到ParserQuery子类的parseImpl方法

6. ParserQuery.cpp => parseImpl

Parser的主要类(也都是继承自IParserBase)分别定义出来后,每个去尝试解析,如果都不在这几个主要Parser里,则返回false,否则返回true,clickhouse把query分类成以下14类,但本质上可以归纳为2类,第一类是有结果输出可对应show/select/desc/create等,第二类是无结果输出可对应insert/use/set等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserQueryWithOutput query_with_output_p(enable_explain);
ParserInsertQuery insert_p(end);
ParserUseQuery use_p;
ParserSetQuery set_p;
ParserSystemQuery system_p;
ParserCreateUserQuery create_user_p;
ParserCreateRoleQuery create_role_p;
ParserCreateQuotaQuery create_quota_p;
ParserCreateRowPolicyQuery create_row_policy_p;
ParserCreateSettingsProfileQuery create_settings_profile_p;
ParserDropAccessEntityQuery drop_access_entity_p;
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
ParserExternalDDLQuery external_ddl_p;

bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_role_p.parse(pos, node, expected)
|| set_p.parse(pos, node, expected)
|| system_p.parse(pos, node, expected)
|| create_user_p.parse(pos, node, expected)
|| create_role_p.parse(pos, node, expected)
|| create_quota_p.parse(pos, node, expected)
|| create_row_policy_p.parse(pos, node, expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| external_ddl_p.parse(pos, node, expected);

return res;
}

注意看这个parseImpl方法,进来后都会先去接ParserQueryWithOutput类解析相关ast,这里类涉及到了explainselectshowcreatealter等相关语法的解析,如果解析不过,则直接报错,解析成功后会处理我们这篇文章中提到的SETTING,如下图7定义的,将setting传入的变量存入到s_settings指针中。
图7

clcikhouse的parser总结:

    1. ClickHouse词法分析器
      词法解析的主要任务是读入源程序的输入字符、将它们组成词素,生成并输出一个词法单元(Token)序列,每个词法单元对应于一个词素。ClickHouse中的每个词法单元(Token)使用一个struct Tocken结构体对象来进行存储,结构体中存储了词法单元的typevalue
      ClickHouse词法分析器是由TokensLexer类来实现, DB::Lexer::nextTokenImpl()函数用来对SQL语句进行词法分析的具体实现
    1. ClickHouse语法解析器
      ClickHouse中定义了不同的Parser用来对不同类型的SQL语句进行语法分析,例如:ParserInsertQuery(Insert语法分析器)、ParserCreateQuery(Create语法分析器)、ParserAlterQuery(Alter语法分析器)等等。
      Parser首先判断输入的Token序列是否是该类型的SQL,若是该类型的SQL,则继续检查语法的正确性,正确则生成AST返回,语法错误的则抛出语法错误异常,否则直接返回空AST语法树

clickhouse

解答setting生效问题

图7
如上图所示,当前原生ck只支持InterpreterSelectQuery和InterpreterInsertQuery对query传入setting进行了重载处理。
InterpreterSelectQuery是在自己的构造函数里初始化了setting到context里

1
2
3
4
5
6
void InterpreterSelectQuery::initSettings()
{
auto & query = getSelectQuery();
if (query.settings())
InterpreterSetQuery(query.settings(), *context).executeForCurrentContext();
}

InterpreterInsertQuery是在parser解析出ast后,在executeQueryImpl进行的setting重载context。
1
2
3
4
auto * insert_query = ast->as<ASTInsertQuery>();

if (insert_query && insert_query->settings_ast)
InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

剩下的setting都已经是通过Interpreter执行结束后再处理的,对于我们需要在前置传入时没有效果了。

解决业务困扰

当前我们的离线导入hive2ck,其实是通过将数据写入到临时表,这张临时表是按照目标表的表结构重新创建了一个MergeTree表,通过spark任务将hive数据以流式方式写入到临时表分区,生产出分区对应的多个part,生产过程中我们会将part拉回到临时表对应的detach目录,这个过程叫离线构建,再将再将part通过ck的attach命令激活,这时候临时表就对该分区可见了,然后再通过replace partition的方式,将临时表的分区替换到我们的目标表去,这整个过程,就是我们的hive2ck处理流程,如下图所示:
图10

我们将此离线构建继承到数梦的同步中心后,陆续遇到了业务方来咨询相关问题,下面是问题汇总及如何解决的

如何在离线导入中将明细数据写入到关联的物化视图

熟悉clickhouse的同学们都知道,原生ck对于物化视图的写入,唯一的方式是在明细表通过insert写入时,才会将数据经过物化视图的触发器写入关联的物化视图,而在离线构建过程中,ck是不支持的,但很多业务方跟我们提出这个需求,希望离线构建可以支持将分区数据写入到关联物化视图去,于是我们对ck的replace partition 进行了改造。
改造前的语法:

1
ALTER TABLE test.visits_basic REPLACE PARTITION '20221102' FROM test.visits_basic_tmp;

改造后的语法:
1
ALTER TABLE test.visits_basic REPLACE PARTITION '20221101' FROM test.visits_basic_tmp AND TRIGGER VIEW;

在离线构建走到替换分区这一步时,我们改造了AstAlterQuery,让ParserAlterQuery增加了对and trigger view的语法解析,解析之后进入到InterpreterAlterQuery时,如果ast返回的trigger view是true,则程序流程会流转到取出明细表元数据,查询是否有关联物化视图,重新构造出类似下面的sql,交过pipeline进行执行,由此将该分区数据写入到物化视图去。
1
insert into 物化视图 select from 明细表 where 分区=xx

分区过大导入失败如何解决

1
xxx has timed out! (120000ms) Possible deadlock avoided. Client should retry

图10
如上图所示,替换分区前,会给该明细表加一把锁,并设定锁时间(lock_acquire_timeout),系统默认时120s,如果该分区过大,替换过程超过120s,则会爆上面错误,而本文最开始已经讲解过如何处理setting,考虑到ck原生只支持insert和select时interpreter对setting重载,由此进行改造让InterpreterAlterQuery也支持通过sql传入锁时间,如下面语法:

1
ALTER TABLE test.visits_basic REPLACE PARTITION '20221108' FROM test.visits_basic_tmp AND TRIGGER VIEW SETTINGS lock_acquire_timeout=86400000;

因为ParserQueryWithOutput已经对setting进行了解析,而AstAlterQuery其实是继承自ASTQueryWithOutput,所以我们已经获得了setting这一块的ast,无需再自己初始化新的ast,只要在InterpreterAlterQuery里把setting重载就行了,如图11
图10

替换分区成功,物化视图数据写入报错如何解决

  1. 首先我们在数梦平台上控制了相关的ddl语句修改,如果用户要删明细表字段,则必须先去处理关联的物化视图字段,如果用户要删明细表,则必须先删物化视图
  2. 遇到替换分区成功,而物化视图写入失败,报错都是锁明细表超时,对于这种case可直接解锁明细表的锁,让物化视图自己去写,不再锁明细表,所以只需要做简单的锁释放便可

以上是对ck进行改造过程中遇到的3个问题,此改造过程主要是满足离线导入可写入物化视图,未来我们还将对ck进行更多改造,以满足不同业务需求,各个 业务线大佬们如果在使用ck过程中有遇到任何问题,欢迎加入ck用户群,和我们一起沟通解决。

背景介绍

ClickHouse介绍

ClickHouse是由俄罗斯的第一大搜索引擎Yandex公司开源的列存数据库。令人惊喜的是,ClickHouse相较于很多商业MPP数据库,比如VerticaInfiniDB有着极大的性能提升。除了Yandex以外,越来越多的公司开始尝试使用ClickHouse等列存数据库。对于一般的分析业务,结构性较强且数据变更不频繁,可以考虑将需要进行关联的表打平成宽表,放入ClickHouse中。

  • 配置丰富,只依赖与Zookeeper
  • 线性可扩展性,可以通过添加服务器扩展集群
  • 容错性高,不同分片间采用异步多主复制
  • 单表性能极佳,采用向量计算,支持采样和近似计算等优化手段
  • 功能强大支持多种表引擎

StarRocks介绍

StarRocks是一款极速全场景MPP企业级数据库产品,具备水平在线扩缩容,金融级高可用,兼容MySQL协议和MySQL生态,提供全面向量化引擎与多种数据源联邦查询等重要特性。StarRocks致力于在全场景OLAP业务上为用户提供统一的解决方案,适用于对性能,实时性,并发能力和灵活性有较高要求的各类应用场景。

  • 不依赖于大数据生态,同时外表的联邦查询可以兼容大数据生态
  • 提供多种不同的模型,支持不同维度的数据建模
  • 支持在线弹性扩缩容,可以自动负载均衡
  • 支持高并发分析查询
  • 实时性好,支持数据秒级写入
  • 兼容MySQL 5.7协议和MySQL生态

二者的对比

相似之处

  • 都可以提供极致的性能
  • 都不依赖于Hadoop生态
  • 底层存储分片都提供了主主的复制高可用机制。
  • 都是MPP架构
  • 都是列式存储
  • 都支持表述SQL语法
  • 都提供了MOLAP库的预聚合能力

差异性

  • ClickHouse在更适用于大宽表的场景,TP的数据通过CDC工具的,可以考虑在Flink中将需要关联的表打平,以大宽表的形式写入ClickHouse
  • StarRocks对于join的能力更强,ClickHouse虽然提供了join的语义,但使用上对大表关联的能力支撑较弱,复杂的关联查询经常会引起OOM
  • ClickHouse对高并发的业务并不友好,即使一个查询,也会用服务器一半的CPU去查询
  • StarRocks可以支撑数千用户同时进行分析查询,在部分场景下,高并发能力能够达到万级。StarRocks在数据存储层,采用先分区再分桶的策略,增加了数据的指向性,利用前缀索引可以快读对数据进行过滤和查找,减少磁盘的I/O操作,提升查询性能
  • 对于用户的原有的查询基表的 SQL 语句保持不变,StarRocks 会自动选择一个最优的物化视图,从物化视图中读取数据并计算。用户可以通过 EXPLAIN 命令来检查当前查询是否使用了物化视图。而ClickHouse则需要用户自行指定SQL中所需要使用的物化视图。

为什么推荐StarRocks

  1. 滴滴大数据OLAP团队目前在维护的引擎有StarRocksClickHouseDruid,3个引擎各有各的特点,现有的OLAP引擎(Kylin、Druid、ClickHouse)多表Join时的性能都比较差,甚至不支持多表Join,现有的引擎Druid虽然有lookup表的能力,但经过实际测试后性能不佳。Apache Kylin实际上也不支持Join,多表的Join需要通过在cube构建的时候底层打成宽表来实现。ClickHouse只支持本地Hash join的模式,不支持分布式Shuffle join,多数情况下灵活性受限,性能表现不佳。
  2. OLAP引擎需要同时具备明细数据查询和数据聚合的能力。由于Apache KylinDruid不能较好支持明细数据查询,我们引入了ClickHouse,通过在明细表基础上创建相应聚合物化视图来处理,但是不够灵活,对于上层应用来说,查明细和查聚合需要切到不同的表去处理。
  3. 目前我们团队在有限人员情况下需要维护这3个引擎的稳定性,导致我们对每一个引擎的理解深度都不够,特别像ClickHouse,运维成本非常高,ClickHouse集群的分片、副本信息,都是通过静态的配置文件的方式进行配置。当整个集群需要扩缩容的时候,就必须通过修改配置文件的方式进行刷新,数据的均衡都需要运维人员介入。此外ClickHouse通过zookeeper来做副本管理,当集群规模变大时,副本数过多会导致zookeeper的压力变大,集群的稳定性也就会相应变差。
    为解决以上问题,滴滴大数据OLAP团队在2022年初开始调研StarRocks,在全面测试过从上面对StarRocksClickHouse的对比,我们也可以明显感受到StarRocks在多数场景下都是优于ClickHouse的,我们希望通过StarRocks来实现OLAP平台的多业务场景的查询引擎统一化。
    1
    注:这是我们针对DruidClickHouseStarRocks进行的测试对比,链接

StarRocks特性

StarRocks的架构设计融合了MPP数据库,以及分布式系统的设计思想,其特性如下所示。

架构精简

  • StarRocks内部通过MPP计算框架完成SQL的具体执行工作。MPP框架能够充分的利用多节点的计算能力,整个查询可以并行执行,从而实现良好的交互式分析体验。
  • StarRocks集群不需要依赖任何其他组件,易部署、易维护和极简的架构设计,降低了StarRocks系统的复杂度和维护成本,同时也提升了系统的可靠性和扩展性。管理员只需要专注于StarRocks系统,无需学习和管理任何其他外部系统。

全面向量化引擎

StarRocks的计算层全面采用了向量化技术,将所有算子、函数、扫描过滤和导入导出模块进行了系统性优化。通过列式的内存布局、适配CPUSIMD指令集等手段,充分发挥了现代CPU的并行计算能力,从而实现亚秒级别的多维分析能力。

智能查询优化

StarRocks通过CBO优化器 (Cost Based Optimizer) 可以对复杂查询自动优化。无需人工干预,就可以通过统计信息合理估算执行成本,生成更优的执行计划,大大提高了AdHocETL场景的数据分析效率。

联邦查询

StarRocks支持使用外表的方式进行联邦查询,当前可以支持HiveMySQLElasticsearchIcebergHudi类型的外表,您无需通过数据导入,可以直接进行数据查询加速。

高效更新

StarRocks支持明细模型(DUPLICATE KEY)、聚合模型(AGGREGATE KEY)、主键模型(PRIMARY KEY)和更新模型(UNIQUE KEY),其中主键模型可以按照主键进行UpsertDelete操作,通过存储和索引的优化可以在并发更新的同时实现高效的查询优化,更好的服务实时数仓的场景。

智能物化视图

  • StarRocks支持智能的物化视图。您可以通过创建物化视图,预先计算生成预聚合表用于加速聚合类查询请求。
  • StarRocks的物化视图能够在数据导入时自动完成汇聚,与原始表数据保持一致。
  • 查询的时候,您无需指定物化视图,StarRocks能够自动选择最优的物化视图来满足查询请求。

标准SQL

  • StarRocks支持标准的SQL语法,包括聚合、JOIN、排序、窗口函数和自定义函数等功能。
  • StarRocks可以完整支持TPC-H的22个SQLTPC-DS的99个SQL
  • StarRocks兼容MySQL协议语法,可以使用现有的各种客户端工具、BI软件访问StarRocks,对StarRocks中的数据进行拖拽式分析。

流批一体

  • StarRocks支持实时和批量两种数据导入方式。
  • StarRocks支持的数据源有KafkaHDFS和本地文件。
  • StarRocks支持的数据格式有ORCParquetCSV等。
  • StarRocks可以实时消费Kafka数据来完成数据导入,保证数据不丢不重 (exactly once)
  • StarRocks也可以从本地或者远程(HDFS)批量导入数据。

高可用易扩展

  • StarRocks的元数据和数据都是多副本存储,并且集群中服务有热备,多实例部署,避免了单点故障。
  • 集群具有自愈能力,可弹性恢复,节点的宕机、下线和异常都不会影响StarRocks集群服务的整体稳定性。
  • StarRocks采用分布式架构,存储容量和计算能力可近乎线性水平扩展。StarRocks单集群的节点规模可扩展到数百节点,数据规模可达到10 PB级别。
  • 扩缩容期间无需停服,可以正常提供查询服务。
  • StarRocks中表模式热变更,可通过一条简单SQL命令动态地修改表的定义,例如增加列、减少列和新建物化视图等。同时,处于模式变更中的表也可以正常导入和查询数据。

StarRocks应用场景

StarRocks可以满足企业级用户的多种分析需求,具体的业务场景如下所示:

OLAP多维分析

  • 用户行为分析
  • 用户画像、标签分析、圈人
  • 高维业务指标报表
  • 自助式报表平台
  • 业务问题探查分析
  • 跨主题业务分析
  • 财务报表
  • 系统监控分析

实时数仓

  • 电商大促数据分析
  • 教育行业的直播质量分析
  • 物流行业的运单分析
  • 金融行业绩效分析、指标计算
  • 广告投放分析
  • 管理驾驶舱
  • 探针分析APM(Application Performance Management)

高并发查询

  • 广告主报表分析
  • 零售行业渠道人员分析
  • saas行业面向用户分析报表
  • dashboard多页面分析

统一分析

通过使用一套系统解决多维分析、高并发查询、实时分析和Ad-Hoc查询等场景,降低系统复杂度和多技术栈开发与维护成本。

如何使用StarRocks

我们olap团队已经将StarRocks接入到滴滴各个平台,下面会介绍如何使用滴滴内部的平台和工具方便快捷的使用StarRocks引擎。
1

如何按照hive表的表结构创建clickhouse表

点击链接通过数梦的实时平台可以快捷创建对应hive表结构的ck表,如下图所示:
clickhouse

hive表的数据如何导入clickhouse表

创建同步任务

点击链接通过数梦的同步中心可以快捷创建hive表映射到ck表的同步任务,如下图所示
clickhouse

同步任务流程

如下图所示,我们按照这个流程处理hive数据导入到clickhouse
clickhouse

    1. 创建临时表
      按照ck表的表结构,我们会在集群的所有写节点创建同样表结构的单机表(MergeTree)引擎
    1. 起spark任务,利用clickhouse-local工具将hive表导入到临时表
      1
      2
      3
      4
      cat data.orc | clickhouse-local \
      --format=Native \
      --query='CREATE TABLE input (col1 String, col2 String, col3 String) ENGINE = File(ORC, stdin);CREATE TABLE target_table (col1 String, col2 String, col3 String) ENGINE = MergeTree() partition by tuple() order by col1;insert into target_table select *,"$year" as year,"$month" as month, "$day" as day from input;optimize table target_table final' \
      --config-file=config.xmls

请求处理

ck请求处理过程
clickhouse

客户端请求发给ck,ck接收到请求,放入请求队列

ck将请求队列的请求分发给Request handler线程处理

Request handler线程处理请求,解析sql语句,执行sql语句逻辑。

简单请求处理使用Request handler线程直接执行(如create,insert,alter等), 复杂请求处理使用Pipeline executor执行(如select, insert select等)

请求线程处理完当前请求后,发送请求结果给给客户端。然后接着处理后续请求

insert语句解析执行

初始化请求上下文环境。包括session,用户信息,当前database等,限流,权限,设置等信息

解析sql语句

检查被写入的表是否存在,是否有写入权限,是否被限流

对insert数据校验,字段是否存在,是否满足约束

根据默认值填充空字段和物化字段

缓存单次insert语句中的数据,insert语句全部接收完成或缓存数据超过一定大小后批量写入数据。

将insert的数据写入存储引擎,主要包含StorageDistributed和StorageReplicatedXXMergeTree

检查是否有物化视图,如果有使用物化视图逻辑处理insert数据,写入物化视图表

分布式表写入

分布式表数据写入一般情况下是异步写入,只有对使用 remote(‘addresses_expr’, db, table[, ‘user’[, ‘password’], sharding_key]) 定义的表的写入是同步的。如果分布式表中含有local表或replical的表local副本,直接写本地表。
clickhouse

将写入block数据按sharding逻辑分成多个block

将原insert语句改写,表名改成分布式表对应的底表,数据改成分shard后的block数据

检查待写入的每个shard,如果shard在本机,则直接写入实际存储引擎

shard在远程,将新insert语句写入远程shard本地缓存文件。

通知后台线程发送本地缓存中的数据

后台执行过程

读远程shard本地缓存目录

逐个处理每个文件

根据配置的loadbalance策略,选择合适的机器连接

将文件的insert语句通过上步连接发送给远程机器执行

执行成功后删除对应文件

本地表写入

本地表一般是StorageReplicatedXXMergeTree,其写入过程如下:
clickhouse
本地表是以block为最小单元单次写入,一个block中的数据可能是一次insert的全部数据,也可以是部分数据。

检查当前表part数量,如果part数量过多(接近part数限制)延时写入数据,如果part数量过限制则写入失败。

检查写入block中parttition总数是否超过限制。如果超过限制,写入失败。

将写入block数据按partition by 逻辑分成多个block

依次将每个分区的block数据写入表中

创建分区的临时part

计算part数据的sha1生成part对应的blockid

检查该blockid在是否存在(表的zk中会记录所有已存在的part的blockid)。如果存在表示插入数据重复,忽略后续步骤。

将part信息发布到ck,通知其他副本拉去新添加part

将临时part加入到commit到mergetree表

如果配置最小同步副本大于1,则等代其他副本数据同步达到满足条件

临时part创建过程

  • 创建block数据对应分区的临时part对象
  • 计算分区min_max索引
  • 处理排序和主键索引
  • ttl处理
  • 其他二级索引处理
  • 将处理过的block数据和索引写入临时part,根据配置的压缩方式压缩