spring batch 处理记录(批处理框架系列)
spring batch 处理记录(批处理框架系列)Spring Batch在设计时充分考虑的不同类型的用户需求,重视框架的可扩展性。其设计上的分层架构如下图所示。Spring Batch是一个完全开源的框架,它提供了稳定的企业级批处理任务的解决方案,尤其适用于以下业务场景。Spring Batch提供了很多用于支持大批量数据处理的功能,例如失败后的重试、跳过记录、从最后一次失败的位置重新开始工作、定期批量的提交给事务型数据库、可重用的组件(如解析器、映射器、读取器、处理器、写入器和校验器)以及工作流定义。下面是几个典型的批处理程序的使用场景:Spring Batch 能够自动完成上述基础批处理迭代,并将类似的事务抽象成一个集合视角来处理,故而其典型应用场景就是无人值守的批处理。
一、简介一个轻量级的,全面的批处理框架,旨在支持开发对企业系统的日常运行至关重要的强大的批处理应用程序。Spring Batch提供了可重用的功能,这些功能对于处理大量记录至关重要,包括日志记录/跟踪,事务管理,作业处理统计信息,作业重启,跳过和资源管理。它还提供了更高级的技术服务和功能,这些服务和功能将通过优化和分区技术来实现极高容量和高性能的批处理作业。简单以及复杂的大批量批处理作业都可以以高度可扩展的方式利用框架来处理大量信息。总之,无论是简单的,还是复杂的大数据量的处理任务,都可以利用该框架为信息处理提供可扩展的支持。
二、特性Spring Batch有以下特性:
- 事务管理器
- 任务块处理
- 声明式I/O
- Start/Stop/Restart状态控制
- Retry/Skip 任务重试与跳过
- 管理员web操作接口(需要依赖Spring Cloud Data Flow)
在企业级应用中许多关键任务都需要批处理操作,需求大致可以分为如下几类:
- 自动化对大批量数据进行复杂处理。这些任务大部分都是基于时间事件驱动的无人值守任务(如月度统计、通知、通信任务)。
- 在超大数据集合中进行周期性重复的业务逻辑(比如借款利率计算)。
- 对内部或外部系统进行消息集成,通常还需要在一个事务管理器中完成格式化、验证以及处理并存储几率。批处理应用可以每天为企业处理数以亿计的数据。
Spring Batch继承了Spring框架的设计理念,强调基于POJO的开发方式并且促进创建可维护、可测试的代码。在功能设计上它利用调度框架(如Quartz)工作,并非是一个调度框架。
Spring Batch提供了很多用于支持大批量数据处理的功能,例如失败后的重试、跳过记录、从最后一次失败的位置重新开始工作、定期批量的提交给事务型数据库、可重用的组件(如解析器、映射器、读取器、处理器、写入器和校验器)以及工作流定义。
三、使用场景下面是几个典型的批处理程序的使用场景:
- 需要从数据库、文件或队列中读取大批量的数据
- 以数据流的方式处理数据
- 以修改的形式回写数据
Spring Batch 能够自动完成上述基础批处理迭代,并将类似的事务抽象成一个集合视角来处理,故而其典型应用场景就是无人值守的批处理。
Spring Batch是一个完全开源的框架,它提供了稳定的企业级批处理任务的解决方案,尤其适用于以下业务场景。
- 周期性提交的批处理任务
- 并行批处理
- 消息过程驱动任务处理
- 超大规模并行批处理
- 失败后手动或自动重启任务
- 支持任务工作流,按照指定步骤执行任务
- 任务分批机制和任务跳过功能
- 事务支持
- 使用Spring的编程模型,开发者只需集中精力关心业务逻辑的研发,所有的基础设施的操作都完全交由框架来管理。
- 使基础架构设施,软件执行环境和批处理应用之间完全分离。
- 以接口的方式提供公共核心服务的功能,业务系统可以根据需要实现所需的组件。
- 提供公共核心服务接口的简单默认实现,整个框架开箱可用
- 借助于Spring框架,可以很方便的完成框架的配置,定制化以及扩展服务。
- 所有存在的核心服务可以很容地被替换和扩展,不影响基础层。
- 提供一个简单的部署模式,利用Maven构建独立的Jar文件。
Spring Batch在设计时充分考虑的不同类型的用户需求,重视框架的可扩展性。其设计上的分层架构如下图所示。
Spring Batch 分层架构
这种分层结构有三个重要的组成部分:应用层、核心层、基础架构层。
应用层包含所有批处理任务以及开发者使用Spring Batch编写的其它代码。
核心层提供运行与管理批处理任务的能力,包含了Batch启动和控制所需要的核心类,如 :JobLauncher Job以及Step接口的实现类等。
应用层和核心层建立在基础构架层之上,基础构架层提供共通的读(ItemReader)、写(Itemwriter)、和服务(如RetryTemplate:重试模块。可以被应用层和核心层使用)。
六、Spring Batch流程介绍上图描绘了Spring Batch的执行过程。说明如下:
每个Batch都会包含一个Job。Job就像一个容器,这个容器里装了若干Step,Batch中实际干活的也就是这些Step,至于Step干什么活,无外乎读取数据,处理数据,然后将这些数据存储起来(ItemReader用来读取数据,Itemprocessor用来处理数据,ItemWriter用来写数据) 。JobLauncher用来启动Job,JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。外部控制器调用JobLauncher启动一个Job,Job调用自己的Step去实现对数据的操作,Step处理完成后,再将处理结果一步步返回给上一层,这就是Batch处理实现的一个简单流程。
Step执行过程
从DB或是文件中取出数据的时候,read()操作每次只读取一条记录,之后将读取的这条数据传递给processor(item)处理,框架将重复做这两步操作,直到读取记录的件数达到batch配置信息中”commin-interval”设定值的时候,就会调用一次write操作。然后再重复上图的处理,直到处理完所有的数据。当这个Step的工作完成以后,或是跳到其他Step,或是结束处理。
七、常用组件相关介绍1. JOB2. 监听器八、通用批处理程序参考指南在设计批处理应用时,我们应慎重思考以下几点。
- 在同时具有批处理和实时处理的环境中,最好使用数据块来作为整体操作对象,因为批处理架构和实时处理架构会相互影响。
- 拒绝在单一批处理应用中使用复杂的逻辑结构,在设计应用时一定要遵守简单至上的原则。
- 保证在进行数据处理时有备份数据
- 尽量减少系统资源占用,特别是应该大量使用内存计算以减少I/O操作。
- 应谨慎检查应用的I/O操作来确保应用没有非必要的I/O(可通过分析SQL语句等方式),特别应仔细检查是否存在以下四种缺陷:1. 当可以一次性读取到所有数据,缓存在应用中时,却在每个事务里都从物理磁盘读取相关数据。2.在同一个事务中重复读取数据。3.不必要的表或索引扫描。4.在where子句中使用非精确查找。
- 必要情况下对计算的临时结果值进行存储,不要重复地进行相同的计算。
- 在批处理应用开始时就申请足够的内存空间,避免随着时间的增加不断地申请更多的内存空间。
- 适当的检查与记录校验以保证数据的完整性。
- 进行周期性的校验。例如在文件持续变动的场景下,需要从末尾计算总记录数以及对关键字进行聚合统计。
- 尽早地在与接近真实环境的条件下使用真实的数据进行压力测试。
- 在7*24小时高可用的大批量数据处理系统中,数据备份是一项有挑战性的工作。运维都通常能设计好在线数据库的备份,同样重要文件的备份其实并不是那么容易操作。如果项目依赖于可变动的文件,那么文件的备份不但要关心适当的位置和文档化,还应该定期地进行测试。
为了更好地设计和实现批处理系统,基础批处理应用的的构建块和模式应该提供图表式的框架以及编程接口给设计和研发人员。在设计批处理任务时,首重抽象分解阶段,业务处理逻辑应该被分解为一连串的步骤,每一个步骤都是一个行为模块。具体的步骤设计可参考下面的标准行为模块。
- 转换型模块:所谓转换主要是在系统需要依赖外部数据时,将外部数据转化为标准的输入数据格式。在批处理系统中转换模块完全可以设计为公共可抽象的模块。
- 校验型模块:校验组件用来确保输入/输出记录的正确性以及可持久化。比较有代表性的校验如文件头尾格式信息,行数校验,校验算法以及数据纪录级别的反复核对。
- 选取型应用:对批处理模块的抽象的核心是精准定位应用的核心功能。如一个选取型模块应该做的事是依照预定义的规则从文件或数据中选取数据作为输入,然后输出到目标位置。
- 选取/更新型模块:采用基于数据事件驱动的方式从文件或数据库中读取数据,进行更改之后将数据回写持久化到数据库或文件中。
- 数据处理与更新模块:数据处理型模块的输入事务源于外部数据源或检验型模块。该行为模块通常包含从数据库中获取需要进行处理的数据,然后更新数据库或者创建若干新的数据。
- 格式化输出组件:输出型组件从文件中读取数据,依照标准去更改纪录的数据结构后,将数据输出到新的文件或者传输到其它系统中。
然而受复杂业务逻辑的影响,很多应用无法简单的由上述标准行为模块组成。此时或许可以尝试将多个标准组件组合,以完成业务需求。除了上述标准模块外,框架还提供了如下模块。
- 排序:从源文件中读取数据,并依照指定的字段作为键进行排序。
- 切分:从单个数据源中读出数据,并依据参数规则将每条数据拆分输出到多个目标地点。
- 合并:与切分型模块功能相反。
根据不同的数据源,批处理应用可被分为以下三类:
- 数据库驱动型应用:数据依赖于由数据库存储的纪录行或值。
- 文件驱动型应用:纪录或值存储在文件中
- 消息驱动型应用:数据纪录由消息队列维护
上述的批处理策略是批处理系统的基础。在选用具体的批处理策略时要充分考虑很多因素。首先要估算数据量的大小,其次是批处理系统要面临的并发,再者还需要考虑系统的可用性要求(业务方希望系统7*24小时可用)。
下面是几种典型的批处理模式(通常结合调度任务使用):
- 离线普通批处理
- 在线并发批处理
- 并行运行多种批处理任务
- 分区批处理(将一个批处理应用部署多个节点)
- 综合上述批处理模式
针对不同的模式,数据的提交和锁定的策略非常重要,且在设计完成整体架构之后,批处理模式很难能独立更改。锁策略可以选用数据库自带的锁,也可以自己自定义继承框架中的锁服务。锁服务可以根据数据库锁状态来判断是否给予数据库操作的权限。根据锁服务的状态,也可以避免自己继承实现的重试逻辑的无意义执行。
1. 离线普通批处理在数据不被线上客户修改访问,没有并发处理的需求,也没有其它批处理任务一同对这些数据进行更改时,当前任务处理完成后就可以简单的对结果进行提交。
诚然最简单的逻辑是最有效的。但是随着时间的发展和产品迭代,要处理的数据关系会愈加复杂,数据量也会变得越来越大。如果没有设计锁服务,此时批处理模型的数据单次提交逻辑就显得相形见绌。
2. 实时并发批处理实时并发批处理程序需要同时满足多个并发请求,且任务处理可能占用数秒时间,任务结束后还需要在同一事物中提交数据,故而并发批处理不应该锁定任何数据。整体锁定的数据越少,数据对于其它进程不可用的时间越少,整个系统就越健壮。
合理选择乐观锁或悲观锁来实现逻辑行级锁是减少物理锁定的好思路。下面分别对乐观锁和悲观锁进行介绍。
乐观锁: 适用于数据很少发生锁竞争的情况。在使用数据库时具体实现方案为在批处理程序需要用到的数据表的添加一个版本号或者时间戳字段。当应用读取数据一行数据时,同时也会读取到此列。且当应用处理完成要更新数据时,会将这个时间戳或版本作为WHERE子句的条件。如果数据库中该行当前的时间戳字段与WHERE条件一样,这条数据才会被更新。否则的话就意味着这条数据已经被其它应用修改过了,应用此时应该去尝试其它策略。
悲观锁: 适用于数据会发生很多竞争的情况,在检索时需要取得物理或者逻辑锁。使用数据库时可以借助数据库默认的行级锁来实现。当应用需要更新数据时,首先检索该行并将锁标识为已锁定。这时其它应用尝试检索相同数据时会发生逻辑失败。直到应用将数据更新完成,锁标志才会被释放,其它应用才能重新获取数据。使用悲观锁有两点情况需要注意:1.在锁的锁定和释放的时间范围内,应当保证数据的完整性。2. 应当对锁设置超时机制。
上述知识尤其适用于高并发程序。一般情况下,乐观锁适用于实时处理应用,而互斥锁在批处理应用中能够大展拳脚。无论具体使用哪种锁,其根本目的都是为了保证数据的安全。
不过这些锁只能对单条数据定位锁定。当需要对一组数据进行锁定时,你所面临的最大的挑战是死锁的发生。怎么避免呢?有了逻辑锁,或许可以再设计一个逻辑锁管理器,管理器应该是一致的、非死锁的且应该能理解你想保护的数据组。逻辑锁管理器常常有自己的一张自己的表,如此便能提供锁管理、竞争报告、超时机制以及其它用户关注的功能。
3. 并行处理并行批处理的意思是多个批处理应用或任务并行工作,从而减少整体处理时间。 如果多个应用间没有共享同一个文件、数据库表或者索引空间,多个应用并行处理,各自只关注自己的数据,其间没有数据冲突,也不会发生什么问题。当应用间有共享资源时,一般情况我们会通过自定义数据分片策略来决定每个应用应该去处理哪些数据。在这里引入另一个新思路,数据管理表。数据管理表是一个维护数据相互依赖性的管理模型,它记录者每行可共享资源以及正在被使用与否,如此批处理程序就能知道自己可以访问哪些共享资源。
解决了数据访问方面的问题,可以配合使用多线程来实现对数据的并行处理。大型机环境下CPU资源丰富,能够保证每个处理任务都能分配到足够的cpu时间片,并行作业处理技术也相对成熟。
再者,我们需要考虑并行处理资源的高可用和负载均衡问题。这些可能用到的技术有连接池、缓存等等。最后需要注意,超大的并行处理中数据管理表本身可能会成为资源瓶颈。