Skip to content

Commit 0d476a7

Browse files
committed
wip - started implementing speed factors
1 parent fe65dd9 commit 0d476a7

1 file changed

Lines changed: 65 additions & 32 deletions

File tree

cmd/slowql-replayer/main.go

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,19 @@ type options struct {
3535
loglvl string
3636
pprof string
3737
workers int
38+
factor float64
3839
usePass bool
3940
noDryRun bool
4041
}
4142

4243
type database struct {
43-
kind slowql.Kind
44-
datasource string
45-
drv *sql.DB
46-
noDryRun bool
47-
logger *logrus.Logger
48-
wrks int
44+
kind slowql.Kind
45+
datasource string
46+
drv *sql.DB
47+
noDryRun bool
48+
logger *logrus.Logger
49+
wrks int
50+
speedFactor float64
4951
}
5052

5153
type results struct {
@@ -73,13 +75,17 @@ func main() {
7375
flag.StringVar(&opt.loglvl, "l", "info", "Logging level")
7476
flag.StringVar(&opt.pprof, "pprof", "", "pprof server address")
7577
flag.IntVar(&opt.workers, "w", 100, "Number of maximum simultaneous connections to database")
78+
flag.Float64Var(&opt.factor, "x", 1, "Speed factor")
7679
flag.BoolVar(&opt.usePass, "p", false, "Use a password to connect to database")
7780
flag.BoolVar(&opt.noDryRun, "no-dry-run", false, "Replay the requests on the database for real")
7881
flag.Parse()
7982

80-
if err := opt.parse(); err != nil {
83+
if errs := opt.parse(); len(errs) > 0 {
8184
flag.Usage()
82-
logrus.Fatalf("cannot parse options: %s", err)
85+
for _, e := range errs {
86+
logrus.Warn(e)
87+
}
88+
logrus.Fatal("cannot parse options")
8389
}
8490

8591
db, err := opt.createDB()
@@ -117,6 +123,10 @@ func main() {
117123
db.logger.Warn("replaying with dry run")
118124
}
119125

126+
db.logger.Infof("replay started on %s", time.Now().Format("Mon Jan 2 15:04:05"))
127+
db.logger.Infof("estimated time of end: %s", time.Now().
128+
Add(realExec/time.Duration(db.speedFactor)).Format("Mon Jan 2 15:04:05"))
129+
120130
r, err := db.replay(f)
121131
if err != nil {
122132
db.logger.Fatalf("cannot replay %s: %s", opt.kind, err)
@@ -130,32 +140,33 @@ func main() {
130140

131141
// parse ensures that no options has been omitted. It also asks for a password
132142
// if it is required
133-
func (o *options) parse() error {
143+
func (o *options) parse() []error {
144+
var errs []error
134145
if o.user == "" {
135-
return errors.New("no user provided")
146+
errs = append(errs, errors.New("no user provided"))
136147
} else if o.host == "" {
137-
return errors.New("no host provided")
148+
errs = append(errs, errors.New("no host provided"))
138149
} else if o.file == "" {
139-
return errors.New("no slow query log file provided")
150+
errs = append(errs, errors.New("no slow query log file provided"))
140151
} else if o.kind == "" {
141-
return errors.New("no database kind provided")
152+
errs = append(errs, errors.New("no database kind provided"))
142153
} else if o.database == "" {
143-
return errors.New("no database provided")
154+
errs = append(errs, errors.New("no database provided"))
144155
} else if o.workers <= 0 {
145-
return errors.New("cannot create negative number or zero workers")
156+
errs = append(errs, errors.New("cannot create negative number or zero workers"))
146157
}
147158

148159
if o.usePass {
149160
fmt.Printf("Password: ")
150161
bytes, err := term.ReadPassword(syscall.Stdin)
151162
if err != nil {
152-
return err
163+
errs = append(errs, err)
153164
}
154165
fmt.Println()
155166

156167
o.pass = string(bytes)
157168
}
158-
return nil
169+
return errs
159170
}
160171

161172
// createDB creates a database object according to what has been specified in
@@ -211,6 +222,9 @@ func (o options) createDB() (*database, error) {
211222
db.wrks = o.workers
212223
db.logger.Debugf("workers number set to %d", db.wrks)
213224

225+
db.speedFactor = o.factor
226+
db.logger.Debugf("speed factor: %f", db.speedFactor)
227+
214228
maxOpen := db.wrks * 2
215229
maxIdle := db.wrks / 4
216230

@@ -231,22 +245,23 @@ func (db *database) replay(f io.Reader) (results, error) {
231245

232246
jobs := make(chan job, 65535)
233247
errors := make(chan error, 16384)
248+
queries := make(chan int, 65535)
234249
var wg sync.WaitGroup
235250

236251
db.logger.Debug("starting workers pool")
237252
var workersCounter int
238253
for i := 0; i < db.wrks; i++ {
239254
wg.Add(1)
240255
workersCounter++
241-
go db.worker(jobs, errors, db.noDryRun, &wg)
256+
go db.worker(jobs, errors, queries, db.noDryRun, &wg)
242257
}
243258
db.logger.Debugf("created %d workers successfully", workersCounter)
244259
db.logger.Debug("starting errors collector")
245260
go r.errorsCollector(errors)
246261

247-
db.logger.Infof("replay started on %s", time.Now().Format("Mon Jan 2 15:04:05"))
248262
s := newSpinner(34)
249263
s.Start()
264+
go updateSpinner(s, queries)
250265

251266
firstPass := true
252267

@@ -256,13 +271,11 @@ func (db *database) replay(f io.Reader) (results, error) {
256271
for {
257272
q := p.GetNext()
258273
if q == (query.Query{}) {
259-
s.Stop()
260274
break
261275
}
262276
db.logger.Tracef("query: %s", q.Query)
263277

264278
r.queries++
265-
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)
266279

267280
// We need a reference time
268281
if firstPass {
@@ -272,25 +285,30 @@ func (db *database) replay(f io.Reader) (results, error) {
272285

273286
var j job
274287
delta := q.Time.Sub(reference)
275-
j.idle = start.Add(delta)
288+
diviser := time.Duration(db.speedFactor)
289+
if diviser >= 1.0 {
290+
j.idle = start.Add(delta / diviser)
291+
} else if diviser > 0.0 {
292+
j.idle = start.Add(delta * (1 / diviser))
293+
} else {
294+
j.idle = start.Add(delta)
295+
}
276296
j.query = q.Query
277297
db.logger.Tracef("next sleeping time: %s", j.idle)
278-
// time.Sleep(sleeping)
298+
279299
jobs <- j
280-
// For MariaDB, when there is multiple queries in a short amount of
281-
// time, the Time field is not repeated, so we do not have to update
282-
// the previous date.
283-
// if now != (time.Time{}) {
284-
// previousDate = now
285-
// }
286300
}
287301
close(jobs)
288-
db.logger.Debug("closed queries channel")
302+
db.logger.Debug("closed jobs channel")
289303

290304
wg.Wait()
291305
close(errors)
292306
db.logger.Debug("closed errors channel")
293307

308+
s.Stop()
309+
close(queries)
310+
db.logger.Debug("spinned stopped and update channel closed")
311+
294312
r.duration = time.Since(start)
295313
db.logger.Infof("replay ended on %s", time.Now().Format("Mon Jan 2 15:04:05"))
296314
return r, nil
@@ -333,6 +351,7 @@ Statistics
333351
├─ Queries: %d
334352
├─ Errors: %d
335353
├─ Queries success rate: %s
354+
├─ Speed factor: %.4f
336355
├─ Duration difference: %s
337356
└─ Replayer speed: %s
338357
@@ -351,6 +370,7 @@ Statistics
351370
Bold(r.queries),
352371
Bold(r.errors),
353372
Bold(prcSuccess),
373+
Bold(o.factor),
354374
Bold(durationDelta),
355375
Bold(prcSpeedStr),
356376
// footnote
@@ -362,7 +382,19 @@ func newSpinner(t int) *spinner.Spinner {
362382
return spinner.New(spinner.CharSets[t], 100*time.Millisecond)
363383
}
364384

365-
func (db database) worker(jobs chan job, errors chan error, noDryRun bool, wg *sync.WaitGroup) {
385+
func updateSpinner(s *spinner.Spinner, newQueries chan int) {
386+
var queries int
387+
for {
388+
_, ok := <-newQueries
389+
if !ok {
390+
return
391+
}
392+
queries++
393+
s.Suffix = " queries replayed: " + strconv.Itoa(queries)
394+
}
395+
}
396+
397+
func (db database) worker(jobs chan job, errors chan error, queries chan int, noDryRun bool, wg *sync.WaitGroup) {
366398
defer wg.Done()
367399
for {
368400
j, ok := <-jobs
@@ -378,12 +410,13 @@ func (db database) worker(jobs chan job, errors chan error, noDryRun bool, wg *s
378410
rows, err := db.drv.Query(j.query)
379411
if err != nil {
380412
errors <- err
381-
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", j.query, err)
413+
db.logger.Tracef("failed to execute query:\n%s\nerror: %s", j.query, err)
382414
}
383415
if rows != nil {
384416
rows.Close()
385417
}
386418
}
419+
queries <- 42
387420
}
388421
}
389422

0 commit comments

Comments
 (0)