本文是《Flink的sink实战》系列的第三篇,主要内容是体验Flink官方的cassandra connector,整个实战如下图所示,我们先从kafka获取字符串,再执行wordcount操作,然后将结果同时打印和写入cassandra:
本次实战的软件版本信息如下:
本次用到的cassandra是三台集群部署的集群,搭建方式请参考《ansible快速部署cassandra3集群》
先创建keyspace和table:
cqlsh 192.168.133.168
CREATE KEYSPACE IF NOT EXISTS example
WITH replication = {
'class': 'SimpleStrategy', 'replication_factor': '3'};
CREATE TABLE IF NOT EXISTS example.wordcount (
word text,
count bigint,
PRIMARY KEY(word)
);
./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test001
./kafka-console-producer.sh \
--broker-list kafka:9092 \
--topic test001
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | [email protected]:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
这个git项目中有多个文件夹,本章的应用在flinksinkdemo文件夹下,如下图红框所示:
flink官方的connector支持两种方式写入cassandra:
接下来分别使用这两种方式;
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-cassandra_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
package com.bolingcavalry.addsink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class CassandraTuple2Sink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//连接kafka用到的属性对象
Properties properties = new Properties();
//broker地址
properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
//zookeeper地址
properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
//消费者的groupId
properties.setProperty("group.id", "flink-connector");
//实例化Consumer类
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
"test001",
new SimpleStringSchema(),
properties
);
//指定从最新位置开始消费,相当于放弃历史消息
flinkKafkaConsumer.setStartFromLatest();
//通过addSource方法得到DataSource
DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);
DataStream<Tuple2<String, Long>> result = dataStream
.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
String[] words = value.toLowerCase().split("\\s");
for (String word : words) {
//cassandra的表中,每个word都是主键,因此不能为空
if (!word.isEmpty()) {
out.collect(new Tuple2<String, Long>(word, 1L));
}
}
}
}
)
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
result.addSink(new PrintSinkFunction<>())
.name("print Sink")
.disableChaining();
CassandraSink.addSink(result)
.setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
.setHost("192.168.133.168")
.build()
.name("cassandra Sink")
.disableChaining();
env.execute("kafka-2.4 source, cassandra-3.11.6 sink, tuple2");
}
}
接下来尝试POJO写入,即业务逻辑中的数据结构实例被写入cassandra,无需指定SQL:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.4</version>
<classifier>shaded</classifier>
<!-- Because the shaded JAR uses the original POM, you still need
to exclude this dependency explicitly: -->
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
package com.bolingcavalry.addsink;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
@Table(keyspace = "example", name = "wordcount")
public class WordCount {
@Column(name = "word")
private String word = "";
@Column(name = "count")
private long count = 0;
public WordCount() {
}
public WordCount(String word, long count) {
this.setWord(word);
this.setCount(count);
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
@Override
public String toString() {
return getWord() + " : " + getCount();
}
}
package com.bolingcavalry.addsink;
import com.datastax.driver.mapping.Mapper;
import com.datastax.shaded.netty.util.Recycler;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class CassandraPojoSink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//连接kafka用到的属性对象
Properties properties = new Properties();
//broker地址
properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
//zookeeper地址
properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
//消费者的groupId
properties.setProperty("group.id", "flink-connector");
//实例化Consumer类
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
"test001",
new SimpleStringSchema(),
properties
);
//指定从最新位置开始消费,相当于放弃历史消息
flinkKafkaConsumer.setStartFromLatest();
//通过addSource方法得到DataSource
DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);
DataStream<WordCount> result = dataStream
.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String s, Collector<WordCount> collector) throws Exception {
String[] words = s.toLowerCase().split("\\s");
for (String word : words) {
if (!word.isEmpty()) {
//cassandra的表中,每个word都是主键,因此不能为空
collector.collect(new WordCount(word, 1L));
}
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<WordCount>() {
@Override
public WordCount reduce(WordCount wordCount, WordCount t1) throws Exception {
return new WordCount(wordCount.getWord(), wordCount.getCount() + t1.getCount());
}
});
result.addSink(new PrintSinkFunction<>())
.name("print Sink")
.disableChaining();
CassandraSink.addSink(result)
.setHost("192.168.133.168")
.setMapperOptions(() -> new Mapper.Option[] {
Mapper.Option.saveNullFields(true) })
.build()
.name("cassandra Sink")
.disableChaining();
env.execute("kafka-2.4 source, cassandra-3.11.6 sink, pojo");
}
}
至此,flink的结果数据写入cassandra的实战就完成了,希望能给您一些参考;
文章浏览阅读3.8k次,点赞5次,收藏39次。★了解Qt和C++的关系★掌握Qt的信号/槽机制的原理和使用方法★了解Qt的元对象系统★掌握Qt的架构★理解Qt的事件模型,掌握其使用的时机信号与槽、元对象系统、事件模型是Qt机制的核心,如果您想要掌握Qt编程,就需要对它们有比较深入的了解。本章重点介绍了信号与槽的基本概念和用法、元对象系统、Qt的事件模型,以及它们在实际使用过程中应注意的一些问题。Qt对标准C++的扩展标准C++对象模型为面向对象编程提供了有效的实时支持,但是它的静态特性在一些领域中表现的不够灵活。事实上,GUI应用程序_qt原理
文章浏览阅读8.2k次,点赞3次,收藏25次。TI-RTOS概述TI-RTOS是CC2640R2F设备上蓝牙低能耗项目的运行环境。TI-RTOS内核是传统SYS/BIOS内核的定制版本,可作为具有驱动程序,同步和调度工具的实时抢占式多线程操作系统。线程模块TI-RTOS内核管理线程执行的四个不同的任务级别,如图21所示。线程模块列表如下图所示,按照优先级降序排列。硬件中断软件中断任务后台空闲功能的空闲任务_ti rtos 总中断
文章浏览阅读2k次,点赞2次,收藏4次。在开发过程中我们可以通过按需引入的方式引入所需要的组件,以达到减小项目体积的目的:步骤一:使用babel-plugin-component插件。运行命令行npm install babel-plugin-component -D2、修改babel.config.js文件module.exports = { presets: ['@vue/cli-plugin-babel/preset'], plugins: [ [ 'component',_项目里面没找的.babelrc文件怎么按需引入elment
文章浏览阅读7.9k次,点赞7次,收藏11次。再做闪屏页广告的时候,如果是视频媒体,通常用户都不想听广告到底在播什么。如果是 MediaPlayer 的话设置静音模式mediaPlayer.setVolume(0f, 0f);设置有声模式mediaPlayer.setVolume(1, 1);假如是VideoView呢,MediaPlayer对象是私有成员,没办法直接获取到,咋办videoView.setOn..._android standardgsyvideoplayer 设置静音播放
文章浏览阅读8.2k次,点赞2次,收藏3次。使用lambda表达式分别 根据 单个字段、多个字段,分组求和示意图:1、根据 单个字段,分组求和:根据2019这个字段,计算一个list集合里,同属于2019的某个字段累加和2、根据 多个字段,分组求和:(1)先根据2019这个字段,再根据1这个字段,计算一个list集合里,同属于2019和1的某个字段累加和;(2)先根据2019这个字段,再根据2这个字段,计算一个list集合里,同属于2019..._jdk8分组求和
文章浏览阅读6.9k次,点赞58次,收藏526次。双十一,囤点什么书呢?小编想了想,新书?经典书?畅销书?感觉似乎每一类都值得推荐 。每一年出版社的专业新书上百本,如何在适宜的时间选您需要的好书?双十一就是一个很好的时间选择。让小编带你浏览本年度最值得推荐的双十一好书。经典书必备(思维修炼必备)1、编程珠玑(第2版)本书是计算机科学方面的经典名著。书的内容围绕程序设计人员面对的一系列实际问题展开。作者Jon Bentley 以其独有的洞察力和创造力,引导读者理解这些问题并学会解决方法,而这些正是程序员实际编程生涯中至关重要的。本书的特._编程屯书癖
文章浏览阅读5k次。webpack打包时如何修改文件名在使用webpack进行项目打包的时候,我们可通过以下方式对不同类型的资源,进行文件名或文件路径的修改_webpack打包文件名称设置
文章浏览阅读5.7k次。自定义dialog/** * Created by zhaoxiaoyu on 2019/10/31 0031. */public class CardDialog extends Dialog{ private DialogCardBinding cardBinding; private CardViewModel cardViewModel; private int num; public CardDialog(@NonNull Context context_android dialog中使用datebinding
文章浏览阅读6.3w次,点赞18次,收藏78次。零基础搭建服务器(型号:DELL PowerEdge R740)step1:开机按Ctrl+R删除默认的磁盘组step2:创建虚拟磁盘step3:选择RAID5,分三个硬盘(raid-5最少需要三个),VD-size我给100G用来装系统注意有时候是现实TB如下图step4:再分个盘,用来存储数据把身下的几T都给它,直接OK就行。step5:创建热盘,选中OK,按ESC回车保存退..._r740安装server2016
文章浏览阅读950次。最近在学后台,数据表设计设计就懵了,感觉并不知道如何设计,于是在QQ上问了公司的专业人士——数据库工程师。感觉学到很多,但因为用的还不够,知识上还是匮乏阶段,所以先整理记录,慢慢消化。 问:大神,问下数据库结构如下,在实际设计中会不会使用外键呢?还是单纯使用逻辑让数据保持一致? 答:独立表,像你列举的公司表,部门表这种相互关系并不大的,一般不会用外键。有外键会影响表的删除_访谈数据库的构建以及资料整理
文章浏览阅读5.7k次。第一周云计算什么是云计算云计算是一种模式,该模式允许用户通过无所不在的、便捷的、按需获得的网络,接入到一个可动态配置的共享计算资源池(其中包括网络设备、服务器、存储、应用以及业务),并且以最小的管理代价或业务交互复杂度即可实现这些可配置计算资源的快速发放与发布。Saas软件即服务面临的挑战和它的适用性挑战一:转换成本高。解决方案是提升服务质量,增强客户满意度。挑战二:有限的..._服务工程和服务计算
文章浏览阅读1.8k次。本文所使用的是 128×64 尺寸的屏幕(0.96寸oled),它的接口很简单,只有 4 个 Pin 脚:VCC,GND,SDA,SCL,所以它与树莓派的连接也很简单,如下图所示(本文使用树莓派 3B)开启 I2C 接口树莓派默认是不开启 I2C 接口的,所以我们需要手动打开它。执行以下命令:sudo apt-get install -y python-smbussudo apt-get install -y i2c-toolssudo raspi-config然后按下动图方式开启 I2C _树莓派如何让0.96oled显示终端命令