Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion docs/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ my-flow -> -> pod/pod-$arg
```
will create two pods: `pod-a` and `pod-b`.

## Replication of flows
## Replication

Flow replication is an AppController feature that makes specified number of flow graph copies, each one with
a unique name and then merges them into a single graph. Because each replica name may be used in some of resource
Expand Down Expand Up @@ -234,6 +234,96 @@ If there were 7 of them, 4 replicas would be deleted.\
`kubeac run my-flow` if there are no replicas exist, create one, otherwise validate status of
resources of existing replicas.

### Replication of dependencies

With commandline parameters one can create number of flow replicas. But sometimes there is a need to have flow
that creates several replicas of another flow, or just several resources with the same specification that differ
only in name.

One possible solution is to utilize technique shown above: make parameter value be part of resource name and
then duplicate the dependency that leads to this resource and pass different parameter value along each of
dependencies. This works well for small and fixed number of replicas. But if the number goes big, it becomes hard
to manage such number of dependency objects. Moreover if the number itself is not fixed but rather passed as a
parameter replicating resource by manual replication of dependencies becomes impossible.

Luckily, the dependencies can be automatically replicated. This is done through the `generateFor` field of the
`Dependency` object. `generateFor` is a map where keys are argument names and values are list expressions. Each list
expression is comma-separated list of values. If the value has a form of `number..number`, it is expended into a
list of integers in the given range. For example `"1..3, 10..11, abc"` will turn into `["1", "2", "3", "10", "11", "abc"]`.
Then the dependency is going to be replicated automatically with each replica getting on of the list values as an
additional argument. There can be several `generateFor` arguments. In this case there is going to be one dependency
for each combination of the list values. For example,

```YAML
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: dependency
parent: pod/podName
child: flow/flowName-$x-$y
generateFor:
x: 1..2
y: a, b
```

has the same effect as

```YAML
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: dependency1
parent: pod/podName
child: flow/flowName-$x-$y
args:
x: 1
y: a
---
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: dependency2
parent: pod/podName
child: flow/flowName-$x-$y
args:
x: 2
y: a
---
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: dependency3
parent: pod/podName
child: flow/flowName-$x-$y
args:
x: 1
y: b
---
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: dependency4
parent: pod/podName
child: flow/flowName-$x-$y
args:
x: 2
y: b
```

Besides simplifying the dependency graph, dependency replication makes possible to have dynamic number of replicas
by using parameter value right inside the list expressions:

```YAML
apiVersion: appcontroller.k8s/v1alpha1
kind: Dependency
metadata:
name: dependency
parent: pod/podName
child: flow/flowName-$index
generateFor:
index: 1..$replicaCount
```

### Replica-spaces and contexts

Replica-space, is a tag that all replicas of the flow share. When new `Replica` object for the flow is created,
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Dependency struct {

// Arguments passed to dependent resource
Args map[string]string `json:"args,omitempty"`

// map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists
GenerateFor map[string]string `json:"generateFor,omitempty"`
}

// DependencyList is a k8s object representing list of dependencies
Expand Down
1 change: 0 additions & 1 deletion pkg/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ type GraphContext interface {
Scheduler() Scheduler
GetArg(string) string
Graph() DependencyGraph
Dependency() *client.Dependency
}

// DependencyGraphOptions contains all the input required to build a dependency graph
Expand Down
12 changes: 1 addition & 11 deletions pkg/resources/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package resources

import (
"fmt"
"log"
"strings"

"github.com/Mirantis/k8s-AppController/pkg/client"
"github.com/Mirantis/k8s-AppController/pkg/interfaces"
Expand All @@ -29,7 +27,6 @@ type flow struct {
flow *client.Flow
context interfaces.GraphContext
originalName string
instanceName string
currentGraph interfaces.DependencyGraph
}

Expand All @@ -52,19 +49,12 @@ func (flowTemplateFactory) Kind() string {
func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource {
newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow)

dep := gc.Dependency()
var depName string
if dep != nil {
depName = strings.Replace(dep.Name, dep.GenerateName, "", 1)
}

return report.SimpleReporter{
BaseResource: &flow{
Base: Base{def.Meta},
flow: newFlow,
context: gc,
originalName: def.Flow.Name,
instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")),
}}
}

Expand Down Expand Up @@ -98,7 +88,7 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D
options := interfaces.DependencyGraphOptions{
FlowName: f.originalName,
Args: args,
FlowInstanceName: f.instanceName,
FlowInstanceName: f.context.GetArg("AC_ID"),
ReplicaCount: replicaCount,
Silent: silent,
FixedNumberOfReplicas: fixedNumberOfReplicas,
Expand Down
144 changes: 126 additions & 18 deletions pkg/scheduler/dependency_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"log"
"sort"
"strconv"
"strings"
"time"

Expand All @@ -40,12 +41,12 @@ type dependencyGraph struct {
}

type graphContext struct {
args map[string]string
graph *dependencyGraph
scheduler *scheduler
flow *client.Flow
dependency *client.Dependency
replica string
args map[string]string
graph *dependencyGraph
scheduler *scheduler
flow *client.Flow
id string
replica string
}

var _ interfaces.GraphContext = &graphContext{}
Expand All @@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string {
return gc.replica
case "AC_FLOW_NAME":
return gc.flow.Name
case "AC_ID":
return gc.id
default:
val, ok := gc.args[name]
if ok {
Expand All @@ -84,11 +87,6 @@ func (gc graphContext) Graph() interfaces.DependencyGraph {
return gc.graph
}

// Dependency returns Dependency for which child is the resource being created with this context
func (gc graphContext) Dependency() *client.Dependency {
return gc.dependency
}

// newScheduledResourceFor returns new scheduled resource for given resource in init state
func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource {
return &ScheduledResource{
Expand Down Expand Up @@ -159,7 +157,9 @@ func groupDependencies(dependencies []client.Dependency,
defaultFlow = []client.Dependency{}
addResource := func(name string) {
if !strings.HasPrefix(name, "flow/") && !isDependant[name] {
defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name})
dep := client.Dependency{Parent: defaultFlowName, Child: name}
dep.Name = name
defaultFlow = append(defaultFlow, dep)
isDependant[name] = true
}
}
Expand Down Expand Up @@ -328,11 +328,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string {

func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext {
context := &graphContext{
scheduler: sched,
graph: parentContext.graph,
flow: parentContext.flow,
replica: replica,
dependency: dependency,
scheduler: sched,
graph: parentContext.graph,
flow: parentContext.flow,
replica: replica,
id: getVertexID(dependency, replica),
}

context.args = make(map[string]string)
Expand All @@ -344,6 +344,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *
return context
}

func getVertexID(dependency *client.Dependency, replica string) string {
var depName string
if dependency != nil {
depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1)
}
depName += replica
return depName
}

func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) {
for key, value := range dependency.Args {
context.args[key] = copier.EvaluateString(value, parentContext.GetArg)
Expand Down Expand Up @@ -661,6 +670,105 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
return depGraph, nil
}

func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow,
useDestructionSelector bool, context *graphContext) []client.Dependency {

deps := filterDependencies(dependencies, parent, flow, useDestructionSelector)
var result []client.Dependency
for _, dep := range deps {
if len(dep.GenerateFor) == 0 {
result = append(result, dep)
continue
}

var keys []string
for k := range dep.GenerateFor {
keys = append(keys, k)
}
sort.Strings(keys)
lists := make([][]string, len(dep.GenerateFor))
for i, key := range keys {
lists[i] = expandListExpression(copier.EvaluateString(dep.GenerateFor[key], getArgFunc(context)))
}
for n, combination := range permute(lists) {
newArgs := make(map[string]string, len(dep.Args)+len(keys))
for k, v := range dep.Args {
newArgs[k] = v
}
for i, key := range keys {
newArgs[key] = combination[i]
}
depCopy := dep
depCopy.Args = newArgs
depCopy.Name += strconv.Itoa(n + 1)
result = append(result, depCopy)
}
}
return result
}

func permute(variants [][]string) [][]string {
switch len(variants) {
case 0:
return variants
case 1:
var result [][]string
for _, v := range variants[0] {
result = append(result, []string{v})
}
return result
default:
var result [][]string
for _, tail := range variants[len(variants)-1] {
for _, p := range permute(variants[:len(variants)-1]) {
result = append(result, append(p, tail))
}
}
return result
}
}

func expandListExpression(expr string) []string {
var result []string
for _, part := range strings.Split(expr, ",") {
part = strings.TrimSpace(part)
if part == "" {
continue
}

isRange := true
var from, to int

rangeParts := strings.SplitN(part, "..", 2)
if len(rangeParts) != 2 {
isRange = false
}

var err error
if isRange {
from, err = strconv.Atoi(rangeParts[0])
if err != nil {
isRange = false
}
}
if isRange {
to, err = strconv.Atoi(rangeParts[1])
if err != nil {
isRange = false
}
}

if isRange {
for i := from; i <= to; i++ {
result = append(result, strconv.Itoa(i))
}
} else {
result = append(result, part)
}
}
return result
}

func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
resDefs map[string]client.ResourceDefinition,
dependencies map[string][]client.Dependency,
Expand All @@ -683,7 +791,7 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext,
for e := queue.Front(); e != nil; e = e.Next() {
parent := e.Value.(*Block)

deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector)
deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext)

for _, dep := range deps {
if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") {
Expand Down
Loading