parent
1bf5433ec6
commit
76788090af
@ -0,0 +1,115 @@
|
||||
package com.bsmlab.dfx.agent.task;
|
||||
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigDto;
|
||||
import com.bsmlab.dfx.agent.config.AgentConfigReader;
|
||||
import com.bsmlab.dfx.agent.config.datasource.SqlExecuteService;
|
||||
import com.bsmlab.dfx.agent.listener.dto.AckDto;
|
||||
import com.bsmlab.dfx.agent.listener.dto.ReceiveMessageDto;
|
||||
import com.bsmlab.dfx.agent.support.exception.DfxException;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.scheduling.support.CronTrigger;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PostmanSchedulerService {
|
||||
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
|
||||
private final AgentConfigReader agentConfigReader;
|
||||
private final SqlExecuteService sqlExecuteService;
|
||||
private Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>();
|
||||
private Map<String, AgentConfigDto.Postman> postmanMap = new HashMap<>();
|
||||
|
||||
public void startPostman(String postmanId) {
|
||||
AgentConfigDto.Postman postman = this.postmanMap.get(postmanId);
|
||||
this.stop(postmanId);
|
||||
String cron = postman.getAction().getCron();
|
||||
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(() -> run(postman), new CronTrigger(cron));
|
||||
scheduledFutureMap.put(postmanId, scheduledFuture);
|
||||
}
|
||||
|
||||
private void stop(String postmanId) {
|
||||
if(this.scheduledFutureMap.get(postmanId) != null) {
|
||||
ScheduledFuture<?> scheduledFuture = this.scheduledFutureMap.get(postmanId);
|
||||
scheduledFuture.cancel(false);
|
||||
this.scheduledFutureMap.remove(postmanId);
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void launch() {
|
||||
List<AgentConfigDto.Postman> postmanList = agentConfigReader.getScheduledTypePostmanList();
|
||||
for(AgentConfigDto.Postman postman : postmanList) {
|
||||
this.startPostman(postman.getPostmanId());
|
||||
}
|
||||
/*
|
||||
{
|
||||
"postman-id": "postman2",
|
||||
"task-type": "DB_READ_THEN_SEND",
|
||||
"action": {
|
||||
"type": "SCHEDULED",
|
||||
"cron": "1 0 3 * * *"
|
||||
},
|
||||
"message": {
|
||||
"message-type": "SAVE_DB_DATA",
|
||||
"dataSourceId": "dfcms",
|
||||
"sql-id": "dfcms.selectSome"
|
||||
},
|
||||
"recipient-host-id": "third-agent",
|
||||
"recipient-drop-box-id": "drop3",
|
||||
"routing-host-id-list": ["first-agent", "second-agent", "third-agent"]
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
public void run(AgentConfigDto.Postman postman) {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
String senderHostId = agentConfigReader.getAgentConfigDto().getHostId();
|
||||
long senderTimestamp = System.currentTimeMillis();
|
||||
String messageUuid = UUID.randomUUID().toString();
|
||||
// DB TO DB 전송
|
||||
if(AgentConfigDto.MessageType.SAVE_DB_DATA == postman.getMessage().getMessageType()) {
|
||||
String dataSourceId = postman.getMessage().getDataSourceId();
|
||||
String sqlId = postman.getMessage().getSqlId();
|
||||
try {
|
||||
List<Map<String, Object>> dataMapList = sqlExecuteService.select(dataSourceId, sqlId, null);
|
||||
String dataString = objectMapper.writeValueAsString(dataMapList);
|
||||
ReceiveMessageDto receiveMessageDto = ReceiveMessageDto.builder().senderHostId(senderHostId).senderTimestamp(senderTimestamp)
|
||||
.messageUuid(messageUuid).messageType(AgentConfigDto.MessageType.SAVE_DB_DATA)
|
||||
.recipientHostId(postman.getRecipientHostId()).recipientDropBoxId(postman.getRecipientDropBoxId())
|
||||
.data(dataString).build();
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
|
||||
HttpEntity<ReceiveMessageDto> bodyEntity = new HttpEntity<>(receiveMessageDto, httpHeaders);
|
||||
RestTemplate restTemplate = new RestTemplate();
|
||||
AgentConfigDto.KnownAgent knownAgent = agentConfigReader.getKnownAgent(postman.getRecipientHostId());
|
||||
String url = "https://" + knownAgent.getHostName() + ":" + knownAgent.getListenPort() + "/listen";
|
||||
String response = restTemplate.postForObject(url, bodyEntity, String.class);
|
||||
AckDto ackDto = objectMapper.readValue(response, new TypeReference<AckDto>() {});
|
||||
if(AckDto.ResultType.RECEIVE_SUCCESS != ackDto.getResult()) {
|
||||
throw new DfxException(postman.getRecipientHostId() + "에게 전송하였으나 상대방이 수신하지 못하였습니다." + response);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (DfxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in new issue