Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 6901c6c

Browse files
Add commit update event (#68)
* add commit update event Signed-off-by: Ayman <[email protected]> * clean up Signed-off-by: Ayman <[email protected]> Signed-off-by: Ayman <[email protected]> Co-authored-by: Ayman <[email protected]>
1 parent be7a51a commit 6901c6c

File tree

1 file changed

+68
-2
lines changed

1 file changed

+68
-2
lines changed

cmd/git/git.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2416,13 +2416,23 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
24162416
if lastSync != "" {
24172417
orphaned = true
24182418
}
2419+
2420+
var fromDL bool
2421+
if len(record) > 6 {
2422+
fromDL, err = strconv.ParseBool(record[6])
2423+
if err != nil {
2424+
fromDL = false
2425+
}
2426+
}
2427+
24192428
cachedCommits[record[1]] = CommitCache{
24202429
Timestamp: record[0],
24212430
EntityID: record[1],
24222431
SourceEntityID: record[2],
24232432
FileLocation: record[3],
24242433
Hash: record[4],
24252434
Orphaned: orphaned,
2435+
FromDL: fromDL,
24262436
}
24272437
}
24282438
if ctx.DateTo != nil {
@@ -2680,6 +2690,10 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
26802690
}
26812691
}()
26822692
}
2693+
2694+
if lastSync != "" {
2695+
j.handleDataLakeOrphans()
2696+
}
26832697
// NOTE: Non-generic ends here
26842698
gMaxUpstreamDtMtx.Lock()
26852699
defer gMaxUpstreamDtMtx.Unlock()
@@ -2780,10 +2794,10 @@ func (j *DSGit) createCacheFile(cache []CommitCache, path string) error {
27802794
cachedCommits[comm.EntityID] = comm
27812795
}
27822796
records := [][]string{
2783-
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
2797+
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned", "from_dl"},
27842798
}
27852799
for _, c := range cachedCommits {
2786-
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned)})
2800+
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned), strconv.FormatBool(c.FromDL)})
27872801
}
27882802

27892803
csvFile, err := os.Create(commitsCacheFile)
@@ -2826,6 +2840,57 @@ func isKeyCreated(id string) bool {
28262840
return false
28272841
}
28282842

2843+
// handleDataLakeOrphans Update commits in DL with new orphaned status
2844+
func (j *DSGit) handleDataLakeOrphans() {
2845+
formattedData := make([]interface{}, 0)
2846+
baseEvent := service.BaseEvent{
2847+
Type: CommitUpdated,
2848+
CRUDInfo: service.CRUDInfo{
2849+
CreatedBy: GitConnector,
2850+
UpdatedBy: GitConnector,
2851+
CreatedAt: time.Now().Unix(),
2852+
UpdatedAt: time.Now().Unix(),
2853+
},
2854+
}
2855+
commitBaseEvent := git.CommitBaseEvent{
2856+
Connector: insights.GitConnector,
2857+
ConnectorVersion: GitBackendVersion,
2858+
Source: insights.Source(j.RepositorySource),
2859+
}
2860+
2861+
for orphanedID, v := range cachedCommits {
2862+
if v.Orphaned && v.FromDL {
2863+
commit := git.CommitUpdatedEvent{
2864+
CommitBaseEvent: commitBaseEvent,
2865+
BaseEvent: baseEvent,
2866+
Payload: git.Commit{ID: orphanedID, Orphaned: true},
2867+
}
2868+
formattedData = append(formattedData, commit)
2869+
}
2870+
}
2871+
2872+
if len(formattedData) > 0 {
2873+
path, err := j.Publisher.PushEvents(CommitUpdated, "insights", GitDataSource, "commits", os.Getenv("STAGE"), formattedData)
2874+
if err != nil {
2875+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error pushing data lake orphand commits: %+v", err)
2876+
return
2877+
}
2878+
for _, c := range formattedData {
2879+
id := c.(git.CommitUpdatedEvent).Payload.ID
2880+
commit := cachedCommits[id]
2881+
commit.FromDL = false
2882+
commit.FileLocation = path
2883+
cachedCommits[id] = commit
2884+
}
2885+
if err = j.createCacheFile([]CommitCache{}, ""); err != nil {
2886+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error updating commits cache: %+v", err)
2887+
return
2888+
}
2889+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Infof("updated %d orphand commits from data lake", len(formattedData))
2890+
}
2891+
2892+
}
2893+
28292894
// CommitCache single commit cache schema
28302895
type CommitCache struct {
28312896
Timestamp string `json:"timestamp"`
@@ -2834,4 +2899,5 @@ type CommitCache struct {
28342899
FileLocation string `json:"file_location"`
28352900
Hash string `json:"hash"`
28362901
Orphaned bool `json:"orphaned"`
2902+
FromDL bool `json:"from_dl"`
28372903
}

0 commit comments

Comments
 (0)