Skip to content

Commit a86b8b7

Browse files
zhagnluluzhang
andauthored
enhance: move jsonshredding meta from parquet to meta.json (#46130)
#42533 Signed-off-by: luzhang <[email protected]> Co-authored-by: luzhang <[email protected]>
1 parent 3aa0b76 commit a86b8b7

File tree

10 files changed

+792
-43
lines changed

10 files changed

+792
-43
lines changed

internal/core/src/common/Consts.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ constexpr const char* JSON_STATS_ROOT_PATH = "json_stats";
5959
constexpr const char* JSON_STATS_DATA_FORMAT_VERSION = "2";
6060
constexpr const char* JSON_STATS_SHARED_INDEX_PATH = "shared_key_index";
6161
constexpr const char* JSON_STATS_SHREDDING_DATA_PATH = "shredding_data";
62+
constexpr const char* JSON_STATS_META_FILE_NAME = "meta.json";
6263
constexpr const char* JSON_KEY_STATS_SHARED_FIELD_NAME = "__shared";
63-
// store key layout type in parquet file metadata
64+
// store key layout type in parquet file metadata (deprecated, now stored in separate file)
6465
inline constexpr const char* JSON_STATS_META_KEY_LAYOUT_TYPE_MAP =
6566
"key_layout_type_map";
6667
// start json stats field id for mock column

internal/core/src/index/json_stats/JsonKeyStats.cpp

Lines changed: 123 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "index/InvertedIndexUtil.h"
2424
#include "index/Utils.h"
2525
#include "milvus-storage/filesystem/fs.h"
26+
#include "storage/LocalChunkManagerSingleton.h"
2627
#include "storage/MmapManager.h"
2728
#include "storage/Util.h"
2829
#include "common/bson_view.h"
@@ -624,6 +625,64 @@ JsonKeyStats::GetSharedKeyIndexDir() {
624625
return shared_key_index_path.string();
625626
}
626627

628+
std::string
629+
JsonKeyStats::GetMetaFilePath() {
630+
std::filesystem::path json_stats_dir = path_;
631+
std::filesystem::path meta_file_path =
632+
json_stats_dir / JSON_STATS_META_FILE_NAME;
633+
return meta_file_path.string();
634+
}
635+
636+
void
637+
JsonKeyStats::WriteMetaFile() {
638+
json_stats_meta_.SetLayoutTypeMap(key_types_);
639+
json_stats_meta_.SetInt64(META_KEY_NUM_ROWS, num_rows_);
640+
json_stats_meta_.SetInt64(META_KEY_NUM_SHREDDING_COLUMNS,
641+
column_keys_.size());
642+
643+
auto meta_content = json_stats_meta_.Serialize();
644+
auto meta_file_path = GetMetaFilePath();
645+
646+
auto local_chunk_manager =
647+
milvus::storage::LocalChunkManagerSingleton::GetInstance()
648+
.GetChunkManager();
649+
local_chunk_manager->Write(
650+
meta_file_path, meta_content.data(), meta_content.size());
651+
652+
meta_file_size_ = meta_content.size();
653+
LOG_INFO("write meta file: {} with size {} for segment {} for field {}",
654+
meta_file_path,
655+
meta_file_size_,
656+
segment_id_,
657+
field_id_);
658+
}
659+
660+
void
661+
JsonKeyStats::LoadMetaFile(const std::string& local_meta_file_path) {
662+
LOG_INFO("load meta file: {} for segment {} for field {}",
663+
local_meta_file_path,
664+
segment_id_,
665+
field_id_);
666+
667+
auto local_chunk_manager =
668+
storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
669+
670+
auto file_size = local_chunk_manager->Size(local_meta_file_path);
671+
std::string meta_content;
672+
meta_content.resize(file_size);
673+
local_chunk_manager->Read(
674+
local_meta_file_path, meta_content.data(), file_size);
675+
676+
key_field_map_ = JsonStatsMeta::DeserializeToKeyFieldMap(meta_content);
677+
678+
LOG_INFO(
679+
"loaded meta file with {} key field entries for segment {} for field "
680+
"{}",
681+
key_field_map_.size(),
682+
segment_id_,
683+
field_id_);
684+
}
685+
627686
BinarySet
628687
JsonKeyStats::Serialize(const Config& config) {
629688
return BinarySet();
@@ -709,6 +768,9 @@ JsonKeyStats::BuildWithFieldData(const std::vector<FieldDataPtr>& field_datas,
709768
BuildKeyStats(field_datas, nullable);
710769
parquet_writer_->Close();
711770
bson_inverted_index_->BuildIndex();
771+
772+
// write meta file with layout type map and other metadata
773+
WriteMetaFile();
712774
}
713775

714776
void
@@ -725,7 +787,6 @@ JsonKeyStats::GetColumnSchemaFromParquet(int64_t column_group_id,
725787
LOG_DEBUG("get column schema: [{}] for segment {}",
726788
file_schema->ToString(true),
727789
segment_id_);
728-
column_group_schemas_[column_group_id] = file_schema;
729790

730791
for (const auto& field : file_schema->fields()) {
731792
auto field_name = field->name();
@@ -845,11 +906,18 @@ JsonKeyStats::LoadShreddingMeta(
845906
auto remote_prefix =
846907
AddBucketName(disk_file_manager_->GetRemoteJsonStatsShreddingPrefix());
847908

848-
// load common meta from parquet, all parquet files have the same meta
849-
// just need to read one file
850-
auto file = CreateColumnGroupParquetPath(
851-
remote_prefix, sorted_files[0].first, sorted_files[0].second[0]);
852-
GetCommonMetaFromParquet(file);
909+
// load common meta from parquet only if key_field_map_ is not already populated
910+
// (for backward compatibility with old data that doesn't have separate meta file)
911+
if (key_field_map_.empty()) {
912+
auto file = CreateColumnGroupParquetPath(
913+
remote_prefix, sorted_files[0].first, sorted_files[0].second[0]);
914+
GetCommonMetaFromParquet(file);
915+
} else {
916+
LOG_INFO(
917+
"skip loading common meta from parquet, already loaded from meta "
918+
"file for segment {}",
919+
segment_id_);
920+
}
853921

854922
// load distinct meta from parquet, distinct meta is different for each parquet file
855923
// main purpose is to get column schema
@@ -1003,7 +1071,11 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) {
10031071
AssertInfo(index_files.has_value(),
10041072
"index file paths is empty when load json stats for segment {}",
10051073
segment_id_);
1006-
// split index_files into shared_key_index and shredding_data
1074+
1075+
// split index_files into meta, shared_key_index, and shredding_data
1076+
// Note: Check directory paths (shared_key_index, shredding_data) BEFORE meta.json,
1077+
// because shared_key_index/meta.json_0 contains "meta.json" but is not the meta file.
1078+
std::vector<std::string> meta_files;
10071079
std::vector<std::string> shared_key_index_files;
10081080
std::vector<std::string> shredding_data_files;
10091081
for (const auto& file : index_files.value()) {
@@ -1012,6 +1084,8 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) {
10121084
} else if (file.find(JSON_STATS_SHREDDING_DATA_PATH) !=
10131085
std::string::npos) {
10141086
shredding_data_files.emplace_back(file);
1087+
} else if (file.find(JSON_STATS_META_FILE_NAME) != std::string::npos) {
1088+
meta_files.emplace_back(file);
10151089
} else {
10161090
ThrowInfo(ErrorCode::UnexpectedError,
10171091
"unknown file path: {} for segment {}",
@@ -1020,6 +1094,20 @@ JsonKeyStats::Load(milvus::tracer::TraceContext ctx, const Config& config) {
10201094
}
10211095
}
10221096

1097+
// load meta file first (contains layout type map)
1098+
if (!meta_files.empty()) {
1099+
AssertInfo(
1100+
meta_files.size() == 1,
1101+
"expected exactly one meta file, got {} for segment {}, field {}",
1102+
meta_files.size(),
1103+
segment_id_,
1104+
field_id_);
1105+
// cache meta file to local disk
1106+
auto local_meta_file = disk_file_manager_->CacheJsonStatsMetaToDisk(
1107+
meta_files[0], load_priority_);
1108+
LoadMetaFile(local_meta_file);
1109+
}
1110+
10231111
// load shredding data
10241112
LoadShreddingData(shredding_data_files);
10251113

@@ -1034,15 +1122,35 @@ JsonKeyStats::Upload(const Config& config) {
10341122
// upload inverted index
10351123
auto bson_index_stats = bson_inverted_index_->UploadIndex();
10361124

1125+
// upload meta file
1126+
auto meta_file_path = GetMetaFilePath();
1127+
AssertInfo(disk_file_manager_->AddJsonStatsMetaLog(meta_file_path),
1128+
"failed to upload meta file: {} for segment {}",
1129+
meta_file_path,
1130+
segment_id_);
1131+
10371132
// upload parquet file, parquet writer has already upload file to remote
10381133
auto shredding_remote_paths_to_size = parquet_writer_->GetPathsToSize();
10391134
auto shared_key_index_remote_paths_to_size =
10401135
bson_index_stats->GetSerializedIndexFileInfo();
1136+
auto meta_remote_paths_to_size =
1137+
disk_file_manager_->GetRemotePathsToFileSize();
10411138

10421139
// get all index files for meta
10431140
std::vector<SerializedIndexFileInfo> index_files;
10441141
index_files.reserve(shredding_remote_paths_to_size.size() +
1045-
shared_key_index_remote_paths_to_size.size());
1142+
shared_key_index_remote_paths_to_size.size() + 1);
1143+
1144+
// add meta file
1145+
for (const auto& [path, size] : meta_remote_paths_to_size) {
1146+
if (path.find(JSON_STATS_META_FILE_NAME) != std::string::npos) {
1147+
auto file_path = path.substr(path.find(JSON_STATS_META_FILE_NAME));
1148+
index_files.emplace_back(file_path, size);
1149+
LOG_INFO(
1150+
"upload meta file: {} for segment {}", file_path, segment_id_);
1151+
}
1152+
}
1153+
10461154
// only store shared_key_index/... and shredding_data/... to meta
10471155
// for saving meta space
10481156
for (const auto& file_info : shared_key_index_remote_paths_to_size) {
@@ -1065,16 +1173,18 @@ JsonKeyStats::Upload(const Config& config) {
10651173

10661174
LOG_INFO(
10671175
"upload json key stats for segment {} with bson mem size: {} "
1068-
"and "
1069-
"shredding data mem size: {} and index files size: {}",
1176+
"and shredding data mem size: {} and meta file size: {} "
1177+
"and index files size: {}",
10701178
segment_id_,
10711179
bson_index_stats->GetMemSize(),
10721180
parquet_writer_->GetTotalSize(),
1181+
meta_file_size_,
10731182
index_files.size());
10741183

1075-
return IndexStats::New(
1076-
bson_index_stats->GetMemSize() + parquet_writer_->GetTotalSize(),
1077-
std::move(index_files));
1184+
return IndexStats::New(bson_index_stats->GetMemSize() +
1185+
parquet_writer_->GetTotalSize() +
1186+
meta_file_size_,
1187+
std::move(index_files));
10781188
}
10791189

10801190
} // namespace milvus::index

internal/core/src/index/json_stats/JsonKeyStats.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,15 @@ class JsonKeyStats : public ScalarIndex<std::string> {
457457
std::string
458458
GetSharedKeyIndexDir();
459459

460+
std::string
461+
GetMetaFilePath();
462+
463+
void
464+
WriteMetaFile();
465+
466+
void
467+
LoadMetaFile(const std::string& meta_file_path);
468+
460469
void
461470
AddKeyStats(const std::vector<std::string>& path,
462471
JSONType type,
@@ -648,8 +657,6 @@ class JsonKeyStats : public ScalarIndex<std::string> {
648657
std::unordered_map<int64_t, std::string> field_id_to_name_map_;
649658
// field_name vector, the sequece is the same as the order of files
650659
std::vector<std::string> field_names_;
651-
// column_group_id -> schema, the sequence of schemas is the same as the order of files
652-
std::map<int64_t, std::shared_ptr<arrow::Schema>> column_group_schemas_;
653660
// field_name -> column
654661
mutable std::unordered_map<std::string,
655662
std::shared_ptr<milvus::ChunkedColumnInterface>>
@@ -661,6 +668,10 @@ class JsonKeyStats : public ScalarIndex<std::string> {
661668
SkipIndex skip_index_;
662669
cachinglayer::ResourceUsage cell_size_ = {0, 0};
663670

671+
// Meta file for storing layout type map and other metadata
672+
JsonStatsMeta json_stats_meta_;
673+
int64_t meta_file_size_{0};
674+
664675
// Friend accessor for unit tests to call private methods safely.
665676
friend class ::TraverseJsonForBuildStatsAccessor;
666677
friend class ::CollectSingleJsonStatsInfoAccessor;

internal/core/src/index/json_stats/utils.cpp

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <boost/filesystem.hpp>
1919
#include <boost/uuid/random_generator.hpp>
2020
#include <boost/uuid/uuid_io.hpp>
21+
#include <nlohmann/json.hpp>
2122
#include "milvus-storage/common/constants.h"
2223

2324
namespace milvus::index {
@@ -207,15 +208,101 @@ CreateArrowSchema(std::map<JsonKey, JsonKeyLayoutType> column_map) {
207208

208209
std::vector<std::pair<std::string, std::string>>
209210
CreateParquetKVMetadata(std::map<JsonKey, JsonKeyLayoutType> column_map) {
210-
nlohmann::json key_type;
211-
for (const auto& [key, type] : column_map) {
212-
key_type[key.ToColumnName()] = ToString(type);
211+
// layout type map is now stored in a separate meta file to reduce parquet file size.
212+
// return empty metadata vector.
213+
return {};
214+
}
215+
216+
std::string
217+
JsonStatsMeta::Serialize() const {
218+
nlohmann::json root;
219+
220+
// Serialize string values
221+
for (const auto& [key, value] : string_values_) {
222+
root[key] = value;
213223
}
214-
// for shared field, not need to store in metadata
215-
std::vector<std::pair<std::string, std::string>> res;
216-
res.push_back(
217-
std::make_pair(JSON_STATS_META_KEY_LAYOUT_TYPE_MAP, key_type.dump()));
218-
return res;
224+
225+
// Serialize int64 values
226+
for (const auto& [key, value] : int64_values_) {
227+
root[key] = value;
228+
}
229+
230+
// Serialize layout type map
231+
if (!layout_type_map_.empty()) {
232+
nlohmann::json layout_map;
233+
for (const auto& [json_key, layout_type] : layout_type_map_) {
234+
layout_map[json_key.ToColumnName()] = ToString(layout_type);
235+
}
236+
root[META_KEY_LAYOUT_TYPE_MAP] = layout_map;
237+
}
238+
239+
return root.dump();
240+
}
241+
242+
JsonStatsMeta
243+
JsonStatsMeta::Deserialize(const std::string& json_str) {
244+
JsonStatsMeta meta;
245+
246+
try {
247+
nlohmann::json root = nlohmann::json::parse(json_str);
248+
249+
for (auto it = root.begin(); it != root.end(); ++it) {
250+
const std::string& key = it.key();
251+
252+
if (key == META_KEY_LAYOUT_TYPE_MAP) {
253+
// Parse layout type map
254+
std::map<JsonKey, JsonKeyLayoutType> layout_map;
255+
for (auto& [column_name, layout_type_str] :
256+
it.value().items()) {
257+
auto json_type = GetJsonTypeFromKeyName(column_name);
258+
auto json_pointer = GetKeyFromColumnName(column_name);
259+
JsonKey json_key(json_pointer, json_type);
260+
auto layout_type =
261+
JsonKeyLayoutTypeFromString(layout_type_str);
262+
layout_map[json_key] = layout_type;
263+
}
264+
meta.SetLayoutTypeMap(layout_map);
265+
} else if (it.value().is_string()) {
266+
meta.SetString(key, it.value().get<std::string>());
267+
} else if (it.value().is_number_integer()) {
268+
meta.SetInt64(key, it.value().get<int64_t>());
269+
}
270+
// Other types can be added as needed
271+
}
272+
} catch (const std::exception& e) {
273+
ThrowInfo(ErrorCode::UnexpectedError,
274+
"Failed to deserialize JsonStatsMeta: {}",
275+
e.what());
276+
}
277+
278+
return meta;
279+
}
280+
281+
std::unordered_map<std::string, std::set<std::string>>
282+
JsonStatsMeta::DeserializeToKeyFieldMap(const std::string& json_str) {
283+
std::unordered_map<std::string, std::set<std::string>> key_field_map;
284+
285+
try {
286+
nlohmann::json root = nlohmann::json::parse(json_str);
287+
288+
auto it = root.find(META_KEY_LAYOUT_TYPE_MAP);
289+
if (it != root.end()) {
290+
for (auto& [column_name, layout_type_str] : it.value().items()) {
291+
auto layout_type = JsonKeyLayoutTypeFromString(layout_type_str);
292+
if (layout_type == JsonKeyLayoutType::SHARED) {
293+
continue;
294+
}
295+
auto json_pointer = GetKeyFromColumnName(column_name);
296+
key_field_map[json_pointer].insert(column_name);
297+
}
298+
}
299+
} catch (const std::exception& e) {
300+
ThrowInfo(ErrorCode::UnexpectedError,
301+
"Failed to deserialize JsonStatsMeta to key_field_map: {}",
302+
e.what());
303+
}
304+
305+
return key_field_map;
219306
}
220307

221308
} // namespace milvus::index

0 commit comments

Comments
 (0)