博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java工具:定时器工具入门理解
阅读量:149 次
发布时间:2019-02-28

本文共 8080 字,大约阅读时间需要 26 分钟。

问题:定时器的应用场景有哪些?

  • 定时进行数据备份
  • 对系统应用进行心跳监控
  • 定时同步信息到持久化数据库
  • 定时检测订单的支付状态

问题:定时器开发工具有哪些?

  • Elastic-job:分布式定时任务
    • 不支持动态添加任务
    • 分布式
    • 支持集群
  • Quartz:企业级定时任务
    • 支持动态添加任务
    • 伪分布式
    • 支持集群

问题:如何简单理解Elastic-job?

最大特点-分布式定时任务:

  • 概念:将一个任务拆分成多个独立的任务项,由分布式的服务器分别执行某一个或几个分片项。
  • 例子:统计100个数值,两台服务器各执行计算50个数值,最后进行数据合并。

Elastic-job主要作用:

  • 不直接提供数据处理能力
  • 将分片项分配至运行中的各个服务器
  • 开发者自行处理分片项与数据的关系
  • 宕机,会进行作业自动转移

进阶详细资料:

  • 后续会针对Elastic-job为主题出新的博客,后补链接

简单使用:使用的是SpringBoot集成

  • 代码下载地址:https://download.csdn.net/download/k295330167/18327938
  • 引入依赖:
com.dangdang
elastic-job-lite-core
2.1.5
  • dataflowJob定义:
package com.miracle.javasimplejob.job;import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.dataflow.DataflowJob;import com.miracle.javasimplejob.model.Order;import java.time.LocalDateTime;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;/** * @author Miracle * @date 2019/6/27 22:14 */public class MyDataflowJob implements DataflowJob
{
private List
orders = new ArrayList<>(); {
for (int i = 0; i < 100; i++){
Order order = new Order(); order.setOrderId(i + 1); order.setStatus(0); orders.add(order); } } /** * 抓取数据 * @param shardingContext * @return */ @Override public List
fetchData(ShardingContext shardingContext) {
// 订单号 % 分片总数 == 当前分片项 List
orderList = orders.stream().filter(o -> o.getStatus() == 0) .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) .collect(Collectors.toList()); List
subList = null; if (orderList.size() > 0){
subList = orderList.subList(0,10); } try {
Thread.sleep(1000); } catch (InterruptedException e) {
e.printStackTrace(); } System.out.println(LocalDateTime.now() + ",我是分片项:" + shardingContext.getShardingItem()+",我抓取的数据是:"+subList); return subList; } /** * 处理数据 * @param shardingContext * @param list */ @Override public void processData(ShardingContext shardingContext, List
list){ list.forEach(o -> o.setStatus(1)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(LocalDateTime.now() + ",我是分片项:" + shardingContext.getShardingItem()+",我正在处理中"); }}
  • job定义:
package com.miracle.javasimplejob.job;import com.dangdang.ddframe.job.api.ShardingContext;import com.dangdang.ddframe.job.api.simple.SimpleJob;/** * @author Miracle * @date 2019/6/25 21:42 */public class MySimpleJob implements SimpleJob {
@Override public void execute(ShardingContext shardingContext) {
System.out.println("我是分片项:" + shardingContext.getShardingItem() + ",总分片项:" + shardingContext.getShardingTotalCount()); }}
  • 主线程:
package com.miracle.javasimplejob;import com.dangdang.ddframe.job.config.JobCoreConfiguration;import com.dangdang.ddframe.job.config.JobTypeConfiguration;import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;import com.dangdang.ddframe.job.lite.api.JobScheduler;import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;import com.miracle.javasimplejob.job.MyDataflowJob;import com.miracle.javasimplejob.job.MySimpleJob;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class JavaSimpleJobApplication {
public static void main(String[] args) {
SpringApplication.run(JavaSimpleJobApplication.class, args); new JobScheduler(zkCenter(),configurationScript()).init(); } /** * Zookeeper注册中心 * @return */ public static CoordinatorRegistryCenter zkCenter(){
// 配置Zookeeper(地址(多个地址使用逗号隔开),命名空间) ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:8074","java-simple-job"); CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc); // 注册中心初始化 crc.init(); return crc; } /** * job配置 * @return */ public static LiteJobConfiguration configuration(){
// job核心配置 参数解析(任务名称,表达式“0/10 从零开始,每十秒执行一次,后面的星号分别对应分时年月星期”,分片总数) JobCoreConfiguration jcc = JobCoreConfiguration.newBuilder("mySimpleJob","0/10 * * * * ?",2).build(); // job类型配置 JobTypeConfiguration jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName()); // job根的配置(LiteJobConfiguration) LiteJobConfiguration ljc = LiteJobConfiguration.newBuilder(jtc).overwrite(true).build(); return ljc; } /** * 流式任务配置 * @return */ public static LiteJobConfiguration configurationDataflow(){
// job核心配置 参数解析(任务名称,表达式“0/10 从零开始,每十秒执行一次,后面的星号分别对应分时年月星期”,分片总数) JobCoreConfiguration jcc = JobCoreConfiguration.newBuilder("myDataflowJob","0/10 * * * * ?",2).build(); // job类型配置,第三参数:是否开启流式任务 JobTypeConfiguration jtc = new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true); // job根的配置(LiteJobConfiguration) LiteJobConfiguration ljc = LiteJobConfiguration .newBuilder(jtc) // 是否覆盖zookeeper上原来的配置,如表达式、任务名字 .overwrite(true) .build(); return ljc; } /** * job配置 * @return */ public static LiteJobConfiguration configurationScript(){
// job核心配置 参数解析(任务名称,表达式“0/10 从零开始,每十秒执行一次,后面的星号分别对应分时年月星期”,分片总数) JobCoreConfiguration jcc = JobCoreConfiguration.newBuilder("myScriptJob","0/10 * * * * ?",2).build(); // job类型配置,第三参数:是否开启流式任务 JobTypeConfiguration jtc = new ScriptJobConfiguration(jcc, "F:/test.cmd"); // job根的配置(LiteJobConfiguration) LiteJobConfiguration ljc = LiteJobConfiguration .newBuilder(jtc) // 是否覆盖zookeeper上原来的配置,如表达式、任务名字 .overwrite(true) .build(); return ljc; }}

问题:如何简单理解Quartz?

最大特点-动态添加任务:

  • 在程序中定义后相应规则可以动态添加定时任务,非常适合充当定时器工具

进阶详细资料:

  • 后续会针对Quartz为主题出新的博客,后补链接

简单使用:

  • 引入依赖:
org.quartz-scheduler
quartz
2.3.1
  • 配置quartz.properties:
# 业务调度器的实例名称org.quartz.scheduler.instanceName=myScheduler# 工作线程总数org.quartz.threadPool.threadCount=1# 设置job存储机制为内存存储org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore# 设置超时超过多长时间时触发Misfire机制org.quartz.jobStore.misfireThreshold=1000
  • 定时器任务执行实现:
/** * 计时任务 * @author Miracle * @date 2019/7/6 21:04 */public class MyJob implements Job {
@Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
LocalTime localTime = LocalTime.now(); System.out.println("我正在执行!" + localTime.toString()); }}
  • 主线程:
public class QuartzDemo {
public static void main(String[] args) throws SchedulerException, InterruptedException {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.start(); // 配置任务处理类 JobDetail jobDetail = JobBuilder.newJob(MyJob.class) .withIdentity("jobDetail1", "group1") .build(); // 配置任务信息 Trigger trigger = TriggerBuilder.newTrigger() .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(10) .repeatForever() ).build(); // 将配置引入计时器中 scheduler.scheduleJob(jobDetail, trigger); Thread.sleep(60000); // 关闭计时器 scheduler.shutdown(); }}

转载地址:http://wphd.baihongyu.com/

你可能感兴趣的文章