快捷搜索:  汽车  科技

azkaban集群搭建(三种分派器实现类的源码分析)

azkaban集群搭建(三种分派器实现类的源码分析)第三,另外两个实现类 ContainerizedDispatchManager 和 ExecutionController 没有维护队列,也没有重写父类方法,它们直接轮询数据库中的 Flow 数据,二者的区别在于 ExecutionController 通过 API 接口对象 apiGateway 向执行器下发执行任务, ContainerizedDispatchManager 则在 Kubernetes 容器中下发任务。由内部线程 QueueProcessorThread 轮询该队列,调用 processQueuedFlows 完成流任务的派发工作,核心方法是 selectExecutorAndDispatchFlow,它又调用 dispatch 方法,通过 API 接口对象 apiGateway 向执行器下发执行任务。奇怪的是,它为什么要定义三种实现类呢?这里探究一下三种实现方式的差异

分派执行流程分析

Azkaban 提供了三种执行器实现类,默认的是 ExecutorManager ,但是该类却被标注为废弃,由 ExecutionController 类替代。

整个框架的引用遵循面向抽象编程的原则,ExecutorManagerAdapter 类型是各个组件引用执行管理类的类型,它有三个实现类:

azkaban集群搭建(三种分派器实现类的源码分析)(1)


AzkabanWebServer 类在装配时,由 AzkabanWebServerModule 类根据配置文件中的 azkaban.execution.dispatch.method 配置,指定具体实现类型:

private Class<? extends ExecutorManagerAdapter> resolveExecutorManagerAdaptorClassType() { switch (DispatchMethod.getDispatchMethod(this.props .getString(Constants.ConfigurationKeys.AZKABAN_EXECUTION_DISPATCH_METHOD DispatchMethod.PUSH.name()))) { case POLL: return ExecutionController.class; case CONTAINERIZED: bind(ContainerizedImpl.class).to(resolveContainerizedImpl()); return ContainerizedDispatchManager.class; case PUSH: default: return ExecutorManager.class; } }

三种实现的核心思路是一样的,就是遍历一遍当前所有 Ready 状态的 ExecuableFlow ,找到一个可用的执行器,调用执行器的 API 接口完成 Flow 任务下发操作。

奇怪的是,它为什么要定义三种实现类呢?这里探究一下三种实现方式的差异。

第一,顶层抽象类 AbstractExecutorManagerAdapter ,它的 submitExecutableFlow() 方法,会向数据库的 execution_flows 表插入一条执行记录,这里仅仅操作了数据库,并没有完成执行下发到任务执行器端的逻辑。

第二,ExecutorManager 这个实现类,它维护了一个缓存队列 queuedFlows,并重写了父类的 submitExecutableFlow() 方法,先入库,再存储一条待执行的记录到缓存队列 queuedFlows :

azkaban集群搭建(三种分派器实现类的源码分析)(2)


由内部线程 QueueProcessorThread 轮询该队列,调用 processQueuedFlows 完成流任务的派发工作,核心方法是 selectExecutorAndDispatchFlow,它又调用 dispatch 方法,通过 API 接口对象 apiGateway 向执行器下发执行任务。

第三,另外两个实现类 ContainerizedDispatchManager 和 ExecutionController 没有维护队列,也没有重写父类方法,它们直接轮询数据库中的 Flow 数据,二者的区别在于 ExecutionController 通过 API 接口对象 apiGateway 向执行器下发执行任务, ContainerizedDispatchManager 则在 Kubernetes 容器中下发任务。

第四, ExecutionController 类定义比较简单,它包含了一个外部类 ExecutorHealthChecker,由它完成轮询数据库 Flow、任务派发给执行器的逻辑,它包含一单线程池,且启动了一个默认每隔 5 分钟执行一次的定时任务:

this.scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("azk-health-checker").build()); public void start() { logger.info("Starting executor health checker."); this.scheduler.scheduleAtFixedRate(this::checkExecutorHealthQuietly 0L this.healthCheckIntervalMin TimeUnit.MINUTES); }

第五 ContainerizedDispatchManager 它定义了一个内部线程类 QueueProcessorThread ,实现较为复杂一些,这里着重分析下它的代码。

ContainerizedDispatchManager

它包含一个 RateLimiter 限流器、容器实现类 KubernetesContainerizedImpl :

azkaban集群搭建(三种分派器实现类的源码分析)(3)


内部线程定义为:

azkaban集群搭建(三种分派器实现类的源码分析)(4)


它在容器 alive 状态下循环:

public void run() { // Loops till QueueProcessorThread is shutdown while (!this.shutdown) { try { // Start processing queue if active otherwise wait for sometime if (this.isActive) { processQueuedFlows(); } } catch (final Exception e) { ContainerizedDispatchManager.logger.error( "QueueProcessorThread Interrupted. Probably to shut down." e); } } }

限流类 RateLimit 根据配置 creation.rate.limit ,默认值为 20 。

processQueuedFlows 方法轮询数据库中所有待执行的流任务,提交 ExecutionDispatcher 任务:

private void processQueuedFlows() throws ExecutorManagerException { final Set<Integer> executionIds = executorLoader.selectAndUpdateExecutionWithLocking(this.executionsBatchProcessingEnabled this.executionsBatchSize Status.DISPATCHING DispatchMethod.CONTAINERIZED); for (final int executionId : executionIds) { rateLimiter.acquire(); logger.info("Starting dispatch for {} execution." executionId); Runnable worker = new ExecutionDispatcher(executionId); executorService.execute(worker); } }

透过这两句代码可知,它是多线程分派任务给执行器的:

Runnable worker = new ExecutionDispatcher(executionId); executorService.execute(worker);

为什么这个实现类叫 Containerized 分发器呢?

继续看 ExecutionDispatcher 这个分派任务类,它的 run 方法中委派了下面一段调用:

ContainerizedDispatchManager.this.containerizedImpl.createContainer(executionId);

这个容器实现类是 KubernetesContainerizedImpl,创建了一个指定执行编号的容器:

public void createContainer(final int executionId) throws ExecutorManagerException { createPod(executionId); if (isServiceRequired()) { createService(executionId); } }

没有部署过 Kubernetes 集群,这里感觉应该是将执行器纳入了 Kubernetes 管理了,所以看不到 apiGateWay 的引用,直接通过 Kubernetes 的 API 下发给执行器了。

启示录

分发执行的三种实现类的区别:

azkaban集群搭建(三种分派器实现类的源码分析)(5)

猜您喜欢: