• 读CSV文件
    • 功能介绍
    • 参数说明
      • 支持的字段类型包括:
      • 关于分隔符的说明:
  • 脚本示例
    • Csv Batch Source
    • 脚本运行结果
    • Csv Stream Source
    • 脚本运行结果

    读CSV文件

    功能介绍

    读CSV文件。支持从本地、hdfs、http读取

    参数说明

    名称 中文名称 描述 类型 是否必须? 默认值
    filePath 文件路径 文件路径 String
    schemaStr Schema Schema。格式为”colname coltype[, colname2, coltype2[, …]]”,例如”f0 string, f1 bigint, f2 double” String
    fieldDelimiter 字段分隔符 字段分隔符 String “,”
    quoteChar 引号字符 引号字符 Character “\””
    skipBlankLine 是否忽略空行 是否忽略空行 Boolean true
    rowDelimiter 行分隔符 行分隔符 String “\n”
    ignoreFirstLine 是否忽略第一行数据 是否忽略第一行数据 Boolean false

    支持的字段类型包括:

    字段类型 描述 值域 Flink类型 Java类型
    VARCHAR/STRING 可变长度字符串 最大容量为4mb Types.STRING java.lang.String
    BOOLEAN 逻辑值 值:TRUE,FALSE,UNKNOWN Types.BOOLEAN java.lang.Boolean
    TINYINT 微整型,1字节整数 范围是-128到127 Types.BYTE java.lang.Byte
    SMALLINT 短整型,2字节整数 范围为-32768至32767 Types.SHORT java.lang.Short
    INT 整型,4字节整数 范围是-2147483648到2147483647 Types.INT java.lang.Integer
    BIGINT/LONG 长整型,8字节整数 范围是-9223372036854775808至9223372036854775807 Types.LONG java.lang.Long
    FLOAT 4字节浮点数 6位数字精度 Types.FLOAT java.lang.Float
    DOUBLE 8字节浮点数 15位十进制精度 Types.DOUBLE java.lang.Double
    DECIMAL 小数类型 示例:123.45是DECIMAL(5,2)值 Types.DECIMAL java.math.BigDecimal
    DATE 日期 示例:’1969-07-20’ Types.SQL_DATE java.sql.Date
    TIME 时间 示例:’20:17:40’ Types.SQL_TIME java.sql.Time
    TIMESTAMP 时间戳,日期和时间 示例:’1969-07-20 20:17:40’ Types.SQL_TIMESTAMP java.sql.Timestamp

    关于分隔符的说明:

    Web前端支持用户输入如下转义字符和unicode字符作为分隔符:

    输入分隔符 含义
    \t 制表符(tab键)
    \n 换行符
    \b 退格符
    \r 回车
    \f 换页
    \\ 反斜线字符
    单引号
    双引号
    \ddd 1到3位八进制数所代表的任意字符,例如\001表示’ctrl + A’, \40表示空格符
    \udddd 1到4位十六进制数所代表unicode字符,例如\u0001表示’ctrl + A’, \u0020表示空格符

    脚本示例

    Csv Batch Source

    1. filePath = 'http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv'
    2. schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
    3. csvSource = CsvSourceBatchOp()\
    4. .setFilePath(filePath)\
    5. .setSchemaStr(schema)\
    6. .setFieldDelimiter(",")
    7. BatchOperator.collectToDataframe(csvSource)

    脚本运行结果

    1. sepal_length sepal_width petal_length petal_width category
    2. 0 6.3 3.3 6.0 2.5 Iris-virginica
    3. 1 5.6 2.8 4.9 2.0 Iris-virginica
    4. 2 5.0 3.3 1.4 0.2 Iris-setosa
    5. 3 5.8 2.7 5.1 1.9 Iris-virginica
    6. 4 7.0 3.2 4.7 1.4 Iris-setosa

    Csv Stream Source

    1. filePath = 'http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv'
    2. schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
    3. csvSource = CsvSourceStreamOp()\
    4. .setFilePath(filePath)\
    5. .setSchemaStr(schema)\
    6. .setFieldDelimiter(",")
    7. csvSource.print()
    8. StreamOperator.execute()

    脚本运行结果

    1. sepal_length sepal_width petal_length petal_width category
    2. 1 5.5 2.4 3.8 1.1 Iris-versicolor
    3. 2 6.1 2.6 5.6 1.4 Iris-virginica
    4. 3 6.0 2.2 4.0 1.0 Iris-versicolor
    5. 4 5.5 2.4 3.7 1.0 Iris-versicolor
    6. 5 4.6 3.1 1.5 0.2 Iris-setosa