Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.

I’ve been working with the Vert.x framework for more than 4 years but I won’t stop being excited how simple, lightweight and elegant it is (especially the event loop thread model). In this blog post I will tell you how we implemented PeriodicallyExpiri…


This content originally appeared on DEV Community and was authored by Oleg Agafonov

I've been working with the Vert.x framework for more than 4 years but I won't stop being excited how simple, lightweight and elegant it is (especially the event loop thread model). In this blog post I will tell you how we implemented PeriodicallyExpiringHashMap data structure in less than 100 lines of code. But first let me give you a bit of a context about why do we need it.

Problem

SIP3 is a very advanced VoIP monitoring and troubleshooting platform. To provide detailed information about calls quality we need to:

  1. Aggregate RTP packets into RTP streams in a real-time
  2. Periodically walk though all the RTP streams and terminate ones that haven't been updated for a certain period of time.

Let's stay away from telecom specific and take a look at a simplified code example:

class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private val rtpStreams = mutableMapOf<String, RtpStream>()

    override fun start() {
        vertx.setPeriodic(expirationDelay) {
            val now = System.currentTimeMillis()

            rtpStreams.filterValues { rtpStream -> rtpStream.updatedAt + aggregationTimeout < now }
                .forEach { (rtpStreamId, rtpStream) ->
                    terminateRtpStream(rtpStream)
                    rtpStreams.remove(rtpStreamId)
                }
        }

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}

Now let's imagine that we constantly have a 30K of active RTP streams. Also every second we terminate approximately a thousand of old steams but get a thousand of new ones instead. In these circumstances our code doesn't look very efficient and we certainly need a better solution.

Solution

As you can see from the first code snippet once an RTP stream was updated it won't be terminated at least for the next aggregationTimeout. This means that we can simply do not bother about it for some time.

And this is the key idea behind the SIP3 PeriodicallyExpiringHashMap implementation:

class PeriodicallyExpiringHashMap<K, V> private constructor(
    vertx: Vertx,
    private val delay: Long,
    private val period: Int,
    private val expireAt: (K, V) -> Long,
    private val onExpire: (K, V) -> Unit
) {

    private val objects = mutableMapOf<K, V>()
    private val expiringSlots = (0 until period).map { mutableMapOf<K, V>() }.toList()
    private var expiringSlotIdx = 0

    init {
        vertx.setPeriodic(delay) {
            terminateExpiringSlot()
            expiringSlotIdx += 1
            if (expiringSlotIdx >= period) {
                expiringSlotIdx = 0
            }
        }
    }

    fun getOrPut(key: K, defaultValue: () -> V): V {
        return objects.getOrPut(key) {
            defaultValue.invoke().also { expiringSlots[expiringSlotIdx][key] = it }
        }
    }

    private fun terminateExpiringSlot() {
        val now = System.currentTimeMillis()

        expiringSlots[expiringSlotIdx].apply {
            forEach { (k, v) ->
                val expireAt = expireAt(k, v)

                when {
                    expireAt <= now -> {
                        objects.remove(k)?.let { onExpire(k, it) }
                    }
                    else -> {
                        var shift = ((expireAt - now) / delay).toInt() + 1
                        if (shift >= period) {
                            shift = period - 1
                        }
                        val nextExpiringSlotIdx = (expiringSlotIdx + shift) % period

                        expiringSlots[nextExpiringSlotIdx][k] = v
                    }
                }
            }
            clear()
        }
    }

    data class Builder<K, V>(
        var delay: Long = 1000,
        var period: Int = 60,
        var expireAt: (K, V) -> Long = { _: K, _: V -> Long.MAX_VALUE },
        var onExpire: (K, V) -> Unit = { _: K, _: V -> }
    ) {
        fun delay(delay: Long) = apply { this.delay = delay }
        fun period(period: Int) = apply { this.period = period }
        fun expireAt(expireAt: (K, V) -> Long) = apply { this.expireAt = expireAt }
        fun onExpire(onExpire: (K, V) -> Unit) = apply { this.onExpire = onExpire }

        fun build(vertx: Vertx) = PeriodicallyExpiringHashMap(vertx, delay, period, expireAt, onExpire)
    }
}

Here are the benefits of this data structure:

  1. Now we just have a bunch of time slots. So, instead of walking through all the objects in our map every expirationDelay we can walk trough a single slot. So, instead of checking on 30K objects every second we will check on 1K only.
  2. We don't need to create a copy of original map every time we decide to walk though it. In the previous example it also was an issue, because rtpSteams.filtervalues created a copy of the original map.
  3. The last and the most important. Our implementation will stay consistent within a particular verticle context. That means you can simply extend it and implement the rest of the methods (including tricky ones, like size()).

Conclusions

Finally let's see how our verticle will look like with the new PeriodicallyExpiringHashMap data structure:

class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private lateinit var rtpStreams: PeriodicallyExpiringHashMap<String, RtpStream>

    override fun start() {
        rtpStreams = PeriodicallyExpiringHashMap.Builder<String, RtpStream>()
            .delay(expirationDelay)
            .period((aggregationTimeout / expirationDelay).toInt())
            .expireAt { _, rtpStream -> rtpStream.updatedAt + aggregationTimeout }
            .onExpire { _, rtpStream -> terminateRtpStream(rtpStream) }
            .build(vertx)

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}

The code looks clean and simple. And it's all due to the Vert.x event loop thread model.

?‍? Happy coding,
Your SIP3 team.


This content originally appeared on DEV Community and was authored by Oleg Agafonov


Print Share Comment Cite Upload Translate Updates
APA

Oleg Agafonov | Sciencx (2021-09-22T21:23:40+00:00) Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.. Retrieved from https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/

MLA
" » Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.." Oleg Agafonov | Sciencx - Wednesday September 22, 2021, https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/
HARVARD
Oleg Agafonov | Sciencx Wednesday September 22, 2021 » Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.., viewed ,<https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/>
VANCOUVER
Oleg Agafonov | Sciencx - » Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/
CHICAGO
" » Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.." Oleg Agafonov | Sciencx - Accessed . https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/
IEEE
" » Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.." Oleg Agafonov | Sciencx [Online]. Available: https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/. [Accessed: ]
rf:citation
» Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code. | Oleg Agafonov | Sciencx | https://www.scien.cx/2021/09/22/yet-another-ode-to-vert-x-or-how-to-write-a-performance-wise-expiring-map-in-less-than-100-lines-of-code/ |

Please log in to upload a file.




There are no updates yet.
Click the Upload button above to add an update.

You must be logged in to translate posts. Please log in or register.