From e0ea8cff19651498ac511e8ad0923c72d10c5174 Mon Sep 17 00:00:00 2001 From: icksishu Date: Sat, 10 May 2025 08:35:56 +0900 Subject: [PATCH] =?UTF-8?q?bsm-lab/dfxagent#2=20ALIVE=20=EB=A9=94=EC=8B=9C?= =?UTF-8?q?=EC=A7=80=20=EC=86=A1=EC=8B=A0=20=EA=B8=B0=EB=8A=A5=20=EC=99=84?= =?UTF-8?q?=EB=A3=8C=20=20-=20StatusCheckerSchedulerService=20:=20?= =?UTF-8?q?=EC=A3=BC=EA=B8=B0=EC=A0=81=EC=9C=BC=EB=A1=9C=20=EC=83=81?= =?UTF-8?q?=ED=83=9C=20=ED=99=95=EC=9D=B8=ED=95=A8=20=20-=20Postman=20?= =?UTF-8?q?=EB=93=B1=20=EB=A9=94=EC=8B=9C=EC=A7=80=EB=A5=BC=20=EC=A0=84?= =?UTF-8?q?=EC=86=A1=ED=95=A0=20=EB=95=8C=20=EC=83=81=ED=83=9C=EB=A5=BC=20?= =?UTF-8?q?=ED=99=95=EC=9D=B8=ED=95=98=EA=B3=A0=20=EC=A0=84=EC=86=A1?= =?UTF-8?q?=ED=95=98=EB=8A=94=20=EA=B2=83=EC=9C=BC=EB=A1=9C=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=ED=95=98=EC=97=AC=EC=95=BC=20=ED=95=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dfxagent-bd-test-cubrid-local.json | 3 + .../dfxagent-bd-test-cubrid.json | 3 + .../dfxagent-bd-test-oracle-local.json | 3 + .../dfxagent-bd-test-oracle.json | 3 + .../dfx/agent/config/AgentConfigDto.java | 6 ++ .../dfx/agent/config/AgentConfigReader.java | 18 +++++ .../agent/config/DfxAgentConfiguration.java | 10 +++ .../dfx/agent/config/StartupRunner.java | 17 ++-- .../status/StatusCheckerSchedulerService.java | 79 +++++++++++++++++++ 9 files changed, 134 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java diff --git a/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json b/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json index 5637d12..3779e8a 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json +++ b/src/docs/settings-examples/dfxagent-bd-test-cubrid-local.json @@ -10,6 +10,9 @@ "dropBoxIdList": [] } ], + "statusChecker": { + "cron": "0 0/10 * * * *" + }, "dataSourceConfig": [ { "dataSourceId": "cubrid", diff --git a/src/docs/settings-examples/dfxagent-bd-test-cubrid.json b/src/docs/settings-examples/dfxagent-bd-test-cubrid.json index 8a42839..85f3db5 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-cubrid.json +++ b/src/docs/settings-examples/dfxagent-bd-test-cubrid.json @@ -4,6 +4,9 @@ "myListenPort": 17801, "knownAgentList": [ ], + "statusChecker": { + "cron": "0 0/10 * * * *" + }, "dataSourceConfig": [ { "dataSourceId": "cubrid", diff --git a/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json b/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json index c23173c..dbe38b9 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json +++ b/src/docs/settings-examples/dfxagent-bd-test-oracle-local.json @@ -12,6 +12,9 @@ ] } ], + "statusChecker": { + "cron": "0 0/10 * * * *" + }, "dataSourceConfig": [ { "dataSourceId": "ds-oracle", diff --git a/src/docs/settings-examples/dfxagent-bd-test-oracle.json b/src/docs/settings-examples/dfxagent-bd-test-oracle.json index f4f9e62..969425d 100644 --- a/src/docs/settings-examples/dfxagent-bd-test-oracle.json +++ b/src/docs/settings-examples/dfxagent-bd-test-oracle.json @@ -10,6 +10,9 @@ "dropBoxIdList": ["db-bd-cubrid-save"] } ], + "statusChecker": { + "cron": "0 0/10 * * * *" + }, "dataSourceConfig": [ { "dataSourceId": "ds-oracle", diff --git a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java index 50eca7c..12be142 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigDto.java @@ -11,6 +11,7 @@ public class AgentConfigDto { private String myHostId; private int myListenPort; private List knownAgentList; + private StatusChecker statusChecker; private List dataSourceConfig; private List sqlMapperLocations; private DropBoxConfig dropBox; @@ -27,6 +28,11 @@ public class AgentConfigDto { private List dropBoxIdList; } + @Data + public static class StatusChecker { + private String cron; + } + @Data public static class DataSourceConfig { private String dataSourceId; diff --git a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java index 498b958..04aad7f 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/AgentConfigReader.java @@ -9,18 +9,28 @@ import org.springframework.stereotype.Component; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; @Slf4j @Component @Getter public class AgentConfigReader { private AgentConfigDto agentConfigDto = null; + private Map knownAgentStatusMap; public void loadConfigFile(String configFilePath) { try { ObjectMapper objectMapper = new ObjectMapper(); this.agentConfigDto = objectMapper.readValue(new File(configFilePath), AgentConfigDto.class); + this.knownAgentStatusMap = new HashMap<>(); + List knownAgentList = this.agentConfigDto.getKnownAgentList(); + if(knownAgentList != null) { + for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) { + this.knownAgentStatusMap.put(knownAgent.getHostId(), "DOWN"); + } + } } catch (DatabindException e) { log.error("cannot parse a setting file. {}", configFilePath, e); log.error(e.getMessage(), e); @@ -30,6 +40,14 @@ public class AgentConfigReader { } } + public synchronized String getKnownAgentStatus(String knownAgentHostId) { + return this.knownAgentStatusMap.get(knownAgentHostId); + } + + public synchronized void setKnownAgentStatus(String knownAgentHostId, String knownAgentStatus) { + this.knownAgentStatusMap.put(knownAgentHostId, knownAgentStatus); + } + public AgentConfigDto.DropBox getDropBox(String dropBoxId) { AgentConfigDto.DropBox found = null; for(AgentConfigDto.DropBox dropBox : this.agentConfigDto.getDropBox().getDropBoxList()) { diff --git a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java index ce638dc..91c5057 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java @@ -203,4 +203,14 @@ public class DfxAgentConfiguration { scheduledPostmanThreadPoolTaskScheduler.initialize(); return scheduledPostmanThreadPoolTaskScheduler; } + + // 연결된 agent 상태 확인 쓰레드 설정 + @Bean(name = "statusCheckerThreadPoolTaskScheduler") + public ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler() { + ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + statusCheckerThreadPoolTaskScheduler.setPoolSize(1); + statusCheckerThreadPoolTaskScheduler.setThreadNamePrefix("status-checker-"); + statusCheckerThreadPoolTaskScheduler.initialize(); + return statusCheckerThreadPoolTaskScheduler; + } } diff --git a/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java b/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java index fd66e30..a2a7900 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/StartupRunner.java @@ -3,6 +3,7 @@ package com.bsmlab.dfx.agent.config; import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService; import com.bsmlab.dfx.agent.task.dropbox.DropBoxTaskExecutorService; import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService; +import com.bsmlab.dfx.agent.task.status.StatusCheckerSchedulerService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; @@ -17,21 +18,21 @@ public class StartupRunner implements ApplicationRunner { private final DynamicDataSourceService dynamicDataSourceService; private final PostmanSchedulerService postmanSchedulerService; private final DropBoxTaskExecutorService dropBoxTaskExecutorService; + private final StatusCheckerSchedulerService statusCheckerSchedulerService; @Override public void run(ApplicationArguments args) throws Exception { - System.out.println("✅ StartupRunner.run() 호출됨"); + log.debug("✅ StartupRunner start [run]"); // DfxAgentConfiguration 에서 생성한 빈 중 DataSource 관련 설정을 마무리한다. dynamicDataSourceService.setSqlSessionFactoryMap(dfxAgentConfiguration.getTemporarySqlSessionFactoryMap()); dynamicDataSourceService.setTransactionManagerMap(dfxAgentConfiguration.getTemporaryTransactionManagerMap()); - - // PostmanSchedulerService, DropBoxTaskExecutorService 는 @RequiredArgsConstructor 이기 때문에 자동으로 injection 된다 - // 그 후 @PostConstruct 로직이 실행될 것이다. - log.debug("StartupRunner start [run]"); - log.debug("{} ready", postmanSchedulerService.getClass().getName()); - log.debug("{} ready", dropBoxTaskExecutorService.getClass().getName()); - //TODO 4. Worker 쓰레드 생성 + //4. Worker 쓰레드 생성 + // 송신 메시지 전송 서비스 postmanSchedulerService.launch(); + // 수신한 메시지 처리 서비스 dropBoxTaskExecutorService.launch(); + // 다른 agent 상태 확인 서비스 + statusCheckerSchedulerService.launch(); + log.debug("✅ StartupRunner end"); } } diff --git a/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java new file mode 100644 index 0000000..86de1b8 --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/status/StatusCheckerSchedulerService.java @@ -0,0 +1,79 @@ +package com.bsmlab.dfx.agent.task.status; + +import com.bsmlab.dfx.agent.config.AgentConfigDto; +import com.bsmlab.dfx.agent.config.AgentConfigReader; +import com.bsmlab.dfx.agent.listener.dto.AckDto; +import com.bsmlab.dfx.agent.listener.dto.CommandDto; +import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; +import com.bsmlab.dfx.agent.support.exception.DfxException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +@Service +@Slf4j +@RequiredArgsConstructor +public class StatusCheckerSchedulerService { + private final ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler; + private final AgentConfigReader agentConfigReader; + private ScheduledFuture scheduledFuture; + + // StartupRunner 로 부터 실행됨 + public void launch() { + log.debug("StatusCheckerSchedulerService launch"); + String cron = agentConfigReader.getAgentConfigDto().getStatusChecker().getCron(); + this.scheduledFuture = statusCheckerThreadPoolTaskScheduler.schedule(() -> run(), new CronTrigger(cron)); + } + + // statusCheckerThreadPoolTaskScheduler 가 실행함 + public void run() { + List knownAgentList = this.agentConfigReader.getAgentConfigDto().getKnownAgentList(); + if(knownAgentList != null) { + for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) { + String messageUuid = UUID.randomUUID().toString(); + CommandDto commandDto = CommandDto.builder().commandType(CommandDto.CommandType.ALIVE).messageUuid(messageUuid).build(); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + HttpEntity bodyEntity = new HttpEntity<>(commandDto, httpHeaders); + RestTemplate restTemplate = new RestTemplate(); + String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/command"; + log.debug("StatusChecker to {} send a message UUID {}", knownAgent.getHostName(), messageUuid); + String response = restTemplate.postForObject(url, bodyEntity, String.class); + ObjectMapper objectMapper = new ObjectMapper(); + AckDto ackDto = null; + try { + ackDto = objectMapper.readValue(response, new TypeReference() {}); + log.debug("StatusChecker received ack from {} ack: {}", knownAgent.getHostName(), ackDto); + if(AckDto.ResultType.PROCESS_SUCCESS == ackDto.getResult()) { + // known agent is alive + this.agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "ALIVE"); + log.debug("known agent {} {} is ALIVE", knownAgent.getHostId(), knownAgent.getHostName()); + } + else { + // known agent is down + this.agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "DOWN"); + log.debug("known agent {} {} is DOWN", knownAgent.getHostId(), knownAgent.getHostName()); + } + } catch (JsonProcessingException e) { + this.agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "DOWN"); + log.debug("known agent {} {} alive message is not valid then set DOWN", knownAgent.getHostId(), knownAgent.getHostName()); + } + + } + } + } +}