Skip to content

Commit b09dcc9

Browse files
Fix bug of warm index : FullFileCachedIndexInput was repeatedly closed (#20055)
Signed-off-by: Yongheng Liu <[email protected]>
1 parent 66ed5cb commit b09dcc9

File tree

4 files changed

+58
-9
lines changed

4 files changed

+58
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- Add support for missing proto fields in GRPC FunctionScore and Highlight ([#20169](https://github.com/opensearch-project/OpenSearch/pull/20169))
1313

1414
### Fixed
15+
- Fix bug of warm index: FullFileCachedIndexInput was closed error ([#20055](https://github.com/opensearch-project/OpenSearch/pull/20055))
1516
- Fix flaky test ClusterMaxMergesAtOnceIT.testClusterLevelDefaultUpdatesMergePolicy ([#18056](https://github.com/opensearch-project/OpenSearch/issues/18056))
1617
- Fix bug in Assertion framework(Yaml Rest test): numeric comparison fails when comparing Integer vs Long (or Float vs Double) ([#19376](https://github.com/opensearch-project/OpenSearch/pull/19376))
1718

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.io.IOException;
1515
import java.nio.file.Path;
16+
import java.util.concurrent.atomic.AtomicBoolean;
1617

1718
/**
1819
* Reference Counted IndexInput. The first FileCachedIndexInput for a file/block is called origin.
@@ -41,7 +42,7 @@ public class FileCachedIndexInput extends IndexInput implements RandomAccessInpu
4142
/** indicates if this IndexInput instance is a clone or not */
4243
protected final boolean isClone;
4344

44-
protected volatile boolean closed = false;
45+
protected final AtomicBoolean closed = new AtomicBoolean(false);
4546

4647
public FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) {
4748
this(cache, filePath, underlyingIndexInput, false);
@@ -139,7 +140,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
139140

140141
@Override
141142
public void close() throws IOException {
142-
if (!closed) {
143+
if (!closed.get()) {
143144
// if the underlying lucene index input is a clone,
144145
// the following line won't close/unmap the file.
145146
luceneIndexInput.close();
@@ -148,7 +149,7 @@ public void close() throws IOException {
148149
if (isClone) {
149150
cache.decRef(filePath);
150151
}
151-
closed = true;
152+
closed.set(true);
152153
}
153154
}
154155
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.lang.ref.Cleaner;
2020
import java.nio.file.Path;
21+
import java.util.concurrent.atomic.AtomicBoolean;
2122

2223
/**
2324
* Extension of {@link FileCachedIndexInput} for full files for handling clones and slices
@@ -37,7 +38,7 @@ public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput under
3738

3839
public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) {
3940
super(cache, filePath, underlyingIndexInput, isClone);
40-
indexInputHolder = new IndexInputHolder(underlyingIndexInput, isClone, cache, filePath);
41+
indexInputHolder = new IndexInputHolder(closed, underlyingIndexInput, isClone, cache, filePath);
4142
CLEANER.register(this, indexInputHolder);
4243
}
4344

@@ -83,7 +84,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
8384
*/
8485
@Override
8586
public void close() throws IOException {
86-
if (!closed) {
87+
if (!closed.get()) {
8788
if (isClone) {
8889
cache.decRef(filePath);
8990
}
@@ -93,17 +94,26 @@ public void close() throws IOException {
9394
logger.trace("FullFileCachedIndexInput already closed");
9495
}
9596
luceneIndexInput = null;
96-
closed = true;
97+
closed.set(true);
9798
}
9899
}
99100

101+
/**
102+
* Run resource cleaning,To be used only in test
103+
*/
104+
public void indexInputHolderRun() {
105+
indexInputHolder.run();
106+
}
107+
100108
private static class IndexInputHolder implements Runnable {
109+
private final AtomicBoolean closed;
101110
private final IndexInput indexInput;
102111
private final FileCache cache;
103112
private final boolean isClone;
104113
private final Path path;
105114

106-
IndexInputHolder(IndexInput indexInput, boolean isClone, FileCache cache, Path path) {
115+
IndexInputHolder(AtomicBoolean closed, IndexInput indexInput, boolean isClone, FileCache cache, Path path) {
116+
this.closed = closed;
107117
this.indexInput = indexInput;
108118
this.isClone = isClone;
109119
this.cache = cache;
@@ -113,8 +123,11 @@ private static class IndexInputHolder implements Runnable {
113123
@Override
114124
public void run() {
115125
try {
116-
indexInput.close();
117-
if (isClone) cache.decRef(path);
126+
if (!closed.get()) {
127+
indexInput.close();
128+
if (isClone) cache.decRef(path);
129+
closed.set(true);
130+
}
118131
} catch (IOException e) {
119132
logger.error("Failed to close IndexInput while clearing phantom reachable object");
120133
}

server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,40 @@ public void testSlice() throws IOException {
7878
assertFalse(isActiveAndTotalUsageSame());
7979
}
8080

81+
public void testClose() throws IOException {
82+
setupIndexInputAndAddToFileCache();
83+
84+
// Since the file is already in cache and has refCount 1, activeUsage and totalUsage will be same
85+
assertTrue(isActiveAndTotalUsageSame());
86+
87+
fileCache.decRef(filePath);
88+
89+
// 3 Clones
90+
FullFileCachedIndexInput indexInputClone1 = fullFileCachedIndexInput.clone();
91+
FullFileCachedIndexInput indexInputClone2 = fullFileCachedIndexInput.clone();
92+
FullFileCachedIndexInput indexInputClone3 = fullFileCachedIndexInput.clone();
93+
94+
assertEquals((int) fileCache.getRef(filePath), 3);
95+
// Close Clone1, refCount -1
96+
indexInputClone1.close();
97+
assertEquals((int) fileCache.getRef(filePath), 2);
98+
// Mock GC resource cleaning, but the deRef function will not be called again.
99+
indexInputClone1.indexInputHolderRun();
100+
assertEquals((int) fileCache.getRef(filePath), 2);
101+
102+
// Mock GC resource cleaning, refCount -1
103+
indexInputClone2.indexInputHolderRun();
104+
assertEquals((int) fileCache.getRef(filePath), 1);
105+
// Close Clone2, but the deRef function will not be called again.
106+
indexInputClone2.close();
107+
assertEquals((int) fileCache.getRef(filePath), 1);
108+
109+
indexInputClone3.close();
110+
assertEquals((int) fileCache.getRef(filePath), 0);
111+
indexInputClone3.indexInputHolderRun();
112+
assertEquals((int) fileCache.getRef(filePath), 0);
113+
}
114+
81115
private void triggerGarbageCollectionAndAssertClonesClosed() {
82116
try {
83117
// Clones/Slices will be phantom reachable now, triggering gc should call close on them

0 commit comments

Comments
 (0)