Skip to content

Commit 613a464

Browse files
authored
fix(interactive): file mode discovery ; support control secondary catch up ; support compact specific partition (#4626)
- support zk kafka discovery by local file - support control secondary catch up - support compact specific partition
1 parent 7ec6d6c commit 613a464

File tree

17 files changed

+455
-5
lines changed

17 files changed

+455
-5
lines changed

.devcontainer/devcontainer.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"name": "GraphScope",
66
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
77
"image": "registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.24.2-amd64",
8-
98
// Features to add to the dev container. More info: https://containers.dev/features.
109
"features": {
1110
"ghcr.io/devcontainers/features/common-utils:2":{

interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,10 @@ public class CommonConfig {
8484

8585
public static final Config<Integer> COLLECT_STATISTICS_INTERVAL_MIN =
8686
Config.intConfig("collect.statistics.interval.min", 60);
87+
88+
public static final Config<String> SERVERS_DISCOVERY_MODE =
89+
Config.stringConfig("servers.discovery.mode", "service");
90+
91+
public static final Config<Integer> FILE_DISCOVERY_INTERVAL_MS =
92+
Config.intConfig("file.discovery.interval.ms", 60000);
8793
}

interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/KafkaConfig.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
*/
1414
package com.alibaba.graphscope.groot.common.config;
1515

16+
import com.alibaba.graphscope.groot.common.meta.ServerDiscoverMode;
17+
18+
import java.io.IOException;
19+
import java.nio.file.Files;
20+
import java.nio.file.Path;
21+
import java.nio.file.Paths;
22+
1623
public class KafkaConfig {
1724
public static final Config<String> KAFKA_SERVERS =
1825
Config.stringConfig("kafka.servers", "localhost:9092");
@@ -28,4 +35,60 @@ public class KafkaConfig {
2835

2936
public static final Config<Integer> KAFKA_MAX_MESSAGE_MB =
3037
Config.intConfig("kafka.max.message.mb", 20);
38+
39+
private static volatile String cachedKafkaServers;
40+
private static volatile long lastUpdateTime = 0L;
41+
42+
public static String getKafkaServers(Configs configs) {
43+
String kafkaServerDiscoveryMode = CommonConfig.SERVERS_DISCOVERY_MODE.get(configs);
44+
if (kafkaServerDiscoveryMode == null) {
45+
return KAFKA_SERVERS.get(configs);
46+
}
47+
ServerDiscoverMode discoverMode = ServerDiscoverMode.fromMode(kafkaServerDiscoveryMode);
48+
switch (discoverMode) {
49+
case SERVICE:
50+
return KAFKA_SERVERS.get(configs);
51+
case FILE:
52+
String filePath = KAFKA_SERVERS.get(configs);
53+
Integer refreshIntervalMs = CommonConfig.FILE_DISCOVERY_INTERVAL_MS.get(configs);
54+
long now = System.currentTimeMillis();
55+
if (cachedKafkaServers == null || (now - lastUpdateTime) > refreshIntervalMs) {
56+
synchronized (KafkaConfig.class) {
57+
if (cachedKafkaServers == null
58+
|| (now - lastUpdateTime) > refreshIntervalMs) {
59+
try {
60+
Path path = Paths.get(filePath);
61+
if (Files.exists(path)) {
62+
cachedKafkaServers = Files.readString(path).trim();
63+
lastUpdateTime = now;
64+
} else {
65+
throw new IllegalArgumentException(
66+
"Kafka servers file not found: " + filePath);
67+
}
68+
} catch (IOException e) {
69+
throw new RuntimeException(
70+
"read Kafka ip file error: " + filePath, e);
71+
}
72+
}
73+
}
74+
}
75+
return cachedKafkaServers;
76+
default:
77+
return KAFKA_SERVERS.get(configs);
78+
}
79+
}
80+
81+
public static void setKafkaServersToFile(String kafkaAddress, Configs configs) {
82+
String filePath = KAFKA_SERVERS.get(configs);
83+
Path path = Paths.get(filePath);
84+
synchronized (KafkaConfig.class) {
85+
try {
86+
Files.writeString(path, kafkaAddress);
87+
cachedKafkaServers = kafkaAddress.trim();
88+
lastUpdateTime = System.currentTimeMillis();
89+
} catch (IOException e) {
90+
throw new RuntimeException("Failed to write Kafka servers file: " + filePath, e);
91+
}
92+
}
93+
}
3194
}

interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
*/
1616
package com.alibaba.graphscope.groot.common.config;
1717

18+
import com.alibaba.graphscope.groot.common.meta.ServerDiscoverMode;
19+
20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.Path;
23+
import java.nio.file.Paths;
24+
1825
public class ZkConfig {
1926
public static final Config<String> ZK_BASE_PATH =
2027
Config.stringConfig("zk.base.path", "/graphscope/default_graph");
@@ -42,4 +49,57 @@ public class ZkConfig {
4249

4350
public static final Config<String> ZK_AUTH_PASSWORD =
4451
Config.stringConfig("zk.auth.password", "");
52+
53+
private static volatile String cachedZkServers;
54+
private static volatile long lastUpdateTime = 0L;
55+
56+
public static String getZkServers(Configs configs) {
57+
String kafkaServerDiscoveryMode = CommonConfig.SERVERS_DISCOVERY_MODE.get(configs);
58+
if (kafkaServerDiscoveryMode == null) {
59+
return ZK_CONNECT_STRING.get(configs);
60+
}
61+
ServerDiscoverMode discoverMode = ServerDiscoverMode.fromMode(kafkaServerDiscoveryMode);
62+
switch (discoverMode) {
63+
case SERVICE:
64+
return ZK_CONNECT_STRING.get(configs);
65+
case FILE:
66+
String filePath = ZK_CONNECT_STRING.get(configs);
67+
Integer refreshIntervalMs = CommonConfig.FILE_DISCOVERY_INTERVAL_MS.get(configs);
68+
long now = System.currentTimeMillis();
69+
if (cachedZkServers == null || (now - lastUpdateTime) > refreshIntervalMs) {
70+
synchronized (KafkaConfig.class) {
71+
if (cachedZkServers == null || (now - lastUpdateTime) > refreshIntervalMs) {
72+
try {
73+
Path path = Paths.get(filePath);
74+
if (Files.exists(path)) {
75+
cachedZkServers = Files.readString(path).trim();
76+
lastUpdateTime = now;
77+
} else {
78+
throw new IllegalArgumentException(
79+
"Zk servers file not found: " + filePath);
80+
}
81+
} catch (IOException e) {
82+
throw new RuntimeException(
83+
"read Zk servers file error: " + filePath, e);
84+
}
85+
}
86+
}
87+
}
88+
return cachedZkServers;
89+
default:
90+
return ZK_CONNECT_STRING.get(configs);
91+
}
92+
}
93+
94+
public static void setZkServersToFile(String zkAddress, Configs configs) {
95+
String filePath = ZK_CONNECT_STRING.get(configs);
96+
Path path = Paths.get(filePath);
97+
try {
98+
Files.writeString(path, zkAddress);
99+
cachedZkServers = zkAddress.trim();
100+
lastUpdateTime = System.currentTimeMillis();
101+
} catch (IOException e) {
102+
throw new RuntimeException("Failed to write Zk servers file: " + filePath, e);
103+
}
104+
}
45105
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.alibaba.graphscope.groot.common.meta;
2+
3+
public enum ServerDiscoverMode {
4+
5+
// use k8s dns as service discover
6+
SERVICE("service"),
7+
8+
FILE("file");
9+
10+
private String mode;
11+
12+
ServerDiscoverMode(String mode) {
13+
this.mode = mode;
14+
}
15+
16+
public String getMode() {
17+
return mode;
18+
}
19+
20+
public static ServerDiscoverMode fromMode(String modeStr) {
21+
for (ServerDiscoverMode mode : ServerDiscoverMode.values()) {
22+
if (mode.getMode().equals(modeStr)) {
23+
return mode;
24+
}
25+
}
26+
return null;
27+
}
28+
}

interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,12 +713,26 @@ public boolean compactDB() {
713713
return response.getSuccess();
714714
}
715715

716+
public boolean compactPartition(int partitionId) {
717+
CompactPartitionRequest request =
718+
CompactPartitionRequest.newBuilder().setPartitionId(partitionId).build();
719+
CompactPartitionResponse response = this.clientStub.compactPartition(request);
720+
return response.getSuccess();
721+
}
722+
716723
public boolean reopenSecondary() {
717724
ReopenSecondaryRequest request = ReopenSecondaryRequest.newBuilder().build();
718725
ReopenSecondaryResponse response = this.clientStub.reopenSecondary(request);
719726
return response.getSuccess();
720727
}
721728

729+
public boolean updateCatchUpStatus(boolean enableCatchUpPrimary) {
730+
UpdateCatchUpStatusRequest request =
731+
UpdateCatchUpStatusRequest.newBuilder().setEnable(enableCatchUpPrimary).build();
732+
UpdateCatchUpStatusResponse response = this.clientStub.updateCatchUpStatus(request);
733+
return response.getSuccess();
734+
}
735+
722736
public String loadJsonSchema(Path jsonFile) throws IOException {
723737
String json = new String(Files.readAllBytes(jsonFile), StandardCharsets.UTF_8);
724738
return loadSchema(json, 0);

interactive_engine/groot-module/README-zh.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,40 @@ Frontend 是无状态的,当 Frontend 挂掉时它正在处理的写请求会
5454

5555
当 Coordinator 挂掉时,写链路可以继续写数据,但是新写入的数据均不可见。Coordinator failover 时,首先从 MetaStore 恢复 SnapshotId、QueueOffsets 信息,然后继续接收 Store 汇报的进度、处理并更新 Frontend 的 QuerySnapshotId、更新 Ingestor 的 WriteSnapshotId。
5656

57+
## 3. 新增特性
58+
#### 1. 支持通过本地file发现zookeeper和kafka(适用于dns不可用的k8s集群)
59+
前提:需要zk和kafka的启动脚本里, 把自己的ip写到自定义路径的文件里
60+
```
61+
servers.discovery.mode=file // 默认service(通过k8s dns进行服务发现)
62+
zk.connect.string=${file_path}
63+
kafka.servers=${file_path}
64+
```
65+
66+
#### 2. 支持手动开启/关闭Secondary实例的Catch Up状态
67+
```
68+
/**
69+
* true为开启catch up, false为关闭catch up
70+
* catch up 周期参数(单位毫秒): store.catchup.interval.ms
71+
*/
72+
com.alibaba.graphscope.groot.sdk.GrootClient.updateCatchUpStatus(boolean enableCatchUpPrimary)
73+
```
74+
75+
#### 3. 支持manual compact指定partition
76+
当前的compact会一次性compact所有partition, secondary会catch up大量sst变更导致compact期间rt不稳定, 可调用该接口控制compact速率, 提高secondary rt稳定性
77+
```
78+
/**
79+
* 获取Groot所有分区数
80+
*/
81+
com.alibaba.graphscope.groot.sdk.GrootClient.getPartitionNum()
82+
83+
/**
84+
* compact指定的partition
85+
*/
86+
com.alibaba.graphscope.groot.sdk.GrootClient.compactPartition(int partitionId)
87+
88+
// 简单使用示例
89+
int partitionNum = grootClient.getPartitionNum();
90+
for (int partitionId = 0; partitionId < partitionNum; partitionId++) {
91+
grootClient.compactPartition(partitionId);
92+
}
93+
```

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/CuratorUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ public class CuratorUtils {
3636
private static final Logger logger = LoggerFactory.getLogger(CuratorUtils.class);
3737

3838
public static CuratorFramework makeCurator(Configs configs) {
39-
String connectionString = ZkConfig.ZK_CONNECT_STRING.get(configs);
39+
String connectionString = ZkConfig.getZkServers(configs);
40+
logger.info("zk connectionString is :" + connectionString);
4041
int sessionTimeoutMs = ZkConfig.ZK_SESSION_TIMEOUT_MS.get(configs);
4142
int connectionTimeoutMs = ZkConfig.ZK_CONNECTION_TIMEOUT_MS.get(configs);
4243
int baseSleepMs = ZkConfig.ZK_BASE_SLEEP_MS.get(configs);

interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,4 +580,100 @@ private void finish(Throwable t) {
580580
});
581581
}
582582
}
583+
584+
@Override
585+
public void updateCatchUpStatus(
586+
UpdateCatchUpStatusRequest request,
587+
StreamObserver<UpdateCatchUpStatusResponse> responseObserver) {
588+
boolean status = request.getEnable();
589+
logger.info("updateCatchUpStatus:{}", status);
590+
int storeCount = this.metaService.getStoreCount();
591+
AtomicInteger counter = new AtomicInteger(storeCount);
592+
AtomicBoolean finished = new AtomicBoolean(false);
593+
for (int i = 0; i < storeCount; i++) {
594+
this.frontendStoreClients
595+
.getClient(i)
596+
.updateCatchUpStatus(
597+
request,
598+
new CompletionCallback<UpdateCatchUpStatusResponse>() {
599+
600+
@Override
601+
public void onCompleted(UpdateCatchUpStatusResponse res) {
602+
if (!finished.get() && counter.decrementAndGet() == 0) {
603+
finish(null);
604+
}
605+
}
606+
607+
@Override
608+
public void onError(Throwable t) {
609+
logger.error("failed update catch up status", t);
610+
finish(t);
611+
}
612+
613+
private void finish(Throwable t) {
614+
if (finished.getAndSet(true)) {
615+
return;
616+
}
617+
logger.info("updateCatchUpStatus. Error [" + t + "]");
618+
if (t != null) {
619+
responseObserver.onError(t);
620+
} else {
621+
UpdateCatchUpStatusResponse res =
622+
UpdateCatchUpStatusResponse.newBuilder()
623+
.setSuccess(true)
624+
.build();
625+
responseObserver.onNext(res);
626+
responseObserver.onCompleted();
627+
}
628+
}
629+
});
630+
}
631+
}
632+
633+
@Override
634+
public void compactPartition(
635+
CompactPartitionRequest request,
636+
StreamObserver<CompactPartitionResponse> responseObserver) {
637+
logger.info("compactPartition start");
638+
int storeCount = this.metaService.getStoreCount();
639+
AtomicInteger counter = new AtomicInteger(storeCount);
640+
AtomicBoolean finished = new AtomicBoolean(false);
641+
for (int i = 0; i < storeCount; i++) {
642+
this.frontendStoreClients
643+
.getClient(i)
644+
.compactPartition(
645+
request,
646+
new CompletionCallback<CompactPartitionResponse>() {
647+
@Override
648+
public void onCompleted(CompactPartitionResponse res) {
649+
if (!finished.get() && counter.decrementAndGet() == 0) {
650+
finish(null);
651+
}
652+
}
653+
654+
@Override
655+
public void onError(Throwable t) {
656+
logger.error("failed to compact partition", t);
657+
finish(t);
658+
}
659+
660+
private void finish(Throwable t) {
661+
if (finished.getAndSet(true)) {
662+
return;
663+
}
664+
logger.info("compact partition finished. Error [" + t + "]");
665+
if (t != null) {
666+
responseObserver.onError(t);
667+
} else {
668+
CompactPartitionResponse res =
669+
CompactPartitionResponse.newBuilder()
670+
.setSuccess(true)
671+
.build();
672+
responseObserver.onNext(res);
673+
responseObserver.onCompleted();
674+
}
675+
}
676+
});
677+
}
678+
}
583679
}

0 commit comments

Comments
 (0)