-- Flink SQL connector examples: one CREATE TABLE statement per connector.

-- 1. CSV connector (filesystem connector reading a CSV file)
CREATE TABLE employee_information (
    emp_id  INT,
    -- STRING rather than a bare VARCHAR: in Flink SQL a VARCHAR declared
    -- without a length is VARCHAR(1).
    name    STRING,
    dept_id INT
) WITH (
    'connector' = 'filesystem',
    'path'      = '/path/to/something.csv',
    'format'    = 'csv'
);

-- 2. JDBC connector
-- Docs: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/jdbc/
CREATE TABLE MyUserTable (
    id     BIGINT,
    name   STRING,
    age    INT,
    status BOOLEAN,
    -- Declared NOT ENFORCED: the key is stated in the schema, not enforced as a constraint.
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'users'
);

-- 3. Kafka connector
-- Docs: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/kafka/
CREATE TABLE KafkaTable (
    -- Metadata columns: populated from the connector's record metadata
    -- ('timestamp', partition, offset) rather than from the message payload.
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    `partition`  BIGINT METADATA VIRTUAL,
    `offset`     BIGINT METADATA VIRTUAL,
    -- Payload columns, decoded with the 'csv' format configured below.
    `user_id`    BIGINT,
    `item_id`    BIGINT,
    `behavior`   STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id'          = 'testGroup',
    'scan.startup.mode'            = 'earliest-offset',
    'format'                       = 'csv'
);
最后修改于 2021-10-21 09:42:19
如果觉得我的文章对你有用，请随意赞赏
扫一扫支付

