• 自定义receiver指南
    • 实现一个自定义的Receiver
    • 在Spark流应用程序中使用自定义的receiver
    • Receiver可靠性
    • 实现和使用自定义的基于actor的receiver

    自定义receiver指南

    Spark Streaming可以从包括内置数据源在内的任意数据源获取数据(其他数据源包括flume,kafka,kinesis,文件,套接字等等)。这需要开发者去实现一个定制receiver从具体的数据源接收
    数据。本指南介绍了实现自定义receiver的过程,以及怎样将receiver用到Spark Streaming应用程序中。

    实现一个自定义的Receiver

    这一节开始实现一个Receiver。一个自定义的receiver必须继承
    这个抽象类,实现它的两个方法onStart()(开始接收数据)以及onStop()(停止接收数据)。

    需要注意的是onStart()onStop()不能够无限期的阻塞。通常情况下,onStart()启动线程负责数据的接收,onStop()确保这个接收过程停止。接收线程也能够调用receiverisStopped
    方法去检查是否已经停止接收数据。

    一旦接收了数据,这些数据就能够通过调用store(data)方法存到Spark中,store(data)是[Receiver]类中的方法。有几个重载的store()方法允许你存储接收到的数据(record-at-a-time or as whole collection of objects/serialized bytes)

    在接收线程中出现的任何异常都应该被捕获或者妥善处理从而避免receiver在没有提示的情况下失败。restart(<exception>)方法将会重新启动receiver,它通过异步的方式首先调用onStop()方法,
    然后在一段延迟之后调用onStart()方法。stop(<exception>)将会调用onStop()方法终止receiverreportError(<error>)方法在不停止或者重启receiver的情况下打印错误消息到
    驱动程序(driver)。

    如下所示,是一个自定义的receiver,它通过套接字接收文本数据流。它用分界符’\n’把文本流分割为行记录,然后将它们存储到Spark中。如果接收线程碰到任何连接或者接收错误,receiver将会
    重新启动以尝试再一次连接。

    1. class CustomReceiver(host: String, port: Int)
    2. extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
    3. def onStart() {
    4. // Start the thread that receives data over a connection
    5. new Thread("Socket Receiver") {
    6. override def run() { receive() }
    7. }.start()
    8. }
    9. def onStop() {
    10. // There is nothing much to do as the thread calling receive()
    11. // is designed to stop by itself isStopped() returns false
    12. }
    13. //Create a socket connection and receive data until receiver is stopped
    14. private def receive() {
    15. var socket: Socket = null
    16. var userInput: String = null
    17. try {
    18. // Connect to host:port
    19. socket = new Socket(host, port)
    20. // Until stopped or connection broken continue reading
    21. val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
    22. userInput = reader.readLine()
    23. while(!isStopped && userInput != null) {
    24. store(userInput)
    25. userInput = reader.readLine()
    26. }
    27. reader.close()
    28. socket.close()
    29. // Restart in an attempt to connect again when server is active again
    30. restart("Trying to connect again")
    31. } catch {
    32. case e: java.net.ConnectException =>
    33. // restart if could not connect to server
    34. restart("Error connecting to " + host + ":" + port, e)
    35. case t: Throwable =>
    36. // restart if there is any other error
    37. restart("Error receiving data", t)
    38. }
    39. }
    40. }

    在Spark流应用程序中使用自定义的receiver

    在Spark流应用程序中,用streamingContext.receiverStream(<instance of custom receiver>)方法,可以使用自动用receiver。代码如下所示:

    1. // Assuming ssc is the StreamingContext
    2. val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
    3. val words = lines.flatMap(_.split(" "))
    4. ...

    完整的代码见例子CustomReceiver.scala

    Receiver可靠性

    基于Receiver的稳定性以及容错语义,Receiver分为两种类型

    • Reliable Receiver:可靠的源允许发送的数据被确认。一个可靠的receiver正确的应答一个可靠的源,数据已经收到并且被正确地复制到了Spark中(指正确完成复制)。实现这个receiver并
      仔细考虑源确认的语义。
    • Unreliable Receiver :这些receivers不支持应答。即使对于一个可靠的源,开发者可能实现一个非可靠的receiver,这个receiver不会正确应答。

    为了实现可靠receiver,你必须使用store(multiple-records)去保存数据。保存的类型是阻塞访问,即所有给定的记录全部保存到Spark中后才返回。如果receiver的配置存储级别利用复制
    (默认情况是复制),则会在复制结束之后返回。因此,它确保数据被可靠地存储,receiver恰当的应答给源。这保证在复制的过程中,没有数据造成的receiver失败。因为缓冲数据不会应答,从而
    可以从源中重新获取数据。

    一个不可控的receiver不必实现任何这种逻辑。它简单的从源中接收数据,然后用store(single-record)一次一个地保存它们。虽然它不能用store(multiple-records)获得可靠的保证,
    它有下面一些优势:

    • 系统注重分块,将数据分为适当大小的块。
    • 如果指定了速率的限制,系统注重控制接收速率。
    • 因为以上两点,不可靠receiver比可靠receiver更容易实现。

    下面是两类receiver的特征
    Receiver Type | Characteristics
    —- | —-
    Unreliable Receivers | 实现简单;系统更关心块的生成和速率的控制;没有容错的保证,在receiver失败时会丢失数据
    Reliable Receivers | 高容错保证,零数据丢失;块的生成和速率的控制需要手动实现;实现的复杂性依赖源的确认机制

    实现和使用自定义的基于actor的receiver

    自定义的Akka actor也能够拥有接收数据。ActorHelpertrait可以
    应用于任何Akka actor中。它运行接收的数据通过调用store()方法存储于Spark中。可以配置这个actor的监控(supervisor)策略处理错误。

    1. class CustomActor extends Actor with ActorHelper {
    2. def receive = {
    3. case data: String => store(data)
    4. }
    5. }

    利用这个actor,一个新的输入数据流就能够被创建。

    1. // Assuming ssc is the StreamingContext
    2. val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")

    完整的代码间例子ActorWordCount.scala