You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
99 lines
3.4 KiB
99 lines
3.4 KiB
--------------------------------------------------------------
-- Limits resource usage while executing tasks.
-- Tasks added will be run in parallel, up to a maximum number of
-- simultaneous tasks, to prevent consuming all/too many resources.
-- Every task added is scheduled immediately (if there is room);
-- use the `wait` method to wait for completion of the tasks.
|
|
|
|
local copas = require("copas")

-- Collects all arguments into a table and records their exact count in
-- field `n`, so trailing/embedded nils survive. Uses the native
-- `table.pack` when the Lua version provides it.
local pack = table.pack or function(...)
  return { n = select('#', ...), ... }
end

-- Expands a table back into a list of values, honouring the `n` field
-- written by `pack` (falls back to the length operator when absent).
-- NOTE: this must stay an assignment of an anonymous function; inside the
-- body `unpack` still refers to the global one, not to this local.
local unpack = function(t)
  local expand = table.unpack or unpack
  return expand(t, 1, t.n or #t)
end

-- On plain Lua 5.1 (not LuaJIT) use the `pcall` supplied by coxpcall
-- instead of the built-in one.
-- obsolete: only for Lua 5.1 compatibility
local pcall = pcall
if _VERSION == "Lua 5.1" and not jit then
  pcall = require("coxpcall").pcall
end
|
|
|
|
-- Adds a task to the queue and schedules it as soon as there is room.
-- The task is wrapped in a new thread created with `copas.addthread`;
-- that thread sleeps until the runner wakes it up. Can be called while
-- the set of tasks is executing.
--   self : the tasksrunner the task is added to
--   task : function to execute
--   ...  : arguments handed to `task` (nils are preserved)
-- Returns the coroutine created for the task.
-- If `task` raises an error, the error is rethrown inside the created
-- thread after the task has been dismissed from the runner.
local function add(self, task, ...)
  local args = pack(...)
  local coro = copas.addthread(function()
    copas.sleep(-1) -- go to sleep until being woken
    local ok, err = pcall(task, unpack(args)) -- start the task
    -- dismiss ourselves before reporting failure, so the slot is freed
    -- and the next queued task gets scheduled either way
    self:removethread(coroutine.running())
    if not ok then
      -- rethrow unchanged: level 0 keeps `error` from prefixing the
      -- position of this rethrow onto the task's own message
      error(err, 0)
    end
  end)
  table.insert(self.queue, coro) -- store in list
  self:next()
  return coro
end
|
|
|
|
-- Removes a task from the runner. Can be called while the set of tasks
-- is executing. Will NOT stop the task if it is already running; it only
-- releases the slot the task occupied.
--   self : the tasksrunner
--   coro : the coroutine returned when the task was added
-- A coroutine that is neither running nor queued is ignored. In every
-- case the scheduler is invoked afterwards, so a freed slot is reused.
local function remove(self, coro)
  if self.running[coro] then
    -- it is in the already running set; release its slot
    self.running[coro] = nil
    self.count = self.count - 1
  else
    -- not started yet: the queue is a plain list, so search it by value
    for i, item in ipairs(self.queue) do
      if item == coro then
        table.remove(self.queue, i)
        break
      end
    end
  end
  self:next()
end
|
|
|
|
-- Scheduler step: starts queued tasks while there are free slots, and
-- signals completeness to the waiting coroutines once nothing is running.
--   self : the tasksrunner
local function nxt(self)
  -- fill the free slots, taking tasks from the front of the queue
  while self.count < self.maxt and self.queue[1] do
    local task = table.remove(self.queue, 1)
    self.running[task] = task
    self.count = self.count + 1
    copas.wakeup(task) -- the task thread was asleep since it was added
  end

  -- still busy, or nobody is waiting: nothing to signal
  if self.count ~= 0 or not next(self.waiting) then
    return
  end

  -- all tasks done, resume the waiting coroutines so they can unblock/return
  for waiter in pairs(self.waiting) do
    copas.wakeup(waiter)
  end
end
|
|
|
|
-- Waits for the tasks: puts the calling coroutine to sleep until the
-- runner wakes it up. Returns immediately when no task is running.
--   self : the tasksrunner
local function wait(self)
  if self.count ~= 0 then
    local me = coroutine.running()
    -- register ourselves so the scheduler knows whom to wake up
    self.waiting[me] = true
    copas.sleep(-1)
    -- awake again: unregister
    self.waiting[me] = nil
  end
end
|
|
|
|
-- Creates a new tasksrunner that executes at most `maxt` tasks
-- simultaneously (99999 when `maxt` is not given).
-- Returns a table holding the runner state plus its methods:
-- `addthread`, `removethread`, `next` and `wait`.
local function new(maxt)
  local runner = {
    maxt = maxt or 99999, -- max simultaneous tasks
    count = 0,            -- count of running tasks
    queue = {},           -- tasks waiting (list/array)
    running = {},         -- tasks currently running (indexed by coroutine)
    waiting = {},         -- coroutines waiting for all tasks being finished (indexed by coro)
  }

  -- methods
  runner.addthread = add
  runner.removethread = remove
  runner.next = nxt
  runner.wait = wait

  return runner
end
|
|
|
|
-- Module interface: only the constructor is exported; all other
-- operations are reached through fields of the table it returns.
return { new = new }
|
|
|
|
|