Consume data from Kafka, clean and transform it with Flink, then write the results to Elasticsearch.
Note that Elasticsearch 7.5.1 is used here; past releases can be downloaded from
https://www.elastic.co/cn/downloads/past-releases#elasticsearch
1. Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka2flink2elastic</artifactId>
<version>1.0-SNAPSHOT</version>
<description>Consume data from Kafka, clean and transform it with Flink, then write the results to Elasticsearch</description>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.14.4</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<hutool-all.version>5.3.10</hutool-all.version>
</properties>
<dependencies>
<!-- org.apache.flink core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink-to-Elasticsearch connector dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<!-- Elasticsearch Java Client has been moved to a different module in 5.x -->
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.5.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-core</artifactId>
<version>7.5.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.5.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.5.1</version>
</dependency>
<!-- Flink-to-Kafka connector dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- Utility libraries -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool-all.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
</dependencies>
</project>
2. Test class
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.List;
public class FlinkElasticsearchSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. Create the Kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("172.16.1.61:9092")
.setTopics("source.ad_report_data")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 2. Obtain a DataStream from the Kafka source
DataStreamSource<String> dataStreamSource = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "report_data_ad");
// 3. Map each raw JSON string into a ReportDataAdEntity
SingleOutputStreamOperator<ReportDataAdEntity> map = dataStreamSource.map((MapFunction<String, ReportDataAdEntity>) value -> {
ReportDataAdEntity reportDataAdEntity = new ReportDataAdEntity();
JSONObject object = JSONUtil.parseObj(value);
BeanUtil.copyProperties(object, reportDataAdEntity);
return reportDataAdEntity;
});
// Use the Elasticsearch client API to process the data and write it into an ES index
// 1. Connect to ES at http://172.16.1.61:9200/
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("172.16.1.61", 9200, "http"));
// 2. Create the ES sink builder
ElasticsearchSink.Builder<ReportDataAdEntity> esSinkBuilder = new ElasticsearchSink.Builder<ReportDataAdEntity>(httpHosts,
new ElasticsearchSinkFunction<ReportDataAdEntity>() {
@Override
public void process(ReportDataAdEntity s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
System.out.println("======接收输入数据=======>" + s);
String json = JSONUtil.parseObj(s).toStringPretty();
IndexRequest indexRequest = Requests.indexRequest("posts");
indexRequest.index("flink_stream");
indexRequest.type("create");
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.id(RandomUtil.randomNumbers(20));
indexRequest.source(json, XContentType.JSON);
requestIndexer.add(indexRequest);
}
});
// 3. Flush the bulk buffer after every record so writes are visible immediately (demo setting)
esSinkBuilder.setBulkFlushMaxActions(1);
// Attach the ES sink to the transformed stream
map.addSink(esSinkBuilder.build());
// Launch the job
environment.execute("flink-es");
}
}
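To check that documents actually land in the index, you can read it back with the elasticsearch-rest-high-level-client already declared in the POM. A minimal sketch, assuming the same host, port, and index name as the sink above (the class name is made up):
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.builder.SearchSourceBuilder;
public class FlinkStreamIndexCheck {
public static void main(String[] args) throws Exception {
// Connect to the same node the sink writes to
try (RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("172.16.1.61", 9200, "http")))) {
// Fetch a handful of documents from the flink_stream index
SearchRequest request = new SearchRequest("flink_stream");
request.source(new SearchSourceBuilder().size(5));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
System.out.println("total hits: " + response.getHits().getTotalHits().value);
response.getHits().forEach(hit -> System.out.println(hit.getSourceAsString()));
}
}
}
While the Flink job is running, the total-hits count should keep growing as records arrive from Kafka.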
3. Entity class
import lombok.Data;
import java.io.Serializable;
/**
* <p>
* report_data_ad
* </p>
*
* @author zhy
* @since 2022-02-22
*/
@Data
public class ReportDataAdEntity implements Serializable {
private static final long serialVersionUID = 1L;
private String day;
private Long timestamp;
/**
* Product line
*/
private Integer pid;
private Integer os;
/**
* Device model
*/
private String device;
private String brand;
private String channel;
private String uuid;
private String version;
/**
* Client-side time
*/
private String ctime;
private String nt;
private String imei;
private String mac;
private Integer uid;
private String city;
private String clientIp;
/**
* Ad action code: 0 = request sent to the proxy layer, 1 = proxy request succeeded, 2 = proxy request failed, 3 = third-party ad requested, 4 = third-party ad fetched successfully, 5 = third-party ad fetch failed, 6 = third-party ad request timed out, 7 = ad impression, 8 = ad click, 9 = ad skipped, 10 = APK download started, 11 = APK download finished, 12 = click to launch the app, 13 = app launched successfully, 14 = downloaded app installed successfully
*/
private Integer adac;
/**
* Ad source
*/
private String adSrc;
/**
* Ad slot
*/
private String adPos;
/**
* Reported when the SDK or API returns a third-party error code; when the ad interface returns a failure, the failure type is reported: 0 = channel disabled, 1 = provider ID empty, 2 = adid empty, 3 = request timeout, 4 = network error, 5 = other error, 6 = failed to fetch creative (direct-placement type)
*/
private String ecode;
private String url;
/**
* Time taken to successfully fetch the ad data
*/
private Integer time;
/**
* Ad slot ID
*/
private String adId;
/**
* Ad type
*/
private String adType;
/**
* Ad strategy type (strategy ad or fallback ad)
*/
private Integer adCategory;
/**
* Creative URL
*/
private String contentUrl;
/**
* Creative title
*/
private String contentTitle;
/**
* Creative type
*/
private String contentType;
/**
* Extended detail reported when the third party returns an error code with additional information
*/
private String extendEcodeDetail;
/**
* Distinguishes splash ads between a fresh launch and a return from the background: launch=new, launch=back
*/
private String extendLaunch;
private String extendSplashid;
/**
* For direct-placement creatives, distinguishes the click type: extend_type=adk, extend_type=h5
*/
private String extendType;
private String aeid;
/**
* Video source
*/
private String adPlaySrc;
/**
* Cache status reported on creative impression: 0 = creative requested in real time, 1 = cached creative
*/
private String matType;
private String aeidFrom;
private String adPosFrom;
private String bidding;
private String ecpm;
}
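To feed the pipeline during a test, a hand-written record can be pushed to the source.ad_report_data topic with the kafka-clients dependency from the POM. A minimal producer sketch, assuming the same broker as above (the class name and all field values are invented for illustration):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class SampleAdReportProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Same broker the KafkaSource in the test class consumes from
props.put("bootstrap.servers", "172.16.1.61:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// Hand-written record covering a few of the entity fields; all values are invented
String json = "{\"day\":\"2022-02-22\",\"timestamp\":1645500000000,"
+ "\"pid\":1,\"os\":1,\"adac\":7,\"adSrc\":\"demo\",\"adPos\":\"splash\"}";
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<>("source.ad_report_data", json));
producer.flush();
}
}
}
The keys use the entity's camelCase field names so that BeanUtil.copyProperties in the map function can match them; real upstream messages may use a different key style.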