azkaban集群搭建(三种分派器实现类的源码分析)
azkaban集群搭建(三种分派器实现类的源码分析)第三,另外两个实现类 ContainerizedDispatchManager 和 ExecutionController 没有维护队列,也没有重写父类方法,它们直接轮询数据库中的 Flow 数据,二者的区别在于 ExecutionController 通过 API 接口对象 apiGateway 向执行器下发执行任务, ContainerizedDispatchManager 则在 Kubernetes 容器中下发任务。由内部线程 QueueProcessorThread 轮询该队列,调用 processQueuedFlows 完成流任务的派发工作,核心方法是 selectExecutorAndDispatchFlow,它又调用 dispatch 方法,通过 API 接口对象 apiGateway 向执行器下发执行任务。奇怪的是,它为什么要定义三种实现类呢?这里探究一下三种实现方式的差异
分派执行流程分析Azkaban 提供了三种执行器实现类,默认的是 ExecutorManager ,但是该类却被标注为废弃,由 ExecutionController 类替代。
整个框架的引用遵循面向抽象编程的原则,ExecutorManagerAdapter 类型是各个组件引用执行管理类的类型,它有三个实现类:
AzkabanWebServer 类在装配时,由 AzkabanWebServerModule 类根据配置文件中的 azkaban.execution.dispatch.method 配置,指定具体实现类型:
private Class<? extends ExecutorManagerAdapter> resolveExecutorManagerAdaptorClassType() {
switch (DispatchMethod.getDispatchMethod(this.props
.getString(Constants.ConfigurationKeys.AZKABAN_EXECUTION_DISPATCH_METHOD
DispatchMethod.PUSH.name()))) {
case POLL:
return ExecutionController.class;
case CONTAINERIZED:
bind(ContainerizedImpl.class).to(resolveContainerizedImpl());
return ContainerizedDispatchManager.class;
case PUSH:
default:
return ExecutorManager.class;
}
}
三种实现的核心思路是一样的,就是遍历一遍当前所有 Ready 状态的 ExecuableFlow ,找到一个可用的执行器,调用执行器的 API 接口完成 Flow 任务下发操作。
奇怪的是,它为什么要定义三种实现类呢?这里探究一下三种实现方式的差异。
第一,顶层抽象类 AbstractExecutorManagerAdapter ,它的 submitExecutableFlow() 方法,会向数据库的 execution_flows 表插入一条执行记录,这里仅仅操作了数据库,并没有完成执行下发到任务执行器端的逻辑。
第二,ExecutorManager 这个实现类,它维护了一个缓存队列 queuedFlows,并重写了父类的 submitExecutableFlow() 方法,先入库,再存储一条待执行的记录到缓存队列 queuedFlows :
由内部线程 QueueProcessorThread 轮询该队列,调用 processQueuedFlows 完成流任务的派发工作,核心方法是 selectExecutorAndDispatchFlow,它又调用 dispatch 方法,通过 API 接口对象 apiGateway 向执行器下发执行任务。
第三,另外两个实现类 ContainerizedDispatchManager 和 ExecutionController 没有维护队列,也没有重写父类方法,它们直接轮询数据库中的 Flow 数据,二者的区别在于 ExecutionController 通过 API 接口对象 apiGateway 向执行器下发执行任务, ContainerizedDispatchManager 则在 Kubernetes 容器中下发任务。
第四, ExecutionController 类定义比较简单,它包含了一个外部类 ExecutorHealthChecker,由它完成轮询数据库 Flow、任务派发给执行器的逻辑,它包含一单线程池,且启动了一个默认每隔 5 分钟执行一次的定时任务:
this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("azk-health-checker").build());
public void start() {
logger.info("Starting executor health checker.");
this.scheduler.scheduleAtFixedRate(this::checkExecutorHealthQuietly 0L
this.healthCheckIntervalMin
TimeUnit.MINUTES);
}
第五 ContainerizedDispatchManager 它定义了一个内部线程类 QueueProcessorThread ,实现较为复杂一些,这里着重分析下它的代码。
ContainerizedDispatchManager它包含一个 RateLimiter 限流器、容器实现类 KubernetesContainerizedImpl :
内部线程定义为:
它在容器 alive 状态下循环:
public void run() {
// Loops till QueueProcessorThread is shutdown
while (!this.shutdown) {
try {
// Start processing queue if active otherwise wait for sometime
if (this.isActive) {
processQueuedFlows();
}
} catch (final Exception e) {
ContainerizedDispatchManager.logger.error(
"QueueProcessorThread Interrupted. Probably to shut down." e);
}
}
}
限流类 RateLimit 根据配置 creation.rate.limit ,默认值为 20 。
processQueuedFlows 方法轮询数据库中所有待执行的流任务,提交 ExecutionDispatcher 任务:
private void processQueuedFlows() throws ExecutorManagerException {
final Set<Integer> executionIds =
executorLoader.selectAndUpdateExecutionWithLocking(this.executionsBatchProcessingEnabled
this.executionsBatchSize
Status.DISPATCHING DispatchMethod.CONTAINERIZED);
for (final int executionId : executionIds) {
rateLimiter.acquire();
logger.info("Starting dispatch for {} execution." executionId);
Runnable worker = new ExecutionDispatcher(executionId);
executorService.execute(worker);
}
}
透过这两句代码可知,它是多线程分派任务给执行器的:
Runnable worker = new ExecutionDispatcher(executionId);
executorService.execute(worker);
为什么这个实现类叫 Containerized 分发器呢?
继续看 ExecutionDispatcher 这个分派任务类,它的 run 方法中委派了下面一段调用:
ContainerizedDispatchManager.this.containerizedImpl.createContainer(executionId);
这个容器实现类是 KubernetesContainerizedImpl,创建了一个指定执行编号的容器:
public void createContainer(final int executionId) throws ExecutorManagerException {
createPod(executionId);
if (isServiceRequired()) {
createService(executionId);
}
}
没有部署过 Kubernetes 集群,这里感觉应该是将执行器纳入了 Kubernetes 管理了,所以看不到 apiGateWay 的引用,直接通过 Kubernetes 的 API 下发给执行器了。
启示录分发执行的三种实现类的区别: