DropBoxScheduler 쓰레드 종료 또는 오류시 재시작 로직 추가

로그 보강.
main
semin.baek 7 months ago
parent 255ac3d703
commit 9f74754fa0

@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -36,18 +37,19 @@ public class DropBoxSchedulerService {
private final AgentConfigReader agentConfigReader; private final AgentConfigReader agentConfigReader;
private final SqlExecuteService sqlExecuteService; private final SqlExecuteService sqlExecuteService;
private final DropBoxService dropBoxService; private final DropBoxService dropBoxService;
private Map<Integer, ScheduledFuture<?>> scheduledFutureMap;
/** /**
* *
* DB + * DB +
* @param messageFilePath * @param messageFilePath
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void processDropBox(String messageFilePath) { public void processDropBox(String messageFilePath) throws Exception {
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().build(); ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().build();
ReceiveMessageDto.ProcessStatus processStatus; ReceiveMessageDto.ProcessStatus processStatus;
String processMessage = ""; String processMessage = "";
String messageUuid; String messageUuid;
Exception processException = null;
try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(messageFilePath))) { try (ObjectInputStream objectInputStream = new ObjectInputStream(new FileInputStream(messageFilePath))) {
receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject(); receiveMessageDto = (ReceiveMessageDto)objectInputStream.readObject();
messageUuid = receiveMessageDto.getMessageUuid(); messageUuid = receiveMessageDto.getMessageUuid();
@ -59,9 +61,6 @@ public class DropBoxSchedulerService {
List<Map<String, Object>> dataMapList; List<Map<String, Object>> dataMapList;
dataMapList = (List<Map<String, Object>>) objectMapper.readValue(receiveMessageDto.getData(), List.class); dataMapList = (List<Map<String, Object>>) objectMapper.readValue(receiveMessageDto.getData(), List.class);
sqlExecuteService.insert(dropBox.getDataSourceId(), dropBox.getSqlId(), dataMapList); sqlExecuteService.insert(dropBox.getDataSourceId(), dropBox.getSqlId(), dataMapList);
// for(Map<String, Object> dataMap : dataMapList) {
// sqlExecuteService.insert(dropBox.getDataSourceId(), dropBox.getSqlId(), dataMap);
// }
} }
else if(dropBox.getTaskType() == AgentConfigDto.TaskType.RECEIVE_FILE) { else if(dropBox.getTaskType() == AgentConfigDto.TaskType.RECEIVE_FILE) {
// 1.2 파일 수신 메시지 처리 // 1.2 파일 수신 메시지 처리
@ -93,26 +92,30 @@ public class DropBoxSchedulerService {
dropBoxService.clearMessageFile(messageFilePath); dropBoxService.clearMessageFile(messageFilePath);
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_NOT_POSSIBLE; processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_NOT_POSSIBLE;
processMessage = e.getMessage(); processMessage = e.getMessage();
log.error("process not possible - messageFilePath: "); log.error("process not possible - messageFilePath: {}", messageFilePath);
log.error("{}", e, e); log.error("{}", e, e);
processException = e;
} catch (IOException e) { } catch (IOException e) {
//수신한 메시지를 로드하지 못했으니 메시지 처리 불가. ReceiveMessageDto 를 알 수 없으므로 로그 남기고 종료 //수신한 메시지를 로드하지 못했으니 메시지 처리 불가. ReceiveMessageDto 를 알 수 없으므로 로그 남기고 종료
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL; processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
processMessage = e.getMessage(); processMessage = e.getMessage();
log.error("cannot load a message file - messageFilePath: "); log.error("cannot load a message file - messageFilePath: {}", messageFilePath);
log.error("{}", e, e); log.error("{}", e, e);
processException = e;
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
//ReceiveMessageDto 변환 실패. 로그 남기고 파일을 미처리 상태 경로로 옮기고 ReceiveMessageDto 를 알 수 없으므로 종료 //ReceiveMessageDto 변환 실패. 로그 남기고 파일을 미처리 상태 경로로 옮기고 ReceiveMessageDto 를 알 수 없으므로 종료
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL; processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
processMessage = e.getMessage(); processMessage = e.getMessage();
log.error("cannot parse a message file - messageFilePath: "); log.error("cannot parse a message file - messageFilePath: {}", messageFilePath);
log.error("{}", e, e); log.error("{}", e, e);
processException = e;
} catch (Exception e) { } catch (Exception e) {
//기타 메시지 처리 중 오류 //기타 메시지 처리 중 오류
processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL; processStatus = ReceiveMessageDto.ProcessStatus.PROCESS_FAIL;
processMessage = e.getMessage(); processMessage = e.getMessage();
log.error("cannot process a message file - messageFilePath: "); log.error("cannot process a message file - messageFilePath: {}", messageFilePath);
log.error("{}", e, e); log.error("{}", e, e);
processException = e;
} }
//2. 결과 Ack 전송 //2. 결과 Ack 전송
if(StringUtils.isNotBlank(receiveMessageDto.getMessageUuid()) && StringUtils.isNotBlank(receiveMessageDto.getSenderHostId())) { if(StringUtils.isNotBlank(receiveMessageDto.getMessageUuid()) && StringUtils.isNotBlank(receiveMessageDto.getSenderHostId())) {
@ -127,6 +130,10 @@ public class DropBoxSchedulerService {
} }
} catch (IOException ex) { } catch (IOException ex) {
log.error("{}", ex, ex); log.error("{}", ex, ex);
processException = ex;
}
if(processException != null) {
throw processException;
} }
} }
@ -161,23 +168,44 @@ public class DropBoxSchedulerService {
} }
} }
private void run() { public void launch() { // 실행확인됨
try { log.debug("{} launch", this.getClass().getName());
this.scheduledFutureMap = new HashMap<>();
for(int i = 0; i < this.agentConfigReader.getAgentConfigDto().getDropBoxConfig().getThreadPoolSize(); i++) {
this.scheduleTask(i);
}
}
private void scheduleTask(int taskId) {
Runnable task = () -> {
String messageFilePath = dropBoxService.poll(); String messageFilePath = dropBoxService.poll();
if(StringUtils.isNotBlank(messageFilePath)) { try {
log.info("dropBoxProcessThread {} process a message {}", Thread.currentThread().getName(), messageFilePath); if(StringUtils.isNotBlank(messageFilePath)) {
this.processDropBox(messageFilePath); log.info("dropBoxProcessThread {} process a message {} (extra poll size: {})", Thread.currentThread().getName(), messageFilePath, dropBoxService.getPollSize());
this.processDropBox(messageFilePath);
}
Thread.sleep(10);
} }
Thread.sleep(10); catch(InterruptedException e) {
} catch (InterruptedException e) { log.error("{}", e, e);
log.error("{}", e, e); Thread.currentThread().interrupt();
Thread.currentThread().interrupt(); this.rescheduleTask(taskId);
} }
catch(Exception e) {
log.error("Exception on processDropBox. messageFilePath: {}", messageFilePath, e);
this.rescheduleTask(taskId);
}
};
ScheduledFuture<?> future = dropBoxProcessorThreadPoolTaskScheduler.scheduleWithFixedDelay(task, Duration.ofMillis(10));
this.scheduledFutureMap.put(taskId, future);
} }
public void launch() { // 실행확인됨 private void rescheduleTask(int taskId) {
log.debug("{} launch", this.getClass().getName()); ScheduledFuture<?> future = this.scheduledFutureMap.get(taskId);
ScheduledFuture<?> scheduledFuture = dropBoxProcessorThreadPoolTaskScheduler.scheduleWithFixedDelay(this::run, Duration.ofMillis(10)); if(future != null && !future.isCancelled()) {
future.cancel(true);
scheduleTask(taskId);
}
} }
} }

@ -116,6 +116,10 @@ public class DropBoxService {
return queue.poll(); return queue.poll();
} }
public int getPollSize() {
return queue.size();
}
public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException { public void moveMessageFile(String messageFilePath, boolean isSuccess) throws IOException {
File root = null; File root = null;
if(isSuccess) { if(isSuccess) {

@ -98,7 +98,7 @@ public class PostmanSchedulerService {
HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders);
RestTemplate restTemplate = new RestTemplate(); RestTemplate restTemplate = new RestTemplate();
String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
log.debug("postman to {} send a message UUID {}", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid()); log.debug("postman to {} send a message UUID {} (data count: {})", receiveMessageDto.getRecipientHostId(), receiveMessageDto.getMessageUuid(), dataMapList.size());
response = restTemplate.postForObject(url, bodyEntity, String.class); response = restTemplate.postForObject(url, bodyEntity, String.class);
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {}); AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto); log.debug("postman received ack from {} ack: {}", receiveMessageDto.getRecipientHostId(), ackDto);

@ -4,6 +4,7 @@ import com.bsmlab.dfx.agent.config.AgentConfigDto;
import com.bsmlab.dfx.agent.config.AgentConfigReader; import com.bsmlab.dfx.agent.config.AgentConfigReader;
import com.bsmlab.dfx.agent.listener.dto.AckDto; import com.bsmlab.dfx.agent.listener.dto.AckDto;
import com.bsmlab.dfx.agent.listener.dto.CommandDto; import com.bsmlab.dfx.agent.listener.dto.CommandDto;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxService;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -28,6 +29,7 @@ import java.util.concurrent.ScheduledFuture;
public class StatusCheckerSchedulerService { public class StatusCheckerSchedulerService {
private final ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler; private final ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler;
private final AgentConfigReader agentConfigReader; private final AgentConfigReader agentConfigReader;
private final DropBoxService dropBoxService;
private ScheduledFuture<?> scheduledFuture; private ScheduledFuture<?> scheduledFuture;
// StartupRunner 로 부터 실행됨 // StartupRunner 로 부터 실행됨
@ -39,6 +41,7 @@ public class StatusCheckerSchedulerService {
// statusCheckerThreadPoolTaskScheduler 가 실행함 // statusCheckerThreadPoolTaskScheduler 가 실행함
public void run() { public void run() {
log.info("status-check extra poll size: {}", dropBoxService.getPollSize());
List<AgentConfigDto.KnownAgent> knownAgentList = this.agentConfigReader.getAgentConfigDto().getKnownAgentList(); List<AgentConfigDto.KnownAgent> knownAgentList = this.agentConfigReader.getAgentConfigDto().getKnownAgentList();
if(knownAgentList != null) { if(knownAgentList != null) {
for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) { for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) {

Loading…
Cancel
Save