首先了解TFIDF
TF-IDF的主要思想是,如果某个词或短语在一篇文章中出现的频率TF高,并且在其他文章中很少出现,则认为此词或者短语具有很好的类别区分能力,适合用来分类。TF词频(Term Frequency)指的是某一个给定的词语在该文件中出现的次数。IDF反文档频率(Inverse Document Frequency)的主要思想是:如果包含词条的文档越少,IDF越大,则说明词条具有很好的类别区分能力。
要想精准的向用户推送广告。我们需要知道的一个重要因素是,用户对产品的关注程度,我们可以使用数据建模来将‘关注程度’这样一个抽象的名次转化为一个具体的数字。本次实验我们使用的关注度权重公式为:
W = TF * Log(N/DF)
TF:当前关键字在该条记录中出现的总次数;
N:总的记录数;
DF:当前关键字在所有记录中出现的条数;
环境
一个虚拟机
hadoop2+
然后jdk1.7+,版本别太高
步骤
一般来说,创建maven环境,然后打包,扔到hadoop上跑,这是常规操作,但是如果分割的文件够多,hadoop处理的过程是相当慢的,因此,将hadoop环境加载到eclipse上,在eclipse上完成操作,然后只在hadoop上查看一下就好了。因此,你的虚拟机必须得有eclipse而且能跑程序,能满足这些就行了
数据准备:已经给你准备好了,点下面蓝字
链接:https://pan.baidu.com/s/1IKpJx5kqEao7ja-1sYVBAA
提取码:g1k0
1.hadooplib2.tar.gz
在官网下载一个hadoop2的jar包,不想麻烦的,上面链接有jar包
2.tj_data.txt(支持复制)
- 小时光***糖你好 我最近发现我的华为p10后置摄像头照相模糊。这个对于我个只会手机支付身上不带钱的用户造成很大的困扰。我刚去花粉俱乐部看了下,不只有我一个人有这样的问题。请问下p10的后置摄像头是否是批次硬件问题。以及如何解决,求回应
小媳***结成风5 我的P10耗电太快
人***花u 凤凰古镇
全***信他 360全景图 教你手机拍微博全景图哦。
你说***了没 想去拍茶卡盐湖,一望无际
路***锡 岳麓山
让***忧1 世界任你拍
小***峰 我想去拍青海湖!华为P10plus有了,能送个机票吗
19***潮 喜马拉雅
leo***海 最想和她在长江大桥上一起拍摄全景~
花生***商 我微薄有【落霞脆】冬枣转发抽奖哦,欢迎前来围观
愿我***有但是 想去大草原
EX***的 我用的去年买的华为 nova现在用着挺好的,以后也会继续支持华为手机的,我想去杭州西湖,可是路上缺一个充电宝
捕风***冷 西藏,超漂亮的!!!而且已经去过了,可惜评论不能发图,不是会员
Msh***kp 香港
ting***15 北京 北京 上海
没钱***食了 天安门?
御***殿 华山
梁天***博 全景
星***R 迪士尼
毛***狼叼走 转发微博
滑***师 站在鼓楼紫峰大厦楼顶拍一张
3.IKAnalyzer2012_u6.jar(这个包是个比较老的包了,能用来进行中文分词,我们这个算法用一下比较简单)
开始
首先把数据传到hdfs上
hadoop fs -mkdir /tj
hadoop fs -mkdir /tj/input
hadoop fs -put /data/mydata/tj_data.txt /tj/input
然后
创建java项目,然后创建lib,把jar包扔进去,构建路径啥的。。。。算了不说了,直接看目录吧
然后嘞创建第一个MapReduce。
创建一个名为FirstMapper的类,功能为:计算该条微博中,每个词出现的次数,也就是TF功能和微博总条数(N值)
package mr_tj;
import java.io.IOException;
import java.io.StringReader;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t"); //以tab键为分隔符
if (line.length >= 2) {
String id = line[0].trim(); //微博的ID
String content = line[1].trim(); //微博的内容
StringReader sr = new StringReader(content);
IKSegmenter iks = new IKSegmenter(sr, true); //使用
Lexeme lexeme = null;
while ((lexeme = iks.next()) != null) {
String word = lexeme.getLexemeText(); //word就是分完的每个词
context.write(new Text(word + "_" + id), new IntWritable(1));//
}
sr.close();
context.write(new Text("count"), new IntWritable(1));//
} else {
System.err.println("error:" + value.toString() + "-----------------------");
}
}
}
既然有map,那肯定少不了reduce,创建一个FirstReducer类,功能为:合并相同key值的数据,输出TF及N
package mr_tj;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override //text就是map中计算出来的key值
protected void reduce(Text text, Iterable<IntWritable> iterable, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : iterable) {
sum += intWritable.get();
}//计算微博总条数,并进行输出
if (text.equals("count")) {
System.out.println(text.toString() + "==" + sum);
}
context.write(text, new IntWritable(sum));
}
}
创建一个FirstPartition类,功能为:分区,如果key值为count,就将数据放入一个单独的分区,如果key值为其他的,就平均分配到三个分区(这样做是为了区分数据格式)
package mr_tj;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class FirstPartition extends HashPartitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
//如果key值为count,就返回3,其他的key值就平均分配到三个分区,
if (key.equals(new Text("count"))) {
return 3;
} else {
return super.getPartition(key, value, numReduceTasks - 1);
//numReduceTasks - 1的意思是有4个reduce,其中一个已经被key值为count的占用了,所以数据只能分配到剩下的三个分区中了
//使用super,可以调用父类的HashPartitioner
}
}
}
然后创建一个FirstJob类,功能为:执行计算,得到TF和N
package mr_tj;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FirstJob {
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
Job job = Job.getInstance(conf, "weibo1");
job.setJarByClass(FirstJob.class);
//设置map任务的输出key类型,value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置reduce个数为4
job.setNumReduceTasks(4);
//定义一个partition表分区,哪些数据应该进入哪些分区
job.setPartitionerClass(FirstPartition.class);
job.setMapperClass(FirstMapper.class);
job.setCombinerClass(FirstReducer.class);
job.setReducerClass(FirstReducer.class);
//设置执行任务时,数据获取的目录及数据输出的目录
FileInputFormat.addInputPath(job, new Path(Paths.TJ_INPUT));
FileOutputFormat.setOutputPath(job, new Path(Paths.TJ_OUTPUT1));
if (job.waitForCompletion(true)) {
System.out.println("FirstJob-执行完毕");
TwoJob.mainJob();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
接下来创建第二个MapReduce。
创建一个TwoMapper类,功能为:统计每个词的DF
package mr_tj;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fs = (FileSplit) context.getInputSplit();
//map时拿到split片段所在文件的文件名
if (!fs.getPath().getName().contains("part-r-00003")) {
//拿到TF的统计结果
String[] line = value.toString().trim().split("\t");
if (line.length >= 2) {
String[] ss = line[0].split("_");
if (ss.length >= 2) {
String w = ss[0];
//统计DF,该词在所有微博中出现的条数,一条微博即使出现两次该词,也算一条
context.write(new Text(w), new IntWritable(1));
}
} else {
System.out.println("error:" + value.toString() + "-------------");
}
}
}
}
创建一个TwoReducer类
package mr_tj;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TwoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key, Iterable<IntWritable> arg1, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : arg1) {
sum = sum + i.get();
}
context.write(key, new IntWritable(sum));
}
}
创建一个TwoJob类,功能为:执行计算,得到DF
package mr_tj;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoJob {
public static void mainJob() {
Configuration config = new Configuration();
try {
Job job = Job.getInstance(config, "weibo2");
job.setJarByClass(TwoJob.class);
//设置map任务的输出key类型,value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TwoMapper.class);
job.setCombinerClass(TwoReducer.class);
job.setReducerClass(TwoReducer.class);
//设置任务运行时,数据的输入输出目录,这里的输入数据是上一个mapreduce的输出
FileInputFormat.addInputPath(job, new Path(Paths.TJ_OUTPUT1));
FileOutputFormat.setOutputPath(job, new Path(Paths.TJ_OUTPUT2));
if (job.waitForCompletion(true)) {
System.out.println("TwoJob-执行完毕");
LastJob.mainJob();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建第三个MapReduce。
创建一个LastMapper类,功能为:执行W = TF * Log(N/DF)计算
package mr_tj;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
public static Map<String, Integer> cmap = null; //cmap为count
public static Map<String, Integer> df = null;
//setup方法,表示在map之前
protected void setup(Context context) throws IOException, InterruptedException {
if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
URI[] ss = context.getCacheFiles();
if (ss != null) {
for (int i = 0; i < ss.length; i++) {
URI uri = ss[i];
//判断如果该文件是part-r-00003,那就是count文件,将数据取出来放入到一个cmap中
if (uri.getPath().endsWith("part-r-00003")) {
Path path = new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line = br.readLine();
if (line.startsWith("count")) {
String[] ls = line.split("\t");
cmap = new HashMap<String, Integer>();
cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
} else {
//其他的认为是DF文件,将数据取出来放到df中
df = new HashMap<String, Integer>();
Path path = new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line;
while ((line = br.readLine()) != null) {
String[] ls = line.split("\t");
df.put(ls[0], Integer.parseInt(ls[1].trim()));
//df这个map以单词作为key,以单词的df值作为value
}
br.close();
}
}
}
}
}
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fs = (FileSplit) context.getInputSplit();
if (!fs.getPath().getName().contains("part-r-00003")) {
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
int tf = Integer.parseInt(v[1].trim());
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];
String id = ss[1];
//执行W = TF * Log(N/DF)计算
double s = tf * Math.log(cmap.get("count") / df.get(w));
//格式化,保留小数点后五位
NumberFormat nf = NumberFormat.getInstance();
nf.setMaximumFractionDigits(5);
//以 微博id+词:权重 输出
context.write(new Text(id), new Text(w + ":" + nf.format(s)));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
创建一个LastReduce类
package mr_tj;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LastReduce extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> arg1, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text i : arg1) {
sb.append(i.toString() + "\t");
}
context.write(key, new Text(sb.toString()));
}
}
创建一个LastJob类,功能为:运行第三个MR
package mr_tj;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LastJob {
public static void mainJob() {
Configuration config = new Configuration();
try {
Job job = Job.getInstance(config, "weibo3");
job.setJarByClass(LastJob.class);
//将第一个job和第二个job的输出作为第三个job的输入
//
job.addCacheFile(new Path(Paths.TJ_OUTPUT1 + "/part-r-00003").toUri());
//
job.addCacheFile(new Path(Paths.TJ_OUTPUT2 + "/part-r-00000").toUri());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setMapperClass();
job.setMapperClass(LastMapper.class);
job.setCombinerClass(LastReduce.class);
job.setReducerClass(LastReduce.class);
FileInputFormat.addInputPath(job, new Path(Paths.TJ_OUTPUT1));
FileOutputFormat.setOutputPath(job, new Path(Paths.TJ_OUTPUT3));
if (job.waitForCompletion(true)) {
System.out.println("LastJob-执行完毕");
System.out.println("全部工作执行完毕");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
创建一个Paths类,功能为:定义输入输出目录,下面的端口号要改成你自己的
package mr_tj;
public class Paths {
public static final String TJ_INPUT = "hdfs://localhost:9000/tj/input";
public static final String TJ_OUTPUT1 = "hdfs://localhost:9000/tj/output1";
public static final String TJ_OUTPUT2 = "hdfs://localhost:9000/tj/output2";
public static final String TJ_OUTPUT3 = "hdfs://localhost:9000/tj/output3";
}
然后右键运行
在hdfs上看一下结果
hadoop fs -ls -R /tj
查看一下各关键字的权重
hadoop fs -cat /tj/output3/part-r-00000