注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

anqiang专栏

不要问细节是怎么搞的,源码说明一切

 
 
 

日志

 
 

BayesFeatureMapper   

2010-04-06 16:38:18|  分类: Hadoop & Mahout |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |

import java.io.IOException;

import java.util.Iterator;

import java.util.regex.Pattern;

 

import org.apache.commons.lang.mutable.MutableDouble;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

import org.apache.lucene.analysis.TokenStream;

import org.apache.lucene.analysis.shingle.ShingleFilter;

import org.apache.lucene.analysis.tokenattributes.TermAttribute;

import org.apache.mahout.classifier.BayesFileFormatter;

import org.apache.mahout.common.Parameters;

import org.apache.mahout.common.StringTuple;

import org.apache.mahout.common.iterator.ArrayIterator;

import org.apache.mahout.math.function.ObjectIntProcedure;

import org.apache.mahout.math.function.ObjectProcedure;

import org.apache.mahout.math.map.OpenObjectIntHashMap;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

/**
 * Reads the input training set (preprocessed using the {@link BayesFileFormatter}).
 */

public class BayesFeatureMapper extends MapReduceBase implements Mapper<Text,Text,StringTuple,DoubleWritable> {

  private static final Logger log = LoggerFactory.getLogger(BayesFeatureMapper.class);

  // Constant 1.0, reused for every document-frequency / count emission below.
  private static final DoubleWritable ONE = new DoubleWritable(1.0);

  // Splits the input line into tokens on runs of one or more spaces.
  private static final Pattern SPACE_PATTERN = Pattern.compile("[ ]+");

  // n-gram size; 1 = plain unigrams, >1 builds shingles via Lucene's ShingleFilter.
  // Set from the serialized job parameters in configure().
  private int gramSize = 1;

  /**
   * We need to count the number of times we've seen a term with a given label and we need to output that. But
   * this Mapper does more than just outputting the count. It first does weight normalisation. Secondly, it
   * outputs for each unique word in a document value 1 for summing up as the Term Document Frequency. Which
   * later is used to calculate the Idf Thirdly, it outputs for each label the number of times a document was
   * seen(Also used in Idf Calculation)
   *
   * @param key
   *          The label
   * @param value
   *          the features (all unique) associated w/ this label in stringtuple format
   * @param output
   *          The OutputCollector to write the results to
   * @param reporter
   *          Not used
   */
  @Override
  public void map(Text key,
                  Text value,
                  final OutputCollector<StringTuple,DoubleWritable> output,
                  Reporter reporter) throws IOException {
    // String line = value.toString();
    final String label = key.toString();
    String[] tokens = SPACE_PATTERN.split(value.toString());
    // Per-document term-frequency map: term -> occurrence count within this document.
    // Presized for tokens.length * gramSize since shingling can multiply the term count.
    OpenObjectIntHashMap<String> wordList = new OpenObjectIntHashMap<String>(tokens.length * gramSize);

    // Build the term-frequency map for this document.
    if (gramSize > 1) {
      ShingleFilter sf = new ShingleFilter(new IteratorTokenStream(new ArrayIterator<String>(tokens)), gramSize);
      // NOTE(review): the term attribute is read at the TOP of the loop, before the
      // first incrementToken() call, so the first read yields an empty buffer — that
      // is why the length() > 0 guard below exists. Each subsequent iteration reads
      // the token produced by the incrementToken() at the end of the previous pass.
      do {
        String term = ((TermAttribute) sf.getAttribute(TermAttribute.class)).term();
        if (term.length() > 0) {
          if (wordList.containsKey(term)) {
            wordList.put(term, 1 + wordList.get(term));
          } else {
            wordList.put(term, 1);
          }
        }
      } while (sf.incrementToken());
    } else {
      // Unigram case: count each token directly.
      for (String term : tokens) {
        if (wordList.containsKey(term)) {
          wordList.put(term, 1 + wordList.get(term));
        } else {
          wordList.put(term, 1);
        }
      }
    }

    final MutableDouble lengthNormalisationMut = new MutableDouble(0);

    // Sum the squared term frequencies; the square root of this sum (below) is the
    // document's L2 length, used to normalise every weight emitted for this document.
    wordList.forEachPair(new ObjectIntProcedure<String>() {
      @Override
      public boolean apply(String word, int dKJ) {
        lengthNormalisationMut.add(dKJ * dKJ);
        return true;
      }
    });

    final double lengthNormalisation = Math.sqrt(lengthNormalisationMut.doubleValue());

    // Output Length Normalized + TF Transformed Frequency per Word per Class
    // Log(1 + D_ij)/SQRT( SIGMA(k, D_kj) )
    // One (WEIGHT, label, term) -> normalised-TF record per distinct term; the
    // reducer aggregates these into the per-class term weight.
    wordList.forEachPair(new ObjectIntProcedure<String>() {
      @Override
      public boolean apply(String token, int dKJ) {
        try {
          StringTuple tuple = new StringTuple();
          tuple.add(BayesConstants.WEIGHT);
          tuple.add(label);
          tuple.add(token);
          DoubleWritable f = new DoubleWritable(Math.log(1.0 + dKJ) / lengthNormalisation);
          output.collect(tuple, f);
        } catch (IOException e) {
          // collect() is called inside a procedure callback that cannot throw a
          // checked exception, so the IOException is rethrown unchecked.
          throw new IllegalStateException(e);
        }
        return true;
      }
    });
    reporter.setStatus("Bayes Feature Mapper: Document Label: " + label);

    // Output Document Frequency per Word per Class: one (DOCUMENT_FREQUENCY, label,
    // term) -> 1 record per distinct term, summed downstream for the Idf calculation,
    // plus one (FEATURE_COUNT, term) -> 1 record counting documents containing the term.
    wordList.forEachKey(new ObjectProcedure<String>() {
      @Override
      public boolean apply(String token) {
        try {
          StringTuple dfTuple = new StringTuple();
          dfTuple.add(BayesConstants.DOCUMENT_FREQUENCY);
          dfTuple.add(label);
          dfTuple.add(token);
          output.collect(dfTuple, ONE);

          // Count the feature across the corpus.
          StringTuple tokenCountTuple = new StringTuple();
          tokenCountTuple.add(BayesConstants.FEATURE_COUNT);
          tokenCountTuple.add(token);
          output.collect(tokenCountTuple, ONE);
        } catch (IOException e) {
          // Same pattern as above: rethrow unchecked out of the callback.
          throw new IllegalStateException(e);
        }
        return true;
      }
    });

    // output that we have seen the label to calculate the Count of Document per
    // class
    // One (LABEL_COUNT, label) -> 1 record per input document.
    StringTuple labelCountTuple = new StringTuple();
    labelCountTuple.add(BayesConstants.LABEL_COUNT);
    labelCountTuple.add(label);
    output.collect(labelCountTuple, ONE);
  }

  /**
   * Reads the serialized job parameters from "bayes.parameters" and sets the
   * n-gram size used by {@link #map}.
   *
   * NOTE(review): params.get("gramSize") may return null if the parameter was not
   * set, in which case Integer.valueOf throws a NumberFormatException (and a null
   * would NPE) — neither is caught here, only IOException is. Confirm the driver
   * always supplies "gramSize" before relying on the default of 1.
   */
  @Override
  public void configure(JobConf job) {
    try {
      log.info("Bayes Parameter {}", job.get("bayes.parameters"));
      Parameters params = Parameters.fromString(job.get("bayes.parameters", ""));
      gramSize = Integer.valueOf(params.get("gramSize"));

    } catch (IOException ex) {
      // Best-effort: on a parse failure, log and keep the default gramSize of 1.
      log.warn(ex.toString(), ex);
    }
  }

  /** Used to emit tokens from an input string array in the style of TokenStream */
  public static class IteratorTokenStream extends TokenStream {
    // Term attribute shared with downstream filters (e.g. ShingleFilter).
    private final TermAttribute termAtt;
    // Source of raw tokens; each next() becomes one emitted token.
    private final Iterator<String> iterator;

    public IteratorTokenStream(Iterator<String> iterator) {
      this.iterator = iterator;
      this.termAtt = (TermAttribute) addAttribute(TermAttribute.class);
    }

    /**
     * Advances to the next token: copies the next string from the iterator into
     * the term attribute and returns true, or returns false when exhausted.
     */
    @Override
    public boolean incrementToken() throws IOException {
      if (iterator.hasNext()) {
        clearAttributes();
        termAtt.setTermBuffer(iterator.next());
        return true;
      } else {
        return false;
      }
    }
  }
}

  评论这张
 
阅读(944)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017