Connecting Presto to a Kafka data source: querying the source schema

Create the Kafka catalog
vim /opt/env/presto/presto/etc/catalog/kafka.properties
with the following content:
connector.name=kafka
kafka.table-names=source.ad_report_data,source.flink_kafka_test
kafka.nodes=172.16.1.61:9092
kafka.hide-internal-columns=false
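The entries in kafka.table-names are split into schema and table. The Presto Kafka connector treats a name of the form schema.table as qualified, so source.ad_report_data becomes table ad_report_data in schema source. An unqualified name goes into the default schema, which is set by kafka.default-schema and defaults to "default". Setting kafka.hide-internal-columns=false keeps the internal _* columns (_message, _partition_id, and so on) visible; the connector hides them by default. The minimal Python sketch below illustrates that split. It assumes simple key=value lines, and parse_properties and split_table_names are hypothetical helpers, not part of Presto.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def split_table_names(props, default_schema="default"):
    """Return (schema, table) pairs for each entry in kafka.table-names.
    Unqualified names fall back to default_schema (assumed to mirror kafka.default-schema)."""
    pairs = []
    for name in props.get("kafka.table-names", "").split(","):
        name = name.strip()
        if not name:
            continue
        if "." in name:
            schema, table = name.split(".", 1)
        else:
            schema, table = default_schema, name
        pairs.append((schema, table))
    return pairs


config = """connector.name=kafka
kafka.table-names=source.ad_report_data,source.flink_kafka_test
kafka.nodes=172.16.1.61:9092
kafka.hide-internal-columns=false
"""

props = parse_properties(config)
print(split_table_names(props))
# [('source', 'ad_report_data'), ('source', 'flink_kafka_test')]
print(props["kafka.hide-internal-columns"])  # false: internal _* columns stay visible
```

This is why the CLI session below connects with --catalog kafka (the file name kafka.properties) and --schema source (the prefix in kafka.table-names).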

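Connect to the kafka catalog and the source schema with the Presto CLI:
/opt/env/presto/presto/bin/presto-cli --server localhost:8188 --catalog kafka --schema source
# List all tables
show tables;
# Show the table structure
DESCRIBE ad_report_data;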

# Look at a few raw messages
SELECT _message FROM ad_report_data LIMIT 5;

# Sum the pid field extracted from the JSON payload
SELECT sum(cast(json_extract_scalar(_message, '$.pid') AS bigint)) FROM ad_report_data LIMIT 10;
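To make the aggregate concrete, the Python sketch below approximates what it computes over the raw _message strings. It is an illustration, not Presto's implementation. json_extract_scalar returns the value at $.pid as a string, or NULL when the key is missing. The cast turns that string into a bigint, and sum skips NULLs. extract_scalar and sum_pid are hypothetical helpers, and the three messages are made-up examples.

```python
import json


def extract_scalar(message, key):
    """Rough stand-in for json_extract_scalar(_message, '$.<key>'):
    returns the top-level scalar as a string, or None if missing or not a scalar."""
    try:
        value = json.loads(message).get(key)
    except (ValueError, AttributeError):  # invalid JSON, or JSON that is not an object
        return None
    if value is None or isinstance(value, (dict, list)):
        return None
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def sum_pid(messages):
    """Mirror of sum(cast(json_extract_scalar(_message, '$.pid') AS bigint)):
    NULLs are skipped; the result is None (NULL) when every value is NULL."""
    values = [int(v) for v in (extract_scalar(m, "pid") for m in messages) if v is not None]
    return sum(values) if values else None


messages = [
    '{"pid": 2, "os": 0}',
    '{"pid": 5, "os": 1}',
    '{"os": 1}',  # no pid -> NULL, ignored by sum
]
print(sum_pid(messages))  # 7
```

The query returns a single aggregate row, so LIMIT 10 has no effect on its result.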
# Show the table structure; the output is:
DESCRIBE ad_report_data;

      Column       |  Type   | Extra |                   Comment
-------------------+---------+-------+---------------------------------------------
 _partition_id     | bigint  |       | Partition Id
 _partition_offset | bigint  |       | Offset for the message within the partition
 _message_corrupt  | boolean |       | Message data is corrupt
 _message          | varchar |       | Message text
 _message_length   | bigint  |       | Total number of message bytes
 _key_corrupt      | boolean |       | Key data is corrupt
 _key              | varchar |       | Key text
 _key_length       | bigint  |       | Total number of key bytes
 _timestamp        | bigint  |       | Offset Timestamp
(9 rows)
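The table structure shows that the entire JSON payload is stored in the single _message column, and the individual JSON fields are not exposed as columns.
Next, map the JSON fields to table columns by adding a table description file under etc/kafka, which is the connector's default kafka.table-description-dir: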
vim /opt/env/presto/presto/etc/kafka/source.ad_report_data.json

{
  "tableName": "ad_report_data",
  "schemaName": "source",
  "topicName": "source.ad_report_data",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "day",
        "mapping": "day",
        "type": "VARCHAR"
      },
      {
        "name": "timestamp",
        "mapping": "timestamp",
        "type": "BIGINT"
      },
      {
        "name": "pid",
        "mapping": "pid",
        "type": "BIGINT"
      },
      {
        "name": "os",
        "mapping": "os",
        "type": "BIGINT"
      },
      {
        "name": "device",
        "mapping": "device",
        "type": "VARCHAR"
      },
      {
        "name": "brand",
        "mapping": "brand",
        "type": "VARCHAR"
      },
      {
        "name": "channel",
        "mapping": "channel",
        "type": "VARCHAR"
      },
      {
        "name": "version",
        "mapping": "version",
        "type": "VARCHAR"
      },
      {
        "name": "city",
        "mapping": "city",
        "type": "VARCHAR"
      },
      {
        "name": "ctime",
        "mapping": "ctime",
        "type": "VARCHAR"
      }
    ]
  }
}
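Writing one entry per field by hand is repetitive. The description can also be generated from a sample message, as in the Python sketch below. presto_type and build_description are hypothetical helpers, and inferring a type from a single sample value is only a heuristic, so review the result before saving it. In particular, ctime is a string holding epoch milliseconds, so it comes out as VARCHAR, exactly as in the file above.

```python
import json


def presto_type(value):
    """Hypothetical mapping from a JSON sample value to a Presto column type.
    bool is checked before int because bool is a subclass of int in Python."""
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    return "VARCHAR"


def build_description(schema, table, topic, sample, columns):
    """Build a Kafka table description dict for the selected top-level keys."""
    fields = [
        {"name": col, "mapping": col, "type": presto_type(sample[col])}
        for col in columns
    ]
    return {
        "tableName": table,
        "schemaName": schema,
        "topicName": topic,
        "message": {"dataFormat": "json", "fields": fields},
    }


# A subset of the sample message shown later in this post.
sample = {"day": "2022-01-27", "timestamp": 1643212800, "pid": 2, "os": 0,
          "device": "ART-AL00x", "ctime": "1643212833941"}
desc = build_description("source", "ad_report_data", "source.ad_report_data",
                         sample, ["day", "timestamp", "pid", "os", "device", "ctime"])
print(json.dumps(desc, indent=2))
```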


Describing the table again now shows the mapped columns ahead of the internal ones:
presto:source> DESCRIBE ad_report_data;
      Column       |  Type   | Extra |                   Comment
-------------------+---------+-------+---------------------------------------------
 day               | varchar |       |
 timestamp         | bigint  |       |
 pid               | bigint  |       |
 os                | bigint  |       |
 device            | varchar |       |
 brand             | varchar |       |
 channel           | varchar |       |
 version           | varchar |       |
 city              | varchar |       |
 ctime             | varchar |       |
 _partition_id     | bigint  |       | Partition Id
 _partition_offset | bigint  |       | Offset for the message within the partition
 _message_corrupt  | boolean |       | Message data is corrupt
 _message          | varchar |       | Message text
 _message_length   | bigint  |       | Total number of message bytes
 _key_corrupt      | boolean |       | Key data is corrupt
 _key              | varchar |       | Key text
 _key_length       | bigint  |       | Total number of key bytes
 _timestamp        | bigint  |       | Offset Timestamp
An example of the JSON data written to the topic:

{"adSrc":"ks","city":"CN_3_41_4","nt":"wifi","channel":"huawei","pid":2,"uuid":"f5d2e46e-39c2-3c69-a933-19df4bd976be","mac":"90:17:C8:3A:14:97","uid":17886491,"adType":"1001","extendLaunch":"","aeid":"aa54687bbc6a4192a1ee7afb1dad65de","ctime":"1643212833941","extendEcodeDetail":"","day":"2022-01-27","brand":"HUAWEI","contentType":"","adPlaySrc":"nets","timestamp":1643212800,"os":0,"adac":3,"contentTitle":"","version":"2.3.9","adCategory":0,"matType":"","url":"","ecode":"","contentUrl":"","adId":"5246000170","adPos":"PreRolls","clientIp":"121.27.90.80","imei":"ImeiUnknown","time":0,"device":"ART-AL00x","extendType":""}
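For a sense of how the description file turns such a message into a row, the Python sketch below applies the field list from source.ad_report_data.json to a subset of this message. It is a rough model of the JSON decoder, not its actual code. FIELDS, CASTS, and decode_row are hypothetical names, and missing keys are assumed to become NULL (None).

```python
import json

# Column mapping copied from source.ad_report_data.json: (name, mapping, type).
FIELDS = [
    ("day", "day", "VARCHAR"), ("timestamp", "timestamp", "BIGINT"),
    ("pid", "pid", "BIGINT"), ("os", "os", "BIGINT"),
    ("device", "device", "VARCHAR"), ("brand", "brand", "VARCHAR"),
    ("channel", "channel", "VARCHAR"), ("version", "version", "VARCHAR"),
    ("city", "city", "VARCHAR"), ("ctime", "ctime", "VARCHAR"),
]

# Python conversions standing in for the two Presto types used above.
CASTS = {"BIGINT": int, "VARCHAR": str}


def decode_row(message):
    """Approximate the mapped columns for one message; missing keys become None."""
    data = json.loads(message)
    row = {}
    for name, mapping, col_type in FIELDS:
        value = data.get(mapping)
        row[name] = None if value is None else CASTS[col_type](value)
    return row


# Subset of the sample message above, including two keys that are not mapped.
sample = ('{"city":"CN_3_41_4","channel":"huawei","pid":2,'
          '"uuid":"f5d2e46e-39c2-3c69-a933-19df4bd976be","ctime":"1643212833941",'
          '"day":"2022-01-27","brand":"HUAWEI","timestamp":1643212800,"os":0,'
          '"version":"2.3.9","adId":"5246000170","device":"ART-AL00x"}')
row = decode_row(sample)
print(row["pid"], row["ctime"])  # 2 1643212833941
```

With pid mapped as a BIGINT column, the earlier aggregate can be written as SELECT sum(pid) FROM ad_report_data; keys that are not listed in the description file, such as uuid or adId, remain reachable only through _message.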
 

Last modified: 2022-02-07 14:07:24