基于Alink的微博评论情感分析
-
什么是alink?
Alink 是阿里巴巴计算平台事业部PAI团队从 2017 年开始基于实时计算引擎 Flink 研发的新一代机器学习算法平台,提供丰富的算法组件库和便捷的操作框架,开发者可以一键搭建覆盖数据处理、特征工程、模型训练、模型预测的算法模型开发全流程。
Alink 已被广泛运用在阿里巴巴搜索、推荐、广告等多个核心实时在线业务中。在刚刚落幕的天猫双 11 中,单日数据处理量达到 970PB,每秒处理峰值数据高达 25 亿条。Alink 成功经受住了超大规模实时数据训练的检验,并帮助提升 4% CTR(商品点击转化率)。
文本情感分析
情感分析是学术领域研究多年的课题,基本的方法上有基于词典规则的方法、语言文法的方法,此外还有分类器以及近几年比较火的深度学习、机器学习的方法。
我接触的文本情感分析处理过程
这里我只介绍我日常工作中接触到的常用的情感分析流程:
-
情感分析任务的界定
在我们的日常开发中,首先要弄清楚项目的需求是什么?要分析文本的哪个层面,如:段落、句子、短语词语等。准许的误差是在什么范围。
-
语料数据的加工、词典的加工
这一步中不同的方法要做的工作不同,基本上是铺人力的工作。语料数据通常来自网上开源语料库、公司自有数据整理。语料数据和词典是影响数据分析结果准确性的最重要因素。
-
根据数据的特征选择合适的方法,评测方法的优劣
工程中的方法并不是单一的方法。在应用到实际产品时,要结合产品的垂直特点、利用行业的特性、比如在金融、汽车、以及其他的一些“圈子”,它们一定有自己行话,这些话有非常明显的规则或特征。
-
-
项目搭建
本项目中使用的开发工具及中间件:
- Idea + Pycharm
- mysql 、redis 、kafka
-
数据抓取
在我接触的项目中,数据主要来源于客户的自有数据和网络爬虫数据。比如互联网舆情系统中,当发生社会性事件时,我们系统会根据用户设置的关键词到各大平台抓取相关数据,然后经过数据的清洗、数据的分析等工作后,为客户决策提供一些建议。
开始我们的数据抓取:
因为我们这是演示系统,所以爬取的关键词直接写死在代码里,实际项目中可以使用读取数据库或redis、mq的方式。爬取的方法主要是通过手机端api进行json数据的获取,然后进行数据的提取。本次演示使用到下方两个链接:
https://m.weibo.cn/api/container/getIndex?containerid=100103type%3D60%26q%3D$1%26t%3D0&page_type=searchall&page=$2 https://m.weibo.cn/comments/hotflow?id=$1&mid=$2&max_id_type=0
链接1 用于获取词条相关的热门微博,请求链接1后,在返回的数据中提取 id和mid与链接2进行拼接即可获取相关微博的评论数据。python代码就不贴了。
-
数据分析及展示
搭建数据分析模块:
这里使用Alink的1.4.0版本。详细pom文件可参考代码。alink的相关api请访问 Alink的开发文档
<dependency> <groupId>com.alibaba.alink</groupId> <artifactId>alink_core_flink-1.13_2.11</artifactId> <version>1.4.0</version> </dependency>
首先需要准备一个语料文本数据(数据来自互联网)
机器学习的一般步骤:
- 中文分词
- 去除停留词
- 数据转换为词向量
- 分类算法
-
模型训练
-
加载语料数据,通过CsvSourceBatchOp 读取数据。
-
String schema = "label double,text string"; //数据源 source CsvSourceBatchOp data = new CsvSourceBatchOp() .setFilePath("data/train.txt") .setSchemaStr(schema) .setFieldDelimiter("\t");
Pipeline pipeline = new Pipeline() .add(new Segment().setSelectedCol("text")) // 中文分词,内部使用jieba分词器 .add(new StopWordsRemover().setSelectedCol("text")) // 去除停留词 .add(new DocCountVectorizer().setSelectedCol("text")) // 转换为词向量 .add(new NaiveBayesTextClassifier() //朴素贝叶斯分类算法 .setPredictionCol("prediction") .setPredictionDetailCol("predictionDetail") .setVectorCol("text") .setLabelCol("label") ); // 训练模型 执行流程 PipelineModel model = pipeline.fit(data); // 预测 BatchOperator<?> transform = model.transform(data); //模型评估 BinaryClassMetrics metrics = new EvalBinaryClassBatchOp() .setLabelCol("label") .setPredictionDetailCol("predictionDetail") .linkFrom(transform) .collectMetrics();
//保存模型 model.save("data/model");
-
-
文本情感分析
public static void main(String[] args) throws Exception { Properties kafkaProps = new Properties(); kafkaProps.setProperty("zookeeper.connect", "127.0.0.1:2181"); kafkaProps.setProperty("bootstrap.servers", "127.0.0.1:9092"); kafkaProps.setProperty("group.id", "3"); //当使用alink需要通过这个方式创建flink环境 StreamTableEnvironment tableEnv = MLEnvironmentFactory.getDefault().getStreamTableEnvironment(); //连接kafka读取数据 tableEnv.connect( new Kafka() .version("universal") .topic("topic_weibo_comment") .startFromEarliest() .properties(kafkaProps) ).withFormat( new Csv().fieldDelimiter('|') ).withSchema( new Schema().field("sentId", DataTypes.STRING()) .field("date", DataTypes.STRING()) .field("text", DataTypes.STRING())) .inAppendMode() .createTemporaryTable("t_comment"); // flink table api //加载模型 PipelineModel model = PipelineModel.load("data/model"); //实时预测 Table comment = model.transform(tableEnv.from("t_comment")); //取出预测结果 Table select = comment.select("sentId,date,prediction,predictionDetail"); DataStream<Row> rowDataStream = tableEnv.toAppendStream(select, Row.class); //整理数据格式 SingleOutputStreamOperator<Tuple4<String, String, Double, Integer>> map1 = rowDataStream.map(new SentMapFunction()); //统计每个舆情每小时的正负的数量 SingleOutputStreamOperator<SentimentWeiboComment> result = map1 .keyBy(0, 1, 2) .sum(3) .map(new MapFunction<Tuple4<String, String, Double, Integer>, SentimentWeiboComment>() { @Override public SentimentWeiboComment map(Tuple4<String, String, Double, Integer> value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd:hh"); long ts = format.parse(value.f1).getTime(); return new SentimentWeiboComment(value.f0, String.valueOf(ts), String.valueOf(value.f2), String.valueOf(value.f3)); } }); result.print(); // TODO 保存数据 result.addSink(esSinkBuilder.build()); /*es连接地址 ArrayList<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); ElasticsearchSink.Builder<SentimentWeiboComment> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<SentimentWeiboComment>() { IndexRequest createIndexRequest(SentimentWeiboComment element) { Map<String, String> json = new HashMap<>(); json.put("sent_id", element.getSentId()); json.put("date", element.getDate()); json.put("prediction", element.getPrediction()); json.put("prediction_detail", element.getPredictionDetail()); return Requests.indexRequest() .index("weibo_comments") .source(json); } @Override public void process(SentimentWeiboComment element, RuntimeContext runtimeContext, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } }); esSinkBuilder.setBulkFlushMaxActions(1); esSinkBuilder.setRestClientFactory(new RestClientFactory() { @Override public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type", "application/json")}; restClientBuilder.setDefaultHeaders(headers); //以数组的形式可以添加多个header } }); esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());*/ //alink程序启动方式 StreamOperator.execute(); }
public class SentMapFunction implements MapFunction<Row, Tuple4<String, String, Double, Integer>> {
@Override
public Tuple4<String, String, Double, Integer> map(Row value) throws Exception {
String sentId = (String) value.getField(0);
String time = (String) value.getField(1);
Double prediction = (Double) value.getField(2);
String predictionDetail = (String) value.getField(3);
Date date = new Date(time);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd:hh");
String day = format.format(date);
JSONObject jsonObject = JSON.parseObject(predictionDetail);
//正面赋值 0
Double pos = jsonObject.getDouble("0.0");
//负面赋值 1
Double neg = jsonObject.getDouble("1.0");
//如果预测结果概率相差不多,将预测结果改成中性-2
if (Math.abs(pos - neg) < 0.3) {
prediction = 2.0;
}
return new Tuple4<>(sentId, day, prediction, 1);
}
}
数据分析结果:
3> SentimentWeiboComment{sentId='4649847929964235', date='1624050000000', prediction='0.0', predictionDetail='1'}
4> SentimentWeiboComment{sentId='4649830285050320', date='1624042800000', prediction='0.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4649980101133235', date='1624122000000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4652495101755622', date='1624723200000', prediction='0.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4650470423921846', date='1624240800000', prediction='1.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4651664134901972', date='1624482000000', prediction='2.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651667297144331', date='1624482000000', prediction='1.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651681544934300', date='1624485600000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651664299001407', date='1624482000000', prediction='0.0', predictionDetail='1'}
3> SentimentWeiboComment{sentId='4651664746744677', date='1624482000000', prediction='0.0', predictionDetail='1'}
4> SentimentWeiboComment{sentId='4651663752956378', date='1624482000000', prediction='0.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4651664093744447', date='1624482000000', prediction='0.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4651702923300876', date='1624489200000', prediction='0.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4651669122191420', date='1624482000000', prediction='0.0', predictionDetail='1'}
4> SentimentWeiboComment{sentId='4651666412670769', date='1624482000000', prediction='1.0', predictionDetail='1'}
3> SentimentWeiboComment{sentId='4651663966867610', date='1624482000000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651691267851919', date='1624489200000', prediction='2.0', predictionDetail='1'}
3> SentimentWeiboComment{sentId='4651664122316413', date='1624482000000', prediction='0.0', predictionDetail='1'}
1> SentimentWeiboComment{sentId='4651684327066170', date='1624485600000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651665304849925', date='1624482000000', prediction='0.0', predictionDetail='1'}
3> SentimentWeiboComment{sentId='4652052653540144', date='1624575600000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651910697323960', date='1624582800000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651708938458099', date='1624492800000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651688059210919', date='1624485600000', prediction='0.0', predictionDetail='1'}
2> SentimentWeiboComment{sentId='4651684217230098', date='1624485600000', prediction='1.0', predictionDetail='1'}