Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 72a0f31

Browse files
Cache all events (#77)
* cache all events Signed-off-by: Ayman <[email protected]> * clean up Signed-off-by: Ayman <[email protected]> * code clean up Signed-off-by: Ayman <[email protected]> --------- Signed-off-by: Ayman <[email protected]> Co-authored-by: Ayman <[email protected]>
1 parent db35169 commit 72a0f31

File tree

1 file changed

+124
-65
lines changed

1 file changed

+124
-65
lines changed

cmd/git/git.go

Lines changed: 124 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ var (
520520
gMaxUpstreamDtMtx = &sync.Mutex{}
521521
cachedCommits = make(map[string]CommitCache)
522522
commitsCacheFile = "commits-cache.csv"
523+
createdCommits = make(map[string]bool)
523524
)
524525

525526
// Publisher - for streaming data to Kinesis
@@ -1786,36 +1787,67 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
17861787
data := j.GetModelData(ctx, *docs)
17871788
if j.Publisher != nil {
17881789
formattedData := make([]interface{}, 0)
1790+
updatedData := make([]interface{}, 0)
17891791
commits := make([]CommitCache, 0)
1792+
updateCommits := make([]CommitCache, 0)
17901793
for _, d := range data {
1791-
isCreated := isKeyCreated(d.Payload.ID)
1792-
if !isCreated {
1794+
contentHash, er := createHash(d.Payload)
1795+
if er != nil {
1796+
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("error hash data for commit %s, error %v", d.Payload, err)
1797+
continue
1798+
}
1799+
hashExist := isHashCreated(contentHash)
1800+
isCreated := isCommitCreated(d.Payload.ID)
1801+
if !hashExist && !isCreated {
17931802
formattedData = append(formattedData, d)
1794-
b, er := json.Marshal(d)
1795-
if er != nil {
1796-
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("error marshall data for commit %s, error %v", d.Payload.SHA, err)
1797-
continue
1798-
}
17991803
tStamp := d.Payload.SyncTimestamp.Unix()
1800-
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
18011804
commits = append(commits, CommitCache{
18021805
Timestamp: fmt.Sprintf("%v", tStamp),
18031806
EntityID: d.Payload.ID,
18041807
SourceEntityID: d.Payload.SHA,
18051808
Hash: contentHash,
18061809
})
1810+
createdCommits[d.Payload.ID] = true
18071811
}
1812+
if isCreated && !hashExist {
1813+
updatedEvent := git.CommitUpdatedEvent{
1814+
CommitBaseEvent: d.CommitBaseEvent,
1815+
BaseEvent: service.BaseEvent{
1816+
Type: CommitUpdated,
1817+
CRUDInfo: d.BaseEvent.CRUDInfo,
1818+
},
1819+
Payload: d.Payload,
1820+
}
1821+
updatedData = append(updatedData, updatedEvent)
1822+
tStamp := d.Payload.SyncTimestamp.Unix()
1823+
updateCommits = append(updateCommits, CommitCache{
1824+
Timestamp: fmt.Sprintf("%v", tStamp),
1825+
EntityID: d.Payload.ID,
1826+
SourceEntityID: d.Payload.SHA,
1827+
Hash: contentHash,
1828+
})
1829+
}
1830+
18081831
}
1809-
path := ""
18101832
if len(formattedData) > 0 {
1811-
path, err = j.Publisher.PushEvents(CommitCreated, "insights", GitDataSource, "commits", os.Getenv("STAGE"), formattedData, j.endpoint)
1833+
path, err := j.Publisher.PushEvents(CommitCreated, "insights", GitDataSource, "commits", os.Getenv("STAGE"), formattedData, j.endpoint)
18121834
if err != nil {
18131835
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("Error: %+v", err)
18141836
return
18151837
}
1838+
if err = j.createCacheFile(commits, path); err != nil {
1839+
return
1840+
}
18161841
}
1817-
if err = j.createCacheFile(commits, path); err != nil {
1818-
return
1842+
if len(updatedData) > 0 {
1843+
path, err := j.Publisher.PushEvents(CommitUpdated, "insights", GitDataSource, "commits", os.Getenv("STAGE"), updatedData, j.endpoint)
1844+
if err != nil {
1845+
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("Error: %+v", err)
1846+
return
1847+
}
1848+
if err = j.createCacheFile(updateCommits, path); err != nil {
1849+
return
1850+
}
18191851
}
18201852

18211853
} else {
@@ -2428,53 +2460,7 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
24282460
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
24292461
}
24302462
}
2431-
comB, err := j.cacheProvider.GetFileByKey(j.endpoint, commitsCacheFile)
2432-
if err != nil {
2433-
return
2434-
}
2435-
reader := csv.NewReader(bytes.NewBuffer(comB))
2436-
records, err := reader.ReadAll()
2437-
if err != nil {
2438-
return
2439-
}
2440-
for i, record := range records {
2441-
if i == 0 {
2442-
continue
2443-
}
2444-
orphaned, err := strconv.ParseBool(record[5])
2445-
if err != nil {
2446-
orphaned = false
2447-
}
2448-
if lastSync != "" {
2449-
orphaned = true
2450-
}
2451-
2452-
var fromDL bool
2453-
if len(record) > 6 {
2454-
fromDL, err = strconv.ParseBool(record[6])
2455-
if err != nil {
2456-
fromDL = false
2457-
}
2458-
}
2459-
2460-
var content string
2461-
if len(record) > 7 {
2462-
if record[7] != "" {
2463-
content = record[7]
2464-
}
2465-
}
2466-
2467-
cachedCommits[record[1]] = CommitCache{
2468-
Timestamp: record[0],
2469-
EntityID: record[1],
2470-
SourceEntityID: record[2],
2471-
FileLocation: record[3],
2472-
Hash: record[4],
2473-
Orphaned: orphaned,
2474-
FromDL: fromDL,
2475-
Content: content,
2476-
}
2477-
}
2463+
j.getCache(lastSync)
24782464
if ctx.DateTo != nil {
24792465
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching till %v (%d threads)", j.URL, ctx.DateTo, thrN)
24802466
}
@@ -2892,16 +2878,73 @@ func (j *DSGit) createCacheFile(cache []CommitCache, path string) error {
28922878
return nil
28932879
}
28942880

2895-
func isKeyCreated(id string) bool {
2896-
c, ok := cachedCommits[id]
2881+
func isHashCreated(hash string) bool {
2882+
c, ok := cachedCommits[hash]
28972883
if ok {
28982884
c.Orphaned = false
2899-
cachedCommits[id] = c
2885+
cachedCommits[hash] = c
29002886
return true
29012887
}
29022888
return false
29032889
}
29042890

2891+
func (j *DSGit) getCache(lastSync string) {
2892+
commentBytes, err := j.cacheProvider.GetFileByKey(j.endpoint, commitsCacheFile)
2893+
if err != nil {
2894+
return
2895+
}
2896+
reader := csv.NewReader(bytes.NewBuffer(commentBytes))
2897+
records, err := reader.ReadAll()
2898+
if err != nil {
2899+
return
2900+
}
2901+
for i, record := range records {
2902+
if i == 0 {
2903+
continue
2904+
}
2905+
orphaned, err := strconv.ParseBool(record[5])
2906+
if err != nil {
2907+
orphaned = false
2908+
}
2909+
if lastSync != "" {
2910+
orphaned = true
2911+
}
2912+
2913+
var fromDL bool
2914+
if len(record) > 6 {
2915+
fromDL, err = strconv.ParseBool(record[6])
2916+
if err != nil {
2917+
fromDL = false
2918+
}
2919+
}
2920+
2921+
var content string
2922+
if len(record) > 7 {
2923+
if record[7] != "" {
2924+
content = record[7]
2925+
}
2926+
}
2927+
2928+
cachedCommits[record[4]] = CommitCache{
2929+
Timestamp: record[0],
2930+
EntityID: record[1],
2931+
SourceEntityID: record[2],
2932+
FileLocation: record[3],
2933+
Hash: record[4],
2934+
Orphaned: orphaned,
2935+
FromDL: fromDL,
2936+
Content: content,
2937+
}
2938+
2939+
createdCommits[record[1]] = true
2940+
}
2941+
}
2942+
2943+
func isCommitCreated(id string) bool {
2944+
_, ok := cachedCommits[id]
2945+
return ok
2946+
}
2947+
29052948
// handleDataLakeOrphans Update commits in DL with new orphaned status
29062949
func (j *DSGit) handleDataLakeOrphans() {
29072950
formattedData := make([]interface{}, 0)
@@ -2949,12 +2992,17 @@ func (j *DSGit) handleDataLakeOrphans() {
29492992
return
29502993
}
29512994
for _, c := range formattedData {
2952-
id := c.(git.CommitUpdatedEvent).Payload.ID
2953-
commit := cachedCommits[id]
2995+
payload := c.(git.CommitUpdatedEvent).Payload
2996+
contentHash, er := createHash(payload)
2997+
if er != nil {
2998+
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error hashing commit data: %+v", err)
2999+
continue
3000+
}
3001+
commit := cachedCommits[contentHash]
29543002
commit.FromDL = false
29553003
commit.FileLocation = path
29563004
commit.Content = ""
2957-
cachedCommits[id] = commit
3005+
cachedCommits[contentHash] = commit
29583006
}
29593007
if err = j.createCacheFile([]CommitCache{}, ""); err != nil {
29603008
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error updating commits cache: %+v", err)
@@ -2965,6 +3013,17 @@ func (j *DSGit) handleDataLakeOrphans() {
29653013

29663014
}
29673015

3016+
func createHash(content git.Commit) (string, error) {
3017+
content.SyncTimestamp = time.Time{}
3018+
b, err := json.Marshal(content)
3019+
if err != nil {
3020+
return "", err
3021+
}
3022+
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
3023+
3024+
return contentHash, err
3025+
}
3026+
29683027
// CommitCache single commit cache schema
29693028
type CommitCache struct {
29703029
Timestamp string `json:"timestamp"`

0 commit comments

Comments
 (0)