diff --git a/docs/flows.md b/docs/flows.md index 9d42583..36d82d1 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -195,7 +195,7 @@ my-flow -> -> pod/pod-$arg ``` will create two pods: `pod-a` and `pod-b`. -## Replication of flows +## Replication Flow replication is an AppController feature that makes specified number of flow graph copies, each one with a unique name and then merges them into a single graph. Because each replica name may be used in some of resource @@ -234,6 +234,96 @@ If there were 7 of them, 4 replicas would be deleted.\ `kubeac run my-flow` if there are no replicas exist, create one, otherwise validate status of resources of existing replicas. +### Replication of dependencies + +With commandline parameters one can create number of flow replicas. But sometimes there is a need to have flow +that creates several replicas of another flow, or just several resources with the same specification that differ +only in name. + +One possible solution is to utilize technique shown above: make parameter value be part of resource name and +then duplicate the dependency that leads to this resource and pass different parameter value along each of +dependencies. This works well for small and fixed number of replicas. But if the number goes big, it becomes hard +to manage such number of dependency objects. Moreover if the number itself is not fixed but rather passed as a +parameter replicating resource by manual replication of dependencies becomes impossible. + +Luckily, the dependencies can be automatically replicated. This is done through the `generateFor` field of the +`Dependency` object. `generateFor` is a map where keys are argument names and values are list expressions. Each list +expression is comma-separated list of values. If the value has a form of `number..number`, it is expended into a +list of integers in the given range. For example `"1..3, 10..11, abc"` will turn into `["1", "2", "3", "10", "11", "abc"]`. +Then the dependency is going to be replicated automatically with each replica getting on of the list values as an +additional argument. There can be several `generateFor` arguments. In this case there is going to be one dependency +for each combination of the list values. For example, + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$x-$y +generateFor: + x: 1..2 + y: a, b +``` + +has the same effect as + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency1 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency2 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency3 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: b +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency4 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: b +``` + +Besides simplifying the dependency graph, dependency replication makes possible to have dynamic number of replicas +by using parameter value right inside the list expressions: + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$index +generateFor: + index: 1..$replicaCount +``` + ### Replica-spaces and contexts Replica-space, is a tag that all replicas of the flow share. When new `Replica` object for the flow is created, diff --git a/pkg/client/dependencies.go b/pkg/client/dependencies.go index 075b48d..5b72158 100644 --- a/pkg/client/dependencies.go +++ b/pkg/client/dependencies.go @@ -41,6 +41,9 @@ type Dependency struct { // Arguments passed to dependent resource Args map[string]string `json:"args,omitempty"` + + // map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists + GenerateFor map[string]string `json:"generateFor,omitempty"` } // DependencyList is a k8s object representing list of dependencies diff --git a/pkg/interfaces/interfaces.go b/pkg/interfaces/interfaces.go index 384c010..929d627 100644 --- a/pkg/interfaces/interfaces.go +++ b/pkg/interfaces/interfaces.go @@ -82,7 +82,6 @@ type GraphContext interface { Scheduler() Scheduler GetArg(string) string Graph() DependencyGraph - Dependency() *client.Dependency } // DependencyGraphOptions contains all the input required to build a dependency graph diff --git a/pkg/resources/flow.go b/pkg/resources/flow.go index 564801f..4f7c0ce 100644 --- a/pkg/resources/flow.go +++ b/pkg/resources/flow.go @@ -15,9 +15,7 @@ package resources import ( - "fmt" "log" - "strings" "github.com/Mirantis/k8s-AppController/pkg/client" "github.com/Mirantis/k8s-AppController/pkg/interfaces" @@ -29,7 +27,6 @@ type flow struct { flow *client.Flow context interfaces.GraphContext originalName string - instanceName string currentGraph interfaces.DependencyGraph } @@ -52,19 +49,12 @@ func (flowTemplateFactory) Kind() string { func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource { newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow) - dep := gc.Dependency() - var depName string - if dep != nil { - depName = strings.Replace(dep.Name, dep.GenerateName, "", 1) - } - return report.SimpleReporter{ BaseResource: &flow{ Base: Base{def.Meta}, flow: newFlow, context: gc, originalName: def.Flow.Name, - instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")), }} } @@ -98,7 +88,7 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D options := interfaces.DependencyGraphOptions{ FlowName: f.originalName, Args: args, - FlowInstanceName: f.instanceName, + FlowInstanceName: f.context.GetArg("AC_ID"), ReplicaCount: replicaCount, Silent: silent, FixedNumberOfReplicas: fixedNumberOfReplicas, diff --git a/pkg/scheduler/dependency_graph.go b/pkg/scheduler/dependency_graph.go index 8dab597..737840d 100644 --- a/pkg/scheduler/dependency_graph.go +++ b/pkg/scheduler/dependency_graph.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "sort" + "strconv" "strings" "time" @@ -40,12 +41,12 @@ type dependencyGraph struct { } type graphContext struct { - args map[string]string - graph *dependencyGraph - scheduler *scheduler - flow *client.Flow - dependency *client.Dependency - replica string + args map[string]string + graph *dependencyGraph + scheduler *scheduler + flow *client.Flow + id string + replica string } var _ interfaces.GraphContext = &graphContext{} @@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string { return gc.replica case "AC_FLOW_NAME": return gc.flow.Name + case "AC_ID": + return gc.id default: val, ok := gc.args[name] if ok { @@ -84,11 +87,6 @@ func (gc graphContext) Graph() interfaces.DependencyGraph { return gc.graph } -// Dependency returns Dependency for which child is the resource being created with this context -func (gc graphContext) Dependency() *client.Dependency { - return gc.dependency -} - // newScheduledResourceFor returns new scheduled resource for given resource in init state func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource { return &ScheduledResource{ @@ -159,7 +157,9 @@ func groupDependencies(dependencies []client.Dependency, defaultFlow = []client.Dependency{} addResource := func(name string) { if !strings.HasPrefix(name, "flow/") && !isDependant[name] { - defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name}) + dep := client.Dependency{Parent: defaultFlowName, Child: name} + dep.Name = name + defaultFlow = append(defaultFlow, dep) isDependant[name] = true } } @@ -328,11 +328,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string { func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext { context := &graphContext{ - scheduler: sched, - graph: parentContext.graph, - flow: parentContext.flow, - replica: replica, - dependency: dependency, + scheduler: sched, + graph: parentContext.graph, + flow: parentContext.flow, + replica: replica, + id: getVertexID(dependency, replica), } context.args = make(map[string]string) @@ -344,6 +344,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency * return context } +func getVertexID(dependency *client.Dependency, replica string) string { + var depName string + if dependency != nil { + depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1) + } + depName += replica + return depName +} + func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) { for key, value := range dependency.Args { context.args[key] = copier.EvaluateString(value, parentContext.GetArg) @@ -661,6 +670,105 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO return depGraph, nil } +func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow, + useDestructionSelector bool, context *graphContext) []client.Dependency { + + deps := filterDependencies(dependencies, parent, flow, useDestructionSelector) + var result []client.Dependency + for _, dep := range deps { + if len(dep.GenerateFor) == 0 { + result = append(result, dep) + continue + } + + var keys []string + for k := range dep.GenerateFor { + keys = append(keys, k) + } + sort.Strings(keys) + lists := make([][]string, len(dep.GenerateFor)) + for i, key := range keys { + lists[i] = expandListExpression(copier.EvaluateString(dep.GenerateFor[key], getArgFunc(context))) + } + for n, combination := range permute(lists) { + newArgs := make(map[string]string, len(dep.Args)+len(keys)) + for k, v := range dep.Args { + newArgs[k] = v + } + for i, key := range keys { + newArgs[key] = combination[i] + } + depCopy := dep + depCopy.Args = newArgs + depCopy.Name += strconv.Itoa(n + 1) + result = append(result, depCopy) + } + } + return result +} + +func permute(variants [][]string) [][]string { + switch len(variants) { + case 0: + return variants + case 1: + var result [][]string + for _, v := range variants[0] { + result = append(result, []string{v}) + } + return result + default: + var result [][]string + for _, tail := range variants[len(variants)-1] { + for _, p := range permute(variants[:len(variants)-1]) { + result = append(result, append(p, tail)) + } + } + return result + } +} + +func expandListExpression(expr string) []string { + var result []string + for _, part := range strings.Split(expr, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + + isRange := true + var from, to int + + rangeParts := strings.SplitN(part, "..", 2) + if len(rangeParts) != 2 { + isRange = false + } + + var err error + if isRange { + from, err = strconv.Atoi(rangeParts[0]) + if err != nil { + isRange = false + } + } + if isRange { + to, err = strconv.Atoi(rangeParts[1]) + if err != nil { + isRange = false + } + } + + if isRange { + for i := from; i <= to; i++ { + result = append(result, strconv.Itoa(i)) + } + } else { + result = append(result, part) + } + } + return result +} + func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, resDefs map[string]client.ResourceDefinition, dependencies map[string][]client.Dependency, @@ -683,7 +791,7 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, for e := queue.Front(); e != nil; e = e.Next() { parent := e.Value.(*Block) - deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector) + deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext) for _, dep := range deps { if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") { diff --git a/pkg/scheduler/dependency_graph_test.go b/pkg/scheduler/dependency_graph_test.go index 9cc7068..07ae319 100644 --- a/pkg/scheduler/dependency_graph_test.go +++ b/pkg/scheduler/dependency_graph_test.go @@ -15,6 +15,7 @@ package scheduler import ( + "strings" "testing" "github.com/Mirantis/k8s-AppController/pkg/client" @@ -319,3 +320,76 @@ func TestDependencyToFlowMatching(t *testing.T) { } } } + +// TestPermute tests permute function +func TestPermute(t *testing.T) { + alphabets := [][]string{ + {"1", "2", "3"}, + {"+", "-"}, + {"a", "b"}, + {"="}, + } + + expected := map[string]bool{ + "1+a=": true, + "1+b=": true, + "1-a=": true, + "1-b=": true, + "2+a=": true, + "2+b=": true, + "2-a=": true, + "2-b=": true, + "3+a=": true, + "3+b=": true, + "3-a=": true, + "3-b=": true, + } + permutations := permute(alphabets) + for _, combination := range permutations { + combinationStr := strings.Join(combination, "") + if !expected[combinationStr] { + t.Errorf("unexpected combination %s", combinationStr) + } else { + delete(expected, combinationStr) + } + } + if len(expected) != 0 { + t.Error("not all combinations were generated") + } + + alphabets = append(alphabets, make([]string, 0)) + if len(permute(alphabets)) != 0 { + t.Error("empty alphabet didin't result in empty permutation list") + } +} + +// TestExpendListExpression tests list expression translation to list of strings +func TestExpendListExpression(t *testing.T) { + table := map[string][]string{ + "1": {"1"}, + "1..5": {"1", "2", "3", "4", "5"}, + "2..-1": {}, + "a, b": {"a", "b"}, + "a, b, 2..4": {"a", "b", "2", "3", "4"}, + "-1..1, 2..4, x": {"-1", "0", "1", "2", "3", "4", "x"}, + "a..b": {"a..b"}, + "..": {".."}, + "1...3": {"1...3"}, + "1..b": {"1..b"}, + "a..b, 1..3": {"a..b", "1", "2", "3"}, + "a..b, c..d": {"a..b", "c..d"}, + "": {}, + } + for expr, expected := range table { + result := expandListExpression(expr) + if len(result) != len(expected) { + t.Errorf("unexpected result length for expression %s: %d != %d", expr, len(result), len(expected)) + } else { + for i := range expected { + if expected[i] != result[i] { + t.Errorf("invalid entry %d for expression %s: %s != %s", i, expr, expected[i], result[i]) + } + } + } + } +} diff --git a/pkg/scheduler/flows_test.go b/pkg/scheduler/flows_test.go index 464865c..700e7e0 100644 --- a/pkg/scheduler/flows_test.go +++ b/pkg/scheduler/flows_test.go @@ -1367,3 +1367,102 @@ func TestSyncOnVoidResource(t *testing.T) { depGraph.Deploy(stopChan) ensureReplicas(c, t, replicaCount, replicaCount) } + +// TestConsumeReplicatedFlow tests case, where each replica of the outer flow consumes N replicas of another flow +// by replicating dependency which leads to the consumed flow +func TestConsumeReplicatedFlow(t *testing.T) { + dep := mocks.MakeDependency("flow/outer", "flow/inner/$AC_NAME-$i", "flow=outer") + dep.GenerateFor = map[string]string{"i": "1..3"} + + c := mocks.NewClient( + mocks.MakeFlow("inner"), + mocks.MakeFlow("outer"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + dep, + mocks.MakeDependency("flow/inner", "job/ready-$AC_NAME", "flow=inner"), + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 2, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + ensureReplicas(c, t, 2*3, 3*2+2) +} + +// TestComplexDependencyReplication tests complex dependency generation over two list expressions +func TestComplexDependencyReplication(t *testing.T) { + dep := mocks.MakeDependency("flow/test", "job/ready-$x-$y", "flow=test") + dep.GenerateFor = map[string]string{ + "x": "1..3, 8..9", + "y": "a, b", + } + + c := mocks.NewClient( + mocks.MakeFlow("test"), + mocks.MakeResourceDefinition("job/ready-$x-$y"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + expectedJobNames := map[string]bool{ + "ready-1-a": true, + "ready-2-a": true, + "ready-3-a": true, + "ready-8-a": true, + "ready-9-a": true, + "ready-1-b": true, + "ready-2-b": true, + "ready-3-b": true, + "ready-8-b": true, + "ready-9-b": true, + } + jobs := ensureReplicas(c, t, len(expectedJobNames), 1) + for _, j := range jobs { + if !expectedJobNames[j.Name] { + t.Errorf("unexpected job %s", j.Name) + } else { + delete(expectedJobNames, j.Name) + } + } + if len(expectedJobNames) != 0 { + t.Error("not all jobs were found") + } +} + +// TestDynamicDependencyReplication tests that variables can be used in list expressions used for dependency replication +func TestDynamicDependencyReplication(t *testing.T) { + flow := mocks.MakeFlow("test") + flow.Flow.Parameters = map[string]client.FlowParameter{ + "replicaCount": mocks.MakeFlowParameter("1"), + } + + dep := mocks.MakeDependency("flow/test", "job/ready-$index", "flow=test") + dep.GenerateFor = map[string]string{ + "index": "1..$replicaCount", + } + + c := mocks.NewClient( + flow, + mocks.MakeResourceDefinition("job/ready-$index"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test", + Args: map[string]string{"replicaCount": "7"}}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + ensureReplicas(c, t, 7, 1) +}