DropBox Config 변경 (처리 완료 메시지 파일 삭제 관련)

처리 완료 파일 삭제 Scheduler 개발 - bsm-lab/dfxagent#11
main
semin.baek 8 months ago
parent 4b49f71a8f
commit 7426b84980

@ -11,7 +11,7 @@
} }
], ],
"statusChecker": { "statusChecker": {
"cron": "2 0/1 * * * *" "cron": "0/5 * * * * *"
}, },
"dataSourceConfig": [ "dataSourceConfig": [
{ {
@ -41,7 +41,7 @@
} }
] ]
}, },
"dropBox": { "dropBoxConfig": {
"receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received", "receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received",
"processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed", "processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed",
"failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure", "failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure",

@ -11,7 +11,7 @@
} }
], ],
"statusChecker": { "statusChecker": {
"cron": "2 0/1 * * * *" "cron": "0/5 * * * * *"
}, },
"dataSourceConfig": [ "dataSourceConfig": [
{ {
@ -41,7 +41,7 @@
} }
] ]
}, },
"dropBox": { "dropBoxConfig": {
"receivedMessageStorageRoot": "/home/dfxagent/agent/messages/received", "receivedMessageStorageRoot": "/home/dfxagent/agent/messages/received",
"processedMessageStorageRoot": "/home/dfxagent/agent/messages/processed", "processedMessageStorageRoot": "/home/dfxagent/agent/messages/processed",
"failureMessageStorageRoot": "/home/dfxagent/agent/messages/failure", "failureMessageStorageRoot": "/home/dfxagent/agent/messages/failure",

@ -13,7 +13,7 @@
} }
], ],
"statusChecker": { "statusChecker": {
"cron": "2 0/1 * * * *" "cron": "0/5 * * * * *"
}, },
"dataSourceConfig": [ "dataSourceConfig": [
{ {
@ -43,7 +43,7 @@
} }
] ]
}, },
"dropBox": { "dropBoxConfig": {
"receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received", "receivedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/received",
"processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed", "processedMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/processed",
"failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure", "failureMessageStorageRoot": "D:/projects/bsm-lab/dfx/dfxagent/src/docs/messages/failure",

@ -11,7 +11,7 @@
} }
], ],
"statusChecker": { "statusChecker": {
"cron": "2 0/1 * * * *" "cron": "0/5 * * * * *"
}, },
"dataSourceConfig": [ "dataSourceConfig": [
{ {
@ -41,7 +41,7 @@
} }
] ]
}, },
"dropBox": { "dropBoxConfig": {
"receivedMessageStorageRoot": "/home/dfxagent/agent/messages/received", "receivedMessageStorageRoot": "/home/dfxagent/agent/messages/received",
"processedMessageStorageRoot": "/home/dfxagent/agent/messages/processed", "processedMessageStorageRoot": "/home/dfxagent/agent/messages/processed",
"failureMessageStorageRoot": "/home/dfxagent/agent/messages/failure", "failureMessageStorageRoot": "/home/dfxagent/agent/messages/failure",

@ -14,7 +14,7 @@ public class AgentConfigDto {
private StatusChecker statusChecker; private StatusChecker statusChecker;
private List<DataSourceConfig> dataSourceConfig; private List<DataSourceConfig> dataSourceConfig;
private List<String> sqlMapperLocations; private List<String> sqlMapperLocations;
private DropBoxConfig dropBox; private DropBoxConfig dropBoxConfig;
private PostmanConfig postmanConfig; private PostmanConfig postmanConfig;
private LoggingConfig logging; private LoggingConfig logging;

@ -50,7 +50,7 @@ public class AgentConfigReader {
public AgentConfigDto.DropBox getDropBox(String dropBoxId) { public AgentConfigDto.DropBox getDropBox(String dropBoxId) {
AgentConfigDto.DropBox found = null; AgentConfigDto.DropBox found = null;
for(AgentConfigDto.DropBox dropBox : this.agentConfigDto.getDropBox().getDropBoxList()) { for(AgentConfigDto.DropBox dropBox : this.agentConfigDto.getDropBoxConfig().getDropBoxList()) {
if(dropBox.getDropBoxId().equals(dropBoxId)) { if(dropBox.getDropBoxId().equals(dropBoxId)) {
found = dropBox; found = dropBox;
break; break;

@ -183,7 +183,7 @@ public class DfxAgentConfiguration {
@Bean(name = "dropBoxProcessorThreadPoolTaskScheduler") @Bean(name = "dropBoxProcessorThreadPoolTaskScheduler")
public ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler(AgentConfigReader agentConfigReader) { public ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler(AgentConfigReader agentConfigReader) {
ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); ThreadPoolTaskScheduler dropBoxProcessorThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
dropBoxProcessorThreadPoolTaskScheduler.setPoolSize(agentConfigReader.getAgentConfigDto().getDropBox().getThreadPoolSize()); dropBoxProcessorThreadPoolTaskScheduler.setPoolSize(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getThreadPoolSize());
dropBoxProcessorThreadPoolTaskScheduler.setThreadNamePrefix("dropBoxProcessor-"); dropBoxProcessorThreadPoolTaskScheduler.setThreadNamePrefix("dropBoxProcessor-");
dropBoxProcessorThreadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부 dropBoxProcessorThreadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true); // 종료 시 대기 여부
dropBoxProcessorThreadPoolTaskScheduler.initialize(); dropBoxProcessorThreadPoolTaskScheduler.initialize();
@ -209,4 +209,14 @@ public class DfxAgentConfiguration {
statusCheckerThreadPoolTaskScheduler.initialize(); statusCheckerThreadPoolTaskScheduler.initialize();
return statusCheckerThreadPoolTaskScheduler; return statusCheckerThreadPoolTaskScheduler;
} }
// 파일 정리 쓰레드 설정
@Bean(name = "fileCleanerThreadPoolTaskScheduler")
public ThreadPoolTaskScheduler fileCleanerThreadPoolTaskScheduler() {
ThreadPoolTaskScheduler fileCleanerThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
fileCleanerThreadPoolTaskScheduler.setPoolSize(1);
fileCleanerThreadPoolTaskScheduler.setThreadNamePrefix("file-cleaner-");
fileCleanerThreadPoolTaskScheduler.initialize();
return fileCleanerThreadPoolTaskScheduler;
}
} }

@ -4,6 +4,7 @@ import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxSchedulerService; import com.bsmlab.dfx.agent.task.dropbox.DropBoxSchedulerService;
import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService; import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService;
import com.bsmlab.dfx.agent.task.status.FileCleanerSchedulerService;
import com.bsmlab.dfx.agent.task.status.StatusCheckerSchedulerService; import com.bsmlab.dfx.agent.task.status.StatusCheckerSchedulerService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -20,6 +21,7 @@ public class StartupRunner implements ApplicationRunner {
private final PostmanSchedulerService postmanSchedulerService; private final PostmanSchedulerService postmanSchedulerService;
private final DropBoxSchedulerService dropBoxSchedulerService; private final DropBoxSchedulerService dropBoxSchedulerService;
private final StatusCheckerSchedulerService statusCheckerSchedulerService; private final StatusCheckerSchedulerService statusCheckerSchedulerService;
private final FileCleanerSchedulerService fileCleanerSchedulerService;
private final DropBoxService dropBoxService; private final DropBoxService dropBoxService;
@Override @Override
@ -38,6 +40,9 @@ public class StartupRunner implements ApplicationRunner {
// 다른 agent 상태 확인 서비스 // 다른 agent 상태 확인 서비스
log.info("✅ StatusCheckerSchedulerService 기동"); log.info("✅ StatusCheckerSchedulerService 기동");
statusCheckerSchedulerService.launch(); statusCheckerSchedulerService.launch();
// 처리 완료 파일 정리 서비스
fileCleanerSchedulerService.launch();
log.info("✅ FileCleanerSchedulerService 기동");
// receivedMessageStorageRoot 하위에 처리되지 않은 메시지를 처리 queue에 넣는다. // receivedMessageStorageRoot 하위에 처리되지 않은 메시지를 처리 queue에 넣는다.
int messageCount = dropBoxService.addNotProcessedMessageFile(); int messageCount = dropBoxService.addNotProcessedMessageFile();
log.info("✅ 미처리 메시지 체크 - {} 건 처리 등록", messageCount); log.info("✅ 미처리 메시지 체크 - {} 건 처리 등록", messageCount);

@ -36,7 +36,7 @@ public class DropBoxService {
*/ */
public boolean isExistToday(ReceiveMessageDto receiveMessageDto) { public boolean isExistToday(ReceiveMessageDto receiveMessageDto) {
boolean isExist = false; boolean isExist = false;
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot()); File root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getReceivedMessageStorageRoot());
Date today = new Date(System.currentTimeMillis()); Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/"); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/");
String todayDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); String todayDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
@ -56,9 +56,9 @@ public class DropBoxService {
* @param receiveMessageDto * @param receiveMessageDto
*/ */
public void add(ReceiveMessageDto receiveMessageDto) { public void add(ReceiveMessageDto receiveMessageDto) {
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot()); File root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getReceivedMessageStorageRoot());
Date today = new Date(System.currentTimeMillis()); Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh"); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH");
String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
File targetDirectory = new File(targetDirectoryString); File targetDirectory = new File(targetDirectoryString);
if(!targetDirectory.exists()) { if(!targetDirectory.exists()) {
@ -81,7 +81,7 @@ public class DropBoxService {
* . queue . * . queue .
*/ */
public int addNotProcessedMessageFile() { public int addNotProcessedMessageFile() {
File root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getReceivedMessageStorageRoot()); File root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getReceivedMessageStorageRoot());
List<File> fileList = new ArrayList<>(); List<File> fileList = new ArrayList<>();
this.findAndAddMessageFile(root, fileList); this.findAndAddMessageFile(root, fileList);
for(File file : fileList) { for(File file : fileList) {
@ -119,13 +119,13 @@ public class DropBoxService {
public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException { public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException {
File root = null; File root = null;
if(isSuccess) { if(isSuccess) {
root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getProcessedMessageStorageRoot()); root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getProcessedMessageStorageRoot());
} }
else { else {
root = new File(agentConfigReader.getAgentConfigDto().getDropBox().getFailureMessageStorageRoot()); root = new File(agentConfigReader.getAgentConfigDto().getDropBoxConfig().getFailureMessageStorageRoot());
} }
Date today = new Date(System.currentTimeMillis()); Date today = new Date(System.currentTimeMillis());
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/hh"); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd/HH");
String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today); String targetDirectoryString = root.getAbsolutePath() + "/" + dateFormat.format(today);
File targetDirectory = new File(targetDirectoryString); File targetDirectory = new File(targetDirectoryString);
if(!targetDirectory.exists()) { if(!targetDirectory.exists()) {

@ -21,6 +21,7 @@ import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.io.File; import java.io.File;
@ -69,132 +70,141 @@ public class PostmanSchedulerService {
} }
public void run(AgentConfigDto.Postman postman) { public void run(AgentConfigDto.Postman postman) {
ObjectMapper objectMapper = new ObjectMapper(); AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId());
String senderHostId = agentConfigReader.getAgentConfigDto().getMyHostId(); if("ALIVE".equals(agentConfigReader.getKnownAgentStatus(knownAgent.getHostId()))) {
long senderTimestamp = System.currentTimeMillis(); ObjectMapper objectMapper = new ObjectMapper();
String messageUuid = UUID.randomUUID().toString(); String senderHostId = agentConfigReader.getAgentConfigDto().getMyHostId();
// DB TO DB 전송 long senderTimestamp = System.currentTimeMillis();
if(AgentConfigDto.TaskType.DB_READ_THEN_SEND == postman.getTaskType()) { String messageUuid = UUID.randomUUID().toString();
String dataSourceId = postman.getMessage().getDataSourceId(); // DB TO DB 전송
String sqlId = postman.getMessage().getSqlId(); if(AgentConfigDto.TaskType.DB_READ_THEN_SEND == postman.getTaskType()) {
try { String dataSourceId = postman.getMessage().getDataSourceId();
List<Map<String, Object>> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null); String sqlId = postman.getMessage().getSqlId();
String dataString = objectMapper.writeValueAsString(dataMapList); String response = null;
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB)
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
.data(dataString).build();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders);
RestTemplate restTemplate = new RestTemplate();
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId());
String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
log.debug("postman to {} send a message UUID {}", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid());
String response = restTemplate.postForObject(url, bodyEntity, String.class);
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto);
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response);
}
else {
String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId();
sqlExecuteService.update(dataSourceId, postProcessingSqlId, new HashMap<String, Object>());
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} catch (DfxException e) {
throw new RuntimeException(e);
}
}
else if(AgentConfigDto.TaskType.FILE_READ_THEN_SEND == postman.getTaskType()) {
File rootDirectoryfile = new File(postman.getMessage().getWatchDirectory());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/hh/mm/ss");
Date today = new Date(System.currentTimeMillis());
String dateString = simpleDateFormat.format(today);
List<File> fileReadyList = new ArrayList<>();
List<File> workingFileList = new ArrayList<>();
if(rootDirectoryfile.isDirectory() && rootDirectoryfile.listFiles().length > 0) {
// 대상 파일 찾기 -> 작업 디렉토리(working)로 이동 -> 작업 완료 후 완료 디렉토리(done)로 이동
File[] files = rootDirectoryfile.listFiles();
for(File file : files) {
if(file.isFile()) {
fileReadyList.add(file);
}
}
List<Map<String, Object>> data = new ArrayList<>();
String dataString = null;
String workingDirectory = rootDirectoryfile.getAbsolutePath() + "/working/" + dateString;
File workingDirectoryFile = new File(workingDirectory);
if(!workingDirectoryFile.exists()) {
workingDirectoryFile.mkdirs();
}
try { try {
// 작업 디렉토리(working)로 이동 List<Map<String, Object>> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null);
for(File file : fileReadyList) { String dataString = objectMapper.writeValueAsString(dataMapList);
Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
workingFileList.add(new File(workingDirectoryFile.getAbsoluteFile() + "/" + file.getName()));
}
// ReceiveMessageDto - messageJson 만들기
for(File file : workingFileList) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> paramMap = new HashMap<>();
map.put("file-name", file.getName());
if(postman.getMessage() != null && StringUtils.isNotEmpty(postman.getMessage().getMetaDropBoxId())
&& StringUtils.isNotEmpty(postman.getMessage().getMetaDataSqlId()) && StringUtils.isNotEmpty(postman.getMessage().getMetaDataDataSourceId())) {
map.put("meta-drop-box-id", postman.getMessage().getMetaDropBoxId());
paramMap.put("fileName", file.getName());
List<Map<String, Object>> messageDataMapList = sqlExecuteService.select(postman.getMessage().getDataSourceId(), postman.getMessage().getMetaDataSqlId(), paramMap);
if(messageDataMapList != null && messageDataMapList.get(0) != null) {
map.put("meta-data", messageDataMapList.get(0));
}
}
data.add(map);
}
dataString = objectMapper.writeValueAsString(data);
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB) .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB)
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
.data(dataString).build(); .data(dataString).build();
String messageString = objectMapper.writeValueAsString(receiveMessageDto);
// http 준비
HttpHeaders httpHeaders = new HttpHeaders(); HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA); httpHeaders.setContentType(MediaType.APPLICATION_JSON);
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>(); HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders);
// 첫 번째 멀티파트는 message json
body.add("json", new HttpEntity<>(messageString, httpHeaders));
// 두 번째 이후 멀티파트는 파일
for(File file : workingFileList) {
body.add(file.getName(), new FileSystemResource(file));
}
// 전송
RestTemplate restTemplate = new RestTemplate(); RestTemplate restTemplate = new RestTemplate();
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; log.debug("postman to {} send a message UUID {}", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid());
String response = restTemplate.postForObject(url, body, String.class); response = restTemplate.postForObject(url, bodyEntity, String.class);
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {}); AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto);
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); log.warn("대상 agent {}[{}]에게 전송하였으나 상대방이 수신하지 못하였습니다. 응답: {} 응답메시지: {}"
, knownAgent.getHostId(), knownAgent.getHostName(), ackDto.getResult(), ackDto.getResultText());
} }
else { else {
// 작업 완료(done) 디렉토리로 이동 String postProcessingSqlId = postman.getMessage().getPostProcessingSqlId();
String doneDirectory = rootDirectoryfile + "/done/" + dateString; sqlExecuteService.update(dataSourceId, postProcessingSqlId, new HashMap<String, Object>());
File doneDirectoryFile = new File(doneDirectory); }
if(!doneDirectoryFile.exists()) { }
doneDirectoryFile.mkdirs(); catch (JsonProcessingException e) {
log.warn("대상 agent {}[{}]에게 전송하였으나 응답메시지가 비정상입니다. 응답메시지: {}", knownAgent.getHostId(), knownAgent.getHostName(), response, e);
}
catch (ResourceAccessException e) {
log.warn("대상 agent {}[{}]에게 전송하였으나 응답하지 않습니다. exception: {}", knownAgent.getHostId(), knownAgent.getHostName(), e.getLocalizedMessage());
agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "DOWN");
}
}
else if(AgentConfigDto.TaskType.FILE_READ_THEN_SEND == postman.getTaskType()) {
File rootDirectoryfile = new File(postman.getMessage().getWatchDirectory());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd/hh/mm/ss");
Date today = new Date(System.currentTimeMillis());
String dateString = simpleDateFormat.format(today);
List<File> fileReadyList = new ArrayList<>();
List<File> workingFileList = new ArrayList<>();
if(rootDirectoryfile.isDirectory() && rootDirectoryfile.listFiles().length > 0) {
// 대상 파일 찾기 -> 작업 디렉토리(working)로 이동 -> 작업 완료 후 완료 디렉토리(done)로 이동
File[] files = rootDirectoryfile.listFiles();
for(File file : files) {
if(file.isFile()) {
fileReadyList.add(file);
} }
for(File file : workingFileList) { }
List<Map<String, Object>> data = new ArrayList<>();
String dataString = null;
String workingDirectory = rootDirectoryfile.getAbsolutePath() + "/working/" + dateString;
File workingDirectoryFile = new File(workingDirectory);
if(!workingDirectoryFile.exists()) {
workingDirectoryFile.mkdirs();
}
try {
// 작업 디렉토리(working)로 이동
for(File file : fileReadyList) {
Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING); Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
workingFileList.add(new File(workingDirectoryFile.getAbsoluteFile() + "/" + file.getName()));
}
// ReceiveMessageDto - messageJson 만들기
for(File file : workingFileList) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> paramMap = new HashMap<>();
map.put("file-name", file.getName());
if(postman.getMessage() != null && StringUtils.isNotEmpty(postman.getMessage().getMetaDropBoxId())
&& StringUtils.isNotEmpty(postman.getMessage().getMetaDataSqlId()) && StringUtils.isNotEmpty(postman.getMessage().getMetaDataDataSourceId())) {
map.put("meta-drop-box-id", postman.getMessage().getMetaDropBoxId());
paramMap.put("fileName", file.getName());
List<Map<String, Object>> messageDataMapList = sqlExecuteService.select(postman.getMessage().getDataSourceId(), postman.getMessage().getMetaDataSqlId(), paramMap);
if(messageDataMapList != null && messageDataMapList.get(0) != null) {
map.put("meta-data", messageDataMapList.get(0));
}
}
data.add(map);
} }
dataString = objectMapper.writeValueAsString(data);
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.TRANSFER_DB_TO_DB)
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
.data(dataString).build();
String messageString = objectMapper.writeValueAsString(receiveMessageDto);
// http 준비
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA);
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
// 첫 번째 멀티파트는 message json
body.add("json", new HttpEntity<>(messageString, httpHeaders));
// 두 번째 이후 멀티파트는 파일
for(File file : workingFileList) {
body.add(file.getName(), new FileSystemResource(file));
}
// 전송
RestTemplate restTemplate = new RestTemplate();
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
String response = restTemplate.postForObject(url, body, String.class);
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response);
}
else {
// 작업 완료(done) 디렉토리로 이동
String doneDirectory = rootDirectoryfile + "/done/" + dateString;
File doneDirectoryFile = new File(doneDirectory);
if(!doneDirectoryFile.exists()) {
doneDirectoryFile.mkdirs();
}
for(File file : workingFileList) {
Files.move(file.toPath(), workingDirectoryFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
}
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} catch (DfxException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} }
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} catch (DfxException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} }
} }
} }
else {
log.warn("대상 agent {}[{}] 실행 상태가 정상(ALIVE)이 아니므로 전송을 중지합니다. postman ID: {}", knownAgent.getHostId(), knownAgent.getHostName(), postman.getPostmanId());
}
} }
} }

@ -0,0 +1,66 @@
package com.bsmlab.dfx.agent.task.status;
import com.bsmlab.dfx.agent.config.AgentConfigReader;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Service;
import java.io.File;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
@Slf4j
@RequiredArgsConstructor
@Service
public class FileCleanerSchedulerService {
private final ThreadPoolTaskScheduler fileCleanerThreadPoolTaskScheduler;
private final AgentConfigReader agentConfigReader;
private ScheduledFuture<?> scheduledFuture;
// StartupRunner 로 부터 실행됨
public void launch() {
log.debug("FileCleanerSchedulerService launch");
this.scheduledFuture = fileCleanerThreadPoolTaskScheduler.scheduleWithFixedDelay(this::run, Duration.ofHours(1));
}
public void run() {
String processMesssageStorageRoot = agentConfigReader.getAgentConfigDto().getDropBoxConfig().getProcessedMessageStorageRoot();
File storageRoot = new File(processMesssageStorageRoot);
List<File> allDirectoryList = new ArrayList<>();
this.findDirectory(storageRoot, allDirectoryList);
Calendar calendar = Calendar.getInstance();
int toDateCount = agentConfigReader.getAgentConfigDto().getDropBoxConfig().getRetentionDaysOfProcessedMessage() * -1;
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd");
for(int i = 0; i < toDateCount; i++) {
calendar.roll(Calendar.DATE, (i * -1));
Date currentDate = calendar.getTime();
String retentionDirectoryString = processMesssageStorageRoot + "/" + dateFormat.format(currentDate);
Iterator<File> iterator = allDirectoryList.iterator();
while(iterator.hasNext()) {
File file = iterator.next();
if(file.getAbsolutePath().contains(retentionDirectoryString)) {
allDirectoryList.remove(file);
}
}
}
for(File file : allDirectoryList) {
if(file.exists()) {
file.delete();
}
}
}
private void findDirectory(File parent, List<File> fileList) {
File[] files = parent.listFiles();
for(File file : files) {
if(!".".equals(file.getName()) && !"..".equals(file.getName()) && file.isDirectory()) {
fileList.add(file);
this.findDirectory(file, fileList);
}
}
}
}
Loading…
Cancel
Save