Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 6babd9d

Browse files
Change caching mechanism (#59)
* change caching mevhanism Signed-off-by: Ayman <[email protected]> * clean up Signed-off-by: Ayman <[email protected]> * use v2 path Signed-off-by: Ayman <[email protected]> * handle orphand field Signed-off-by: Ayman <[email protected]> * clean up Signed-off-by: Ayman <[email protected]> * code clean up Signed-off-by: Ayman <[email protected]> * test changes Signed-off-by: Ayman <[email protected]> * handle csv issue Signed-off-by: Ayman <[email protected]> Signed-off-by: Ayman <[email protected]> Co-authored-by: Ayman <[email protected]>
1 parent b68cb7a commit 6babd9d

File tree

2 files changed

+112
-21
lines changed

2 files changed

+112
-21
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ workflows:
9191
- build
9292
filters:
9393
branches:
94-
only: main
94+
only: change-caching-mechanism #main
9595
tags:
9696
ignore: /.*/
9797
- approve_prod:

cmd/git/git.go

Lines changed: 111 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package main
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
7+
"crypto/sha256"
8+
"encoding/csv"
69
"encoding/json"
710
"flag"
811
"fmt"
@@ -513,6 +516,8 @@ var (
513516
// max upstream date
514517
gMaxUpstreamDt time.Time
515518
gMaxUpstreamDtMtx = &sync.Mutex{}
519+
cachedCommits = make(map[string]CommitCache)
520+
commitsCacheFile = "commits-cache.csv"
516521
)
517522

518523
// Publisher - for streaming data to Kinesis
@@ -1766,25 +1771,23 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
17661771
data := j.GetModelData(ctx, *docs)
17671772
if j.Publisher != nil {
17681773
formattedData := make([]interface{}, 0)
1769-
vals := make([]map[string]interface{}, 0)
1774+
commits := make([]CommitCache, 0)
17701775
for _, d := range data {
1771-
isCreated, err := j.cacheProvider.IsKeyCreated(j.endpoint, d.Payload.ID)
1772-
if err != nil {
1773-
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("error creating cache for commit %s, error %v", d.Payload.SHA, err)
1774-
continue
1775-
}
1776+
isCreated := isKeyCreated(d.Payload.ID)
17761777
if !isCreated {
17771778
formattedData = append(formattedData, d)
1778-
b, err := json.Marshal(d)
1779-
if err != nil {
1779+
b, er := json.Marshal(d)
1780+
if er != nil {
17801781
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("error marshall data for commit %s, error %v", d.Payload.SHA, err)
17811782
continue
17821783
}
1783-
vals = append(vals, map[string]interface{}{
1784-
"id": d.Payload.ID,
1785-
"data": map[string]interface{}{
1786-
"content": b,
1787-
},
1784+
tStamp := d.Payload.SyncTimestamp.Unix()
1785+
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1786+
commits = append(commits, CommitCache{
1787+
Timestamp: fmt.Sprintf("%v", tStamp),
1788+
EntityID: d.Payload.ID,
1789+
SourceEntityID: d.Payload.SHA,
1790+
Hash: contentHash,
17881791
})
17891792
}
17901793
}
@@ -1796,13 +1799,10 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
17961799
return
17971800
}
17981801
}
1799-
if len(vals) > 0 {
1800-
for i, o := range vals {
1801-
o["data"].(map[string]interface{})["path"] = path
1802-
vals[i] = o
1803-
}
1804-
err = j.cacheProvider.Create(j.endpoint, vals)
1802+
if err = j.createCacheFile(commits, path); err != nil {
1803+
return
18051804
}
1805+
18061806
} else {
18071807
var jsonBytes []byte
18081808
jsonBytes, err = jsoniter.Marshal(data)
@@ -2396,6 +2396,35 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
23962396
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
23972397
}
23982398
}
2399+
comB, err := j.cacheProvider.GetFileByKey(j.endpoint, commitsCacheFile)
2400+
if err != nil {
2401+
return
2402+
}
2403+
reader := csv.NewReader(bytes.NewBuffer(comB))
2404+
records, err := reader.ReadAll()
2405+
if err != nil {
2406+
return
2407+
}
2408+
for i, record := range records {
2409+
if i == 0 {
2410+
continue
2411+
}
2412+
orphaned, err := strconv.ParseBool(record[5])
2413+
if err != nil {
2414+
orphaned = false
2415+
}
2416+
if lastSync != "" {
2417+
orphaned = true
2418+
}
2419+
cachedCommits[record[1]] = CommitCache{
2420+
Timestamp: record[0],
2421+
EntityID: record[1],
2422+
SourceEntityID: record[2],
2423+
FileLocation: record[3],
2424+
Hash: record[4],
2425+
Orphaned: orphaned,
2426+
}
2427+
}
23992428
if ctx.DateTo != nil {
24002429
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching till %v (%d threads)", j.URL, ctx.DateTo, thrN)
24012430
}
@@ -2740,7 +2769,69 @@ func (j *DSGit) initStructuredLogger() {
27402769

27412770
// AddCacheProvider - adds cache provider
27422771
func (j *DSGit) AddCacheProvider() {
2743-
cacheProvider := cache.NewManager(GitDataSource, os.Getenv("STAGE"))
2772+
cacheProvider := cache.NewManager(fmt.Sprintf("v2/%s", GitDataSource), os.Getenv("STAGE"))
27442773
j.cacheProvider = *cacheProvider
27452774
j.endpoint = strings.ReplaceAll(strings.TrimPrefix(strings.TrimPrefix(strings.TrimPrefix(j.URL, "https://"), "git://"), "http://"), "/", "-")
27462775
}
2776+
2777+
func (j *DSGit) createCacheFile(cache []CommitCache, path string) error {
2778+
for _, comm := range cache {
2779+
comm.FileLocation = path
2780+
cachedCommits[comm.EntityID] = comm
2781+
}
2782+
records := [][]string{
2783+
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned"},
2784+
}
2785+
for _, c := range cachedCommits {
2786+
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned)})
2787+
}
2788+
2789+
csvFile, err := os.Create(commitsCacheFile)
2790+
if err != nil {
2791+
return err
2792+
}
2793+
2794+
w := csv.NewWriter(csvFile)
2795+
err = w.WriteAll(records)
2796+
if err != nil {
2797+
return err
2798+
}
2799+
err = csvFile.Close()
2800+
if err != nil {
2801+
return err
2802+
}
2803+
file, err := os.ReadFile(commitsCacheFile)
2804+
if err != nil {
2805+
return err
2806+
}
2807+
err = os.Remove(commitsCacheFile)
2808+
if err != nil {
2809+
return err
2810+
}
2811+
err = j.cacheProvider.UpdateFileByKey(j.endpoint, commitsCacheFile, file)
2812+
if err != nil {
2813+
return err
2814+
}
2815+
2816+
return nil
2817+
}
2818+
2819+
func isKeyCreated(id string) bool {
2820+
c, ok := cachedCommits[id]
2821+
if ok {
2822+
c.Orphaned = false
2823+
cachedCommits[id] = c
2824+
return true
2825+
}
2826+
return false
2827+
}
2828+
2829+
// CommitCache single commit cache schema
2830+
type CommitCache struct {
2831+
Timestamp string `json:"timestamp"`
2832+
EntityID string `json:"entity_id"`
2833+
SourceEntityID string `json:"source_entity_id"`
2834+
FileLocation string `json:"file_location"`
2835+
Hash string `json:"hash"`
2836+
Orphaned bool `json:"orphaned"`
2837+
}

0 commit comments

Comments
 (0)