Move to solid_queue

This commit is contained in:
Eugene Burmakin
2025-05-17 23:05:52 +02:00
parent 723ccffa5a
commit 35a0533b2b
20 changed files with 362 additions and 45 deletions

View File

@@ -20,6 +20,7 @@ gem 'httparty'
gem 'importmap-rails'
gem 'kaminari'
gem 'lograge'
gem 'mission_control-jobs'
gem 'oj'
gem 'pg'
gem 'prometheus_exporter'
@@ -42,6 +43,8 @@ gem 'sidekiq-limit_fetch'
gem 'sprockets-rails'
gem 'stimulus-rails'
gem 'strong_migrations'
gem 'solid_cable', '~> 3.0'
gem 'solid_queue', '~> 1.1'
gem 'tailwindcss-rails'
gem 'turbo-rails'
gem 'tzinfo-data', platforms: %i[mingw mswin x64_mingw jruby]

View File

@@ -218,6 +218,16 @@ GEM
mini_mime (1.1.5)
mini_portile2 (2.8.8)
minitest (5.25.5)
mission_control-jobs (1.0.2)
actioncable (>= 7.1)
actionpack (>= 7.1)
activejob (>= 7.1)
activerecord (>= 7.1)
importmap-rails (>= 1.2.1)
irb (~> 1.13)
railties (>= 7.1)
stimulus-rails
turbo-rails
msgpack (1.7.3)
multi_json (1.15.0)
multi_xml (0.7.1)
@@ -420,6 +430,18 @@ GEM
simplecov_json_formatter (~> 0.1)
simplecov-html (0.13.1)
simplecov_json_formatter (0.1.4)
solid_cable (3.0.8)
actioncable (>= 7.2)
activejob (>= 7.2)
activerecord (>= 7.2)
railties (>= 7.2)
solid_queue (1.1.5)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (>= 1.3.1)
fugit (~> 1.11.0)
railties (>= 7.1)
thor (~> 1.3.1)
sprockets (4.2.1)
concurrent-ruby (~> 1.0)
rack (>= 2.2.4, < 4)
@@ -505,6 +527,7 @@ DEPENDENCIES
jwt
kaminari
lograge
mission_control-jobs
oj
pg
prometheus_exporter
@@ -530,6 +553,8 @@ DEPENDENCIES
sidekiq-cron
sidekiq-limit_fetch
simplecov
solid_cable (~> 3.0)
solid_queue (~> 1.1)
sprockets-rails
stackprof
stimulus-rails

View File

@@ -1,7 +1,11 @@
# frozen_string_literal: true

# Base class for all background jobs in the application
# (executed by Solid Queue through Active Job).
class ApplicationJob < ActiveJob::Base
  # Automatically retry jobs that encountered a deadlock
  # retry_on ActiveRecord::Deadlocked

  # Retry transient failures with polynomial backoff.
  # NOTE: retry on StandardError, not Exception — rescuing Exception also
  # swallows SignalException/SystemExit, which prevents workers from
  # shutting down cleanly on TERM/INT and can mask fatal errors.
  retry_on StandardError, wait: :polynomially_longer, attempts: 25

  # Most jobs are safe to ignore if the underlying records are no longer available
  # discard_on ActiveJob::DeserializationError
end

View File

@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Jobs
  # Recurring maintenance job (see config/recurring.yml: periodic_cleanup)
  # that purges finished Solid Queue job rows in batches to keep the
  # queue database from growing unboundedly.
  #
  # Uses explicit module nesting rather than the compact
  # `class Jobs::CleanFinishedJob` form to avoid surprising constant
  # lookup (RuboCop Style/ClassAndModuleChildren).
  class CleanFinishedJob < ApplicationJob
    queue_as :default

    def perform
      SolidQueue::Job.clear_finished_in_batches
    end
  end
end

6
bin/jobs Executable file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env ruby
# Entrypoint for the Solid Queue supervisor. Boots the full Rails
# environment, then delegates to the Solid Queue CLI, which starts the
# dispatcher/worker processes described in config/queue.yml.
require_relative "../config/environment"
require "solid_queue/cli"

SolidQueue::Cli.start(ARGV)

View File

@@ -1,11 +1,23 @@
# Async adapter only works within the same process, so for manually triggering cable updates from a console,
# and seeing results in the browser, you must do so from the web console (running inside the dev process),
# not a terminal started via bin/rails console! Add "console" to any action or any ERB template view
# to make the web console appear.
development:
adapter: redis
url: <%= ENV['REDIS_URL'] %>
adapter: solid_cable
connects_to:
database:
writing: cable
polling_interval: 0.1.seconds
message_retention: 1.day
test:
adapter: test
production:
adapter: redis
url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %>
channel_prefix: dawarich_production
adapter: solid_cable
connects_to:
database:
writing: cable
polling_interval: 0.1.seconds
message_retention: 1.day

View File

@@ -10,17 +10,53 @@ default: &default
timeout: 5000
development:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_development' %>
primary:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_development' %>
queue:
<<: *default
database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_development_queue' %>
migrations_paths: db/queue_migrate
cable:
<<: *default
database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_development_cable' %>
migrations_paths: db/cable_migrate
test:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_test' %>
primary:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_test' %>
queue:
<<: *default
database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_test_queue' %>
migrations_paths: db/queue_migrate
cable:
<<: *default
database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_test_cable' %>
migrations_paths: db/cable_migrate
production:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_production' %>
primary:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_production' %>
queue:
<<: *default
database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_production_queue' %>
migrations_paths: db/queue_migrate
cable:
<<: *default
database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_production_cable' %>
migrations_paths: db/cable_migrate
staging:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_staging' %>
primary:
<<: *default
database: <%= ENV['DATABASE_NAME'] || 'dawarich_staging' %>
queue:
<<: *default
database: <%= ENV['DATABASE_QUEUE_NAME'] || 'dawarich_staging_queue' %>
migrations_paths: db/queue_migrate
cable:
<<: *default
database: <%= ENV['DATABASE_CABLE_NAME'] || 'dawarich_staging_cable' %>
migrations_paths: db/cable_migrate

View File

@@ -68,6 +68,14 @@ Rails.application.configure do
# Highlight code that enqueued background job in logs.
config.active_job.verbose_enqueue_logs = true
config.active_job.queue_adapter = :solid_queue
config.solid_queue.silence_polling = true
# :queue is the name of the database connection
config.solid_queue.connects_to = { database: { writing: :queue } }
config.mission_control.jobs.http_basic_auth_enabled = false
config.solid_queue.logger = ActiveSupport::Logger.new($stdout)
# Suppress logger output for asset requests.
config.assets.quiet = true
@@ -95,7 +103,7 @@ Rails.application.configure do
config.force_ssl = ENV.fetch('APPLICATION_PROTOCOL', 'http').downcase == 'https'
# Direct logs to STDOUT
config.logger = Logger.new($stdout)
config.logger = ActiveSupport::Logger.new($stdout)
config.lograge.enabled = true
config.lograge.formatter = Lograge::Formatters::Json.new

View File

@@ -60,7 +60,7 @@ Rails.application.configure do
config.force_ssl = ENV.fetch('APPLICATION_PROTOCOL', 'http').downcase == 'https'
# Direct logs to STDOUT
config.logger = Logger.new($stdout)
config.logger = ActiveSupport::Logger.new($stdout)
config.lograge.enabled = true
config.lograge.formatter = Lograge::Formatters::Json.new
@@ -77,7 +77,10 @@ Rails.application.configure do
config.cache_store = :redis_cache_store, { url: ENV['REDIS_URL'] }
# Use a real queuing backend for Active Job (and separate queues per environment).
# config.active_job.queue_adapter = :resque
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }
config.solid_queue.silence_polling = true
config.solid_queue.logger = ActiveSupport::Logger.new($stdout)
# config.active_job.queue_name_prefix = "dawarich_production"
config.action_mailer.perform_caching = false

View File

@@ -43,6 +43,9 @@ preload_app!
# Allow puma to be restarted by `bin/rails restart` command.
plugin :tmp_restart
# If env var is set or we're in development, solid_queue will run in puma
plugin :solid_queue if ENV['SOLID_QUEUE_IN_PUMA'] || Rails.env.development?
# Prometheus exporter
if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true'
require 'prometheus_exporter/instrumentation'

27
config/queue.yml Normal file
View File

@@ -0,0 +1,27 @@
# Solid Queue process topology, read by bin/jobs (or the puma
# solid_queue plugin when SOLID_QUEUE_IN_PUMA is set).
default: &default
  dispatchers:
    # Dispatcher moves due scheduled/recurring jobs into the ready queues.
    - polling_interval: 1
      batch_size: 500
  workers:
    # Catch-all worker; NOTE(review): "*" also covers imports/exports, so
    # those queues are served by two worker pools — confirm intentional.
    # Process count is tunable via JOB_CONCURRENCY (defaults to 1).
    - queues: "*"
      threads: 3
      processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
      polling_interval: 2
    # Dedicated pools so long-running imports/exports don't starve
    # the general-purpose workers.
    - queues: imports
      threads: 5
      processes: 1
      polling_interval: 1
    - queues: exports
      threads: 5
      processes: 1
      polling_interval: 2

development:
  <<: *default

test:
  <<: *default

production:
  <<: *default

34
config/recurring.yml Normal file
View File

@@ -0,0 +1,34 @@
# Solid Queue recurring tasks. Schedules are parsed by fugit's natural
# language parser — NOTE(review): verify each phrase (especially
# "every month") actually parses with `Fugit.parse(...)`; unparseable
# schedules fail at supervisor boot.

# Purges finished solid_queue_jobs rows (Jobs::CleanFinishedJob).
periodic_cleanup:
  class: "Jobs::CleanFinishedJob"
  queue: default
  schedule: every month

bulk_stats_calculating_job:
  class: "BulkStatsCalculatingJob"
  queue: stats
  schedule: every hour

# Runs before visit_suggesting_job below (00:00 vs 00:05).
area_visits_calculation_scheduling_job:
  class: "AreaVisitsCalculationSchedulingJob"
  queue: visit_suggesting
  schedule: every day at 0:00

visit_suggesting_job:
  class: "BulkVisitsSuggestingJob"
  queue: visit_suggesting
  schedule: every day at 00:05

# Picks up files dropped into the watched imports directory.
watcher_job:
  class: "Import::WatcherJob"
  queue: imports
  schedule: every hour

app_version_checking_job:
  class: "AppVersionCheckingJob"
  queue: default
  schedule: every 6 hours

cache_preheating_job:
  class: "Cache::PreheatingJob"
  queue: default
  schedule: every day at 0:00

View File

@@ -6,6 +6,7 @@ Rails.application.routes.draw do
mount ActionCable.server => '/cable'
mount Rswag::Api::Engine => '/api-docs'
mount Rswag::Ui::Engine => '/api-docs'
mount MissionControl::Jobs::Engine, at: '/jobs' # Protect this just as the Sidekiq Web UI is protected
unless DawarichSettings.self_hosted?
Sidekiq::Web.use(Rack::Auth::Basic) do |username, password|

26
db/cable_schema.rb Normal file
View File

@@ -0,0 +1,26 @@
# This file is auto-generated from the current state of the database. Instead
# of editing this file, please use the migrations feature of Active Record to
# incrementally modify your database, and then regenerate this schema definition.
#
# This file is the source Rails uses to define your schema when running `bin/rails
# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to
# be faster and is potentially less error prone than running all of your
# migrations from scratch. Old migrations may fail to apply correctly if those
# migrations use external dependencies or application code.
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[8.0].define(version: 1) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_catalog.plpgsql"
create_table "solid_cable_messages", force: :cascade do |t|
t.binary "channel", null: false
t.binary "payload", null: false
t.datetime "created_at", null: false
t.bigint "channel_hash", null: false
t.index ["channel"], name: "index_solid_cable_messages_on_channel"
t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash"
t.index ["created_at"], name: "index_solid_cable_messages_on_created_at"
end
end

144
db/queue_schema.rb Normal file
View File

@@ -0,0 +1,144 @@
# This file is auto-generated from the current state of the database. Instead
# of editing this file, please use the migrations feature of Active Record to
# incrementally modify your database, and then regenerate this schema definition.
#
# This file is the source Rails uses to define your schema when running `bin/rails
# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to
# be faster and is potentially less error prone than running all of your
# migrations from scratch. Old migrations may fail to apply correctly if those
# migrations use external dependencies or application code.
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[8.0].define(version: 1) do
# These are extensions that must be enabled in order to support this database
enable_extension "pg_catalog.plpgsql"
create_table "solid_queue_blocked_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.string "concurrency_key", null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
end
create_table "solid_queue_claimed_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
end
create_table "solid_queue_failed_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.text "error"
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
end
create_table "solid_queue_jobs", force: :cascade do |t|
t.string "queue_name", null: false
t.string "class_name", null: false
t.text "arguments"
t.integer "priority", default: 0, null: false
t.string "active_job_id"
t.datetime "scheduled_at"
t.datetime "finished_at"
t.string "concurrency_key"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"
t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at"
t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering"
t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting"
end
create_table "solid_queue_pauses", force: :cascade do |t|
t.string "queue_name", null: false
t.datetime "created_at", null: false
t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
end
create_table "solid_queue_processes", force: :cascade do |t|
t.string "kind", null: false
t.datetime "last_heartbeat_at", null: false
t.bigint "supervisor_id"
t.integer "pid", null: false
t.string "hostname"
t.text "metadata"
t.datetime "created_at", null: false
t.string "name", null: false
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
end
create_table "solid_queue_ready_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true
t.index ["priority", "job_id"], name: "index_solid_queue_poll_all"
t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue"
end
create_table "solid_queue_recurring_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "task_key", null: false
t.datetime "run_at", null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end
create_table "solid_queue_recurring_tasks", force: :cascade do |t|
t.string "key", null: false
t.string "schedule", null: false
t.string "command", limit: 2048
t.string "class_name"
t.text "arguments"
t.string "queue_name"
t.integer "priority", default: 0
t.boolean "static", default: true, null: false
t.text "description"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
end
create_table "solid_queue_scheduled_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "scheduled_at", null: false
t.datetime "created_at", null: false
t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all"
end
create_table "solid_queue_semaphores", force: :cascade do |t|
t.string "key", null: false
t.integer "value", default: 1, null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value"
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
end
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
end

View File

@@ -8,11 +8,9 @@ RSpec.describe AreaVisitsCalculationSchedulingJob, type: :job do
let(:user) { create(:user) }
it 'calls the AreaVisitsCalculationService' do
Sidekiq::Testing.inline! do
expect(AreaVisitsCalculatingJob).to receive(:perform_later).with(user.id).and_call_original
expect(AreaVisitsCalculatingJob).to receive(:perform_later).with(user.id).and_call_original
described_class.new.perform
end
described_class.new.perform
end
end
end

View File

@@ -52,7 +52,6 @@ RSpec.describe DataMigrations::MigratePlacesLonlatJob, type: :job do
described_class.perform_now(user.id)
place1.reload
# SRID should be 4326 (WGS84)
expect(place1.lonlat.srid).to eq(4326)
end
end
@@ -64,14 +63,6 @@ RSpec.describe DataMigrations::MigratePlacesLonlatJob, type: :job do
end.not_to raise_error
end
end
context 'when user does not exist' do
it 'raises ActiveRecord::RecordNotFound' do
expect do
described_class.perform_now(-1)
end.to raise_error(ActiveRecord::RecordNotFound)
end
end
end
describe 'queue' do

View File

@@ -63,14 +63,6 @@ RSpec.describe VisitSuggestingJob, type: :job do
end
end
context 'when user not found' do
it 'raises an error' do
expect do
described_class.perform_now(user_id: -1, start_at: start_at, end_at: end_at)
end.to raise_error(ActiveRecord::RecordNotFound)
end
end
context 'with string dates' do
let(:string_start) { start_at.to_s }
let(:string_end) { end_at.to_s }

View File

@@ -55,16 +55,12 @@ RSpec.describe Imports::Create do
context 'when import is successful' do
it 'schedules stats creating' do
Sidekiq::Testing.inline! do
expect { service.call }.to \
have_enqueued_job(Stats::CalculatingJob).with(user.id, 2024, 3)
end
expect { service.call }.to \
have_enqueued_job(Stats::CalculatingJob).with(user.id, 2024, 3)
end
it 'schedules visit suggesting' do
Sidekiq::Testing.inline! do
expect { service.call }.to have_enqueued_job(VisitSuggestingJob)
end
expect { service.call }.to have_enqueued_job(VisitSuggestingJob)
end
end

View File

@@ -10,7 +10,6 @@ RSpec.describe Imports::Watcher do
before do
stub_const('Imports::Watcher::WATCHED_DIR_PATH', watched_dir_path)
Sidekiq::Testing.inline!
end
after { Sidekiq::Testing.fake! }