### Thread
```Thread()```函数用于创建并发线程。
```Thread()```函数返回一个```Thread```对象,用于管理创建出的并发线程、线程通信等。
```Thread```对象
Thread(func, ...args)
Thread(...items)
参数```func```是用于并发执行的函数(通过引用传递),支持传入匿名函数。```func```可接受多个参数,这些参数将在并发执行时通过```...args```传入。因此,```func```的参数列表需要与```...args```一致。
func
true
function
参数```arg```是在回调执行时传递给```func```(即并发线程执行函数)的实际参数;参数```arg```可能有多个,```func```的参数列表需要与```...args```一致。
arg
false
string、number、bool、object、array、function、空值等系统支持的所有类型
参数```item```是一个数组,包含待并发执行的函数引用及其参数,调用```Thread```函数时的参数```item```可以传入多组。
item
true
array
```javascript
function test1(a, b, c) {
Log("test1:", a, b, c)
}
function main() {
var t1 = threading.Thread(test1, 1, 2, 3)
var t2 = threading.Thread(function (msg) {
Log("msg:", msg)
}, "Hello thread2")
t1.join()
t2.join()
}
同时创建一个自定义函数和一个匿名函数的并发线程。
function test1(msg) {
Log("msg:", msg)
test2("Hello test2")
}
function main() {
var t1 = threading.Thread(
[function(a, b, c) {Log(a, b, c)}, 1, 2, 3],
[test1, "Hello test1"],
[`function test2(msg) {Log("msg:", msg)}`])
t1.join()
}
使用Thread(...items)
形式来创建并发线程,顺序执行多个函数。
function testFunc1(p) {
Log("testFunc1 p:", p)
}
function main() {
threading.Thread(function(pfn) {
var threadName = threading.currentThread().name()
var threadId = threading.currentThread().id()
pfn(`in thread threadName: ${threadName}, threadId: ${threadId}`)
}, testFunc1).join()
}
支持向并发执行函数的参数传入函数。
function ml(input) {
const net = new brain.NeuralNetwork()
net.train([
{ input: [0, 0], output: [0] },
{ input: [0, 1], output: [1] },
{ input: [1, 0], output: [1] },
{ input: [1, 1], output: [0] },
])
return net.run(input)
}
function main() {
var ret = threading.Thread([ml, [1, 0]], [HttpQuery("https://unpkg.com/brain.js")]).join()
// ret: {"id":1,"terminated":false,"elapsed":337636000,"ret":{"0":0.9339330196380615}}
Log(ret)
}
支持传入函数字符串,可以动态导入外部库进行并发计算。
传入Thread()
函数用于并发执行的线程函数func
运行在隔离环境中,因此无法直接引用线程外部的变量,引用时会编译失败。同时,线程内不支持引用其它闭包函数。线程内部可调用平台提供的所有API,但不能调用用户自定义的其他函数。
当线程执行完毕且没有被持续引用时,系统底层会自动回收线程相关的资源,不需要显式调用join()
函数来释放资源。如果有持续引用导致无法释放资源,并发数量超过2000个会报错:InternalError: too many routine wait, max is 2000
。
支持回测系统、实盘环境;所有并发线程相关的函数,在回测系统中仅作为代码兼容支持,实际不会真正并发线程执行,本章不再赘述。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```getThread()```函数返回通过参数指定threadId的```Thread```对象
```Thread```对象
getThread(threadId)
参数```threadId```为线程对象Id,通过指定参数获取对应的线程对象。
threadId
true
number
```javascript
function main() {
var t1 = threading.Thread(function () {
// Thread 对象有方法:id(),用于获取线程的Id,可以查看文档对应Thread对象的章节
var id = threading.currentThread().id()
var thread1 = threading.getThread(id)
Log("id:", id, ", thread1.id():", thread1.id())
Log(`id == thread1.id():`, id == thread1.id())
})
t1.join()
}
通过threadId
获取指定的线程对象。
支持回测系统、实盘环境。
如果期望获取的线程已经执行完毕、释放,此时无法通过threading.getThread(threadId)
获取该线程的线程对象。
{@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```mainThread()```函数返回主线程的线程对象。
```Thread```对象
mainThread()
```javascript
function main() {
Log("主线程的threadId:", threading.mainThread().id())
}
获取主线程的Thread
对象,输出主线程的threadId
。
function test() {
Log("test函数中输出主线程Id:", threading.mainThread().id())
}
function main() {
var t1 = threading.Thread(test)
t1.join()
}
在并发的线程中也可以获取主线程的线程对象。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```currentThread()```函数返回当前线程的线程对象。
```Thread```对象
currentThread()
```javascript
function test() {
Log("当前线程的Id:", threading.currentThread().id())
}
function main() {
var t1 = threading.Thread(test)
t1.join()
}
获取当前线程的Thread
对象,输出当前线程的threadId
。
支持回测系统、实盘环境。
{@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```Lock()```函数返回一个线程锁对象。
```ThreadLock```对象
Lock()
```javascript
function consumer(productionQuantity, dict, lock) {
for (var i = 0; i < productionQuantity; i++) {
lock.acquire()
var count = dict.get("count")
Log("consumer:", count)
Sleep(1000)
lock.release()
}
}
function producer(productionQuantity, dict, lock) {
for (var i = 0; i < productionQuantity; i++) {
lock.acquire()
dict.set("count", i)
Log("producer:", i)
Sleep(1000)
lock.release()
}
}
function main() {
var dict = threading.Dict()
dict.set("count", -1)
var lock = threading.Lock()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, lock)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, lock)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```Condition()```函数返回一个```ThreadCondition```对象。
```ThreadCondition```对象
Condition()
```javascript
function consumer(productionQuantity, dict, condition) {
for (var i = 0; i < productionQuantity; i++) {
condition.acquire()
while (dict.get("array").length == 0) {
condition.wait()
}
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
condition.release()
Sleep(1000)
}
}
function producer(productionQuantity, dict, condition) {
for (var i = 0; i < productionQuantity; i++) {
condition.acquire()
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
condition.notify()
condition.release()
Sleep(1000)
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var condition = threading.Condition()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, condition)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, condition)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
回测系统没有实现该功能,仅仅是定义。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```Event()```函数返回一个```ThreadEvent```对象。
```ThreadEvent```对象
Event()
```javascript
function consumer(productionQuantity, dict, pEvent, cEvent) {
for (var i = 0; i < productionQuantity; i++) {
while (dict.get("array").length == 0) {
pEvent.wait()
}
if (pEvent.isSet()) {
pEvent.clear()
}
var arr = dict.get("array")
var count = arr.shift()
dict.set("array", arr)
Log("consumer:", count, ", array:", arr)
cEvent.set()
Sleep(1000)
}
}
function producer(productionQuantity, dict, pEvent, cEvent) {
for (var i = 0; i < productionQuantity; i++) {
while (dict.get("array").length != 0) {
cEvent.wait()
}
if (cEvent.isSet()) {
cEvent.clear()
}
var arr = dict.get("array")
arr.push(i)
dict.set("array", arr)
Log("producer:", i, ", array:", arr)
pEvent.set()
Sleep(1000)
}
}
function main() {
var dict = threading.Dict()
dict.set("array", [])
var pEvent = threading.Event()
var cEvent = threading.Event()
var productionQuantity = 10
var producerThread = threading.Thread(producer, productionQuantity, dict, pEvent, cEvent)
var consumerThread = threading.Thread(consumer, productionQuantity, dict, pEvent, cEvent)
consumerThread.join()
producerThread.join()
}
两个并发线程访问公共资源。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```Dict()```函数返回一个```ThreadDict```对象。
```ThreadDict```对象
Dict()
```javascript
function threadFun1(obj) {
obj["age"] = 100
while (true) {
Log("threadFun1 obj:", obj)
Sleep(5000)
}
}
function threadFun2(obj) {
while (true) {
Log("threadFun2 obj:", obj)
Sleep(5000)
}
}
function main() {
var obj = {"age": 10}
var t1 = threading.Thread(threadFun1, obj)
var t2 = threading.Thread(threadFun2, obj)
t1.join()
t2.join()
}
给并发的线程执行函数传入普通的对象,测试修改对象的键值后是否引起其它线程中的对象键值变动。
function threadFun1(threadDict) {
threadDict.set("age", 100)
while (true) {
Log(`threadFun1 threadDict.get("age"):`, threadDict.get("age"))
Sleep(5000)
}
}
function threadFun2(threadDict) {
while (true) {
Log(`threadFun2 threadDict.get("age"):`, threadDict.get("age"))
Sleep(5000)
}
}
function main() {
var threadDict = threading.Dict()
threadDict.set("age", 10)
var t1 = threading.Thread(threadFun1, threadDict)
var t2 = threading.Thread(threadFun2, threadDict)
t1.join()
t2.join()
}
给并发的线程执行函数传入Dict()
函数创建的ThreadDict
对象,测试修改对象的键值后是否引起其它线程中的对象键值变动。
线程并发函数传入普通对象时为深拷贝传递,在并发线程中修改键值,并不会影响到其它线程中的字典。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/pending pending}, {@fun/Threads/threading/eventLoop eventLoop}
```pending()```函数返回当前策略程序正在运行的并发线程数。
number
pending()
```javascript
function threadFun1() {
Log("threadFun1")
Sleep(3000)
}
function threadFun2() {
for (var i = 0; i < 3; i++) {
LogStatus(_D(), "print from threadFun2")
Sleep(3000)
}
}
function main() {
Log(`begin -- threading.pending():`, threading.pending())
var t1 = threading.Thread(threadFun1)
var t2 = threading.Thread(threadFun2)
Log(`after threading.Thread -- threading.pending():`, threading.pending())
t1.join()
t2.join()
Log(`after thread.join -- threading.pending():`, threading.pending())
}
创建两个并发运行的线程,在不同时间节点调用pending()
函数。
策略main()
函数开始运行时直接调用pending()
函数会返回1,因为策略main()
函数所在的主线程也是一个pending中的线程。
支持回测系统、实盘环境。
{@fun/Threads/threading/getThread getThread}, {@fun/Threads/threading/mainThread mainThread}, {@fun/Threads/threading/currentThread currentThread}, {@fun/Threads/threading/Lock Lock}, {@fun/Threads/threading/Condition Condition}, {@fun/Threads/threading/Event Event}, {@fun/Threads/threading/Dict Dict}, {@fun/Threads/threading/Thread Thread}, {@fun/Threads/threading/eventLoop eventLoop}
NetSettings Thread