diff --git a/.golangci.yml b/.golangci.yml index 3aa05d5..f7fc99f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -22,6 +22,8 @@ linters-settings: linters: enable-all: true disable: + - gocognit + - gocyclo - funlen - nestif - cyclop diff --git a/cmd/catp/catp/app.go b/cmd/catp/catp/app.go index d54575e..4ccd48c 100644 --- a/cmd/catp/catp/app.go +++ b/cmd/catp/catp/app.go @@ -4,522 +4,50 @@ package catp import ( "bufio" "bytes" - "context" - "encoding/json" "errors" "flag" "fmt" "io" "log" "os" - "path" "path/filepath" "runtime/pprof" "sort" "strings" - "sync" "sync/atomic" "time" "github.com/bool64/dev/version" "github.com/bool64/progress" - "github.com/klauspost/compress/zstd" gzip "github.com/klauspost/pgzip" - "golang.org/x/time/rate" ) -var versionExtra []string - -type runner struct { - mu sync.Mutex - output io.Writer - - pr *progress.Progress - progressJSON string - - sizes map[string]int64 - matches int64 - totalBytes int64 - outDir string - - parallel int - - currentBytes int64 - currentBytesUncompressed int64 - currentLines int64 - - // pass is a slice of OR items, that are slices of AND items. - pass [][][]byte - // skip is a slice of OR items, that are slices of AND items. - skip [][][]byte - - finalPass bool - - currentFile *progress.CountingReader - currentTotal int64 - - lastErr error - lastStatusTime int64 - lastBytesUncompressed int64 - - rateLimit float64 - limiter *rate.Limiter - - noProgress bool - countLines bool - - hasOptions bool - options Options - - hasCompression bool -} - -// humanReadableBytes converts bytes to a human-readable string (TB, GB, MB, KB, or bytes). 
-func humanReadableBytes(bytes int64) string { - const ( - Byte = 1 - KByte = Byte * 1024 - MByte = KByte * 1024 - GByte = MByte * 1024 - TByte = GByte * 1024 - ) - - switch { - case bytes >= TByte: - return fmt.Sprintf("%.2f TB", float64(bytes)/float64(TByte)) - case bytes >= GByte: - return fmt.Sprintf("%.2f GB", float64(bytes)/float64(GByte)) - case bytes >= MByte: - return fmt.Sprintf("%.2f MB", float64(bytes)/float64(MByte)) - case bytes >= KByte: - return fmt.Sprintf("%.2f KB", float64(bytes)/float64(KByte)) - default: - return fmt.Sprintf("%d B", bytes) - } -} - -// st renders Status as a string. -func (r *runner) st(s progress.Status) string { - var res string - - type progressJSON struct { - progress.Status - BytesCompleted int64 `json:"bytes_completed"` - BytesTotal int64 `json:"bytes_total"` - CurrentFilePercent float64 `json:"current_file_percent,omitempty"` - Matches *int64 `json:"matches,omitempty"` - ElapsedSeconds float64 `json:"elapsed_sec"` - RemainingSeconds float64 `json:"remaining_sec"` - } - - pr := progressJSON{ - Status: s, - } - - currentBytesUncompressed := atomic.LoadInt64(&r.currentBytesUncompressed) - currentBytes := atomic.LoadInt64(&r.currentBytes) - - if len(r.sizes) > 1 && r.parallel <= 1 { - pr.CurrentFilePercent = 100 * float64(r.currentFile.Bytes()) / float64(r.currentTotal) - - if s.LinesCompleted != 0 { - res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s", - s.DonePercent, s.Task, pr.CurrentFilePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) - } else { - res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %.1f MB/s, elapsed %s, remaining %s", - s.DonePercent, s.Task, pr.CurrentFilePercent, s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) - } - } else { - if s.LinesCompleted != 0 { - if r.totalBytes == -1 { // STDIN - res = 
fmt.Sprintf("%s read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s", - humanReadableBytes(s.BytesCompleted), s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String()) - } else { - res = fmt.Sprintf("%s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s", - s.Task, s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) - } - } else { - if r.totalBytes == -1 { - res = fmt.Sprintf("%s read, %.1f MB/s, elapsed %s", - humanReadableBytes(s.BytesCompleted), s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String()) - } else { - res = fmt.Sprintf("%s: %.1f%% bytes read, %.1f MB/s, elapsed %s, remaining %s", - s.Task, s.DonePercent, s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) - } - } - } - - if currentBytesUncompressed > currentBytes && r.hasCompression { - lastBytesUncompressed := atomic.LoadInt64(&r.lastBytesUncompressed) - lastStatusTime := atomic.LoadInt64(&r.lastStatusTime) - now := time.Now().Unix() - - if lastBytesUncompressed != 0 && lastStatusTime != 0 && lastStatusTime != now { - spdMPBS := (float64(currentBytesUncompressed-lastBytesUncompressed) / float64(now-lastStatusTime)) / (1024 * 1024) - res = strings.ReplaceAll(res, "MB/s", fmt.Sprintf("MB/s (uncomp %.1f MB/s)", spdMPBS)) - } - - atomic.StoreInt64(&r.lastStatusTime, now) - atomic.StoreInt64(&r.lastBytesUncompressed, currentBytesUncompressed) - } - - if len(r.pass) > 0 || len(r.skip) > 0 || r.options.PrepareLine != nil { - m := atomic.LoadInt64(&r.matches) - pr.Matches = &m - res += fmt.Sprintf(", matches %d", m) - } - - if r.progressJSON != "" { - pr.ElapsedSeconds = pr.Elapsed.Truncate(time.Second).Seconds() - pr.RemainingSeconds = pr.Remaining.Round(time.Second).Seconds() - pr.BytesCompleted = currentBytes - pr.BytesTotal = atomic.LoadInt64(&r.totalBytes) - - if j, err := json.Marshal(pr); err == nil { - if err = 
os.WriteFile(r.progressJSON, append(j, '\n'), 0o600); err != nil { - println("failed to write progress JSON: " + err.Error()) - } - } - } - - return res -} - -func (r *runner) readFile(rd io.Reader, out io.Writer) { - b := bufio.NewReaderSize(rd, 64*1024) - - _, err := io.Copy(out, b) - if err != nil { - log.Fatal(err) - } -} - -func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { - s := bufio.NewScanner(rd) - s.Buffer(make([]byte, 64*1024), 10*1024*1024) - - lines := 0 - buf := make([]byte, 64*1024) - - linesPush := 1000 - if r.rateLimit < 100 { - linesPush = 1 - } - - for s.Scan() { - lines++ - - if r.limiter != nil { - _ = r.limiter.Wait(context.Background()) //nolint:errcheck // No failure condition here. - } - - if lines >= linesPush { - atomic.AddInt64(&r.currentLines, int64(lines)) - lines = 0 - } - - line := s.Bytes() - - if !r.shouldWrite(line) { - continue - } - - if r.hasOptions { - if r.options.PrepareLine != nil { - buf = buf[:0] - line = r.options.PrepareLine(filename, lines, line, &buf) - } - - if line == nil { - continue - } - } - - atomic.AddInt64(&r.matches, 1) - - if r.parallel > 1 && r.outDir == "" { - r.mu.Lock() - } - - if _, err := out.Write(append(line, '\n')); err != nil { - r.lastErr = err - - if r.parallel > 1 && r.outDir == "" { - r.mu.Unlock() - } - - return - } - - if r.parallel > 1 && r.outDir == "" { - r.mu.Unlock() - } - } - - atomic.AddInt64(&r.currentLines, int64(lines)) - - if err := s.Err(); err != nil { - r.mu.Lock() - defer r.mu.Unlock() - - r.lastErr = err - } -} - -func (r *runner) shouldWrite(line []byte) bool { - shouldWrite := false - passed := false - - if len(r.pass) == 0 { - shouldWrite = true - } else { - for _, orFilter := range r.pass { - orPassed := true - - for _, andFilter := range orFilter { - if !bytes.Contains(line, andFilter) { - orPassed = false - - break - } - } - - if orPassed { - shouldWrite = true - passed = true - - break - } - } - } - - if !shouldWrite { - return shouldWrite - } - 
- if passed && r.finalPass { - return true - } - - for _, orFilter := range r.skip { - orPassed := true - - for _, andFilter := range orFilter { - if !bytes.Contains(line, andFilter) { - orPassed = false - - break - } - } - - if orPassed { - shouldWrite = false - - break - } - } - - return shouldWrite -} - -func (r *runner) cat(filename string) (err error) { //nolint:gocyclo - var rd io.Reader - - if filename == "-" { - rd = os.Stdin - } else { - file, err := os.Open(filename) //nolint:gosec - if err != nil { - return err - } - - defer func() { - if clErr := file.Close(); clErr != nil && err == nil { - err = clErr - } - }() - - rd = io.Reader(file) - } - - if !r.noProgress { - cr := progress.NewCountingReader(rd) - cr.SetBytes(&r.currentBytes) - cr.SetLines(nil) - - if r.parallel <= 1 { - cr = progress.NewCountingReader(rd) - cr.SetLines(nil) - r.currentFile = cr - r.currentTotal = r.sizes[filename] - } - - rd = cr - } - - if rd, err = r.openReader(rd, filename); err != nil { - return err - } - - if !r.noProgress { - crl := progress.NewCountingReader(rd) - - crl.SetBytes(&r.currentBytesUncompressed) - crl.SetLines(nil) - - rd = crl - } - - out := r.output - - if r.outDir != "" { - fn := r.outDir + "/" + path.Base(filename) - if strings.HasSuffix(fn, ".gz") { - fn = strings.TrimSuffix(fn, ".gz") - } else { - fn = strings.TrimSuffix(fn, ".zst") - } - - w, err := os.Create(fn) //nolint:gosec - if err != nil { - return err - } - - defer func() { - if clErr := w.Close(); clErr != nil && err == nil { - err = clErr - } - }() - - out = w - } - - if r.parallel <= 1 && !r.noProgress { - r.pr.Start(func(t *progress.Task) { - t.TotalBytes = func() int64 { - return r.totalBytes - } - - t.CurrentBytes = r.currentFile.Bytes - t.CurrentLines = func() int64 { return atomic.LoadInt64(&r.currentLines) } - t.Task = filename - t.Continue = true - t.PrintOnStart = true - }) - } - - if r.rateLimit > 0 { - r.limiter = rate.NewLimiter(rate.Limit(r.rateLimit), 100) - } - - if len(r.pass) > 
0 || len(r.skip) > 0 || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 { - r.scanFile(filename, rd, out) - } else { - r.readFile(rd, out) - } - - if r.parallel <= 1 && !r.noProgress { - r.pr.Stop() - } - - return r.lastErr -} - -func (r *runner) openReader(rd io.Reader, filename string) (io.Reader, error) { - var err error - - switch { - case strings.HasSuffix(filename, ".gz"): - if rd, err = gzip.NewReader(rd); err != nil { - return nil, fmt.Errorf("failed to init gzip reader: %w", err) - } - case strings.HasSuffix(filename, ".zst"): - if r.parallel >= 1 { - if rd, err = zstdReader(rd); err != nil { - return nil, fmt.Errorf("failed to init zst reader: %w", err) - } - } else { - if rd, err = zstd.NewReader(rd); err != nil { - return nil, fmt.Errorf("failed to init zst reader: %w", err) - } - } - } - - return rd, nil -} - -func startProfiling(cpuProfile string, memProfile string) { - f, err := os.Create(cpuProfile) //nolint:gosec - if err != nil { - log.Fatal(err) - } - - if err = pprof.StartCPUProfile(f); err != nil { - log.Fatal(err) - } - - go func() { - time.Sleep(10 * time.Second) - pprof.StopCPUProfile() - println("CPU profile written to", cpuProfile) - - if memProfile != "" { - f, err := os.Create(memProfile) //nolint:gosec - if err != nil { - log.Fatal(err) - } - - if err := pprof.WriteHeapProfile(f); err != nil { - log.Fatal("writing heap profile:", err) - } - - println("Memory profile written to", memProfile) - } - }() -} - -type stringFlags []string - -func (i *stringFlags) String() string { - return "my string representation" -} +// Main is the entry point for catp CLI tool. 
+func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx + r := &runner{} -func (i *stringFlags) Set(value string) error { - *i = append(*i, value) + flag.Var(flagFunc(func(v string) error { + r.filters = append(r.filters, filter{pass: true, ms: v, and: bytes.Split([]byte(v), []byte("^"))}) - return nil -} - -// Options allows behavior customisations. -type Options struct { - // PrepareLine is invoked for every line, if result is nil, line is skipped. - // You can use buf to avoid allocations for a result, and change its capacity if needed. - PrepareLine func(filename string, lineNr int, line []byte, buf *[]byte) []byte -} + return nil + }), "pass", "filter matching, may contain multiple AND patterns separated by ^,\n"+ + "if filter matches, line is passed to the output (may be filtered out by preceding -skip)\n"+ + "other -pass values are evaluated if preceding pass/skip did not match,\n"+ + "for example, you can use \"-pass bar^baz -pass foo -skip fo\" to only keep lines that have (bar AND baz) OR foo, but not fox") -// Main is the entry point for catp CLI tool. 
-func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,gocyclo,maintidx - var ( - pass stringFlags - skip stringFlags - ) + flag.BoolFunc("pass-any", "finishes matching and gets the value even if previous -pass did not match,\n"+ + "if previous -skip matched, the line would be skipped anyway.", func(s string) error { + r.filters = append(r.filters, filter{pass: true}) - r := &runner{} + return nil + }) - flag.Var(&pass, "pass", "filter matching, may contain multiple AND patterns separated by ^,\n"+ - "if filter matches, line is passed to the output (unless filtered out by -skip)\n"+ - "each -pass value is added with OR logic,\n"+ - "for example, you can use \"-pass bar^baz -pass foo\" to only keep lines that have (bar AND baz) OR foo" + flag.Var(flagFunc(func(v string) error { + r.filters = append(r.filters, filter{pass: false, ms: v, and: bytes.Split([]byte(v), []byte("^"))}) - flag.Var(&skip, "skip", "filter matching, may contain multiple AND patterns separated by ^,\n"+ - "if filter matches, line is removed from the output (even if it passed -pass)\n"+ - "each -skip value is added with OR logic,\n"+ + return nil + }), "skip", "filter matching, may contain multiple AND patterns separated by ^,\n"+ + "if filter matches, line is removed from the output (may be kept if it passed preceding -pass)\n"+ "for example, you can use \"-skip quux^baz -skip fooO\" to skip lines that have (quux AND baz) OR fooO") flag.IntVar(&r.parallel, "parallel", 1, "number of parallel readers if multiple files are provided\n"+ @@ -529,7 +57,6 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g cpuProfile := flag.String("dbg-cpu-prof", "", "write first 10 seconds of CPU profile to file") memProfile := flag.String("dbg-mem-prof", "", "write heap profile to file after 10 seconds") output := flag.String("output", "", "output to file (can have .gz or .zst ext for compression) instead of STDOUT") - flag.BoolVar(&r.finalPass, 
"final-pass", false, "don't check skip if pass was successful") flag.BoolVar(&r.noProgress, "no-progress", false, "disable progress printing") flag.BoolVar(&r.countLines, "l", false, "count lines") flag.Float64Var(&r.rateLimit, "rate-limit", 0, "output rate limit lines per second") @@ -675,28 +202,6 @@ func Main(options ...func(o *Options)) error { //nolint:funlen,cyclop,gocognit,g } } - if len(pass) > 0 { - for _, orFilter := range pass { - var og [][]byte - for _, andFilter := range strings.Split(orFilter, "^") { - og = append(og, []byte(andFilter)) - } - - r.pass = append(r.pass, og) - } - } - - if len(skip) > 0 { - for _, orFilter := range skip { - var og [][]byte - for _, andFilter := range strings.Split(orFilter, "^") { - og = append(og, []byte(andFilter)) - } - - r.skip = append(r.skip, og) - } - } - r.sizes = make(map[string]int64) r.progressJSON = *progressJSON r.pr = &progress.Progress{ diff --git a/cmd/catp/catp/app_test.go b/cmd/catp/catp/app_test.go index 82b4daa..b71fd84 100644 --- a/cmd/catp/catp/app_test.go +++ b/cmd/catp/catp/app_test.go @@ -10,9 +10,9 @@ import ( func Test_Main(t *testing.T) { os.Args = []string{ "catp", + "-skip", "dbg", "-pass", "linux^64", "-pass", "windows", - "-skip", "dbg", "-output", "testdata/filtered.log", "testdata/release-assets.yml", } diff --git a/cmd/catp/catp/catp.go b/cmd/catp/catp/catp.go new file mode 100644 index 0000000..4582453 --- /dev/null +++ b/cmd/catp/catp/catp.go @@ -0,0 +1,500 @@ +package catp + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "os" + "path" + "runtime/pprof" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bool64/progress" + "github.com/klauspost/compress/zstd" + gzip "github.com/klauspost/pgzip" + "golang.org/x/time/rate" +) + +var versionExtra []string + +type runner struct { + mu sync.Mutex + output io.Writer + + pr *progress.Progress + progressJSON string + + sizes map[string]int64 + matches int64 + totalBytes int64 + outDir string + 
+ parallel int + + currentBytes int64 + currentBytesUncompressed int64 + currentLines int64 + + filters []filter + + currentFile *progress.CountingReader + currentTotal int64 + + lastErr error + lastStatusTime int64 + lastBytesUncompressed int64 + + rateLimit float64 + limiter *rate.Limiter + + noProgress bool + countLines bool + + hasOptions bool + options Options + + hasCompression bool +} + +type ( + filter struct { + pass bool // Skip is false. + and [][]byte + ms string + } + flagFunc func(v string) error +) + +func (f flagFunc) String() string { return "" } +func (f flagFunc) Set(value string) error { return f(value) } + +// humanReadableBytes converts bytes to a human-readable string (TB, GB, MB, KB, or bytes). +func humanReadableBytes(bytes int64) string { + const ( + Byte = 1 + KByte = Byte * 1024 + MByte = KByte * 1024 + GByte = MByte * 1024 + TByte = GByte * 1024 + ) + + switch { + case bytes >= TByte: + return fmt.Sprintf("%.2f TB", float64(bytes)/float64(TByte)) + case bytes >= GByte: + return fmt.Sprintf("%.2f GB", float64(bytes)/float64(GByte)) + case bytes >= MByte: + return fmt.Sprintf("%.2f MB", float64(bytes)/float64(MByte)) + case bytes >= KByte: + return fmt.Sprintf("%.2f KB", float64(bytes)/float64(KByte)) + default: + return fmt.Sprintf("%d B", bytes) + } +} + +// st renders Status as a string. 
+func (r *runner) st(s progress.Status) string { + var res string + + type progressJSON struct { + progress.Status + BytesCompleted int64 `json:"bytes_completed"` + BytesTotal int64 `json:"bytes_total"` + CurrentFilePercent float64 `json:"current_file_percent,omitempty"` + Matches *int64 `json:"matches,omitempty"` + ElapsedSeconds float64 `json:"elapsed_sec"` + RemainingSeconds float64 `json:"remaining_sec"` + } + + pr := progressJSON{ + Status: s, + } + + currentBytesUncompressed := atomic.LoadInt64(&r.currentBytesUncompressed) + currentBytes := atomic.LoadInt64(&r.currentBytes) + + if len(r.sizes) > 1 && r.parallel <= 1 { + pr.CurrentFilePercent = 100 * float64(r.currentFile.Bytes()) / float64(r.currentTotal) + + if s.LinesCompleted != 0 { + res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s", + s.DonePercent, s.Task, pr.CurrentFilePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) + } else { + res = fmt.Sprintf("all: %.1f%% bytes read, %s: %.1f%% bytes read, %.1f MB/s, elapsed %s, remaining %s", + s.DonePercent, s.Task, pr.CurrentFilePercent, s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) + } + } else { + if s.LinesCompleted != 0 { + if r.totalBytes == -1 { // STDIN + res = fmt.Sprintf("%s read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s", + humanReadableBytes(s.BytesCompleted), s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String()) + } else { + res = fmt.Sprintf("%s: %.1f%% bytes read, %d lines processed, %.1f l/s, %.1f MB/s, elapsed %s, remaining %s", + s.Task, s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) + } + } else { + if r.totalBytes == -1 { + res = fmt.Sprintf("%s read, %.1f MB/s, elapsed %s", + humanReadableBytes(s.BytesCompleted), 
s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String()) + } else { + res = fmt.Sprintf("%s: %.1f%% bytes read, %.1f MB/s, elapsed %s, remaining %s", + s.Task, s.DonePercent, s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String()) + } + } + } + + if currentBytesUncompressed > currentBytes && r.hasCompression { + lastBytesUncompressed := atomic.LoadInt64(&r.lastBytesUncompressed) + lastStatusTime := atomic.LoadInt64(&r.lastStatusTime) + now := time.Now().Unix() + + if lastBytesUncompressed != 0 && lastStatusTime != 0 && lastStatusTime != now { + spdMPBS := (float64(currentBytesUncompressed-lastBytesUncompressed) / float64(now-lastStatusTime)) / (1024 * 1024) + res = strings.ReplaceAll(res, "MB/s", fmt.Sprintf("MB/s (uncomp %.1f MB/s)", spdMPBS)) + } + + atomic.StoreInt64(&r.lastStatusTime, now) + atomic.StoreInt64(&r.lastBytesUncompressed, currentBytesUncompressed) + } + + if len(r.filters) > 0 || r.options.PrepareLine != nil { + m := atomic.LoadInt64(&r.matches) + pr.Matches = &m + res += fmt.Sprintf(", matches %d", m) + } + + if r.progressJSON != "" { + pr.ElapsedSeconds = pr.Elapsed.Truncate(time.Second).Seconds() + pr.RemainingSeconds = pr.Remaining.Round(time.Second).Seconds() + pr.BytesCompleted = currentBytes + pr.BytesTotal = atomic.LoadInt64(&r.totalBytes) + + if j, err := json.Marshal(pr); err == nil { + if err = os.WriteFile(r.progressJSON, append(j, '\n'), 0o600); err != nil { + println("failed to write progress JSON: " + err.Error()) + } + } + } + + return res +} + +func (r *runner) readFile(rd io.Reader, out io.Writer) { + b := bufio.NewReaderSize(rd, 64*1024) + + _, err := io.Copy(out, b) + if err != nil { + log.Fatal(err) + } +} + +func (r *runner) scanFile(filename string, rd io.Reader, out io.Writer) { + s := bufio.NewScanner(rd) + s.Buffer(make([]byte, 64*1024), 10*1024*1024) + + lines := 0 + buf := make([]byte, 64*1024) + + linesPush := 1000 + if r.rateLimit < 100 { + linesPush = 1 + } + + flusher, _ := 
out.(interface { //nolint:errcheck // nil is good enough. + Flush() error + }) + + for s.Scan() { + lines++ + + if r.limiter != nil { + _ = r.limiter.Wait(context.Background()) //nolint:errcheck // No failure condition here. + } + + if lines >= linesPush { + atomic.AddInt64(&r.currentLines, int64(lines)) + lines = 0 + + if flusher != nil { + if r.parallel > 1 && r.outDir == "" { + r.mu.Lock() + if err := flusher.Flush(); err != nil { + r.lastErr = err + } + r.mu.Unlock() + } else { + if err := flusher.Flush(); err != nil { + r.lastErr = err + } + } + } + } + + line := s.Bytes() + + if !r.shouldWrite(line) { + continue + } + + if r.hasOptions { + if r.options.PrepareLine != nil { + buf = buf[:0] + line = r.options.PrepareLine(filename, lines, line, &buf) + } + + if line == nil { + continue + } + } + + atomic.AddInt64(&r.matches, 1) + + if r.parallel > 1 && r.outDir == "" { + r.mu.Lock() + } + + if _, err := out.Write(append(line, '\n')); err != nil { + r.lastErr = err + + if r.parallel > 1 && r.outDir == "" { + r.mu.Unlock() + } + + return + } + + if r.parallel > 1 && r.outDir == "" { + r.mu.Unlock() + } + } + + atomic.AddInt64(&r.currentLines, int64(lines)) + + if err := s.Err(); err != nil { + r.mu.Lock() + defer r.mu.Unlock() + + r.lastErr = err + } +} + +func (r *runner) shouldWrite(line []byte) bool { + shouldWrite := true + + for _, f := range r.filters { + if f.pass { + shouldWrite = false + } + + andMatched := true + + for _, andFilter := range f.and { + if !bytes.Contains(line, andFilter) { + andMatched = false + + break + } + } + + if andMatched { + return f.pass + } + } + + return shouldWrite +} + +func (r *runner) cat(filename string) (err error) { //nolint:gocyclo + var rd io.Reader + + if filename == "-" { + rd = os.Stdin + } else { + file, err := os.Open(filename) //nolint:gosec + if err != nil { + return err + } + + defer func() { + if clErr := file.Close(); clErr != nil && err == nil { + err = clErr + } + }() + + rd = io.Reader(file) + } + + if 
!r.noProgress { + cr := progress.NewCountingReader(rd) + cr.SetBytes(&r.currentBytes) + cr.SetLines(nil) + + if r.parallel <= 1 { + cr = progress.NewCountingReader(rd) + cr.SetLines(nil) + r.currentFile = cr + r.currentTotal = r.sizes[filename] + } + + rd = cr + } + + if rd, err = r.openReader(rd, filename); err != nil { + return err + } + + if !r.noProgress { + crl := progress.NewCountingReader(rd) + + crl.SetBytes(&r.currentBytesUncompressed) + crl.SetLines(nil) + + rd = crl + } + + out := r.output + + if r.outDir != "" { + fn := r.outDir + "/" + path.Base(filename) + + w, err := os.Create(fn) //nolint:gosec + if err != nil { + return err + } + + defer func() { + if clErr := w.Close(); clErr != nil && err == nil { + err = clErr + } + }() + + out = w + + if strings.HasSuffix(fn, ".gz") { + z := gzip.NewWriter(w) + out = z + + defer func() { + if clErr := z.Close(); clErr != nil && err == nil { + err = clErr + } + }() + } else if strings.HasSuffix(fn, ".zst") { + z, err := zstdWriter(w) + if err != nil { + return err + } + + out = z + + defer func() { + if clErr := z.Close(); clErr != nil && err == nil { + err = clErr + } + }() + } + } + + if r.parallel <= 1 && !r.noProgress { + r.pr.Start(func(t *progress.Task) { + t.TotalBytes = func() int64 { + return r.totalBytes + } + + t.CurrentBytes = r.currentFile.Bytes + t.CurrentLines = func() int64 { return atomic.LoadInt64(&r.currentLines) } + t.Task = filename + t.Continue = true + t.PrintOnStart = true + }) + } + + if r.rateLimit > 0 { + r.limiter = rate.NewLimiter(rate.Limit(r.rateLimit), 100) + } + + if len(r.filters) > 0 || r.parallel > 1 || r.hasOptions || r.countLines || r.rateLimit > 0 { + r.scanFile(filename, rd, out) + } else { + r.readFile(rd, out) + } + + if r.parallel <= 1 && !r.noProgress { + r.pr.Stop() + } + + return r.lastErr +} + +func (r *runner) openReader(rd io.Reader, filename string) (io.Reader, error) { + var err error + + switch { + case strings.HasSuffix(filename, ".gz"): + if rd, err = 
gzip.NewReader(rd); err != nil { + return nil, fmt.Errorf("failed to init gzip reader: %w", err) + } + case strings.HasSuffix(filename, ".zst"): + if r.parallel >= 1 { + if rd, err = zstdReader(rd); err != nil { + return nil, fmt.Errorf("failed to init zst reader: %w", err) + } + } else { + if rd, err = zstd.NewReader(rd); err != nil { + return nil, fmt.Errorf("failed to init zst reader: %w", err) + } + } + } + + return rd, nil +} + +func startProfiling(cpuProfile string, memProfile string) { + f, err := os.Create(cpuProfile) //nolint:gosec + if err != nil { + log.Fatal(err) + } + + if err = pprof.StartCPUProfile(f); err != nil { + log.Fatal(err) + } + + go func() { + time.Sleep(10 * time.Second) + pprof.StopCPUProfile() + println("CPU profile written to", cpuProfile) + + if memProfile != "" { + f, err := os.Create(memProfile) //nolint:gosec + if err != nil { + log.Fatal(err) + } + + if err := pprof.WriteHeapProfile(f); err != nil { + log.Fatal("writing heap profile:", err) + } + + println("Memory profile written to", memProfile) + } + }() +} + +// Options allows behavior customisations. +type Options struct { + // PrepareLine is invoked for every line, if result is nil, line is skipped. + // You can use buf to avoid allocations for a result, and change its capacity if needed. + PrepareLine func(filename string, lineNr int, line []byte, buf *[]byte) []byte +}