-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy paththread_groups.lua
More file actions
133 lines (111 loc) · 4.14 KB
/
Copy paththread_groups.lua
File metadata and controls
133 lines (111 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
-- Copyright (C) 2020-2022 Dmitrii Maximenko <d.s.maximenko@gmail.com>
-- Use of this source code is governed by an MIT-style
-- license that can be found in the LICENSE file or at
-- https://opensource.org/licenses/MIT
-- LuaJIT FFI bindings used by the event loop below.
-- 'ffi' is intentionally left global: it is referenced by the functions in
-- this file and may be used by the benchmark script that includes it.
ffi = require("ffi")

-- Only declarations may appear in ffi.cdef; it does not compile C function
-- bodies.  The former inline definition of sb_rand_exp() was therefore
-- dropped -- the exponential interval is computed in Lua by
-- default_rate_controller().
-- The sb_* symbols are presumably exported by the sysbench binary that runs
-- this script; gettimeofday() and usleep() come from the C library.
ffi.cdef[[
void sb_event_start(int thread_id);
void sb_event_stop(int thread_id);
bool sb_more_events(int thread_id);
double sb_rand_uniform_double(void);

typedef struct timeval {
  long tv_sec;
  long tv_usec;
} timeval;

int gettimeofday(struct timeval* t, void* tzp);
int usleep(unsigned int usec);
]]

-- Single reusable buffer for gettimeofday() results, so that reading the
-- clock does not allocate a new struct on every call.
gettimeofday_struct = ffi.new("struct timeval")
-- Reads the clock through the C gettimeofday() call and returns the result
-- as a single Lua number of microseconds (tv_sec * 1000000 + tv_usec).
-- The shared 'gettimeofday_struct' buffer is overwritten on every call.
local function gettimeofday()
  local tv = gettimeofday_struct
  ffi.C.gettimeofday(tv, nil)
  local seconds = tonumber(tv.tv_sec)
  local microseconds = tonumber(tv.tv_usec)
  return seconds * 1000000 + microseconds
end
-- ----------------------------------------------------------------------
-- This event loop can work with multiple groups of threads.
-- For that purpose the global variable 'thread_groups' must
-- be defined in the benchmark script:
-- thread_groups = {{id=i, event=func1, rate_controller=func2, rate=k, thread_amount=n}, {}, ...}
-- id is any number or string.
-- func1 is a function called like event(); func2 is a function which controls the sleep period (optional).
-- n is the number of threads in the group (if the sum of all thread_amount values is less than --threads,
-- all remaining threads go to the last group).
-- rate is the target event rate of a thread; it is passed to rate_controller
-- (default_rate_controller interprets it as events per second).
-- ----------------------------------------------------------------------
-- Picks the entry of the global 'thread_groups' list that serves thread_id.
-- Groups are scanned in order while their thread_amount values are summed;
-- the first group whose running sum exceeds thread_id wins.  Thread ids that
-- are not covered by any group fall back to the last group.
-- Returns nil when 'thread_groups' is not a table or is empty.
local function resolve_thread_group(thread_id)
  if type(thread_groups) ~= "table" then
    return nil
  end

  local group_count = #thread_groups
  local covered = 0
  for i = 1, group_count do
    local group = thread_groups[i]
    covered = covered + group.thread_amount
    if thread_id < covered then
      return group
    end
  end

  return thread_groups[group_count]
end

-- Event loop of one worker thread.
--
-- While sb_more_events() reports more work, the loop runs the 'event'
-- function of the group this thread belongs to, and afterwards calls the
-- group's optional 'rate_controller' with the group's 'rate' (only when the
-- rate is greater than zero; a missing rate counts as zero).
--
-- @param thread_id  id of the calling thread; passed to the sb_* functions
--                   and to the event function
--
-- Errors raised by the event function are re-raised unless they are tables
-- whose errcode equals sysbench.error.RESTART_EVENT, in which case the event
-- is retried (after calling sysbench.hooks.before_restart_event, if set).
-- A misconfigured 'thread_groups' raises an explicit error.
function thread_run(thread_id)
  -- The thread-to-group mapping is resolved on the first event and reused;
  -- the group's fields are still read for every event.
  local group = nil
  local controller_failure_reported = false

  while ffi.C.sb_more_events(thread_id) do
    ffi.C.sb_event_start(thread_id)

    if group == nil then
      group = resolve_thread_group(thread_id)
      if group == nil then
        error("thread_groups must be a non-empty list of groups (thread " ..
              tostring(thread_id) .. ")")
      end
    end

    local event_func = group.event
    local rate_controller_func = group.rate_controller
    local rate = group.rate or 0

    if event_func == nil then
      error("thread group '" .. tostring(group.id) ..
            "' does not define an 'event' function")
    end

    local success, ret
    repeat
      success, ret = pcall(event_func, thread_id)

      if not success then
        local restartable = type(ret) == "table" and
          ret.errcode == sysbench.error.RESTART_EVENT

        if not restartable then
          error(ret, 2) -- propagate unknown errors
        end

        if sysbench.hooks.before_restart_event then
          sysbench.hooks.before_restart_event(ret)
        end
      end
    until success

    -- Stop the benchmark if event() returns a value other than nil or false.
    -- Note that sb_event_stop() is not called for this last event.
    if ret then
      break
    end

    ffi.C.sb_event_stop(thread_id)

    -- The rate controller is expected to return after pausing the thread.
    -- A failing controller must not abort the benchmark, but it is reported
    -- once per thread so that an unthrottled run does not go unnoticed.
    if rate_controller_func ~= nil and rate > 0 then
      local ok, err = pcall(rate_controller_func, rate)
      if not ok and not controller_failure_reported then
        controller_failure_reported = true
        io.stderr:write(string.format(
          "thread %s: rate_controller raised an error: %s\n",
          tostring(thread_id), tostring(err)))
      end
    end
  end
end
-- Default rate controller: paces the calling thread to about 'rate' events
-- per second.
--
-- Every call draws a random, exponentially distributed interval with a mean
-- of 1e6 / rate microseconds and adds it to the scheduled time of the next
-- event, which is kept in the global 'Tnext'.  The first call only
-- initialises the schedule.  Later calls sleep until the scheduled time if
-- it is still in the future; when the thread is already behind schedule
-- they return immediately.
--
-- @param rate  target number of events per second; nil or a non-positive
--              value disables pacing (the call returns at once)
function default_rate_controller(rate)
  if rate == nil or rate <= 0 then
    return
  end

  local now = gettimeofday()
  local mean_interval = 1e6 / rate -- microseconds

  -- Inverse-transform sampling.  Using 1 - u instead of u keeps the argument
  -- of math.log away from 0 (which would give an infinite interval) as long
  -- as the generator returns values in [0, 1) -- assumed here, confirm
  -- against sb_rand_uniform_double().
  local u = ffi.C.sb_rand_uniform_double()
  local interval = -mean_interval * math.log(1 - u)

  if Tnext == nil then
    Tnext = now + interval
    return
  end

  Tnext = Tnext + interval

  -- usleep() takes an unsigned int, and POSIX allows it to reject values of
  -- one million or more, so long pauses are split into sub-second chunks.
  local max_chunk = 999999
  local remaining = math.floor(Tnext - now)
  while remaining > 0 do
    local chunk = math.min(remaining, max_chunk)
    ffi.C.usleep(chunk)
    remaining = remaining - chunk
  end
end