Send a DStream of Apache Spark (PySpark) Streaming over a WebSocket

In many real-time analytics use cases, the computation over streaming data happens in memory, but there are cases where you need to visualize the result of your computation, namely your DStream or TransformedDStream, or connect it to a dashboard. For this, you need to send over the result of your MapReduce-style computation, which is represented as tuples, assuming you don't want or need to send the whole DStream (that is a more complicated use case, because the DStream object needs modification before it can be encoded to JSON).

Solution

As you know, a DStream is composed of many RDDs, the immutable, distributed datasets that hold the state of the stream for each batch interval.

The reason you need to know this is that the result of your MapReduce-style computation is spread across those RDDs, and you need to aggregate it.
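As a toy illustration of that layering (a minimal sketch only; queueStream and the sample pairs are made up for this example and are not part of the job described below), each micro-batch arrives as exactly one RDD, and you can pull its distributed contents back to the driver:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="dstream-rdd-demo")
ssc = StreamingContext(sc, 1)  # one-second micro-batches

# queueStream feeds pre-built RDDs in as successive batches, purely for demonstration
batches = [sc.parallelize([("SE", 1), ("NO", 2)]), sc.parallelize([("DK", 3)])]
stream = ssc.queueStream(batches)

def peek(time, rdd):
    # rdd is the single RDD backing this micro-batch; its records live on the
    # executors until collect() brings them back to the driver
    print(time, rdd.collect())

stream.foreachRDD(peek)

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop(stopSparkContext=True)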

Without further ado, here are the steps:

1. Loop through the DStream or TransformedDStream with the foreachRDD function; the callback receives the batch time and the rdd for that batch.
2. For each RDD, take up to num records (the result tuples) and append them to a list.
3. Using any Python WebSocket client library (the code below uses create_connection from the websocket-client package), open a connection to the socket endpoint, send the JSON-encoded result, and close the connection afterwards.


import json

# create_connection comes from the websocket-client package (pip install websocket-client)
from websocket import create_connection

url = "ws://localhost:8888/stream"  # placeholder: your dashboard's WebSocket endpoint


def takeAndPrint(time, rdd, num=1000):
    # Take up to `num` records from this batch, print them, and push them to
    # the WebSocket endpoint as a JSON array.
    result = []
    taken = rdd.take(num + 1)  # take one extra record to detect truncation
    print("-------------------------------------------")
    print("Time: %s" % time)
    print("-------------------------------------------")
    for record in taken[:num]:
        print(record)
        result.append(record)

    # Open the connection inside the function (see the NOTE below on why),
    # send this batch's result, and close it again.
    ws = create_connection(url)
    ws.send(json.dumps(result))
    ws.close()

    if len(taken) > num:
        print("...")
    print("")

address_by_destination.foreachRDD(takeAndPrint)
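
For context, here is a minimal sketch of how the snippet above might sit inside a driver program. The socketTextStream source, the comma-separated input format, and the aggregation are all assumptions made for illustration; substitute your own source and transformations that produce address_by_destination.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="dstream-to-websocket")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches

# Hypothetical source: comma-separated records arriving on a TCP socket
lines = ssc.socketTextStream("localhost", 9999)

# Hypothetical aggregation producing (destination, count) tuples
address_by_destination = lines.map(lambda line: (line.split(",")[0], 1)) \
                              .reduceByKey(lambda a, b: a + b)

# takeAndPrint and url are the ones defined above
address_by_destination.foreachRDD(takeAndPrint)

ssc.start()
ssc.awaitTermination()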

NOTE: Opening and closing the socket connection inside the function, once per RDD, is a must. If you create the connection outside the function and let it be captured by the closure, PySpark cannot serialise the open socket when it pickles the function, and you will end up with the following serialisation error:

Traceback (most recent call last):
  File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 95, in dumps
    return bytearray(self.serializer.dumps((func.func, func.deserializers)))
  File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps
    cp.dump(obj)
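
To make the cause concrete, here is a sketch of the pattern that triggers the error; send_batch is a hypothetical name, and address_by_destination is the DStream from the job above. The function captures the open socket, and cloudpickle cannot serialise it when PySpark pickles the function:

import json
from websocket import create_connection

ws = create_connection("ws://localhost:8888/stream")  # opened once, outside the function

def send_batch(time, rdd):
    # the closure captures `ws`, so Spark has to pickle the live socket object
    ws.send(json.dumps(rdd.take(1000)))

address_by_destination.foreachRDD(send_batch)  # fails with the traceback above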

Now you should be done! Post a comment or send a tweet to @_ambodi if you need more help with this.
