Flink: Implementing a Custom OSS Table Source
Introduction: this article walks through implementing a Flink OSS Table Source that reads a target CSV file from Alibaba Cloud OSS.
Class Design
The architecture diagram in the official documentation describes how table connectors go from a pure declaration in the API to the runtime code executed on the cluster. User-defined Sources & Sinks | Apache Flink covers this in detail and walks through a custom socket connector example, so I won't repeat it here. Based on that, implementing an OSS Table Source requires writing the following three classes:
- OssTableConnecterFactory
- OssDynamicTableSource
- OssSourceFunction

- According to the documentation (Java - Object Storage Service OSS - Alibaba Cloud), reading a file from OSS requires the following parameters: endpoint, access-key-id, access-key-secret, bucket-name, and object-name.
- To make the OSS Table Source extensible and able to parse different data types, the connector also supports a format option. Flink's interfaces allow custom formats to be defined in much the same way as custom connectors, and a connector can use the format option to discover the matching Format for parsing and converting the data. Flink already ships a number of built-in formats (Formats | Apache Flink), and you can define your own when those don't fit. I find this design excellent: a well-defined Format can be shared by multiple connectors, there is a clean set of interfaces to implement, and once a format is chosen you can supply additional format-specific options to cover more parsing scenarios.
 
Putting these ideas together, the DDL for the OSS Table Source looks like this:
CREATE TABLE ossSource (
    name STRING,
    score INT
) WITH (
  'connector' = 'oss',
  'endpoint' = 'XXXXXXXXXXXX',
  'access-key-id' = 'XXXXXXXXXXXX',
  'access-key-secret' = 'XXXXXXXXXXXX',
  'bucket-name' = 'XXXXXXXXXXXX',
  'object-name' = 'XXXXXXXXXXXX',
  'format' = 'csv',
  'csv.field-delimiter' = '|'
);

File Reading Design
    
Files stored on OSS can be very large, so downloading an entire object before processing would take a long time and consume a lot of memory. I therefore use OSS streaming download (流式下载 - 对象存储 OSS - 阿里云) to read the object as a stream.
Since the data arrives at the sink as a stream, executing an INSERT for every single record would cause frequent I/O and poor performance. The fix does not belong in the OSS Table Source; instead, the sink should buffer data, as the JDBC connector (JDBC | Apache Flink) already does with its sink.buffer-flush.max-rows and sink.buffer-flush.interval options.
Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.15.0</version>
    <scope>provided</scope>
</dependency>
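The source function below also uses the Alibaba Cloud OSS Java SDK, so a dependency along these lines is assumed (the version is illustrative; use the SDK version that fits your environment):
<!-- Alibaba Cloud OSS Java SDK (version shown is illustrative) -->
<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <version>3.15.0</version>
</dependency>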
<!-- CSV Format -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.15.0</version>
</dependency>

Code Implementation
    
OssTableConnecterFactory
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * OssTableConnecterFactory
 *
 * @author wxb
 * @date 2022-05-23
 */
public class OssTableConnecterFactory implements DynamicTableSourceFactory {
    public static final ConfigOption<String> ENDPOINT = ConfigOptions.key("endpoint")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> ACCESS_KEY_ID = ConfigOptions.key("access-key-id")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> ACCESS_KEY_SECRET = ConfigOptions.key("access-key-secret")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> BUCKET_NAME = ConfigOptions.key("bucket-name")
            .stringType()
            .noDefaultValue();
    public static final ConfigOption<String> OBJECT_NAME = ConfigOptions.key("object-name")
            .stringType()
            .noDefaultValue();
    @Override
    public String factoryIdentifier() {
        return "oss";
    }
		
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(ENDPOINT);
        options.add(ACCESS_KEY_ID);
        options.add(ACCESS_KEY_SECRET);
        options.add(BUCKET_NAME);
        options.add(OBJECT_NAME);
        options.add(FactoryUtil.FORMAT);
        return options;
    }
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
				
        // The helper wraps common utilities used when implementing a custom connector/format
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // Discover the Format that matches the 'format' option
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);
        // Validate the supplied options
        helper.validate();
        // Read the options
        final ReadableConfig options = helper.getOptions();
        final String endpoint = options.get(ENDPOINT);
        final String accessKeyId = options.get(ACCESS_KEY_ID);
        final String accessKeySecret = options.get(ACCESS_KEY_SECRET);
        final String bucketName = options.get(BUCKET_NAME);
        final String objectName = options.get(OBJECT_NAME);
        // Derive the physical row type from the resolved table schema
        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // Build the OssDynamicTableSource
        return new OssDynamicTableSource(endpoint, accessKeyId, accessKeySecret, bucketName, objectName, decodingFormat, producedDataType);
    }
}
    
OssDynamicTableSource
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
/**
 * OssDynamicTableSource
 *
 * @author weixubin
 * @date 2022-05-24
 */
public class OssDynamicTableSource implements ScanTableSource {
    private final String endpoint;
    private final String accessKeyId;
    private final String accessKeySecret;
    private final String bucketName;
    private final String objectName;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;
    public OssDynamicTableSource(
            String endpoint,
            String accessKeyId,
            String accessKeySecret,
            String bucketName,
            String objectName,
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.endpoint = endpoint;
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.bucketName = bucketName;
        this.objectName = objectName;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }
    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // Create the runtime classes that will be shipped to the cluster
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                runtimeProviderContext,
                producedDataType);
        final SourceFunction<RowData> sourceFunction = new OssSourceFunction(
                endpoint,
                accessKeyId,
                accessKeySecret,
                bucketName,
                objectName,
                deserializer);
        // Bounded source: the OSS object is read once from start to end
        return SourceFunctionProvider.of(sourceFunction, true);
    }
    @Override
    public DynamicTableSource copy() {
        return new OssDynamicTableSource(endpoint, accessKeyId, accessKeySecret, bucketName, objectName, decodingFormat, producedDataType);
    }
    @Override
    public String asSummaryString() {
        return "OSS Table Source";
    }
}
    
OssSourceFunction
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.model.OSSObject;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * OssSourceFunction
 *
 * @author wxb
 * @date 2022-05-23
 */
public class OssSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
    private final String endpoint;
    private final String accessKeyId;
    private final String accessKeySecret;
    private final String bucketName;
    private final String objectName;
    private final DeserializationSchema<RowData> deserializer;
    private OSS currentOssClient;
    public OssSourceFunction(String endpoint, String accessKeyId, String accessKeySecret, String bucketName, String objectName, DeserializationSchema<RowData> deserializer) {
        this.endpoint = endpoint;
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
        this.bucketName = bucketName;
        this.objectName = objectName;
        this.deserializer = deserializer;
    }
    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(
                getRuntimeContext()));
        currentOssClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
    }
    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        OSSObject ossObject = null;
        BufferedReader reader = null;
        try {
            ossObject = currentOssClient.getObject(bucketName, objectName);
            reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()));
            while (true) {
                // Read the object content as a stream, line by line
                String line = reader.readLine();
                if (line == null) {
                    break;
                } else {
                    // Deserialize each line using the configured Format
                    ctx.collect(deserializer.deserialize(line.getBytes()));
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            if (reader != null) {
                reader.close();
            }
            if (ossObject != null) {
                ossObject.close();
            }
        }
    }
    @Override
    public void cancel() {
        try {
            currentOssClient.shutdown();
        } catch (Throwable t) {
            // ignore
        }
    }
}
    
Note: OssTableConnecterFactory must be registered in META-INF/services/org.apache.flink.table.factories.Factory so that Flink's factory discovery can find it.
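For example, assuming the factory class lives in a package such as com.example.connector.oss (a hypothetical package name; use your own), the service file contains a single line:

com.example.connector.oss.OssTableConnecterFactory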
Testing

Here we use a simple Python script to generate a CSV file with 1,000,000 rows, each containing a name and an age column.
import csv
import random

def write_csv(rows):
    with open('E:\\test\\flink_oss_test_bigdata.csv', 'a', newline='') as f:
        writer = csv.writer(f, dialect='excel')
        writer.writerows(rows)
        print('write complete')

count = 1000000
rows = []
for i in range(count):
    # Generate a random 3-letter name
    name = ''.join(random.sample(['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f', 'e', 'd', 'c', 'b', 'a'], 3))
    # Generate a random age
    age = random.randint(1, 100)
    tup = (name, age)
    rows.append(tup)
write_csv(rows)
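Before running the job, the generated file has to be uploaded to the target bucket, for example with the ossutil command-line tool (the bucket and object names below are placeholders):

ossutil cp E:\test\flink_oss_test_bigdata.csv oss://your-bucket/flink_oss_test_bigdata.csv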
    
SQL: read the CSV file from OSS and write it into MySQL; the MySQL sink is configured to flush its buffer every 5,000 rows.
CREATE TABLE sourceTable (
    name STRING,
    age INT
) WITH (
  'connector' = 'oss',
  'endpoint' = 'XXXXXXXXXXXXXXXXX',
  'access-key-id' = 'XXXXXXXXXXXXXXXXX',
  'access-key-secret' = 'XXXXXXXXXXXXXXXXX',
  'bucket-name' = 'XXXXXXXXXXXXXXXXX',
  'object-name' = 'XXXXXXXXXXXXXXXXX',
  'format' = 'csv'
);
CREATE TABLE sinkTable (
    name VARCHAR,
    age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://XXXXXXX:3306/test?useSSL=false&rewriteBatchedStatements=true',
  'table-name' = 'csv_message',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'XXXXX',
  'password' = 'XXXXX',
  'sink.buffer-flush.interval' = '5s',
  'sink.buffer-flush.max-rows' = '5000'
);

INSERT INTO sinkTable(name, age) SELECT name, age FROM sourceTable;
    
The job runs successfully.
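As an optional sanity check, counting the rows in the MySQL target table should return the 1,000,000 rows generated above:

SELECT COUNT(*) FROM csv_message;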

That covers the process and reasoning behind my custom Flink OSS Table Source. The implementation is kept deliberately simple and leaves plenty of room for extension, but I hope this small example helps you get started with custom Flink connectors.
Thanks for reading. If you liked this article, feel free to follow and share it (please credit the source when reposting). Comments, corrections, and suggestions on the content are welcome so we can improve together.




