Feature: Integrating Kafka with Spark Streaming

Apache Spark Streaming + Kafka 0.10 (3/4)


--

by Joan Viladrosa, Samuel Lissner.

This story is part of a series of articles related to the integration of Apache Spark Streaming and Apache Kafka. Feel free to check the other ones here.

New Apache Spark Streaming 2.0 Kafka Integration

But the reason you are probably reading this post (I expect you to read the whole series; please, if you have scrolled straight to this part, go back ;-)) is that you are interested in the new Kafka integration that comes with Apache Spark 2.0+. For backward compatibility reasons, the previous integration is still maintained and you can keep using it. It is actually the stable one, but come on, we love experimental features, don't we?

So two different Maven artifacts are available now: spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10.

Sadly, the Python API has been dropped from the new one for now (surely there will be one once the API is no longer experimental). But if you are interested in the amazing new features that come with Apache Kafka 0.10 (mostly security and encryption), this is your only way to go. Luckily, it is very similar to the previous direct approach, so only minor changes to your code will be needed (some examples later).

Instead of the Simple API, we will be using the new Consumer API introduced with Apache Kafka 0.9+. The new consumer pre-fetches messages into local buffers to improve performance. To make use of these buffers, it makes sense to keep the consumers cached and alive on the executors. Spawning new consumers for each task, on the other hand, would be bad practice, as it makes the buffers useless.

So the same consumers will be reused task after task. The new Consumer API also allows Spark to allocate consumers across executors more deliberately while consuming data. This is where the new LocationStrategies kick in: when creating the DStream, we can specify where we want our consumers to be placed. Three different options are provided to cover all use cases (see the sketch after this list):

  • PreferConsistent. Distribute partitions evenly across all available executors.
  • PreferBrokers. If your Spark cluster runs on the very same nodes as your Kafka cluster, this strategy will spawn the consumer for each partition on the same node as the leader broker for that partition.
  • PreferFixed. If you want to specify any other magical and awesome distribution of consumers among executors.
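
Here is a minimal Scala sketch of how each of these strategies is constructed; the topic name, partition numbers and executor hostnames are made up for illustration:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Spread consumers evenly over the available executors (the usual choice)
val consistent = LocationStrategies.PreferConsistent

// Only makes sense when your executors run on the same hosts as the Kafka brokers
val brokers = LocationStrategies.PreferBrokers

// Pin specific partitions to specific executor hosts (hostnames are placeholders)
val fixed = LocationStrategies.PreferFixed(Map(
  new TopicPartition("demo-topic", 0) -> "executor-host-1",
  new TopicPartition("demo-topic", 1) -> "executor-host-2"
))
```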

Also, the way we specify which topics and partitions to subscribe to has improved, and we can actually change those at runtime without stopping the stream. The ConsumerStrategies are as follows (see the sketch after this list):

  • Subscribe: To subscribe to a fixed collection of topics
  • SubscribePattern: Use a regex to specify the topics of interest
  • Assign: Specify a fixed collection of topics AND partitions
  • If none of the above fits your needs, ConsumerStrategy is a public class that you can extend to implement your own.
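
Again, a small Scala sketch of how each strategy is built; the topic names, pattern, partitions and the minimal kafkaParams map are illustrative placeholders (the full configuration map is shown in the example code below):

```scala
import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Minimal consumer configuration, just for the sake of the sketch
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "demo-group"
)

// Fixed collection of topics
val subscribe = ConsumerStrategies.Subscribe[String, String](
  Array("demo-topic"), kafkaParams)

// Every topic matching the regex
val subscribePattern = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("demo-.*"), kafkaParams)

// Explicit topic-partition pairs
val assign = ConsumerStrategies.Assign[String, String](
  Array(new TopicPartition("demo-topic", 0), new TopicPartition("demo-topic", 1)),
  kafkaParams)
```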

But probably the main reason to use the new Kafka 0.10 integration is security, since the new Consumer API supports SSL connections between your executors and the Kafka brokers. Take into consideration, though, that this only secures the connection between Spark and Kafka; you are still responsible for separately securing Spark inter-node communication and any other output operations.
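
As a rough idea of what that looks like, these are the standard Kafka client SSL settings you would merge into the consumer configuration map; every path and password below is a placeholder for your own truststore/keystore setup:

```scala
// Standard Kafka consumer SSL settings; all values here are placeholders.
val sslParams = Map[String, Object](
  "security.protocol"       -> "SSL",
  "ssl.truststore.location" -> "/path/to/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "truststore-password",
  // The keystore entries are only needed if the brokers require client authentication
  "ssl.keystore.location"   -> "/path/to/kafka.client.keystore.jks",
  "ssl.keystore.password"   -> "keystore-password",
  "ssl.key.password"        -> "key-password"
)

// Merge these into the kafkaParams map used when creating the direct stream,
// e.g. kafkaParams ++ sslParams
```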

New Integration Example code

You can find the whole example, packaged as a Maven project you can just import into your favourite IDE, at: https://github.com/joanvr/spark-streaming-kafka-010-demo

We will discuss this a bit further here.

First of all, we will need to create a Map with all the configuration of the stream. Obviously, you will need to provide the Kafka broker servers, as well as the deserializers for the key and the value of your messages (you can use the ones that already come with the Kafka client library for bytes, doubles, integers, longs and strings). If you want to use the offset commit API, you will also need to specify the group.id, where to start reading offsets the first time, and whether you want to auto-commit the offsets. More on this later when we discuss the major semantic use cases.
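
A sketch of that configuration map in Scala; the broker addresses, group id and offset settings are placeholder values for the demo:

```scala
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "kafka-broker-1:9092,kafka-broker-2:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "spark-streaming-demo",
  // Where to start reading the first time this group.id sees the topic
  "auto.offset.reset"  -> "latest",
  // Disable auto-commit if you plan to manage offsets yourself (see below)
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```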

Then we will need to specify to which topics we want to subscribe. The Subscribe Consumer Strategy takes an Array of topics, so we will create an array with a single string inside for the purpose of the demo.
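
For instance (the topic name is just a placeholder):

```scala
// Subscribe takes a collection of topic names; one topic is enough for the demo
val topics = Array("demo-topic")
```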

Finally, we just need to create the direct stream using the helper on the KafkaUtils object. Notice that we are using the PreferConsistent LocationStrategy here.
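
Something along these lines, assuming ssc is your StreamingContext and kafkaParams and topics are the values defined above:

```scala
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// ssc is the StreamingContext; kafkaParams and topics are defined above
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
```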

Once you have a DStream, you can start using all the well-known operations Spark provides. For this demo, we implemented a typical WordCount and finally store the results as text files.
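
A sketch of that word count over the stream created above (the output path is a placeholder):

```scala
// Count words over the message values and write each batch out as text files
val wordCounts = stream
  .map(record => record.value)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Each batch interval produces a new directory under this prefix (placeholder path)
wordCounts.saveAsTextFiles("/tmp/wordcount/output")
```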

At this point we have everything set up, so we just need to start our streaming context and watch the magic happen. If everything works fine, you will soon start seeing files written to the output directory. You can also connect to the Spark UI on port 4040 of the host you are running the demo on.
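
Which boils down to:

```scala
// Nothing actually runs until the streaming context is started
ssc.start()
ssc.awaitTermination()
```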

Some other topics not covered in the demo

Apart from some very simple use cases, you will usually want more control over your offsets and when to commit them. You can easily do this by casting each RDD of the stream to HasOffsetRanges, and then committing the resulting offset ranges through the stream itself cast to CanCommitOffsets.

Take into account that the offsets are committed asynchronously, so a commit might fail and you can end up with repeated records in case of failure. I recommend storing the offsets in your own way if that makes sense for your use case.
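
A sketch of that pattern with the 0.10 integration's HasOffsetRanges and CanCommitOffsets, reusing the stream created in the demo (the processing step is elided):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // RDDs produced by the direct stream carry their Kafka offset ranges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit only after the batch has been processed. The commit is asynchronous,
  // so a failed commit can still result in reprocessed (duplicate) records.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```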
