消费 Kafka 数据,经 Flink 清洗、计算后写入 Elasticsearch。

注意这里使用的elasticsearch是7.5.1版本,下载地址

https://www.elastic.co/cn/downloads/past-releases#elasticsearch

1、maven相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>kafka2flink2elastic</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>消费kafka数据,flink经过清洗,计算后然后写入Elastic </description>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.4</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hutool-all.version>5.3.10</hutool-all.version>
        <!-- Elasticsearch client version must match the server (7.5.1). -->
        <elasticsearch.version>7.5.1</elasticsearch.version>
        <kafka-clients.version>2.4.1</kafka-clients.version>
        <lombok.version>1.18.10</lombok.version>
    </properties>

    <dependencies>
        <!-- Apache Flink core dependencies. All Flink artifacts share
             ${flink.version}; mixing Flink versions on one classpath causes
             hard-to-diagnose NoSuchMethodError/ClassNotFoundException at runtime. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE: the original POM declared flink-streaming-java twice and pinned
             flink-runtime to 1.13.2 while every other Flink artifact was 1.14.4;
             the duplicate has been removed and flink-runtime aligned to ${flink.version}. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink -> Elasticsearch connector dependencies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch-base_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <!-- Exclude the transitive ES dependency so the explicit 7.5.1
                     client below (matching the server) wins. -->
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-core</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <!-- Flink -> Kafka connector dependencies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <!-- Exclude the bundled kafka-clients so the explicit version
                     below (matching the broker) is used. -->
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-clients.version}</version>
        </dependency>

        <!-- Utility libraries. -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool-all.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

</project>

2、测试类

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.List;

public class FlinkElasticsearchSinkDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、创建数据源
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("172.16.1.61:9092")
                .setTopics("source.ad_report_data")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 2、获取kafka数据源的 dataStream数据流
        DataStreamSource<String> dataStreamSource = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "report_data_ad");

        // 3、对数据进行转换
        SingleOutputStreamOperator<ReportDataAdEntity> map = dataStreamSource.map((MapFunction<String, ReportDataAdEntity>) value -> {
            ReportDataAdEntity reportDataAdEntity = new ReportDataAdEntity();
            JSONObject object = JSONUtil.parseObj(value);
            BeanUtil.copyProperties(object, reportDataAdEntity);
            return reportDataAdEntity;
        });

        // 利用es客户端api处理数据,并写入到es索引中
        // 1. 连接至es http://172.16.1.61:9200/
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("172.16.1.61", 9200, "http"));
        // 2. 创建esSink
        ElasticsearchSink.Builder<ReportDataAdEntity> esSinkBuilder = new ElasticsearchSink.Builder<ReportDataAdEntity>(httpHosts,
                new ElasticsearchSinkFunction<ReportDataAdEntity>() {
                    @Override
                    public void process(ReportDataAdEntity s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                        System.out.println("======接收输入数据=======>" + s);
                        String json = JSONUtil.parseObj(s).toStringPretty();
                        IndexRequest indexRequest = Requests.indexRequest("posts");
                        indexRequest.index("flink_stream");
                        indexRequest.type("create");
                        indexRequest.opType(DocWriteRequest.OpType.CREATE);
                        indexRequest.id(RandomUtil.randomNumbers(20));
                        indexRequest.source(json, XContentType.JSON);
                        requestIndexer.add(indexRequest);
                    }
                });
        // 3. 数据输出
        esSinkBuilder.setBulkFlushMaxActions(1);
        // 将es处理逻辑作为sink,处理dataStreamSource
        map.addSink(esSinkBuilder.build());
        // 启动执行
        environment.execute("flink-es");
    }
}

3、实体类

import lombok.Data;
import java.io.Serializable;

/**
 * <p>
 * report_data_ad
 * </p>
 * Ad-reporting event entity populated from JSON records of the
 * {@code source.ad_report_data} Kafka topic and indexed into Elasticsearch.
 *
 * @author zhy
 * @since 2022-02-22
 */
@Data
public class ReportDataAdEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    // Reporting day (string-formatted date).
    private String day;

    private Long timestamp;

    /**
     * Product line.
     */
    private Integer pid;

    private Integer os;

    /**
     * Device model.
     */
    private String device;

    private String brand;

    private String channel;

    private String uuid;

    private String version;

    /**
     * Client-side time.
     */
    private String ctime;

    // Network type — TODO confirm against the reporting spec.
    private String nt;

    private String imei;

    private String mac;

    private Integer uid;

    private String city;

    private String clientIp;

    /**
     * Ad action code: 0=request proxy layer, 1=proxy request succeeded,
     * 2=proxy request failed, 3=request third-party ad, 4=third-party ad
     * fetched, 5=third-party fetch failed, 6=third-party request timed out,
     * 7=ad impression, 8=ad click, 9=ad skipped, 10=APK download started,
     * 11=APK download finished, 12=click to launch app, 13=app launch
     * succeeded, 14=downloaded app installed.
     */
    private Integer adac;

    /**
     * Ad source.
     */
    private String adSrc;

    /**
     * Ad slot (placement).
     */
    private String adPos;

    /**
     * Reported when the SDK or API returns a third-party error code, or when
     * the ad interface fails. Failure types 0–6: 0=channel disabled,
     * 1=provider ID empty, 2=adid empty, 3=request timeout, 4=network error,
     * 5=other error, 6=creative fetch failed (direct-sales type).
     */
    private String ecode;

    private String url;

    /**
     * Elapsed time to successfully fetch ad data (unit not specified in the
     * source — presumably milliseconds; confirm with the reporting client).
     */
    private Integer time;

    /**
     * Ad slot ID.
     */
    private String adId;

    /**
     * Ad type.
     */
    private String adType;

    /**
     * Ad strategy category (strategy ad vs. fallback ad).
     */
    private Integer adCategory;

    /**
     * Creative URL.
     */
    private String contentUrl;

    /**
     * Creative title.
     */
    private String contentTitle;

    /**
     * Creative type.
     */
    private String contentType;

    /**
     * Extended detail, reported when the third party returns an error code
     * with additional information.
     */
    private String extendEcodeDetail;

    /**
     * Splash ad launch mode, distinguishing a fresh start from returning from
     * background: launch=new, launch=back.
     */
    private String extendLaunch;

    private String extendSplashid;

    /**
     * When the ad is a direct-placement creative, distinguishes the click
     * type: extend_type=adk, extend_type=h5.
     */
    private String extendType;

    private String aeid;

    /**
     * Content (video) source.
     */
    private String adPlaySrc;

    /**
     * Cache status reported on creative impression: 0=creative requested in
     * real time, 1=cached creative.
     */
    private String matType;

    private String aeidFrom;

    private String adPosFrom;

    private String bidding;

    private String ecpm;


}

 

最后修改于 2022-04-27 17:37:50
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付
上一篇