参考: https://www.cnblogs.com/swordfall/p/10527423.html
flink 流处理写入数据到hbase. 采用的是批量写入(500条数据写入一次)。
HBaseWriter.java
package com.flink;
import com.flink.model.DeviceData;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
*
* 写入HBase
* 继承RichSinkFunction重写父类方法
*
* 写入hbase时500条flush一次, 批量插入, 使用的是writeBufferSize
*/
class HBaseWriter extends RichSinkFunction<DeviceData>{
private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);
private static org.apache.hadoop.conf.Configuration configuration;
private static Connection connection = null;
private static BufferedMutator mutator;
private static int count = 0;
@Override
public void open(Configuration parameters) throws Exception {
configuration = HBaseConfiguration.create();
configuration.set("hbase.master", "192.168.3.101:60020");
configuration.set("hbase.zookeeper.quorum", "192.168.3.101");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
try {
connection = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"));
params.writeBufferSize(2 * 1024 * 1024);
mutator = connection.getBufferedMutator(params);
}
@Override
public void close() throws IOException {
if (mutator != null) {
mutator.close();
}
if (connection != null) {
connection.close();
}
}
@Override
public void invoke(DeviceData values, Context context) throws Exception {
//Date 1970-01-06 11:45:55 to 445555000
long unixTimestamp= 0;
try {
String gatherTime = values.GatherTime;
//毫秒和秒分开处理
if (gatherTime.length() > 20) {
long ms = Long.parseLong(gatherTime.substring(20, 23));
Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
unixTimestamp = date.getTime() + ms;
} else {
Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
unixTimestamp = date.getTime();
}
} catch (ParseException e) {
e.printStackTrace();
}
String RowKey = values.MachID + String.valueOf(unixTimestamp);
String Key = values.OperationValue;
String Value = values.OperationData;
System.out.println("Column Family=f1, RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
Put put = new Put(RowKey.getBytes());
put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
mutator.mutate(put);
//每满500条刷新一下数据
if (count >= 500){
mutator.flush();
count = 0;
}
count = count + 1;
}
}
Main.java
//写入hbase
dataStream.addSink(new HBaseWriter());
DeviceData.java
package com.flink.model;
/**
* 设备数据的数据结构
*/
class DeviceData {
String compID;
String machID;
String Type;
String gateMac;
String operationValue;
String operationData;
String gatherTime;
}
在多媒体通信领域,MRCP(Media Resource Control Protocol)协议被广泛用于控制语音识别和合成等媒体资源。UniMRCP是一个开源的MRCP实现,提供了客户端和服务端的库。UmcFramework是一个基于UniMRCP客户端库的示例应用程序框架,它帮助开发者快速集成和测试MRCP客户端功能。本文将详细介绍如何使用UmcFramework和unimrcpclient.xml配置文件连接到多个SIP设置,以及如何用C代码进行示例说明。
文章浏览阅读3k次。报错:java.net.ProtocolException: Server redirected too many times (20)1.没有检查到cookie,一直循环重定向。解决:CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));URL url = new URL(url); ..._java.net.protocolexception: server redirected too many times (20)
文章浏览阅读4.1k次。问题这是部分报错信息2019-07-11 14:03:34.283 WARN [restartedMain][DirectJDKLog.java:175] - Failed to scan [file:/D:/repo/org/apache/derby/derby/10.14.2.0/derbyLocale_ja_JP.jar] from classloader hierarchyjava...._failed to scan from classloader hierarchy
文章浏览阅读2.8k次,点赞3次,收藏7次。在MATLAB中,ones函数用于创建一个指定大小的由1组成的矩阵或数组。_matlab中ones函数
文章浏览阅读3.9w次,点赞2次,收藏9次。 在使用电脑办公过程中,安装应用程序时难免遇到无法安装或者无法正常启动的问题,这对我们使用电脑带来了诸多不便。那遇到应用程序无法正常启动的问题要如何解决呢?相信大家肯定都是十分疑问的,每次都是只能忍痛重新安装软件。今天,小编就和大家探讨下应用程序无法正常启动的解决方法,帮助大家排忧解难。0xc000007b电脑图解1 第一种方案:SFC检查系统完整性来尝试修复丢失文件 1、打开电脑搜索输入cmd.exe,选择以管理员身份运行,跳出提示框时选择继续。0xc000007b电脑图解2_photoshop应用程序无法正常启动0xc000007b。请单击“确认”关闭应用程序。
文章浏览阅读396次。1、概念 REDO LOG是Oracle为确保已经提交的事务不会丢失而建立的一个机制。实际上REDO LOG的存在是为两种场景准备的:实例恢复(INSTANCE RECOVERY);介质恢复(MEDIA RECOVERY)。 实例恢复的目的是在数据库发生故障时,确保BUFFER CACHE中的数据不会丢失,不会造成数据库的..._oracle 实例恢复和介质恢复
文章浏览阅读418次。概述说明CAS内置了密码找回和密码修改的功能; 密码找回功能是,系统会吧密码重置的连接通过邮件或短信方式发送给用户,用户点击链接后就可以重置密码,cas还支持预留密码重置的问题,只有回答对了,才可以重置密码;系统可配置密码重置后,是否自动登录; 密码修改功能是,用户登录后输入新密码即可完成密码修改。安装步骤`1. 首先,搭建好cas sso server您需要按..._修改cas默认用户密码
文章浏览阅读141次。之前几章演示的熔断,降级 都是 RestTemplate + Ribbon 和RestTemplate + Hystrix ,但是在实际开发并不是这样,实际开发中都是 Feign 远程接口调用。Feign + Hystrix 演示: eruka(略)order 服务工程: pom.xml<?xml version="1.0" encoding="U..._this is order 服务工程
文章浏览阅读3.4k次,点赞35次,收藏43次。学习率是影响目标检测精度和速度的重要因素之一。合适的学习率调度策略可以加速模型的收敛和提高模型的精度。在YOLOv7算法中,可以使用基于余弦函数的学习率调度策略(Cosine Annealing Learning Rate Schedule)来调整学习率。
文章浏览阅读4k次,点赞4次,收藏9次。 linux中进程退出函数:exit()和_exit()的区别(1)_exit()执行后立即返回给内核,而exit()要先执行一些清除操作,然后将控制权交给内核。(2)调用_exit函数时,其会关闭进程所有的文件描述符,清理内存以及其他一些内核清理函数,但不会刷新流(stdin, stdout, stderr ...). exit函数是在_exit..._linux结束进程可以用哪些函数,它们之间有何区别?
文章浏览阅读134次。select 5000/10000.0 --想变成0.5select 5500/10000.0 --想变成0.55select 5550/10000.0 --想变成0.555select 5555/10000.0 --想变成0.5555其结果分别为:0.5000000 0.5500000 0.5550000 0.5555000一、如果想去掉数字5后面多余的0 ,需要转化一下:selec..._sql server 去小数 0
文章浏览阅读3.1k次。例一:import { Injectable } from '@angular/core';import { Observable } from 'rxjs';import { User } from "./model/User";import { map } from 'rxjs/operators';import { Http, Response, Headers, RequestOp..._angular6,requestoptions改成了什么