This content originally appeared on DEV Community and was authored by Marcos Maia
Context
This post is part of a series where we create a simple Kafka Streams application with Kotlin using Spring Boot and Spring Kafka.
Please check the first part of the tutorial to get started and get further context of what we're building.
If you want to start from here, you can clone the source code for this project:
git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git
and then check out v6:
git checkout v6
and continue from there with this post.
In this post we're going to create our first Spring Kafka Streams element, a GlobalKTable backed by a RocksDB state store, and expose it through the REST controller created in part II using a ReadOnlyKeyValueStore implementation.
Configure App for Spring Kafka Streams
Now that we're going to use Kafka Streams we need to let Spring Boot know, so we start by adding the Spring Kafka @EnableKafkaStreams annotation to the KafkaConfiguration class definition, just below the @EnableKafka one we already have:
@Configuration
@EnableKafka
@EnableKafkaStreams
class KafkaConfiguration {
...
We also need to add the basic default streams configuration, the same way we did for the consumers and producers before. This time we will use the programmatic approach instead of adding it as Spring Boot configuration in the application.yaml file. That would also work, but it's nice to have examples of both approaches so you're aware of them and can pick the one that suits you best.
If you have multiple consumers, producers or streams in the same Spring Boot application and want a different configuration for each, you will have to take the programmatic approach. It lets you configure each specific client, while the configuration-file approach only lets you configure the default Spring bean of each type.
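To make the flexibility argument concrete, here is a minimal sketch of building one property map per Streams client from a shared base. It uses plain string keys so it runs without Kafka on the classpath; the helper name streamsProps, the second application id "audit-stream", the broker address and the state directory are all assumptions for illustration, not part of the tutorial project.

```kotlin
// Shared settings; the broker address is an assumption for a local setup.
val baseProps: Map<String, Any> = mapOf("bootstrap.servers" to "localhost:9092")

// Hypothetical helper: builds one property map per Streams client.
// Later entries win, so overrides can replace any shared default.
fun streamsProps(applicationId: String, overrides: Map<String, Any> = emptyMap()): Map<String, Any> =
    baseProps + mapOf("application.id" to applicationId) + overrides

fun main() {
    // The stream built in this post.
    val quotes = streamsProps("quote-stream")
    // A second, hypothetical stream with its own state directory.
    val audit = streamsProps("audit-stream", mapOf("state.dir" to "/tmp/audit-state"))
    println(quotes)
    println(audit)
}
```

In a Spring application each map would presumably be wrapped in its own KafkaStreamsConfiguration and handed to a separate StreamsBuilderFactoryBean, which is something the application.yaml approach cannot express for more than the default bean.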
In the KafkaConfiguration class we create the default Streams configuration:
@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME])
fun defaultKafkaStreamsConfig(): KafkaStreamsConfiguration {
    val props: MutableMap<String, Any> = HashMap()
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA_HOSTS
    props[StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG] =
        LogAndContinueExceptionHandler::class.java
    props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = SCHEMA_REGISTRY_URL
    props[StreamsConfig.APPLICATION_ID_CONFIG] = "quote-stream"
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
    props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = SpecificAvroSerde::class.java
    props[ConsumerConfig.GROUP_ID_CONFIG] = "stock-quotes-stream-group"
    return KafkaStreamsConfiguration(props)
}
We will also add another bean to the KafkaConfiguration class so we can print out the stream state transitions, which is good for learning and understanding purposes. In a real application you could choose not to do this and, if you do, you would use a logger instead of printing to stdout like we do in this example.
@Bean
fun configurer(): StreamsBuilderFactoryBeanConfigurer? {
    return StreamsBuilderFactoryBeanConfigurer { fb: StreamsBuilderFactoryBean ->
        fb.setStateListener { newState: KafkaStreams.State, oldState: KafkaStreams.State ->
            println("State transition from $oldState to $newState")
        }
    }
}
Create a GlobalKTable for Leverage
Now that Kafka Streams is properly configured we can create our first Kafka Streams element. We will start by creating a GlobalKTable and a materialized view of the Leverage prices.
Create a new class called LeverageStream in the same repository package and annotate it with @Repository.
@Repository
class LeverageStream {
...
Before creating our GlobalKTable and materializing it, we need to tell Kafka Streams which type of serialization and deserialization it needs for the object we will process and store. In Kafka Streams this is done using a Serde, and because we are using Avro we will use the Avro SpecificAvroSerde, passing in the LeveragePrice type we created in a previous step. Add the following to the LeverageStream class to declare the Serde, configure it and initialize it.
private val leveragePriceSerde = SpecificAvroSerde<LeveragePrice>()
val serdeConfig: MutableMap<String, String> = Collections.singletonMap(
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL
)
@PostConstruct
fun init() {
    leveragePriceSerde.configure(serdeConfig, false)
}
Now that we have told Kafka Streams which Serde to use and where to get the Avro schema, we can add the following code, which creates a GlobalKTable with a materialized RocksDB key-value store.
@Bean
fun leveragePriceBySymbolGKTable(streamsBuilder: StreamsBuilder): GlobalKTable<String, LeveragePrice> {
    return streamsBuilder
        .globalTable(
            LEVERAGE_PRICES_TOPIC,
            Materialized.`as`<String, LeveragePrice, KeyValueStore<Bytes, ByteArray>>(LEVERAGE_BY_SYMBOL_TABLE)
                .withKeySerde(Serdes.String())
                .withValueSerde(leveragePriceSerde)
        )
}
By default the materialized view creates a RocksDB instance local to your application. You don't have to manage it or do anything explicitly, it's... Magic!
You might be wondering where the heck this RocksDB is being created. You can control that with the StreamsConfig.STATE_DIR_CONFIG configuration, i.e. state.dir, and point it to the location you want. In this case we didn't specify the property, so it picks the default for the operating system you're running this tutorial on. On Linux machines you will find it under the /tmp/kafka-streams/${stream-id}/${type}/rocksdb/${local_storage_name} folder. The full path for this case is /tmp/kafka-streams/quote-stream/global/rocksdb/leverage-by-symbol-ktable/ where you can see the RocksDB files.
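As a rough illustration of that directory layout, the sketch below rebuilds the path from its parts and shows where a state.dir override would go. It uses the plain "state.dir" key so it runs without Kafka on the classpath. The helper name globalStoreDir and the override path /var/lib/quote-stream are assumptions for illustration only.

```kotlin
// Hypothetical helper: rebuilds the directory layout described above for a global store.
fun globalStoreDir(stateDir: String, applicationId: String, storeName: String): String =
    listOf(stateDir.trimEnd('/'), applicationId, "global", "rocksdb", storeName).joinToString("/")

fun main() {
    // Defaults from this tutorial: no state.dir set, application id "quote-stream".
    println(globalStoreDir("/tmp/kafka-streams", "quote-stream", "leverage-by-symbol-ktable"))

    // Overriding the location: "state.dir" is the key behind StreamsConfig.STATE_DIR_CONFIG.
    val props: MutableMap<String, Any> = HashMap()
    props["state.dir"] = "/var/lib/quote-stream" // assumed path, pick any writable directory
    println(globalStoreDir(props["state.dir"] as String, "quote-stream", "leverage-by-symbol-ktable"))
}
```

In the real application the override would be one more entry in the props map of defaultKafkaStreamsConfig().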
Expose Leverage Materialized view
Great, now that we have local storage we want to be able to query it to get the most up-to-date Leverage for the instruments.
To query RocksDB we will declare a ReadOnlyKeyValueStore. We use Kotlin's lateinit modifier because the store can only be bound once the Stream lifecycle has reached the point where the RocksDB storage is in place. Declare it in LeverageStream:
private lateinit var leveragePriceView: ReadOnlyKeyValueStore<String, LeveragePrice>
and then declare a @Bean listener to initialize it once the Stream is created:
@Bean
fun afterStart(sbfb: StreamsBuilderFactoryBean): StreamsBuilderFactoryBean.Listener {
    val listener: StreamsBuilderFactoryBean.Listener = object : StreamsBuilderFactoryBean.Listener {
        override fun streamsAdded(id: String, streams: KafkaStreams) {
            leveragePriceView = streams.store<ReadOnlyKeyValueStore<String, LeveragePrice>>(
                StoreQueryParameters.fromNameAndType(
                    LEVERAGE_BY_SYMBOL_TABLE,
                    QueryableStoreTypes.keyValueStore()
                )
            )
        }
    }
    sbfb.addListener(listener)
    return listener
}
We can then create a function that returns the value stored for a given key:
fun getLeveragePrice(key: String?): LeveragePrice? {
return leveragePriceView[key]
}
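One thing to keep in mind with lateinit: reading the property before the listener has assigned it throws an UninitializedPropertyAccessException. The sketch below shows one possible guard using Kotlin's isInitialized check. It is not what the tutorial project does; the class LeverageLookup and its bind function are hypothetical, and a plain Map stands in for the ReadOnlyKeyValueStore so the example runs without Kafka.

```kotlin
// Hypothetical stand-in for LeverageStream; a Map replaces the ReadOnlyKeyValueStore.
class LeverageLookup {
    private lateinit var view: Map<String, Double>

    // Plays the role of the streamsAdded callback that assigns the store.
    fun bind(store: Map<String, Double>) {
        view = store
    }

    // Returns null instead of throwing while the view is not bound yet.
    fun getLeverage(key: String): Double? =
        if (this::view.isInitialized) view[key] else null
}

fun main() {
    val lookup = LeverageLookup()
    println(lookup.getLeverage("APPL")) // prints "null": nothing is bound yet
    lookup.bind(mapOf("APPL" to 2.5))
    println(lookup.getLeverage("APPL")) // prints "2.5"
}
```

With a guard like this, a request that arrives before the stream is up would end in the controller's "no content" branch rather than in an exception. Whether that is the right behaviour depends on your application.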
Finally, we can add a new REST endpoint to our QuotesController class to expose the Leverage:
@GetMapping("leveragePrice/{instrumentSymbol}")
fun getLeveragePrice(@PathVariable instrumentSymbol: String): ResponseEntity<LeveragePriceDTO> {
    // If the leverage doesn't exist in our local store we return no content.
    val leveragePrice: LeveragePrice = leverageStream.getLeveragePrice(instrumentSymbol)
        ?: return ResponseEntity.noContent().build()
    val result = LeveragePriceDTO(leveragePrice.symbol.toString(), BigDecimal.valueOf(leveragePrice.leverage))
    return ResponseEntity.ok(result)
}
With that we can build and run our application: mvn clean package -DskipTests && mvn spring-boot:run. Make sure to have a local Kafka running as explained in part 1 and part 2 of this series and execute some calls:
- Add a leverage, which sends a new Leverage to the Kafka topic:
POST http://localhost:8080/api/leverage
Content-Type: application/json
< ./test-data/leveragePrice1.json
- Query it from the RocksDB local storage:
GET http://localhost:8080/api/leveragePrice/APPL
Add a few more leverage prices and play around querying them, and you'll see how the storage is automatically updated with the most recent value you've sent.
As usual, you can check the code up to this point by cloning the project repo with git clone git@github.com:mmaia/simple-spring-kafka-stream-kotlin.git and checking out v7 with git checkout v7.
This is it for part 3 of this series. I hope you had some fun learning and playing with Kafka Streams so far. In the next post of this series we will create a Stream for Quotes, join it with Leverage and branch it, producing to different topics based on some criteria. Stay tuned.
Have fun, be kind to yourself and others!
Photo by Ryland Dean on Unsplash
Marcos Maia | Sciencx (2022-02-11T15:38:53+00:00) Spring Kafka Streams playground with Kotlin – III. Retrieved from https://www.scien.cx/2022/02/11/spring-kafka-streams-playground-with-kotlin-iii/