Fix update_feeds job

pull/733/head
Omar Roth 6 years ago
parent 35e63fa3f5
commit bcc8ba73bf

@ -1,5 +1,6 @@
crawl_threads: 1 crawl_threads: 1
channel_threads: 1 channel_threads: 1
feed_threads: 1
video_threads: 1 video_threads: 1
db: db:
user: kemal user: kemal
@ -10,4 +11,3 @@ db:
full_refresh: false full_refresh: false
https_only: false https_only: false
geo_bypass: true geo_bypass: true
update_feeds: true

@ -31,6 +31,7 @@ HMAC_KEY = CONFIG.hmac_key || Random::Secure.random_bytes(32)
crawl_threads = CONFIG.crawl_threads crawl_threads = CONFIG.crawl_threads
channel_threads = CONFIG.channel_threads channel_threads = CONFIG.channel_threads
feed_threads = CONFIG.feed_threads
video_threads = CONFIG.video_threads video_threads = CONFIG.video_threads
Kemal.config.extra_options do |parser| Kemal.config.extra_options do |parser|
@ -51,6 +52,14 @@ Kemal.config.extra_options do |parser|
exit exit
end end
end end
parser.on("-f THREADS", "--feed-threads=THREADS", "Number of threads for refreshing feeds (default: #{feed_threads})") do |number|
begin
feed_threads = number.to_i
rescue ex
puts "THREADS must be integer"
exit
end
end
parser.on("-v THREADS", "--video-threads=THREADS", "Number of threads for refreshing videos (default: #{video_threads})") do |number| parser.on("-v THREADS", "--video-threads=THREADS", "Number of threads for refreshing videos (default: #{video_threads})") do |number|
begin begin
video_threads = number.to_i video_threads = number.to_i
@ -85,6 +94,8 @@ end
refresh_channels(PG_DB, channel_threads, CONFIG.full_refresh) refresh_channels(PG_DB, channel_threads, CONFIG.full_refresh)
refresh_feeds(PG_DB, feed_threads)
video_threads.times do |i| video_threads.times do |i|
spawn do spawn do
refresh_videos(PG_DB) refresh_videos(PG_DB)
@ -98,12 +109,6 @@ spawn do
end end
end end
if CONFIG.update_feeds
spawn do
update_feeds(PG_DB)
end
end
decrypt_function = [] of {name: String, value: Int32} decrypt_function = [] of {name: String, value: Int32}
spawn do spawn do
update_decrypt_function do |function| update_decrypt_function do |function|

@ -2,6 +2,7 @@ class Config
YAML.mapping({ YAML.mapping({
crawl_threads: Int32, crawl_threads: Int32,
channel_threads: Int32, channel_threads: Int32,
feed_threads: Int32,
video_threads: Int32, video_threads: Int32,
db: NamedTuple( db: NamedTuple(
user: String, user: String,
@ -15,7 +16,6 @@ class Config
hmac_key: String?, hmac_key: String?,
full_refresh: Bool, full_refresh: Bool,
geo_bypass: Bool, geo_bypass: Bool,
update_feeds: Bool,
}) })
end end

@ -104,15 +104,42 @@ def refresh_videos(db)
end end
end end
def update_feeds(db) def refresh_feeds(db, max_threads = 1)
loop do max_channel = Channel(Int32).new
users = db.query_all("SELECT email FROM users", as: String)
spawn do
max_threads = max_channel.receive
active_threads = 0
active_channel = Channel(Bool).new
users.each do |email| loop do
db.query("SELECT email FROM users") do |rs|
rs.each do
email = rs.read(String)
view_name = "subscriptions_#{sha256(email)[0..7]}" view_name = "subscriptions_#{sha256(email)[0..7]}"
if active_threads >= max_threads
if active_channel.receive
active_threads -= 1
end
end
active_threads += 1
spawn do
begin
db.exec("REFRESH MATERIALIZED VIEW #{view_name}") db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
rescue ex
STDOUT << "REFRESH " << email << " : " << ex.message << "\n"
end
active_channel.send(true)
end
end
end end
end end
end
max_channel.send(max_threads)
end end
def pull_top_videos(config, db) def pull_top_videos(config, db)

Loading…
Cancel
Save