- 读CSV文件
- 脚本示例
- Csv Batch Source
- 脚本运行结果
- Csv Stream Source
- 脚本运行结果
读CSV文件
功能介绍
读CSV文件。支持从本地、hdfs、http读取
参数说明
| 名称 |
中文名称 |
描述 |
类型 |
是否必须? |
默认值 |
| filePath |
文件路径 |
文件路径 |
String |
✓ |
|
| schemaStr |
Schema |
Schema。格式为”colname coltype[, colname2, coltype2[, …]]”,例如”f0 string, f1 bigint, f2 double” |
String |
✓ |
|
| fieldDelimiter |
字段分隔符 |
字段分隔符 |
String |
|
“,” |
| quoteChar |
引号字符 |
引号字符 |
Character |
|
“\”” |
| skipBlankLine |
是否忽略空行 |
是否忽略空行 |
Boolean |
|
true |
| rowDelimiter |
行分隔符 |
行分隔符 |
String |
|
“\n” |
| ignoreFirstLine |
是否忽略第一行数据 |
是否忽略第一行数据 |
Boolean |
|
false |
|
支持的字段类型包括:
| 字段类型 |
描述 |
值域 |
Flink类型 |
Java类型 |
| VARCHAR/STRING |
可变长度字符串 |
最大容量为4mb |
Types.STRING |
java.lang.String |
| BOOLEAN |
逻辑值 |
值:TRUE,FALSE,UNKNOWN |
Types.BOOLEAN |
java.lang.Boolean |
| TINYINT |
微整型,1字节整数 |
范围是-128到127 |
Types.BYTE |
java.lang.Byte |
| SMALLINT |
短整型,2字节整数 |
范围为-32768至32767 |
Types.SHORT |
java.lang.Short |
| INT |
整型,4字节整数 |
范围是-2147483648到2147483647 |
Types.INT |
java.lang.Integer |
| BIGINT/LONG |
长整型,8字节整数 |
范围是-9223372036854775808至9223372036854775807 |
Types.LONG |
java.lang.Long |
| FLOAT |
4字节浮点数 |
6位数字精度 |
Types.FLOAT |
java.lang.Float |
| DOUBLE |
8字节浮点数 |
15位十进制精度 |
Types.DOUBLE |
java.lang.Double |
| DECIMAL |
小数类型 |
示例:123.45是DECIMAL(5,2)值 |
Types.DECIMAL |
java.math.BigDecimal |
| DATE |
日期 |
示例:’1969-07-20’ |
Types.SQL_DATE |
java.sql.Date |
| TIME |
时间 |
示例:’20:17:40’ |
Types.SQL_TIME |
java.sql.Time |
| TIMESTAMP |
时间戳,日期和时间 |
示例:’1969-07-20 20:17:40’ |
Types.SQL_TIMESTAMP |
java.sql.Timestamp |
关于分隔符的说明:
Web前端支持用户输入如下转义字符和unicode字符作为分隔符:
| 输入分隔符 |
含义 |
| \t |
制表符(tab键) |
| \n |
换行符 |
| \b |
退格符 |
| \r |
回车 |
| \f |
换页 |
| \\ |
反斜线字符 |
| ‘ |
单引号 |
| “ |
双引号 |
| \ddd |
1到3位八进制数所代表的任意字符,例如\001表示’ctrl + A’, \40表示空格符 |
| \udddd |
1到4位十六进制数所代表unicode字符,例如\u0001表示’ctrl + A’, \u0020表示空格符 |
脚本示例
Csv Batch Source
filePath = 'http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv'schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'csvSource = CsvSourceBatchOp()\ .setFilePath(filePath)\ .setSchemaStr(schema)\ .setFieldDelimiter(",")BatchOperator.collectToDataframe(csvSource)
脚本运行结果
sepal_length sepal_width petal_length petal_width category0 6.3 3.3 6.0 2.5 Iris-virginica1 5.6 2.8 4.9 2.0 Iris-virginica2 5.0 3.3 1.4 0.2 Iris-setosa3 5.8 2.7 5.1 1.9 Iris-virginica4 7.0 3.2 4.7 1.4 Iris-setosa
Csv Stream Source
filePath = 'http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv'schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'csvSource = CsvSourceStreamOp()\ .setFilePath(filePath)\ .setSchemaStr(schema)\ .setFieldDelimiter(",")csvSource.print()StreamOperator.execute()
脚本运行结果
sepal_length sepal_width petal_length petal_width category1 5.5 2.4 3.8 1.1 Iris-versicolor2 6.1 2.6 5.6 1.4 Iris-virginica3 6.0 2.2 4.0 1.0 Iris-versicolor4 5.5 2.4 3.7 1.0 Iris-versicolor5 4.6 3.1 1.5 0.2 Iris-setosa