Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (i *Issu) addColumnDatasource(index int, d *DatasourceInfo, isMapTable, isA
return nil, fmt.Errorf("invalid table name %s", d.name)
}

rawTable := flow_metrics.GetMetricsTables(ckdb.MergeTree, common.CK_VERSION, ckdb.DF_CLUSTER, ckdb.DF_STORAGE_POLICY, i.ckdbType, 7, 1, 7, 1, i.cfg.GetCKDBColdStorages())[flow_metrics.MetricsTableNameToID(d.name[:lastDotIndex+1]+d.baseTable)]
rawTable := flow_metrics.GetMetricsTables(ckdb.MergeTree, common.CK_VERSION, ckdb.DF_CLUSTER, ckdb.DF_STORAGE_POLICY, i.ckdbType, 7, 1, 7, 1, i.cfg.GetCKDBColdStorages())[flow_metrics.MetricsTableNameToID(d.name[:lastDotIndex+1]+"1m")]
// create table mv
aggrInterval := ckdb.AggregationHour
if d.interval == ckdb.TimeFuncDay {
Expand Down
23 changes: 22 additions & 1 deletion server/ingester/ckissu/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/deepflowio/deepflow/server/libs/ckdb"
)

var AllColumnAdds = [][]*ColumnAdds{ColumnAdd64, ColumnAdd65, ColumnAdd66}
var AllColumnAdds = [][]*ColumnAdds{ColumnAdd64, ColumnAdd65, ColumnAdd66, ColumnAdd70}
var AllIndexAdds = [][]*IndexAdd{getIndexAdds(IndexAdd64), getIndexAdds(IndexAdd65)}
var AllColumnMods = [][]*ColumnMod{}
var AllColumnRenames = [][]*ColumnRename{getColumnRenames(ColumnRename65)}
Expand Down Expand Up @@ -376,3 +376,24 @@ var ColumnAdd66 = []*ColumnAdds{
ColumnType: ckdb.UInt8,
},
}

var ColumnAdd70 = []*ColumnAdds{
{
Dbs: []string{"deepflow_tenant"},
Tables: []string{"deepflow_collector", "deepflow_collector_local"},
ColumnNames: []string{"host"},
ColumnType: ckdb.LowCardinalityString,
},
{
Dbs: []string{"deepflow_admin"},
Tables: []string{"deepflow_server", "deepflow_server_local"},
ColumnNames: []string{"host"},
ColumnType: ckdb.LowCardinalityString,
},
{
Dbs: []string{"ext_metrics"},
Tables: []string{"metrics", "metrics_local"},
ColumnNames: []string{"host"},
ColumnType: ckdb.LowCardinalityString,
},
}
2 changes: 1 addition & 1 deletion server/ingester/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package common

const (
CK_VERSION = "v6.6.3.0" // 用于表示clickhouse的表版本号
CK_VERSION = "v7.0.0.0" // 用于表示clickhouse的表版本号
)
12 changes: 2 additions & 10 deletions server/ingester/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/rest"

libs "github.com/deepflowio/deepflow/server/libs/kubernetes"
"github.com/deepflowio/deepflow/server/libs/utils"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -170,15 +171,6 @@ func (w *Watcher) Run() {
}
}

func indexOf(ss []string, s string) int {
for i, v := range ss {
if v == s {
return i
}
}
return -1
}

func (w *Watcher) getMyClickhouseEndpointsExternal() ([]Endpoint, error) {
podNames, err := w.getPodNames()
if err != nil {
Expand All @@ -197,7 +189,7 @@ func (w *Watcher) getMyClickhouseEndpointsExternal() ([]Endpoint, error) {
// 2. Input the list of all clickhouse endpoints, and sort by IP
// 3, my corresponding 'clickhouse endpoint' is on position 'index%len' in the 'clickhouse endpoints list'
func getMyClickhouseEndpoints(podNames []string, myName string, endpoints []Endpoint) ([]Endpoint, error) {
myIndex := indexOf(podNames, myName)
myIndex := utils.IndexOf(podNames, myName)
if myIndex < 0 {
return nil, fmt.Errorf("can't find my pod name(%s) in pods(%v)", myName, podNames)
}
Expand Down
2 changes: 2 additions & 0 deletions server/ingester/ext_metrics/dbwriter/ext_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ExtMetrics struct {
UniversalTag flow_metrics.UniversalTag

VTableName string
Host string

AgentID uint16

Expand Down Expand Up @@ -107,6 +108,7 @@ func (m *ExtMetrics) Columns() []*ckdb.Column {
ckdb.NewColumn("tag_values", ckdb.ArrayLowCardinalityString).SetComment("额外的tag对应的值"),
ckdb.NewColumn("metrics_float_names", ckdb.ArrayLowCardinalityString).SetComment("额外的float类型metrics"),
ckdb.NewColumn("metrics_float_values", ckdb.ArrayFloat64).SetComment("额外的float metrics值"),
ckdb.NewColumn("host", ckdb.LowCardinalityString),
)

return columns
Expand Down
10 changes: 10 additions & 0 deletions server/ingester/ext_metrics/dbwriter/ext_metrics_column_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/deepflowio/deepflow/server/libs/ckdb"
"github.com/deepflowio/deepflow/server/libs/datatype"
flow_metrics "github.com/deepflowio/deepflow/server/libs/flow-metrics"
"github.com/deepflowio/deepflow/server/libs/utils"
)

type ExtMetricsBlock struct {
Expand All @@ -33,6 +34,7 @@ type ExtMetricsBlock struct {
ColTagValues *proto.ColArr[string]
ColMetricsFloatNames *proto.ColArr[string]
ColMetricsFloatValues *proto.ColArr[float64]
ColHost *proto.ColLowCardinality[string]
}

func (b *ExtMetricsBlock) Reset() {
Expand All @@ -46,6 +48,7 @@ func (b *ExtMetricsBlock) Reset() {
b.ColTagValues.Reset()
b.ColMetricsFloatNames.Reset()
b.ColMetricsFloatValues.Reset()
b.ColHost.Reset()
}

func (b *ExtMetricsBlock) ToInput(input proto.Input) proto.Input {
Expand All @@ -57,6 +60,7 @@ func (b *ExtMetricsBlock) ToInput(input proto.Input) proto.Input {
proto.InputColumn{Name: ckdb.COLUMN_TAG_VALUES, Data: b.ColTagValues},
proto.InputColumn{Name: ckdb.COLUMN_METRICS_FLOAT_NAMES, Data: b.ColMetricsFloatNames},
proto.InputColumn{Name: ckdb.COLUMN_METRICS_FLOAT_VALUES, Data: b.ColMetricsFloatValues},
proto.InputColumn{Name: ckdb.COLUMN_HOST, Data: b.ColHost},
)
if b.MsgType != datatype.MESSAGE_TYPE_DFSTATS && b.MsgType != datatype.MESSAGE_TYPE_SERVER_DFSTATS {
input = b.UniversalTagBlock.ToInput(input)
Expand All @@ -73,6 +77,7 @@ func (n *ExtMetrics) NewColumnBlock() ckdb.CKColumnBlock {
ColTagValues: new(proto.ColStr).Array(),
ColMetricsFloatNames: new(proto.ColStr).LowCardinality().Array(),
ColMetricsFloatValues: new(proto.ColFloat64).Array(),
ColHost: new(proto.ColStr).LowCardinality(),
}
}

Expand All @@ -89,4 +94,9 @@ func (n *ExtMetrics) AppendToColumnBlock(b ckdb.CKColumnBlock) {
block.ColTagValues.Append(n.TagValues)
block.ColMetricsFloatNames.Append(n.MetricsFloatNames)
block.ColMetricsFloatValues.Append(n.MetricsFloatValues)
if i := utils.IndexOf(n.TagNames, ckdb.COLUMN_HOST); i >= 0 {
block.ColHost.Append(n.TagValues[i])
} else {
block.ColHost.Append("")
}
}
1 change: 1 addition & 0 deletions server/libs/ckdb/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ const (
COLUMN_GPROCESS_ID = "gprocess_id"
COLUMN_GPROCESS_ID_0 = "gprocess_id_0"
COLUMN_GPROCESS_ID_1 = "gprocess_id_1"
COLUMN_HOST = "host"
COLUMN_HOST_ID = "host_id"
COLUMN_HOST_ID_0 = "host_id_0"
COLUMN_HOST_ID_1 = "host_id_1"
Expand Down
9 changes: 9 additions & 0 deletions server/libs/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,12 @@ func EscapeJsonString(str string) string {
func CloneStringSlice(strs []string) []string {
return append([]string{}, strs...)
}

func IndexOf(strs []string, str string) int {
for i, v := range strs {
if v == str {
return i
}
}
return -1
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Name , ClientName , ServerName , Type , EnumFile , Category , Permission , Deprecated
time , time , time , time , , Timestamp , 111 , 0
# Name , ClientName , ServerName , Type , EnumFile , Category , Permission , Deprecated
time , time , time , time , , Timestamp , 111 , 0
host , host , host , string , , Universal Tag , 111 , 0
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Name , DisplayName , Description
time , 时间 ,
host , 数据节点 ,
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Name , DisplayName , Description
time , Time ,
host , Data Node ,
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Name , ClientName , ServerName , Type , EnumFile , Category , Permission , Deprecated
time , time , time , time , , Timestamp , 111 , 0
# Name , ClientName , ServerName , Type , EnumFile , Category , Permission , Deprecated
time , time , time , time , , Timestamp , 111 , 0
host , host , host , string , , Universal Tag , 111 , 0
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Name , DisplayName , Description
time , 时间 ,
host , 采集节点 ,
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Name , DisplayName , Description
time , Time ,
host , Collection Node ,
Loading