Ack 메시지 수신 이후 update 수신하도록 로직 오류 정정

main
icksishu@gmail.com 5 days ago
parent b68f6395b9
commit 5858cbac03

@ -138,13 +138,11 @@ public class ListenerService {
AckDto ackDto;
try {
ackDto = MessageUtils.toAckDto(messageJsonString);
if((ackDto.getResult() == AckDto.ResultType.PROCESS_SUCCESS)) { // 상대 에이전트의 DropBox 수신 및 처리 후 처리결과가 정상일 때의 AckDto
postmanSchedulerService.makeCompleteSentMessage(ackDto.getMessageUuid());
postmanSchedulerService.makeCompleteSentMessage(ackDto); // 상대측 Dropbox 처리 결과에 따라서 보낸 메시지 결과 처리
if(agentConfigReader.isConnectedConsole()) {
MessageUtils.announceMessageAck(this.agentConfigReader, ackDto);
}
}
}
catch (NullMessageException | IllegalMessageException | JsonProcessingException e) {
ackDto = AckDto.builder().result(AckDto.ResultType.RECEIVE_FAIL).resultText(e.getLocalizedMessage()).messageUuid("").build();
}

@ -144,7 +144,13 @@ public class DropBoxSchedulerService {
}
private void ackDropBoxProcessResult(ReceiveMessageDto receiveMessageDto, String processMessage) {
AckDto.ResultType resultType = EnumUtils.getEnum(AckDto.ResultType.class, receiveMessageDto.getProcessStatus().toString());
AckDto.ResultType resultType = null;
if(receiveMessageDto.getProcessStatus() == ReceiveMessageDto.ProcessStatus.PROCESS_DONE) {
resultType = AckDto.ResultType.PROCESS_SUCCESS;
}
else if(receiveMessageDto.getProcessStatus() == ReceiveMessageDto.ProcessStatus.PROCESS_FAIL || receiveMessageDto.getProcessStatus() == ReceiveMessageDto.ProcessStatus.PROCESS_NOT_POSSIBLE) {
resultType = AckDto.ResultType.PROCESS_FAIL;
}
AckDto ackDto = AckDto.builder().result(resultType).messageUuid(receiveMessageDto.getMessageUuid()).resultText(processMessage).build();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);

@ -102,18 +102,23 @@ public class PostmanSchedulerService {
/**
* AckDto postProcessingSqlId . sent .complete
* @param messageUuid : AckDto messageUuid. sentMessageFileMap
* @param ackDto : AckDto messageUuid sentMessageFileMap
*/
@SuppressWarnings("unchecked")
public void makeCompleteSentMessage(String messageUuid) {
if(this.sentMessageFileMap.containsKey(messageUuid)) {
File sentMessageFile = new File(this.sentMessageFileMap.get(messageUuid));
public void makeCompleteSentMessage(AckDto ackDto) {
if(this.sentMessageFileMap.containsKey(ackDto.getMessageUuid())) {
File sentMessageFile = new File(this.sentMessageFileMap.get(ackDto.getMessageUuid()));
if(sentMessageFile.exists()) {
try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(sentMessageFile))) {
ReceiveMessageDto receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject();
ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> dataMapList;
dataMapList = (List<Map<String, Object>>) objectMapper.readValue(receiveMessageDto.getData(), List.class);
Map<String, Object> addParamMap = new HashMap<>();
addParamMap.put("messageUuid", receiveMessageDto.getMessageUuid());
addParamMap.put("processStatus", ackDto.getResult().toString());
addParamMap.put("resultText", ackDto.getResultText());
dataMapList.forEach(map -> map.putAll(addParamMap));
AgentConfigDto.Postman postman = this.postmanMap.get(receiveMessageDto.getSenderPostmanId());
sqlExecuteService.update(postman.getMessage().getDataSourceId(), postman.getMessage().getPostProcessingSqlId(), dataMapList);
} catch (FileNotFoundException e) {
@ -129,7 +134,7 @@ public class PostmanSchedulerService {
File completeMessageFile = new File(sentMessageFile.getAbsolutePath() + ".complete");
sentMessageFile.renameTo(completeMessageFile);
}
this.sentMessageFileMap.remove(messageUuid);
this.sentMessageFileMap.remove(ackDto.getMessageUuid());
}
}
@ -182,19 +187,31 @@ public class PostmanSchedulerService {
log.debug("postman to {} send a message UUID {} (data count: {})", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid(), dataMapList.size());
AckDto ackDto = MessageUtils.send(this.agentConfigReader, postman.getPostmanId(), receiveMessageDto);
log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto);
if(ackDto.getResult() == AckDto.ResultType.RECEIVE_SUCCESS) {
if(ackDto.getResult() == AckDto.ResultType.RECEIVE_SUCCESS) { // 전송 완료 및 Ack 수신 후 전송한 dataMapList 에 messageUuid, processStatus, resultText를 추가하여 postProcessingSqlId 실행
receiveMessageDto.setProcessStatus(ReceiveMessageDto.ProcessStatus.PROCESS_RECEIVED);
receiveMessageDto.setReceivedTimestamp(System.currentTimeMillis());
String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId();
sqlExecuteService.update(dataSourceId, postProcessingSqlId, new HashMap<String, Object>());
Map<String, Object> addParamMap = new HashMap<>();
addParamMap.put("messageUuid", receiveMessageDto.getMessageUuid());
addParamMap.put("processStatus", receiveMessageDto.getProcessStatus().toString());
addParamMap.put("resultText", ackDto.getResultText());
dataMapList.forEach(map -> map.putAll(addParamMap));
sqlExecuteService.update(dataSourceId, postProcessingSqlId, dataMapList);
}
else if(ackDto.getResult() == AckDto.ResultType.TRANSFER_SUCCESS) {
receiveMessageDto.setReceivedTimestamp(System.currentTimeMillis());
receiveMessageDto.setProcessStatus(ReceiveMessageDto.ProcessStatus.PROCESS_SEND);
}
else if(ackDto.getResult() == AckDto.ResultType.RECEIVE_FAIL || ackDto.getResult() == AckDto.ResultType.TRANSFER_FAIL) {
else if(ackDto.getResult() == AckDto.ResultType.RECEIVE_FAIL || ackDto.getResult() == AckDto.ResultType.TRANSFER_FAIL) { // 전송 완료 및 Nak 수신 후 전송한 dataMapList 에 messageUuid, processStatus, resultText를 추가하여 postProcessingSqlId 실행
log.warn("대상 agent {}[{}]에게 전송하였으나 상대방이 수신하지 못하였습니다. 응답: {} 응답메시지: {}", knownAgent.getHostId(), knownAgent.getHostName(), ackDto.getResult(), ackDto.getResultText());
receiveMessageDto.setProcessStatus(ReceiveMessageDto.ProcessStatus.PROCESS_FAIL);
String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId();
Map<String, Object> addParamMap = new HashMap<>();
addParamMap.put("messageUuid", receiveMessageDto.getMessageUuid());
addParamMap.put("processStatus", receiveMessageDto.getProcessStatus().toString());
addParamMap.put("resultText", ackDto.getResultText());
dataMapList.forEach(map -> map.putAll(addParamMap));
sqlExecuteService.update(dataSourceId, postProcessingSqlId, dataMapList);
}
if(agentConfigReader.isConnectedConsole()) {
MessageUtils.announceMessageHistory(this.agentConfigReader, receiveMessageDto);

Loading…
Cancel
Save