• Spark Writer
    • 概述
    • 获取 Spark Writer
      • 编译源码
      • 从云存储 OSS 下载
    • 使用流程
      • 构图
      • 数据示例
        • 含有地理位置 Geo 的数据
        • 数据源文件
          • HDFS 文件
          • 数据库
      • 编写配置文件
        • Spark 配置信息
        • Nebula Graph 配置信息
        • tags 和 edges 映射信息
        • 数据源映射
      • 执行命令导入数据
    • 性能测试结果

    Spark Writer

    概述

    Spark Writer 是 Nebula Graph 基于 Spark 的分布式数据导入工具,能够将多种数据仓库中的数据转化为图的点和边,并批量导入到图数据库中。目前支持的数据仓库有:

    • HDFS,包括 Parquet、JSON、ORC 和 CSV 格式的文件
    • HIVE

    Spark Writer 支持并发导入多个 tag、edge,支持不同 tag/edge 配置不同的数据仓库。

    获取 Spark Writer

    编译源码

    1. git clone https://github.com/vesoft-inc/nebula.git
    2. cd nebula/src/tools/spark-sstfile-generator
    3. mvn compile package

    或者直接下载

    从云存储 OSS 下载

    1. wget https://nebula-graph.oss-accelerate.aliyuncs.com/jar-packages/sst.generator-1.0.0-beta.jar

    使用流程

    基本流程分为以下几步:

    1. 在 Nebula Graph 中创建图模型,构图
    2. 编写数据文件
    3. 编写输入源映射文件
    4. 导入数据

    构图

    构图请参考快速试用中的示例构图。

    注意:请先在 Nebula Graph 中完成构图(创建图空间和定义图数据 Schema),再通过本工具向 Nebula Graph 中写入数据。

    数据示例

    顶点数据文件由一行一行的数据组成,文件中每一行表示一个点和它的属性。一般来说,第一列为点的 ID ——此列的名称将在后文的映射文件中指定,其他列为点的属性。

    • player 顶点数据
    1. {"id":100,"name":"Tim Duncan","age":42}
    2. {"id":101,"name":"Tony Parker","age":36}
    3. {"id":102,"name":"LaMarcus Aldridge","age":33}

    边数据文件由一行一行的数据组成,文件中每一行表示一条边和它的属性。一般来说,第一列为起点 ID,第二列为终点 ID,起点 ID 列及终点 ID 列会在映射文件中指定。其他列为边属性。下面以 JSON 格式为例进行说明。

    以边 边 follow 的数据为例:

    • 无 rank 的边
    1. {"source":100,"target":101,"likeness":95}
    2. {"source":101,"target":100,"likeness":95}
    3. {"source":101,"target":102,"likeness":90}
    • 有 rank 的边
    1. {"source":100,"target":101,"likeness":95,"ranking":2}
    2. {"source":101,"target":100,"likeness":95,"ranking":1}
    3. {"source":101,"target":102,"likeness":90,"ranking":3}

    含有地理位置 Geo 的数据

    Spark Writer 支持 Geo 数据导入,Geo 数据用 latitudelongitude 字段描述经纬度,数据类型为 double。

    1. {"latitude":30.2822095,"longitude":120.0298785,"target":0,"dp_poi_name":"0"}
    2. {"latitude":30.2813834,"longitude":120.0208692,"target":1,"dp_poi_name":"1"}
    3. {"latitude":30.2807347,"longitude":120.0181162,"target":2,"dp_poi_name":"2"}
    4. {"latitude":30.2812694,"longitude":120.0164896,"target":3,"dp_poi_name":"3"}

    数据源文件

    目前 Spark Writer 支持的数据源有:

    • HDFS
    • HIVE
    HDFS 文件

    支持的文件格式包括:

    • Parquet
    • JSON
    • CSV
    • ORC

    Player 的 Parquet 示例如下:

    1. +---+---+------------+
    2. |age| id| name|
    3. +---+---+------------+
    4. | 42|100| Tim Duncan |
    5. | 36|101| Tony Parker|
    6. +---+---+------------+

    JSON 示例如下:

    1. {"id":100,"name":"Tim Duncan","age":42}
    2. {"id":101,"name":"Tony Parker","age":36}

    CSV 示例如下:

    1. age,id,name
    2. 42,100,Tim Duncan
    3. 36,101,Tony Parker
    数据库

    Spark Writer 支持以数据库作为数据源,目前支持 HIVE。

    Player 表结构如下:

    col_name data_type comment
    id int
    name string
    age int

    编写配置文件

    配置文件由 Spark 相关信息,Nebula 相关信息,以及 tags 映射 和 edges 映射块组成。Spark 信息配置了 Spark 运行的相关参数,Nebula 相关信息配置了连接 Nebula Graph 的用户名和密码等信息。 tags 映射和 edges 映射分别对应多个 tag/edge 的输入源映射,描述每个 tag/edge 的数据源等基本信息,不同 tag/edge 可以来自不同数据源。

    输入源的映射文件示例:

    1. {
    2. # Spark 相关信息配置
    3. # 参见: http://spark.apache.org/docs/latest/configuration.html
    4. spark: {
    5. app: {
    6. name: Spark Writer
    7. }
    8. driver: {
    9. cores: 1
    10. maxResultSize: 1G
    11. }
    12. cores {
    13. max: 16
    14. }
    15. }
    16. # Nebula Graph 相关信息配置
    17. nebula: {
    18. # 查询引擎 IP 列表
    19. addresses: ["127.0.0.1:3699"]
    20. # 连接 Nebula Graph 服务的用户名和密码
    21. user: user
    22. pswd: password
    23. # Nebula Graph 图空间名称
    24. space: test
    25. # thrift 超时时长及重试次数
    26. # 如未设置,则默认值分别为 3000 和 3
    27. connection {
    28. timeout: 3000
    29. retry: 3
    30. }
    31. # nGQL 查询重试次数
    32. # 如未设置,则默认值为 3
    33. execution {
    34. retry: 3
    35. }
    36. }
    37. # 标签处理
    38. tags: {
    39. # 从 HDFS 文件加载数据, 此处数据类型为 Parquet
    40. # tag 名称为 tag name 0
    41. # HDFS Parquet 文件的中的 field 0、field 1、field 2 将写入 tag name 0
    42. # 节点列为 vertex key field
    43. tag name 0: {
    44. type: parquet
    45. path: hdfs path
    46. fields: {
    47. field 0: nebula field 0,
    48. field 1: nebula field 1,
    49. field 2: nebula field 2
    50. }
    51. vertex: vertex key field
    52. batch : 16
    53. }
    54. # 与上述类似
    55. # 从 Hive 加载将执行命令 $ {exec} 作为数据集
    56. tag name 1: {
    57. type: hive
    58. exec: "select hive field 0, hive field 1, hive field 2 from database.table"
    59. fields: {
    60. hive field 0: nebula field 0,
    61. hive field 1: nebula field 1,
    62. hive field 2: nebula field 2
    63. }
    64. vertex: vertex id field
    65. }
    66. }
    67. # 边处理
    68. edges: {
    69. # 从 HDFS 加载数据,数据类型为 JSON
    70. # 边名称为 edge name 0
    71. # HDFS JSON 文件中的 field 0、field 1、field 2 将被写入 edge name 0
    72. # 起始列为 source field
    73. edge name 0: {
    74. type: json
    75. path: hdfs path
    76. fields: {
    77. field 0: nebula field 0,
    78. field 1: nebula field 1,
    79. field 2: nebula field 2
    80. }
    81. source: source field
    82. target: target field
    83. ranking: ranking field
    84. }
    85. # 从 Hive 加载将执行命令 $ {exec} 作为数据集
    86. # 边权重为可选
    87. edge name 1: {
    88. type: hive
    89. exec: "select hive field 0, hive field 1, hive field 2 from database.table"
    90. fields: {
    91. hive field 0: nebula field 0,
    92. hive field 1: nebula field 1,
    93. hive field 2: nebula field 2
    94. }
    95. source: source id field
    96. target: target id field
    97. }
    98. }
    99. }

    Spark 配置信息

    下表给出了一些示例,所有可配置项请见 Spark Available Properties。

    字段 默认值 是否必须 说明
    spark.app.name Spark Writer app 名称
    spark.driver.cores 1 驱动程序进程的核数,仅适用于群集模式
    spark.driver.maxResultSize 1G 每个 Spark 操作(例如收集)中所有分区的序列化结果的上限(以字节为单位)。至少应为 1M,否则应为 0(无限制)
    spark.cores.max (not set) 当以“粗粒度”共享模式在独立部署群集或 Mesos 群集上运行时,跨群集(而非从每台计算机)请求应用程序的最大 CPU 核数。如果未设置,则默认值为 Spark 的独立集群管理器上的 spark.deploy.defaultCores 或 Mesos 上的 infinite(所有可用的内核)

    Nebula Graph 配置信息

    字段 默认值 是否必须 说明
    nebula.addresses 查询引擎的地址列表,逗号分隔
    nebula.user 数据库用户名,默认为 user
    nebula.pswd 数据库用户名对应密码,默认 user 密码为 password
    nebula.space 导入数据对应的 space,本例中为 test
    nebula.connection.timeout 3000 Thrift 连接超时时间
    nebula.connection.retry 3 Thrift 连接重试次数
    nebula.execution.retry 3 nGQL 语句执行重试次数

    tags 和 edges 映射信息

    tag 和 edge 映射的选项比较类似。下面先介绍相同的选项,再分别介绍 tag 映射edge 映射的特有选项。

    • 相同的选项

      • type 指定上文中提到的数据类型,目前支持 “Parquet”、”JSON”、”ORC” 和 “CSV”,大小写不敏感,必填
      • path 适用于 HDFS 数据源,指定HDFS 文件或目录的绝对路径,type 为 HDFS 时,必填
      • exec 适用于 Hive 数据源, 当执行查询语句 type 为 HIVE 时,必填
      • fields 将输入源列的列名映射为 tag / edge 的属性名,必填
    • tag 映射的特有选项

      • vertex 指定某一列作为点的 ID 列,必填
    • edge 映射的特有选项

      • source 指定输入源某一列作为源点的 ID 列,必填
      • target 指定某一列作为目标点的 ID 列,必填
      • 当插入边有 ranking 值, ranking 指定某一列作为边 ranking 列,选填

    数据源映射

    • HDFS Parquet 文件
      • type 指定输入源类型,当为 parquet 时大小写不敏感,必填
      • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
    • HDFS JSON 文件
      • type 指定输入源类型,当为 JSON 时大小写不敏感,必填
      • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
    • HIVE ORC 文件
      • type 指定输入源类型,当为 ORC时大小写不敏感,必填
      • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
    • HIVE CSV 文件
      • type 指定输入源类型,当为 CSV 时大小写不敏感,必填
      • path 指定 HDFS 文件或目录的路径,必须是 HDFS 的绝对路径,必填
    • HIVE
      • type 指定输入源类型,当为 HIVE 时大小写不敏感,必填
      • exec 指定 HIVE 执行查询的语句,必填

    执行命令导入数据

    导入数据命令:

    1. bin/spark-submit \
    2. --class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
    3. --master ${MASTER-URL} \
    4. ${SPARK_WRITER_JAR_PACKAGE} -c conf/test.conf -h -d

    参数说明:

    Abbreviation Required Default Description 示例
    —class yes 指定程序主类
    —master yes 指定spark cluster master url,请参见 master-urls e.g. spark://23.195.26.187:7077
    -c / —config yes 上文所编写的配置文件路径
    -h / —hive no false 用于指定是否支持 Hive
    -d / —directly no false true 为客户端方式插入;
    false 为 sst 方式导入 (TODO)
    -D / —dry no false 检查配置文件是否正确

    性能测试结果

    三台物理机 (56 核,250G 内存,万兆网,SSD),写 1 亿条数据(每条数据三个字段,每个 batch 64 条记录),用时 4 分钟(40万条/秒)。