1818
1919package com .alibaba .graphscope .common .ir .meta .schema ;
2020
21+ import com .alibaba .graphscope .groot .common .schema .api .*;
2122import com .alibaba .graphscope .groot .common .util .IrSchemaParser ;
22- import com .fasterxml .jackson .databind . JsonNode ;
23+ import com .fasterxml .jackson .core . JsonProcessingException ;
2324import com .fasterxml .jackson .databind .ObjectMapper ;
25+ import com .google .common .base .Preconditions ;
26+ import com .google .common .collect .ImmutableMap ;
2427import com .google .common .collect .Lists ;
2528
26- import org .checkerframework .checker .nullness .qual .Nullable ;
29+ import org .apache .calcite .rel .type .RelDataType ;
30+ import org .apache .calcite .rel .type .RelDataTypeFactory ;
2731import org .slf4j .Logger ;
2832import org .slf4j .LoggerFactory ;
2933import org .yaml .snakeyaml .Yaml ;
3034
3135import java .util .List ;
3236import java .util .Map ;
37+ import java .util .stream .Collectors ;
3338
3439public class SchemaSpecManager {
3540 private static final Logger logger = LoggerFactory .getLogger (SchemaSpecManager .class );
36- private final IrGraphSchema parent ;
41+ private final GraphSchema rootSchema ;
42+ private final boolean isColumnId ;
43+ private final RelDataTypeFactory typeFactory ;
3744 private final List <SchemaSpec > specifications ;
3845
39- public SchemaSpecManager (IrGraphSchema parent ) {
40- this .parent = parent ;
41- this .specifications = Lists .newArrayList (convert (null , SchemaSpec .Type .IR_CORE_IN_JSON ));
46+ public SchemaSpecManager (
47+ GraphSchema rootSchema , boolean isColumnId , RelDataTypeFactory typeFactory ) {
48+ this .rootSchema = rootSchema ;
49+ this .isColumnId = isColumnId ;
50+ this .typeFactory = typeFactory ;
51+ this .specifications = Lists .newArrayList ();
4252 }
4353
44- public SchemaSpecManager (IrGraphSchema parent , SchemaSpec input ) {
45- this .parent = parent ;
54+ public SchemaSpecManager (
55+ GraphSchema rootSchema ,
56+ boolean isColumnId ,
57+ RelDataTypeFactory typeFactory ,
58+ SchemaSpec input ) {
59+ this .rootSchema = rootSchema ;
60+ this .isColumnId = isColumnId ;
61+ this .typeFactory = typeFactory ;
4662 this .specifications = Lists .newArrayList (input );
4763 }
4864
@@ -52,62 +68,103 @@ public SchemaSpec getSpec(SchemaSpec.Type type) {
5268 return spec ;
5369 }
5470 }
55- // if not exist, try to create a new JsonSpecification with content converted from others
5671 SchemaSpec newSpec ;
57- List <SchemaSpec .Type > existing = Lists .newArrayList ();
58- for (SchemaSpec spec : specifications ) {
59- if ((newSpec = convert (spec , type )) != null ) {
60- specifications .add (newSpec );
61- return newSpec ;
62- }
63- existing .add (spec .getType ());
72+ switch (type ) {
73+ case IR_CORE_IN_JSON :
74+ newSpec =
75+ new SchemaSpec (
76+ type , IrSchemaParser .getInstance ().parse (rootSchema , isColumnId ));
77+ break ;
78+ case FLEX_IN_JSON :
79+ SchemaSpec yamlSpec = getSpec (SchemaSpec .Type .FLEX_IN_YAML );
80+ Preconditions .checkArgument (
81+ yamlSpec != null ,
82+ "cannot get schema specification of type " + SchemaSpec .Type .FLEX_IN_YAML );
83+ Yaml yaml = new Yaml ();
84+ Map yamlMap = yaml .load (yamlSpec .getContent ());
85+ ObjectMapper mapper = new ObjectMapper ();
86+ try {
87+ newSpec = new SchemaSpec (type , mapper .writeValueAsString (yamlMap ));
88+ } catch (JsonProcessingException e ) {
89+ throw new RuntimeException (e );
90+ }
91+ break ;
92+ case FLEX_IN_YAML :
93+ default :
94+ newSpec = convertToFlex (rootSchema );
95+ break ;
6496 }
65- throw new IllegalArgumentException (
66- "spec type ["
67- + type
68- + "] cannot be converted from any existing spec types "
69- + existing );
97+ this .specifications .add (newSpec );
98+ return newSpec ;
7099 }
71100
72- private @ Nullable SchemaSpec convert (@ Nullable SchemaSpec source , SchemaSpec .Type target ) {
73- try {
74- if (source != null && source .getType () == target ) {
75- return source ;
76- }
77- switch (target ) {
78- case IR_CORE_IN_JSON :
79- return new SchemaSpec (
80- target ,
81- IrSchemaParser .getInstance ()
82- .parse (parent .getGraphSchema (), parent .isColumnId ()));
83- case FLEX_IN_JSON :
84- if (source != null && source .getType () == SchemaSpec .Type .FLEX_IN_YAML ) {
85- Yaml yaml = new Yaml ();
86- Map rootMap = yaml .load (source .getContent ());
87- ObjectMapper mapper = new ObjectMapper ();
88- return new SchemaSpec (target , mapper .writeValueAsString (rootMap ));
89- }
90- // todo: convert from JSON in IR_CORE to JSON in FLEX
91- return null ;
92- case FLEX_IN_YAML :
93- default :
94- if (source != null && source .getType () == SchemaSpec .Type .FLEX_IN_JSON ) {
95- ObjectMapper mapper = new ObjectMapper ();
96- JsonNode rootNode = mapper .readTree (source .getContent ());
97- Map rootMap = mapper .convertValue (rootNode , Map .class );
98- Yaml yaml = new Yaml ();
99- return new SchemaSpec (target , yaml .dump (rootMap ));
100- }
101- // todo: convert from JSON in IR_CORE to YAML in FlEX
102- return null ;
103- }
104- } catch (Exception e ) {
105- logger .warn (
106- "can not convert from {} to {} due to some unexpected exception:" ,
107- source == null ? null : source .getType (),
108- target ,
109- e );
110- return null ;
101+ private SchemaSpec convertToFlex (GraphSchema schema ) {
102+ List <Map > vertices =
103+ schema .getVertexList ().stream ()
104+ .map (this ::convertVertex )
105+ .collect (Collectors .toList ());
106+ List <Map > edges =
107+ schema .getEdgeList ().stream ().map (this ::convertEdge ).collect (Collectors .toList ());
108+ Map <String , Object > flexMap =
109+ ImmutableMap .of (
110+ "schema" , ImmutableMap .of ("vertex_types" , vertices , "edge_types" , edges ));
111+ Yaml yaml = new Yaml ();
112+ return new SchemaSpec (SchemaSpec .Type .FLEX_IN_YAML , yaml .dump (flexMap ));
113+ }
114+
115+ private Map <String , Object > convertVertex (GraphVertex vertex ) {
116+ return ImmutableMap .of (
117+ "type_name" , vertex .getLabel (),
118+ "type_id" , vertex .getLabelId (),
119+ "properties" ,
120+ vertex .getPropertyList ().stream ()
121+ .map (this ::convertProperty )
122+ .collect (Collectors .toList ()),
123+ "primary_keys" ,
124+ vertex .getPrimaryKeyList ().stream ()
125+ .map (GraphProperty ::getName )
126+ .collect (Collectors .toList ()));
127+ }
128+
129+ private Map <String , Object > convertEdge (GraphEdge edge ) {
130+ return ImmutableMap .of (
131+ "type_name" , edge .getLabel (),
132+ "type_id" , edge .getLabelId (),
133+ "vertex_type_pair_relations" ,
134+ edge .getRelationList ().stream ()
135+ .map (this ::convertRelation )
136+ .collect (Collectors .toList ()),
137+ "properties" ,
138+ edge .getPropertyList ().stream ()
139+ .map (this ::convertProperty )
140+ .collect (Collectors .toList ()),
141+ "primary_keys" ,
142+ edge .getPrimaryKeyList ().stream ()
143+ .map (GraphProperty ::getName )
144+ .collect (Collectors .toList ()));
145+ }
146+
147+ private Map <String , Object > convertRelation (EdgeRelation relation ) {
148+ return ImmutableMap .of (
149+ "source_vertex" , relation .getSource ().getLabel (),
150+ "destination_vertex" , relation .getTarget ().getLabel ());
151+ }
152+
153+ private Map <String , Object > convertProperty (GraphProperty property ) {
154+ RelDataType propertyType ;
155+ if (property instanceof IrGraphProperty ) {
156+ propertyType = ((IrGraphProperty ) property ).getRelDataType ();
157+ } else {
158+ propertyType =
159+ (new IrDataTypeConvertor .Groot (typeFactory , false ))
160+ .convert (property .getDataType ());
111161 }
162+ // convert property type to flex format
163+ IrDataTypeConvertor .Flex flexConvertor = new IrDataTypeConvertor .Flex (typeFactory , false );
164+ GSDataTypeDesc typeDesc = flexConvertor .convert (propertyType );
165+ return ImmutableMap .of (
166+ "property_id" , property .getId (),
167+ "property_name" , property .getName (),
168+ "property_type" , typeDesc .getYamlDesc ());
112169 }
113170}
0 commit comments