You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lua-lib/copas/queue.lua

191 lines
4.7 KiB

local copas = require "copas"
local Sema = copas.semaphore
local Lock = copas.lock
local Queue = {}
Queue.__index = Queue
local new_name do
local count = 0
function new_name()
count = count + 1
return "copas_queue_" .. count
end
end
-- Creates a new Queue instance
function Queue.new(opts)
opts = opts or {}
local self = {}
setmetatable(self, Queue)
self.name = opts.name or new_name()
self.sema = Sema.new(10^9)
self.head = 1
self.tail = 1
self.list = {}
self.workers = setmetatable({}, { __mode = "k" })
self.stopping = false
self.worker_id = 0
return self
end
-- Pushes an item in the queue (can be 'nil')
-- returns true, or nil+err ("stopping", or "destroyed")
function Queue:push(item)
if self.stopping then
return nil, "stopping"
end
self.list[self.head] = item
self.head = self.head + 1
self.sema:give()
return true
end
-- Pops and item from the queue. If there are no items in the queue it will yield
-- until there are or a timeout happens (exception is when `timeout == 0`, then it will
-- not yield but return immediately). If the timeout is `math.huge` it will wait forever.
-- Returns item, or nil+err ("timeout", or "destroyed")
function Queue:pop(timeout)
local ok, err = self.sema:take(1, timeout)
if not ok then
return ok, err
end
local item = self.list[self.tail]
self.list[self.tail] = nil
self.tail = self.tail + 1
if self.tail == self.head then
-- reset queue
self.list = {}
self.tail = 1
self.head = 1
if self.stopping then
-- we're stopping and last item being returned, so we're done
self:destroy()
end
end
return item
end
-- return the number of items left in the queue
function Queue:get_size()
return self.head - self.tail
end
-- instructs the queue to stop. Will not accept any more 'push' calls.
-- will autocall 'destroy' when the queue is empty.
-- returns immediately. See `finish`
function Queue:stop()
if not self.stopping then
self.stopping = true
self.lock = Lock.new(nil, true)
self.lock:get() -- close the lock
if self:get_size() == 0 then
-- queue is already empty, so "pop" function cannot call destroy on next
-- pop, so destroy now.
self:destroy()
end
end
return true
end
-- Finishes a queue. Calls stop and then waits for the queue to run empty (and be
-- destroyed) before returning. returns true or nil+err ("timeout", or "destroyed")
-- Parameter no_destroy_on_timeout indicates if the queue is not to be forcefully
-- destroyed on a timeout.
function Queue:finish(timeout, no_destroy_on_timeout)
self:stop()
local _, err = self.lock:get(timeout)
-- the lock never gets released, only destroyed, so we have to check the error string
if err == "timeout" then
if not no_destroy_on_timeout then
self:destroy()
end
return nil, err
end
return true
end
do
local destroyed_func = function()
return nil, "destroyed"
end
local destroyed_queue_mt = {
__index = function()
return destroyed_func
end
}
-- destroys a queue immediately. Abandons what is left in the queue.
-- Releases all waiting threads with `nil+"destroyed"`
function Queue:destroy()
if self.lock then
self.lock:destroy()
end
self.sema:destroy()
setmetatable(self, destroyed_queue_mt)
-- clear anything left in the queue
for key in pairs(self.list) do
self.list[key] = nil
end
return true
end
end
-- adds a worker that will handle whatever is passed into the queue. Can be called
-- multiple times to add more workers.
-- The threads automatically exit when the queue is destroyed.
-- worker function signature: `function(item)` (Note: worker functions run
-- unprotected, so wrap code in an (x)pcall if errors are expected, otherwise the
-- worker will exit on an error, and queue handling will stop)
-- Returns the coroutine added.
function Queue:add_worker(worker)
assert(type(worker) == "function", "expected worker to be a function")
local coro
self.worker_id = self.worker_id + 1
local worker_name = self.name .. ":worker_" .. self.worker_id
coro = copas.addnamedthread(worker_name, function()
while true do
local item, err = self:pop(math.huge) -- wait forever
if err then
break -- queue destroyed, exit
end
worker(item) -- TODO: wrap in errorhandling
end
self.workers[coro] = nil
end)
self.workers[coro] = true
return coro
end
-- returns a list/array of current workers (coroutines) handling the queue.
-- (only the workers added by `add_worker`, and still active, will be in this list)
function Queue:get_workers()
local lst = {}
for coro in pairs(self.workers) do
if coroutine.status(coro) ~= "dead" then
lst[#lst+1] = coro
end
end
return lst
end
return Queue