This repository was archived by the owner on Nov 20, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess.lua
More file actions
79 lines (66 loc) · 2.44 KB
/
process.lua
File metadata and controls
79 lines (66 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
-- Submodule for distributed processes: wraps process creation so that a
-- process can be created either locally or on a remote node.
module('concurrent._distributed._process', package.seeall)
-- PID of the most recently created auxiliary system process. spawn_system()
-- decrements this value before using it, so the PIDs it hands out are
-- negative, starting at -2 and decreasing.
last = -1
-- Keep a reference to the spawn function that exists at load time. The end of
-- this file replaces concurrent.spawn, and the replacement delegates to this
-- saved version whenever a local process is requested.
_spawn = concurrent.spawn
-- Creates a process, either local or remote.
--
-- Local form:  spawn(func, ...)        the first argument is a function.
--   The call is forwarded untouched (including nil arguments) to the
--   original, renamed spawn and its results are returned as they are.
-- Remote form: spawn(node, func, ...)  the first argument is not a function.
--   An auxiliary system process sends the request to the node, while the
--   caller waits on the scheduler for the outcome.
--
-- Returns the results of the original spawn for a local process, the table
-- { pid, node } for a remote process, or nil and an error message when the
-- process could not be created.
function spawn(...)
    -- The parentheses keep only the first argument, so a call without any
    -- arguments is still handled (type() itself rejects an empty list).
    if type((...)) == 'function' then
        return _spawn(...)
    end

    local node, func = ...
    local args = { select(3, ...) }

    -- If the auxiliary process could not even be started there will be no
    -- answer to wait for, so report the failure right away.
    local helper, errmsg = spawn_system(spawn_process, concurrent.self(), node,
        func, args)
    if not helper then
        return nil, errmsg
    end

    local msg = concurrent._scheduler.wait()
    if not msg.pid then
        return nil, msg.errmsg
    end
    return { msg.pid, node }
end
-- Body of the auxiliary system process that performs a remote spawn: it sends
-- a 'SPAWN' request for func/args to PID -1 on the given node, identifying
-- itself (PID and node) as the sender, then receives one message and stores
-- it in the scheduler's barrier slot of the parent process.
function spawn_process(parent, node, func, args)
    local origin = { pid = concurrent.self(), node = concurrent.node() }
    local request = {
        subject = 'SPAWN',
        from = origin,
        func = func,
        args = args,
    }
    concurrent.send({ -1, node }, request)

    local reply = concurrent.receive()
    concurrent._scheduler.barriers[parent] = reply
end
-- Handles a spawn request from a remote node.
--
-- msg.func is a string holding an expression that should evaluate to the
-- function to run, msg.args the list of arguments for it, and msg.from the
-- { pid = ..., node = ... } of the requester. Exactly one reply of the form
-- { pid = ..., errmsg = ... } is sent back to the requester: on success pid is
-- set, otherwise pid is nil and errmsg describes what went wrong (the
-- expression did not compile, raised an error, or did not yield a function).
--
-- NOTE(review): msg.func arrives from another node and is compiled and
-- executed here with loadstring, i.e. the sender can run arbitrary code in
-- this process. Only accept such requests from trusted nodes.
function controller_spawn(msg)
    local pid, errmsg
    local chunk, loaderr = loadstring('return ' .. tostring(msg.func))
    if not chunk then
        errmsg = loaderr
    else
        -- Evaluating the expression may itself raise (e.g. indexing an
        -- undefined table), so it is run in protected mode.
        local ok, target = pcall(chunk)
        if not ok then
            errmsg = target
        elseif type(target) ~= 'function' then
            -- Passing a non-function on would make spawn treat it as a node
            -- name and attempt yet another remote spawn.
            errmsg = 'not a function: ' .. tostring(msg.func)
        else
            pid, errmsg = spawn(target, unpack(msg.args or {}))
        end
    end
    -- Always answer, so the requester is never left without a reply.
    concurrent.send({ msg.from.pid, msg.from.node }, { pid = pid,
        errmsg = errmsg })
end
-- Creates an auxiliary system process: similar to a normal process, but
-- identified by a negative PID and lacking certain capabilities.
--
-- func is wrapped in a coroutine that yields once before calling func with
-- the extra arguments, so func itself only starts on a later resume. The
-- coroutine is registered under the next lower PID, together with an empty
-- mailbox and a timeout of 0, and is then resumed for the first time with the
-- extra arguments.
-- Returns the new PID, or nil and an error message if that resume fails.
function spawn_system(func, ...)
    local function body(...)
        coroutine.yield()
        func(...)
    end
    local routine = coroutine.create(body)

    local pid = last - 1
    last = pid

    concurrent._process.processes[pid] = routine
    concurrent._message.mailboxes[pid] = {}
    concurrent._scheduler.timeouts[pid] = 0

    local ok, err = concurrent._process.resume(routine, ...)
    if ok then
        return pid
    end
    return nil, err
end
-- Register controller_spawn as the network controller for the 'SPAWN'
-- subject, which is the subject spawn_process puts on its outgoing requests.
concurrent._distributed._network.controllers['SPAWN'] = controller_spawn
-- Replace the public concurrent.spawn with the local/remote aware spawn
-- defined in this file; the previous implementation stays reachable through
-- _spawn.
concurrent.spawn = spawn