diff --git a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxSchedulerService.java index d38b183..805ee65 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxSchedulerService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxSchedulerService.java @@ -24,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledFuture; @@ -36,18 +37,19 @@ public class DropBoxSchedulerService { private final AgentConfigReader agentConfigReader; private final SqlExecuteService sqlExecuteService; private final DropBoxService dropBoxService; - + private Map> scheduledFutureMap; /** * 수신한 메시지를 처리하는 쓰레드 * DB 저장 로직과 파일 또는 파일 + 메타데이터 저장 로직으로 구성됨 * @param messageFilePath 메지시 파일 절대 경로 */ @SuppressWarnings("unchecked") - public void processDropBox(String messageFilePath) { + public void processDropBox(String messageFilePath) throws Exception { ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().build(); ReceiveMessageDto.ProcessStatus processStatus; String processMessage = ""; String messageUuid; + Exception processException = null; try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(messageFilePath))) { receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject(); messageUuid = receiveMessageDto.getMessageUuid(); @@ -59,9 +61,6 @@ public class DropBoxSchedulerService { List> dataMapList; dataMapList = (List>) objectMapper.readValue(receiveMessageDto.getData(), List.class); sqlExecuteService.insert(dropBox.getDataSourceId(), dropBox.getSqlId(), dataMapList); -// for(Map dataMap : dataMapList) { -// sqlExecuteService.insert(dropBox.getDataSourceId(), dropBox.getSqlId(), dataMap); -// } } else if(dropBox.getTaskType() == AgentConfigDto.TaskType.RECEIVE_FILE) { // 1.2 파일 수신 메시지 처리 @@ -93,26 +92,30 @@ public class DropBoxSchedulerService { dropBoxService.clearMessageFile(messageFilePath); processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_NOT_POSSIBLE; processMessage = e.getMessage(); - log.error("process not possible - messageFilePath: "); + log.error("process not possible - messageFilePath: {}", messageFilePath); log.error("{}", e, e); + processException = e; } catch (IOException e) { //수신한 메시지를 로드하지 못했으니 메시지 처리 불가. ReceiveMessageDto 를 알 수 없으므로 로그 남기고 종료 processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL; processMessage = e.getMessage(); - log.error("cannot load a message file - messageFilePath: "); + log.error("cannot load a message file - messageFilePath: {}", messageFilePath); log.error("{}", e, e); + processException = e; } catch (ClassNotFoundException e) { //ReceiveMessageDto 변환 실패. 로그 남기고 파일을 미처리 상태 경로로 옮기고 ReceiveMessageDto 를 알 수 없으므로 종료 processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL; processMessage = e.getMessage(); - log.error("cannot parse a message file - messageFilePath: "); + log.error("cannot parse a message file - messageFilePath: {}", messageFilePath); log.error("{}", e, e); + processException = e; } catch (Exception e) { //기타 메시지 처리 중 오류 processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL; processMessage = e.getMessage(); - log.error("cannot process a message file - messageFilePath: "); + log.error("cannot process a message file - messageFilePath: {}", messageFilePath); log.error("{}", e, e); + processException = e; } //2. 결과 Ack 전송 if(StringUtils.isNotBlank(receiveMessageDto.getMessageUuid()) && StringUtils.isNotBlank(receiveMessageDto.getSenderHostId())) { @@ -127,6 +130,10 @@ public class DropBoxSchedulerService { } } catch (IOException ex) { log.error("{}", ex, ex); + processException = ex; + } + if(processException != null) { + throw processException; } } @@ -161,23 +168,44 @@ public class DropBoxSchedulerService { } } - private void run() { - try { + public void launch() { // 실행확인됨 + log.debug("{} launch", this.getClass().getName()); + this.scheduledFutureMap = new HashMap<>(); + for(int i = 0; i < this.agentConfigReader.getAgentConfigDto().getDropBoxConfig().getThreadPoolSize(); i++) { + this.scheduleTask(i); + } + } + + private void scheduleTask(int taskId) { + Runnable task = () -> { String messageFilePath = dropBoxService.poll(); - if(StringUtils.isNotBlank(messageFilePath)) { - log.info("dropBoxProcessThread {} process a message {}", Thread.currentThread().getName(), messageFilePath); - this.processDropBox(messageFilePath); + try { + if(StringUtils.isNotBlank(messageFilePath)) { + log.info("dropBoxProcessThread {} process a message {} (extra poll size: {})", Thread.currentThread().getName(), messageFilePath, dropBoxService.getPollSize()); + this.processDropBox(messageFilePath); + } + Thread.sleep(10); } - Thread.sleep(10); - } catch (InterruptedException e) { - log.error("{}", e, e); - Thread.currentThread().interrupt(); - } + catch(InterruptedException e) { + log.error("{}", e, e); + Thread.currentThread().interrupt(); + this.rescheduleTask(taskId); + } + catch(Exception e) { + log.error("Exception on processDropBox. messageFilePath: {}", messageFilePath, e); + this.rescheduleTask(taskId); + } + }; + ScheduledFuture future = dropBoxProcessorThreadPoolTaskScheduler.scheduleWithFixedDelay(task, Duration.ofMillis(10)); + this.scheduledFutureMap.put(taskId, future); } - public void launch() { // 실행확인됨 - log.debug("{} launch", this.getClass().getName()); - ScheduledFuture scheduledFuture = dropBoxProcessorThreadPoolTaskScheduler.scheduleWithFixedDelay(this::run, Duration.ofMillis(10)); + private void rescheduleTask(int taskId) { + ScheduledFuture future = this.scheduledFutureMap.get(taskId); + if(future != null && !future.isCancelled()) { + future.cancel(true); + scheduleTask(taskId); + } } } diff --git a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java index 76ffeae..0e7121e 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/dropbox/DropBoxService.java @@ -116,6 +116,10 @@ public class DropBoxService { return queue.poll(); } + public int getPollSize() { + return queue.size(); + } + public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException { File root = null; if(isSuccess) { diff --git a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java index 06a1333..d0589aa 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/postman/PostmanSchedulerService.java @@ -98,7 +98,7 @@ public class PostmanSchedulerService { HttpEntity bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); RestTemplate restTemplate = new RestTemplate(); String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; - log.debug("postman to {} send a message UUID {}", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid()); + log.debug("postman to {} send a message UUID {} (data count: {})", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid(), dataMapList.size()); response = restTemplate.postForObject(url, bodyEntity, String.class); AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto); diff --git a/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java index 807c656..07b37e3 100644 --- a/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java +++ b/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java @@ -4,6 +4,7 @@ import com.bsmlab.dfx.agent.config.AgentConfigDto; import com.bsmlab.dfx.agent.config.AgentConfigReader; import com.bsmlab.dfx.agent.listener.dto.AckDto; import com.bsmlab.dfx.agent.listener.dto.CommandDto; +import com.bsmlab.dfx.agent.task.dropbox.DropBoxService; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -28,6 +29,7 @@ import java.util.concurrent.ScheduledFuture; public class StatusCheckerSchedulerService { private final ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler; private final AgentConfigReader agentConfigReader; + private final DropBoxService dropBoxService; private ScheduledFuture scheduledFuture; // StartupRunner 로 부터 실행됨 @@ -39,6 +41,7 @@ public class StatusCheckerSchedulerService { // statusCheckerThreadPoolTaskScheduler 가 실행함 public void run() { + log.info("status-check extra poll size: {}", dropBoxService.getPollSize()); List knownAgentList = this.agentConfigReader.getAgentConfigDto().getKnownAgentList(); if(knownAgentList != null) { for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) {