Hadoop0.2之前版本和之后版本在Job中有很大的改进,本次采用的版本是Hadoop1.1.2版本。

     现在作为作业驱动器,可以直接继承Configured以及实现Tool,这种方式可以很便捷的获取启动时候命令行中输入的作业配置参数,常规的Job启动如下:

public class SortByHash extends Configured implements Tool{   public int run(String[] args) throws Exception {    //这里面负责配置job属性        Configuration conf=getConf();        String[] paths=new GenericOptionsParser(conf, args).getRemainingArgs();        String tradeDir=paths[0];        String payDir=paths[1];    String joinDir=paths[2];    Job job=new Job(conf,"JoinJob");    job.setJarByClass(JoinMain.class);    FileInputFormat.addInputPath(job, new Path(tradeDir));    FileInputFormat.addInputPath(job, new Path(payDir));    FileOutputFormat.setOutputPath(job, new Path(joinDir));                                                                                                                                                              job.setMapperClass(JoinMapper.class);    job.setReducerClass(JoinReducer.class);    job.setMapOutputKeyClass(TextIntPair.class);    job.setMapOutputValueClass(Text.class);    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(Text.class);    job.waitForCompletion(true);    int exitCode=job.isSuccessful()?0:1;    return exitCode;   }   public static void main(String[] args) throws Exception   {      // TODO Auto-generated method stub      int exitCode=ToolRunner.run(new SortByHashPartitioner(), args);      System.exit(exitCode);   }}

由于Tool的所有实现都需要实现Configurable,Configured又是Configurable的具体实现,所以要同时继承Configured和实现Tool,这样就不需要实现Tool中定义的所有方法了。利用Tool接口来跑MapReduce,可以在命令行中设置一些参数,比硬编码好很多。

      注意利用Tool启动作业基本方式如下:ToolRunner首先调用自己的静态方法run,在该方法中会首先创建一个Configurable对象。然后调用GenericOptionsParser解析命令行参数,并设置给刚创建的Configurable对象。然后再次设置主类(这里即SortByHashPartitioner)setConf方法,最后调用主类的run方法执行。所以在run中要想使用命令行参数必须如下:

       Configurationconf = getConf();

       Jobjob = newJob(conf);

       hadoop0.2之后的作业启动是调用job.waitForCompletion(true);的;然后就会进行作业的提交、执行、完成等操作

调用waitForCompletion具体的工作流:

第一,调用submit提交作业

第二,当参数为true的时候,调用monitorAndPrintJob来进行监听作业的进度。

作业提交即submit():

第一,打开一个JobTracker的连接,这里会创建一个JobClient对象

jobClient= new JobClient((JobConf) getConfiguration());

这里的Configuration在初始化创建job的时候就会主动创建的

第二,根据创建的JobClient来调用submitJobInternal()提交作业给系统。

这里会对于命令行中的选项进行检查

1)获取一个作业编号JobIDjobSubmitClient.getNewJobId()jobSubmitclient是一个JobSubmissionProtocolJobTracker就是这个类的子类,JobClient创建的时候就会new一个

2)获取目录的代理,将运行作业需要的资源jar文件,配置文件都复制到一个以作业ID命名的目录下JobTracker文件系统中。

3)检查作业的输出说明。如果没有指定的输出目录或者输出目录已经不存在,则不提交,返回错误

4)创建作业的输入分片。如果分片无法计算,如输入路径不存在,则不提交,报告错误。

5)将该作业写入作业队列中,然后将该文件写入JobTracker的文件系统中

6)所有都通过后,真正的提交作业,调用submitJob()告知JobTracker准备执行作业.

作业初始化

JobTracker接受到submitJob()调用后,会将此调用放入内部队列中queue,交由作业调度器(JobScheduler)进行调度,并对其进行初始化操作。

初始化主要是由作业调度器完成的,创建一个任务运行列表。作业调度器会首先从共享文件系统中获取JobClient已经计算好的分片信息,然后为每一个分片创建一个Map任务,创建的reduce数量由mapred.reduce.task来决定,一般是通过setNumReduceTask()设定的。

任务的分配

tasktracker会定期的向jobtracker发送一个心跳告诉是否存活,也是两个之间的通信通道。这里发送心跳的目的就是利用心跳来告知jobtrackertasktracker还活着,会指明自己是否已经准备好运行新的任务,如果是,则jobtracker会为它分配一个任务。这里的tasktracker就利用周期性的循环来向jobtracker来“拉活”。

每一个tasktracker有固定的map任务槽和reduce任务槽。

选择一个map任务,jobtracker会考虑tasktracker的网络位置,并且选择一个距离其输入分片最近的tasktracker。一般都遵循数据本地化或机架本地化。

选择一个reduce任务,jobtracker简单地从待运行的reduce任务列表中选取下一个来执行,不许要考虑数据的本地化。

任务的执行

tasktracker初次被分配了一个任务后,就开始要运行该任务。

第一步,从共享文件系统将作业的JAR文件复制到tasktracker所在的文件系统,目的就是实现了作业的JAR文件本地化。并且将应用程序所需要的全部文件从分布式缓存中复制到本地磁盘。

第二步,tasktracker为分配的任务创建一个本地工作目录,将JAR文件内容解压到这个文件夹

第三步,tasktracker创建一个TaskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每个任务。

任务进度的更新

monitorAndPrintJob(),这个方法就是实时的报告作业的运行情况,以打印在控制台上。这个会每隔1秒进行查看,利用的就是Thread.sleep(1000)来执行的。

如果任务报告了进度,则会设置一个标志来表明任务状态发生了变化。在tasktracker中,除了运行任务的线程外,还有个独立的线程每隔3秒会检测任务的状态,如果已经设置,则告知tasktracker当前任务状态。而tasktracker每隔5秒会发送心跳到jobtracker,这里发送心跳的目的主要是报告tasktracker上运行的所有任务的状态。

作业的完成

jobtracker收到作业的最后一个任务已经完成的通知后,则就会把作业的状态设置为“成功”。此时JobClient会打印一条消息告知用户作业已经完成了。Jobtrackertasktracker都会清空作业的工作状态。