博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Azkaban任务流编写
阅读量:6515 次
发布时间:2019-06-24

本文共 6014 字,大约阅读时间需要 20 分钟。

在Azkaban中,一个project包含一个或多个flows,一个flow包含多个job。job是你想在azkaban中运行的一个进程,可以是Command,也可以是一个Hadoop任务。当然,如果你安装相关插件,也可以运行插件。一个job可以依赖于另一个job,这种多个job和它们的依赖组成的图表叫做flow。本文介绍如何在Azkaban上编写四类任务流:Command、Hive、Java、Hadoop。

1、Command任务编写

这里将模拟一个数据从采集到上传最后入库的整个过程的工作流。涉及的job如下:

create_dir.job:创建对应的目录
get_data1.job:获取数据1
get_data2.job:获取数据2
upload_to_hdfs.job:数据上传到hdfs
insert_to_hive.job:从hdfs入库到hive中
    • create_dir.job
    • type=commandcommand=echo "create directory before get data"
    • get_data1.job
      type=commandcommand=echo "get data from logserver"dependencies=create_dir

 

  • get_data2.job
    type=commandcommand=echo "get data from ftp"dependencies=create_dir
  • upload_to_hdfs.job
    type=commandcommand=echo "upload to hdfs"dependencies=get_data1,get_data2

完成后的目录如下

打包成demo.zip压缩包,并上传到Azkaban中,可以看到依赖图如下:

点击执行

在Job List里可以看到每个job的运行情况

点击Details可以看到每个job执行的日志

Job中的其他配置选项

 

  • 可以定义job依赖另一个flow
    type=flowflow.name=fisrt_flow
  • 可以配置多个command命令
    type=commandcommand=echo "hello"command.1=echo "world"command.2=echo "azkaban"
  • 可以配置job失败重启次数,及间隔时间,比如,上述ftp获取日志,我可以配置重试12次,每隔5分钟一次
    type=commandcommand=echo "retry test" retries=12#单位毫秒retry.backoff=300000

 

2、Hive任务编写

Hive任务的编写比较简单,在新的目录下新建hive.job文件,内容如下

#定义类型type=hive#定义执行HiveSQL的用户user.to.proxy=azkaban#固定值azk.hive.action=execute.queryhive.query.01=drop table words;hive.query.02=create table words (freq int, word string) row format delimited fields terminated by '\t' stored as textfile;hive.query.03=describe words;hive.query.04=load data local inpath "res/input" into table words;hive.query.05=select * from words limit 10;hive.query.06=select freq, count(1) as f2 from words group by freq sort by f2 desc limit 10;

以上第四条语句涉及到数据文件,需要在同级目录下新建res文件夹,然后新建input文件,内容如下

11	and10	the9	to9	in9	of9	is9	CLAUDIUS8	KING8	this8	we7	what7	us7	GUILDENSTERN6	And5	d4	ROSENCRANTZ3	a2	his1	QUEEN1	he

然后打包成zip文件即可上传到azkaban中运行

3、Java任务编写

Java任务比较简单,只需要在类里提供一个run方法即可,如果需要设定参数,着在构造方法中指定Props类,然后在job文件里配置好参数。

Java类如下

package com.dataeye.java;import org.apache.log4j.Logger; import azkaban.utils.Props; public class JavaMain { private static final Logger logger = Logger.getLogger(JavaMain.class); private final int fileRows; private final int fileLine; public JavaMain(String name, Props props) throws Exception { this.fileRows = props.getInt("file.rows"); this.fileLine = props.getInt("file.line"); } public void run() throws Exception { logger.info(" ### this is JavaMain method ###"); logger.info("fileRows value is ==> " + fileRows); logger.info("fileLine value is ==> " + fileLine); } }

java.job文件如下

type=java#指定类的全路径job.class=com.dataeye.java.JavaMain#指定执行jar包的路径classpath=lib/*#用户参数1file.rows=10#用户参数2file.line=50

新建目录,把java.job拷贝到该目录下,然后新建lib文件夹,把以上java类打包成jar文件,放入lib目录下,打包成zip文件,上传到azkaban中。执行成功后的日志如下

31-08-2016 14:41:15 CST simple INFO - INFO Running job simple31-08-2016 14:41:15 CST simple INFO - INFO Class name com.dataeye.java.JavaMain31-08-2016 14:41:15 CST simple INFO - INFO Constructor found public com.dataeye.java.JavaMain(java.lang.String,azkaban.utils.Props) throws java.lang.Exception31-08-2016 14:41:15 CST simple INFO - INFO Invoking method run31-08-2016 14:41:15 CST simple INFO - INFO Proxy check failed, not proxying run.31-08-2016 14:41:15 CST simple INFO - INFO  ### this is JavaMain method ###31-08-2016 14:41:15 CST simple INFO - INFO fileRows value is ==> 1031-08-2016 14:41:15 CST simple INFO - INFO fileLine value is ==> 5031-08-2016 14:41:15 CST simple INFO - INFO Apparently there isn't a method[getJobGeneratedProperties] on object[com.dataeye.java.JavaMain@591f989e], using empty Props object instead.31-08-2016 14:41:15 CST simple INFO - INFO Outputting generated properties to /home/hadoop/azkaban/azkaban-solo-server-3.0.0/executions/339/simple_output_6034902760752438337_tmp31-08-2016 14:41:15 CST simple INFO - Process completed successfully in 0 seconds.31-08-2016 14:41:15 CST simple INFO - Finishing job simple attempt: 0 at 1472625675501 with status SUCCEEDED

日志中已经打印出run方法中的参数值。

4、Hadoop任务编写

Hadoop相对以上三种类型会复杂一些,需要注意的地方如下

  • 必须继承 AbstractHadoopJob 类
    public class WordCount extends AbstractHadoopJob
  • 必须要有构造方法,参数是String和Props,且要调用super方法
    public WordCount(String name, Props props) {	super(name, props);	//other code	}
  • 必须提供run方法,且在run方法的最后调用super.run();
    public void run() throws Exception{//other codesuper.run();}

下面提供一个 WordCount 任务的例子

WordCount.java类

package com.dataeye.mr;import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.log4j.Logger; import azkaban.jobtype.javautils.AbstractHadoopJob; import azkaban.utils.Props; import com.dataeye.mr.maper.WordCountMap; import com.dataeye.mr.reducer.WordCountReduce; public class WordCount extends AbstractHadoopJob { private static final Logger logger = Logger.getLogger(WordCount.class); private final String inputPath; private final String outputPath; private boolean forceOutputOverrite; public WordCount(String name, Props props) { super(name, props); this.inputPath = props.getString("input.path"); this.outputPath = props.getString("output.path"); this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false); } public void run() throws Exception { logger.info(String.format("Hadoop job, class is %s", new Object[] { getClass().getSimpleName() })); JobConf jobconf = getJobConf(); jobconf.setJarByClass(WordCount.class); jobconf.setOutputKeyClass(Text.class); jobconf.setOutputValueClass(IntWritable.class); jobconf.setMapperClass(WordCountMap.class); jobconf.setReducerClass(WordCountReduce.class); jobconf.setInputFormat(TextInputFormat.class); jobconf.setOutputFormat(TextOutputFormat.class); FileInputFormat.addInputPath(jobconf, new Path(this.inputPath)); FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath)); if (this.forceOutputOverrite)

转载地址:http://dbofo.baihongyu.com/

你可能感兴趣的文章
html5常用属性text-shadow、vertical-align、background如何使用
查看>>
微软正式宣布Azure MongoDB Atlas免费方案
查看>>
Jessica Kerr:高绩效团队简史
查看>>
开发者需要知道的有关软件架构的五件事
查看>>
GitLab 9提供了子群组、部署面板和集成监控
查看>>
继爆款超级账本后,IBM再次推出新产品
查看>>
贝壳金控赵文乐:基于 Spring Cloud 的服务治理实践
查看>>
Pyspider框架 —— Python爬虫实战之爬取 V2EX 网站帖子
查看>>
区域生长算法 C++实现
查看>>
数据分析-从入门到崩溃
查看>>
web.xml 中的listener、 filter、servlet 加载顺序
查看>>
MyBatis原理简介和小试牛刀
查看>>
js部分基础
查看>>
Docker 常用基础命令
查看>>
脏读,幻读,不可重复读解释和例子
查看>>
Day02 数值运算&条件判断
查看>>
Tomcat指定(JDK路径)JAVA_HOME而不用环境变量
查看>>
Bluemix专属版本落地中国 开放物联网和认知计算能力
查看>>
汤姆大叔的6道javascript编程题题解
查看>>
【世界知名量子科学家加盟阿里】施尧耘出任阿里云量子技术首席科学家
查看>>