From 7426b84980f791cf2f0e5bf7bafb0ce2312293fd Mon Sep 17 00:00:00 2001 From: "semin.baek" Date: Thu, 15 May 2025 14:56:04 +0900 Subject: [PATCH] =?UTF-8?q?DropBox=20Config=20=EB=B3=80=EA=B2=BD=20(?= =?UTF-8?q?=EC=B2=98=EB=A6=AC=20=EC=99=84=EB=A3=8C=20=EB=A9=94=EC=8B=9C?= =?UTF-8?q?=EC=A7=80=20=ED=8C=8C=EC=9D=BC=20=EC=82=AD=EC=A0=9C=20=EA=B4=80?= =?UTF-8?q?=EB=A0=A8)=20=EC=B2=98=EB=A6=AC=20=EC=99=84=EB=A3=8C=20?= =?UTF-8?q?=ED=8C=8C=EC=9D=BC=20=EC=82=AD=EC=A0=9C=20Scheduler=20=EA=B0=9C?= =?UTF-8?q?=EB=B0=9C=20-=20bsm-lab/dfxagent#11?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dfxagent-bd-test-cubrid-local.json | 4 +- .../dfxagent-bd-test-cubrid.json | 4 +- .../dfxagent-bd-test-oracle-local.json | 4 +- .../dfxagent-bd-test-oracle.json | 4 +- .../dfx/agent/config/AgentConfigDto.java | 2 +- .../dfx/agent/config/AgentConfigReader.java | 2 +- .../agent/config/DfxAgentConfiguration.java | 12 +- .../dfx/agent/config/StartupRunner.java | 5 + .../agent/task/dropbox/DropBoxService.java | 14 +- .../task/postman/PostmanSchedulerService.java | 228 +++++++++--------- .../status/FileCleanerSchedulerService.java | 66 +++++ 11 files changed, 218 insertions(+), 127 deletions(-) create mode 100644 src/main/java/com/bsmlab/dfx/agent/task/status/FileCleanerSchedulerService.java diff --git a/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json b/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json index 8343efa..3b0ba62 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json +++ b/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json @@ -11,7 +11,7 @@ } ], "statusChecker": { - "cron": "2 0/1 * * * *" + "cron": "0/5 * * * * *" }, "dataSourceConfig": [ { @@ -41,7 +41,7 @@ } ] }, - "dropBox": { + "dropBoxConfig": { "receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received", "processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed", "failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure", diff --git a/src/docs/settings-examples/dfxagent-bd-test-cubrid.json b/src/docs/settings-examples/dfxagent-bd-test-cubrid.json index 22e040d..1edde28 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-cubrid.json +++ b/src/docs/settings-examples/dfxagent-bd-test-cubrid.json @@ -11,7 +11,7 @@ } ], "statusChecker": { - "cron": "2 0/1 * * * *" + "cron": "0/5 * * * * *" }, "dataSourceConfig": [ { @@ -41,7 +41,7 @@ } ] }, - "dropBox": { + "dropBoxConfig": { "receivedMessageStorageRoot": "/home/dfxagent/agent/messages/received", "processedMessageStorageRoot": "/home/dfxagent/agent/messages/processed", "failureMessageStorageRoot": "/home/dfxagent/agent/messages/failure", diff --git a/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json b/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json index 64176bd..bb7debf 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json +++ b/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json @@ -13,7 +13,7 @@ } ], "statusChecker": { - "cron": "2 0/1 * * * *" + "cron": "0/5 * * * * *" }, "dataSourceConfig": [ { @@ -43,7 +43,7 @@ } ] }, - "dropBox": { + "dropBoxConfig": { "receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received", "processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed", "failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure", diff --git a/src/docs/settings-examples/dfxagent-bd-test-oracle.json b/src/docs/settings-examples/dfxagent-bd-test-oracle.json index 345294b..e44c63d 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-oracle.json +++ b/src/docs/settings-examples/dfxagent-bd-test-oracle.json @@ -11,7 +11,7 @@ } ], "statusChecker": { - "cron": "2 0/1 * * * *" + "cron": "0/5 * * * * *" }, "dataSourceConfig": [ { @@ -41,7 +41,7 @@ } ] }, - "dropBox": { + "dropBoxConfig": { "receivedMessageStorageRoot": "/home/dfxagent/agent/messages/received", "processedMessageStorageRoot": "/home/dfxagent/agent/messages/processed", "failureMessageStorageRoot": "/home/dfxagent/agent/messages/failure", diff --git a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java index 8be60cc..859afa1 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java @@ -14,7 +14,7 @@ public class AgentConfigDto { private StatusChecker statusChecker; private List dataSourceConfig; private List sqlMapperLocations; - private DropBoxConfig dropBox; + private DropBoxConfig dropBoxConfig; private PostmanConfig postmanConfig; private LoggingConfig logging; diff --git a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java index 16d8c0d..55e7540 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java @@ -50,7 +50,7 @@ public class AgentConfigReader { public AgentConfigDto.DropBox getDropBox(String dropBoxId) { AgentConfigDto.DropBox found = null; - for(AgentConfigDto.DropBox dropBox : this.agentConfigDto.getDropBox().getDropBoxList()) { + for(AgentConfigDto.DropBox dropBox : this.agentConfigDto.getDropBoxConfig().getDropBoxList()) { if(dropBox.getDropBoxId().equals(dropBoxId)) { found = dropBox; break; diff --git a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java index 06fd756..182f38f 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java @@ -183,7 +183,7 @@ public class DfxAgentConfiguration { @Bean(name = "dropBoxProcessorThreadPoolTaskScheduler") public ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler(AgentConfigReader agentConfigReader) { ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); - dropBoxProcessorThreadPoolTaskScheduler.setPoolSize(agentConfigReader.getAgentConfigDto().getDropBox().getThreadPoolSize()); + dropBoxProcessorThreadPoolTaskScheduler.setPoolSize(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getThreadPoolSize()); dropBoxProcessorThreadPoolTaskScheduler.setThreadNamePrefix("dropBoxProcessor-"); dropBoxProcessorThreadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부 dropBoxProcessorThreadPoolTaskScheduler.initialize(); @@ -209,4 +209,14 @@ public class DfxAgentConfiguration { statusCheckerThreadPoolTaskScheduler.initialize(); return statusCheckerThreadPoolTaskScheduler; } + + // 파일 정리 쓰레드 설정 + @Bean(name = "fileCleanerThreadPoolTaskScheduler") + public ThreadPoolTaskScheduler fileCleanerThreadPoolTaskScheduler() { + ThreadPoolTaskScheduler fileCleanerThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + fileCleanerThreadPoolTaskScheduler.setPoolSize(1); + fileCleanerThreadPoolTaskScheduler.setThreadNamePrefix("file-cleaner-"); + fileCleanerThreadPoolTaskScheduler.initialize(); + return fileCleanerThreadPoolTaskScheduler; + } } diff --git a/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java b/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java index 2530b8b..a1f766f 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java @@ -4,6 +4,7 @@ import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService; import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; import com.bsmlab.dfx.agent.task.dropbox.DropBoxSchedulerService; import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService; +import com.bsmlab.dfx.agent.task.status.FileCleanerSchedulerService; import com.bsmlab.dfx.agent.task.status.StatusCheckerSchedulerService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,6 +21,7 @@ public class StartupRunner implements ApplicationRunner { private final PostmanSchedulerService postmanSchedulerService; private final DropBoxSchedulerService dropBoxSchedulerService; private final StatusCheckerSchedulerService statusCheckerSchedulerService; + private final FileCleanerSchedulerService fileCleanerSchedulerService; private final DropBoxService dropBoxService; @Override @@ -38,6 +40,9 @@ public class StartupRunner implements ApplicationRunner { // 다른 agent 상태 확인 서비스 log.info("✅ StatusCheckerSchedulerService 기동"); statusCheckerSchedulerService.launch(); + // 처리 완료 파일 정리 서비스 + fileCleanerSchedulerService.launch(); + log.info("✅ FileCleanerSchedulerService 기동"); // receivedMessageStorageRoot 하위에 처리되지 않은 메시지를 처리 queue에 넣는다. int messageCount = dropBoxService.addNotProcessedMessageFile(); log.info("✅ 미처리 메시지 체크 - {} 건 처리 등록", messageCount); diff --git a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java index e8501b8..76ffeae 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java @@ -36,7 +36,7 @@ public class DropBoxService { */ public boolean isExistToday(ReceiveMessageDto receiveMessageDto) { boolean isExist = false; - File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot()); + File root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getReceivedMessageStorageRoot()); Date today = new Date(System.currentTimeMillis()); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/"); String todayDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); @@ -56,9 +56,9 @@ public class DropBoxService { * @param receiveMessageDto 수신한 메지시의 인스턴스 */ public void add(ReceiveMessageDto receiveMessageDto) { - File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot()); + File root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getReceivedMessageStorageRoot()); Date today = new Date(System.currentTimeMillis()); - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh"); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH"); String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); File targetDirectory = new File(targetDirectoryString); if(!targetDirectory.exists()) { @@ -81,7 +81,7 @@ public class DropBoxService { * 기동 시 호출함. 수신하였으나 처리하지 않은 메시지 파일을 찾아서 처리 queue 에 넣는다. */ public int addNotProcessedMessageFile() { - File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot()); + File root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getReceivedMessageStorageRoot()); List fileList = new ArrayList<>(); this.findAndAddMessageFile(root, fileList); for(File file : fileList) { @@ -119,13 +119,13 @@ public class DropBoxService { public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException { File root = null; if(isSuccess) { - root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getProcessedMessageStorageRoot()); + root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getProcessedMessageStorageRoot()); } else { - root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getFailureMessageStorageRoot()); + root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getFailureMessageStorageRoot()); } Date today = new Date(System.currentTimeMillis()); - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh"); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH"); String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); File targetDirectory = new File(targetDirectoryString); if(!targetDirectory.exists()) { diff --git a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java index 56e042f..77f81b5 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java @@ -21,6 +21,7 @@ import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.web.client.ResourceAccessException; import org.springframework.web.client.RestTemplate; import java.io.File; @@ -69,132 +70,141 @@ public class PostmanSchedulerService { } public void run(AgentConfigDto.Postman postman) { - ObjectMapper objectMapper = new ObjectMapper(); - String senderHostId = agentConfigReader.getAgentConfigDto().getMyHostId(); - long senderTimestamp = System.currentTimeMillis(); - String messageUuid = UUID.randomUUID().toString(); - // DB TO DB 전송 - if(AgentConfigDto.TaskType.DB_READ_THEN_SEND == postman.getTaskType()) { - String dataSourceId = postman.getMessage().getDataSourceId(); - String sqlId = postman.getMessage().getSqlId(); - try { - List> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null); - String dataString = objectMapper.writeValueAsString(dataMapList); - ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) - .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB) - .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) - .data(dataString).build(); - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - HttpEntity bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); - RestTemplate restTemplate = new RestTemplate(); - AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); - String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; - log.debug("postman to {} send a message UUID {}", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid()); - String response = restTemplate.postForObject(url, bodyEntity, String.class); - AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); - log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto); - if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { - throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); - } - else { - String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId(); - sqlExecuteService.update(dataSourceId, postProcessingSqlId, new HashMap()); - } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } catch (DfxException e) { - throw new RuntimeException(e); - } - } - else if(AgentConfigDto.TaskType.FILE_READ_THEN_SEND == postman.getTaskType()) { - File rootDirectoryfile = new File(postman.getMessage().getWatchDirectory()); - SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/hh/mm/ss"); - Date today = new Date(System.currentTimeMillis()); - String dateString = simpleDateFormat.format(today); - List fileReadyList = new ArrayList<>(); - List workingFileList = new ArrayList<>(); - if(rootDirectoryfile.isDirectory() && rootDirectoryfile.listFiles().length > 0) { - // 대상 파일 찾기 -> 작업 디렉토리(working)로 이동 -> 작업 완료 후 완료 디렉토리(done)로 이동 - File[] files = rootDirectoryfile.listFiles(); - for(File file : files) { - if(file.isFile()) { - fileReadyList.add(file); - } - } - List> data = new ArrayList<>(); - String dataString = null; - String workingDirectory = rootDirectoryfile.getAbsolutePath() + "/working/" + dateString; - File workingDirectoryFile = new File(workingDirectory); - if(!workingDirectoryFile.exists()) { - workingDirectoryFile.mkdirs(); - } + AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); + if("ALIVE".equals(agentConfigReader.getKnownAgentStatus(knownAgent.getHostId()))) { + ObjectMapper objectMapper = new ObjectMapper(); + String senderHostId = agentConfigReader.getAgentConfigDto().getMyHostId(); + long senderTimestamp = System.currentTimeMillis(); + String messageUuid = UUID.randomUUID().toString(); + // DB TO DB 전송 + if(AgentConfigDto.TaskType.DB_READ_THEN_SEND == postman.getTaskType()) { + String dataSourceId = postman.getMessage().getDataSourceId(); + String sqlId = postman.getMessage().getSqlId(); + String response = null; try { - // 작업 디렉토리(working)로 이동 - for(File file : fileReadyList) { - Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - workingFileList.add(new File(workingDirectoryFile.getAbsoluteFile() + "/" + file.getName())); - } - // ReceiveMessageDto - messageJson 만들기 - for(File file : workingFileList) { - Map map = new HashMap<>(); - Map paramMap = new HashMap<>(); - map.put("file-name", file.getName()); - if(postman.getMessage() != null && StringUtils.isNotEmpty(postman.getMessage().getMetaDropBoxId()) - && StringUtils.isNotEmpty(postman.getMessage().getMetaDataSqlId()) && StringUtils.isNotEmpty(postman.getMessage().getMetaDataDataSourceId())) { - map.put("meta-drop-box-id", postman.getMessage().getMetaDropBoxId()); - paramMap.put("fileName", file.getName()); - List> messageDataMapList = sqlExecuteService.select(postman.getMessage().getDataSourceId(), postman.getMessage().getMetaDataSqlId(), paramMap); - if(messageDataMapList != null && messageDataMapList.get(0) != null) { - map.put("meta-data", messageDataMapList.get(0)); - } - } - data.add(map); - } - dataString = objectMapper.writeValueAsString(data); + List> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null); + String dataString = objectMapper.writeValueAsString(dataMapList); ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB) .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) .data(dataString).build(); - String messageString = objectMapper.writeValueAsString(receiveMessageDto); - // http 준비 HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA); - MultiValueMap body = new LinkedMultiValueMap<>(); - // 첫 번째 멀티파트는 message json - body.add("json", new HttpEntity<>(messageString, httpHeaders)); - // 두 번째 이후 멀티파트는 파일 - for(File file : workingFileList) { - body.add(file.getName(), new FileSystemResource(file)); - } - // 전송 + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + HttpEntity bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); RestTemplate restTemplate = new RestTemplate(); - AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); - String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; - String response = restTemplate.postForObject(url, body, String.class); + String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; + log.debug("postman to {} send a message UUID {}", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid()); + response = restTemplate.postForObject(url, bodyEntity, String.class); AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); + log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto); if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { - throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); + log.warn("대상 agent {}[{}]에게 전송하였으나 상대방이 수신하지 못하였습니다. 응답: {} 응답메시지: {}" + , knownAgent.getHostId(), knownAgent.getHostName(), ackDto.getResult(), ackDto.getResultText()); } else { - // 작업 완료(done) 디렉토리로 이동 - String doneDirectory = rootDirectoryfile + "/done/" + dateString; - File doneDirectoryFile = new File(doneDirectory); - if(!doneDirectoryFile.exists()) { - doneDirectoryFile.mkdirs(); + String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId(); + sqlExecuteService.update(dataSourceId, postProcessingSqlId, new HashMap()); + } + } + catch (JsonProcessingException e) { + log.warn("대상 agent {}[{}]에게 전송하였으나 응답메시지가 비정상입니다. 응답메시지: {}", knownAgent.getHostId(), knownAgent.getHostName(), response, e); + } + catch (ResourceAccessException e) { + log.warn("대상 agent {}[{}]에게 전송하였으나 응답하지 않습니다. exception: {}", knownAgent.getHostId(), knownAgent.getHostName(), e.getLocalizedMessage()); + agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "DOWN"); + } + } + else if(AgentConfigDto.TaskType.FILE_READ_THEN_SEND == postman.getTaskType()) { + File rootDirectoryfile = new File(postman.getMessage().getWatchDirectory()); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/hh/mm/ss"); + Date today = new Date(System.currentTimeMillis()); + String dateString = simpleDateFormat.format(today); + List fileReadyList = new ArrayList<>(); + List workingFileList = new ArrayList<>(); + if(rootDirectoryfile.isDirectory() && rootDirectoryfile.listFiles().length > 0) { + // 대상 파일 찾기 -> 작업 디렉토리(working)로 이동 -> 작업 완료 후 완료 디렉토리(done)로 이동 + File[] files = rootDirectoryfile.listFiles(); + for(File file : files) { + if(file.isFile()) { + fileReadyList.add(file); } - for(File file : workingFileList) { + } + List> data = new ArrayList<>(); + String dataString = null; + String workingDirectory = rootDirectoryfile.getAbsolutePath() + "/working/" + dateString; + File workingDirectoryFile = new File(workingDirectory); + if(!workingDirectoryFile.exists()) { + workingDirectoryFile.mkdirs(); + } + try { + // 작업 디렉토리(working)로 이동 + for(File file : fileReadyList) { Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + workingFileList.add(new File(workingDirectoryFile.getAbsoluteFile() + "/" + file.getName())); + } + // ReceiveMessageDto - messageJson 만들기 + for(File file : workingFileList) { + Map map = new HashMap<>(); + Map paramMap = new HashMap<>(); + map.put("file-name", file.getName()); + if(postman.getMessage() != null && StringUtils.isNotEmpty(postman.getMessage().getMetaDropBoxId()) + && StringUtils.isNotEmpty(postman.getMessage().getMetaDataSqlId()) && StringUtils.isNotEmpty(postman.getMessage().getMetaDataDataSourceId())) { + map.put("meta-drop-box-id", postman.getMessage().getMetaDropBoxId()); + paramMap.put("fileName", file.getName()); + List> messageDataMapList = sqlExecuteService.select(postman.getMessage().getDataSourceId(), postman.getMessage().getMetaDataSqlId(), paramMap); + if(messageDataMapList != null && messageDataMapList.get(0) != null) { + map.put("meta-data", messageDataMapList.get(0)); + } + } + data.add(map); } + dataString = objectMapper.writeValueAsString(data); + ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) + .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB) + .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) + .data(dataString).build(); + String messageString = objectMapper.writeValueAsString(receiveMessageDto); + // http 준비 + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA); + MultiValueMap body = new LinkedMultiValueMap<>(); + // 첫 번째 멀티파트는 message json + body.add("json", new HttpEntity<>(messageString, httpHeaders)); + // 두 번째 이후 멀티파트는 파일 + for(File file : workingFileList) { + body.add(file.getName(), new FileSystemResource(file)); + } + // 전송 + RestTemplate restTemplate = new RestTemplate(); + String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; + String response = restTemplate.postForObject(url, body, String.class); + AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); + if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { + throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); + } + else { + // 작업 완료(done) 디렉토리로 이동 + String doneDirectory = rootDirectoryfile + "/done/" + dateString; + File doneDirectoryFile = new File(doneDirectory); + if(!doneDirectoryFile.exists()) { + doneDirectoryFile.mkdirs(); + } + for(File file : workingFileList) { + Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } catch (DfxException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } catch (DfxException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); } } } + else { + log.warn("대상 agent {}[{}] 실행 상태가 정상(ALIVE)이 아니므로 전송을 중지합니다. postman ID: {}", knownAgent.getHostId(), knownAgent.getHostName(), postman.getPostmanId()); + } } } diff --git a/src/main/java/com/bsmlab/dfx/agent/task/status/FileCleanerSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/status/FileCleanerSchedulerService.java new file mode 100644 index 0000000..21cbec7 --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/status/FileCleanerSchedulerService.java @@ -0,0 +1,66 @@ +package com.bsmlab.dfx.agent.task.status; + +import com.bsmlab.dfx.agent.config.AgentConfigReader; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Service; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ScheduledFuture; + +@Slf4j +@RequiredArgsConstructor +@Service +public class FileCleanerSchedulerService { + private final ThreadPoolTaskScheduler fileCleanerThreadPoolTaskScheduler; + private final AgentConfigReader agentConfigReader; + private ScheduledFuture scheduledFuture; + + // StartupRunner 로 부터 실행됨 + public void launch() { + log.debug("FileCleanerSchedulerService launch"); + this.scheduledFuture = fileCleanerThreadPoolTaskScheduler.scheduleWithFixedDelay(this::run, Duration.ofHours(1)); + } + + public void run() { + String processMesssageStorageRoot = agentConfigReader.getAgentConfigDto().getDropBoxConfig().getProcessedMessageStorageRoot(); + File storageRoot = new File(processMesssageStorageRoot); + List allDirectoryList = new ArrayList<>(); + this.findDirectory(storageRoot, allDirectoryList); + Calendar calendar = Calendar.getInstance(); + int toDateCount = agentConfigReader.getAgentConfigDto().getDropBoxConfig().getRetentionDaysOfProcessedMessage() * -1; + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd"); + for(int i = 0; i < toDateCount; i++) { + calendar.roll(Calendar.DATE, (i * -1)); + Date currentDate = calendar.getTime(); + String retentionDirectoryString = processMesssageStorageRoot + "/" + dateFormat.format(currentDate); + Iterator iterator = allDirectoryList.iterator(); + while(iterator.hasNext()) { + File file = iterator.next(); + if(file.getAbsolutePath().contains(retentionDirectoryString)) { + allDirectoryList.remove(file); + } + } + } + for(File file : allDirectoryList) { + if(file.exists()) { + file.delete(); + } + } + } + + private void findDirectory(File parent, List fileList) { + File[] files = parent.listFiles(); + for(File file : files) { + if(!".".equals(file.getName()) && !"..".equals(file.getName()) && file.isDirectory()) { + fileList.add(file); + this.findDirectory(file, fileList); + } + } + } + +}