Hadoop0.2之前版本和之后版本在Job中有很大的改进,本次采用的版本是Hadoop1.1.2版本。
现在作为作业驱动器,可以直接继承Configured以及实现Tool,这种方式可以很便捷的获取启动时候命令行中输入的作业配置参数,常规的Job启动如下:
public class SortByHash extends Configured implements Tool{ public int run(String[] args) throws Exception { //这里面负责配置job属性 Configuration conf=getConf(); String[] paths=new GenericOptionsParser(conf, args).getRemainingArgs(); String tradeDir=paths[0]; String payDir=paths[1]; String joinDir=paths[2]; Job job=new Job(conf,"JoinJob"); job.setJarByClass(JoinMain.class); FileInputFormat.addInputPath(job, new Path(tradeDir)); FileInputFormat.addInputPath(job, new Path(payDir)); FileOutputFormat.setOutputPath(job, new Path(joinDir)); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(TextIntPair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); int exitCode=job.isSuccessful()?0:1; return exitCode; } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int exitCode=ToolRunner.run(new SortByHashPartitioner(), args); System.exit(exitCode); }}
由于Tool的所有实现都需要实现Configurable,而Configured又是Configurable的具体实现,所以要同时继承Configured和实现Tool,这样就不需要实现Tool中定义的所有方法了。利用Tool接口来跑MapReduce,可以在命令行中设置一些参数,比硬编码好很多。
注意利用Tool启动作业基本方式如下:ToolRunner首先调用自己的静态方法run,在该方法中会首先创建一个Configurable对象。然后调用GenericOptionsParser解析命令行参数,并设置给刚创建的Configurable对象。然后再次设置主类(这里即SortByHashPartitioner)的setConf方法,最后调用主类的run方法执行。所以在run中要想使用命令行参数必须如下:
Configurationconf = getConf();
Jobjob = newJob(conf);
hadoop0.2之后的作业启动是调用job.waitForCompletion(true);的;然后就会进行作业的提交、执行、完成等操作
调用waitForCompletion具体的工作流:
第一,调用submit提交作业
第二,当参数为true的时候,调用monitorAndPrintJob来进行监听作业的进度。
作业提交即submit():
第一,打开一个JobTracker的连接,这里会创建一个JobClient对象
jobClient= new JobClient((JobConf) getConfiguration());
这里的Configuration在初始化创建job的时候就会主动创建的
第二,根据创建的JobClient来调用submitJobInternal()提交作业给系统。
这里会对于命令行中的选项进行检查
1)获取一个作业编号JobID,jobSubmitClient.getNewJobId(),jobSubmitclient是一个JobSubmissionProtocol,JobTracker就是这个类的子类,在JobClient创建的时候就会new一个
2)获取目录的代理,将运行作业需要的资源jar文件,配置文件都复制到一个以作业ID命名的目录下JobTracker文件系统中。
3)检查作业的输出说明。如果没有指定的输出目录或者输出目录已经不存在,则不提交,返回错误
4)创建作业的输入分片。如果分片无法计算,如输入路径不存在,则不提交,报告错误。
5)将该作业写入作业队列中,然后将该文件写入JobTracker的文件系统中。
6)所有都通过后,真正的提交作业,调用submitJob()告知JobTracker准备执行作业.
作业初始化
当JobTracker接受到submitJob()调用后,会将此调用放入内部队列中queue,交由作业调度器(JobScheduler)进行调度,并对其进行初始化操作。
初始化主要是由作业调度器完成的,创建一个任务运行列表。作业调度器会首先从共享文件系统中获取JobClient已经计算好的分片信息,然后为每一个分片创建一个Map任务,创建的reduce数量由mapred.reduce.task来决定,一般是通过setNumReduceTask()设定的。
任务的分配
tasktracker会定期的向jobtracker发送一个心跳告诉是否存活,也是两个之间的通信通道。这里发送心跳的目的就是利用心跳来告知jobtracker,tasktracker还活着,会指明自己是否已经准备好运行新的任务,如果是,则jobtracker会为它分配一个任务。这里的tasktracker就利用周期性的循环来向jobtracker来“拉活”。
每一个tasktracker有固定的map任务槽和reduce任务槽。
选择一个map任务,jobtracker会考虑tasktracker的网络位置,并且选择一个距离其输入分片最近的tasktracker。一般都遵循数据本地化或机架本地化。
选择一个reduce任务,jobtracker简单地从待运行的reduce任务列表中选取下一个来执行,不许要考虑数据的本地化。
任务的执行
当tasktracker初次被分配了一个任务后,就开始要运行该任务。
第一步,从共享文件系统将作业的JAR文件复制到tasktracker所在的文件系统,目的就是实现了作业的JAR文件本地化。并且将应用程序所需要的全部文件从分布式缓存中复制到本地磁盘。
第二步,tasktracker为分配的任务创建一个本地工作目录,将JAR文件内容解压到这个文件夹
第三步,tasktracker创建一个TaskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每个任务。
任务进度的更新
monitorAndPrintJob(),这个方法就是实时的报告作业的运行情况,以打印在控制台上。这个会每隔1秒进行查看,利用的就是Thread.sleep(1000)来执行的。
如果任务报告了进度,则会设置一个标志来表明任务状态发生了变化。在tasktracker中,除了运行任务的线程外,还有个独立的线程每隔3秒会检测任务的状态,如果已经设置,则告知tasktracker当前任务状态。而tasktracker每隔5秒会发送心跳到jobtracker,这里发送心跳的目的主要是报告tasktracker上运行的所有任务的状态。
作业的完成
当jobtracker收到作业的最后一个任务已经完成的通知后,则就会把作业的状态设置为“成功”。此时JobClient会打印一条消息告知用户作业已经完成了。Jobtracker和tasktracker都会清空作业的工作状态。