Elastic-Job介绍
Elastic-Job是当当开源分布式调度解决方案,功能非常丰富,支持任务分片,能充分利用资源,这和之前介绍Quartz所不能做到的地方。Elastic-Job由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-CLoud组成,一般来说我们使用Elastic-Job-Lite就能够满足我们的需求,Elastic-Job的官方教程写的很详细,参见,本片文章主要介绍Elastic-Job_lite,本文做使用的Elastic-Job版本为2.1.5
Elastic-Job特点
Elastic-Job具有分布式调度,作业高可用、任务分片以及定制化流程等特点,具体说明如下:
-
分布式调度
Elastic-Job重写了Quartz基于数据库的分布式功能,改用Zookeeper实现注册中心,其作用仅用于作业注册和监控存储,而主作业点仅用于处理分片和清理等 -
作业高可用
Elastic-Job提供了非常安全的执行作业方式,如果将分片总数设置为1,并使用大于1台的服务器执行作业,则作业会按照以1主n从的方式执行,一旦执行作业的服务器崩溃,等待执行的服务器将在下次作业启动时自动替补执行,后面的示例会展示 -
任务分片执行
Elastic-Job提供了更灵活且更强大的作业吞吐量方式,就是任务的分布式执行,它将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项
在实际使用过程中,我们通常将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量。
例如,有3台服务器,被分成10片,那分片的结果可能为:服务器A=0,1,2,服务器B=3,4,5,服务器C=6,7,8,9,若服务器C崩溃了,则分片项的分配结果为:服务器A=0,1,2,3,4,服务器B=5,6,7,8,9 -
定制化流程任务
Elastic-Job提供了简单类型的和数据流类型的两种模式,简单类型的实现SimpleJob接口,数据流类型实现DataflowJob接口,后面会介绍
数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立的线程上,用于保证同一分片的顺序性
Elastic-Job实现
首先我们分两种方式来实现,先介绍第一种
1. 通过API的方式实现
- maven引入相关jar包
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
- 定义任务类
public class HelloElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("---Thread ID: %s,任务总片数:%s,当前分片项:%s---",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
}
- 使用API接口配置作业信息
/**
* 配置Zookeeper配置中心
*/
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter coordinatorRegistryCenter =
new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181","helloElasticJob"));
coordinatorRegistryCenter.init();
return coordinatorRegistryCenter;
}
/**
* 创建作业配置
* @return
*/
private static LiteJobConfiguration createJobConfiguration() {
//定义作业的核心配置
JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("helloElasticJob", "0/10 * * * * ?", 1).build();
//定义simple类型的配置
SimpleJobConfiguration jobConfiguration = new SimpleJobConfiguration(coreConfiguration, HelloElasticJob.class.getCanonicalName());
//定义Lite作业配置
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfiguration).build();
return liteJobConfiguration;
}
- 创建测试类
public class App {
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(),createJobConfiguration()).init();
}
/**
* 配置Zookeeper配置中心
*/
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter coordinatorRegistryCenter =
new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181","helloElasticJob"));
coordinatorRegistryCenter.init();
return coordinatorRegistryCenter;
}
/**
* 创建作业配置
* @return
*/
private static LiteJobConfiguration createJobConfiguration() {
//定义作业的核心配置
JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder("helloElasticJob", "0/10 * * * * ?", 1).build();
//定义simple类型的配置
SimpleJobConfiguration jobConfiguration = new SimpleJobConfiguration(coreConfiguration, HelloElasticJob.class.getCanonicalName());
//定义Lite作业配置
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(jobConfiguration).build();
return liteJobConfiguration;
}
}
在启动Zookeeper后,运行就能看到效果
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
---Thread ID: 18,任务总片数:1,当前分片项:0---
2. 通过与Spring集成方式实现
- 引入Mavne配置文件,在原pom文件中只需引入下面的依赖即可
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
- 在spring配置文件中配置作业spring-elasticJob.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<!--配置作业注册中心-->
<reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181" namespace="hello" base-sleep-time-milliseconds="3000" max-retries="3"/>
<!--配置作业-->
<job:simple id="helloElasticJob" class="top.starlin.elasticJobDemo.HelloElasticJob"
registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3"/>
</beans>
- 启动作业
public class App {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring-elasticJob.xml"});
context.start();
System.in.read();
}
}
单台启动的效果如下:
---Thread ID: 28,任务总片数:3,当前分片项:1---
---Thread ID: 27,任务总片数:3,当前分片项:0---
---Thread ID: 29,任务总片数:3,当前分片项:2---
---Thread ID: 30,任务总片数:3,当前分片项:0---
---Thread ID: 31,任务总片数:3,当前分片项:1---
---Thread ID: 32,任务总片数:3,当前分片项:2---
为了模拟分布式环境,我将此工程copy一份后,两台同时启动后的效果,当然我们也可以利用虚拟机来部署
第一个工程,启动结果
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
第二个工程,启动结果:
---Thread ID: 28,任务总片数:3,当前分片项:2---
---Thread ID: 27,任务总片数:3,当前分片项:0---
---Thread ID: 29,任务总片数:3,当前分片项:0---
---Thread ID: 30,任务总片数:3,当前分片项:2---
---Thread ID: 32,任务总片数:3,当前分片项:2---
---Thread ID: 31,任务总片数:3,当前分片项:0---
---Thread ID: 34,任务总片数:3,当前分片项:2---
---Thread ID: 33,任务总片数:3,当前分片项:0---
我们能够发现,一共3个任务片数,分了2台机器来执行
如果将正在启动中的一台关闭(模拟服务器崩溃的情况),会怎样了
结果显示会自动替补执行
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 18,任务总片数:3,当前分片项:1---
---Thread ID: 27,任务总片数:3,当前分片项:0---
---Thread ID: 28,任务总片数:3,当前分片项:1---
---Thread ID: 29,任务总片数:3,当前分片项:2---
Elastic-Job作业类型
Elastic-Job作业类型分为3种,包括我们前面提到的SimpleJob,DataFlowJob这2种,还有一种为Script作业类型
-
Simple作业类型
简单实现的未经过任何封装的类型作业,需要实现SimpleJob接口,该接口提供了一个excute方法用于作业调度,它与原始的Quartz原生接口类似,但提供了可伸缩任务和任务分片功能 -
DataFlow作业类型
用于处理数据流,需要实现DataFlowJob接口,该接口提供了2个方法,分别用于抓取(fetchData)数据和处理(processData)数据,可以通过DataflowJobConfiguration配置是否流式处理。
需要注意的是,一旦开启的流式处理,只有fetchData方法的返回值为null或者集合的长度为空时,作业才会停止,否则作业会一直执行下去,非流式的处理则只会在每次作业执行的过程中执行一次fetchData和processData方法
如果采用DataFlow类型作业处理,建议processData在处理数据后更新其状态,避免fetchData再次抓取到重复的数据,使得作业永远执行下去
伪代码如下:
public class HelloJob implements DataflowJob {
@Override
public List<User> fetchData(ShardingContext shardingContext) {
List<User> users = null;
/** get users from database **/
return users;
}
@Override
public void processData(ShardingContext shardingContext, List<User> list) {
for (User user : data) {
System.out.println("");
user.setStatus(1);
/**
* update user
*/
}
}
}
- Script作业类型
为脚本作业类型,支持Shell,Python,Perl等所有类型的脚本,这里不多介绍,平时用的也少,感兴趣的童鞋可以参考官网
其他
上述介绍的是最精简常用的功能。elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面(这里就不介绍了)