Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit fca5abc

Browse files
handle hot repos data lake orphans (#96)
Signed-off-by: Ayman <[email protected]> Co-authored-by: Ayman <[email protected]>
1 parent be8da62 commit fca5abc

File tree

3 files changed

+130
-15
lines changed

3 files changed

+130
-15
lines changed

cmd/git/git.go

Lines changed: 127 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ var (
538538
CachedCommitsUpdates = make(map[string]CommitCache)
539539
CommitsByYearHalfCacheFile = "commits-cache-%s-%s.csv"
540540
CurrentCacheYearHalf = YearFirstHalf
541+
FirstCommitAt time.Time
541542
)
542543

543544
// Publisher - for streaming data to Kinesis
@@ -3008,6 +3009,7 @@ func (j *DSGit) SyncV2(ctx *shared.Ctx) (err error) {
30083009
if err != nil {
30093010
return err
30103011
}
3012+
FirstCommitAt = firstCommit.Author.When
30113013
from := firstCommit.Author.When.Add(time.Second * -60)
30123014
if ctx.DateFrom.After(from) {
30133015
from = *ctx.DateFrom
@@ -3471,7 +3473,11 @@ func (j *DSGit) createYearHalfCacheFile(cache []CommitCache, path string) error
34713473
return err
34723474
}
34733475
if len(nextYearHalfCache) > 0 {
3474-
//CurrentCacheYear = nextYearHalfCache[0].CommitDate.Year()
3476+
CurrentCacheYear = nextYearHalfCache[0].CommitDate.Year()
3477+
CurrentCacheYearHalf = YearFirstHalf
3478+
if nextYearHalfCache[0].CommitDate.Month() > 6 {
3479+
CurrentCacheYearHalf = YearSecondHalf
3480+
}
34753481
updateYearHalf(nextYearHalfCache[0].CommitDate)
34763482
if err = j.createYearHalfCacheFile(nextYearHalfCache, path); err != nil {
34773483
return err
@@ -3713,8 +3719,17 @@ func (j *DSGit) getYearHalfCache(lastSync string) {
37133719
}
37143720
}
37153721

3716-
func loadCacheToMemory(records [][]string) {
3717-
lastSync := os.Getenv("LAST_SYNC")
3722+
func (j *DSGit) getCacheFileByKey(key string, lastSync string) (map[string]CommitCache, error) {
3723+
commits := make(map[string]CommitCache)
3724+
commentBytes, err := j.cacheProvider.GetFileByKey(j.endpoint, key)
3725+
if err != nil {
3726+
return commits, err
3727+
}
3728+
reader := csv.NewReader(bytes.NewBuffer(commentBytes))
3729+
records, err := reader.ReadAll()
3730+
if err != nil {
3731+
return commits, err
3732+
}
37183733
for i, record := range records {
37193734
if i == 0 {
37203735
continue
@@ -3742,7 +3757,7 @@ func loadCacheToMemory(records [][]string) {
37423757
}
37433758
}
37443759

3745-
cachedCommits[record[4]] = CommitCache{
3760+
commits[record[4]] = CommitCache{
37463761
Timestamp: record[0],
37473762
EntityID: record[1],
37483763
SourceEntityID: record[2],
@@ -3753,6 +3768,7 @@ func loadCacheToMemory(records [][]string) {
37533768
Content: content,
37543769
}
37553770
}
3771+
return commits, nil
37563772
}
37573773

37583774
func isCommitCreated(id string) bool {
@@ -3828,6 +3844,103 @@ func (j *DSGit) handleDataLakeOrphans() {
38283844

38293845
}
38303846

3847+
// handleHotRepoDataLakeOrphans Update hot repository commits in DL with new orphaned status
3848+
func (j *DSGit) handleHotRepoDataLakeOrphans() {
3849+
year := FirstCommitAt.Year()
3850+
half := YearFirstHalf
3851+
yearSTR := strconv.Itoa(year)
3852+
3853+
cacheFileName := fmt.Sprintf(CommitsByYearHalfCacheFile, yearSTR, half)
3854+
for {
3855+
commits, err := j.getCacheFileByKey(cacheFileName, "")
3856+
if err != nil {
3857+
if year > CurrentCacheYear {
3858+
break
3859+
}
3860+
if year == CurrentCacheYear && CurrentCacheYearHalf == YearFirstHalf && half == YearSecondHalf {
3861+
break
3862+
}
3863+
continue
3864+
}
3865+
3866+
if half == YearSecondHalf {
3867+
year++
3868+
half = YearFirstHalf
3869+
}
3870+
if half == YearFirstHalf {
3871+
half = YearSecondHalf
3872+
}
3873+
3874+
formattedData := j.handleSingleCacheFile(commits)
3875+
if len(formattedData) > 0 {
3876+
path, err := j.Publisher.PushEvents(CommitUpdated, "insights", GitDataSource, "commits", os.Getenv("STAGE"), formattedData, j.endpoint)
3877+
if err != nil {
3878+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error pushing data lake orphand commits: %+v", err)
3879+
return
3880+
}
3881+
for _, c := range formattedData {
3882+
payload := c.(git.CommitUpdatedEvent).Payload
3883+
contentHash, er := createHash(payload)
3884+
if er != nil {
3885+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error hashing commit data: %+v", err)
3886+
continue
3887+
}
3888+
commit := cachedCommits[contentHash]
3889+
commit.FromDL = false
3890+
commit.FileLocation = path
3891+
commit.Content = ""
3892+
cachedCommits[contentHash] = commit
3893+
}
3894+
if err = j.createUpdateCacheFile([]CommitCache{}, ""); err != nil {
3895+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error updating commits cache: %+v", err)
3896+
return
3897+
}
3898+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Infof("updated %d orphand commits from data lake", len(formattedData))
3899+
}
3900+
}
3901+
3902+
}
3903+
3904+
func (j *DSGit) handleSingleCacheFile(commits map[string]CommitCache) []interface{} {
3905+
formattedData := make([]interface{}, 0)
3906+
baseEvent := service.BaseEvent{
3907+
Type: CommitUpdated,
3908+
CRUDInfo: service.CRUDInfo{
3909+
CreatedBy: GitConnector,
3910+
UpdatedBy: GitConnector,
3911+
CreatedAt: time.Now().Unix(),
3912+
UpdatedAt: time.Now().Unix(),
3913+
},
3914+
}
3915+
commitBaseEvent := git.CommitBaseEvent{
3916+
Connector: insights.GitConnector,
3917+
ConnectorVersion: GitBackendVersion,
3918+
Source: insights.Source(j.RepositorySource),
3919+
}
3920+
for _, v := range commits {
3921+
if v.Orphaned {
3922+
commitB, err := b64.StdEncoding.DecodeString(v.Content)
3923+
if err != nil {
3924+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error decode datalake orphand commit: %+v", err)
3925+
}
3926+
var commit git.Commit
3927+
err = jsoniter.Unmarshal(commitB, &commit)
3928+
if err != nil {
3929+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error unmarshall datalake orphand commit: %+v", err)
3930+
continue
3931+
}
3932+
commit.Orphaned = true
3933+
commitEvent := git.CommitUpdatedEvent{
3934+
CommitBaseEvent: commitBaseEvent,
3935+
BaseEvent: baseEvent,
3936+
Payload: commit,
3937+
}
3938+
formattedData = append(formattedData, commitEvent)
3939+
}
3940+
}
3941+
return formattedData
3942+
}
3943+
38313944
func createHash(content git.Commit) (string, error) {
38323945
commitHashFields := CommitHashFields{
38333946
ID: content.ID,
@@ -3860,10 +3973,11 @@ func (j *DSGit) setLastSync(ctx *shared.Ctx) error {
38603973
defer gMaxUpstreamDtMtx.Unlock()
38613974

38623975
lastSyncData := lastSyncFile{
3863-
LastSync: gMaxUpstreamDt,
3864-
Target: commitsCount,
3865-
Total: len(createdCommits),
3866-
Head: commitID,
3976+
LastSync: gMaxUpstreamDt,
3977+
Target: commitsCount,
3978+
Total: len(createdCommits),
3979+
Head: commitID,
3980+
FirstCommitAt: FirstCommitAt,
38673981
}
38683982

38693983
lastSyncDataB, err := jsoniter.Marshal(lastSyncData)
@@ -4145,10 +4259,11 @@ type clocResult struct {
41454259
}
41464260

41474261
type lastSyncFile struct {
4148-
LastSync time.Time `json:"last_sync"`
4149-
Target int `json:"target,omitempty"`
4150-
Total int `json:"total,omitempty"`
4151-
Head string `json:"head,omitempty"`
4262+
LastSync time.Time `json:"last_sync"`
4263+
Target int `json:"target,omitempty"`
4264+
Total int `json:"total,omitempty"`
4265+
Head string `json:"head,omitempty"`
4266+
FirstCommitAt time.Time `json:"first_commit_At"`
41524267
}
41534268

41544269
// CommitHashFields selected fields from commit schema to hash

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/LF-Engineering/insights-datasource-git
33
go 1.17
44

55
require (
6-
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230411073313-68b5e7a0b0ef
6+
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230414034312-36df1857433a
77
github.com/LF-Engineering/lfx-event-schema v0.1.37
88
github.com/aws/aws-lambda-go v1.27.1
99
github.com/aws/aws-sdk-go v1.42.25

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
2-
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230411073313-68b5e7a0b0ef h1:Mwv6SkvJgLQi2/jdJCSWbjG/CFolOiQtRb3Ydhb4Oe8=
3-
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230411073313-68b5e7a0b0ef/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
2+
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230414034312-36df1857433a h1:PjDcobccTBbR/w4+i7qCmJBIW8k3ZYxH7HENP0jcTMc=
3+
github.com/LF-Engineering/insights-datasource-shared v1.5.30-0.20230414034312-36df1857433a/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
44
github.com/LF-Engineering/lfx-event-schema v0.1.14/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
55
github.com/LF-Engineering/lfx-event-schema v0.1.37 h1:ny46D2NdCXokvJZ01GJcw2RfQM64ousJjaYsrRj5zzg=
66
github.com/LF-Engineering/lfx-event-schema v0.1.37/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=

0 commit comments

Comments
 (0)