Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit fed371c

Browse files
Add cache sharding (#92)
* add cache sharding Signed-off-by: Ayman <[email protected]> * code clean up Signed-off-by: Ayman <[email protected]> --------- Signed-off-by: Ayman <[email protected]> Co-authored-by: Ayman <[email protected]>
1 parent d281099 commit fed371c

File tree

1 file changed

+249
-12
lines changed

1 file changed

+249
-12
lines changed

cmd/git/git.go

Lines changed: 249 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ const (
105105
// GitConnector ...
106106
GitConnector = "git-connector"
107107
PackSize = 1000
108+
HotRepoCount = 50000
108109
)
109110

110111
var (
@@ -523,11 +524,16 @@ var (
523524
// GitTrailerPPAuthors - trailer name to authors map (for pair programming)
524525
GitTrailerPPAuthors = map[string]string{"Signed-off-by": "authors_signed_off", "Co-authored-by": "co_authors"}
525526
// max upstream date
526-
gMaxUpstreamDt time.Time
527-
gMaxUpstreamDtMtx = &sync.Mutex{}
528-
cachedCommits = make(map[string]CommitCache)
529-
commitsCacheFile = "commits-cache.csv"
530-
createdCommits = make(map[string]bool)
527+
gMaxUpstreamDt time.Time
528+
gMaxUpstreamDtMtx = &sync.Mutex{}
529+
cachedCommits = make(map[string]CommitCache)
530+
commitsCacheFile = "commits-cache.csv"
531+
createdCommits = make(map[string]bool)
532+
IsHotRep = false
533+
CommitsByYearCacheFile = "commits-cache-%s.csv"
534+
CommitsUpdateCacheFile = "commits-update-cache.csv"
535+
CurrentCacheYear = 1970
536+
CachedCommitsUpdates = make(map[string]CommitCache)
531537
)
532538

533539
// Publisher - for streaming data to Kinesis
@@ -1826,6 +1832,7 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
18261832
SourceEntityID: d.Payload.SHA,
18271833
Content: commitStr,
18281834
Hash: contentHash,
1835+
CommitDate: d.Payload.CommittedTimestamp,
18291836
})
18301837
createdCommits[d.Payload.ID] = true
18311838
}
@@ -1846,6 +1853,7 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
18461853
SourceEntityID: d.Payload.SHA,
18471854
Content: commitStr,
18481855
Hash: contentHash,
1856+
CommitDate: d.Payload.CommittedTimestamp,
18491857
})
18501858
}
18511859

@@ -1856,8 +1864,14 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
18561864
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("Error: %+v", err)
18571865
return
18581866
}
1859-
if err = j.createCacheFile(commits, path); err != nil {
1860-
return
1867+
if !IsHotRep {
1868+
if err = j.createCacheFile(commits, path); err != nil {
1869+
return
1870+
}
1871+
} else {
1872+
if err = j.createYearCacheFile(commits, path); err != nil {
1873+
return
1874+
}
18611875
}
18621876
}
18631877
if len(updatedData) > 0 {
@@ -1866,9 +1880,16 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
18661880
j.log.WithFields(logrus.Fields{"operation": "GitEnrichItems"}).Errorf("Error: %+v", err)
18671881
return
18681882
}
1869-
if err = j.createCacheFile(updateCommits, path); err != nil {
1870-
return
1883+
if !IsHotRep {
1884+
if err = j.createCacheFile(updateCommits, path); err != nil {
1885+
return
1886+
}
1887+
} else {
1888+
if err = j.createUpdateCacheFile(updateCommits, path); err != nil {
1889+
return
1890+
}
18711891
}
1892+
18721893
}
18731894

18741895
} else {
@@ -2939,7 +2960,6 @@ func (j *DSGit) SyncV2(ctx *shared.Ctx) (err error) {
29392960
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s resuming from %v (%d threads)", j.URL, ctx.DateFrom, thrN)
29402961
}
29412962
}
2942-
j.getCache(lastSync)
29432963
if ctx.DateTo != nil {
29442964
j.log.WithFields(logrus.Fields{"operation": "Sync"}).Infof("%s fetching till %v (%d threads)", j.URL, ctx.DateTo, thrN)
29452965
}
@@ -2984,7 +3004,7 @@ func (j *DSGit) SyncV2(ctx *shared.Ctx) (err error) {
29843004
if err != nil {
29853005
return err
29863006
}
2987-
from := firstCommit.Author.When
3007+
from := firstCommit.Author.When.Add(time.Second * -60)
29883008
if ctx.DateFrom.After(from) {
29893009
from = *ctx.DateFrom
29903010
}
@@ -3133,6 +3153,19 @@ func (j *DSGit) SyncV2(ctx *shared.Ctx) (err error) {
31333153
return
31343154
}
31353155

3156+
commitsCount, err := j.getCommitsCount(ctx)
3157+
if err != nil {
3158+
return err
3159+
}
3160+
if commitsCount >= HotRepoCount {
3161+
IsHotRep = true
3162+
CurrentCacheYear = from.Year()
3163+
j.getYearCache(lastSync)
3164+
j.getUpdateCache(lastSync)
3165+
} else {
3166+
j.getCache(lastSync)
3167+
}
3168+
31363169
for from.Before(headCommit.Author.When) {
31373170
until := from.Add(24 * time.Hour * 30)
31383171
comms, er := getRepoCommits(r, from, until)
@@ -3387,6 +3420,104 @@ func (j *DSGit) createCacheFile(cache []CommitCache, path string) error {
33873420
return nil
33883421
}
33893422

3423+
func (j *DSGit) createYearCacheFile(cache []CommitCache, path string) error {
3424+
nextYearCache := make([]CommitCache, 0)
3425+
for _, comm := range cache {
3426+
comm.FileLocation = path
3427+
if comm.CommitDate.Year() == CurrentCacheYear {
3428+
cachedCommits[comm.EntityID] = comm
3429+
} else {
3430+
nextYearCache = append(nextYearCache, comm)
3431+
}
3432+
}
3433+
records := [][]string{
3434+
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned", "from_dl", "content"},
3435+
}
3436+
for _, c := range cachedCommits {
3437+
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned), strconv.FormatBool(c.FromDL), c.Content})
3438+
}
3439+
3440+
yearSTR := strconv.Itoa(CurrentCacheYear)
3441+
cacheFile := fmt.Sprintf(CommitsByYearCacheFile, yearSTR)
3442+
csvFile, err := os.Create(cacheFile)
3443+
if err != nil {
3444+
return err
3445+
}
3446+
3447+
w := csv.NewWriter(csvFile)
3448+
err = w.WriteAll(records)
3449+
if err != nil {
3450+
return err
3451+
}
3452+
err = csvFile.Close()
3453+
if err != nil {
3454+
return err
3455+
}
3456+
file, err := os.ReadFile(cacheFile)
3457+
if err != nil {
3458+
return err
3459+
}
3460+
err = os.Remove(cacheFile)
3461+
if err != nil {
3462+
return err
3463+
}
3464+
err = j.cacheProvider.UpdateFileByKey(j.endpoint, cacheFile, file)
3465+
if err != nil {
3466+
return err
3467+
}
3468+
if len(nextYearCache) > 0 {
3469+
CurrentCacheYear = nextYearCache[0].CommitDate.Year()
3470+
if err = j.createYearCacheFile(nextYearCache, path); err != nil {
3471+
return err
3472+
}
3473+
cachedCommits = make(map[string]CommitCache)
3474+
j.getYearCache(os.Getenv("LAST_SYNC"))
3475+
}
3476+
return nil
3477+
}
3478+
3479+
func (j *DSGit) createUpdateCacheFile(cache []CommitCache, path string) error {
3480+
for _, comm := range cache {
3481+
comm.FileLocation = path
3482+
CachedCommitsUpdates[comm.EntityID] = comm
3483+
}
3484+
records := [][]string{
3485+
{"timestamp", "entity_id", "source_entity_id", "file_location", "hash", "orphaned", "from_dl", "content"},
3486+
}
3487+
for _, c := range CachedCommitsUpdates {
3488+
records = append(records, []string{c.Timestamp, c.EntityID, c.SourceEntityID, c.FileLocation, c.Hash, strconv.FormatBool(c.Orphaned), strconv.FormatBool(c.FromDL), c.Content})
3489+
}
3490+
3491+
csvFile, err := os.Create(CommitsUpdateCacheFile)
3492+
if err != nil {
3493+
return err
3494+
}
3495+
3496+
w := csv.NewWriter(csvFile)
3497+
err = w.WriteAll(records)
3498+
if err != nil {
3499+
return err
3500+
}
3501+
err = csvFile.Close()
3502+
if err != nil {
3503+
return err
3504+
}
3505+
file, err := os.ReadFile(CommitsUpdateCacheFile)
3506+
if err != nil {
3507+
return err
3508+
}
3509+
err = os.Remove(CommitsUpdateCacheFile)
3510+
if err != nil {
3511+
return err
3512+
}
3513+
err = j.cacheProvider.UpdateFileByKey(j.endpoint, CommitsUpdateCacheFile, file)
3514+
if err != nil {
3515+
return err
3516+
}
3517+
3518+
return nil
3519+
}
3520+
33903521
func isHashCreated(hash string) bool {
33913522
c, ok := cachedCommits[hash]
33923523
if ok {
@@ -3449,6 +3580,111 @@ func (j *DSGit) getCache(lastSync string) {
34493580
}
34503581
}
34513582

3583+
func (j *DSGit) getUpdateCache(lastSync string) {
3584+
commentBytes, err := j.cacheProvider.GetFileByKey(j.endpoint, CommitsUpdateCacheFile)
3585+
if err != nil {
3586+
return
3587+
}
3588+
reader := csv.NewReader(bytes.NewBuffer(commentBytes))
3589+
records, err := reader.ReadAll()
3590+
if err != nil {
3591+
return
3592+
}
3593+
for i, record := range records {
3594+
if i == 0 {
3595+
continue
3596+
}
3597+
orphaned, err := strconv.ParseBool(record[5])
3598+
if err != nil {
3599+
orphaned = false
3600+
}
3601+
if lastSync != "" {
3602+
orphaned = true
3603+
}
3604+
3605+
var fromDL bool
3606+
if len(record) > 6 {
3607+
fromDL, err = strconv.ParseBool(record[6])
3608+
if err != nil {
3609+
fromDL = false
3610+
}
3611+
}
3612+
3613+
var content string
3614+
if len(record) > 7 {
3615+
if record[7] != "" {
3616+
content = record[7]
3617+
}
3618+
}
3619+
3620+
CachedCommitsUpdates[record[4]] = CommitCache{
3621+
Timestamp: record[0],
3622+
EntityID: record[1],
3623+
SourceEntityID: record[2],
3624+
FileLocation: record[3],
3625+
Hash: record[4],
3626+
Orphaned: orphaned,
3627+
FromDL: fromDL,
3628+
Content: content,
3629+
}
3630+
3631+
createdCommits[record[1]] = true
3632+
}
3633+
}
3634+
3635+
func (j *DSGit) getYearCache(lastSync string) {
3636+
yearSTR := strconv.Itoa(CurrentCacheYear)
3637+
commentBytes, err := j.cacheProvider.GetFileByKey(j.endpoint, fmt.Sprintf(CommitsByYearCacheFile, yearSTR))
3638+
if err != nil {
3639+
return
3640+
}
3641+
reader := csv.NewReader(bytes.NewBuffer(commentBytes))
3642+
records, err := reader.ReadAll()
3643+
if err != nil {
3644+
return
3645+
}
3646+
for i, record := range records {
3647+
if i == 0 {
3648+
continue
3649+
}
3650+
orphaned, err := strconv.ParseBool(record[5])
3651+
if err != nil {
3652+
orphaned = false
3653+
}
3654+
if lastSync != "" {
3655+
orphaned = true
3656+
}
3657+
3658+
var fromDL bool
3659+
if len(record) > 6 {
3660+
fromDL, err = strconv.ParseBool(record[6])
3661+
if err != nil {
3662+
fromDL = false
3663+
}
3664+
}
3665+
3666+
var content string
3667+
if len(record) > 7 {
3668+
if record[7] != "" {
3669+
content = record[7]
3670+
}
3671+
}
3672+
3673+
cachedCommits[record[4]] = CommitCache{
3674+
Timestamp: record[0],
3675+
EntityID: record[1],
3676+
SourceEntityID: record[2],
3677+
FileLocation: record[3],
3678+
Hash: record[4],
3679+
Orphaned: orphaned,
3680+
FromDL: fromDL,
3681+
Content: content,
3682+
}
3683+
3684+
createdCommits[record[1]] = true
3685+
}
3686+
}
3687+
34523688
func isCommitCreated(id string) bool {
34533689
_, ok := cachedCommits[id]
34543690
return ok
@@ -3513,7 +3749,7 @@ func (j *DSGit) handleDataLakeOrphans() {
35133749
commit.Content = ""
35143750
cachedCommits[contentHash] = commit
35153751
}
3516-
if err = j.createCacheFile([]CommitCache{}, ""); err != nil {
3752+
if err = j.createUpdateCacheFile([]CommitCache{}, ""); err != nil {
35173753
j.log.WithFields(logrus.Fields{"operation": "handleDataLakeOrphans"}).Errorf("error updating commits cache: %+v", err)
35183754
return
35193755
}
@@ -3816,6 +4052,7 @@ type CommitCache struct {
38164052
Orphaned bool `json:"orphaned"`
38174053
FromDL bool `json:"from_dl"`
38184054
Content string `json:"content"`
4055+
CommitDate time.Time
38194056
}
38204057

38214058
// ReportData schema

0 commit comments

Comments
 (0)