mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-04-26 13:01:34 +03:00
std.http.Client: make mutexes cancelable (#31880)
Makes most of the mutexes cancelable with the exception of the ones in deinit functions. I also fixed the compilation errors with `ConnectionPool.resize`. Reviewed-on: https://codeberg.org/ziglang/zig/pulls/31880 Reviewed-by: Andrew Kelley <andrew@ziglang.org> Co-authored-by: glowsquid <sachabarsayuracko@gmail.com> Co-committed-by: glowsquid <sachabarsayuracko@gmail.com>
This commit is contained in:
+21
-24
@@ -4,8 +4,6 @@
|
||||
//!
|
||||
//! TLS support may be disabled via `std.options.http_disable_tls`.
|
||||
//!
|
||||
//! TODO all the lockUncancelable in this file should be changed to regular lock and
|
||||
//! `error.Canceled` added to more error sets.
|
||||
const Client = @This();
|
||||
|
||||
const builtin = @import("builtin");
|
||||
@@ -84,8 +82,8 @@ pub const ConnectionPool = struct {
|
||||
/// If no connection is found, null is returned.
|
||||
///
|
||||
/// Threadsafe.
|
||||
pub fn findConnection(pool: *ConnectionPool, io: Io, criteria: Criteria) ?*Connection {
|
||||
pool.mutex.lockUncancelable(io);
|
||||
pub fn findConnection(pool: *ConnectionPool, io: Io, criteria: Criteria) Io.Cancelable!?*Connection {
|
||||
try pool.mutex.lock(io);
|
||||
defer pool.mutex.unlock(io);
|
||||
|
||||
var next = pool.free.last;
|
||||
@@ -113,8 +111,8 @@ pub const ConnectionPool = struct {
|
||||
}
|
||||
|
||||
/// Acquires an existing connection from the connection pool. This function is threadsafe.
|
||||
pub fn acquire(pool: *ConnectionPool, io: Io, connection: *Connection) void {
|
||||
pool.mutex.lockUncancelable(io);
|
||||
pub fn acquire(pool: *ConnectionPool, io: Io, connection: *Connection) Io.Cancelable!void {
|
||||
try pool.mutex.lock(io);
|
||||
defer pool.mutex.unlock(io);
|
||||
|
||||
return pool.acquireUnsafe(connection);
|
||||
@@ -150,8 +148,8 @@ pub const ConnectionPool = struct {
|
||||
}
|
||||
|
||||
/// Adds a newly created node to the pool of used connections. This function is threadsafe.
|
||||
pub fn addUsed(pool: *ConnectionPool, io: Io, connection: *Connection) void {
|
||||
pool.mutex.lockUncancelable(io);
|
||||
pub fn addUsed(pool: *ConnectionPool, io: Io, connection: *Connection) Io.Cancelable!void {
|
||||
try pool.mutex.lock(io);
|
||||
defer pool.mutex.unlock(io);
|
||||
|
||||
pool.used.append(&connection.pool_node);
|
||||
@@ -162,18 +160,15 @@ pub const ConnectionPool = struct {
|
||||
/// If the new size is smaller than the current size, then idle connections will be closed until the pool is the new size.
|
||||
///
|
||||
/// Threadsafe.
|
||||
pub fn resize(pool: *ConnectionPool, io: Io, allocator: Allocator, new_size: usize) void {
|
||||
pool.mutex.lockUncancelable(io);
|
||||
pub fn resize(pool: *ConnectionPool, io: Io, new_size: usize) Io.Cancelable!void {
|
||||
try pool.mutex.lock(io);
|
||||
defer pool.mutex.unlock(io);
|
||||
|
||||
const next = pool.free.first;
|
||||
_ = next;
|
||||
while (pool.free_len > new_size) {
|
||||
const popped = pool.free.popFirst() orelse unreachable;
|
||||
const popped: *Connection = @alignCast(@fieldParentPtr("pool_node", pool.free.popFirst().?));
|
||||
pool.free_len -= 1;
|
||||
|
||||
popped.data.close(allocator);
|
||||
allocator.destroy(popped);
|
||||
popped.destroy(io);
|
||||
}
|
||||
|
||||
pool.free_size = new_size;
|
||||
@@ -1323,7 +1318,7 @@ pub fn initDefaultProxies(client: *Client, arena: Allocator, environ_map: *const
|
||||
const io = client.io;
|
||||
|
||||
// Prevent any new connections from being created.
|
||||
client.connection_pool.mutex.lockUncancelable(io);
|
||||
try client.connection_pool.mutex.lock(io);
|
||||
defer client.connection_pool.mutex.unlock(io);
|
||||
|
||||
assert(client.connection_pool.used.first == null); // There are active requests.
|
||||
@@ -1418,7 +1413,7 @@ pub const basic_authorization = struct {
|
||||
|
||||
pub const ConnectTcpError = error{
|
||||
TlsInitializationFailed,
|
||||
} || Allocator.Error || HostName.ConnectError;
|
||||
} || Allocator.Error || HostName.ConnectError || Io.Cancelable;
|
||||
|
||||
/// Reuses a `Connection` if one matching `host` and `port` is already open.
|
||||
///
|
||||
@@ -1451,7 +1446,7 @@ pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcp
|
||||
const proxied_host = options.proxied_host orelse host;
|
||||
const proxied_port = options.proxied_port orelse port;
|
||||
|
||||
if (client.connection_pool.findConnection(io, .{
|
||||
if (try client.connection_pool.findConnection(io, .{
|
||||
.host = proxied_host,
|
||||
.port = proxied_port,
|
||||
.protocol = protocol,
|
||||
@@ -1469,18 +1464,20 @@ pub fn connectTcpOptions(client: *Client, options: ConnectTcpOptions) ConnectTcp
|
||||
error.Canceled => |e| return e,
|
||||
else => return error.TlsInitializationFailed,
|
||||
};
|
||||
client.connection_pool.addUsed(io, &tc.connection);
|
||||
errdefer tc.destroy();
|
||||
try client.connection_pool.addUsed(io, &tc.connection);
|
||||
return &tc.connection;
|
||||
},
|
||||
.plain => {
|
||||
const pc = try Connection.Plain.create(client, proxied_host, proxied_port, stream);
|
||||
client.connection_pool.addUsed(io, &pc.connection);
|
||||
errdefer pc.destroy();
|
||||
try client.connection_pool.addUsed(io, &pc.connection);
|
||||
return &pc.connection;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub const ConnectUnixError = Allocator.Error || std.posix.SocketError || error{NameTooLong} || std.posix.ConnectError;
|
||||
pub const ConnectUnixError = Allocator.Error || std.posix.SocketError || error{NameTooLong} || std.posix.ConnectError || Io.Cancelable;
|
||||
|
||||
/// Connect to `path` as a unix domain socket. This will reuse a connection if one is already open.
|
||||
///
|
||||
@@ -1488,7 +1485,7 @@ pub const ConnectUnixError = Allocator.Error || std.posix.SocketError || error{N
|
||||
pub fn connectUnix(client: *Client, path: []const u8) ConnectUnixError!*Connection {
|
||||
const io = client.io;
|
||||
|
||||
if (client.connection_pool.findConnection(io, .{
|
||||
if (try client.connection_pool.findConnection(io, .{
|
||||
.host = path,
|
||||
.port = 0,
|
||||
.protocol = .plain,
|
||||
@@ -1512,7 +1509,7 @@ pub fn connectUnix(client: *Client, path: []const u8) ConnectUnixError!*Connecti
|
||||
};
|
||||
errdefer client.allocator.free(conn.data.host);
|
||||
|
||||
client.connection_pool.addUsed(conn);
|
||||
try client.connection_pool.addUsed(conn);
|
||||
|
||||
return &conn.data;
|
||||
}
|
||||
@@ -1530,7 +1527,7 @@ pub fn connectProxied(
|
||||
const io = client.io;
|
||||
if (!proxy.supports_connect) return error.TunnelNotSupported;
|
||||
|
||||
if (client.connection_pool.findConnection(io, .{
|
||||
if (try client.connection_pool.findConnection(io, .{
|
||||
.host = proxied_host,
|
||||
.port = proxied_port,
|
||||
.protocol = proxy.protocol,
|
||||
|
||||
Reference in New Issue
Block a user