diff --git a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java index 789f2e0..8b8047a 100644 --- a/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java +++ b/src/main/java/com/bsmlab/dfx/agent/config/DfxAgentConfiguration.java @@ -205,24 +205,11 @@ public class DfxAgentConfiguration { @Bean(name = "scheduledPostmanThreadPoolTaskScheduler") public ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler() { ThreadPoolTaskScheduler scheduledPostmanThreadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + scheduledPostmanThreadPoolTaskScheduler.setPoolSize(10); + scheduledPostmanThreadPoolTaskScheduler.setThreadNamePrefix("dynamic-scheduler-"); + scheduledPostmanThreadPoolTaskScheduler.initialize(); + return scheduledPostmanThreadPoolTaskScheduler; /** - import org.springframework.context.annotation.Bean; - import org.springframework.context.annotation.Configuration; - import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; - - @Configuration - public class SchedulerConfig { - - @Bean - public ThreadPoolTaskScheduler taskScheduler() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); - scheduler.setPoolSize(5); - scheduler.setThreadNamePrefix("dynamic-scheduler-"); - scheduler.initialize(); - return scheduler; - } - } - import org.springframework.scheduling.Trigger; import org.springframework.scheduling.TriggerContext; import org.springframework.scheduling.support.CronTrigger; @@ -292,6 +279,5 @@ public class DfxAgentConfiguration { */ - return scheduledPostmanThreadPoolTaskScheduler; } } diff --git a/src/main/java/com/bsmlab/dfx/agent/task/PostmanSchedulerService.java b/src/main/java/com/bsmlab/dfx/agent/task/PostmanSchedulerService.java new file mode 100644 index 0000000..df127e5 --- /dev/null +++ b/src/main/java/com/bsmlab/dfx/agent/task/PostmanSchedulerService.java @@ -0,0 +1,115 @@ +package com.bsmlab.dfx.agent.task; + +import com.bsmlab.dfx.agent.config.AgentConfigDto; +import com.bsmlab.dfx.agent.config.AgentConfigReader; +import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService; +import com.bsmlab.dfx.agent.listener.dto.AckDto; +import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto; +import com.bsmlab.dfx.agent.support.exception.DfxException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +@Service +@Slf4j +@RequiredArgsConstructor +public class PostmanSchedulerService { + private final ThreadPoolTaskScheduler threadPoolTaskScheduler; + private final AgentConfigReader agentConfigReader; + private final SqlExecuteService sqlExecuteService; + private Map> scheduledFutureMap = new HashMap<>(); + private Map postmanMap = new HashMap<>(); + + public void startPostman(String postmanId) { + AgentConfigDto.Postman postman = this.postmanMap.get(postmanId); + this.stop(postmanId); + String cron = postman.getAction().getCron(); + ScheduledFuture scheduledFuture = threadPoolTaskScheduler.schedule(() -> run(postman), new CronTrigger(cron)); + scheduledFutureMap.put(postmanId, scheduledFuture); + } + + private void stop(String postmanId) { + if(this.scheduledFutureMap.get(postmanId) != null) { + ScheduledFuture scheduledFuture = this.scheduledFutureMap.get(postmanId); + scheduledFuture.cancel(false); + this.scheduledFutureMap.remove(postmanId); + } + } + + @PostConstruct + public void launch() { + List postmanList = agentConfigReader.getScheduledTypePostmanList(); + for(AgentConfigDto.Postman postman : postmanList) { + this.startPostman(postman.getPostmanId()); + } + /* + { + "postman-id": "postman2", + "task-type": "DB_READ_THEN_SEND", + "action": { + "type": "SCHEDULED", + "cron": "1 0 3 * * *" + }, + "message": { + "message-type": "SAVE_DB_DATA", + "dataSourceId": "dfcms", + "sql-id": "dfcms.selectSome" + }, + "recipient-host-id": "third-agent", + "recipient-drop-box-id": "drop3", + "routing-host-id-list": ["first-agent", "second-agent", "third-agent"] + } + */ + } + + public void run(AgentConfigDto.Postman postman) { + ObjectMapper objectMapper = new ObjectMapper(); + String senderHostId = agentConfigReader.getAgentConfigDto().getHostId(); + long senderTimestamp = System.currentTimeMillis(); + String messageUuid = UUID.randomUUID().toString(); + // DB TO DB 전송 + if(AgentConfigDto.MessageType.SAVE_DB_DATA == postman.getMessage().getMessageType()) { + String dataSourceId = postman.getMessage().getDataSourceId(); + String sqlId = postman.getMessage().getSqlId(); + try { + List> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null); + String dataString = objectMapper.writeValueAsString(dataMapList); + ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp) + .messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.SAVE_DB_DATA) + .recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId()) + .data(dataString).build(); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + HttpEntity bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders); + RestTemplate restTemplate = new RestTemplate(); + AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId()); + String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen"; + String response = restTemplate.postForObject(url, bodyEntity, String.class); + AckDto ackDto = objectMapper.readValue(response, new TypeReference() {}); + if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) { + throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } catch (DfxException e) { + throw new RuntimeException(e); + } + } + } +}