## Introduction

Redis introduced IO threads in 6.0, allowing them to handle client request reading, command parsing, and reply writing, thereby improving performance. The current IO thread implementation has a few drawbacks:

- The main thread is blocked during IO thread read/write operations and must wait for all IO threads to complete their current tasks before it can continue execution. In other words, the entire process is synchronous, which prevents the efficient utilization of multi-core CPUs for parallel processing.
- Even a moderate increase in the number of clients and requests drives all IO threads to full CPU utilization, because of the busy-wait mechanism the IO threads use. This makes it challenging to determine which part of Redis has reached its bottleneck.
- When IO threads are enabled together with TLS and io-threads-do-reads, the disconnection of a connection with pending data may result in it being assigned to multiple IO threads simultaneously. This can cause race conditions and trigger assertion failures. Related issue: https://github.com/redis/redis/issues/12540

Therefore, we designed an asynchronous IO threads solution. The IO threads adopt an event-driven model: the main thread is dedicated to command processing, while the IO threads handle client read and write operations in parallel.

## Implementation

### Overall

As before, we did not change the fact that all client commands must be executed on the main thread: Redis was originally designed to be single-threaded, and processing commands in a multi-threaded manner would inevitably introduce numerous race and synchronization issues. But now each IO thread has an independent event loop, so IO threads can use a multiplexing approach to handle client reads and writes, eliminating the CPU overhead caused by busy-waiting.

The execution process can be briefly described as follows:

1. The main thread assigns clients to IO threads after accepting connections.
2. IO threads notify the main thread when clients finish reading and parsing queries.
3. The main thread processes the queries from the IO threads and generates replies.
4. IO threads write the replies to clients after receiving the clients list from the main thread, and then continue to handle client read and write events.

### Each IO thread has an independent event loop

We now assign each IO thread its own event loop. This eliminates the need for the main thread to perform the costly `epoll_wait` operation for handling connections (except for specific ones); instead, the main thread processes requests from the IO threads and hands them back once completed, fully offloading read and write events to the IO threads. Additionally, all TLS operations, including handling pending data, have been moved entirely to the IO threads. This resolves the issue where io-threads-do-reads could not be used with TLS.

### Event-notified client queue

To facilitate communication between the IO threads and the main thread, we designed an event-notified client queue. Each IO thread and the main thread have two such queues to store clients waiting to be processed, and the queues are integrated with the event loop to enable handling. We use a pthread mutex to ensure the safety of queue operations, as well as data visibility and ordering. Race conditions are minimized, since each IO thread and the main thread operate on independent queues, which avoids threads being suspended due to lock contention. On top of the queues, we implemented an event notifier based on `eventfd` or `pipe` to support event-driven handling.
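To illustrate the mechanism, here is a minimal sketch of such a queue, assuming Linux `eventfd` for the notifier. All names here (`clientQueue`, `queuePush`, `queueDrain`) are illustrative, not the actual Redis types:

```
#include <pthread.h>
#include <stdint.h>
#include <sys/eventfd.h>
#include <unistd.h>

struct client; /* opaque here; the real client struct lives in Redis */

/* A mutex-protected list of clients plus an eventfd that is registered
 * as a readable event in the consumer's event loop. */
typedef struct clientQueue {
    pthread_mutex_t lock;      /* guards items/len and publishes writes */
    struct client *items[128]; /* fixed size only to keep the sketch short */
    size_t len;
    int notifier;              /* created with eventfd(0, EFD_NONBLOCK) */
} clientQueue;

/* Producer side (e.g. the main thread handing a client to an IO thread):
 * push under the lock, then wake the consumer's event loop. */
void queuePush(clientQueue *q, struct client *c) {
    pthread_mutex_lock(&q->lock);
    q->items[q->len++] = c;    /* bounds checking omitted for brevity */
    pthread_mutex_unlock(&q->lock);
    uint64_t one = 1;
    write(q->notifier, &one, sizeof(one)); /* makes the eventfd readable */
}

/* Consumer side, run when its event loop reports the eventfd readable:
 * reset the notification, then grab the whole batch under a short lock. */
size_t queueDrain(clientQueue *q, struct client **out) {
    uint64_t counter;
    read(q->notifier, &counter, sizeof(counter));
    pthread_mutex_lock(&q->lock);
    size_t n = q->len;
    for (size_t i = 0; i < n; i++) out[i] = q->items[i];
    q->len = 0;
    pthread_mutex_unlock(&q->lock);
    return n; /* caller's buffer must have room for at least 128 entries */
}
```

Because each direction has its own queue, each lock is held only for the push or drain itself, so contention between the main thread and an IO thread stays minimal.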
### Thread safety

Since the main thread and IO threads can execute in parallel, we must handle data races carefully.

**client->flags**

The primary tasks of IO threads are reading and writing, i.e. `readQueryFromClient` and `writeToClient`. However, IO threads and the main thread may concurrently modify or access `client->flags`, leading to potential race conditions. To address this, we introduced an io-flags variable to record operations performed by the IO threads, thereby avoiding races on `client->flags`.

**Pause IO thread**

In the main thread, we may want to operate on data owned by an IO thread: uninstall an event handler, access or modify the query/output buffer, or resize the event loop. We need a clean and safe context to do that, so we pause the IO thread in `IOThreadBeforeSleep`, do the job, and then resume it. To avoid the pausing thread being suspended, we use busy waiting to confirm the target status, and we use atomic variables to ensure memory visibility and ordering. We introduce the following functions to pause/resume IO threads:

```
pauseIOThread, resumeIOThread
pauseAllIOThreads, resumeAllIOThreads
pauseIOThreadsRange, resumeIOThreadsRange
```

Testing has shown that `pauseIOThread` is highly efficient, allowing the main thread to execute nearly 200,000 operations per second during stress tests. Similarly, `pauseAllIOThreads` with 8 IO threads can handle up to nearly 56,000 operations per second. But operations performed between pausing and resuming IO threads must be quick; otherwise, they could drive the IO threads to full CPU utilization.
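The pause/resume handshake can be pictured roughly as below. This is a simplified sketch under assumed names (`io_status`, the `*Sketch` functions); the actual state machine in Redis may differ:

```
#include <stdatomic.h>
#include <sched.h>

enum { IO_RUNNING, IO_PAUSING, IO_PAUSED }; /* hypothetical states */

#define MAX_IO_THREADS 16
static _Atomic int io_status[MAX_IO_THREADS];

/* Main thread: request a pause and busy-wait for the acknowledgment.
 * The wait is short because the IO thread checks the flag on every
 * event-loop iteration. */
void pauseIOThreadSketch(int tid) {
    atomic_store(&io_status[tid], IO_PAUSING);
    while (atomic_load(&io_status[tid]) != IO_PAUSED)
        sched_yield();
}

void resumeIOThreadSketch(int tid) {
    atomic_store(&io_status[tid], IO_RUNNING);
}

/* IO thread: called from its before-sleep hook (cf. IOThreadBeforeSleep).
 * Acknowledge the pause, then spin until resumed; while we sit here the
 * main thread can safely touch our clients and event loop. */
void ioThreadCheckPauseSketch(int tid) {
    if (atomic_load(&io_status[tid]) != IO_PAUSING) return;
    atomic_store(&io_status[tid], IO_PAUSED);
    while (atomic_load(&io_status[tid]) == IO_PAUSED)
        sched_yield();
}
```

The sequentially consistent atomics provide the visibility and ordering guarantees mentioned above, and the busy wait avoids the suspend/wake latency of blocking primitives.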
**freeClient and freeClientAsync**

The main thread may need to terminate a client currently running on an IO thread, for example due to ACL rule changes, reaching the output buffer limit, or client eviction. In such cases, we pause the IO thread so we can operate on the client safely.

**maxclients and maxmemory-clients updates**

When adjusting `maxclients`, we need to resize the event loop of every IO thread. Similarly, when modifying `maxmemory-clients`, we need to traverse all clients to calculate their memory usage. To do this safely, we pause all IO threads during these adjustments.

**Client info reading**

The main thread may need to read a client's fields to generate a descriptive string, such as for the `CLIENT LIST` command or for logging purposes. In such cases, we pause the IO thread handling that client. If information for all clients needs to be displayed, all IO threads must be paused.

**Tracking redirect**

Redis supports the tracking feature and can even send invalidation messages to a connection with a specified ID. But the target client may be running on an IO thread; directly manipulating the client's output buffer is not thread-safe, and the IO thread may not be aware that the client requires a response. In such cases, we pause the IO thread handling the client, modify the output buffer, and install a write event handler to ensure proper handling.

**clientsCron**

In the `clientsCron` function, the main thread needs to traverse all clients to perform operations such as timeout checks, verifying whether they have reached the soft output buffer limit, resizing the output/query buffer, or updating memory usage. To safely operate on a client, the IO thread handling it must be paused. Pausing the IO thread for each client individually would be very inefficient; conversely, pausing all IO threads simultaneously would be costly, especially with many IO threads, since `clientsCron` is invoked relatively frequently. To address this, we adopted a batched approach: at most 8 IO threads are paused at a time, and the operations above are performed only on clients running in the paused IO threads, significantly reducing overhead while maintaining safety.

### Observability

In the current design, the main thread always assigns clients to the IO thread with the fewest clients. To clearly observe the number of clients handled by each IO thread, we added a new section to the INFO output. The `INFO THREADS` section shows the client count for each IO thread:

```
# Threads
io_thread_0:clients=0
io_thread_1:clients=2
io_thread_2:clients=2
```

Additionally, the `CLIENT LIST` output now includes a field indicating the thread to which each client is assigned:

`id=244 addr=127.0.0.1:41870 laddr=127.0.0.1:6379 ... resp=2 lib-name= lib-ver= io-thread=1`

## Trade-off

### Special Clients

For certain special types of clients, keeping them running on IO threads would result in severe race issues that are difficult to resolve, so we chose not to offload them to the IO threads.

For replica, monitor, subscribe, and tracking clients, the main thread may directly write a reply to them when conditions are met. These races are difficult to resolve, so such clients are processed in the main thread. This includes Lua debug clients as well, since we may operate on the connection directly.

For blocking clients, after an IO thread reads and parses a command and hands it over to the main thread, if the client is identified as a blocking type, it remains in the main thread. Once the blocking operation completes and the reply is generated, the client is transferred back to the IO thread to send the reply and wait for event triggers.

### Clients Eviction

To support client eviction, each client's memory usage must be updated promptly during operations such as reads, writes, or command execution. However, when a client runs on an IO thread, its memory usage cannot be updated immediately due to the risk of data races, so it is updated only in the main thread while processing commands, or periodically in `clientsCron`. The downside is that an update might be delayed by up to one second, which reduces the precision of memory management for eviction.

To avoid incorrectly evicting clients, we adopted a best-effort compensation solution: when we decide to evict a client, we update its memory usage once more before doing so. If the memory used by the client has not decreased, or its memory usage bucket has not changed, we evict it; otherwise, we do not. However, this does not completely solve the problem: due to the delay in memory usage updates, we may still make incorrect decisions about which clients to evict.
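In rough C, the compensation check could look like the sketch below; all names (`clientSketch`, `computeMemUsage`, `bucketForUsage`) are illustrative stand-ins, not the real Redis accounting code:

```
#include <stddef.h>

/* Illustrative client fields only. */
typedef struct clientSketch {
    size_t last_mem_usage;  /* last recorded usage, possibly ~1s stale */
    int mem_usage_bucket;   /* eviction bucket derived from that usage */
    size_t querybuf_len, replybuf_len; /* stand-ins for the real buffers */
} clientSketch;

static size_t computeMemUsage(clientSketch *c) {
    return c->querybuf_len + c->replybuf_len; /* toy accounting */
}

static int bucketForUsage(size_t usage) { /* toy log2-style bucketing */
    int b = 0;
    while (usage >>= 1) b++;
    return b;
}

/* Refresh the usage right before eviction; evict only if the usage did
 * not decrease or the bucket did not change, otherwise record the fresh
 * value and skip this client. */
int shouldEvictClient(clientSketch *c) {
    size_t fresh = computeMemUsage(c);
    if (fresh >= c->last_mem_usage ||
        bucketForUsage(fresh) == c->mem_usage_bucket)
        return 1;
    c->last_mem_usage = fresh;
    c->mem_usage_bucket = bucketForUsage(fresh);
    return 0;
}
```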
### Defragment

In the majority of cases we do NOT use the data from argv directly in the db:

1. Key names: we store a copy allocated in the main thread, see `sdsdup()` in `dbAdd()`.
2. Hash keys and values: we store the key as an hfield and the value as an sds, see `hfieldNew()` and `sdsdup()` in `hashTypeSet()`.
3. Other datatypes: they don't even use SDS, so there are no reference issues.

But in some cases the data from argv may be retained by the main thread. As a result, during fragmentation cleanup, we need to move allocations from the IO thread's arena to the main thread's arena. We always allocate new memory in the main thread's arena, but the memory released by IO threads may not yet have been reclaimed. This ultimately causes the fragmentation rate to be higher compared to creating and allocating entirely within a single thread. The following cases lead to memory allocated by an IO thread being kept by the main thread:

1. String-related commands: `append`, `getset`, `mset` and `set`. If `tryObjectEncoding()` does not change argv, we keep it directly in the main thread, see the code in `tryObjectEncoding()` (specifically `trimStringObjectIfNeeded()`).
2. Blocking-related commands: the key names will be kept in `c->db->blocking_keys`.
3. The watch command: the key names will be kept in `c->db->watched_keys`.
4. The [s]subscribe commands: the channel names will be kept in `serverPubSubChannels`.
5. The script load command: the script will be kept in `server.lua_scripts`.
6. Some module APIs: `RM_RetainString`, `RM_HoldString`.

These issues will be handled in other PRs.

## Testing

### Functional Testing

The commit with IO threads enabled has passed all TCL tests, with the following changes:

**Client query buffer**: in the original code, when using a reusable query buffer, ownership of the query buffer would be released after the command was processed. With IO threads enabled, however, the client transitions from an IO thread to the main thread for processing, which causes the ownership release to occur earlier than the command execution. As a result, when IO threads are enabled, a client's information will never indicate that a shared query buffer is in use, so we skip the corresponding query buffer tests in this case.

**Defragment**: we added a new defragmentation test to verify the effect of IO threads on defragmentation.

**Command delay**: for deferred clients in the TCL tests, delays may occur because clients are assigned to different threads for execution. To address this, we introduced conditional waiting: a test proceeds to the next step only when the `client list` output contains the corresponding commands.

### Sanitizer Testing

The commit passed all TCL tests and reported no errors when compiled with `-fsanitize=thread` and `-fsanitize=address` enabled, with one modification: we suppressed the sanitizer warnings for clients with watched keys when updating `client->flags`. We believe there is no actual data race there: IO threads read `client->flags` but never modify it or read the `CLIENT_DIRTY_CAS` bit, and the main thread is the only one that modifies that bit.

## Others

### IO thread number

In the new multi-threaded design, the main thread is primarily focused on command processing to improve performance. Typically, the main thread does not handle regular client I/O operations but is responsible for special clients such as replication and tracking clients. To avoid breaking changes, we still count the main thread as the first IO thread.

When the io-threads configuration is set to a low value (e.g., 2), performance does not show a significant improvement over a single-threaded setup for simple commands (such as SET or GET), as the main thread does not consume much CPU for these simple operations; this leaves multi-core capacity underutilized. However, for more complex commands, even a low number of IO threads may still be beneficial. Therefore, it's important to tune `io-threads` based on your own performance tests.
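The knob itself is the existing `io-threads` directive in redis.conf; the value below is only an illustrative starting point to benchmark against, not a recommendation from this change:

```
# redis.conf
# Number of IO threads; the main thread counts as the first one.
io-threads 4
```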
Additionally, you can clearly monitor the CPU utilization of the main thread and the IO threads using `top -H -p $redis_pid`, which makes it easy to identify where the bottleneck is. If an IO thread is the bottleneck, increasing `io-threads` will improve performance. If the main thread is the bottleneck, overall performance can only be scaled by increasing the number of shards or replicas.

---------

Co-authored-by: debing.sun <debing.sun@redis.com>
Co-authored-by: oranagra <oran@redislabs.com>
Tcl · 309 lines · 11 KiB
set testmodule [file normalize tests/modules/blockedclient.so]

start_server {tags {"modules"}} {
    r module load $testmodule

    test {Locked GIL acquisition} {
        assert_match "OK" [r acquire_gil]
    }
    test {Locked GIL acquisition during multi} {
        r multi
        r acquire_gil
        assert_equal {{Blocked client is not supported inside multi}} [r exec]
    }

    test {Locked GIL acquisition from RM_Call} {
        assert_equal {Blocked client is not allowed} [r do_rm_call acquire_gil]
    }
    test {Blocking commands do not block the client on RM_Call} {
        r lpush l test
        assert_equal [r do_rm_call blpop l 0] {l test}

        r lpush l test
        assert_equal [r do_rm_call brpop l 0] {l test}

        r lpush l1 test
        assert_equal [r do_rm_call brpoplpush l1 l2 0] {test}
        assert_equal [r do_rm_call brpop l2 0] {l2 test}

        r lpush l1 test
        assert_equal [r do_rm_call blmove l1 l2 LEFT LEFT 0] {test}
        assert_equal [r do_rm_call brpop l2 0] {l2 test}

        r ZADD zset1 0 a 1 b 2 c
        assert_equal [r do_rm_call bzpopmin zset1 0] {zset1 a 0}
        assert_equal [r do_rm_call bzpopmax zset1 0] {zset1 c 2}

        r xgroup create s g $ MKSTREAM
        r xadd s * foo bar
        assert {[r do_rm_call xread BLOCK 0 STREAMS s 0-0] ne {}}
        assert {[r do_rm_call xreadgroup group g c BLOCK 0 STREAMS s >] ne {}}

        assert {[r do_rm_call blpop empty_list 0] eq {}}
        assert {[r do_rm_call brpop empty_list 0] eq {}}
        assert {[r do_rm_call brpoplpush empty_list1 empty_list2 0] eq {}}
        assert {[r do_rm_call blmove empty_list1 empty_list2 LEFT LEFT 0] eq {}}

        assert {[r do_rm_call bzpopmin empty_zset 0] eq {}}
        assert {[r do_rm_call bzpopmax empty_zset 0] eq {}}

        r xgroup create empty_stream g $ MKSTREAM
        assert {[r do_rm_call xread BLOCK 0 STREAMS empty_stream $] eq {}}
        assert {[r do_rm_call xreadgroup group g c BLOCK 0 STREAMS empty_stream >] eq {}}
    }
    test {Monitor disallow inside RM_Call} {
        set e {}
        catch {
            r do_rm_call monitor
        } e
        set e
    } {*ERR*DENY BLOCKING*}

    test {subscribe disallow inside RM_Call} {
        set e {}
        catch {
            r do_rm_call subscribe x
        } e
        set e
    } {*ERR*DENY BLOCKING*}
    test {RM_Call from blocked client} {
        r hset hash foo bar
        r do_bg_rm_call hgetall hash
    } {foo bar}

    test {RM_Call from blocked client with script mode} {
        r do_bg_rm_call_format S hset k foo bar
    } {1}

    test {RM_Call from blocked client with oom mode} {
        r config set maxmemory 1
        # will set server.pre_command_oom_state to 1
        assert_error {OOM command not allowed*} {r hset hash foo bar}
        r config set maxmemory 0
        # now it should be OK to call OOM commands
        r do_bg_rm_call_format M hset k1 foo bar
    } {1} {needs:config-maxmemory}
    test {RESP version carries through to blocked client} {
        for {set client_proto 2} {$client_proto <= 3} {incr client_proto} {
            if {[lsearch $::denytags "resp3"] >= 0} {
                if {$client_proto == 3} {continue}
            } elseif {$::force_resp3} {
                if {$client_proto == 2} {continue}
            }
            r hello $client_proto
            r readraw 1
            set ret [r do_fake_bg_true]
            if {$client_proto == 2} {
                assert_equal $ret {:1}
            } else {
                assert_equal $ret "#t"
            }
            r readraw 0
            r hello 2
        }
    }
    foreach call_type {nested normal} {
        test "Busy module command - $call_type" {
            set busy_time_limit 50
            set old_time_limit [lindex [r config get busy-reply-threshold] 1]
            r config set busy-reply-threshold $busy_time_limit
            set rd [redis_deferring_client]

            # run command that blocks until released
            set start [clock clicks -milliseconds]
            if {$call_type == "nested"} {
                $rd do_rm_call slow_fg_command 0
            } else {
                $rd slow_fg_command 0
            }
            $rd flush

            # send another command after the blocked one, to make sure we don't attempt to process it
            $rd ping
            $rd flush

            # make sure we get BUSY error, and that we didn't get it too early
            wait_for_condition 50 100 {
                ([catch {r ping} reply] == 1) &&
                ([string match {*BUSY Slow module operation*} $reply])
            } else {
                fail "Failed waiting for busy slow response"
            }
            assert_morethan_equal [expr [clock clicks -milliseconds]-$start] $busy_time_limit

            # abort the blocking operation
            r stop_slow_fg_command
            wait_for_condition 50 100 {
                [catch {r ping} e] == 0
            } else {
                fail "Failed waiting for busy command to end"
            }
            assert_equal [$rd read] "1"
            assert_equal [$rd read] "PONG"

            # run command that blocks for 200ms
            set start [clock clicks -milliseconds]
            if {$call_type == "nested"} {
                $rd do_rm_call slow_fg_command 200000
            } else {
                $rd slow_fg_command 200000
            }
            $rd flush
            after 10 ;# try to make sure redis started running the command before we proceed

            # make sure we didn't get BUSY error, it simply blocked till the command was done
            r ping
            assert_morethan_equal [expr [clock clicks -milliseconds]-$start] 200
            $rd read

            $rd close
            r config set busy-reply-threshold $old_time_limit
        }
    }
    test {RM_Call from blocked client} {
        set busy_time_limit 50
        set old_time_limit [lindex [r config get busy-reply-threshold] 1]
        r config set busy-reply-threshold $busy_time_limit

        # trigger slow operation
        r set_slow_bg_operation 1
        r hset hash foo bar
        set rd [redis_deferring_client]
        set start [clock clicks -milliseconds]
        $rd do_bg_rm_call hgetall hash

        # send another command after the blocked one, to make sure we don't attempt to process it
        $rd ping
        $rd flush

        # wait till we know we're blocked inside the module
        wait_for_condition 50 100 {
            [r is_in_slow_bg_operation] eq 1
        } else {
            fail "Failed waiting for slow operation to start"
        }

        # make sure we get BUSY error, and that we didn't get here too early
        assert_error {*BUSY Slow module operation*} {r ping}
        assert_morethan_equal [expr [clock clicks -milliseconds]-$start] $busy_time_limit
        # abort the blocking operation
        r set_slow_bg_operation 0

        wait_for_condition 50 100 {
            [r is_in_slow_bg_operation] eq 0
        } else {
            fail "Failed waiting for slow operation to stop"
        }
        assert_equal [r ping] {PONG}

        r config set busy-reply-threshold $old_time_limit
        assert_equal [$rd read] {foo bar}
        assert_equal [$rd read] {PONG}
        $rd close
    }
    test {blocked client reaches client output buffer limit} {
        r hset hash big [string repeat x 50000]
        r hset hash bada [string repeat x 50000]
        r hset hash boom [string repeat x 50000]
        r config set client-output-buffer-limit {normal 100000 0 0}
        r client setname myclient
        catch {r do_bg_rm_call hgetall hash} e
        assert_match "*I/O error*" $e
        reconnect
        set clients [r client list]
        assert_no_match "*name=myclient*" $clients
    }
    test {module client error stats} {
        r config resetstat

        # simple module command that replies with string error
        assert_error "ERR unknown command 'hgetalllll', with args beginning with:" {r do_rm_call hgetalllll}
        assert_equal [errorrstat ERR r] {count=1}

        # simple module command that replies with string error
        assert_error "ERR unknown subcommand 'bla'. Try CONFIG HELP." {r do_rm_call config bla}
        assert_equal [errorrstat ERR r] {count=2}

        # module command that replies with string error from bg thread
        assert_error "NULL reply returned" {r do_bg_rm_call hgetalllll}
        assert_equal [errorrstat NULL r] {count=1}

        # module command that returns an arity error
        r do_rm_call set x x
        assert_error "ERR wrong number of arguments for 'do_rm_call' command" {r do_rm_call}
        assert_equal [errorrstat ERR r] {count=3}

        # RM_Call that propagates an error
        assert_error "WRONGTYPE*" {r do_rm_call hgetall x}
        assert_equal [errorrstat WRONGTYPE r] {count=1}
        assert_match {*calls=1,*,rejected_calls=0,failed_calls=1} [cmdrstat hgetall r]

        # RM_Call from bg thread that propagates an error
        assert_error "WRONGTYPE*" {r do_bg_rm_call hgetall x}
        assert_equal [errorrstat WRONGTYPE r] {count=2}
        assert_match {*calls=2,*,rejected_calls=0,failed_calls=2} [cmdrstat hgetall r]

        assert_equal [s total_error_replies] 6
        assert_match {*calls=5,*,rejected_calls=0,failed_calls=4} [cmdrstat do_rm_call r]
        assert_match {*calls=2,*,rejected_calls=0,failed_calls=2} [cmdrstat do_bg_rm_call r]
    }
    set master [srv 0 client]
    set master_host [srv 0 host]
    set master_port [srv 0 port]

    start_server [list overrides [list loadmodule "$testmodule"]] {
        set replica [srv 0 client]
        set replica_host [srv 0 host]
        set replica_port [srv 0 port]

        # Start the replication process...
        $replica replicaof $master_host $master_port
        wait_for_sync $replica

        test {WAIT command on module blocked client} {
            pause_process [srv 0 pid]

            $master do_bg_rm_call_format ! hset bk1 foo bar

            assert_equal [$master wait 1 1000] 0
            resume_process [srv 0 pid]
            assert_equal [$master wait 1 1000] 1
            assert_equal [$replica hget bk1 foo] bar
        }
    }
    test {Unblock by timer} {
        # When the client is unblocked, we will get the OK reply.
        assert_match "OK" [r unblock_by_timer 100 0]
    }

    test {block time is shorter than timer period} {
        # This command does not get a reply.
        set rd [redis_deferring_client]
        $rd unblock_by_timer 100 10
        # Wait for the client to be unblocked.
        after 120
        $rd close
    }

    test {block time is equal to timer period} {
        # The two periods are equal, so they will be handled in the same event loop;
        # when the client is unblocked, we will get the OK reply from the timer.
        assert_match "OK" [r unblock_by_timer 100 100]
    }

    test "Unload the module - blockedclient" {
        assert_equal {OK} [r module unload blockedclient]
    }
}