Unverified Commit 0d9ffe56 authored by Eugen Rochko's avatar Eugen Rochko Committed by GitHub
Browse files

Add request pool to improve delivery performance (#10353)

* Add request pool to improve delivery performance

Fix #7909

* Ensure connection is closed when exception interrupts execution

* Remove Timeout#timeout from socket connection

* Fix infinite retrial loop on HTTP::ConnectionError

* Close sockets on failure, reduce idle time to 90 seconds

* Add MAX_REQUEST_POOL_SIZE option to limit concurrent connections to the same server

* Use a shared pool size, 512 by default, to stay below open file limit

* Add some tests

* Add more tests

* Reduce MAX_IDLE_TIME from 90 to 30 seconds, reap every 30 seconds

* Use a shared pool that returns preferred connection but re-purposes other ones when needed

* Fix wrong connection being returned on subsequent calls within the same thread

* Reduce mutex calls on flushes from 2 to 1 and add test for reaping
parent 2cfa427e
......@@ -148,3 +148,4 @@ group :production do
end
gem 'concurrent-ruby', require: false
gem 'connection_pool', require: false
......@@ -666,6 +666,7 @@ DEPENDENCIES
cld3 (~> 3.2.4)
climate_control (~> 0.2)
concurrent-ruby
connection_pool
derailed_benchmarks
devise (~> 4.6)
devise-two-factor (~> 3.0)
......
# frozen_string_literal: true
require 'connection_pool'
require_relative './shared_timed_stack'
class ConnectionPool::SharedConnectionPool < ConnectionPool
def initialize(options = {}, &block)
super(options, &block)
@available = ConnectionPool::SharedTimedStack.new(@size, &block)
end
delegate :size, :flush, to: :@available
def with(preferred_tag, options = {})
Thread.handle_interrupt(Exception => :never) do
conn = checkout(preferred_tag, options)
begin
Thread.handle_interrupt(Exception => :immediate) do
yield conn
end
ensure
checkin(preferred_tag)
end
end
end
def checkout(preferred_tag, options = {})
if ::Thread.current[key(preferred_tag)]
::Thread.current[key_count(preferred_tag)] += 1
::Thread.current[key(preferred_tag)]
else
::Thread.current[key_count(preferred_tag)] = 1
::Thread.current[key(preferred_tag)] = @available.pop(preferred_tag, options[:timeout] || @timeout)
end
end
def checkin(preferred_tag)
if ::Thread.current[key(preferred_tag)]
if ::Thread.current[key_count(preferred_tag)] == 1
@available.push(::Thread.current[key(preferred_tag)])
::Thread.current[key(preferred_tag)] = nil
else
::Thread.current[key_count(preferred_tag)] -= 1
end
else
raise ConnectionPool::Error, 'no connections are checked out'
end
nil
end
private
def key(tag)
:"#{@key}-#{tag}"
end
def key_count(tag)
:"#{@key_count}-#{tag}"
end
end
# frozen_string_literal: true
class ConnectionPool::SharedTimedStack
def initialize(max = 0, &block)
@create_block = block
@max = max
@created = 0
@queue = []
@tagged_queue = Hash.new { |hash, key| hash[key] = [] }
@mutex = Mutex.new
@resource = ConditionVariable.new
end
def push(connection)
@mutex.synchronize do
store_connection(connection)
@resource.broadcast
end
end
alias << push
def pop(preferred_tag, timeout = 5.0)
deadline = current_time + timeout
@mutex.synchronize do
loop do
return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty?
connection = try_create(preferred_tag)
return connection if connection
to_wait = deadline - current_time
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
@resource.wait(@mutex, to_wait)
end
end
end
def empty?
size.zero?
end
def size
@mutex.synchronize do
@queue.size
end
end
def flush
@mutex.synchronize do
@queue.delete_if do |connection|
delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME)
if delete
@tagged_queue[connection.site].delete(connection)
connection.close
@created -= 1
end
delete
end
end
end
private
def try_create(preferred_tag)
if @created == @max && !@queue.empty?
throw_away_connection = @queue.pop
@tagged_queue[throw_away_connection.site].delete(throw_away_connection)
@create_block.call(preferred_tag)
elsif @created != @max
connection = @create_block.call(preferred_tag)
@created += 1
connection
end
end
def fetch_preferred_connection(preferred_tag)
connection = @tagged_queue[preferred_tag].pop
@queue.delete(connection)
connection
end
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def store_connection(connection)
@tagged_queue[connection.site].push(connection)
@queue.push(connection)
end
end
......@@ -17,15 +17,21 @@ end
class Request
REQUEST_TARGET = '(request-target)'
# We enforce a 5s timeout on DNS resolving, 5s timeout on socket opening
# and 5s timeout on the TLS handshake, meaning the worst case should take
# about 15s in total
TIMEOUT = { connect: 5, read: 10, write: 10 }.freeze
include RoutingHelper
def initialize(verb, url, **options)
raise ArgumentError if url.blank?
@verb = verb
@url = Addressable::URI.parse(url).normalize
@options = options.merge(use_proxy? ? Rails.configuration.x.http_client_proxy : { socket_class: Socket })
@headers = {}
@verb = verb
@url = Addressable::URI.parse(url).normalize
@http_client = options.delete(:http_client)
@options = options.merge(use_proxy? ? Rails.configuration.x.http_client_proxy : { socket_class: Socket })
@headers = {}
raise Mastodon::HostValidationError, 'Instance does not support hidden service connections' if block_hidden_service?
......@@ -50,15 +56,24 @@ class Request
def perform
begin
response = http_client.headers(headers).public_send(@verb, @url.to_s, @options)
response = http_client.public_send(@verb, @url.to_s, @options.merge(headers: headers))
rescue => e
raise e.class, "#{e.message} on #{@url}", e.backtrace[0]
end
begin
yield response.extend(ClientLimit) if block_given?
response = response.extend(ClientLimit)
# If we are using a persistent connection, we have to
# read every response to be able to move forward at all.
# However, simply calling #to_s or #flush may not be safe,
# as the response body, if malicious, could be too big
# for our memory. So we use the #body_with_limit method
response.body_with_limit if http_client.persistent?
yield response if block_given?
ensure
http_client.close
http_client.close unless http_client.persistent?
end
end
......@@ -76,6 +91,10 @@ class Request
%w(http https).include?(parsed_url.scheme) && parsed_url.host.present?
end
def http_client
HTTP.use(:auto_inflate).timeout(:per_operation, TIMEOUT.dup).follow(max_hops: 2)
end
end
private
......@@ -116,16 +135,8 @@ class Request
end
end
def timeout
# We enforce a 1s timeout on DNS resolving, 10s timeout on socket opening
# and 5s timeout on the TLS handshake, meaning the worst case should take
# about 16s in total
{ connect: 5, read: 10, write: 10 }
end
def http_client
@http_client ||= HTTP.use(:auto_inflate).timeout(:per_operation, timeout).follow(max_hops: 2)
@http_client ||= Request.http_client
end
def use_proxy?
......@@ -169,20 +180,41 @@ class Request
return super(host, *args) if thru_hidden_service?(host)
outer_e = nil
port = args.first
Resolv::DNS.open do |dns|
dns.timeouts = 5
addresses = dns.getaddresses(host).take(2)
time_slot = 10.0 / addresses.size
addresses.each do |address|
begin
raise Mastodon::HostValidationError if PrivateAddressCheck.private_address?(IPAddr.new(address.to_s))
::Timeout.timeout(time_slot, HTTP::TimeoutError) do
return super(address.to_s, *args)
sock = ::Socket.new(::Socket::AF_INET, ::Socket::SOCK_STREAM, 0)
sockaddr = ::Socket.pack_sockaddr_in(port, address.to_s)
sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
begin
sock.connect_nonblock(sockaddr)
rescue IO::WaitWritable
if IO.select(nil, [sock], nil, Request::TIMEOUT[:connect])
begin
sock.connect_nonblock(sockaddr)
rescue Errno::EISCONN
# Yippee!
rescue
sock.close
raise
end
else
sock.close
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds"
end
end
return sock
rescue => e
outer_e = e
end
......
# frozen_string_literal: true
require_relative './connection_pool/shared_connection_pool'
class RequestPool
def self.current
@current ||= RequestPool.new
end
class Reaper
attr_reader :pool, :frequency
def initialize(pool, frequency)
@pool = pool
@frequency = frequency
end
def run
return unless frequency&.positive?
Thread.new(frequency, pool) do |t, p|
loop do
sleep t
p.flush
end
end
end
end
MAX_IDLE_TIME = 30
WAIT_TIMEOUT = 5
MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i
class Connection
attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh
def initialize(site)
@site = site
@http_client = http_client
@last_used_at = nil
@created_at = current_time
@dead = false
@fresh = true
end
def use
@last_used_at = current_time
@in_use = true
retries = 0
begin
yield @http_client
rescue HTTP::ConnectionError
# It's possible the connection was closed, so let's
# try re-opening it once
close
if @fresh || retries.positive?
raise
else
@http_client = http_client
retries += 1
retry
end
rescue StandardError
# If this connection raises errors of any kind, it's
# better if it gets reaped as soon as possible
close
@dead = true
raise
end
ensure
@fresh = false
@in_use = false
end
def seconds_idle
current_time - (@last_used_at || @created_at)
end
def close
@http_client.close
end
private
def http_client
Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME)
end
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
def initialize
@pool = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) { |site| Connection.new(site) }
@reaper = Reaper.new(self, 30)
@reaper.run
end
def with(site, &block)
@pool.with(site) do |connection|
ActiveSupport::Notifications.instrument('with.request_pool', miss: connection.fresh, host: connection.site) do
connection.use(&block)
end
end
end
delegate :size, :flush, to: :@pool
end
......@@ -17,6 +17,7 @@ class ActivityPub::DeliveryWorker
@json = json
@source_account = Account.find(source_account_id)
@inbox_url = inbox_url
@host = Addressable::URI.parse(inbox_url).normalized_site
perform_request
......@@ -28,16 +29,18 @@ class ActivityPub::DeliveryWorker
private
def build_request
request = Request.new(:post, @inbox_url, body: @json)
def build_request(http_client)
request = Request.new(:post, @inbox_url, body: @json, http_client: http_client)
request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
request.add_headers(HEADERS)
end
def perform_request
light = Stoplight(@inbox_url) do
build_request.perform do |response|
raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response)
request_pool.with(@host) do |http_client|
build_request(http_client).perform do |response|
raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response)
end
end
end
......@@ -57,4 +60,8 @@ class ActivityPub::DeliveryWorker
def failure_tracker
@failure_tracker ||= DeliveryFailureTracker.new(@inbox_url)
end
def request_pool
RequestPool.current
end
end
# frozen_string_literal: true
require 'rails_helper'
describe ConnectionPool::SharedConnectionPool do
class MiniConnection
attr_reader :site
def initialize(site)
@site = site
end
end
subject { described_class.new(size: 5, timeout: 5) { |site| MiniConnection.new(site) } }
describe '#with' do
it 'runs a block with a connection' do
block_run = false
subject.with('foo') do |connection|
expect(connection).to be_a MiniConnection
block_run = true
end
expect(block_run).to be true
end
end
end
# frozen_string_literal: true
require 'rails_helper'
describe ConnectionPool::SharedTimedStack do
class MiniConnection
attr_reader :site
def initialize(site)
@site = site
end
end
subject { described_class.new(5) { |site| MiniConnection.new(site) } }
describe '#push' do
it 'keeps the connection in the stack' do
subject.push(MiniConnection.new('foo'))
expect(subject.size).to eq 1
end
end
describe '#pop' do
it 'returns a connection' do
expect(subject.pop('foo')).to be_a MiniConnection
end
it 'returns the same connection that was pushed in' do
connection = MiniConnection.new('foo')
subject.push(connection)
expect(subject.pop('foo')).to be connection
end
it 'does not create more than maximum amount of connections' do
expect { 6.times { subject.pop('foo', 0) } }.to raise_error Timeout::Error
end
it 'repurposes a connection for a different site when maximum amount is reached' do
5.times { subject.push(MiniConnection.new('foo')) }
expect(subject.pop('bar')).to be_a MiniConnection
end
end
describe '#empty?' do
it 'returns true when no connections on the stack' do
expect(subject.empty?).to be true
end
it 'returns false when there are connections on the stack' do
subject.push(MiniConnection.new('foo'))
expect(subject.empty?).to be false
end
end
describe '#size' do
it 'returns the number of connections on the stack' do
2.times { subject.push(MiniConnection.new('foo')) }
expect(subject.size).to eq 2
end
end
end
# frozen_string_literal: true
require 'rails_helper'
describe RequestPool do
subject { described_class.new }
describe '#with' do
it 'returns a HTTP client for a host' do
subject.with('http://example.com') do |http_client|
expect(http_client).to be_a HTTP::Client
end
end
it 'returns the same instance of HTTP client within the same thread for the same host' do
test_client = nil
subject.with('http://example.com') { |http_client| test_client = http_client }
expect(test_client).to_not be_nil
subject.with('http://example.com') { |http_client| expect(http_client).to be test_client }
end
it 'returns different HTTP clients for different hosts' do
test_client = nil
subject.with('http://example.com') { |http_client| test_client = http_client }
expect(test_client).to_not be_nil
subject.with('http://example.org') { |http_client| expect(http_client).to_not be test_client }
end
it 'grows to the number of threads accessing it' do
stub_request(:get, 'http://example.com/').to_return(status: 200, body: 'Hello!')
subject
threads = 20.times.map do |i|
Thread.new do
20.times do
subject.with('http://example.com') do |http_client|
http_client.get('/').flush
end
end
end
end
threads.map(&:join)
expect(subject.size).to be > 1
end
it 'closes idle connections' do
stub_request(:get, 'http://example.com/').to_return(status: 200, body: 'Hello!')
subject.with('http://example.com') do |http_client|
http_client.get('/').flush
end
expect(subject.size).to eq 1
sleep RequestPool::MAX_IDLE_TIME + 30 + 1
expect(subject.size).to eq 0
end
end
end