本文共 8080 字,大约阅读时间需要 26 分钟。
最大特点-分布式定时任务:
Elastic-job主要作用:
进阶详细资料:
简单使用:使用的是SpringBoot集成
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
package com.miracle.javasimplejob.job;import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.dataflow.DataflowJob;import com.miracle.javasimplejob.model.Order;import java.time.LocalDateTime;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;/** * @author Miracle * @date 2019/6/27 22:14 */public class MyDataflowJob implements DataflowJob{ private List orders = new ArrayList<>(); { for (int i = 0; i < 100; i++){ Order order = new Order(); order.setOrderId(i + 1); order.setStatus(0); orders.add(order); } } /** * 抓取数据 * @param shardingContext * @return */ @Override public List fetchData(ShardingContext shardingContext) { // 订单号 % 分片总数 == 当前分片项 List orderList = orders.stream().filter(o -> o.getStatus() == 0) .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) .collect(Collectors.toList()); List subList = null; if (orderList.size() > 0){ subList = orderList.subList(0,10); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(LocalDateTime.now() + ",我是分片项:" + shardingContext.getShardingItem()+",我抓取的数据是:"+subList); return subList; } /** * 处理数据 * @param shardingContext * @param list */ @Override public void processData(ShardingContext shardingContext, List list){ list.forEach(o -> o.setStatus(1)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(LocalDateTime.now() + ",我是分片项:" + shardingContext.getShardingItem()+",我正在处理中"); }}
package com.miracle.javasimplejob.job;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

/**
 * Minimal simple job: prints the shard item it runs as and the total shard count.
 *
 * @author Miracle
 * @date 2019/6/25 21:42
 */
public class MySimpleJob implements SimpleJob {

    /**
     * Prints the current shard item and the total number of shards.
     *
     * @param shardingContext sharding information for the current execution
     */
    @Override
    public void execute(ShardingContext shardingContext) {
        StringBuilder message = new StringBuilder();
        message.append("我是分片项:");
        message.append(shardingContext.getShardingItem());
        message.append(",总分片项:");
        message.append(shardingContext.getShardingTotalCount());
        System.out.println(message.toString());
    }
}
package com.miracle.javasimplejob;import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.JobTypeConfiguration;import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;import com.dangdang.ddframe.job.lite.api.JobScheduler;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import com.miracle.javasimplejob.job.MyDataflowJob;import com.miracle.javasimplejob.job.MySimpleJob;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class JavaSimpleJobApplication { public static void main(String[] args) { SpringApplication.run(JavaSimpleJobApplication.class, args); new JobScheduler(zkCenter(),configurationScript()).init(); } /** * Zookeeper注册中心 * @return */ public static CoordinatorRegistryCenter zkCenter(){ // 配置Zookeeper(地址(多个地址使用逗号隔开),命名空间) ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:8074","java-simple-job"); CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc); // 注册中心初始化 crc.init(); return crc; } /** * job配置 * @return */ public static LiteJobConfiguration configuration(){ // job核心配置 参数解析(任务名称,表达式“0/10 从零开始,每十秒执行一次,后面的星号分别对应分时年月星期”,分片总数) JobCoreConfiguration jcc = JobCoreConfiguration.newBuilder("mySimpleJob","0/10 * * * * ?",2).build(); // job类型配置 JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName()); // job根的配置(LiteJobConfiguration) LiteJobConfiguration ljc = LiteJobConfiguration.newBuilder(jtc).overwrite(true).build(); return ljc; } /** * 流式任务配置 * @return */ public static LiteJobConfiguration 
configurationDataflow(){ // job核心配置 参数解析(任务名称,表达式“0/10 从零开始,每十秒执行一次,后面的星号分别对应分时年月星期”,分片总数) JobCoreConfiguration jcc = JobCoreConfiguration.newBuilder("myDataflowJob","0/10 * * * * ?",2).build(); // job类型配置,第三参数:是否开启流式任务 JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true); // job根的配置(LiteJobConfiguration) LiteJobConfiguration ljc = LiteJobConfiguration .newBuilder(jtc) // 是否覆盖zookeeper上原来的配置,如表达式、任务名字 .overwrite(true) .build(); return ljc; } /** * job配置 * @return */ public static LiteJobConfiguration configurationScript(){ // job核心配置 参数解析(任务名称,表达式“0/10 从零开始,每十秒执行一次,后面的星号分别对应分时年月星期”,分片总数) JobCoreConfiguration jcc = JobCoreConfiguration.newBuilder("myScriptJob","0/10 * * * * ?",2).build(); // job类型配置,第三参数:是否开启流式任务 JobTypeConfiguration jtc = new ScriptJobConfiguration(jcc, "F:/test.cmd"); // job根的配置(LiteJobConfiguration) LiteJobConfiguration ljc = LiteJobConfiguration .newBuilder(jtc) // 是否覆盖zookeeper上原来的配置,如表达式、任务名字 .overwrite(true) .build(); return ljc; }}
最大特点-动态添加任务:
进阶详细资料:
简单使用:
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
</dependency>
# 业务调度器的实例名称
org.quartz.scheduler.instanceName=myScheduler
# 工作线程总数
org.quartz.threadPool.threadCount=1
# 设置job存储机制为内存存储
org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore
# 设置超时超过多长时间时触发Misfire机制
org.quartz.jobStore.misfireThreshold=1000
/**
 * Scheduled job that prints the local time at which it runs.
 *
 * @author Miracle
 * @date 2019/7/6 21:04
 */
public class MyJob implements Job {

    /**
     * Prints a fixed message followed by the current local time.
     *
     * @param jobExecutionContext execution context supplied by the scheduler (unused)
     * @throws JobExecutionException declared by the interface; not thrown by this body
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        System.out.println("我正在执行!" + LocalTime.now());
    }
}
public class QuartzDemo { public static void main(String[] args) throws SchedulerException, InterruptedException { Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.start(); // 配置任务处理类 JobDetail jobDetail = JobBuilder.newJob(MyJob.class) .withIdentity("jobDetail1", "group1") .build(); // 配置任务信息 Trigger trigger = TriggerBuilder.newTrigger() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(10) .repeatForever() ).build(); // 将配置引入计时器中 scheduler.scheduleJob(jobDetail, trigger); Thread.sleep(60000); // 关闭计时器 scheduler.shutdown(); }}
转载地址:http://wphd.baihongyu.com/