Skip to content

Commit 09dd84b

Browse files
authored
Merge pull request #19 from JebronLames32/kafkaSupport
Add Kafka consumer for Fineract external-events
2 parents 4d8abc4 + 05c241c commit 09dd84b

File tree

8 files changed

+159
-1
lines changed

8 files changed

+159
-1
lines changed

README.md

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,23 @@
1-
# message-consumer
1+
# message-consumer
2+
3+
## Event Consumption via Kafka or JMS
4+
5+
### Spring Profiles
6+
7+
| Profile | Behavior |
8+
|-------------|-------------------------------------------|
9+
| `kafka` | Enables Kafka-based event consumption |
10+
| `jms` | Enables JMS/ActiveMQ-based consumption |
11+
| *(none)* | Disables both consumers |
12+
13+
---
14+
15+
### Running with Kafka
16+
```bash
17+
./gradlew bootRun --args='--spring.profiles.active=kafka'
18+
```
19+
20+
### Running with JMS
21+
```bash
22+
./gradlew bootRun --args='--spring.profiles.active=jms'
23+
```

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@ dependencies {
2525
implementation group: 'org.apache.avro', name: 'avro', version: '1.12.0'
2626
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
2727
implementation 'org.springframework.boot:spring-boot-starter-web'
28+
implementation 'org.apache.kafka:kafka-clients:4.0.0'
29+
implementation 'org.springframework.kafka:spring-kafka'
30+
implementation 'org.springframework.integration:spring-integration-kafka'
2831
compileOnly 'org.projectlombok:lombok'
2932
runtimeOnly 'com.h2database:h2'
3033
annotationProcessor 'org.projectlombok:lombok'
3134
testImplementation 'org.springframework.boot:spring-boot-starter-test'
3235
testImplementation 'org.springframework.integration:spring-integration-test'
36+
testImplementation 'org.springframework.kafka:spring-kafka-test'
3337

3438
implementation "org.apache.fineract:fineract-avro-schemas:${fineractVersion}"
3539
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.test.consumer.config;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
5+
import org.apache.kafka.common.serialization.StringDeserializer;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.context.annotation.Profile;
10+
import org.springframework.kafka.annotation.EnableKafka;
11+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12+
import org.springframework.kafka.core.ConsumerFactory;
13+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
@EnableKafka
19+
@Configuration
20+
@Profile("kafka")
21+
public class KafkaConsumerConfiguration {
22+
23+
@Value("${spring.kafka.bootstrap-servers}")
24+
private String bootstrapServers;
25+
26+
@Bean
27+
public ConsumerFactory<String, byte[]> consumerFactory() {
28+
Map<String, Object> props = new HashMap<>();
29+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
30+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fineract-event-ingestor");
31+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
32+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
33+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
34+
return new DefaultKafkaConsumerFactory<>(props);
35+
}
36+
37+
@Bean
38+
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
39+
var factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
40+
factory.setConsumerFactory(consumerFactory());
41+
return factory;
42+
}
43+
}
44+
45+

src/main/java/org/test/consumer/config/MessageConsumerJMSBrokerConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
import org.springframework.beans.factory.annotation.Value;
66
import org.springframework.context.annotation.Bean;
77
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.context.annotation.Profile;
89
import org.springframework.util.StringUtils;
910

1011
@Configuration
12+
@Profile("jms")
1113
public class MessageConsumerJMSBrokerConfiguration {
1214

1315
@Value("${brokerurl}")

src/main/java/org/test/consumer/config/MessageConsumerJMSConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
import org.springframework.context.annotation.Bean;
88
import org.springframework.context.annotation.Configuration;
99
import org.springframework.context.annotation.Import;
10+
import org.springframework.context.annotation.Profile;
1011
import org.springframework.integration.annotation.ServiceActivator;
1112
import org.springframework.integration.channel.DirectChannel;
1213
import org.springframework.integration.dsl.IntegrationFlow;
1314
import org.springframework.integration.jms.dsl.Jms;
1415
import org.test.consumer.handler.JMSMessageConsumerHandler;
1516

1617
@Configuration
18+
@Profile("jms")
1719
@Import(value = {MessageConsumerJMSBrokerConfiguration.class})
1820
public class MessageConsumerJMSConfiguration {
1921
@Autowired

src/main/java/org/test/consumer/handler/JMSMessageConsumerHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import lombok.extern.slf4j.Slf4j;
88
import org.apache.fineract.avro.MessageV1;
99
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.context.annotation.Profile;
1011
import org.springframework.messaging.Message;
1112
import org.springframework.messaging.MessageHandler;
1213
import org.springframework.messaging.MessagingException;
@@ -18,6 +19,7 @@
1819

1920
@Component
2021
@Slf4j
22+
@Profile("jms")
2123
public class JMSMessageConsumerHandler implements MessageHandler {
2224
@Autowired
2325
private ByteBufferConvertor byteBufferConvertor;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.test.consumer.handler;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.apache.fineract.avro.MessageV1;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.context.annotation.Profile;
7+
import org.springframework.kafka.annotation.KafkaListener;
8+
import org.springframework.messaging.Message;
9+
import org.springframework.messaging.MessageHandler;
10+
import org.springframework.messaging.MessagingException;
11+
import org.springframework.stereotype.Component;
12+
import org.test.consumer.domain.EventMessage;
13+
import org.test.consumer.repository.EventMessageRepository;
14+
import org.test.consumer.utility.ByteBufferConvertor;
15+
16+
import java.io.IOException;
17+
import java.nio.ByteBuffer;
18+
import java.time.LocalDateTime;
19+
import java.time.format.DateTimeFormatter;
20+
21+
/**
22+
* Consumes external Fineract events from Kafka and persists them.
23+
*
24+
* Mirrors the structure of {@link JMSMessageConsumerHandler}.
25+
*/
26+
@Component
27+
@Slf4j
28+
@Profile("kafka")
29+
public class KafkaMessageConsumerHandler implements MessageHandler {
30+
31+
@Autowired
32+
private ByteBufferConvertor byteBufferConvertor;
33+
34+
@Autowired
35+
private EventMessageRepository repository;
36+
37+
@Override
38+
@KafkaListener(
39+
topics = "${app.kafka.topic:external-events}",
40+
containerFactory = "kafkaListenerContainerFactory"
41+
)
42+
public void handleMessage(Message<?> springMessage) throws MessagingException {
43+
byte[] rawPayload = (byte[]) springMessage.getPayload();
44+
ByteBuffer wrapperBuf = byteBufferConvertor.convert(rawPayload);
45+
46+
try {
47+
MessageV1 messagePayload = MessageV1.fromByteBuffer(wrapperBuf);
48+
log.info("Received Kafka event of Category = {}, Type = {}",
49+
messagePayload.getCategory(), messagePayload.getType());
50+
saveMessage(messagePayload);
51+
} catch (IOException e) {
52+
log.error("Unable to read message", e);
53+
}
54+
}
55+
56+
private void saveMessage(MessageV1 messagePayload) {
57+
LocalDateTime createdAt =
58+
LocalDateTime.parse(messagePayload.getCreatedAt(), DateTimeFormatter.ISO_DATE_TIME);
59+
60+
EventMessage message = new EventMessage(
61+
messagePayload.getId(),
62+
messagePayload.getType(),
63+
messagePayload.getCategory(),
64+
messagePayload.getDataschema(),
65+
messagePayload.getTenantId(),
66+
createdAt,
67+
byteBufferConvertor.convert(messagePayload.getData()),
68+
messagePayload.getBusinessDate()
69+
);
70+
repository.save(message);
71+
}
72+
}

src/main/resources/application.properties

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,12 @@ spring.datasource.driverClassName=org.h2.Driver
1010
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
1111
spring.jpa.hibernate.ddl-auto=validate
1212

13+
#kafka
14+
spring.kafka.bootstrap-servers=localhost:9092
15+
app.kafka.topic=external-events
16+
spring.kafka.consumer.group-id=fineract-event-ingestor
17+
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
18+
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
19+
spring.kafka.consumer.auto-offset-reset=earliest
20+
spring.kafka.admin.auto-create=true
21+

0 commit comments

Comments
 (0)