Skip to content

Commit 34c168b

Browse files
abhinavAbhinav Mishra
authored andcommitted
checkstyle changes
1 parent 8c9f5af commit 34c168b

File tree

7 files changed

+301
-270
lines changed

7 files changed

+301
-270
lines changed

src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import com.fasterxml.jackson.databind.SequenceWriter;
2020
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
2121
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
22-
23-
import java.io.*;
22+
import java.io.BufferedReader;
23+
import java.io.File;
24+
import java.io.FileReader;
25+
import java.io.IOException;
2426
import java.util.ArrayList;
2527
import java.util.Date;
2628
import java.util.HashMap;
@@ -32,8 +34,7 @@
3234
import org.apache.camel.LoggingLevel;
3335
import org.mifos.processor.bulk.schema.SubBatchEntity;
3436
import org.mifos.processor.bulk.schema.Transaction;
35-
import org.mifos.processor.bulk.utility.TransactionUtil;
36-
import org.mifos.processor.bulk.utility.Utils;
37+
import org.mifos.processor.bulk.utility.TransactionParser;
3738
import org.springframework.beans.factory.annotation.Autowired;
3839
import org.springframework.beans.factory.annotation.Value;
3940
import org.springframework.stereotype.Component;
@@ -143,7 +144,7 @@ public void configure() throws Exception {
143144

144145
List<Transaction> subBatchTransactions = new ArrayList<>();
145146
for (int j = i; j < Math.min(i + subBatchSize, lines.size()); j++) {
146-
Transaction transaction = TransactionUtil.parseLineToTransaction(lines.get(j));
147+
Transaction transaction = TransactionParser.parseLineToTransaction(lines.get(j));
147148
assert transaction != null;
148149
transaction.setBatchId(subBatchId); // Set the subBatchId for the transaction
149150
subBatchTransactions.add(transaction);
Lines changed: 116 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,116 +1,116 @@
1-
//package org.mifos.processor.bulk.kafka;
2-
//
3-
//import static org.mifos.connector.common.mojaloop.type.InitiatorType.CONSUMER;
4-
//import static org.mifos.connector.common.mojaloop.type.Scenario.TRANSFER;
5-
//import static org.mifos.connector.common.mojaloop.type.TransactionRole.PAYER;
6-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID;
7-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.GSMA_CHANNEL_REQUEST;
8-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.INITIATOR_FSPID;
9-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_RTP_REQUEST;
10-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID;
11-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID_TYPE;
12-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FSPID;
13-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID;
14-
//import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TRANSACTION_TYPE;
15-
//
16-
//import com.fasterxml.jackson.core.JsonProcessingException;
17-
//import com.fasterxml.jackson.databind.ObjectMapper;
18-
//import java.util.HashMap;
19-
//import java.util.Map;
20-
//import lombok.extern.slf4j.Slf4j;
21-
//import org.mifos.connector.common.channel.dto.TransactionChannelRequestDTO;
22-
//import org.mifos.connector.common.gsma.dto.GSMATransaction;
23-
//import org.mifos.connector.common.gsma.dto.GsmaParty;
24-
//import org.mifos.connector.common.mojaloop.dto.MoneyData;
25-
//import org.mifos.connector.common.mojaloop.dto.Party;
26-
//import org.mifos.connector.common.mojaloop.dto.PartyIdInfo;
27-
//import org.mifos.connector.common.mojaloop.dto.TransactionType;
28-
//import org.mifos.connector.common.mojaloop.type.IdentifierType;
29-
//import org.mifos.processor.bulk.schema.TransactionOlder;
30-
//import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter;
31-
//import org.springframework.beans.factory.annotation.Autowired;
32-
//import org.springframework.beans.factory.annotation.Value;
33-
//import org.springframework.kafka.annotation.KafkaListener;
34-
//import org.springframework.stereotype.Service;
35-
//
36-
//@Service
37-
//@Slf4j
38-
//public class Consumers {
39-
//
40-
// @Value("${bpmn.flows.international-remittance-payer}")
41-
// private String internationalRemittancePayer;
42-
//
43-
// @Autowired
44-
// private ObjectMapper objectMapper;
45-
//
46-
// @Autowired
47-
// private ZeebeProcessStarter zeebeProcessStarter;
48-
//
49-
// @KafkaListener(topics = "${kafka.topic.gsma.name}", groupId = "group_id")
50-
// public void listenTopicGsma(String message) throws JsonProcessingException {
51-
// log.debug("Received Message in topic GSMA and group group_id: {}", message);
52-
// TransactionOlder transaction = objectMapper.readValue(message, TransactionOlder.class);
53-
// String tenantId = "ibank-usa";
54-
//
55-
// GSMATransaction gsmaChannelRequest = new GSMATransaction();
56-
// gsmaChannelRequest.setAmount(transaction.getAmount());
57-
// gsmaChannelRequest.setCurrency(transaction.getCurrency());
58-
// gsmaChannelRequest.setRequestingLei("ibank-usa");
59-
// gsmaChannelRequest.setReceivingLei("ibank-india");
60-
// GsmaParty creditParty = new GsmaParty();
61-
// creditParty.setKey("msisdn");
62-
// creditParty.setValue(transaction.getAccountNumber());
63-
// GsmaParty debitParty = new GsmaParty();
64-
// debitParty.setKey("msisdn");
65-
// debitParty.setValue(transaction.getAccountNumber());
66-
// gsmaChannelRequest.setCreditParty(new GsmaParty[] { creditParty });
67-
// gsmaChannelRequest.setDebitParty(new GsmaParty[] { debitParty });
68-
// // gsmaChannelRequest.setInternationalTransferInformation().setReceivingAmount(gsmaChannelRequest.getAmount());
69-
//
70-
// TransactionChannelRequestDTO channelRequest = new TransactionChannelRequestDTO(); // Fineract Object
71-
// Party payee = new Party(new PartyIdInfo(IdentifierType.MSISDN, transaction.getAccountNumber()));
72-
// Party payer = new Party(new PartyIdInfo(IdentifierType.MSISDN, "7543010"));
73-
//
74-
// MoneyData moneyData = new MoneyData();
75-
// moneyData.setAmount(transaction.getAmount());
76-
// moneyData.setCurrency(transaction.getCurrency());
77-
//
78-
// channelRequest.setPayer(payer);
79-
// channelRequest.setPayee(payee);
80-
// channelRequest.setAmount(moneyData);
81-
//
82-
// TransactionType transactionType = new TransactionType();
83-
// transactionType.setInitiator(PAYER);
84-
// transactionType.setInitiatorType(CONSUMER);
85-
// transactionType.setScenario(TRANSFER);
86-
//
87-
// Map<String, Object> extraVariables = new HashMap<>();
88-
// extraVariables.put(IS_RTP_REQUEST, false);
89-
// extraVariables.put(TRANSACTION_TYPE, "inttransfer");
90-
// extraVariables.put(TENANT_ID, tenantId);
91-
//
92-
// extraVariables.put(BATCH_ID, transaction.getBatchId());
93-
//
94-
// String tenantSpecificBpmn = internationalRemittancePayer.replace("{dfspid}", tenantId);
95-
// channelRequest.setTransactionType(transactionType);
96-
//
97-
// PartyIdInfo requestedParty = (boolean) extraVariables.get(IS_RTP_REQUEST) ? channelRequest.getPayer().getPartyIdInfo()
98-
// : channelRequest.getPayee().getPartyIdInfo();
99-
// extraVariables.put(PARTY_ID_TYPE, requestedParty.getPartyIdType());
100-
// extraVariables.put(PARTY_ID, requestedParty.getPartyIdentifier());
101-
//
102-
// extraVariables.put(GSMA_CHANNEL_REQUEST, objectMapper.writeValueAsString(gsmaChannelRequest));
103-
// extraVariables.put(PARTY_LOOKUP_FSPID, gsmaChannelRequest.getReceivingLei());
104-
// extraVariables.put(INITIATOR_FSPID, gsmaChannelRequest.getRequestingLei());
105-
//
106-
// String transactionId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificBpmn, objectMapper.writeValueAsString(channelRequest),
107-
// extraVariables);
108-
//
109-
// log.debug("GSMA Transaction Started with:{} ", transactionId);
110-
// }
111-
//
112-
// @KafkaListener(topics = "${kafka.topic.slcb.name}", groupId = "group_id")
113-
// public void listenTopicSlcb(String message) {
114-
// log.debug("Received Message in topic SLCB and group group_id:{} ", message);
115-
// }
116-
//}
1+
package org.mifos.processor.bulk.kafka;
2+
3+
import static org.mifos.connector.common.mojaloop.type.InitiatorType.CONSUMER;
4+
import static org.mifos.connector.common.mojaloop.type.Scenario.TRANSFER;
5+
import static org.mifos.connector.common.mojaloop.type.TransactionRole.PAYER;
6+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.BATCH_ID;
7+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.GSMA_CHANNEL_REQUEST;
8+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.INITIATOR_FSPID;
9+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.IS_RTP_REQUEST;
10+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID;
11+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_ID_TYPE;
12+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.PARTY_LOOKUP_FSPID;
13+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TENANT_ID;
14+
import static org.mifos.processor.bulk.zeebe.ZeebeVariables.TRANSACTION_TYPE;
15+
16+
import com.fasterxml.jackson.core.JsonProcessingException;
17+
import com.fasterxml.jackson.databind.ObjectMapper;
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
import lombok.extern.slf4j.Slf4j;
21+
import org.mifos.connector.common.channel.dto.TransactionChannelRequestDTO;
22+
import org.mifos.connector.common.gsma.dto.GSMATransaction;
23+
import org.mifos.connector.common.gsma.dto.GsmaParty;
24+
import org.mifos.connector.common.mojaloop.dto.MoneyData;
25+
import org.mifos.connector.common.mojaloop.dto.Party;
26+
import org.mifos.connector.common.mojaloop.dto.PartyIdInfo;
27+
import org.mifos.connector.common.mojaloop.dto.TransactionType;
28+
import org.mifos.connector.common.mojaloop.type.IdentifierType;
29+
import org.mifos.processor.bulk.schema.TransactionOlder;
30+
import org.mifos.processor.bulk.zeebe.ZeebeProcessStarter;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.beans.factory.annotation.Value;
33+
import org.springframework.kafka.annotation.KafkaListener;
34+
import org.springframework.stereotype.Service;
35+
36+
@Service
37+
@Slf4j
38+
public class Consumers {
39+
40+
@Value("${bpmn.flows.international-remittance-payer}")
41+
private String internationalRemittancePayer;
42+
43+
@Autowired
44+
private ObjectMapper objectMapper;
45+
46+
@Autowired
47+
private ZeebeProcessStarter zeebeProcessStarter;
48+
49+
@KafkaListener(topics = "${kafka.topic.gsma.name}", groupId = "group_id")
50+
public void listenTopicGsma(String message) throws JsonProcessingException {
51+
log.debug("Received Message in topic GSMA and group group_id: {}", message);
52+
TransactionOlder transaction = objectMapper.readValue(message, TransactionOlder.class);
53+
String tenantId = "ibank-usa";
54+
55+
GSMATransaction gsmaChannelRequest = new GSMATransaction();
56+
gsmaChannelRequest.setAmount(transaction.getAmount());
57+
gsmaChannelRequest.setCurrency(transaction.getCurrency());
58+
gsmaChannelRequest.setRequestingLei("ibank-usa");
59+
gsmaChannelRequest.setReceivingLei("ibank-india");
60+
GsmaParty creditParty = new GsmaParty();
61+
creditParty.setKey("msisdn");
62+
creditParty.setValue(transaction.getAccountNumber());
63+
GsmaParty debitParty = new GsmaParty();
64+
debitParty.setKey("msisdn");
65+
debitParty.setValue(transaction.getAccountNumber());
66+
gsmaChannelRequest.setCreditParty(new GsmaParty[] { creditParty });
67+
gsmaChannelRequest.setDebitParty(new GsmaParty[] { debitParty });
68+
// gsmaChannelRequest.setInternationalTransferInformation().setReceivingAmount(gsmaChannelRequest.getAmount());
69+
70+
TransactionChannelRequestDTO channelRequest = new TransactionChannelRequestDTO(); // Fineract Object
71+
Party payee = new Party(new PartyIdInfo(IdentifierType.MSISDN, transaction.getAccountNumber()));
72+
Party payer = new Party(new PartyIdInfo(IdentifierType.MSISDN, "7543010"));
73+
74+
MoneyData moneyData = new MoneyData();
75+
moneyData.setAmount(transaction.getAmount());
76+
moneyData.setCurrency(transaction.getCurrency());
77+
78+
channelRequest.setPayer(payer);
79+
channelRequest.setPayee(payee);
80+
channelRequest.setAmount(moneyData);
81+
82+
TransactionType transactionType = new TransactionType();
83+
transactionType.setInitiator(PAYER);
84+
transactionType.setInitiatorType(CONSUMER);
85+
transactionType.setScenario(TRANSFER);
86+
87+
Map<String, Object> extraVariables = new HashMap<>();
88+
extraVariables.put(IS_RTP_REQUEST, false);
89+
extraVariables.put(TRANSACTION_TYPE, "inttransfer");
90+
extraVariables.put(TENANT_ID, tenantId);
91+
92+
extraVariables.put(BATCH_ID, transaction.getBatchId());
93+
94+
String tenantSpecificBpmn = internationalRemittancePayer.replace("{dfspid}", tenantId);
95+
channelRequest.setTransactionType(transactionType);
96+
97+
PartyIdInfo requestedParty = (boolean) extraVariables.get(IS_RTP_REQUEST) ? channelRequest.getPayer().getPartyIdInfo()
98+
: channelRequest.getPayee().getPartyIdInfo();
99+
extraVariables.put(PARTY_ID_TYPE, requestedParty.getPartyIdType());
100+
extraVariables.put(PARTY_ID, requestedParty.getPartyIdentifier());
101+
102+
extraVariables.put(GSMA_CHANNEL_REQUEST, objectMapper.writeValueAsString(gsmaChannelRequest));
103+
extraVariables.put(PARTY_LOOKUP_FSPID, gsmaChannelRequest.getReceivingLei());
104+
extraVariables.put(INITIATOR_FSPID, gsmaChannelRequest.getRequestingLei());
105+
106+
String transactionId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificBpmn, objectMapper.writeValueAsString(channelRequest),
107+
extraVariables);
108+
109+
log.debug("GSMA Transaction Started with:{} ", transactionId);
110+
}
111+
112+
@KafkaListener(topics = "${kafka.topic.slcb.name}", groupId = "group_id")
113+
public void listenTopicSlcb(String message) {
114+
log.debug("Received Message in topic SLCB and group group_id:{} ", message);
115+
}
116+
}
Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,39 @@
1-
//package org.mifos.processor.bulk.kafka.config;
2-
//
3-
//import java.util.HashMap;
4-
//import java.util.Map;
5-
//import org.apache.kafka.clients.consumer.ConsumerConfig;
6-
//import org.apache.kafka.common.serialization.StringDeserializer;
7-
//import org.springframework.beans.factory.annotation.Value;
8-
//import org.springframework.context.annotation.Bean;
9-
//import org.springframework.context.annotation.Configuration;
10-
//import org.springframework.kafka.annotation.EnableKafka;
11-
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12-
//import org.springframework.kafka.core.ConsumerFactory;
13-
//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
14-
//
15-
//@EnableKafka
16-
//@Configuration
17-
//public class KafkaConsumerConfig {
18-
//
19-
// @Value(value = "${kafka.bootstrapAddress}")
20-
// private String bootstrapAddress;
21-
//
22-
// @Bean
23-
// public ConsumerFactory<String, String> consumerFactory() {
24-
// Map<String, Object> props = new HashMap<>();
25-
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
26-
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
27-
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
28-
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
29-
// return new DefaultKafkaConsumerFactory<>(props);
30-
// }
31-
//
32-
// @Bean
33-
// public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
34-
// ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
35-
// factory.setConsumerFactory(consumerFactory());
36-
// return factory;
37-
// }
38-
//
39-
//}
1+
package org.mifos.processor.bulk.kafka.config;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import org.apache.kafka.clients.consumer.ConsumerConfig;
6+
import org.apache.kafka.common.serialization.StringDeserializer;
7+
import org.springframework.beans.factory.annotation.Value;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import org.springframework.kafka.annotation.EnableKafka;
11+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12+
import org.springframework.kafka.core.ConsumerFactory;
13+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
14+
15+
@EnableKafka
16+
@Configuration
17+
public class KafkaConsumerConfig {
18+
19+
@Value(value = "${kafka.bootstrapAddress}")
20+
private String bootstrapAddress;
21+
22+
@Bean
23+
public ConsumerFactory<String, String> consumerFactory() {
24+
Map<String, Object> props = new HashMap<>();
25+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
26+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
27+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
28+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
29+
return new DefaultKafkaConsumerFactory<>(props);
30+
}
31+
32+
@Bean
33+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
34+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
35+
factory.setConsumerFactory(consumerFactory());
36+
return factory;
37+
}
38+
39+
}

0 commit comments

Comments
 (0)