中国大数据教育领跑者

IT培训上市机构

您现在的位置:首页 > 大数据培训 > 技术分享>spark-streaming集成Kafka工程实例>

spark-streaming集成Kafka工程实例

2017-12-27 16:29:40 作者:魔据大数据学院

场景模拟

 

 

我试图覆盖工程上最为常用的一个场景:

 

 

1)首先,向Kafka里实时的写入订单数据,JSON格式,包含订单ID-订单类型-订单收益

 

 

2)然后,spark-streaming每十秒实时去消费kafka中的订单数据,并以订单类型分组统计收益

 

 

3)最后,spark-streaming统计结果实时的存入本地MySQL。

 

前提条件

 

 

安装

 

 

1)spark:我使用的yarn-client模式下的spark,环境中集群客户端已经搞定

 

 

2)zookeeper:我使用的是这个集群:10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181

 

 

3)kafka:我使用的是standalone模式:10.93.21.21:9093

 

 

4)mysql:10.93.84.53:3306

 

 

语言

 

 

python:pykafka,pip install pykafka

 

 

java:spark,spark-streaming

 

 

下面开始

 

 

1、数据写入kafka

 

 

kafka写入

 

 

我们使用pykafka模拟数据实时写入,代码如下:

 

 

kafka_producer.py

# -* coding:utf8 *-

import time

import json

import uuid

import random

import threading

from pykafka import KafkaClient

 

# 创建kafka实例

hosts = '10.93.21.21:9093'

client = KafkaClient(hosts=hosts)

 

# 打印一下有哪些topic

print client.topics

 

# 创建kafka producer句柄

topic = client.topics['kafka_spark']

producer = topic.get_producer()

 

 

# work

def work():

while 1:

msg = json.dumps({

"id": str(uuid.uuid4()).replace('-', ''),

"type": random.randint(1, 5),

"profit": random.randint(13, 100)})

producer.produce(msg)

 

# 多线程执行

thread_list = [threading.Thread(target=work) for i in range(10)]

for thread in thread_list:

thread.setDaemon(True)

thread.start()

 

time.sleep(60)

 

# 关闭句柄, 退出

producer.stop()

可以看到,我们写入的形式是一个json,订单id是一个uuid,订单类型type从1-5随机,订单收益profit从13-100随机,形如

{"id": ${uid}, "type": 1, "profit": 30}

注意:1)python对kafka的读写不需要借助zookeeper,2)使用多线程的形式写入,让数据量具有一定的规模。

 

 

执行producer,会持续写入数据1分钟。

python kafka_producer.py

验证一下

kafka_consumer.py

# -* coding:utf8 *-

from pykafka import KafkaClient

 

hosts = '10.93.21.21:9093'

client = KafkaClient(hosts=hosts)

# 消费者

topic = client.topics['kafka_spark']

consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,

consumer_id='test')

for message in consumer:

if message is not None:

print message.offset, message.value

执行,可以消费kafka刚才写入的数据

python kafka_consumer.py

 

 

2、spark-streaming

 

 

1)先解决依赖

 

 

其中比较核心的是spark-streaming和kafka集成包spark-streaming-kafka_2.10,还有spark引擎spark-core_2.10

 

 

json和mysql看大家爱好。

 

 

pom.xml

org.apache.spark

spark-streaming-kafka_2.10

1.6.0

 

org.apache.spark

spark-streaming_2.10

1.6.0

 

org.apache.spark

spark-core_2.10

1.6.0

provided

 

com.alibaba

fastjson

1.2.19

 

mysql

mysql-connector-java

5.1.38

 

commons-dbcp

commons-dbcp

1.4

 

 

2)MySQL准备

 

 

建表

我们的结果去向是MySQL,先建立一个结果表。

 

 

id:主键,自增id

 

 

type:订单类型

 

 

profit:每个spark batch聚合出的订单收益结果

 

 

time:时间戳

CREATE TABLE `order` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`type` int(11) DEFAULT NULL,

`profit` int(11) DEFAULT NULL,

`time` mediumtext,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8

Java客户端

采用了单例线程池的模式简单写了一下。

 

 

ConnectionPool.java

package com.xiaoju.dqa.realtime_streaming;

 

import java.sql.Connection;

import java.sql.DriverManager;

import java.util.LinkedList;

 

 

public class ConnectionPool {

private static LinkedList connectionQueue;

 

static {

try {

Class.forName("com.mysql.jdbc.Driver");

} catch (ClassNotFoundException e) {

e.printStackTrace();

}

}

 

public synchronized static Connection getConnection() {

try {

if (connectionQueue == null) {

connectionQueue = new LinkedList();

for (int i = 0; i < 5; i++) {

Connection conn = DriverManager.getConnection(

"jdbc:mysql://10.93.84.53:3306/big_data",

"root",

"1234");

connectionQueue.push(conn);

}

}

} catch (Exception e) {

e.printStackTrace();

}

return connectionQueue.poll();

 

}

public static void returnConnection(Connection conn){connectionQueue.push(conn);}

}

3)代码实现

 

 

我用java写的,不会用scala很尴尬。

 

 

即时用java整个的处理过程依然比较简单。跟常见的wordcount也没有多大的差别。

 

 

SparkStreaming特点

 

 

spark的特点就是RDD,通过对RDD的操作,来屏蔽分布式运算的复杂度。

 

 

而spark-streaming的操作对象是RDD的时间序列DStream,这个序列的生成是跟batch的选取有关。例如我这里Batch是10s一个,那么每隔10s会产出一个RDD,对RDD的切割和序列的生成,spark-streaming对我们透明了。唯一暴露给我们的DStream和原生RDD的使用方式基本一致。

 

 

这里需要讲解一下MySQL写入注意的事项。

 

 

MySQL写入

 

 

在处理mysql写入时使用了foreachPartition方法,即,在foreachPartition中使用borrow mysql句柄。

 

 

这样做的原因是:

 

 

1)你无法再Driver端创建mysql句柄,并通过序列化的形式发送到worker端

 

 

2)如果你在处理rdd中创建mysql句柄,很容易对每一条数据创建一个句柄,在处理过程中很快内存就会溢出。

 

 

OrderProfitAgg.java

package com.xiaoju.dqa.realtime_streaming;

 

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.streaming.Durations;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

 

import java.sql.Connection;

import java.sql.Statement;

import java.util.*;

 

 

/*

* 生产者可以选用kafka自带的producer脚本

* bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test

* */

public class OrderProfitAgg {

 

public static void main(String[] args) throws InterruptedException {

/*

* kafka所注册的zk集群

* */

String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181";

 

/*

* spark-streaming消费kafka的topic名称, 多个以逗号分隔

* */

String topics = "kafka_spark,kafka_spark2";

 

/*

* 消费组 group

* */

String group = "bigdata_qa";

 

/*

* topic的分区数

* */

int numThreads = 2;

 

/*

* 选用yarn队列模式, spark-streaming程序的app名称是"order profit"

* */

SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit");

 

/*

* 创建sc, 全局唯一, 设置logLevel可以打印一些东西到控制台

* */

JavaSparkContext sc = new JavaSparkContext(sparkConf);

sc.setLogLevel("WARN");

 

/*

* 创建jssc, spark-streaming的batch是每10s划分一个

* */

JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));

 

/*

* 准备topicMap

* */

Map topicMap = new HashMap();

for (String topic : topics.split(",")) {

topicMap.put(topic, numThreads);

}

 

/*

* kafka数据流

* */

List> streams = new ArrayList>();

for (int i = 0; i < numThreads; i++) {

streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap));

}

/*

* 从kafka消费数据的RDD

* */

JavaPairDStream streamsRDD = streams.get(0);

for (int i = 1; i < streams.size(); i++) {

streamsRDD = streamsRDD.union(streams.get(i));

}

 

/*

* kafka消息形如: {"id": ${uuid}, "type": 1, "profit": 35}

* 统计结果, 以type分组的总收益

* mapToPair, 将kafka消费的数据, 转化为type-profit key-value对

* reduceByKey, 以type分组, 聚合profit

* */

JavaPairDStream profits = streamsRDD.mapToPair(new PairFunction, Integer, Integer>() {

@Override

public Tuple2 call(Tuple2 s_tuple2) throws Exception {

JSONObject jsonObject = JSON.parseObject(s_tuple2._2);

return new Tuple2(jsonObject.getInteger("type"), jsonObject.getInteger("profit"));

}

}).reduceByKey(new Function2() {

@Override

public Integer call(Integer i1, Integer i2) throws Exception {

return i1 + i2;

}

});

 

/*

* 输出结果到MySQL

* 需要为每一个partition创建一个MySQL句柄, 使用foreachPartition

* */

profits.foreachRDD(new Function, Void>() {

@Override

public Void call(JavaPairRDD integerIntegerJavaPairRDD) throws Exception {

 

integerIntegerJavaPairRDD.foreachPartition(new VoidFunction>>() {

@Override

public void call(Iterator> tuple2Iterator) throws Exception {

Connection connection = ConnectionPool.getConnection();

Statement stmt = connection.createStatement();

long timestamp = System.currentTimeMillis();

while(tuple2Iterator.hasNext()) {

Tuple2 tuple = tuple2Iterator.next();

Integer type = tuple._1;

Integer profit = tuple._2;

String sql = String.format("insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)", type, profit, timestamp);

stmt.executeUpdate(sql);

}

ConnectionPool.returnConnection(connection);

}

});

return null;

}

});

 

/*

* 开始计算, 等待计算结束

* */

jssc.start();

try {

jssc.awaitTermination();

} catch (Exception ex) {

ex.printStackTrace();

} finally {

jssc.close();

}

}

}

4)打包方法

 

 

编写pom.xml build tag。

 

 

mvn clean package打包即可。

 

 

pom.xml

maven-assembly-plugin

com.xiaoju.dqa.realtime_streaming.OrderProfitAgg

 

 

 

jar-with-dependencies

 

 

make-assembly

package

single

 

 

 

 

org.apache.maven.plugins

maven-compiler-plugin

1.6

1.6

 

 

 

 

 

 

3、执行与结果

 

 

1)执行kafka_producer.py

python kafka_producer.py

2) 执行spark-streaming

这里使用的是默认参数提交yarn队列。

spark-submit --queue=root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar

3)查看结果

 

 

到MySQL中查看结果,每隔10秒会聚合出type=1-5的5条数据。

 

 

例如第一条数据,就是type=4这种类型的业务,在10s内收益是555473元。业务量惊人啊。哈哈。

spark-streaming集成Kafka工程实例

更多学习资料请点击大数据教程

相关推荐
[免责声明]本文来源于网络转载,仅供学习交流使用,不构成商业目的。版权归原作者所有,如涉及作品内容、版权和其它问题请在30日内与本网联系,我们将在第一时进行处理

值班手机:18501996998

咨询QQ: 226594285 / 428683440

校区地址:北京市海淀区中关村科技园首农蓝海中心C座-7层

全国咨询热线:400-690-5006

点击关注:

魔据教育官方微博

魔据官方微信