Unity Job System

摘要:
参考链接:http://esprog.hatenablog.com/entry/2018/05/19/150313 https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/作为一个多线程系统,Job system由于其与ECS的内在集成而非常重要。我还根据作业系统的使用类型来查看作业,说实话

  参考链接 : 

  http://esprog.hatenablog.com/entry/2018/05/19/150313

  https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/

  Job系统作为一个多线程系统, 它因为跟ECS有天生的融合关系所以比较重要的样子, 我也按照使用类型的分类来看看Job System到底怎么样.

  Job说实话就是一套封装的多线程系统, 我相信所有开发人员都能自己封装一套, 所以Unity推出这个的时候跟着ECS一起推出, 因为单独推出来的话肯定推不动, 多线程, 线程安全, 线程锁, 线程共享资源, 这些都没什么区别, 我从一个简单列表的功能来说吧.

  先来一个普通的多线程 :  

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

using System;
using System.Threading;

public class NormalListAccessTest01 : MonoBehaviour
{
    public class RunData
    {
        public List<int> datas = new List<int>();
        public float speed;
        public float deltaTime;
    }

    public static void RunOnThread<T>(System.Action<T> call, T obj, System.Action endCall = null)
    {
        System.Threading.ThreadPool.QueueUserWorkItem((_obj) =>
        {
            call.Invoke(obj);
            if(endCall != null)
            { ThreadMaster.Instance.CallFromMainThread(endCall); }
        });
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Run Test"))
        {
            ThreadMaster.GetOrCreate();
            var data = new RunData();
            data.deltaTime = Time.deltaTime;
            data.speed = 100.0f;
            for(int i = 0; i < 10000; i++)
            {
                data.datas.Add(i);
            }
            RunOnThread<RunData>((_data) =>
            {
                // 这是在工作线程里
                Debug.Log("Start At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
                var move = _data.deltaTime * _data.speed;
                for(int i = 0; i < _data.datas.Count; i++)
                {
                    var val = _data.datas[i] + 1;
                    _data.datas[i] = val;
                }
            }, data, () =>
            {
                // 这是在主线程里
                Debug.Log(data.datas[0]);
                Debug.Log("End At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
            });
        }
    }
}

  线程转换的一个简单封装ThreadMaster : 

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

public class ThreadMaster : MonoBehaviour
{
    private static ThreadMaster _instance;
    public static ThreadMaster Instance
    {
        get
        {
            return GetOrCreate();
        }
    }

    private volatile List<System.Action> _calls = new List<System.Action>();

    public static ThreadMaster GetOrCreate()
    {
        if(_instance == false)
        {
            _instance = new GameObject("ThreadMaster").AddComponent<ThreadMaster>();
        }
        return _instance;
    }
    public void CallFromMainThread(System.Action call)
    {
        _calls.Add(call);
    }
    void Update()
    {
        if(_calls.Count > 0)
        {
            for(int i = 0; i < _calls.Count; i++)
            {
                var call = _calls[i];
                call.Invoke();
            }
            _calls.Clear();
        }
    }
}

  没有加什么锁, 简单运行没有问题, 下面来个Job的跑一下:  

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;

public class JobSystemSample00 : MonoBehaviour
{
    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(100, Allocator.Persistent);

        var job = new VelocityJob()
        {
            datas = datas
        };

        JobHandle jobHandle = job.Schedule();
        JobHandle.ScheduleBatchedJobs();

        //Debug.Log(datas[0]);     // Error : You must call JobHandle.Complete()
        jobHandle.Complete();
        Debug.Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  这里就有一个大问题了, 在有注释的地方 // Error : You must call JobHandle.Complete(), 是说在Job没有调用Complete()时, 去获取相关数组内容是非法的! 而这个jobHandle.Complete(); 无法通过工作线程去调用, 也就是说Job的运行它是无法自行结束的, 无法发出运行结束的通知的, 对比上面封装的普通多线程弱爆了.  而这个Complete()函数如果在工作线程执行完成前调用, 会强制立即执行(文档也是写 Wait for the job to complete), 也就是说它只能在主线程调用并且会阻塞主线程, 这样就可以定性了, 它的Job System不是为了提供一般使用的多线程封装给我们用的, 可是它又是很强大的, 因为它能使用高效的内存结构, 能保证数据访问安全, 能在需要的时候调用Complete方法强制等待工作线程执行完毕(如果没猜错的话, 引擎对这个做了很大优化, 并不是简单等待), 还有BurstCompile等, 如果我们封装成功了的话, 就是很好的多线程库了.

  PS : 打个比方一个mesh的渲染, 在渲染之前必须计算完所有坐标转换, Job的好处就是可以进行多线程并行的计算, 然后还能被主线程强制执行完毕, 比在主线程中单独计算强多了. 而这个强制执行才是核心逻辑.

  经过几次测试, 几乎没有办法简单扩展Job系统来让它成为像上面一样拥有自动完成通知的系统, 如下 : 

  1. 添加JobHandle变量到IJob中, 在Execute结束时调用  

    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        [Unity.Collections.LowLevel.Unsafe.NativeDisableUnsafePtrRestriction]
        public JobHandle selfHandle;    // 是这个IJob调用Schedule的句柄

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            selfHandle.Complete();
        }
    }

  报错, InvalidOperationException: VelocityJob.selfHandle.jobGroup uses unsafe Pointers which is not allowed. 无法解决, 直接就无法在IJob结构体中添加JobHandle变量. 并且无法在工作线程中调用Complete方法.

  2. 添加回调函数进去

    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public System.Action endCall;

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            if(endCall != null)
            {
                endCall.Invoke();
            }
        }
    }

  报错, Job系统的struct里面只能存在值类型的变量 !!-_-

  3. 使用全局的引用以及线程转换逻辑来做成自动回调的形式, 虽然可以使用了可是非常浪费资源 :

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;
using System.Collections.Generic;

public class JobSystemSample01 : MonoBehaviour
{
    private static int _id = 0;
    public static int NewID => _id++;
    public static Dictionary<int, IJobCall> ms_handleRef = new Dictionary<int, IJobCall>();

    public class IJobCall
    {
        public JobHandle jobHandle;
        public System.Action endCall;
    }
    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public int refID;
        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            var handle = ms_handleRef[refID];
            ThreadMaster.Instance.CallFromMainThread(() =>
            {
                handle.jobHandle.Complete();
                if(handle.endCall != null)
                {
                    handle.endCall.Invoke();
                }
            });
        }
    }

    public void Test()
    {
        ThreadMaster.GetOrCreate();
        var datas = new NativeArray<int>(100, Allocator.Persistent);
        int id = NewID;
        var job = new VelocityJob() { refID = id, datas = datas };
        ms_handleRef[id] = new IJobCall()
        {
            jobHandle = job.Schedule(),
            endCall = () => { Debug.Log(datas[0]); datas.Dispose(); }
        };
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  通过上面封装就可以作为一般多线程使用了, 并且我们获得了引擎提供的数据安全和高效逻辑性, 再加上利用BurstCpmpile和只读属性, 能够提升一些计算效率吧. ECS on Job已经在另外一篇中说过了, 这里忽略了.

  ----------------------------------------------

  当我测试到IJobParallelFor的时候, 发现并行并不像GPU那样的并行那么美好, 因为GPU它本身就是全并行的, 像卷积之类的, 它跟像素的处理顺序本身就没有关系, 可是我们的逻辑有些会受顺序的影响. 先看看下面的代码 : 

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;


public class IJobParallelForSample01 : MonoBehaviour
{
    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            if(index == 0)
            {
                index = datas.Length - 1;
            }
            datas[index] = datas[index - 1] + 1;
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(100, Allocator.Persistent);
        for(int i = 0; i < datas.Length; i++)
        {
            datas[i] = i;
        }
        var job = new VelocityJob()
        {
            datas = datas
        };

        var jobHandle = job.Schedule(datas.Length, 20);
        JobHandle.ScheduleBatchedJobs();
        
        jobHandle.Complete();
        Debug.Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  主要的是Schedule的方法上 : public static JobHandle Schedule<T>(this T jobData, int arrayLength, int innerloopBatchCount, JobHandle dependsOn = default) where T : struct, IJobParallelFor;

  第二个参数innerloopBatchCount表示的是分块的大小, 比如我们数组长度是100,  每20个元素分成一块, 一共可以分5块, 如果你的CPU核心数大于等于5它就能开5个线程来处理, 可是你不能去获取这个块之外的Index的数据:

Unity Job System第1张

  显然这里数据每20个一组被分为了5组, 在5个线程里, 然后跨组获取数据就报错了.

  测试一下线程数是否5个 : 

    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            throw new System.Exception(index + " ERROR");
        }
    }

Unity Job System第2张

   5个线程报错, 应该每个线程内的处理也是按照for的顺序来的.

  把每个块改成5的大小, 看看它能开几个线程:

 var jobHandle = job.Schedule(datas.Length, 5);

Unity Job System第3张

  恩开了8个, 我的机器确实是8核的, 不过它的分块不是我想的0-5-10-15, 或者0-12-24-36 而是整10的, 不知道为什么, 因为按照我设定每个分组是5, 而整体平均100/8=12.5而不应该是整10的, 具体不详.

  如果我们要跟其它元素进行交互, 就只能把处理单元设置到跟数组一样大, 才能在一个块中处理:

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;


public class IJobParallelForSample01 : MonoBehaviour
{
    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            if(index > 0 && index < datas.Length - 1)
            {
                datas[index] = datas[datas.Length - 1];
            }
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(10, Allocator.Persistent);
        for(int i = 0; i < datas.Length; i++)
        {
            datas[i] = i;
        }
        var job = new VelocityJob()
        {
            datas = datas
        };

        var jobHandle = job.Schedule(datas.Length, datas.Length);
        JobHandle.ScheduleBatchedJobs();

        jobHandle.Complete();
        Debug .Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

Unity Job System第4张

  顺便测试一下各个线程的分配情况:

    private volatile static Dictionary<int, List<int>> ms_threads = new Dictionary<int, List<int>>();

    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            Debug.Log(index + " : " + System.Threading.Thread.CurrentThread.ManagedThreadId);
            lock(ms_threads)
            {
                List<int> val = null;
                ms_threads.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out val);
                if(val == null)
                {
                    val = new List<int>();
                    ms_threads[System.Threading.Thread.CurrentThread.ManagedThreadId] = val;
                }
                val.Add(index);
            }
        }
    }
        var jobHandle = job.Schedule(100, 5);

  结果是分为8个线程, 4个线程的块为10, 4个为15

Unity Job System第5张Unity Job System第6张

  所以不能想当然的去获取其它Index的内容, 毕竟分块逻辑不一定.

免责声明:文章转载自《Unity Job System》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇SqlServer2005到OracleSpringMVC学习笔记3下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Unity推荐设置(HoloLens开发系列)

本文翻译自:Recommended settings for Unity   Unity提供了一系列默认选项,这些选项能够适用于所有平台的一般情况。但是,Unity同样为HoloLens提供了一些特殊行为,这些行为可以通过项目设置来改变。 本文内容 1 全息启动画面 2 追踪丢失(Tracking loss) 3 功能 4 了解更多   全息启动画面...

Qt 学习之路 2(75):线程总结

前面我们已经详细介绍过有关线程的一些值得注意的事项。现在我们开始对线程做一些总结。 有关线程,你可以做的是: 在QThread子类添加信号。这是绝对安全的,并且也是正确的(前面我们已经详细介绍过,发送者的线程依附性没有关系) 不应该做的是: 调用moveToThread(this)函数 指定连接类型:这通常意味着你正在做错误的事情,比如将QThread控...

Docker之构建上下文详解

昨天写了使用 Dockerfile 定制镜像。其中构建上下文这一块没有写,今天把这一块单独拿出来写一下。 Docker镜像构建 简单说下构建镜像步骤: cd Dockerfile 所在目录; 执行 docker build 构建命令: docker build -t imageName:imageTag . 通过上面的工作流,很容易形成这样的理解误...

Java并发机制和底层实现原理

  Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码转化为汇编指令在CPU上执行。Java中的并发机制依赖于JVM的实现和CPU的指令。      Java语言规范第三版中对volatile的定义如下:Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排它锁单独获得这个变量...

Git分支学习简记

简介 开始过了两遍Git的内容,第二天就已经忘记了分支(branch)的概念,开始还觉得不太用的到。然后又看了第二遍,才发现为什么大家说这个是Git里边极其重要的一个东西。 所谓branch,就类似于树的枝干,有一个主干,在Git里成为master,意思也很好理解;这个是必须存在的,然后你可以分出去其他的树干(但是都不是主干)。像树的分枝都会回到主干那里一...

Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现

     声明:本文为原创博文,转载请注明出处。      Nodejs编程是全异步的,这就意味着我们不必每次都阻塞等待该次操作的结果,而事件完成(就绪)时会主动回调通知我们。在网络编程中,一般都是基于Reactor线程模型的变种,无论其怎么演化,其核心组件都包含了Reactor实例(提供事件注册、注销、通知功能)、多路复用器(由操作系统提供,比如kque...