Amoro优化器实现


发布于 2025-06-12 / 11 阅读 / 0 评论 /
基于amoro-0.9.0版本源码,解读Amoro优化器执行过程

Amoro优化器简述

Amoro对表的优化是通过优化器来完成的,目前支持以下优化器:flink优化器、spark优化器、standalone优化器。

Optimizer类型

Optimizer表示不同的优化器类型。

Optimizer的类型如下图所示

OptimizerExecutor类型

OptimizerExecutor表示优化器执行器,包含优化器的处理逻辑。

OptimizerExecutor的类型如下图所示

任务的执行在OptimizerExecutor中。

Amoro优化器类结构

Amoro优化器完整类结构如下图所示:

下面分别对这些优化器进行解析。

优化器配置

优化器配置表示启动优化器进程的一些命令行配置,定义在org.apache.amoro.optimizer.common.OptimizerConfig类中,包含以下配置

参数名

别名

是否必须

默认值

参数说明

-id

--resource-id

优化资源ID

-a

--ams-optimizing-uri

ams通信地址

-p

--execution-parallel

优化器执行的并发度

-g

--group-name

优化器组名称

-hb

--heart-beat-interval

10秒

优化器进程和ams的心跳间隔

-eds

--extend-disk-storage

false

是否扩展存储到磁盘

-dsp

--disk-storage-path

磁盘存储路径

-msz

--memory-storage-size

512MB

扩展存储到磁盘时,可使用的最大内存限制

Flink优化器

跟踪flink优化器的执行过程。

Flink优化器执行入口

Flink优化器进程由org.apache.amoro.server.manager.FlinkOptimizerContainer启动,进程main函数在org.apache.amoro.optimizer.flink.FlinkOptimizer中定义,main函数即为Flink优化器的执行入口,逻辑如下:

org.apache.amoro.optimizer.flink.FlinkOptimizer#main
	StreamExecutionEnvironment env
	OptimizerConfig optimizerConfig
	// 计算优化器内存分配
	calcOptimizerMemory(optimizerConfig, env);
	// 初始化FlinkOptimizer
	Optimizer optimizer = new FlinkOptimizer(optimizerConfig);
	env.addSource(new FlinkToucher())
		.transform(new FlinkExecutor())
		.addSink(new DiscardingSink<>())
	// 启动flink任务,指定任务名称
	env.execute("amoro-flink-optimizer-{resourceId}")

优化器内存计算

calcOptimizerMemory方法计算flink任务所需的内存,逻辑如下。

org.apache.amoro.optimizer.flink.FlinkOptimizer#calcOptimizerMemory(config)
	// 获取jobmanager.memory.process.size配置
	MemorySize jobMemorySize
	// 获取taskmanager.memory.process.size配置
	MemorySize taskMemorySize
	// jobMemorySize或taskMemorySize为空,表示本地运行,无需计算
	if (jobMemorySize == null || taskMemorySize == null) return;
	// 获取优化器并发度,由“-p”配置
	int parallelism
	// 获取taskmanager.numberOfTaskSlots配置
	int numberOfTaskSlots
	// 计算总内存大小,注意单位一致
	int memorySize = jobMemorySize + (parallelism / numberOfTaskSlots) * taskMemorySize;
	// 如果parallelism不是numberOfTaskSlots的整数倍,则还需要加一个task的内存
	if (parallelism % numberOfTaskSlots != 0) memorySize += taskMemorySize;
	config.setMemorySize(memorySize);

初始化FlinkOptimizer

初始化FlinkOptimizer实例,并根据优化器并发度初始化FlinkOptimizerExecutor列表

new FlinkOptimizer(optimizerConfig);
	// 初始化OptimizerToucher实例
	OptimizerToucher toucher
	// 根据优化器并发度初始化FlinkOptimizerExecutor列表,每个FlinkOptimizerExecutor表示一个线程,线程号为数字,从0开始
	OptimizerExecutor[] executors = new FlinkOptimizerExecutor[config.getExecutionParallel()]
	FlinkOptimizer
	// 如果资源id不为空,则注册此属性
	if (config.getResourceId() != null) {
		toucher.withRegisterProperty("resource-id", config.getResourceId());
	}

Spark优化器

跟踪spark优化器的执行过程。

Spark优化器执行入口

Spark优化器为org.apache.amoro.optimizer.spark.SparkOptimizer,AMS通过org.apache.amoro.server.manager.SparkOptimizerContainer来启动对应的SparkOptimizer。

SparkOptimizer的main函数执行过程如下:

spark优化器为org.apache.amoro.optimizer.spark.SparkOptimizer,main函数为执行入口,当
org.apache.amoro.optimizer.spark.SparkOptimizer#main(args)
    // 解析入参,包含并发度、资源名称、ams连接等信息
    config = new OptimizerConfig(args);
    // 创建SparkSession实例,名称为"amoro-spark-optimizer-{入参id的配置值}"
    spark = SparkSession.builder()
    // 创建应用上下文信息
    jsc = new JavaSparkContext(spark.sparkContext());
    // 设置内存大小,值为spark.driver.memory + spark.executor.memory * {并发度}
    config.setMemorySize()
    // 初始化SparkOptimizer实例
    optimizer = new SparkOptimizer(config, jsc);
        // 初始化OptimizerToucher构建工厂方法
        () -> new OptimizerToucher(config)
        // 初始化SparkOptimizerExecutor构建工厂,i表示线程号
        (i) -> new SparkOptimizerExecutor(jsc, config, i))
        // 调用OptimizerToucher构建工厂创建OptimizerToucher实例
        toucher = toucherFactory.get();
        // 调用SparkOptimizerExecutor构建工厂创建OptimizerExecutor实例,数量为配置的并发度
        executors = new OptimizerExecutor[config.getExecutionParallel()];
        // 注册resource-id信息
        toucher.withRegisterProperty(“resource-id”, config.getResourceId());
    // 获取optimizer中的toucher属性,OptimizerToucher的主要作用是维护优化器与AMS(Amoro Management Service)之间的通信和状态同步
    toucher = optimizer.getToucher()
    // 注册job-id信息
    toucher.withRegisterProperty(“job-id”, spark.sparkContext().applicationId());
    // 启动优化器
    optimizer.startOptimizing();
        LOG.info("Starting optimizer with configuration:{}", config);
        // 为每一个executor启动一个线程,线程号为“Optimizer-executor-{optimizerExecutor.getThreadId()}”
        executors.forEach(optimizerExecutor -> new Thread(optimizerExecutor::start).start());

OptimizerExecutor启动任务

OptimizerExecutor#start过程如下图所示:

拉取任务信息

OptimizerExecutor#pollTask的逻辑如下图所示:

task_runtime记录着任务执行记录,对应状态机如下图所示:

包含6个状态

任务执行

通过OptimizerExecutor.executeTask执行任务,具体逻辑如下图所示: