Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
Expand Down Expand Up @@ -223,6 +224,8 @@ protected void start() {
ConfigNodeInfo.getInstance().storeConfigNodeList();
// Register this DataNode to the cluster when first start
sendRegisterRequestToConfigNode(false);
// Clean up active load listening directories on first startup
ActiveLoadAgent.cleanupListeningDirectories();
} else {
// Send restart request of this DataNode
sendRestartRequestToConfigNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,26 @@

package org.apache.iotdb.db.storageengine.load.active;

import org.apache.iotdb.db.conf.IoTDBDescriptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ActiveLoadAgent {

private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadAgent.class);

private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
private final ActiveLoadDirScanner activeLoadDirScanner;
private final ActiveLoadMetricsCollector activeLoadMetricsCollector;
Expand Down Expand Up @@ -48,4 +66,108 @@ public synchronized void start() {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}

/**
* Clean up all listening directories for active load on DataNode first startup. This method will
* clean up all files and subdirectories in the listening directories, including: 1. Pending
* directories (configured by load_active_listening_dirs) 2. Pipe directory (for pipe data sync)
* 3. Failed directory (for failed files)
*/
public static void cleanupListeningDirectories() {
try {
final Set<String> dirsToClean = new HashSet<>();

try {
// Add configured listening dirs
if (IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) {
dirsToClean.addAll(
Arrays.asList(
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
}

// Add pipe dir
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());

// Add failed dir
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
} catch (Exception e) {
LOGGER.warn("Failed to get active load listening directories configuration", e);
return;
}

int totalFilesDeleted = 0;
int totalSubDirsDeleted = 0;

for (final String dirPath : dirsToClean) {
try {
final File dir = new File(dirPath);

if (!dir.exists() || !dir.isDirectory()) {
continue;
}

// Convert to absolute path for comparison
final String absoluteDirPath = dir.getAbsolutePath();

final long[] fileCount = {0};
final long[] subdirCount = {0};

Files.walkFileTree(
dir.toPath(),
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
try {
Files.delete(file);
fileCount[0]++;
} catch (Exception e) {
LOGGER.debug("Failed to delete file: {}", file.toAbsolutePath(), e);
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path subDir, IOException exc) {
if (exc != null) {
LOGGER.debug(
"Error occurred while visiting directory: {}",
subDir.toAbsolutePath(),
exc);
return FileVisitResult.CONTINUE;
}
if (!subDir.toFile().getAbsolutePath().equals(absoluteDirPath)) {
try {
Files.delete(subDir);
subdirCount[0]++;
} catch (Exception e) {
LOGGER.debug("Failed to delete directory: {}", subDir.toAbsolutePath(), e);
}
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
LOGGER.debug("Failed to visit file: {}", file.toAbsolutePath(), exc);
return FileVisitResult.CONTINUE;
}
});

totalFilesDeleted += fileCount[0];
totalSubDirsDeleted += subdirCount[0];
} catch (Exception e) {
LOGGER.warn("Failed to cleanup directory: {}", dirPath, e);
}
}

if (totalFilesDeleted > 0 || totalSubDirsDeleted > 0) {
LOGGER.info(
"Cleaned up active load listening directories, deleted {} files and {} subdirectories",
totalFilesDeleted,
totalSubDirsDeleted);
}
} catch (Throwable t) {
LOGGER.warn("Unexpected error during cleanup of active load listening directories", t);
}
}
}
Loading