diff --git a/Dockerfile b/Dockerfile
index 4bad37e3..ac38c28a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -2,4 +2,4 @@ FROM openjdk:17
 EXPOSE 5000
 
 COPY build/libs/*.jar .
-CMD java -jar *.jar
+CMD java -jar *.jar
\ No newline at end of file
diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java
index 8983c5bf..5b876233 100644
--- a/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java
+++ b/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java
@@ -115,7 +115,6 @@ public void configure() throws Exception {
             PaymentModeMapping mapping = paymentModeConfiguration.getByMode(paymentMode);
             String tenantName = exchange.getProperty(TENANT_NAME, String.class);
-            tenantName = mapping.getDebulkingDfspid() == null ? tenantName : mapping.getDebulkingDfspid();
             Map<String, Object> variables = exchange.getProperty(ZEEBE_VARIABLE, Map.class);
             variables.put(PAYMENT_MODE, paymentMode);
             variables.put(DEBULKINGDFSPID, mapping.getDebulkingDfspid() == null ? tenantName : mapping.getDebulkingDfspid());
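Note on the InitSubBatchRoute hunk above: the route no longer mutates `tenantName`; the debulking-DFSP fallback is applied only at the point where the `DEBULKINGDFSPID` Zeebe variable is written. A minimal, self-contained sketch of that fallback (sample values invented, not project code):

```java
// Sketch only: shows the null-fallback used when populating DEBULKINGDFSPID.
import java.util.Optional;

public class DebulkingFallbackSketch {
    public static void main(String[] args) {
        String tenantName = "rhino";     // assumed sample value
        String debulkingDfspid = null;   // stands in for mapping.getDebulkingDfspid()

        // Ternary form used in the diff:
        String resolved = debulkingDfspid == null ? tenantName : debulkingDfspid;

        // Equivalent Optional form:
        String resolvedAlt = Optional.ofNullable(debulkingDfspid).orElse(tenantName);

        System.out.println(resolved.equals(resolvedAlt)); // true
    }
}
```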
diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java
index 8edfd624..6aaf3db1 100644
--- a/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java
+++ b/src/main/java/org/mifos/processor/bulk/camel/routes/SplittingRoute.java
@@ -1,7 +1,9 @@
 package org.mifos.processor.bulk.camel.routes;
 
 import static org.mifos.processor.bulk.camel.config.CamelProperties.LOCAL_FILE_PATH;
+import static org.mifos.processor.bulk.camel.config.CamelProperties.OVERRIDE_HEADER;
 import static org.mifos.processor.bulk.camel.config.CamelProperties.REGISTERING_INSTITUTE_ID;
+import static org.mifos.processor.bulk.camel.config.CamelProperties.RESULT_TRANSACTION_LIST;
 import static org.mifos.processor.bulk.camel.config.CamelProperties.SERVER_FILE_NAME;
 import static org.mifos.processor.bulk.camel.config.CamelProperties.SERVER_SUB_BATCH_FILE_NAME_ARRAY;
 import static org.mifos.processor.bulk.camel.config.CamelProperties.SUB_BATCH_COUNT;
@@ -22,7 +24,7 @@
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
-import java.io.FileWriter;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -34,6 +36,7 @@
 import org.apache.camel.LoggingLevel;
 import org.mifos.processor.bulk.schema.SubBatchEntity;
 import org.mifos.processor.bulk.schema.Transaction;
+import org.mifos.processor.bulk.utility.TransactionParser;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -69,18 +72,39 @@ public void configure() throws Exception {
             List<String> subBatchFile = new ArrayList<>();
             Set<String> distinctPayeeIds = transactionList.stream().map(Transaction::getPayeeDfspId).collect(Collectors.toSet());
             logger.info("Payee id {}", distinctPayeeIds);
-            if (partyLookupEnabled && !distinctPayeeIds.isEmpty()) {
+            logger.info("Number of payeeId {}", distinctPayeeIds.size());
+            Boolean batchAccountLookup = (Boolean) exchange.getProperty("batchAccountLookup");
+            if (partyLookupEnabled && batchAccountLookup) {
                 // Create a map to store transactions for each payeeid
                 Map<String, List<Transaction>> transactionsByPayeeId = new HashMap<>();
                 // Split the list based on distinct payeeids
+                Map<String, String> subBatchIdPayeeMap = new HashMap<>();
+                Map<String, List<Transaction>> subBatchIdMap = new HashMap<>();
+                List<String> subBatchIdList = new ArrayList<>();
+                List<Transaction> updatedTransactionList = new ArrayList<>();
                 for (String payeeId : distinctPayeeIds) {
                     List<Transaction> transactionsForPayee = transactionList.stream()
                             .filter(transaction -> payeeId.equals(transaction.getPayeeDfspId())).collect(Collectors.toList());
                     transactionsByPayeeId.put(payeeId, transactionsForPayee);
+                    String subBatchId = UUID.randomUUID().toString();
+                    subBatchIdList.add(subBatchId);
+                    subBatchIdPayeeMap.put(payeeId, subBatchId);
+                    subBatchIdMap.put(subBatchId, transactionsForPayee);
+                }
+                logger.info("Number of SubBatch based on payeeId {}", subBatchIdList.size());
+                // mapping subBatchId in transactionList
+                for (String subBatchId : subBatchIdList) {
+                    List<Transaction> transactions = subBatchIdMap.get(subBatchId);
+                    for (Transaction transaction : transactions) {
+                        for (Transaction originalTransaction : transactionList) {
+                            if (originalTransaction.equals(transaction)) {
+                                originalTransaction.setBatchId(subBatchId);
+                                updatedTransactionList.add(originalTransaction);
+                            }
+                        }
+                    }
                 }
                 for (String payeeId : distinctPayeeIds) {
                     List<Transaction> transactionsForSpecificPayee = transactionsByPayeeId.get(payeeId);
                     String filename = UUID.randomUUID() + "_" + "sub-batch-" + payeeId + ".csv";
@@ -90,9 +114,12 @@ public void configure() throws Exception {
                     File file = new File(filename);
                     SequenceWriter writer = csvMapper.writerWithSchemaFor(Transaction.class).with(csvSchema).writeValues(file);
                     for (Transaction transaction : transactionsForSpecificPayee) {
+                        transaction.setBatchId(subBatchIdPayeeMap.get(payeeId));
                         writer.write(transaction);
                     }
+                    exchange.setProperty(RESULT_TRANSACTION_LIST, updatedTransactionList);
                     subBatchFile.add(filename);
+                    exchange.setProperty(TRANSACTION_LIST, updatedTransactionList);
                 }
             } else {
                 List<String> lines = new ArrayList<>();
@@ -110,16 +137,30 @@ public void configure() throws Exception {
                 }
 
                 int subBatchCount = 1;
+                CsvSchema csvSchema = csvMapper.schemaFor(Transaction.class);
+                csvSchema = csvSchema.withHeader();
                 for (int i = 0; i < lines.size(); i += subBatchSize) {
+                    String subBatchId = UUID.randomUUID().toString();
                     String filename = UUID.randomUUID() + "_" + "sub-batch-" + subBatchCount + ".csv";
-                    FileWriter writer = new FileWriter(filename);
-                    writer.write(header);
+                    logger.info("SubBatch Id {}", subBatchId);
+
+                    List<Transaction> subBatchTransactions = new ArrayList<>();
                     for (int j = i; j < Math.min(i + subBatchSize, lines.size()); j++) {
-                        writer.write(lines.get(j) + System.lineSeparator());
+                        Transaction transaction = TransactionParser.parseLineToTransaction(lines.get(j));
+                        assert transaction != null;
+                        transaction.setBatchId(subBatchId); // Set the subBatchId for the transaction
+                        subBatchTransactions.add(transaction);
+                    }
+
+                    // Write the list of Transactions to the file
+                    File file = new File(filename);
+                    try (SequenceWriter writer = csvMapper.writer(csvSchema).writeValues(file)) {
+                        writer.writeAll(subBatchTransactions);
+                    } catch (IOException e) {
+                        logger.error("Failed to write sub-batch file: " + filename, e);
                     }
-                    writer.close();
                     logger.info("Created sub-batch with file name {}", filename);
-                    subBatchFile.add(filename);
+                    subBatchFile.add(filename); // Ensure this list is declared and accessible
                     subBatchCount++;
                 }
             }
@@ -127,7 +168,9 @@ public void configure() throws Exception {
             exchange.setProperty(SUB_BATCH_COUNT, subBatchFile.size());
             exchange.setProperty(SUB_BATCH_CREATED, true);
             exchange.setProperty(SERVER_SUB_BATCH_FILE_NAME_ARRAY, new ArrayList<>());
-        });
+        }).log("updating original").setProperty(LOCAL_FILE_PATH, exchangeProperty(SERVER_FILE_NAME))
+                .setProperty(OVERRIDE_HEADER, constant(true)) // default header in CSV file will be used
+                .to("direct:update-file-v2").to("direct:upload-file");
 
         // Iterate through each CSVs of sub-batches and uploads in cloud
         from("direct:upload-sub-batch-file").id("direct:upload-sub-batch-file").log("Starting upload of sub-batch file")
@@ -165,7 +208,7 @@ public void configure() throws Exception {
             SubBatchEntity subBatchEntity = getDefaultSubBatchEntity();
 
             subBatchEntity.setBatchId((String) zeebeVariables.get(BATCH_ID));
-            subBatchEntity.setSubBatchId(UUID.randomUUID().toString());
+            subBatchEntity.setSubBatchId(transactionList.get(0).getBatchId());
             subBatchEntity.setRequestId((String) zeebeVariables.get(REQUEST_ID));
             subBatchEntity.setCorrelationId((String) zeebeVariables.get(CLIENT_CORRELATION_ID));
             subBatchEntity.setPayerFsp((String) zeebeVariables.get(PAYER_IDENTIFIER));
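The rewritten splitting logic above replaces raw `FileWriter` string concatenation with Jackson's CSV support. A minimal, self-contained sketch of that write pattern (the `Row` type and file name are invented; assumes `jackson-dataformat-csv` on the classpath, not project code):

```java
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import java.io.File;
import java.util.List;

public class CsvWriteSketch {

    // Hypothetical row type standing in for Transaction.
    @JsonPropertyOrder({ "id", "payeeDfspId", "amount" })
    public static class Row {
        public String id;
        public String payeeDfspId;
        public String amount;

        public Row() {}

        public Row(String id, String payeeDfspId, String amount) {
            this.id = id;
            this.payeeDfspId = payeeDfspId;
            this.amount = amount;
        }
    }

    public static void main(String[] args) throws Exception {
        CsvMapper csvMapper = new CsvMapper();
        // schemaFor(...) derives column order from @JsonPropertyOrder; withHeader() emits a header row.
        CsvSchema schema = csvMapper.schemaFor(Row.class).withHeader();
        File file = new File("sub-batch-sample.csv"); // hypothetical output file
        try (SequenceWriter writer = csvMapper.writer(schema).writeValues(file)) {
            writer.writeAll(List.of(new Row("1", "payeefsp1", "10"), new Row("2", "payeefsp2", "20")));
        }
    }
}
```

The try-with-resources block guarantees the `SequenceWriter` is flushed and closed even when a row fails to serialize, which the old `FileWriter` code did not.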
diff --git a/src/main/java/org/mifos/processor/bulk/schema/Transaction.java b/src/main/java/org/mifos/processor/bulk/schema/Transaction.java
index 7c8f9514..79b12798 100644
--- a/src/main/java/org/mifos/processor/bulk/schema/Transaction.java
+++ b/src/main/java/org/mifos/processor/bulk/schema/Transaction.java
@@ -4,14 +4,16 @@
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.util.Objects;
 import lombok.Getter;
 import lombok.Setter;
 
 @Getter
 @Setter
 @JsonIgnoreProperties(ignoreUnknown = true)
-@JsonPropertyOrder({ "id", "request_id", "payment_mode", "account_number", "payer_identifier_type", "payer_identifier",
-        "payee_identifier_type", "payee_identifier", "amount", "currency", "note", "program_shortcode", "cycle", "payee_dfsp_id" })
+@JsonPropertyOrder({ "id", "request_id", "payment_mode", "payer_identifier_type", "payer_identifier",
+        "payee_identifier_type", "payee_identifier", "amount", "currency", "note", "program_shortcode", "cycle", "payee_dfsp_id",
+        "batch_id", "account_number" })
 public class Transaction implements CsvSchema {
 
     @JsonProperty("id")
@@ -32,6 +34,29 @@ public class Transaction implements CsvSchema {
     @JsonProperty("currency")
     private String currency;
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Transaction that = (Transaction) o;
+        return id == that.id && Objects.equals(requestId, that.requestId) && Objects.equals(paymentMode, that.paymentMode)
+                && Objects.equals(accountNumber, that.accountNumber) && Objects.equals(amount, that.amount)
+                && Objects.equals(currency, that.currency) && Objects.equals(note, that.note)
+                && Objects.equals(payerIdentifierType, that.payerIdentifierType) && Objects.equals(payerIdentifier, that.payerIdentifier)
+                && Objects.equals(payeeIdentifierType, that.payeeIdentifierType) && Objects.equals(payeeIdentifier, that.payeeIdentifier)
+                && Objects.equals(payeeDfspId, that.payeeDfspId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, requestId, paymentMode, accountNumber, amount, currency, note, payerIdentifierType, payerIdentifier,
+                payeeIdentifierType, payeeIdentifier, payeeDfspId);
+    }
+
     @JsonProperty("note")
     private String note;
 
@@ -56,7 +81,7 @@ public class Transaction implements CsvSchema {
     @JsonProperty("payee_dfsp_id")
     private String payeeDfspId;
 
-    @JsonIgnore
+    @JsonProperty("batch_id")
     private String batchId;
 
     @Override
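The new `equals()`/`hashCode()` above deliberately leave `batchId` out of the comparison: the mapping loop in SplittingRoute relies on this to match transactions back to their originals even after a sub-batch id has been stamped on one side. A small sketch of that property (assumes the project classes on the classpath; sample values invented):

```java
import org.mifos.processor.bulk.schema.Transaction;

public class TransactionEqualitySketch {
    public static void main(String[] args) {
        Transaction original = new Transaction();
        original.setId(1);
        original.setRequestId("req-1");       // assumed sample values
        original.setPayeeDfspId("payeefsp1");

        Transaction parsed = new Transaction();
        parsed.setId(1);
        parsed.setRequestId("req-1");
        parsed.setPayeeDfspId("payeefsp1");
        parsed.setBatchId("sub-batch-uuid");  // differs, but is not compared

        System.out.println(original.equals(parsed)); // true: batchId is excluded
    }
}
```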
diff --git a/src/main/java/org/mifos/processor/bulk/utility/TransactionParser.java b/src/main/java/org/mifos/processor/bulk/utility/TransactionParser.java
new file mode 100644
index 00000000..03f4138f
--- /dev/null
+++ b/src/main/java/org/mifos/processor/bulk/utility/TransactionParser.java
@@ -0,0 +1,66 @@
+package org.mifos.processor.bulk.utility;
+
+import org.mifos.processor.bulk.schema.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class TransactionParser {
+
+    private static final Logger logger = LoggerFactory.getLogger(TransactionParser.class);
+
+    private TransactionParser() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static Transaction parseLineToTransaction(String line) {
+        try {
+            String[] parts = line.split(",", -1);
+            Transaction transaction = new Transaction();
+
+            if (parts.length > 0 && !parts[0].isEmpty()) {
+                transaction.setId(Integer.parseInt(parts[0]));
+            }
+            if (parts.length > 1) {
+                transaction.setRequestId(parts[1]);
+            }
+            if (parts.length > 2) {
+                transaction.setPaymentMode(parts[2]);
+            }
+            if (parts.length > 3) {
+                transaction.setPayerIdentifierType(parts[3]);
+            }
+            if (parts.length > 4) {
+                transaction.setPayerIdentifier(parts[4]);
+            }
+            if (parts.length > 5) {
+                transaction.setPayeeIdentifierType(parts[5]);
+            }
+            if (parts.length > 6) {
+                transaction.setPayeeIdentifier(parts[6]);
+            }
+            if (parts.length > 7) {
+                transaction.setAmount(parts[7]);
+            }
+            if (parts.length > 8) {
+                transaction.setCurrency(parts[8]);
+            }
+            if (parts.length > 9) {
+                transaction.setNote(parts[9]);
+            }
+            if (parts.length > 10) {
+                transaction.setProgramShortCode(parts[10]);
+            }
+            if (parts.length > 11) {
+                transaction.setCycle(parts[11]);
+            }
+            if (parts.length > 12) {
+                transaction.setPayeeDfspId(parts[12]);
+            }
+
+            return transaction;
+        } catch (Exception e) {
+            logger.error("Error parsing line to Transaction object: {}", line, e);
+            return null;
+        }
+    }
+}
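A usage sketch for the new `TransactionParser` (the CSV line below is invented sample data; assumes the project classes on the classpath):

```java
import org.mifos.processor.bulk.schema.Transaction;
import org.mifos.processor.bulk.utility.TransactionParser;

public class TransactionParserSketch {
    public static void main(String[] args) {
        // Columns ordered per Transaction's @JsonPropertyOrder; trailing fields may be absent.
        String line = "1,req-1,gsma,msisdn,16135551212,msisdn,27710101010,10,USD,note";
        Transaction tx = TransactionParser.parseLineToTransaction(line);
        if (tx != null) {
            System.out.println(tx.getRequestId() + " -> " + tx.getPayeeIdentifier());
        }
    }
}
```

Callers must handle the `null` returned on malformed input; the `assert transaction != null` in SplittingRoute only fires when the JVM runs with assertions enabled (`-ea`).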
diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchAccountLookupCallbackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchAccountLookupCallbackWorker.java
index 4a94084e..f99c070b 100644
--- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchAccountLookupCallbackWorker.java
+++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchAccountLookupCallbackWorker.java
@@ -34,6 +34,7 @@ public void setup() {
             String filename = (String) variables.get(FILE_NAME);
             String batchAccountLookupCallback = (String) variables.get("batchAccountLookupCallback");
             variables.put(PARTY_LOOKUP_FAILED, false);
+            variables.put("batchAccountLookup", true);
             exchange.setProperty(SERVER_FILE_NAME, filename);
             exchange.setProperty("batchAccountLookupCallback", batchAccountLookupCallback);
             exchange.setProperty("workflowInstanceKey", job.getProcessInstanceKey());
diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java
index 2acacf11..55734bef 100644
--- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java
+++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java
@@ -72,15 +72,15 @@ public void setup() {
             String fileName = subBatches.remove(0);
 
             SubBatchEntity subBatchEntity = null;
-
-            for (SubBatchEntity subBatch : subBatchEntityList) {
-                if (subBatch.getRequestFile().contains(fileName)) {
-                    subBatchEntity = subBatch;
-                    logger.info("SubBatchEntity found");
+            if (isSplittingEnabled) {
+                for (SubBatchEntity subBatch : subBatchEntityList) {
+                    if (subBatch.getRequestFile().contains(fileName)) {
+                        subBatchEntity = subBatch;
+                        logger.info("SubBatchEntity found");
+                    }
                 }
+                logger.debug("BatchEntity for this subbatch is {}", objectMapper.writeValueAsString(subBatchEntity));
             }
-            logger.debug("BatchEntity for this subbatch is {}", objectMapper.writeValueAsString(subBatchEntity));
-
             Exchange exchange = new DefaultExchange(camelContext);
             exchange.setProperty(TENANT_NAME, variables.get(TENANT_ID));
             exchange.setProperty(SERVER_FILE_NAME, fileName);
diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java
index 8ba21146..f3bd3088 100644
--- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java
+++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java
@@ -45,6 +45,9 @@ public void setup() {
             exchange.setProperty(SERVER_FILE_NAME, filename);
             exchange.setProperty(ZEEBE_VARIABLE, variables);
             exchange.setProperty("partyLookupFailed", partyLookupFailed);
+            exchange.setProperty("batchAccountLookup",
+                    variables.get("batchAccountLookup") != null ? variables.get("batchAccountLookup") : false);
+
             exchange.setProperty(SUB_BATCH_DETAILS, new ArrayList<>());
 
             try {
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index c4012d69..8cc060f9 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -99,7 +99,7 @@ config:
     enable: true
     field: "payerIdentifier"
   splitting:
-    enable: false
+    enable: true
     sub-batch-size: 5
   formatting:
     enable: false
@@ -159,7 +159,7 @@ bulk_processor:
   hostname : "https://ph-ee-connector-bulk:8443"
 
 csv:
-  columnNames: "id,request_id,payment_mode,payer_identifier_type,payer_identifier,payee_identifier_type,payee_identifier,amount,currency,note,account_number,program_shortcode,cycle,payee_dfsp_id"
+  columnNames: "id,request_id,payment_mode,payer_identifier_type,payer_identifier,payee_identifier_type,payee_identifier,amount,currency,note,account_number,program_shortcode,cycle,payee_dfsp_id,batch_id"
   size : 100000 # in bytes
 
 budget-account:
@@ -232,4 +232,4 @@ bpmns:
       batch-transactions: "bulk_processor_account_lookup-{dfspid}"
   - id: "lion"
     flows:
-      batch-transactions: "bulk_processor-{dfspid}"
+      batch-transactions: "bulk_processor-{dfspid}"
\ No newline at end of file
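One note on the `batchAccountLookup` flag threaded through the workers above: SplittingWorker defaults it with an explicit null check, and SplittingRoute then unboxes it in `partyLookupEnabled && batchAccountLookup`, which would throw if the property were ever missing. A null-safe alternative (sketch only, not project code):

```java
import java.util.HashMap;
import java.util.Map;

public class FlagDefaultSketch {

    // Boolean.TRUE.equals(..) treats absent or non-Boolean values as false,
    // avoiding both the explicit null check and any unboxing NPE.
    static boolean batchAccountLookup(Map<String, Object> variables) {
        return Boolean.TRUE.equals(variables.get("batchAccountLookup"));
    }

    public static void main(String[] args) {
        Map<String, Object> variables = new HashMap<>();
        System.out.println(batchAccountLookup(variables)); // false: absent, so defaulted
        variables.put("batchAccountLookup", true);
        System.out.println(batchAccountLookup(variables)); // true
    }
}
```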