Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.pool.SessionPool;

import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;

import java.io.File;
Expand Down Expand Up @@ -1119,7 +1122,7 @@ public void shutdownForciblyAllDataNodes() {

@Override
public void ensureNodeStatus(
final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatusList)
throws IllegalStateException {
Throwable lastException = null;
for (int i = 0; i < retryCount; i++) {
Expand Down Expand Up @@ -1147,20 +1150,37 @@ public void ensureNodeStatus(
+ node.getClientRpcEndPoint().getPort(),
node.getDataNodeId()));
for (int j = 0; j < nodes.size(); j++) {
final String endpoint = nodes.get(j).getIpAndPortString();
BaseNodeWrapper nodeWrapper = nodes.get(j);
String ipAndPortString = nodeWrapper.getIpAndPortString();
final String endpoint = ipAndPortString;
if (!nodeIds.containsKey(endpoint)) {
// Node not exist
// Notice: Never modify this line, since the NodeLocation might be modified in IT
errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!");
continue;
}
final String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
if (!targetStatus.get(j).getStatus().equals(status)) {
final NodeStatus targetStatus = targetStatusList.get(j);
if (!targetStatus.getStatus().equals(status)) {
// Error status
errorMessages.add(
String.format(
"Node %s is in status %s, but expected %s",
endpoint, status, targetStatus.get(j)));
endpoint, status, targetStatusList.get(j)));
continue;
}
if (nodeWrapper instanceof DataNodeWrapper && targetStatus.equals(NodeStatus.Running)) {
final String[] ipPort = nodeWrapper.getIpAndPortString().split(":");
final String ip = ipPort[0];
final int port = Integer.parseInt(ipPort[1]);
try (TSocket socket = new TSocket(new TConfiguration(), ip, port, 1000)) {
socket.open();
} catch (final TTransportException e) {
errorMessages.add(
String.format(
"DataNode %s is not reachable: %s",
nodeWrapper.getIpAndPortString(), e.getMessage()));
}
}
}
if (errorMessages.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ public List<TSStatus> setConfiguration(TSetConfigurationReq req) {
if (!targetDataNodes.isEmpty()) {
DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap);
CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestWithRetry(clientHandler);
responseList.addAll(clientHandler.getResponseList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService {
ConcurrentHashMap.newKeySet();
private ReentrantLock lock = new ReentrantLock();
private volatile boolean init = false;
private volatile boolean isStoppingAllScheduleTask = false;

@Override
public void start() throws StartupException {
Expand All @@ -76,8 +77,13 @@ public void start() throws StartupException {
logger.info("Compaction schedule task manager started.");
}

public boolean isStoppingAllScheduleTask() {
return isStoppingAllScheduleTask;
}

public void stopCompactionScheduleTasks() throws InterruptedException {
lock.lock();
isStoppingAllScheduleTask = true;
try {
for (Future<Void> task : submitCompactionScheduleTaskFutures) {
task.cancel(true);
Expand Down Expand Up @@ -121,6 +127,7 @@ public void checkAndMayApplyConfigurationChange() throws InterruptedException {

public void startScheduleTasks() {
lock.lock();
isStoppingAllScheduleTask = false;
try {
// compaction selector
for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
Expand All @@ -144,6 +151,7 @@ public void startScheduleTasks() {
@Override
public void stop() {
lock.lock();
isStoppingAllScheduleTask = true;
try {
if (!init) {
return;
Expand All @@ -160,6 +168,7 @@ public void stop() {
@Override
public void waitAndStop(long milliseconds) {
lock.lock();
isStoppingAllScheduleTask = true;
try {
if (!init) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,26 @@ public Void call() {
dataRegion.executeCompaction();
}
} catch (InterruptedException ignored) {
boolean isStoppedByUser =
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
logger.info(
"[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId);
return null;
"[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}",
workerId,
isStoppedByUser);
if (isStoppedByUser) {
return null;
}
} catch (Exception e) {
logger.error(
"[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task",
workerId,
e);
} catch (Throwable t) {
logger.error(
"[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task and cannot recover",
workerId,
t);
throw t;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,21 @@ public Void call() throws Exception {
}
}
} catch (InterruptedException ignored) {
logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
return null;
boolean isStoppedByUser =
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
logger.info(
"[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}",
workerId,
isStoppedByUser);
if (isStoppedByUser) {
return null;
}
} catch (Exception e) {
logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e);
} catch (Throwable t) {
logger.error(
"[TTLCheckTask-{}] Failed to execute ttl check and cannot recover", workerId, t);
throw t;
}
}
}
Expand Down
Loading