ALIVE 메시지 송신 기능 완료
 - StatusCheckerSchedulerService : 주기적으로 상태 확인함
 - Postman 등 메시지를 전송할 때 상태를 확인하고 전송하는 것으로 수정하여야 함
main
icksishu 8 months ago
parent 30c4e43447
commit e0ea8cff19

@ -10,6 +10,9 @@
"dropBoxIdList": []
}
],
"statusChecker": {
"cron": "0 0/10 * * * *"
},
"dataSourceConfig": [
{
"dataSourceId": "cubrid",

@ -4,6 +4,9 @@
"myListenPort": 17801,
"knownAgentList": [
],
"statusChecker": {
"cron": "0 0/10 * * * *"
},
"dataSourceConfig": [
{
"dataSourceId": "cubrid",

@ -12,6 +12,9 @@
]
}
],
"statusChecker": {
"cron": "0 0/10 * * * *"
},
"dataSourceConfig": [
{
"dataSourceId": "ds-oracle",

@ -10,6 +10,9 @@
"dropBoxIdList": ["db-bd-cubrid-save"]
}
],
"statusChecker": {
"cron": "0 0/10 * * * *"
},
"dataSourceConfig": [
{
"dataSourceId": "ds-oracle",

@ -11,6 +11,7 @@ public class AgentConfigDto {
private String myHostId;
private int myListenPort;
private List<KnownAgent> knownAgentList;
private StatusChecker statusChecker;
private List<DataSourceConfig> dataSourceConfig;
private List<String> sqlMapperLocations;
private DropBoxConfig dropBox;
@ -27,6 +28,11 @@ public class AgentConfigDto {
private List<String> dropBoxIdList;
}
@Data
public static class StatusChecker {
private String cron;
}
@Data
public static class DataSourceConfig {
private String dataSourceId;

@ -9,18 +9,28 @@ import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@Getter
public class AgentConfigReader {
private AgentConfigDto agentConfigDto = null;
private Map<String, String> knownAgentStatusMap;
public void loadConfigFile(String configFilePath) {
try {
ObjectMapper objectMapper = new ObjectMapper();
this.agentConfigDto = objectMapper.readValue(new File(configFilePath), AgentConfigDto.class);
this.knownAgentStatusMap = new HashMap<>();
List<AgentConfigDto.KnownAgent> knownAgentList = this.agentConfigDto.getKnownAgentList();
if(knownAgentList != null) {
for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) {
this.knownAgentStatusMap.put(knownAgent.getHostId(), "DOWN");
}
}
} catch (DatabindException e) {
log.error("cannot parse a setting file. {}", configFilePath, e);
log.error(e.getMessage(), e);
@ -30,6 +40,14 @@ public class AgentConfigReader {
}
}
public synchronized String getKnownAgentStatus(String knownAgentHostId) {
return this.knownAgentStatusMap.get(knownAgentHostId);
}
public synchronized void setKnownAgentStatus(String knownAgentHostId, String knownAgentStatus) {
this.knownAgentStatusMap.put(knownAgentHostId, knownAgentStatus);
}
public AgentConfigDto.DropBox getDropBox(String dropBoxId) {
AgentConfigDto.DropBox found = null;
for(AgentConfigDto.DropBox dropBox : this.agentConfigDto.getDropBox().getDropBoxList()) {

@ -203,4 +203,14 @@ public class DfxAgentConfiguration {
scheduledPostmanThreadPoolTaskScheduler.initialize();
return scheduledPostmanThreadPoolTaskScheduler;
}
// 연결된 agent 상태 확인 쓰레드 설정
@Bean(name = "statusCheckerThreadPoolTaskScheduler")
public ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler() {
ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
statusCheckerThreadPoolTaskScheduler.setPoolSize(1);
statusCheckerThreadPoolTaskScheduler.setThreadNamePrefix("status-checker-");
statusCheckerThreadPoolTaskScheduler.initialize();
return statusCheckerThreadPoolTaskScheduler;
}
}

@ -3,6 +3,7 @@ package com.bsmlab.dfx.agent.config;
import com.bsmlab.dfx.agent.config.datasource.DynamicDataSourceService;
import com.bsmlab.dfx.agent.task.dropbox.DropBoxTaskExecutorService;
import com.bsmlab.dfx.agent.task.postman.PostmanSchedulerService;
import com.bsmlab.dfx.agent.task.status.StatusCheckerSchedulerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
@ -17,21 +18,21 @@ public class StartupRunner implements ApplicationRunner {
private final DynamicDataSourceService dynamicDataSourceService;
private final PostmanSchedulerService postmanSchedulerService;
private final DropBoxTaskExecutorService dropBoxTaskExecutorService;
private final StatusCheckerSchedulerService statusCheckerSchedulerService;
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("✅ StartupRunner.run() 호출됨");
log.debug("✅ StartupRunner start [run]");
// DfxAgentConfiguration 에서 생성한 빈 중 DataSource 관련 설정을 마무리한다.
dynamicDataSourceService.setSqlSessionFactoryMap(dfxAgentConfiguration.getTemporarySqlSessionFactoryMap());
dynamicDataSourceService.setTransactionManagerMap(dfxAgentConfiguration.getTemporaryTransactionManagerMap());
// PostmanSchedulerService, DropBoxTaskExecutorService 는 @RequiredArgsConstructor 이기 때문에 자동으로 injection 된다
// 그 후 @PostConstruct 로직이 실행될 것이다.
log.debug("StartupRunner start [run]");
log.debug("{} ready", postmanSchedulerService.getClass().getName());
log.debug("{} ready", dropBoxTaskExecutorService.getClass().getName());
//TODO 4. Worker 쓰레드 생성
//4. Worker 쓰레드 생성
// 송신 메시지 전송 서비스
postmanSchedulerService.launch();
// 수신한 메시지 처리 서비스
dropBoxTaskExecutorService.launch();
// 다른 agent 상태 확인 서비스
statusCheckerSchedulerService.launch();
log.debug("✅ StartupRunner end");
}
}

@ -0,0 +1,79 @@
package com.bsmlab.dfx.agent.task.status;
import com.bsmlab.dfx.agent.config.AgentConfigDto;
import com.bsmlab.dfx.agent.config.AgentConfigReader;
import com.bsmlab.dfx.agent.listener.dto.AckDto;
import com.bsmlab.dfx.agent.listener.dto.CommandDto;
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
import com.bsmlab.dfx.agent.support.exception.DfxException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
@Service
@Slf4j
@RequiredArgsConstructor
public class StatusCheckerSchedulerService {
private final ThreadPoolTaskScheduler statusCheckerThreadPoolTaskScheduler;
private final AgentConfigReader agentConfigReader;
private ScheduledFuture<?> scheduledFuture;
// StartupRunner 로 부터 실행됨
public void launch() {
log.debug("StatusCheckerSchedulerService launch");
String cron = agentConfigReader.getAgentConfigDto().getStatusChecker().getCron();
this.scheduledFuture = statusCheckerThreadPoolTaskScheduler.schedule(() -> run(), new CronTrigger(cron));
}
// statusCheckerThreadPoolTaskScheduler 가 실행함
public void run() {
List<AgentConfigDto.KnownAgent> knownAgentList = this.agentConfigReader.getAgentConfigDto().getKnownAgentList();
if(knownAgentList != null) {
for(AgentConfigDto.KnownAgent knownAgent : knownAgentList) {
String messageUuid = UUID.randomUUID().toString();
CommandDto commandDto = CommandDto.builder().commandType(CommandDto.CommandType.ALIVE).messageUuid(messageUuid).build();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<CommandDto> bodyEntity = new HttpEntity<>(commandDto, httpHeaders);
RestTemplate restTemplate = new RestTemplate();
String url = "http://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/command";
log.debug("StatusChecker to {} send a message UUID {}", knownAgent.getHostName(), messageUuid);
String response = restTemplate.postForObject(url, bodyEntity, String.class);
ObjectMapper objectMapper = new ObjectMapper();
AckDto ackDto = null;
try {
ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
log.debug("StatusChecker received ack from {} ack: {}", knownAgent.getHostName(), ackDto);
if(AckDto.ResultType.PROCESS_SUCCESS == ackDto.getResult()) {
// known agent is alive
this.agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "ALIVE");
log.debug("known agent {} {} is ALIVE", knownAgent.getHostId(), knownAgent.getHostName());
}
else {
// known agent is down
this.agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "DOWN");
log.debug("known agent {} {} is DOWN", knownAgent.getHostId(), knownAgent.getHostName());
}
} catch (JsonProcessingException e) {
this.agentConfigReader.setKnownAgentStatus(knownAgent.getHostId(), "DOWN");
log.debug("known agent {} {} alive message is not valid then set DOWN", knownAgent.getHostId(), knownAgent.getHostName());
}
}
}
}
}
Loading…
Cancel
Save