Aris Koliopoulos

How to join streams in Apache Flink

Flink, Distributed Systems, Scala, Kafka · 3 min read

A Stream Join

At DriveTribe, we use a fair bit of stream processing. All events flow through Kafka, and we use Apache Flink for scoring, enrichment and denormalized view production in real time at scale.

We often need to find talented developers to join our team. In order to test for stream processing knowledge, we came up with a simple stream join exercise. I often get asked to provide a solution for it, so I thought a blog post might be useful. The article assumes some basic understanding of Apache Kafka and related technologies.

The full code, along with some appropriate testing, can be found at https://github.com/ariskk/flink-stream-join.

Without further ado:

Assume you have the following rudimentary data model, and that User and Tweet are keyed by their respective ids and stored in Kafka. Describe* how you would implement a join function to produce a DataStream[TweetView] from a DataStream[User] and a DataStream[Tweet]. Choose a stream abstraction you are comfortable with; Kafka's KStream or Spark's DStream would work equally well.

* No live coding is required. Only a high level analysis of the problem and potential solutions.

  final case class User(id: UserId, name: String, avatar: Uri)
  final case class Tweet(id: TweetId, authorId: UserId, content: String)
  /** Denormalized view model */
  final case class TweetView(id: TweetId, tweet: Tweet, author: User)
  /** Kafka sources */
  val users: DataStream[User] = fromKafkaTopic("users")
  val tweets: DataStream[Tweet] = fromKafkaTopic("tweets")
  /** Stream join */
  def join(users: DataStream[User], tweets: DataStream[Tweet]): DataStream[TweetView] = ???
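
For context, fromKafkaTopic is left abstract in the exercise. Below is a minimal sketch of how it could be wired up, assuming Flink's universal FlinkKafkaConsumer and string-encoded JSON payloads; the fromJson decoder and the broker/group settings are placeholders, and the signature differs slightly from the exercise because the decoder is passed in explicitly.

  import java.util.Properties
  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

  /** Consume a topic as strings and decode each record into T. */
  def fromKafkaTopic[T: TypeInformation](topic: String)(fromJson: String => T)(
    implicit env: StreamExecutionEnvironment
  ): DataStream[T] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "tweet-view-builder")
    env
      .addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props))
      .map(fromJson(_))
  }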

The description provides a few clues:

  • A Tweet can only have 1 author, but an author can create many tweets.
  • The link between the two entities is the authorId.
  • users and tweets are stored in Kafka and are keyed by their unique ids. Messages sharing the same key end up in the same partition and are therefore delivered in the order in which they were committed to Kafka; in fact, that is the only ordering guarantee Kafka provides. This means our code does not have to deal with User events for a specific user arriving out of order; we will always get the latest update last. Ditto for tweets. (A sketch of keyed production follows this list.)
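
As an illustration of that last point, it is the record key that pins messages to a partition and hence fixes their relative order. A minimal sketch of keyed production, where the broker address, topic names and JSON payloads are placeholders:

  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Records sharing a key hash to the same partition, so Kafka preserves
  // their relative order. Each entity is keyed by its own id.
  producer.send(new ProducerRecord("users", "user-42", """{"id":"user-42","name":"Aris"}"""))
  producer.send(new ProducerRecord("tweets", "tweet-1", """{"id":"tweet-1","authorId":"user-42"}"""))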

At the most basic level, we need a way to partition both streams by UserId and then funnel all events for a given key into a single parallel instance of the transformation function. To do this in Flink:

  • We connect users and tweets, creating a ConnectedStreams[User, Tweet]. This gives us the ability to co-process data from both streams.
  • We keyBy on the UserId: the id field of User and the authorId field of Tweet. This guarantees that all events (from both streams) sharing the same key will be processed by the same parallel instance.

  val coStreams: ConnectedStreams[User, Tweet] =
    users.connect(tweets).keyBy(_.id, _.authorId)

Now to the interesting part. We need to implement a function that takes:

  • A stream of updates for a single user profile
  • A stream of tweets from that user

and produces a stream of TweetView.

At this stage, it might be tempting to assume that we only need to store the user and then simply decorate the stream of tweets as they arrive. Unfortunately, that would be wrong. There are no guarantees as to the order in which events from the two streams will arrive relative to each other. We might receive tweets before the user creation event. The user might decide to update their profile (triggering a fan-out). Events can be late and arrive out of causal order for a variety of reasons. It is our job to make the join commutative between users and tweets; Kafka can only make sure we get user and tweet updates (per key) in the correct order.

To process both streams properly we need to store the latest user profile and all the tweets we have received.

Our State would look like this:


final case class State(
  user: Option[User],
  tweets: Map[TweetId, Tweet]
)

object State {
  val empty: State = State(user = None, tweets = Map.empty)
}

Our function would then look like this:


coStreams.coMapWithState[Traversable[TweetView], State](
  (user, state) => {
    val views = state.tweets.values.map(t => TweetView(t.id, t, user))
    (views, state.copy(user = Option(user)))
  },
  (tweet, state) => {
    val maybeView = state.user.map(u => TweetView(tweet.id, tweet, u))
    (maybeView, state.copy(tweets = state.tweets + (tweet.id -> tweet)))
  },
  State.empty
).flatMap(identity(_))

Note: coMapWithState is a small helper that bundles some Flink-specific state configuration, omitted here for brevity. The full implementation can be found in the repository linked above.

What this function does is quite simple:

  • For every user we receive, read all the tweets we have stored and emit a view for each of them. Then update the user in State.
  • For every tweet we receive, read the latest user from State and, if one exists, emit a view. Store the tweet in State (or update it, if an older version exists).
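
If you would rather stay on Flink's built-in APIs, the same logic can be expressed with a RichCoFlatMapFunction over keyed ValueState. The sketch below is an approximation of what the coMapWithState helper does under the hood, not the exact implementation from the repository; it assumes the usual org.apache.flink.streaming.api.scala._ implicits are in scope.

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

class TweetViewJoin extends RichCoFlatMapFunction[User, Tweet, TweetView] {

  // Flink keeps one State value per key, i.e. per UserId.
  private var joinState: ValueState[State] = _

  override def open(parameters: Configuration): Unit =
    joinState = getRuntimeContext.getState(
      new ValueStateDescriptor[State]("tweet-view-join-state", classOf[State])
    )

  private def current: State = Option(joinState.value()).getOrElse(State.empty)

  // A user update re-emits a view for every tweet seen so far, then stores the user.
  override def flatMap1(user: User, out: Collector[TweetView]): Unit = {
    val s = current
    s.tweets.values.foreach(t => out.collect(TweetView(t.id, t, user)))
    joinState.update(s.copy(user = Some(user)))
  }

  // A tweet emits a view only if the author is already known, and is stored either way.
  override def flatMap2(tweet: Tweet, out: Collector[TweetView]): Unit = {
    val s = current
    s.user.foreach(u => out.collect(TweetView(tweet.id, tweet, u)))
    joinState.update(s.copy(tweets = s.tweets + (tweet.id -> tweet)))
  }
}

val tweetViews: DataStream[TweetView] = coStreams.flatMap(new TweetViewJoin)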

How well this works in practice depends on the cardinality of the datasets. Replace tweets with high-volume sensor readings, or users with a much faster-moving stream, and keeping everything in state might become prohibitively expensive. In DriveTribe's use case, where users update their profiles every few weeks and have at most a few hundred posts each, this works quite well.
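
If state growth ever became a concern, one option (beyond the scope of the original exercise) is Flink's state TTL, which expires entries that have not been written to for a configured period. A rough sketch, applied to the ValueStateDescriptor from the previous snippet; the retention period is an assumption and would have to match the use case.

import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

// Drop join state for keys that have not been written to in 30 days.
val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(30))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val descriptor = new ValueStateDescriptor[State]("tweet-view-join-state", classOf[State])
descriptor.enableTimeToLive(ttlConfig)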

If you have feedback, please reach out on Twitter.

Thanks for stopping by!

