
MapReduce业务 - 图片关联计算



2.1 实现思路


pic_001pic_002 pic_003,pic_004,pic_005
pic_001pic_003 pic_002,pic_005
pic_001pic_004 pic_002,pic_005
pic_001pic_005 pic_002,pic_003,pic_004
......



  • 第一步:拆分数据,关联数据两两组合作为Key输出。
  • 第二步:将相同Key分组,然后求并集得到计算结果。



  • 拆分数据,两两组合。
public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> {     @Override     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)      throws IOException, InterruptedException {  StringTokenizer strToken = new StringTokenizer(value.toString());  Text owner = new Text();  Set<String> set = new TreeSet<String>();  owner.set(strToken.nextToken());  while (strToken.hasMoreTokens()) {      set.add(strToken.nextToken());  }  String[] relations = new String[set.size()];  relations = set.toArray(relations);  for (int i = 0; i < relations.length; i++) {      for (int j = i + 1; j < relations.length; j++) {   String outPutKey = relations[i] + relations[j];   context.write(new Text(outPutKey), owner);      }  }     } } 
  • 按Key分组,求并集
public static class PictureReduce extends Reducer<Text, Text, Text, Text> {  @Override  protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)    throws IOException, InterruptedException {   String common = "";   for (Text val : values) {    if (common == "") {     common = val.toString();    } else {     common = common + "," + val.toString();    }   }   context.write(key, new Text(common));  } } 
  • 完整示例
package cn.hadoop.hdfs.example;

import java.io.IOException;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hadoop.hdfs.util.HDFSUtils;
import cn.hadoop.hdfs.util.SystemConfig;

/**
 * MapReduce job that finds picture relations: for every pair of items that appear together in an
 * owner's relation list, it collects the owners in whose lists the pair appears.
 *
 * @Date Aug 31, 2015
 *
 * @Author dengjie
 *
 * @Note Find picture relations
 */
public class PictureRelations extends Configured implements Tool {

    private static final Logger log = LoggerFactory.getLogger(PictureRelations.class);

    /** HDFS HA client configuration shared by the job and the backup step in {@link #main}. */
    private static final Configuration conf;

    static {
        String tag = SystemConfig.getProperty("dev.tag");
        String[] hosts = SystemConfig.getPropertyArray(tag + ".hdfs.host", ",");
        // Both NameNode addresses are read below; fail with a clear message instead of an
        // ArrayIndexOutOfBoundsException when the property is missing or incomplete.
        if (hosts == null || hosts.length < 2) {
            throw new IllegalStateException(
                    "Property '" + tag + ".hdfs.host' must list two NameNode RPC addresses separated by ','");
        }
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://cluster1");
        conf.set("dfs.nameservices", "cluster1");
        conf.set("dfs.ha.namenodes.cluster1", "nna,nns");
        conf.set("dfs.namenode.rpc-address.cluster1.nna", hosts[0]);
        conf.set("dfs.namenode.rpc-address.cluster1.nns", hosts[1]);
        conf.set("dfs.client.failover.proxy.provider.cluster1",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
    }

    /**
     * Mapper that turns one input line into "pair of related items -> owner" records.
     *
     * <p>The line is split on whitespace. The first token is treated as the owner; every
     * remaining token is a relation. Relations are de-duplicated and sorted through a
     * {@link TreeSet}, so a pair is always emitted with its members in the same order.
     */
    public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * Emits one record per unordered pair of relations found on the line.
         *
         * @param key     input key supplied by the framework; not used here
         * @param value   one line of input: owner followed by its relations
         * @param context sink for the emitted (pair, owner) records
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer strToken = new StringTokenizer(value.toString());
            // Skip blank lines: without an owner token nextToken() would throw
            // NoSuchElementException and fail the task.
            if (!strToken.hasMoreTokens()) {
                return;
            }
            Text owner = new Text(strToken.nextToken());

            Set<String> set = new TreeSet<String>();
            while (strToken.hasMoreTokens()) {
                set.add(strToken.nextToken());
            }
            String[] relations = set.toArray(new String[0]);

            for (int i = 0; i < relations.length; i++) {
                for (int j = i + 1; j < relations.length; j++) {
                    // The key is the plain concatenation of the two items, with no separator.
                    String outPutKey = relations[i] + relations[j];
                    context.write(new Text(outPutKey), owner);
                }
            }
        }
    }

    /**
     * Reducer that joins all values received for a key into a single comma-separated string.
     */
    public static class PictureReduce extends Reducer<Text, Text, Text, Text> {

        /**
         * Concatenates every value of the group, separated by {@code ","}, and writes the result
         * under the unchanged key.
         *
         * @param key     the grouped key
         * @param values  all values emitted for {@code key}
         * @param context sink for the single (key, joined values) record
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Strings must not be compared with ==; track the first element explicitly.
            StringBuilder common = new StringBuilder();
            boolean first = true;
            for (Text val : values) {
                if (!first) {
                    common.append(',');
                }
                common.append(val.toString());
                first = false;
            }
            context.write(key, new Text(common.toString()));
        }
    }

    /**
     * Configures and runs the job, blocking until it completes.
     *
     * <p>NOTE(review): the job is built from the static {@code conf}, not from
     * {@code getConf()}, so the configuration handed to {@link ToolRunner} is not used here.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        final Job job = Job.getInstance(conf);
        job.setJarByClass(PictureMap.class);

        job.setMapperClass(PictureMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(PictureReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point.
     *
     * <p>Expects exactly one argument, which is used as the name of the output sub-directory
     * under {@code meta/}. Input and output path templates are read from the
     * {@code hdfs.input.path.v2} and {@code hdfs.output.path.v2} properties.
     *
     * @param args single-element array holding the date parameter
     */
    public static void main(String[] args) {
        try {
            if (args.length != 1) {
                log.warn("args length must be 1 and as date param");
                return;
            }
            String tmpIn = SystemConfig.getProperty("hdfs.input.path.v2");
            String tmpOut = SystemConfig.getProperty("hdfs.output.path.v2");
            String inPath = String.format(tmpIn, "t_pic_20150801.log");
            String outPath = String.format(tmpOut, "meta/" + args[0]);
            // bak dfs file to old
            HDFSUtils.bak(tmpOut, outPath, "meta/" + args[0] + "-old", conf);
            args = new String[] { inPath, outPath };
            int res = ToolRunner.run(new Configuration(), new PictureRelations(), args);
            System.exit(res);
        } catch (Exception ex) {
            // Pass the exception to the logger so the stack trace lands in the log output
            // instead of only on stderr via printStackTrace().
            // NOTE(review): the process still ends with exit status 0 on this path.
            log.error("Picture relations task has error,msg is" + ex.getMessage(), ex);
        }
    }
}



