@@ -2091,8 +2091,8 @@ public ContainerCommandResponseProto readBlock(
return malformedRequest(request);
}
try {
readBlockImpl(request, blockFile, kvContainer, streamObserver, false);
// TODO metrics.incContainerBytesStats(Type.ReadBlock, readBlock.getLen());
final long bytesRead = readBlockImpl(request, blockFile, kvContainer, streamObserver, false);
metrics.incContainerBytesStats(Type.ReadBlock, bytesRead);
} catch (StorageContainerException ex) {
responseProto = ContainerUtils.logAndReturnError(LOG, ex, request);
} catch (IOException ioe) {
@@ -2108,7 +2108,7 @@ public ContainerCommandResponseProto readBlock(
return responseProto;
}

private void readBlockImpl(ContainerCommandRequestProto request, RandomAccessFileChannel blockFile,
private long readBlockImpl(ContainerCommandRequestProto request, RandomAccessFileChannel blockFile,
Container kvContainer, StreamObserver<ContainerCommandResponseProto> streamObserver, boolean verifyChecksum)
throws IOException {
final ReadBlockRequestProto readBlock = request.getReadBlock();
@@ -2148,7 +2148,7 @@ private void readBlockImpl(ContainerCommandRequestProto request, RandomAccessFil

final ByteBuffer buffer = ByteBuffer.allocate(responseDataSize);
blockFile.position(adjustedOffset);
int totalDataLength = 0;
long totalDataLength = 0;
int numResponses = 0;
final long rounded = roundUp(readBlock.getLength() + offsetAlignment, bytesPerChecksum);
final long requiredLength = Math.min(rounded, blockData.getSize() - adjustedOffset);
@@ -2186,6 +2186,7 @@ private void readBlockImpl(ContainerCommandRequestProto request, RandomAccessFil
totalDataLength += dataLength;
numResponses++;
}
return totalDataLength;
}

static List<ByteString> getChecksums(long blockOffset, int readLength, int bytesPerChunk, int bytesPerChecksum,
@@ -30,13 +30,16 @@
import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.verifyAllDataChecksumsMatch;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
import static org.apache.ozone.test.MetricsAsserts.assertCounter;
import static org.apache.ozone.test.MetricsAsserts.getMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -53,6 +56,7 @@
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
@@ -68,22 +72,30 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
@@ -94,6 +106,7 @@
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
@@ -106,6 +119,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -938,4 +952,138 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException {

return kvHandler;
}

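/** Bundles the KeyValueHandler under test with the volume set and container set backing it. */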
private static class HandlerWithVolumeSet {
private final KeyValueHandler handler;
private final MutableVolumeSet volumeSet;
private final ContainerSet containerSet;

HandlerWithVolumeSet(KeyValueHandler handler, MutableVolumeSet volumeSet, ContainerSet containerSet) {
this.handler = handler;
this.volumeSet = volumeSet;
this.containerSet = containerSet;
}

KeyValueHandler getHandler() {
return handler;
}

MutableVolumeSet getVolumeSet() {
return volumeSet;
}

ContainerSet getContainerSet() {
return containerSet;
}
}

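/** Creates a KeyValueHandler backed by a real HddsVolume rooted at the given path and exposed through a mocked volume set. */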
private HandlerWithVolumeSet createKeyValueHandlerWithVolumeSet(Path path) throws IOException {
ContainerMetrics.remove();
final ContainerSet containerSet = newContainerSet();
final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);

HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf)
.clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID)
.volumeSet(volumeSet)
.build();
hddsVolume.format(CLUSTER_ID);
hddsVolume.createWorkingDir(CLUSTER_ID, null);
hddsVolume.createTmpDirs(CLUSTER_ID);
when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume));

final KeyValueHandler kvHandler = ContainerTestUtils.getKeyValueHandler(conf,
DATANODE_UUID, containerSet, volumeSet);
kvHandler.setClusterID(CLUSTER_ID);
hddsVolume.getVolumeInfoStats().unregister();
hddsVolume.getVolumeIOStats().unregister();

ContainerController controller = new ContainerController(containerSet,
Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler));
OnDemandContainerScanner onDemandScanner = new OnDemandContainerScanner(
conf.getObject(ContainerScannerConfiguration.class), controller);
containerSet.registerOnDemandScanner(onDemandScanner);

return new HandlerWithVolumeSet(kvHandler, volumeSet, containerSet);
}

@Test
public void testReadBlockMetrics() throws Exception {
Path testDir = Files.createTempDirectory("testReadBlockMetrics");
RandomAccessFileChannel blockFile = null;
try {
conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, ContainerLayoutVersion.FILE_PER_BLOCK.name());
HandlerWithVolumeSet handlerWithVolume = createKeyValueHandlerWithVolumeSet(testDir);
KeyValueHandler kvHandler = handlerWithVolume.getHandler();
MutableVolumeSet volumeSet = handlerWithVolume.getVolumeSet();
ContainerSet containerSet = handlerWithVolume.getContainerSet();

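// Create a FILE_PER_BLOCK container and register it with the container set.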
long containerID = ContainerTestHelper.getTestContainerID();
KeyValueContainerData containerData = new KeyValueContainerData(
containerID, ContainerLayoutVersion.FILE_PER_BLOCK,
(long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
DATANODE_UUID);
KeyValueContainer container = new KeyValueContainer(containerData, conf);
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), CLUSTER_ID);
containerSet.addContainer(container);

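// Register block metadata describing a single 1024-byte chunk.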
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
BlockData blockData = new BlockData(blockID);
ChunkInfo chunkInfo = new ChunkInfo("chunk1", 0, 1024);
blockData.addChunk(chunkInfo.getProtoBufMessage());
kvHandler.getBlockManager().putBlock(container, blockData);

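// Write the 1024 bytes of chunk data backing the block.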
ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(1024));
kvHandler.getChunkManager().writeChunk(container, blockID, chunkInfo, data,
DispatcherContext.getHandleWriteChunk());

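// Build a ReadBlock request covering the full 1024-byte block.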
ContainerCommandRequestProto readBlockRequest =
ContainerCommandRequestProto.newBuilder()
.setCmdType(ContainerProtos.Type.ReadBlock)
.setContainerID(containerID)
.setDatanodeUuid(DATANODE_UUID)
.setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setOffset(0)
.setLength(1024)
.build())
.build();

final AtomicInteger responseCount = new AtomicInteger(0);

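// Observer that verifies each streamed response succeeded and counts the responses received.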
StreamObserver<ContainerCommandResponseProto> streamObserver =
new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(ContainerCommandResponseProto response) {
assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
responseCount.incrementAndGet();
}

@Override
public void onError(Throwable t) {
fail("ReadBlock failed", t);
}

@Override
public void onCompleted() {
}
};

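// Invoke readBlock; on success it streams data through the observer and returns null.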
blockFile = new RandomAccessFileChannel();
ContainerCommandResponseProto response = kvHandler.readBlock(
readBlockRequest, container, blockFile, streamObserver);

assertNull(response, "ReadBlock should return null on success");
assertTrue(responseCount.get() > 0, "Should receive at least one response");

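// The handler should have counted the 1024 bytes read against bytesReadBlock.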
MetricsRecordBuilder containerMetrics = getMetrics(
ContainerMetrics.STORAGE_CONTAINER_METRICS);
assertCounter("bytesReadBlock", 1024L, containerMetrics);
} finally {
if (blockFile != null) {
blockFile.close();
}
FileUtils.deleteDirectory(testDir.toFile());
ContainerMetrics.remove();
}
}
}
@@ -207,6 +207,7 @@ static void runTestClientServer(
assertCounter("numReadChunk", 1L, containerMetrics);
assertCounter("bytesWriteChunk", 1024L, containerMetrics);
assertCounter("bytesReadChunk", 1024L, containerMetrics);
// bytesReadBlock is tested in TestKeyValueHandler.testReadBlockMetrics

String sec = DFS_METRICS_PERCENTILES_INTERVALS + "s";
Thread.sleep((DFS_METRICS_PERCENTILES_INTERVALS + 1) * 1000);