diff --git a/src/main/java/org/mifos/processor/bulk/api/ApiOriginFilter.java b/src/main/java/org/mifos/processor/bulk/api/ApiOriginFilter.java index 46d29187..1b181ac4 100644 --- a/src/main/java/org/mifos/processor/bulk/api/ApiOriginFilter.java +++ b/src/main/java/org/mifos/processor/bulk/api/ApiOriginFilter.java @@ -27,7 +27,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha HttpServletRequest req = (HttpServletRequest) request; String tenant = req.getHeader("Platform-TenantId"); logger.debug("Tenant Name is : {}", tenant); - logger.info("Client IP Address: {}",req.getRemoteHost()); + logger.info("Client IP Address: {}", req.getRemoteHost()); } @Override diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java index d00fc449..624cc2ba 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/BatchStatusRoute.java @@ -53,12 +53,14 @@ public void configure() throws Exception { BatchDTO batchSummary = exchange.getIn().getBody(BatchDTO.class); int percentage = (int) (((double) batchSummary.getSuccessful() / batchSummary.getTotal()) * 100); + logger.info("Completion rate: {}", percentage); if (percentage >= completionThreshold) { logger.info("Batch success threshold reached. Expected rate: {}, Actual Rate: {}", completionThreshold, percentage); } exchange.setProperty(COMPLETION_RATE, percentage); + logger.info("Exchange Completion rate: {}", exchange.getProperty(COMPLETION_RATE)); }).otherwise().log(LoggingLevel.ERROR, "Batch summary request unsuccessful").process(exchange -> { exchange.setProperty(BATCH_STATUS_FAILED, true); diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java index c4d48b78..fe66509f 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/FileRoute.java @@ -30,7 +30,7 @@ public void configure() throws Exception { */ from("direct:download-file").id("direct:download-file").log("Started download-file route").process(exchange -> { String filename = exchange.getProperty(SERVER_FILE_NAME, String.class); - + logger.info("Server file name: {}", exchange.getProperty(SERVER_FILE_NAME)); byte[] csvFile = fileTransferService.downloadFile(filename, bucketName); File file = new File(filename); try (FileOutputStream fos = new FileOutputStream(file)) { diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/HealthRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/HealthRoute.java index c9cf0e78..b7d914ff 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/HealthRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/HealthRoute.java @@ -10,8 +10,11 @@ public class HealthRoute extends BaseRouteBuilder { public void configure() throws Exception { // todo remove once camel APIs are migrated to spring - from("rest:GET:/actuator/health/liveness") - .id("rest:GET:/actuator/health/liveness") - .setBody(exchange -> new JSONObject(){{ put("status", "UP"); }}.toString()); + from("rest:GET:/actuator/health/liveness").id("rest:GET:/actuator/health/liveness").setBody(exchange -> new JSONObject() { + + { + put("status", "UP"); + } + }.toString()); } } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java index 45f95efc..a6b9908d 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/InitSubBatchRoute.java @@ -94,6 +94,7 @@ public void configure() throws Exception { variables.put(FAILED_AMOUNT, exchange.getProperty(FAILED_AMOUNT)); variables.put(COMPLETED_AMOUNT, exchange.getProperty(COMPLETED_AMOUNT)); variables.put(RESULT_FILE, String.format("Result_%s", exchange.getProperty(SERVER_FILE_NAME))); + logger.info("Sub batch ID: {}", variables.get(SUB_BATCH_ID)); exchange.setProperty(ZEEBE_VARIABLE, variables); exchange.setProperty(PAYMENT_MODE, transactionList.get(0).getPaymentMode()); @@ -119,6 +120,8 @@ public void configure() throws Exception { Map variables = exchange.getProperty(ZEEBE_VARIABLE, Map.class); variables.put(PAYMENT_MODE, paymentMode); variables.put(DEBULKINGDFSPID, mapping.getDebulkingDfspid() == null ? tenantName : mapping.getDebulkingDfspid()); + logger.info("BPMN: {}", + Utils.getBulkConnectorBpmnName(mapping.getEndpoint(), mapping.getId().toLowerCase(), tenantName)); zeebeProcessStarter.startZeebeWorkflow( Utils.getBulkConnectorBpmnName(mapping.getEndpoint(), mapping.getId().toLowerCase(), tenantName), variables); exchange.setProperty(INIT_SUB_BATCH_FAILED, false); @@ -131,7 +134,9 @@ public void configure() throws Exception { exchange.setProperty(TRANSACTION_LIST_ELEMENT, transaction); }).setHeader("Platform-TenantId", exchangeProperty(TENANT_NAME)).to("direct:dynamic-payload-setter") .to("direct:external-api-call").to("direct:external-api-response-handler").end() // end loop block - .endChoice(); + .endChoice() + .choice().when(exchangeProperty(INIT_SUB_BATCH_FAILED).isEqualTo(false)) + .to("direct:upload-successful-batch").endChoice(); from("direct:dynamic-payload-setter").id("direct:runtime-payload-test").log("Starting route direct:runtime-payload-test") .process(exchange -> { @@ -150,7 +155,8 @@ public void configure() throws Exception { .process(exchange -> { logger.info("reached here"); exchange.setProperty(INIT_SUB_BATCH_FAILED, false); - }).otherwise().process(exchange -> { + }) + .otherwise().process(exchange -> { exchange.setProperty(INIT_SUB_BATCH_FAILED, true); }).endChoice(); @@ -184,16 +190,18 @@ public void configure() throws Exception { logger.info("Got the config with routing to endpoint {}", mapping.getEndpoint()); } }).choice().when(exchangeProperty(EXTERNAL_ENDPOINT_FAILED).isEqualTo(false)) - .log(LoggingLevel.DEBUG, "Making API call to endpoint ${exchangeProperty.extEndpoint} and body: ${body}") + .log(LoggingLevel.INFO, "Making API call to endpoint ${exchangeProperty.extEndpoint} and body: ${body}") .setHeader(Exchange.CONTENT_TYPE, constant("application/json")) .setHeader(BATCH_ID_HEADER, simple("${exchangeProperty." + BATCH_ID + "}")) .toD(ChannelURL + "${exchangeProperty.extEndpoint}" + "?bridgeEndpoint=true&throwExceptionOnFailure=false") - .log(LoggingLevel.DEBUG, "Response body: ${body}").otherwise().endChoice(); + .log(LoggingLevel.INFO, "Response body: ${body}").otherwise().log("No action taken,").endChoice(); from("direct:validate-payment-mode").id("direct:validate-payment-mode").log("Starting route direct:validate-payment-mode") .process(exchange -> { String paymentMde = exchange.getProperty(PAYMENT_MODE, String.class); + logger.info("Payment mode: {}", paymentMde); PaymentModeMapping mapping = paymentModeConfiguration.getByMode(paymentMde); + logger.info("Mapping: {}", mapping); if (mapping == null) { exchange.setProperty(IS_PAYMENT_MODE_VALID, false); } else { @@ -201,6 +209,22 @@ public void configure() throws Exception { exchange.setProperty(PAYMENT_MODE_TYPE, mapping.getType()); } }); + + from("direct:upload-successful-batch").id("direct:upload-successful-batch") + .log("Starting route direct:upload-successful-batch") + .process(exchange -> { + String serverFileName = exchange.getProperty(SERVER_FILE_NAME, String.class); + String resultFile = String.format("Result_%s", serverFileName); + List transactionList = exchange.getProperty(TRANSACTION_LIST, List.class); + List transactionResultList = updateTransactionStatusToCompleted(transactionList); + exchange.setProperty(RESULT_TRANSACTION_LIST, transactionResultList); + exchange.setProperty(RESULT_FILE, resultFile); + }).setProperty(LOCAL_FILE_PATH, exchangeProperty(RESULT_FILE)).setProperty(OVERRIDE_HEADER, constant(true)) + .process(exchange -> { + logger.info("A1 {}", exchange.getProperty(RESULT_FILE)); + logger.info("A2 {}", exchange.getProperty(LOCAL_FILE_PATH)); + logger.info("A3 {}", exchange.getProperty(OVERRIDE_HEADER)); + }).to("direct:update-result-file").to("direct:upload-file"); } // update Transactions status to failed @@ -217,4 +241,14 @@ private List updateTransactionStatusToFailed(List updateTransactionStatusToCompleted(List transactionList) { + List transactionResultList = new ArrayList<>(); + for (Transaction transaction : transactionList) { + TransactionResult transactionResult = Utils.mapToResultDTO(transaction); + transactionResult.setStatus("Completed"); + transactionResultList.add(transactionResult); + } + return transactionResultList; + } + } diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/MergeBackRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/MergeBackRoute.java index db6d6456..98201116 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/MergeBackRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/MergeBackRoute.java @@ -86,6 +86,7 @@ public void configure() throws Exception { .log("Starting route direct:download-file-to-be-merged").log("Downloading files to be merged").process(exchange -> { List mergeList = exchange.getProperty(MERGE_FILE_LIST, List.class); exchange.setProperty(SERVER_FILE_NAME, mergeList.get(0)); + logger.info(exchange.getProperty(SERVER_FILE_NAME, String.class)); }).to("direct:download-file") // downloading first file .setProperty(FILE_1, exchangeProperty(LOCAL_FILE_PATH)).process(exchange -> { List mergeList = exchange.getProperty(MERGE_FILE_LIST, List.class); diff --git a/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java b/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java index df97d7aa..d77d5882 100644 --- a/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java +++ b/src/main/java/org/mifos/processor/bulk/camel/routes/ProcessorStartRoute.java @@ -195,6 +195,7 @@ private void setup() { try { String tenantSpecificWorkflowId = workflowId.replace("{dfspid}", exchange.getProperty(TENANT_NAME).toString()); + logger.info("BPMN: {}", tenantSpecificWorkflowId); String txnId = zeebeProcessStarter.startZeebeWorkflow(tenantSpecificWorkflowId, "", variables); if (txnId == null || txnId.isEmpty()) { response.put("errorCode", 500); @@ -230,14 +231,16 @@ private void setup() { String requestId = exchange.getIn().getHeader("X-CorrelationID", String.class); String purpose = exchange.getIn().getHeader("Purpose", String.class); String type = exchange.getIn().getHeader("Type", String.class); + type = "csv"; exchange.setProperty(FILE_NAME, filename); exchange.setProperty(REQUEST_ID, requestId); exchange.setProperty(PURPOSE, purpose); exchange.setProperty(BATCH_REQUEST_TYPE, type); }).choice().when(exchange -> exchange.getProperty(BATCH_REQUEST_TYPE, String.class).equalsIgnoreCase("raw")) - .to("direct:start-batch-process-raw") - .when(exchange -> exchange.getProperty(BATCH_REQUEST_TYPE, String.class).equalsIgnoreCase("csv")).unmarshal() - .mimeMultipart("multipart/*").to("direct:start-batch-process-csv").otherwise() + .log("Processing raw batch request").to("direct:start-batch-process-raw") + .when(exchange -> exchange.getProperty(BATCH_REQUEST_TYPE, String.class).equalsIgnoreCase("csv")) + .log("Processing raw batch request").unmarshal().mimeMultipart("multipart/*").to("direct:start-batch-process-csv") + .otherwise().log("Unsupported batch request type: ${exchangeProperty.BATCH_REQUEST_TYPE}") .setBody(exchange -> getUnsupportedTypeJson(exchange.getProperty(BATCH_REQUEST_TYPE, String.class)).toString()) .log("Completed execution of route rest:POST:/batchtransactions"); diff --git a/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java b/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java index 26a25536..2d76f356 100644 --- a/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java +++ b/src/main/java/org/mifos/processor/bulk/schema/BatchDTO.java @@ -1,12 +1,13 @@ package org.mifos.processor.bulk.schema; import java.math.BigDecimal; +import java.util.List; public class BatchDTO { - private String batchId; + private String batch_id; - private String requestId; + private String request_id; private Long total; @@ -28,7 +29,7 @@ public class BatchDTO { private String notes; - private String createdAt; + private String created_at; private String status; @@ -36,13 +37,35 @@ public class BatchDTO { private String purpose; + private String failPercentage; + + private String successPercentage; + + private List subBatchesDetail; + + public String getFailPercentage() { + return failPercentage; + } + + public void setFailPercentage(String failPercentage) { + this.failPercentage = failPercentage; + } + + public String getSuccessPercentage() { + return successPercentage; + } + + public void setSuccessPercentage(String successPercentage) { + this.successPercentage = successPercentage; + } + public BatchDTO() {} public BatchDTO(String batchId, String requestId, Long totalTransactions, Long ongoing, Long failed, Long completed, BigDecimal total_amount, BigDecimal completed_amount, BigDecimal ongoing_amount, BigDecimal failed_amount, String result_file, - String note) { - this.batchId = batchId; - this.requestId = requestId; + String note, String failPercentage, String successPercentage, List subBatchesDetail) { + this.batch_id = batchId; + this.request_id = requestId; this.total = totalTransactions; this.ongoing = ongoing; this.failed = failed; @@ -53,13 +76,16 @@ public BatchDTO(String batchId, String requestId, Long totalTransactions, Long o this.failedAmount = failed_amount; this.file = result_file; this.notes = note; + this.failPercentage = failPercentage; + this.successPercentage = successPercentage; + this.subBatchesDetail = subBatchesDetail; } public BatchDTO(String batch_id, String request_id, Long total, Long ongoing, Long failed, Long successful, BigDecimal totalAmount, BigDecimal successfulAmount, BigDecimal pendingAmount, BigDecimal failedAmount, String file, String notes, String created_at, - String status, String modes, String purpose) { - this.batchId = batch_id; - this.requestId = request_id; + String status, String modes, String purpose, String failPercentage, String successPercentage) { + this.batch_id = batch_id; + this.request_id = request_id; this.total = total; this.ongoing = ongoing; this.failed = failed; @@ -70,26 +96,28 @@ public BatchDTO(String batch_id, String request_id, Long total, Long ongoing, Lo this.failedAmount = failedAmount; this.file = file; this.notes = notes; - this.createdAt = created_at; + this.created_at = created_at; this.status = status; this.modes = modes; this.purpose = purpose; + this.failPercentage = failPercentage; + this.successPercentage = successPercentage; } public String getBatch_id() { - return batchId; + return batch_id; } public void setBatch_id(String batch_id) { - this.batchId = batch_id; + this.batch_id = batch_id; } public String getRequest_id() { - return requestId; + return request_id; } public void setRequest_id(String request_id) { - this.requestId = request_id; + this.request_id = request_id; } public Long getTotal() { @@ -173,11 +201,11 @@ public void setNotes(String notes) { } public String getCreated_at() { - return createdAt; + return created_at; } public void setCreated_at(String created_at) { - this.createdAt = created_at; + this.created_at = created_at; } public String getStatus() { @@ -203,4 +231,12 @@ public String getPurpose() { public void setPurpose(String purpose) { this.purpose = purpose; } + + public List getSubBatchesDetail() { + return subBatchesDetail; + } + + public void setSubBatchesDetail(List subBatchesDetail) { + this.subBatchesDetail = subBatchesDetail; + } } diff --git a/src/main/java/org/mifos/processor/bulk/schema/SubBatchDetail.java b/src/main/java/org/mifos/processor/bulk/schema/SubBatchDetail.java new file mode 100644 index 00000000..c673ddc9 --- /dev/null +++ b/src/main/java/org/mifos/processor/bulk/schema/SubBatchDetail.java @@ -0,0 +1,26 @@ +package org.mifos.processor.bulk.schema; + +import java.util.Date; + +public class SubBatchDetail { + + private String subBatchId; + + private Date startedAt; + + public String getSubBatchId() { + return subBatchId; + } + + public void setSubBatchId(String subBatchId) { + this.subBatchId = subBatchId; + } + + public Date getStartedAt() { + return startedAt; + } + + public void setStartedAt(Date startedAt) { + this.startedAt = startedAt; + } +} diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java index e2edc0c4..c737bff7 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/BatchStatusWorker.java @@ -34,6 +34,7 @@ public void setup() { Boolean batchStatusFailed = exchange.getProperty(BATCH_STATUS_FAILED, Boolean.class); if (batchStatusFailed == null || !batchStatusFailed) { + logger.info("Completion rate: {}", exchange.getProperty(COMPLETION_RATE)); successRate = exchange.getProperty(COMPLETION_RATE, Integer.class); } else { variables.put(ERROR_CODE, exchange.getProperty(ERROR_CODE)); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java index 787a8596..3edbaac1 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/InitSubBatchWorker.java @@ -54,6 +54,10 @@ public void setup() { subBatches.add((String) variables.get(FILE_NAME)); } + for (String subBatch : subBatches) { + logger.info("Sub batch: {}", subBatch); + } + String fileName = subBatches.remove(0); Exchange exchange = new DefaultExchange(camelContext); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/MergeBackWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/MergeBackWorker.java index fd55824f..d5353efb 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/MergeBackWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/MergeBackWorker.java @@ -35,17 +35,24 @@ public void setup() { List successSubBatches = (List) variables.get(INIT_SUCCESS_SUB_BATCHES); List failureSubBatches = (List) variables.get(INIT_FAILURE_SUB_BATCHES); + logger.info("Sub batches: {}", subBatches); + logger.info("Success sub batches: {}", successSubBatches); + logger.info("Failed sub batches: {}", failureSubBatches); + for (int i = 0; i < successSubBatches.size(); i++) { + logger.info("Successful sub batches"); String initFile = successSubBatches.remove(i); successSubBatches.add(i, String.format("Result_%s", initFile)); } for (int i = 0; i < failureSubBatches.size(); i++) { + logger.info("Failed sub batches"); String initFile = failureSubBatches.remove(i); failureSubBatches.add(i, String.format("Result_%s", initFile)); } List mergeFileList = (List) variables.get(MERGE_FILE_LIST); if (mergeFileList == null) { + logger.info("merge file list empty"); mergeFileList = new ArrayList<>(); mergeFileList.addAll(successSubBatches); mergeFileList.addAll(failureSubBatches); diff --git a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java index fb0d8c85..e6762a75 100644 --- a/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java +++ b/src/main/java/org/mifos/processor/bulk/zeebe/worker/SplittingWorker.java @@ -31,6 +31,8 @@ public void setup() { newWorker(Worker.SPLITTING, (client, job) -> { logger.debug("Job '{}' started from process '{}' with key {}", job.getType(), job.getBpmnProcessId(), job.getKey()); Map variables = job.getVariablesAsMap(); + + logger.info("Is splitting enabled: {}", workerConfig.isSplittingWorkerEnabled); if (workerConfig.isSplittingWorkerEnabled) { variables.put(SPLITTING_FAILED, false); } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index ef56a688..205c9667 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -16,7 +16,7 @@ kafka: name: slcb application: - bucket-name: paymenthub-ee-dev + bucket-name: fynarfin-bucket zeebe: client: @@ -38,17 +38,18 @@ operations-app: batch-transaction: "/api/v1/batch/transactions" channel: - hostname: "http://ph-ee-connector-channel" + hostname: "https://channel.sandbox.fynarfin.io/" +# hostname: "http://ph-ee-connector-channel" cloud: aws: enabled: true - s3-base-url: "https://paymenthub-ee-dev.s3.us-east-2.amazonaws.com" + s3-base-url: "https://fynarfin-bucket.s3.ap-south-1.amazonaws.com" credentials: access-key: ${AWS_ACCESS_KEY:access_key_from_aws} secret-key: ${AWS_SECRET_KEY:secret_key_from_aws} region: - static: us-east-2 + static: ap-south-1 stack: auto: false azure: @@ -128,3 +129,19 @@ bulk-processor: csv: columnNames: "id,request_id,payment_mode,payer_identifier_type,payer_identifier,payee_identifier_type,payee_identifier,amount,currency,note" size : 100000 # in bytes + +payment-mode: + mappings: + - id: "GSMA" + type: "PAYMENT" + endpoint: "/channel/gsma/transfer" + - id: "MOJALOOP" + type: "PAYMENT" + endpoint: "/channel/transfer" + - id: "SLCB" + type: "BULK" + endpoint: "bulk_connector_{MODE}-{dfspid}" + - id: "CLOSEDLOOP" + type: "BULK" + endpoint: "bulk_connector_{MODE}-{dfspid}" + debulkingDfspid: "lion" \ No newline at end of file