Reposted

[Blog of the Day] Counting User PV and UV with MapReduce

This post uses Spring Data to access MongoDB and MongoDB's map-reduce to count users' page views (PV) and unique visitors (UV).

The full code is at https://github.com/WangErXiao/spring-data

I won't cover how Spring Data works with MongoDB in general; this post only covers MongoDB map-reduce.

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MapReduceOutput;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// MongoBaseDao, UserDao, UserVisitRecord and UserStaticModel come from the project repository.
@Component
public class UserDaoImpl extends MongoBaseDao implements UserDao {

    public void insertRecord(UserVisitRecord record) {
        getMongoTemplate().insert(record);
    }

    // Computes PV/UV for one day and saves the result as a UserStaticModel document.
    public void statisUserPvUv(String date) {
        String map = "function() {"
                + "  if (this.date == '" + date + "') {"
                + "    emit(this.date, {uv: 1, pv: 1, userIds: this.userId});"
                + "  }"
                + "}";
        String reduce = "function(key, values) {"
                + "  var temp = new Array();"
                + "  var userIds = new Array();"
                + "  for (var i = 0; i < values.length; i++) {"
                + "    userIds = userIds.concat(values[i].userIds);"
                + "  }"
                + "  userIds.sort();"
                + "  for (var i = 0; i < userIds.length; i++) {"
                + "    if (userIds[i] == userIds[i + 1]) { continue; }"
                + "    temp[temp.length] = userIds[i];"
                + "  }"
                + "  return {uv: temp.length, pv: userIds.length, userIds: userIds};"
                + "}";
        // Write the map-reduce result to the temporary collection "tmp"
        MapReduceOutput mapReduceOutput = getMongoTemplate().getCollection("userVisitRecord")
                .mapReduce(map, reduce, "tmp", null);
        DBCollection resultColl = mapReduceOutput.getOutputCollection();
        try {
            DBCursor cursor = resultColl.find();
            while (cursor.hasNext()) {
                DBObject value = (DBObject) cursor.next().get("value");
                if (value != null) {
                    UserStaticModel userStaticModel = new UserStaticModel();
                    userStaticModel.setUv(Math.round(((Number) value.get("uv")).doubleValue()));
                    userStaticModel.setPv(Math.round(((Number) value.get("pv")).doubleValue()));
                    // With a single record for the day, reduce never runs and userIds is a plain String
                    Object rawIds = value.get("userIds");
                    List<String> userIds = rawIds instanceof List
                            ? (List<String>) rawIds
                            : Collections.singletonList(String.valueOf(rawIds));
                    Set<String> idSet = new HashSet<String>(userIds);
                    userStaticModel.setUserIds(new ArrayList<String>(idSet));
                    userStaticModel.setDate(date);
                    getMongoTemplate().insert(userStaticModel);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Remove the temporary result collection
            resultColl.drop();
        }
    }

    public UserStaticModel findStatic(String date) {
        Query query = new Query();
        query.addCriteria(Criteria.where("date").is(date));
        return getMongoTemplate().findOne(query, UserStaticModel.class);
    }
}

In this code, the statisUserPvUv method computes the PV and UV of user visits for a given day.
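As a point of reference, the plain-Java sketch below computes the numbers statisUserPvUv is meant to produce for one day, without map-reduce: PV is the number of visit records and UV is the number of distinct userIds. It assumes the day's records have already been loaded as a list of userId strings; DailyPvUv is a hypothetical helper, not part of the linked repository. It can serve as a test oracle when checking the map-reduce output.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical reference computation: PV = visit records, UV = distinct userIds.
final class DailyPvUv {
    // Assumed input: one userId per visit record of the chosen day.
    static long pv(List<String> userIdsOfDay) {
        return userIdsOfDay.size();
    }

    static long uv(List<String> userIdsOfDay) {
        return new HashSet<String>(userIdsOfDay).size();
    }

    public static void main(String[] args) {
        List<String> visits = Arrays.asList("u1", "u2", "u1", "u3", "u1");
        System.out.println("pv=" + pv(visits) + " uv=" + uv(visits)); // prints "pv=5 uv=3"
    }
}
```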

The map and reduce functions are:

String map = "function() {"
        + "  if (this.date == '" + date + "') {"
        + "    emit(this.date, {uv: 1, pv: 1, userIds: this.userId});"
        + "  }"
        + "}";
String reduce = "function(key, values) {"
        + "  var temp = new Array();"
        + "  var userIds = new Array();"
        + "  for (var i = 0; i < values.length; i++) {"
        + "    userIds = userIds.concat(values[i].userIds);"
        + "  }"
        + "  userIds.sort();"
        + "  for (var i = 0; i < userIds.length; i++) {"
        + "    if (userIds[i] == userIds[i + 1]) { continue; }"
        + "    temp[temp.length] = userIds[i];"
        + "  }"
        + "  return {uv: temp.length, pv: userIds.length, userIds: userIds};"
        + "}";
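To make the reduce logic easier to follow, here is a Java port of the JavaScript reduce above. It is only an illustration (ReduceSketch and its Value class are hypothetical names); MongoDB runs the JavaScript version, not this one. Like the original, it ignores the incoming uv and pv fields and derives both numbers from the concatenated userIds: pv is the total count, and uv is the number of distinct ids after sorting makes duplicates adjacent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical Java port of the JavaScript reduce, for illustration only.
final class ReduceSketch {
    // Mirrors the {uv, pv, userIds} object used by both emit() and reduce().
    static final class Value {
        final long uv;
        final long pv;
        final List<String> userIds;

        Value(long uv, long pv, List<String> userIds) {
            this.uv = uv;
            this.pv = pv;
            this.userIds = userIds;
        }

        // What map emits for one visit record.
        static Value ofVisit(String userId) {
            return new Value(1, 1, Collections.singletonList(userId));
        }
    }

    static Value reduce(List<Value> values) {
        // Concatenate every userIds list; incoming uv/pv are not used.
        List<String> userIds = new ArrayList<String>();
        for (Value v : values) {
            userIds.addAll(v.userIds);
        }
        // Sort so equal ids are adjacent, then count each id only at the end of its run.
        Collections.sort(userIds);
        long distinct = 0;
        for (int i = 0; i < userIds.size(); i++) {
            if (i + 1 < userIds.size() && userIds.get(i).equals(userIds.get(i + 1))) {
                continue;
            }
            distinct++;
        }
        return new Value(distinct, userIds.size(), userIds);
    }

    public static void main(String[] args) {
        List<Value> emitted = new ArrayList<Value>();
        for (String id : new String[] {"u2", "u1", "u2", "u3"}) {
            emitted.add(Value.ofVisit(id));
        }
        Value r = reduce(emitted);
        System.out.println("uv=" + r.uv + " pv=" + r.pv); // prints "uv=3 pv=4"
    }
}
```

Because the output has the same {uv, pv, userIds} shape as the emitted values, it can be passed back into reduce, which is exactly what the re-reduce behavior relies on.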

At this point many readers will wonder why map emits

emit(this.date ,{uv:1,pv:1,userIds:this.userId})

instead of simply

emit(this.date ,{userId:this.userId})

That is what I wrote at first, and it causes the following problems:

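  1. When a day has only one record, that record never goes through reduce and is output as emitted. The value you get is just the userId string and nothing else, so there is no pv or uv. That is why emit must already initialize the value as {pv:1,uv:1,userIds:this.userId}.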

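  2. When a day has many records (more than 100 emits), MongoDB, rather unhelpfully, does not reduce them all in one call: it reduces each batch of 100 and then feeds that result back into reduce as if it had been emitted again. This is why the object emitted in map and the object returned by reduce have the same structure. For the same key, every time more than 100 values are emitted, the partial result is re-emitted; its pv and uv are meaningless at that point, so only its userIds are used, and reduce keeps counting from them.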
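Both pitfalls can be reproduced with a toy model of how MongoDB schedules reduce for one key. This is a simplified sketch, not MongoDB's actual algorithm: the batch size of 100 follows the description above and is an assumption, and ReReduceDemo is a hypothetical class. A key with a single emitted value is returned untouched (point 1); otherwise values are reduced in batches and the partial results are reduced again (point 2). Because reduce only trusts userIds, the final uv and pv come out right however the values are batched.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of how MongoDB may invoke reduce for one key (hypothetical, not the real scheduler).
final class ReReduceDemo {
    // Batch size taken from the article's description; an assumption, not a documented constant.
    static final int BATCH = 100;

    // Shared shape of emitted values and reduce output: {uv, pv, userIds}.
    static final class Value {
        final long uv;
        final long pv;
        final List<String> userIds;

        Value(long uv, long pv, List<String> userIds) {
            this.uv = uv;
            this.pv = pv;
            this.userIds = userIds;
        }
    }

    // Same logic as the JavaScript reduce: concatenate userIds, pv = total, uv = distinct.
    static Value reduce(List<Value> values) {
        List<String> ids = new ArrayList<String>();
        for (Value v : values) {
            ids.addAll(v.userIds);
        }
        Collections.sort(ids);
        long distinct = 0;
        for (int i = 0; i < ids.size(); i++) {
            if (i + 1 < ids.size() && ids.get(i).equals(ids.get(i + 1))) {
                continue; // duplicate: count only the last id of each run
            }
            distinct++;
        }
        return new Value(distinct, ids.size(), ids);
    }

    // Point 1: a single emitted value bypasses reduce and is returned as emitted.
    // Point 2: larger groups are reduced per batch, then the partial results are reduced again.
    static Value runForKey(List<Value> emitted) {
        if (emitted.isEmpty()) {
            throw new IllegalArgumentException("no values for key");
        }
        if (emitted.size() == 1) {
            return emitted.get(0);
        }
        List<Value> partials = new ArrayList<Value>();
        for (int from = 0; from < emitted.size(); from += BATCH) {
            int to = Math.min(from + BATCH, emitted.size());
            partials.add(reduce(emitted.subList(from, to)));
        }
        return runForKey(partials); // re-reduce until a single value is left
    }

    public static void main(String[] args) {
        List<Value> emitted = new ArrayList<Value>();
        for (int n = 0; n < 250; n++) {
            // 250 visits spread over 7 users, u0 .. u6
            emitted.add(new Value(1, 1, Collections.singletonList("u" + (n % 7))));
        }
        Value day = runForKey(emitted);
        System.out.println("uv=" + day.uv + " pv=" + day.pv); // prints "uv=7 pv=250"
    }
}
```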

These two behaviors are the main pitfalls of MongoDB map-reduce. Keep them in mind and everything else works as expected.
