Multi-threaded Application With Simple Apache Kafka Consumers | by Ivelina Yordanova | Feb, 2022

With code examples in Kotlin

Ivelina Yordanova
Photo by Francisco De Nova on Unsplash
...
private final closed = new AtomicBoolean(false);
...
// this runs in a separate thread than the main
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
val records = consumer.poll(Duration.ofSeconds(1));
// processing..
}
} catch (e: Exception) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
...
private final closed = new AtomicBoolean(false);
private final readyToCommit = new AtomicBoolean(false);
...
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
val records = consumer.poll(Duration.ofSeconds(1));
// start processing in another thread - call API, stitch data from this topic with another etc..
while (!readyToCommit.get()) {
// idle loop?, maybe add Thread.sleep() or delay()
}

}
} catch (e: Exception) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}

enum class State {
SUBSCRIBING,
CONSUMING
PAUSING,
PAUSED,
RESUMING,
CLOSING
CLOSED,
TERMINATING,
IN_ERROR;
fun isHealthy(): Boolean {
return this != IN_ERROR
}
fun shouldRun(): Boolean {
return this != TERMINATING
}
}
class StateMachine {
companion object {
private val rules = mapOf(
SUBSCRIBING to Predicate<State> { it == RESUMING || it == CLOSED },
CONSUMING to Predicate<State> { it == SUBSCRIBING },
PAUSING to Predicate<State> { it == CONSUMING },
PAUSED to Predicate<State> { it == PAUSING },
RESUMING to Predicate<State> { it == PAUSED },
CLOSING to Predicate<State> { it == CONSUMING },
CLOSED to Predicate<State> { it == CLOSING },
TERMINATING to Predicate<State> { it == SUBSCRIBING ||
it == CONSUMING || it == PAUSING || it == PAUSED || it == RESUMING || it == CLOSING || it == CLOSED || it == IN_ERROR }
IN_ERROR to Predicate<State> { it == SUBSCRIBING ||
it == CONSUMING || it == PAUSING || it == PAUSED || it == RESUMING || it == CLOSING || it == CLOSED }
)
fun transitionRuleFor(state: State) {
return rules[state]||
}
}
private val state = AtomicReference(State.CLOSED) fun transitionTo(newState: State){
state.getAndUpdate{
if(transitionRuleFor(newState).test(oldState)) {
newState
} else {
// handle invalid transitions...
}
}
}
}
...
val state = AtomicReference<StateMachine>(StateMachine())
...
while (state.get().shouldRun()) {
when(state.get()) {
SUBSCRIBING -> subscribe()
CONSUMING -> processRecords()
PAUSING -> pause()
PAUSED -> doNothing() //well almost, will discuss later
RESUMING -> resume()
CLOSING -> close()
CLOSED -> doNothing() //maybe add delays if right for the app
IN_ERROR - > logAndAlert() // any other error handling and either break or attempt a recovery
}
}
// handle successful termination like app TERM on deploy or else
sealed class Message {
sealed class ControlMessage: Message() {
class SubscribeMessage(topics: List<String>): ControlMessage()
object CloseMessage: ControlMessage()
object PauseMessage: ControlMessage()
object ResumeMessage: ControlMessage()
object TerminateMessage: ControlMessage()
}
sealed class FeedbackMessage(consumer: Consumer): Message() {
class SubscribedMessage(c: Consumer): FeedbackMessage(c)
class ClosedMessage(c: Consumer): FeedbackMessage(c)
class PausedMessage(c: Consumer): FeedbackMessage(c)
class ConsumingMessage(c: Consumer): FeedbackMessage(c)
class TerminatedMessage(c: Consumer): FeedbackMessage(c)
class InErrorMessage(c: Consumer, e: Exception): FeedbackMessage(c)
}
}
class ChannelFactory {
fun createChannel(consumer: Consumer<Message>): SendChannel<Message> {
return object: CoroutineScope {
override val coroutineContext = Dispatchers.Unconfined + Job()
val channel = actor<Message> {
for(message in channel) {
consumer.accept(message)
}
}
}.channel
}
}
val consumer = Consumer<Message> { message ->
when (message) {
is SubscribeMessage -> subscribe(message.topics)
is CloseMessage -> close()
is PauseMessage -> pause()
is ResumeMessage -> resume()
is TerminateMessage -> terminate()
}
}
val channel = channelFactory.createChannel(consumer)

Leave a Comment