SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

2019 Java 开发者跳槽指南.pdf (吐血整理)….>>>

点击上方'伦少的博客'关注与您一起成长

SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

前言

本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreaming的wordcount程序比如官网上的,只能统计最新时间间隔内的每个单词的数量,而不能将历史的累加起来,本文是看了教程之后,自己实现了一下kafka的程序,记录在这里。其实没什么难度,只是用了一个updateStateByKey算子就能实现,因为第一次用这个算子,所以正好学习一下。

1、数据

数据是我随机在kafka里生产的几条,单词以空格区分开

2、kafka topic

首先在kafka建一个程序用到topic:UpdateStateBykeyWordCount

1bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic UpdateStateBykeyWordCount

3、创建checkpoint的hdfs目录

我的目录为:/spark/dkl/kafka/wordcount_checkpoint

1hadoop fs -mkdir -p /spark/dkl/kafka/wordcount_checkpoint

4、Spark代码

启动下面的程序

 1package com.dkl.leanring.spark.kafka
2
3import org.apache.spark.streaming.StreamingContext
4import org.apache.spark.sql.SparkSession
5import org.apache.spark.streaming.Seconds
6import org.apache.kafka.common.serialization.StringDeserializer
7import org.apache.spark.streaming.kafka010.KafkaUtils
8import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
9import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
10object UpdateStateBykeyWordCount {
11
12  def main(args: Array[String]): Unit = {
13    //初始化,创建SparkSession
14    val spark = SparkSession.builder().appName("sskt").master("local[2]").enableHiveSupport().getOrCreate()
15    //初始化,创建sparkContext
16    val sc = spark.sparkContext
17    //初始化,创建StreamingContext,batchDuration为1秒
18    val ssc = new StreamingContext(sc, Seconds(5))
19
20    //开启checkpoint机制
21    ssc.checkpoint("hdfs://ambari.master.com:8020/spark/dkl/kafka/wordcount_checkpoint")
22
23    //kafka集群地址
24    val server = "ambari.master.com:6667"
25
26    //配置消费者
27    val kafkaParams = Map[String, Object](
28      "bootstrap.servers" -> server, //kafka集群地址
29      "key.deserializer" -> classOf[StringDeserializer],
30      "value.deserializer" -> classOf[StringDeserializer],
31      "group.id" -> "UpdateStateBykeyWordCount"//消费者组名
32      "auto.offset.reset" -> "latest"//latest自动重置偏移量为最新的偏移量   earliest 、none
33      "enable.auto.commit" -> (false: java.lang.Boolean)) //如果是true,则这个消费者的偏移量会在后台自动提交
34    val topics = Array("UpdateStateBykeyWordCount"//消费主题
35
36    //基于Direct方式创建DStream
37    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
38
39    //开始执行WordCount程序
40
41    //以空格为切分符切分单词,并转化为 (word,1)形式
42    val words = stream.flatMap(_.value().split(" ")).map((_, 1))
43    val wordCounts = words.updateStateByKey(
44      //每个单词每次batch计算的时候都会调用这个函数
45      //第一个参数为每个key对应的新的值,可能有多个,比如(hello,1)(hello,1),那么values为(1,1)
46      //第二个参数为这个key对应的之前的状态
47      (values: Seq[Int], state: Option[Int]) => {
48
49        var newValue = state.getOrElse(0)
50        values.foreach(newValue += _)
51        Option(newValue)
52
53      })
54    wordCounts.print()
55
56    ssc.start()
57    ssc.awaitTermination()
58
59  }
60
61}

5、生产几条数据

随便写几条即可

1bin/kafka-console-producer.sh --broker-list ambari.master.com:6667 --topic UpdateStateBykeyWordCount 
SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

6、结果

根据结果可以看到,历史的单词也被统计打印出来了

SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

关注我


SparkStreaming+Kafka 实现基于缓存的实时wordcount程序


原文始发于微信公众号(伦少的博客):SparkStreaming+Kafka 实现基于缓存的实时wordcount程序