受欢迎的博客标签

多核CPU并行编程:.net core 多核CPU并行编程

Published

背景

单核CPU时代:早期的计算机时钟频率较低,1985年intel 80386 工作频率只有20 MHZ,提升CPU 核心的时钟频率带来的性能收益更大,到2006年Intel Core 2 处理器已经能够达到3.5 GHZ 的工作频率了。从2007年开始,CPU 时钟频率的提升就变得缓慢了,主要因为CPU 的功耗随时钟频率呈幂律增长,需要根据散热技术和制程工艺在性能与功耗间寻求平衡,CPU 时钟频率提升有限了。

多核CPU时代:CPU 是用来处理计算任务的,想要在单位时间内处理更多的计算任务,除了提升单核心的时钟频率让其计算的更快之外,还可以增加CPU 核心数,让多个CPU 核心协同计算,CPU 开始往多核心方向发展,到2019年AMD EPYC 2 代已经达到64核心128线程了。为了充分发挥多核心CPU 的性能,操作系统和编程语言对并发执行的支持越来越好,各种编程语言也陆续提供了并发编程的函数库,比如C++11 就新增了并发编程的线程支持库。我们想要让多核CPU 更好的发挥性能,更高效的为我们的程序服务,掌握并发编程思想还是很有必要的。

2006年是双核的普及年,双核处理器出货量开始超过单核处理器出货量。

每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.

 

并发

在单核时代,多线程就有很广泛的应用,这时候多线程大多用于降低阻塞(意思是类似于

while(1)

{

if(flag==1)

break;

sleep(1);

}

这样的代码)带来的CPU资源闲置,注意这里没有浪费CPU资源,去掉sleep(1)就是纯浪费了。

阻 塞一般是等待IO操作(磁盘,数据库,网络等等)时发生。此时如果单线程,CPU例如一个IO操作要耗时10毫秒,CPU就会被阻塞接近10毫秒。

所以这种耗时的IO操作就用一个线程Thread去代为执行,创建这个线程的函数(代码)部分不会被IO操作阻塞,继续干这个程序中其他的事情,而不是干等待(或者去执行其他程序)。

在单核时代,多线程的这个消除阻塞的作用叫做“并发”,和并行是有本质的不同。并发是“伪并行”,看似并行,而实际上还是一个CPU在 执行一切事物,只是切换的太快,我们没法察觉罢了。例如基于UI的程序(俗话说就是图形界面),如果你点一个按钮触发的事件需要执行10秒钟,那么这个程 序就会假死,因为程序在忙着执行,没空搭理用户的其他操作。

 

并行和并发

https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks?view=net-5.0

如何使用并行编程

├──1. System.Threading.Tasks                                       // System.Threading.Tasks Namespace
│   ├── 1.1 Task Class(System.Threading.Tasks.Task)                             // System.Threading.Tasks.Task see:https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=net-5.0
│   │   ├── common                              // 公共组件
│   │   │   ├── alertTip.vue                    // 弹出框组件
│   │   │   ├── buyCart.vue                     // 购物车组件
│   │   │   ├── computeTime.vue                 // 倒计时组件
│   │   │   ├── loading.vue                     // 页面初始化加载数据的动画组件
│   │   │   ├── mixin.js                        // 组件混合(包括:指令-下拉加载更多,处理图片地址)
│   │   │   ├── ratingStar.vue                  // 评论的五颗星组件
│   │   │   └── shoplist.vue                    // msite和shop页面的餐馆列表公共组件
│   │   ├── footer
│   │   │   └── footGuide.vue                   // 底部公共组件
│   │   └── header
│   │       └── head.vue                        // 头部公共组件
│   ├──1.2 Parallel Class(System.Threading.Tasks.Parallel)                                // System.Threading.Tasks.Parallel  see:https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel?view=net-5.0
│   │   ├── Parallel.For                             // 环境切换配置
│   │   ├── Parallel.ForEach                         // 获取数据
│   │   └── Parallel.Invoke
├── 2.System.Linq.ParallelQuery                                      // System.Linq.ParallelQuery<TSource> Namespace
│   ├── PLINQ     class                       // System.Threading.Tasks.Task
│   │   ├── common                              // 公共组件
│   │   │   ├── alertTip.vue                    // 弹出框组件
│   │   │   ├── buyCart.vue                     // 购物车组件
│   │   │   ├── computeTime.vue                 // 倒计时组件
│   │   │   ├── loading.vue                     // 页面初始化加载数据的动画组件
│   │   │   ├── mixin.js                        // 组件混合(包括:指令-下拉加载更多,处理图片地址)
│   │   │   ├── ratingStar.vue                  // 评论的五颗星组件
│   │   │   └── shoplist.vue                    // msite和shop页面的餐馆列表公共组件
│   │   ├── footer
│   │   │   └── footGuide.vue                   // 底部公共组件
│   │   └── header
│   │       └── head.vue                        // 头部公共组件

1.Task Parallel Library

数据并行、任务并行和流水线

Task Parallel Library(任务并行库),简称TPL。TPL主要覆盖了三大使用场景,数据并行、任务并行和流水线.TPL是System.Threading和System.Threading.Tasks命名空间中的一组公共类型和API.TPL以其高度的封装特性,隐藏了并行编程里复杂的处理,使得开发人员可以以较低的门槛进行并行编程。

数据并行:这种场景在于有大量数据需要处理,而且对每一份数据都要执行的同样的操作。

任务并行:有很多相对独立的不同操作,使用任务并行。

void System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);

Parallel.Invoke方法只有在4个方法全部完成之后才会返回。它至少需要4个硬件线程才足以让这4个方法并发运行。

流水线:流水线是以上两种场景的结合,这个也是最复杂最难处理的场景,因为这里面涉及到多个并发的任务进行协调处理。(detail:https://blog.csdn.net/sD7O95O/article/details/83005796)

1.就算你确定是cpu 密集的 也要注意并行度,最好不要超过你的核心数n多倍。

2.对于cpu密集的操作控制并行度可以用 Parallel 的参数。

3.如果其中有IO操作 最好用 TPL.Dataflow 库中的ActionBloack<T>等来控制异步操作并行度

1.1Task

System.Threading.Tasks.Task

c#多核编程的简单使用,其实主要是Task类使用.

创建Task

创建Task的方法有两种,一种是直接创建——new一个出来,一种是通过工厂创建。下面来看一下这两种创建方法:

//第一种创建方式,直接实例化
         var task1 = new Task(() =>
         {
            //TODO you code
         });

 

//第二种创建方式,工厂创建
         var task2 = Task.Factory.StartNew(() =>
         {
            //TODO you code
         });

 

 

异步编程的核心是 Task 和 Task<T> 对象,这两个对象对异步操作建模。 它们受关键字 async 和 await 的支持。 在大多数情况下模型十分简单:

对于 I/O 绑定代码,等待一个在 async 方法中返回 Task 或 Task<T> 的操作。
对于 CPU 绑定代码,等待一个使用 Task.Run 方法在后台线程启动的操作

 

1.2.System.Threading.Tasks.Parallel类

虽然Parallel类在System.Threading.Tasks命名空间下,但是创建并行代码不一定要直接使用Task类的实例,事实上我们可以直接使用Parallel静态类所提供的方法。

Parallel.For:为固定数目的独立For循环迭代提供了负载均衡式的并行执行

三个方法的示例代码:

//使用Parallel.For并发来计算一个目录下所有文件体积总和
long totalSize = 0;
String[] files = Directory.GetFiles(path);
Parallel.For(0, files.Length,
               index => { FileInfo fi = new FileInfo(files[index]);
                          long size = fi.Length;
                          Interlocked.Add(ref totalSize, size);
               } );

//使用Parallel.ForEach并发来计算一个目录下所有文件体积总和
long totalSize = 0;
String[] files = Directory.GetFiles(path);
Parallel.ForEach(files, {currentFile}=> { 
                          FileInfo fi = new FileInfo(currentFile);
                          long size = fi.Length;
                          Interlocked.Add(ref totalSize, size);
               } );

//使用Parallel.Invoke并发调用多个任务
Parallel.Invoke(
    () =>
    {
        Console.WriteLine("Begin first task...");
    }, 

    () =>
    {
        Console.WriteLine("Begin second task...");
    }, 

    () =>
    {
        Console.WriteLine("Begin third task...");
    } 
);

 

 Prerequisites
This sample is written in C# and targets .NET Core 3.1. It requires the .NET Core 3.1 SDK.

ParallelEnumerable.Range
Parallel.For
Parallel.ForEach
Partitioner.Create
Parallel.Invoke

 

Parallel For

c# parallel  display  thread

see an example for a better understanding of the above two types of for loop in C#:
using System;
using System.Threading;  //for thread = {Thread.CurrentThread.ManagedThreadId}
using System.Threading.Tasks;
namespace ParallelProgrammingDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("C# For Loop");
            int number = 10;
            for (int count = 0; count < number; count++)
            {
                //Thread.CurrentThread.ManagedThreadId returns an integer that 
                //represents a unique identifier for the current managed thread.
                Console.WriteLine($"value of count = {count}, thread = {Thread.CurrentThread.ManagedThreadId}");
                //Sleep the loop for 10 miliseconds
                Thread.Sleep(10);
            }
            Console.WriteLine();
            Console.WriteLine("Parallel For Loop");
            Parallel.For(0, number, count =>
            {
                Console.WriteLine($"value of count = {count}, thread = {Thread.CurrentThread.ManagedThreadId}");
                //Sleep the loop for 10 miliseconds
                Thread.Sleep(10);
            });
            Console.ReadLine();
        }
    }
}

output

c# parallel with result

using System;
using System.Threading.Tasks;

public class Example
{
   public static void Main()
   {
     ParallelLoopResult result = Parallel.For(0, 100, ctr =>
            {
                Random rnd = new Random(ctr * 100000);
                Byte[] bytes = new Byte[100];
                rnd.NextBytes(bytes);
                int sum = 0;
                foreach (var byt in bytes)
                    sum += byt;
                Console.WriteLine("Iteration {0,2}: {1:N0}", ctr, sum);
            });
            Console.WriteLine("Result: {0}", result.IsCompleted ? "Completed Normally" :  String.Format("Completed to {0}", result.LowestBreakIteration));
   }
}

output

Iteration 95: 11,957
      Iteration 96: 12,455
      Result: Completed Normally

由每个线程独立计算线程内迭代产生的素数和,最后再对几个和求和。

using System;
using System.Threading.Tasks;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

/// <summary>
/// 利用并行编程库Parallel,计算100000内素数的个数
/// </summary>
namespace Paralleler
{
    class Program
    {
        static void Main(string[] args)
        {
            Stopwatch sw = new Stopwatch();

            sw.Start();
            ShareMemory();
            sw.Stop();
            Console.WriteLine($"优化后的共享内存并发模型耗时:{sw.Elapsed}");
        }

        static void ShareMemory()
        {
            var sum = 0;
            Parallel.For(1, 100000 + 1, () => 0, (x, state, local) =>
            {
                var f = true;
                if (x == 1)
                    f = false;
                for (int i = 2; i <= x / 2; i++)
                {
                    if (x % i == 0)  // 被[2,x/2]任一数字整除,就不是质数
                        f = false;
                }
                if (f == true)
                    local++;
                return local;
            },
                 local =>
                 {
                     Interlocked.Add(ref sum, local);
                 }
               );
            Console.WriteLine($"1-100000内质数的个数是{sum}");
        }
    }
}

1.3.PLINQ

System.Linq.ParallelQuery<TSource>类
并行LINQ (PLINQ)
并行LINQ (PLINQ) 是 Task Parallel Library 的替代方案。它很大程度上依赖于 LINQ(语言集成查询)功能。对于在大集合中执行相同的昂贵操作的场景是很有用的。与所有操作都是顺序执行的普通 LINQ to Objects 不同的是,PLINQ可以在多个CPU上并行执行这些操作。 发挥优势所需要的代码改动也是极小的:

1.4并行编程的数据结构

并发编程对集合的正确处理方式

1,集合和数组的线程都不是安全的,需要采用Lock关键字来确保每次只有一个线程来访问

写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合。

System.Collenctions和System.Collenctions.Generic 名称空间中所提供的经典列表、集合和数组的线程都不是安全的,不能接受并发请求,因此需要对相应的操作方法执行串行化。

需要采用Lock关键字,来确保每次只有一个线程来访问  _Products.Add(product); 这个方法.在 lock 语句中,只允许一个线程访问里面的代码块。它将阻止下一个尝试访问它的线程,直到前一个线程退出

static void Main(string[] args)
{
_Products = new List<Product>();
/*创建任务 t1 t1 执行 数据集合添加操作*/
Task t1 = Task.Factory.StartNew(() =>
{
AddProducts();
});
/*创建任务 t2 t2 执行 数据集合添加操作*/
Task t2 = Task.Factory.StartNew(() =>
{
AddProducts();
});
}

 

private static object o = new object();

/*执行集合数据添加操作*/
static void AddProducts()
{
Parallel.For(0, 1000, (i) =>
{
Product product = new Product();
product.Name = "name" + i;
product.Category = "Category" + i;
product.SellPrice = i;
lock (o)
{
_Products.Add(product);
}
});

}
}

其他可选同步基元

其他同步基元
Monitor 只是 .NET Core 中众多同步基元的一员。根据实际情况,其他基元可能更适合。

Mutex 是 Monitor 更重量级的版本,依赖于底层的操作系统,提供跨多个进程同步访问资源[1], 是针对 Mutex 进行同步的推荐替代方案。

SemaphoreSlim 和 Semaphore 可以限制同时访问资源的最大线程数量,而不是像 Monitor 一样只能限制一个线程。 SemaphoreSlim 比 Semaphore 更轻量,但仅限于单个进程。如果可能,您最好使用 SemaphoreSlim 而不是 Semaphore。

ReaderWriterLockSlim 可以区分两种对访问资源的方式。它允许无限数量的读取器 (readers) 同时访问资源,并且限制同时只允许一个写入器 (writers) 访问锁定资源。读取时线程安全,但修改数据时需要独占资源,很好地保护了资源。

AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 将堵塞传入的线程,直到它们接收到一个信号 (即调用 Set() )。然后等待中的线程将继续执行。AutoResetEvent 在下一次调用 Set() 之前,将一直阻塞,并只允许一个线程继续执行。ManualResetEvent 和 ManualResetEventSlim 不会堵塞线程,除非 Reset() 被调用。ManualResetEventSlim 比前两者更轻量,更值得推荐。

Interlocked 提供一种选择——原子操作,这是替代 locking 和其他同步基元更好的选择(如果适用)

 

2.使用并发集合

当一个临界区需要确保对数据结构的原子访问时,用于并发访问的专用数据结构可能是更好和更有效的替代方案。例如,使用 ConcurrentDictionary 而不是 Dictionary,可以简化 lock 语句示例:

System.Collections.Concurrent新的命名空间可以访问用于解决线程安全问题,通过这个命名空间能访问以下为并发做好了准备的集合。

1.BlockingCollection 与经典的阻塞队列数据结构类似,能够适用于多个任务添加和删除数据,提供阻塞和限界能力。

2.ConcurrentBag 提供对象的线程安全的无序集合

3.ConcurrentDictionary  提供可有多个线程同时访问的键值对的线程安全集合

4.ConcurrentQueue   提供线程安全的先进先出集合

5.ConcurrentStack   提供线程安全的后进先出集合

这些集合通过使用比较并交换和内存屏障等技术,避免使用典型的互斥重量级的锁,从而保证线程安全和性能

var counters = new ConcurrentDictionary< int, int >();
 
counters.TryAdd(key, 0);
lock (syncObject)
{
    counters[key]++;
}

 

 

 

c#Task多核编程

http://www.codeproject.com/Articles/362996/Multi-core-programming-using-Task-Parallel-Library

https://blog.csdn.net/xuwei_xuwei/article/details/18817859

.NET Core parallel computation of PI

https://docs.microsoft.com/en-us/samples/dotnet/samples/parallel-programming-compute-pi-cs/

在 .NET Core 中的并发编程

https://blog.csdn.net/sD7O95O/article/details/79128901

异步编程

https://docs.microsoft.com/zh-cn/dotnet/csharp/async