Skip to content

Commit 3f19bfd

Browse files
refactor(interactive): Add error handling for Interactive runtime (#4435)
Reintroducing boost leaf for handling error for Interactive runtime module. Related PR: #4188 --------- Co-authored-by: xiaolei.zl <[email protected]>
1 parent c6ae400 commit 3f19bfd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+1703
-787
lines changed

flex/bin/adhoc_runner.cc

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,32 @@ gs::runtime::Context eval_plan(
8282
const std::map<std::string, std::string>& params) {
8383
gs::runtime::GraphReadInterface gri(txn);
8484
gs::runtime::OprTimer timer;
85-
return gs::runtime::PlanParser::get()
86-
.parse_read_pipeline(gri.schema(), gs::runtime::ContextMeta(), plan)
87-
.Execute(gri, gs::runtime::Context(), params, timer);
85+
86+
gs::runtime::Context ctx;
87+
{
88+
ctx = bl::try_handle_all(
89+
[&plan, &params, &gri, &timer]() {
90+
return gs::runtime::PlanParser::get()
91+
.parse_read_pipeline(gri.schema(), gs::runtime::ContextMeta(),
92+
plan)
93+
.value()
94+
.Execute(gri, gs::runtime::Context(), params, timer);
95+
},
96+
[&ctx](const gs::Status& err) {
97+
LOG(FATAL) << "Error in execution: " << err.error_message();
98+
return ctx;
99+
},
100+
[&](const bl::error_info& err) {
101+
LOG(FATAL) << "Error: " << err.error().value() << ", "
102+
<< err.exception()->what();
103+
return ctx;
104+
},
105+
[&]() {
106+
LOG(FATAL) << "Unknown error in execution";
107+
return ctx;
108+
});
109+
}
110+
return ctx;
88111
}
89112

90113
int main(int argc, char** argv) {

flex/bin/rt_admin.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ int main(int argc, char** argv) {
6161
if (op == "SHOW_STORED_PROCEDURES") {
6262
encoder.put_string(op);
6363
encoder.put_byte(0);
64+
encoder.put_byte(0);
6465
} else if (op == "QUERY_VERTEX") {
6566
if (argc < 4) {
6667
std::cerr << "usage for vertex query: rt_admin query_vertex "
@@ -74,6 +75,7 @@ int main(int argc, char** argv) {
7475
encoder.put_string(label_name);
7576
encoder.put_long(vertex_id);
7677
encoder.put_byte(0);
78+
encoder.put_byte(0);
7779
} else if (op == "QUERY_EDGE") {
7880
if (argc < 7) {
7981
std::cerr << "usage for edge query: rt_admin query_edge <src-label> "
@@ -103,13 +105,14 @@ int main(int argc, char** argv) {
103105
encoder.put_long(dst_id);
104106
encoder.put_string(edge_label);
105107
encoder.put_byte(0);
108+
encoder.put_byte(0);
106109
} else {
107110
std::cerr << "unexpected op - " << op << std::endl;
108111
return -1;
109112
}
110113

111114
std::string content(buf.data(), buf.size());
112-
auto res = cli.Post("/interactive/app", content, "text/plain");
115+
auto res = cli.Post("/v1/graph/current/query", content, "text/plain");
113116

114117
std::string ret = res->body;
115118
if (op == "SHOW_STORED_PROCEDURES") {

flex/bin/rt_server.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ int main(int argc, char** argv) {
3939
"warmup,w", bpo::value<bool>()->default_value(false),
4040
"warmup graph data")("memory-level,m",
4141
bpo::value<int>()->default_value(1))(
42+
"compiler-path,c", bpo::value<std::string>()->default_value(""))(
4243
"sharding-mode", bpo::value<std::string>()->default_value("cooperative"));
4344
google::InitGoogleLogging(argv[0]);
4445
FLAGS_logtostderr = true;
@@ -69,6 +70,7 @@ int main(int argc, char** argv) {
6970
return -1;
7071
}
7172
data_path = vm["data-path"].as<std::string>();
73+
std::string compiler_path = vm["compiler-path"].as<std::string>();
7274

7375
setenv("TZ", "Asia/Shanghai", 1);
7476
tzset();
@@ -80,7 +82,7 @@ int main(int argc, char** argv) {
8082
if (!schema.ok()) {
8183
LOG(FATAL) << "Failed to load schema: " << schema.status().error_message();
8284
}
83-
gs::GraphDBConfig config(schema.value(), data_path, shard_num);
85+
gs::GraphDBConfig config(schema.value(), data_path, compiler_path, shard_num);
8486
config.memory_level = memory_level;
8587
if (config.memory_level >= 2) {
8688
config.enable_auto_compaction = true;

flex/bin/stored_procedure_runner.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ int main(int argc, char** argv) {
137137
if (!schema.ok()) {
138138
LOG(FATAL) << "Failed to load schema: " << schema.status().error_message();
139139
}
140-
gs::GraphDBConfig config(schema.value(), data_path, 1);
140+
gs::GraphDBConfig config(schema.value(), data_path);
141141
config.memory_level = memory_level;
142142
if (config.memory_level >= 2) {
143143
config.enable_auto_compaction = true;

flex/engines/graph_db/app/cypher_app_utils.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,12 @@ void generate_compiler_configs(const std::string& graph_yaml,
7676

7777
bool generate_plan(
7878
const std::string& query, const std::string& statistics,
79-
const std::string& compiler_yaml, const std::string& tmp_dir,
79+
const std::string& compiler_jar_path, const std::string& compiler_yaml,
80+
const std::string& tmp_dir,
8081
std::unordered_map<std::string, physical::PhysicalPlan>& plan_cache) {
8182
// dump query to file
82-
const char* compiler_jar = getenv("COMPILER_JAR");
83-
if (compiler_jar == nullptr) {
83+
const char* compiler_jar = compiler_jar_path.c_str();
84+
if (compiler_jar_path == "") {
8485
std::cerr << "COMPILER_JAR is not set!" << std::endl;
8586
compiler_jar =
8687
"../../interactive_engine/compiler/target/"

flex/engines/graph_db/app/cypher_app_utils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
namespace gs {
2525
bool generate_plan(
2626
const std::string& query, const std::string& statistics,
27-
const std::string& compiler_yaml, const std::string& tmp_dir,
27+
const std::string& compiler_jar_path, const std::string& compiler_yaml,
28+
const std::string& tmp_dir,
2829
std::unordered_map<std::string, physical::PhysicalPlan>& plan_cache);
2930
void parse_params(std::string_view sw,
3031
std::map<std::string, std::string>& params);

flex/engines/graph_db/app/cypher_read_app.cc

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,44 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
2323
auto txn = graph.GetReadTransaction();
2424

2525
gs::runtime::GraphReadInterface gri(txn);
26-
auto ctx = runtime::PlanParser::get()
27-
.parse_read_pipeline(graph.schema(),
28-
gs::runtime::ContextMeta(), plan)
29-
.Execute(gri, runtime::Context(), {}, timer_);
3026

27+
gs::runtime::Context ctx;
28+
gs::Status status = gs::Status::OK();
29+
{
30+
ctx = bl::try_handle_all(
31+
[this, &gri, &plan]() -> bl::result<runtime::Context> {
32+
return runtime::PlanParser::get()
33+
.parse_read_pipeline(gri.schema(), gs::runtime::ContextMeta(),
34+
plan)
35+
.value()
36+
.Execute(gri, runtime::Context(), {}, timer_);
37+
},
38+
[&status](const gs::Status& err) {
39+
status = err;
40+
return runtime::Context();
41+
},
42+
[&](const bl::error_info& err) {
43+
status =
44+
gs::Status(gs::StatusCode::INTERNAL_ERROR,
45+
"Error: " + std::to_string(err.error().value()) +
46+
", Exception: " + err.exception()->what());
47+
return runtime::Context();
48+
},
49+
[&]() {
50+
status = gs::Status(gs::StatusCode::UNKNOWN, "Unknown error");
51+
return runtime::Context();
52+
});
53+
}
54+
55+
if (!status.ok()) {
56+
LOG(ERROR) << "Error: " << status.ToString();
57+
// We encode the error message to the output, so that the client can
58+
// get the error message.
59+
output.put_string(status.ToString());
60+
return false;
61+
}
3162
runtime::Sink::sink(ctx, txn, output);
63+
return true;
3264
} else {
3365
size_t sep = bytes.find_first_of("&?");
3466
auto query_str = bytes.substr(0, sep);
@@ -38,7 +70,6 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
3870
auto query = std::string(query_str.data(), query_str.size());
3971
if (!pipeline_cache_.count(query)) {
4072
if (plan_cache_.count(query)) {
41-
// LOG(INFO) << "Hit cache for query ";
4273
} else {
4374
auto& query_cache = db_.getQueryCache();
4475
std::string_view plan_str;
@@ -52,9 +83,10 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
5283
const std::string statistics = db_.work_dir() + "/statistics.json";
5384
const std::string& compiler_yaml = db_.work_dir() + "/graph.yaml";
5485
const std::string& tmp_dir = db_.work_dir() + "/runtime/tmp/";
86+
const auto& compiler_path = db_.schema().get_compiler_path();
5587
for (int i = 0; i < 3; ++i) {
56-
if (!generate_plan(query, statistics, compiler_yaml, tmp_dir,
57-
plan_cache_)) {
88+
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
89+
tmp_dir, plan_cache_)) {
5890
LOG(ERROR) << "Generate plan failed for query: " << query;
5991
} else {
6092
query_cache.put(query, plan_cache_[query].SerializeAsString());
@@ -65,16 +97,18 @@ bool CypherReadApp::Query(const GraphDBSession& graph, Decoder& input,
6597
}
6698
const auto& plan = plan_cache_[query];
6799
pipeline_cache_.emplace(
68-
query, runtime::PlanParser::get().parse_read_pipeline(
69-
db_.schema(), gs::runtime::ContextMeta(), plan));
100+
query, runtime::PlanParser::get()
101+
.parse_read_pipeline(db_.schema(),
102+
gs::runtime::ContextMeta(), plan)
103+
.value());
70104
}
71105
auto txn = graph.GetReadTransaction();
72106

73107
gs::runtime::GraphReadInterface gri(txn);
74108
auto ctx = pipeline_cache_.at(query).Execute(gri, runtime::Context(),
75109
params, timer_);
76110

77-
runtime::Sink::sink_encoder(ctx, gri, output);
111+
runtime::Sink::sink_encoder(ctx.value(), gri, output);
78112
}
79113
return true;
80114
}

flex/engines/graph_db/app/cypher_write_app.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@ bool CypherWriteApp::Query(GraphDBSession& graph, Decoder& input,
3434
}
3535
plan_cache_[query] = plan;
3636
} else {
37+
const auto& compiler_path = db_.schema().get_compiler_path();
38+
3739
for (int i = 0; i < 3; ++i) {
38-
if (!generate_plan(query, statistics, compiler_yaml, tmp_dir,
39-
plan_cache_)) {
40+
if (!generate_plan(query, statistics, compiler_path, compiler_yaml,
41+
tmp_dir, plan_cache_)) {
4042
LOG(ERROR) << "Generate plan failed for query: " << query;
4143
} else {
4244
query_cache.put(query, plan_cache_[query].SerializeAsString());
@@ -46,9 +48,9 @@ bool CypherWriteApp::Query(GraphDBSession& graph, Decoder& input,
4648
}
4749
}
4850
const auto& plan = plan_cache_[query];
49-
pipeline_cache_.emplace(
50-
query,
51-
runtime::PlanParser::get().parse_write_pipeline(db_.schema(), plan));
51+
pipeline_cache_.emplace(query, runtime::PlanParser::get()
52+
.parse_write_pipeline(db_.schema(), plan)
53+
.value());
5254
} else {
5355
}
5456

flex/engines/graph_db/database/graph_db.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ GraphDB& GraphDB::get() {
7373
Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
7474
int32_t thread_num, bool warmup, bool memory_only,
7575
bool enable_auto_compaction) {
76-
GraphDBConfig config(schema, data_dir, thread_num);
76+
GraphDBConfig config(schema, data_dir, "", thread_num);
7777
config.warmup = warmup;
7878
if (memory_only) {
7979
config.memory_level = 1;
@@ -118,6 +118,7 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
118118
// is not serialized and deserialized.
119119
auto& mutable_schema = graph_.mutable_schema();
120120
mutable_schema.SetPluginDir(schema.GetPluginDir());
121+
mutable_schema.set_compiler_path(config.compiler_path);
121122
std::vector<std::pair<std::string, std::string>> plugin_name_paths;
122123
const auto& plugins = schema.GetPlugins();
123124
for (auto plugin_pair : plugins) {

flex/engines/graph_db/database/graph_db.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ struct SessionLocalContext;
4343

4444
struct GraphDBConfig {
4545
GraphDBConfig(const Schema& schema_, const std::string& data_dir_,
46-
int thread_num_ = 1)
46+
const std::string& compiler_path_ = "", int thread_num_ = 1)
4747
: schema(schema_),
4848
data_dir(data_dir_),
49+
compiler_path(compiler_path_),
4950
thread_num(thread_num_),
5051
warmup(false),
5152
enable_monitoring(false),
@@ -54,6 +55,7 @@ struct GraphDBConfig {
5455

5556
Schema schema;
5657
std::string data_dir;
58+
std::string compiler_path;
5759
int thread_num;
5860
bool warmup;
5961
bool enable_monitoring;

0 commit comments

Comments
 (0)