At DriveTribe, we use a fair bit of stream processing. All events flow through Kafka, and we use Apache Flink for scoring, enrichment and denormalized view production, in real time at scale.
We are often looking for talented developers to join our team. To test for stream-processing knowledge, we came up with a simple stream-join exercise. I often get asked to provide a solution for it, so I thought a blog post might be useful. The article assumes some basic understanding of Apache Kafka and related technologies.
The full code, along with some appropriate testing, can be found at https://github.com/ariskk/flink-stream-join.
Without further ado:
* No live coding is required. Only a high-level analysis of the problem and potential solutions.
Assume you have the following rudimentary data model. Assume that User and Tweet are keyed using their respective ids and stored in Kafka. Describe how you would implement a join function to produce a DataStream[TweetView] from a DataStream[User] and a DataStream[Tweet]. Choose a stream abstraction you are comfortable with. Kafka's KStream or Spark's DStream could work equally well.
final case class User(id: UserId, name: String, avatar: Uri)
final case class Tweet(id: TweetId, authorId: UserId, content: String)
/** Denormalized view model */
final case class TweetView(id: TweetId, tweet: Tweet, author: User)
/** Kafka sources */
val users: DataStream[User] = fromKafkaTopic("users")
val tweets: DataStream[Tweet] = fromKafkaTopic("tweets")
/** Stream join */
def join(users: DataStream[User], tweets: DataStream[Tweet]): DataStream[TweetView] = ???
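The fromKafkaTopic helper is left undefined above. Here is a minimal sketch of one way to write it with Flink's Kafka connector, assuming an implicit DeserializationSchema[T] for the payloads; the broker address and group id are illustrative:

import java.util.Properties
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

def fromKafkaTopic[T: TypeInformation](topic: String)(
  implicit env: StreamExecutionEnvironment,
  schema: DeserializationSchema[T]
): DataStream[T] = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  props.setProperty("group.id", "tweet-view-builder")
  // addSource turns the Kafka consumer into a DataStream[T]
  env.addSource(new FlinkKafkaConsumer[T](topic, schema, props))
}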
The description provides a few clues:
* A Tweet can only have one author, but an author can create many tweets; a tweet points at its author through authorId.
* users and tweets are stored in Kafka and are keyed using their unique keys. This indicates that messages sharing the same key will be in the same partition, and thus they will always be delivered in the order they were committed to Kafka. In fact, that's the only ordering guarantee Kafka provides. It also means our code doesn't have to deal with User events for a specific user arriving out of order; we will always get the latest update last. Ditto for tweets.
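"Keyed" here simply means the producer sets each record's key to the entity's id. A sketch with the plain kafka-clients producer, where toJson stands in for whatever serialization is actually used:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

// Every update for a given user carries the same key, so it lands in the
// same partition and is consumed in the order it was produced.
def publishUser(user: User, toJson: User => String): Unit =
  producer.send(new ProducerRecord("users", user.id.toString, toJson(user)))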
At the very basic level, we need a way to partition both streams using UserId and then funnel all events for a given key into a single parallel instance of the transformation function. To do this in Flink:
* connect users and tweets, creating a ConnectedStreams[User, Tweet]. This gives us the ability to co-process data from both streams.
* keyBy the UserId field on both streams (the user's id and the tweet's authorId). This guarantees that all events, from both streams, sharing the same key will be processed by the same parallel instance.
val coStreams: ConnectedStreams[User, Tweet] =
  users.connect(tweets).keyBy(_.id, _.authorId)
Now to the interesting part. We need to implement a function that takes an incoming event from either stream, along with whatever state we have accumulated for that key, and produces a stream of TweetView.
At this stage, it might be tempting to assume that we only need to store the user and then decorate the stream of tweets as they come. Unfortunately, that would be wrong: there are no guarantees as to the order in which events from the two streams will arrive. We might receive tweets before the user creation event. The user might decide to update their profile (triggering a fan-out). Events can be late and arrive out of causal order for a variety of reasons. It is our job to ensure commutativity between users and tweets; Kafka can only make sure we get user and tweet updates (per key, within each topic) in the correct order.
To process both streams properly, we need to store the latest user profile and all the tweets we have received. Our State would look like this:
final case class State(
  user: Option[User],
  tweets: Map[TweetId, Tweet]
)
/** Initial empty state, referenced as State.empty below */
object State { val empty: State = State(None, Map.empty) }
Our function would then look like this:
coStreams.coMapWithState[Traversable[TweetView], State](
  // A user update: emit a fresh view for every tweet stored so far,
  // then remember the latest profile.
  (user, state) => {
    val views = state.tweets.values.map(t => TweetView(t.id, t, user))
    (views, state.copy(user = Option(user)))
  },
  // A tweet: emit a view if we already know the author,
  // then store (or overwrite) the tweet.
  (tweet, state) => {
    val maybeView = state.user.map(u => TweetView(tweet.id, tweet, u))
    (maybeView, state.copy(tweets = state.tweets + (tweet.id -> tweet)))
  },
  State.empty
).flatMap(identity(_))
Note: coMapWithState is a small custom helper; it contains some Flink-specific state configuration that has been omitted here. The full implementation can be found in the repository linked above.
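For a rough idea of what such a helper builds on, here is a minimal sketch using a RichCoFlatMapFunction that keeps the per-key State in Flink's keyed ValueState. The class name and state descriptor name are illustrative, and TypeInformation/serializer configuration for State is glossed over:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class TweetViewJoin extends RichCoFlatMapFunction[User, Tweet, TweetView] {

  @transient private var stored: ValueState[State] = _

  override def open(parameters: Configuration): Unit =
    stored = getRuntimeContext.getState(
      new ValueStateDescriptor[State]("join-state", classOf[State])
    )

  // ValueState starts out null for a fresh key
  private def current: State = Option(stored.value).getOrElse(State.empty)

  // Called for every User event of this key: re-emit views for all
  // stored tweets, then remember the latest profile.
  override def flatMap1(user: User, out: Collector[TweetView]): Unit = {
    val s = current
    s.tweets.values.foreach(t => out.collect(TweetView(t.id, t, user)))
    stored.update(s.copy(user = Option(user)))
  }

  // Called for every Tweet event of this key: emit a view if the author
  // is already known, then store (or overwrite) the tweet.
  override def flatMap2(tweet: Tweet, out: Collector[TweetView]): Unit = {
    val s = current
    s.user.foreach(u => out.collect(TweetView(tweet.id, tweet, u)))
    stored.update(s.copy(tweets = s.tweets + (tweet.id -> tweet)))
  }
}

Wired in, this would replace the coMapWithState call with coStreams.flatMap(new TweetViewJoin).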
Either way, what the join logic does is quite simple:
* On a user event: store the latest User in State and emit a fresh TweetView for every tweet stored so far.
* On a tweet event: look the author up in State and, if present, emit a view. Store the tweet in State (or update it, if an older version exists).
How well this works in practice depends on the cardinality of the datasets. Replace tweets with sensor readings and users with a faster stream, and this approach might be impossible. In DriveTribe's use case, where users update their profiles every few weeks and have at most a few hundred posts each, it works quite well.
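For completeness, a hypothetical end-to-end job tying the pieces together. print is a stand-in sink (in production the views would be written back to a Kafka topic), and the implicit schemas for User and Tweet are assumed to be in scope for the fromKafkaTopic sketch above:

import org.apache.flink.streaming.api.scala._

object TweetViewJob {
  def main(args: Array[String]): Unit = {
    implicit val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    val users: DataStream[User]   = fromKafkaTopic[User]("users")
    val tweets: DataStream[Tweet] = fromKafkaTopic[Tweet]("tweets")
    join(users, tweets).print()
    env.execute("tweet-view-builder")
  }
}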
If you have feedback please reach out on Twitter.
Thanks for stopping by!