|
|
@ -47,34 +47,36 @@ def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
|
|
|
|
|
|
|
|
|
|
|
|
# Spawn thread to handle feed events
|
|
|
|
# Spawn thread to handle feed events
|
|
|
|
if use_feed_events
|
|
|
|
if use_feed_events
|
|
|
|
queue = Deque(String).new(30)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
spawn do
|
|
|
|
spawn do
|
|
|
|
loop do
|
|
|
|
queue = Deque(String).new(30)
|
|
|
|
if event = queue.shift?
|
|
|
|
|
|
|
|
feed = JSON.parse(event)
|
|
|
|
|
|
|
|
email = feed["email"].as_s
|
|
|
|
|
|
|
|
action = feed["action"].as_s
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
view_name = "subscriptions_#{sha256(email)}"
|
|
|
|
spawn do
|
|
|
|
|
|
|
|
loop do
|
|
|
|
|
|
|
|
if event = queue.shift?
|
|
|
|
|
|
|
|
feed = JSON.parse(event)
|
|
|
|
|
|
|
|
email = feed["email"].as_s
|
|
|
|
|
|
|
|
action = feed["action"].as_s
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
view_name = "subscriptions_#{sha256(email)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case action
|
|
|
|
|
|
|
|
when "refresh"
|
|
|
|
|
|
|
|
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
|
|
|
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
case action
|
|
|
|
# Delete any future events that we just processed
|
|
|
|
when "refresh"
|
|
|
|
queue.delete(event)
|
|
|
|
db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
|
|
|
|
else
|
|
|
|
|
|
|
|
sleep 1.second
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
# Delete any future events that we just processed
|
|
|
|
Fiber.yield
|
|
|
|
queue.delete(event)
|
|
|
|
|
|
|
|
else
|
|
|
|
|
|
|
|
sleep 1.second
|
|
|
|
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
Fiber.yield
|
|
|
|
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PG.connect_listen(PG_URL, "feeds") do |event|
|
|
|
|
PG.connect_listen(PG_URL, "feeds") do |event|
|
|
|
|
queue << event.payload
|
|
|
|
queue << event.payload
|
|
|
|
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|