阿里开源分布式任务调度系统LTS使用分享

Owen Jia 2019年10月21日 869次浏览

章节速览

  1. 背景介绍
  2. 环境部署
  3. LTS架构原理&代码样例
  4. 个人心得经验

一、背景介绍

很多公司应该都会遇到job服务部署执行时:定时、并发、分布式这些问题。有的人就是只跑一个job服务,这样会简单很多但是单节点宕机情况下就完了;对于分布式task就更别想了,集成个spring-cron包价格定时就跑起job基本可以处理很多简单问题;聪明点的人会集合MQ来执行task,这样就分布式了嘛而且压力什么的也分摊了不少。
作为一个优秀的程序员,这种程度的懒还是不够的,我们需要彻底解决繁琐的重复工作。所以我们找到了“度娘”,咨询知道世界很大的,很多牛人已经发明了有不少轮子,如:opencron LTS XXL-JOB Elastic-Job 等。
考虑到公司前辈使用LTS,所以我们就从这里入门介绍;但其实我更看好其他几个产品。

二、环境部署

LTS既支持单节点又支持多节点部署,我们这里只介绍单节点配置

使用说明和架构原理什么的推荐你看官网,不要去看其他乱七八糟的文章(LTS) 要是看不懂官网文档,建议你就放弃吧

教你怎么本地构建LTS

  1. github下载源码 https://github.com/ltsopensource/light-task-scheduler.git ,如下图: 本地目录
  2. 确保本地的安装maven环境 http://maven.apache.org/install.html
  3. 运行maven命令:mvn install | mvn compile package,会生成dist目录,目录里面的lts-1.7.2-SNAPSHOT-bin这个文件夹就是部署文件夹,可以zip包部署的服务器上面。
    dist目录如下: dist目录
  4. 重点看conf目录的配置文件:jobtracker.cfg&tasktracker.cfg这两个文件,如图:
    jobtracker
    tasktracker
    还有monitor和admin直接看文档就ok
  5. 创建mysql数据:lts,也建个lts专用mysql帐号密码,用来创建表;lts启动会自动创建需要的表;
  6. 确保zookeeper环境ok,依次启动JobTracker<S-Admin
  7. 访问服务端:http://localhost:8081/index.htm, 如下图:
    管理员账户登录页面
    输入配置文件中的账户和密码,看到如下图表示启动ok;
    主界面

三、架构原理介绍

详细原来github上面的描述感觉挺好的了,推荐你去看官网(官网文档)更靠谱;

两个图很好帮助你整体理解LTS,在后续的积累中你会一点点明白架构图中的意思;
软件架构设计图
job流程图

需要掌握LTS重点概念

节点功能

  • jobclient: 负责提交任务和接收执行反馈,一般直接写在业务系统里面;可以参考我目前的写法:
package com.isesol.rent.platform.job;

import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.jobclient.JobClient;
import com.isesol.api.service.BaseService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.Map;

/**
 * Created by owen jiaw
 * on 2018/9/5.
 */
@Service(value = "jobUtils")
public class JobUtils extends BaseService {

    @Resource private JobClient jobClient;
    @Value("${lts.nodeGroup.taskTracker}")
    String nodeGroupTaskTracker;

    /**
     * 提交任务
     * @param taskId
     * @param params
     */
    public void submitJob(String taskId, Map<String, String> params) {
        Job job = new Job();
        job.setTaskTrackerNodeGroup(nodeGroupTaskTracker);
        job.setTaskId(taskId);
        job.setMaxRetryTimes(3);
        for (Map.Entry<String, String> param : params.entrySet()) {
            job.setParam(param.getKey(), String.valueOf(param.getValue()));
        }
        jobClient.submitJob(job);
    }

    /**
     * 提交定时任务
     * @param taskId
     * @param params
     * @param triggerDate
     */
    public void submitTriggerJob(String taskId, Map<String, String> params, Date triggerDate) {
        Job job = new Job();
        job.setTriggerDate(triggerDate);
        job.setTaskTrackerNodeGroup(nodeGroupTaskTracker);
        job.setTaskId(taskId);
        job.setMaxRetryTimes(3);
        for (Map.Entry<String, String> param : params.entrySet()) {
            job.setParam(param.getKey(), String.valueOf(param.getValue()));
        }
        jobClient.submitJob(job);
    }

}

  • jobtracker: 负责执行任务和分配任务,任务的调度;
  • tasktracker: 具体执行任务,执行返回给jobtracker,在反馈给jobclient;一般和业务系统集成在一起。参考官网推荐的一个JVM一个tasktracker执行多个task方式;

项目集成,可以参考我的用法:spring方式,有些推荐spring-boot

package com.isesol.rent.platform.job;

import com.github.ltsopensource.jobclient.JobClient;
import com.github.ltsopensource.spring.JobClientFactoryBean;
import com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean;
import com.github.ltsopensource.tasktracker.TaskTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * LTS相关配置
 * @author: Owen Jia
 * @time: 2018/9/7 17:41
 */
@Configuration
public class LTSConfig implements ApplicationContextAware {
    static Logger logger = LoggerFactory.getLogger(LTSConfig.class);

    {
        logger.info("----------- 加载LTS配置类 -----------");
    }

    @Value(value = "${lts.zookeeperAddress}")
    String zookeeperAddress;
    @Value(value = "${lts.clusterName}")
    String clusterName;
    @Value("${lts.nodeGroup.taskTracker}")
    String nodeGroupTaskTracker;
    @Value("${lts.nodeGroup.jobClient}")
    String nodeGroupJobClient;
    @Value("${lts.dataPath}")
    static String dataPath;

    @Autowired
    JobCompletedHandlerImpl jobCompletedHandler;

    private ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Bean(name = "jobClient",initMethod = "start")
    public JobClient getJobClient() throws Exception {
        JobClientFactoryBean factoryBean = new JobClientFactoryBean();
        factoryBean.setClusterName(clusterName);
        factoryBean.setRegistryAddress(zookeeperAddress);
        factoryBean.setNodeGroup(nodeGroupJobClient);
        factoryBean.setDataPath(dataPath);
        factoryBean.setJobCompletedHandler(jobCompletedHandler);
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        factoryBean.setConfigs(configs);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }

    @Bean(name = "taskTracker",initMethod = "start")
    public TaskTracker getTaskTracker() throws Exception {
        TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
        factoryBean.setApplicationContext(applicationContext);
        factoryBean.setClusterName(clusterName);
        factoryBean.setJobRunnerClass(JobRunnerDispatcher.class);
        factoryBean.setNodeGroup(nodeGroupTaskTracker);
        factoryBean.setBizLoggerLevel("INFO");
        factoryBean.setRegistryAddress(zookeeperAddress);
        factoryBean.setDataPath(dataPath);
        factoryBean.setWorkThreads(15);
        Properties configs = new Properties();
        configs.setProperty("job.fail.store", "leveldb");
        configs.setProperty("job.retry.interval.millis", "60000*5");
        factoryBean.setConfigs(configs);
        factoryBean.afterPropertiesSet();
        return factoryBean.getObject();
    }
}

四、个人心得建议

担心LTS的后续版本维护更新比较慢了,代码OPEN后感觉没多少人在持续优化提交code; 我也加入他们推荐的QQ群,想着未来也能贡献点力量吧,希望群策群力提升它

有一些痛点分享给大家:

  • 痛点一
    lts_job_log_po 这张表的数据量非常巨大,是job的运行日志表,索引和数据量大大致存储会一直变大;建议各一段时间就清除掉数据,在admin里面的日志查询对历史的感觉作用不是很大。真正出了问题还是需要业务日志去分析的,靠个job对错没啥鸟用;
  • 痛点二
    lts-admin这个有点用又用着难受的工具建议公司实力强的重构适合公司业务需要逻辑,有太多不稳定和经常性错误;另外默认jetty服务器在linux上启动路径写死也经常出现个问题;
  • 痛点三
    LTS这东西文档太少,搞这东西不懂点技术你还真不一定搞的定

多节点部署

  • LTS的多节点部署方面我还在学习,很多概念没理解透,希望有共同学习的相互分享知识

写的不好,欢迎指正