@@ -18,6 +18,7 @@ import (
1818
1919 "github.com/LF-Engineering/insights-datasource-git/build"
2020 shared "github.com/LF-Engineering/insights-datasource-shared"
21+ "github.com/LF-Engineering/insights-datasource-shared/cache"
2122 elastic "github.com/LF-Engineering/insights-datasource-shared/elastic"
2223 logger "github.com/LF-Engineering/insights-datasource-shared/ingestjob"
2324 "github.com/LF-Engineering/lfx-event-schema/service"
@@ -579,6 +580,8 @@ type DSGit struct {
579580 // RepositorySource for example git, github or gerrit
580581 RepositorySource string
581582 log * logrus.Entry
583+ cacheProvider cache.Manager
584+ endpoint string
582585}
583586
584587// PublisherPushEvents - this is a fake function to test publisher locally
@@ -1769,7 +1772,10 @@ func (j *DSGit) GitEnrichItems(ctx *shared.Ctx, thrN int, items []interface{}, d
17691772 * docs = []interface {}{}
17701773 gMaxUpstreamDtMtx .Lock ()
17711774 defer gMaxUpstreamDtMtx .Unlock ()
1772- shared .SetLastUpdate (ctx , j .URL , gMaxUpstreamDt )
1775+ err = j .cacheProvider .SetLastSync (j .endpoint , gMaxUpstreamDt )
1776+ if err != nil {
1777+ return
1778+ }
17731779 }
17741780 }
17751781 if final {
@@ -2328,7 +2334,12 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
23282334 j .log .WithFields (logrus.Fields {"operation" : "Sync" }).Infof ("%s fetching from %v (%d threads)" , j .URL , ctx .DateFrom , thrN )
23292335 }
23302336 if ctx .DateFrom == nil {
2331- ctx .DateFrom = shared .GetLastUpdate (ctx , j .URL )
2337+ cachedLastSync , er := j .cacheProvider .GetLastSync (j .endpoint )
2338+ if er != nil {
2339+ err = er
2340+ return
2341+ }
2342+ ctx .DateFrom = & cachedLastSync
23322343 if ctx .DateFrom != nil {
23332344 j .log .WithFields (logrus.Fields {"operation" : "Sync" }).Infof ("%s resuming from %v (%d threads)" , j .URL , ctx .DateFrom , thrN )
23342345 }
@@ -2591,7 +2602,7 @@ func (j *DSGit) Sync(ctx *shared.Ctx) (err error) {
25912602 // NOTE: Non-generic ends here
25922603 gMaxUpstreamDtMtx .Lock ()
25932604 defer gMaxUpstreamDtMtx .Unlock ()
2594- shared . SetLastUpdate ( ctx , j . URL , gMaxUpstreamDt )
2605+ err = j . cacheProvider . SetLastSync ( j . endpoint , gMaxUpstreamDt )
25952606 return
25962607}
25972608
@@ -2600,7 +2611,7 @@ func main() {
26002611 ctx shared.Ctx
26012612 git DSGit
26022613 )
2603- git .initStructuredLogger ()
2614+ git .initStructuredLogger (& ctx )
26042615 err := git .Init (& ctx )
26052616 if err != nil {
26062617 git .log .WithFields (logrus.Fields {"operation" : "main" }).Errorf ("Error: %+v" , err )
@@ -2610,6 +2621,7 @@ func main() {
26102621 shared .SetSyncMode (true , false )
26112622 shared .SetLogLoggerError (false )
26122623 shared .AddLogger (& git .Logger , GitDataSource , logger .Internal , []map [string ]string {{"REPO_URL" : git .URL , "ProjectSlug" : ctx .Project }})
2624+ git .AddCacheProvider ()
26132625 git .WriteLog (& ctx , timestamp , logger .InProgress , "" )
26142626 err = git .Sync (& ctx )
26152627 if err != nil {
@@ -2621,15 +2633,29 @@ func main() {
26212633}
26222634
26232635// createStructuredLogger...
2624- func (j * DSGit ) initStructuredLogger () {
2636+ func (j * DSGit ) initStructuredLogger (ctx * shared.Ctx ) {
2637+ endpointURL := ""
2638+ if shared .FlagPassed (ctx , "url" ) && * j .FlagURL != "" {
2639+ endpointURL = * j .FlagURL
2640+ }
2641+ if ctx .EnvSet ("URL" ) {
2642+ endpointURL = ctx .Env ("URL" )
2643+ }
26252644 logrus .SetFormatter (& logrus.JSONFormatter {})
26262645 log := logrus .WithFields (
26272646 logrus.Fields {
26282647 "environment" : os .Getenv ("STAGE" ),
26292648 "commit" : build .GitCommit ,
26302649 "version" : build .Version ,
26312650 "service" : build .AppName ,
2632- "endpoint" : j . URL ,
2651+ "endpoint" : endpointURL ,
26332652 })
26342653 j .log = log
26352654}
2655+
2656+ // AddCacheProvider - adds cache provider
2657+ func (j * DSGit ) AddCacheProvider () {
2658+ cacheProvider := cache .NewManager (GitDataSource , os .Getenv ("STAGE" ))
2659+ j .cacheProvider = * cacheProvider
2660+ j .endpoint = strings .ReplaceAll (strings .TrimPrefix (strings .TrimPrefix (j .URL , "https://" ), "http://" ), "/" , "-" )
2661+ }
0 commit comments