Refactor refresh_feeds

pull/560/head
Omar Roth 6 years ago
parent c07ad0941c
commit ad09e734da
No known key found for this signature in database
GPG Key ID: B8254FB7EC3D37F2

@ -129,8 +129,8 @@ end
# Start jobs # Start jobs
refresh_channels(PG_DB, logger, config.channel_threads, config.full_refresh) refresh_channels(PG_DB, logger, config)
refresh_feeds(PG_DB, logger, config.feed_threads, config.use_feed_events) refresh_feeds(PG_DB, logger, config)
subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config) subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config)
statistics = { statistics = {

@ -105,7 +105,7 @@ struct Config
hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions hmac_key: String?, # HMAC signing key for CSRF tokens and verifying pubsub subscriptions
domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required domain: String?, # Domain to be used for links to resources on the site where an absolute URL is required
use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key) use_pubsub_feeds: {type: Bool | Int32, default: false}, # Subscribe to channels using PubSubHubbub (requires domain, hmac_key)
use_feed_events: {type: Bool, default: false}, # Update feeds on receiving notifications use_feed_events: {type: Bool | Int32, default: false}, # Update feeds on receiving notifications
default_home: {type: String, default: "Top"}, default_home: {type: String, default: "Top"},
feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]}, feed_menu: {type: Array(String), default: ["Popular", "Top", "Trending", "Subscriptions"]},
top_enabled: {type: Bool, default: true}, top_enabled: {type: Bool, default: true},

@ -1,4 +1,4 @@
def refresh_channels(db, logger, max_threads = 1, full_refresh = false) def refresh_channels(db, logger, config)
max_channel = Channel(Int32).new max_channel = Channel(Int32).new
spawn do spawn do
@ -20,7 +20,7 @@ def refresh_channels(db, logger, max_threads = 1, full_refresh = false)
active_threads += 1 active_threads += 1
spawn do spawn do
begin begin
channel = fetch_channel(id, db, full_refresh) channel = fetch_channel(id, db, config.full_refresh)
db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id) db.exec("UPDATE channels SET updated = $1, author = $2, deleted = false WHERE id = $3", Time.now, channel.author, id)
rescue ex rescue ex
@ -39,20 +39,46 @@ def refresh_channels(db, logger, max_threads = 1, full_refresh = false)
end end
end end
max_channel.send(max_threads) max_channel.send(config.channel_threads)
end end
def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false) def refresh_feeds(db, logger, config)
max_channel = Channel(Int32).new
# Spawn thread to handle feed events # Spawn thread to handle feed events
if use_feed_events if config.use_feed_events
case config.use_feed_events
when Bool
max_feed_event_threads = config.use_feed_events.as(Bool).to_unsafe
when Int32
max_feed_event_threads = config.use_feed_events.as(Int32)
end
max_feed_event_channel = Channel(Int32).new
spawn do spawn do
queue = Deque(String).new(30) queue = Deque(String).new(30)
PG.connect_listen(PG_URL, "feeds") do |event|
if !queue.includes? event.payload
queue << event.payload
end
end
max_threads = max_feed_event_channel.receive
active_threads = 0
active_channel = Channel(Bool).new
spawn do
loop do loop do
if event = queue.shift? until queue.empty?
event = queue.shift
if active_threads >= max_threads
if active_channel.receive
active_threads -= 1
end
end
active_threads += 1
spawn do
begin
feed = JSON.parse(event) feed = JSON.parse(event)
email = feed["email"].as_s email = feed["email"].as_s
action = feed["action"].as_s action = feed["action"].as_s
@ -63,23 +89,21 @@ def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
when "refresh" when "refresh"
db.exec("REFRESH MATERIALIZED VIEW #{view_name}") db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
end end
rescue ex
# Delete any future events that we just processed
queue.delete(event)
else
sleep 1.second
end end
Fiber.yield active_channel.send(true)
end end
end end
PG.connect_listen(PG_URL, "feeds") do |event| sleep 5.seconds
queue << event.payload
end end
end end
max_feed_event_channel.send(max_feed_event_threads.as(Int32))
end end
max_channel = Channel(Int32).new
spawn do spawn do
max_threads = max_channel.receive max_threads = max_channel.receive
active_threads = 0 active_threads = 0
@ -144,7 +168,7 @@ def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
end end
end end
max_channel.send(max_threads) max_channel.send(config.feed_threads)
end end
def subscribe_to_feeds(db, logger, key, config) def subscribe_to_feeds(db, logger, key, config)

Loading…
Cancel
Save