Presto 连接 Kafka 数据源：查询 source 数据仓库（schema）中的 topic
创建 Kafka catalog 配置文件
# Create the Kafka catalog config. Presto names a catalog after its file in etc/catalog/, so this defines catalog "kafka".
vim /opt/env/presto/presto/etc/catalog/kafka.properties
内容如下:
# Presto catalog backed by the Kafka connector.
connector.name=kafka
# Kafka broker(s) to connect to, as host:port (comma-separated when there are several).
kafka.nodes=172.16.1.61:9092
# Topics exposed as tables, written as <schema>.<table>; both tables here live in schema "source".
kafka.table-names=source.ad_report_data,source.flink_kafka_test
# false = show the internal columns (_partition_id, _message, _key, ...) in DESCRIBE and SELECT *.
kafka.hide-internal-columns=false
# Open the Presto CLI with "kafka" as the default catalog and "source" as the default schema,
# so tables such as ad_report_data can be referenced without qualification.
# NOTE(review): 8188 is not Presto's default port; it should match http-server.http.port
# in etc/config.properties — confirm.
/opt/env/presto/presto/bin/presto-cli \
    --server localhost:8188 \
    --catalog kafka \
    --schema source
-- List all tables in the current schema (the topics configured in kafka.table-names).
-- Note: Presto SQL comments are "--" or "/* */"; a leading "#" is a syntax error in presto-cli.
SHOW TABLES;

-- Show the table structure.
DESCRIBE ad_report_data;

-- Peek at a few raw messages; without a table description file the whole JSON payload
-- is only available as the varchar column _message.
SELECT _message
FROM ad_report_data
LIMIT 5;

-- Extract a field from the raw JSON text and aggregate it.
-- An aggregate without GROUP BY always returns exactly one row, so no LIMIT is needed.
-- This reads every message currently retained in the topic.
-- CAST fails the whole query if any message has a non-numeric pid; try_cast would yield NULL instead.
SELECT
    sum(CAST(json_extract_scalar(_message, '$.pid') AS BIGINT)) AS pid_sum
FROM ad_report_data;
-- Show the table structure ("--" is the Presto SQL line comment; "#" would not parse).
DESCRIBE ad_report_data;
Column | Type | Extra | Comment
-------------------+---------+-------+---------------------------------------------
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_message_corrupt | boolean | | Message data is corrupt
_message | varchar | | Message text
_message_length | bigint | | Total number of message bytes
_key_corrupt | boolean | | Key data is corrupt
_key | varchar | | Key text
_key_length | bigint | | Total number of key bytes
_timestamp | bigint | | Offset Timestamp
(9 rows)
从表结构可以看出，整条 JSON 消息都存放在 _message 这一个字段中，JSON 内部的各个字段并没有作为列显示出来。
接下来，我们通过表描述文件把 JSON 字段映射成表字段：
# Table description file mapping the JSON fields of topic source.ad_report_data to columns.
# The Kafka connector looks for these files in etc/kafka/ by default (kafka.table-description-dir).
# NOTE(review): description files appear to be loaded when the catalog starts, so a Presto
# restart is likely needed after creating or editing this file — confirm.
vim /opt/env/presto/presto/etc/kafka/source.ad_report_data.json
{
  "tableName": "ad_report_data",
  "schemaName": "source",
  "topicName": "source.ad_report_data",
  "message": {
    "dataFormat": "json",
    "fields": [
      { "name": "day",       "mapping": "day",       "type": "VARCHAR" },
      { "name": "timestamp", "mapping": "timestamp", "type": "BIGINT"  },
      { "name": "pid",       "mapping": "pid",       "type": "BIGINT"  },
      { "name": "os",        "mapping": "os",        "type": "BIGINT"  },
      { "name": "device",    "mapping": "device",    "type": "VARCHAR" },
      { "name": "brand",     "mapping": "brand",     "type": "VARCHAR" },
      { "name": "channel",   "mapping": "channel",   "type": "VARCHAR" },
      { "name": "version",   "mapping": "version",   "type": "VARCHAR" },
      { "name": "city",      "mapping": "city",      "type": "VARCHAR" },
      { "name": "ctime",     "mapping": "ctime",     "type": "VARCHAR" }
    ]
  }
}
配置好表描述文件后，再次查看表结构，可以看到映射的 JSON 字段已经作为普通列出现：
presto:source> DESCRIBE ad_report_data;
Column | Type | Extra | Comment
-------------------+---------+-------+---------------------------------------------
day | varchar | |
timestamp | bigint | |
pid | bigint | |
os | bigint | |
device | varchar | |
brand | varchar | |
channel | varchar | |
version | varchar | |
city | varchar | |
ctime | varchar | |
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_message_corrupt | boolean | | Message data is corrupt
_message | varchar | | Message text
_message_length | bigint | | Total number of message bytes
_key_corrupt | boolean | | Key data is corrupt
_key | varchar | | Key text
_key_length | bigint | | Total number of key bytes
_timestamp | bigint | | Offset Timestamp
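向 topic source.ad_report_data 写入的 JSON 消息示例如下。注意：只有在表描述文件中声明过的 10 个字段（day、timestamp、pid 等）会映射成列，其余字段（如 adSrc、uuid）仍然只能通过 _message 配合 json_extract_scalar 读取：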
{"adSrc":"ks","city":"CN_3_41_4","nt":"wifi","channel":"huawei","pid":2,"uuid":"f5d2e46e-39c2-3c69-a933-19df4bd976be","mac":"90:17:C8:3A:14:97","uid":17886491,"adType":"1001","extendLaunch":"","aeid":"aa54687bbc6a4192a1ee7afb1dad65de","ctime":"1643212833941","extendEcodeDetail":"","day":"2022-01-27","brand":"HUAWEI","contentType":"","adPlaySrc":"nets","timestamp":1643212800,"os":0,"adac":3,"contentTitle":"","version":"2.3.9","adCategory":0,"matType":"","url":"","ecode":"","contentUrl":"","adId":"5246000170","adPos":"PreRolls","clientIp":"121.27.90.80","imei":"ImeiUnknown","time":0,"device":"ART-AL00x","extendType":""}

