Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 1ca6446

Browse files
Use new last sync file (#80)
* use new last sync file
  Signed-off-by: Ayman <[email protected]>
* use shared latest version
  Signed-off-by: Ayman <[email protected]>
* enable multi thread
  Signed-off-by: Ayman <[email protected]>
---------
Signed-off-by: Ayman <[email protected]>
Co-authored-by: Ayman <[email protected]>
1 parent 15cd98c commit 1ca6446

File tree

3 files changed

+95
-14
lines changed

3 files changed

+95
-14
lines changed

cmd/git/git.go

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,9 +1862,7 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
18621862
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Infof("%s", string(jsonBytes))
18631863
}
18641864
*docs = []interface{}{}
1865-
gMaxUpstreamDtMtx.Lock()
1866-
defer gMaxUpstreamDtMtx.Unlock()
1867-
err = j.cacheProvider.SetLastSync(j.endpoint, gMaxUpstreamDt)
1865+
err = j.setLastSync(ctx)
18681866
if err != nil {
18691867
return
18701868
}
@@ -2438,7 +2436,7 @@ func (j *DSGit) ParseNextCommit(ctx *shared.Ctx) (commit map[string]interface{},
24382436

24392437
// Sync - sync git data source
24402438
func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
2441-
thrN := 1 //shared.GetThreadsNum(ctx)
2439+
thrN := shared.GetThreadsNum(ctx)
24422440
lastSync := os.Getenv("LAST_SYNC")
24432441
if lastSync != "" {
24442442
i, err := strconv.ParseInt(lastSync, 10, 64)
@@ -2452,12 +2450,24 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
24522450
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
24532451
}
24542452
if ctx.DateFrom == nil {
2455-
cachedLastSync, er := j.cacheProvider.GetLastSync(j.endpoint)
2453+
lastSyncDataB, er := j.cacheProvider.GetLastSyncFile(j.endpoint)
24562454
if er != nil {
24572455
err = er
24582456
return
24592457
}
2460-
ctx.DateFrom = &cachedLastSync
2458+
var lastSyncData lastSyncFile
2459+
if er = json.Unmarshal(lastSyncDataB, &lastSyncData); er != nil {
2460+
var cachedLastSync time.Time
2461+
err = json.Unmarshal(lastSyncDataB, &cachedLastSync)
2462+
if err != nil {
2463+
err = er
2464+
return
2465+
}
2466+
lastSyncData = lastSyncFile{
2467+
LastSync: cachedLastSync,
2468+
}
2469+
}
2470+
ctx.DateFrom = &lastSyncData.LastSync
24612471
if ctx.DateFrom != nil {
24622472
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
24632473
}
@@ -2737,11 +2747,7 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
27372747
j.handleDataLakeOrphans()
27382748
}
27392749
// NOTE: Non-generic ends here
2740-
gMaxUpstreamDtMtx.Lock()
2741-
defer gMaxUpstreamDtMtx.Unlock()
2742-
if !gMaxUpstreamDt.IsZero() {
2743-
err = j.cacheProvider.SetLastSync(j.endpoint, gMaxUpstreamDt)
2744-
}
2750+
err = j.setLastSync(ctx)
27452751
return
27462752
}
27472753

@@ -3026,6 +3032,74 @@ func createHash(content git.Commit) (string, error) {
30263032
return contentHash, err
30273033
}
30283034

3035+
func (j *DSGit) setLastSync(ctx *shared.Ctx) error {
3036+
commitsCount, err := j.getCommitsCount(ctx)
3037+
if err != nil {
3038+
return err
3039+
}
3040+
3041+
commitID, err := j.getHead(ctx)
3042+
if err != nil {
3043+
return err
3044+
}
3045+
3046+
gMaxUpstreamDtMtx.Lock()
3047+
defer gMaxUpstreamDtMtx.Unlock()
3048+
3049+
lastSyncData := lastSyncFile{
3050+
LastSync: gMaxUpstreamDt,
3051+
Target: commitsCount,
3052+
Total: len(createdCommits),
3053+
Head: commitID,
3054+
}
3055+
3056+
lastSyncDataB, err := jsoniter.Marshal(lastSyncData)
3057+
if err != nil {
3058+
return err
3059+
}
3060+
3061+
if !gMaxUpstreamDt.IsZero() {
3062+
err = j.cacheProvider.SetLastSyncFile(j.endpoint, lastSyncDataB)
3063+
if err != nil {
3064+
return err
3065+
}
3066+
}
3067+
3068+
return nil
3069+
}
3070+
3071+
func (j *DSGit) getCommitsCount(ctx *shared.Ctx) (int, error) {
3072+
count := 0
3073+
cmdLine := []string{"git", "rev-list", "--count", j.DefaultBranch}
3074+
sout, serr, err := shared.ExecCommand(ctx, cmdLine, j.GitPath, GitDefaultEnv)
3075+
if err != nil {
3076+
j.log.WithFields(logrus.Fields{"operation": "gitCommitsCount"}).Errorf("error executing command: %v, error: %v, output: %s, output error: %s", cmdLine, err, sout, serr)
3077+
return count, err
3078+
}
3079+
result := strings.TrimSpace(sout)
3080+
count, err = strconv.Atoi(result)
3081+
if err != nil {
3082+
j.log.WithFields(logrus.Fields{"operation": "gitCommitsCount"}).Errorf("error converting: %v, to int error: %v", result, err)
3083+
return count, err
3084+
}
3085+
return count, nil
3086+
}
3087+
3088+
func (j *DSGit) getHead(ctx *shared.Ctx) (string, error) {
3089+
if ctx.Debug > 0 {
3090+
j.log.WithFields(logrus.Fields{"operation": "getHead"}).Debugf("parsing logs from %s", j.GitPath)
3091+
}
3092+
// git rev-parse HEAD
3093+
cmdLine := []string{"git", "rev-parse", "head"}
3094+
sout, serr, err := shared.ExecCommand(ctx, cmdLine, j.GitPath, GitDefaultEnv)
3095+
if err != nil {
3096+
j.log.WithFields(logrus.Fields{"operation": "getHead"}).Errorf("error executing command: %v, error: %v, output: %s, output error: %s", cmdLine, err, sout, serr)
3097+
return "", err
3098+
}
3099+
commitID := strings.TrimSpace(sout)
3100+
return commitID, nil
3101+
}
3102+
30293103
// CommitCache single commit cache schema
30303104
type CommitCache struct {
30313105
Timestamp string `json:"timestamp"`
@@ -3056,3 +3130,10 @@ type clocResult struct {
30563130
Comment int `json:"comment"`
30573131
NumberOfFiles int `json:"nFiles"`
30583132
}
3133+
3134+
// lastSyncFile is the JSON record stored through the cache provider to
// describe the state of the most recent sync.
type lastSyncFile struct {
	// LastSync is the upstream date the next sync resumes from; it is always
	// serialized.
	LastSync time.Time `json:"last_sync"`
	// Target is the commit count reported by git for the default branch;
	// omitted from the JSON when zero.
	Target int `json:"target,omitempty"`
	// Total is the number of commits created so far; omitted from the JSON
	// when zero.
	Total int `json:"total,omitempty"`
	// Head is the commit ID of the repository head at the time of writing;
	// omitted from the JSON when empty.
	Head string `json:"head,omitempty"`
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/LF-Engineering/insights-datasource-git
33
go 1.17
44

55
require (
6-
github.com/LF-Engineering/insights-datasource-shared v1.5.21
6+
github.com/LF-Engineering/insights-datasource-shared v1.5.26
77
github.com/LF-Engineering/lfx-event-schema v0.1.37
88
github.com/aws/aws-lambda-go v1.27.1
99
github.com/aws/aws-sdk-go v1.42.25

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ github.com/DataDog/datadog-go/v5 v5.0.2 h1:UFtEe7662/Qojxkw1d6SboAeA0CPI3naKhVAS
77
github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU=
88
github.com/DataDog/sketches-go v1.2.1 h1:qTBzWLnZ3kM2kw39ymh6rMcnN+5VULwFs++lEYUUsro=
99
github.com/DataDog/sketches-go v1.2.1/go.mod h1:1xYmPLY1So10AwxV6MJV0J53XVH+WL9Ad1KetxVivVI=
10-
github.com/LF-Engineering/insights-datasource-shared v1.5.21 h1:cZHytRoA5pZHsQlpeasV5K2b2jZ66ikylvNvzlBSj9s=
11-
github.com/LF-Engineering/insights-datasource-shared v1.5.21/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
10+
github.com/LF-Engineering/insights-datasource-shared v1.5.26 h1:zA4Vc/gTjcVJPr4uARisSl4MQiF1Br9XT9i71G5Gza0=
11+
github.com/LF-Engineering/insights-datasource-shared v1.5.26/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
1212
github.com/LF-Engineering/lfx-event-schema v0.1.14/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
1313
github.com/LF-Engineering/lfx-event-schema v0.1.37 h1:ny46D2NdCXokvJZ01GJcw2RfQM64ousJjaYsrRj5zzg=
1414
github.com/LF-Engineering/lfx-event-schema v0.1.37/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=

0 commit comments

Comments
 (0)