黄a在线观看-黄a在线-黄a大片-黄色片在线看-黄色毛片免费-黄色大片网站

您的位置:首頁技術文章
文章詳情頁

ASP.NET Core WebSocket集群實現思路詳解

瀏覽:285日期:2022-06-09 10:01:54
目錄
  • 前言
  • 實現
  • nginx配置
  • 一對一發送
  • 群組發送
  • 發送所有人
  • 整合到一起
  • 一對一處理
  • 群組處理
  • 全員消息處理
  • 示例源碼
  • 總結

前言

提到WebSocket相信大家都聽說過,它的初衷是為了解決客戶端瀏覽器與服務端進行雙向通信,是在單個TCP連接上進行全雙工通訊的協議。在沒有WebSocket之前只能通過瀏覽器到服務端的請求應答模式比如輪詢,來實現服務端的變更響應到客戶端,現在服務端也可以主動發送數據到客戶端瀏覽器。WebSocket協議和Http協議平行,都屬于TCP/IP四層模型中的第四層應用層。由于WebSocket握手階段采用HTTP協議,所以也需要進行跨域處理。它的協議標識是wswss對應了常規標識和安全通信協議標識。本文重點并不是介紹WebSocket協議相關,而是提供一種基于ASP.NET Core原生WebSocket的方式實現集群的實現思路。關于這套思路其實很早之前我就構思過了,只是之前一直沒有系統的整理出來,本篇文章就來和大家分享一下,由于主要是提供一種思路,所以涉及到具體細節或者業務相關的可能沒有體現出來,還望大家理解。

實現

咱們的重點關鍵字就是兩個WebSocket集群,實現的框架便是基于ASP.NET Core,我也基于golang實現了一套,本文涉及到的相關源碼和golang版本的實現都已上傳至我的github,具體倉庫地址可以轉到文末自行跳轉到#示例源碼中查看。既然涉及到集群,這里咱們就用nginx作為反向代理,來搭建一個集群實例。大致的示例結構如下圖所示

redis在這里扮演的角色呢,是用來處理Server端的消息相互傳遞用的,主要是使用的redis的pub/sub功能來實現的,這里便涉及到幾個核心問題

  • 首先,集群狀態每個用戶被分發到具體的哪臺服務器上是不得而知的
  • 其次,處在不同Server端的不同用戶間的相互通信是需要一個傳遞媒介
  • 最后,針對不同的場景比如單發消息、分組消息、全部通知等要有不同的處理策略

這里需要考慮的是,如果需要搭建實時通信服務器的話,需要注意集群的隔離性,主要是和核心業務進行隔離,畢竟WebSocket需要保持長鏈接、且消息的大小需要評估。

上面提到了redis的主要功能就是用來傳遞消息用的,畢竟每個server服務器是無狀態的。這當然不是必須的,任何可以進行消息分發的中間件都可以,比如消息隊列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要處理的消息存儲起來都可以比如緩存甚至是關系型數據庫等等。這壓力使用redis主要是因為操作起來簡單、輕量級、靈活,讓大家關注點在思路上,而不是使用中案件的代碼上。

nginx配置

通過上面的圖我們可以看到,我們這里構建集群示例使用的nginx,如果讓nginx支持WebSocket的話,需要額外的配置,這個在網上有很多相關的文章介紹,這里就來列一下咱們示例的nginx配置,在配置文件nginx.conf

//上游服務器地址也就是websocket服務的真實地址upstream wsbackend {    server 127.0.0.1:5001;    server 127.0.0.1:5678;}server {    listen       5000;    server_name  localhost;    location ~/chat/{//upstream地址proxy_pass http://wsbackend;proxy_connect_timeout 60s; proxy_read_timeout 3600s;proxy_send_timeout 3600s;//記得轉發避免踩坑proxy_set_header Host $host;proxy_http_version 1.1; //http升級成websocket協議的頭標識proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";    }}

這套配置呢,在搜索引擎上能收到很多,不過不妨礙我把使用的粘貼出來。這一套親測有效,也是我使用的配置,請放心使用。個人認為如果是線上環境采用的負載均衡策略可以選擇ip_hash的方式,保證同一個ip的客戶端用戶可以分發到一臺WebSocket實例中去,這樣的話能盡量避免使用redis的用戶頻道做消息傳遞。好了,接下來準備開始展示具體實現的代碼了。

一對一發送

首先介紹的就是一對一發送的情況,也就是我把消息發給你,聊天的時候私聊的情況。這里呢涉及到兩種情況

  • 如果你需要通信的客戶端和你連接在一個Server端里,這樣的話可以直接在鏈接里找到這個端的通信實例直接發送。
  • 如果你需要通信的客戶端和你不在一個Server端里,這個時候咱們就需要借助redis的pub/sub的功能,把消息傳遞給另一個Server端。

咱們通過一張圖大致的展示一下它的工作方式

解釋一下,每個客戶端注冊到WebSocket服務里的時候會在redis里訂閱一個user:用戶唯一標識的頻道,這個頻道用于接收和當前WebSocket連接不在一個服務端的其他WebSocket發送過來的消息。

每次發送消息的時候你會知道你要發送給誰,不在當前服務器的話則發送到redis的user:用戶唯一標識頻道,這樣的話目標WebSocket就能收到消息了。

首先是注入相關的依賴項,這里我使用的redis客戶端是freeredis,主要是因為操作起來簡單,具體實現代碼如下

var builder = WebApplication.CreateBuilder(args);//注冊freeredisbuilder.Services.AddSingleton(provider => {    var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();    RedisClient cli = new RedisClient("127.0.0.1:6379");    cli.Notice += (s, e) => logger?.LogInformation(e.Log);    return cli;});//注冊WebSocket具體操作的類builder.Services.AddSingleton<WebSocketHandler>();builder.Services.AddControllers();var app = builder.Build();var webSocketOptions = new WebSocketOptions{    KeepAliveInterval = TimeSpan.FromMinutes(2)};//注冊WebSocket中間件app.UseWebSockets(webSocketOptions);app.MapGet("/", () => "Hello World!");app.MapControllers();app.Run();

接下來我們定義一個Controller用來處理WebSocket請求

public class WebSocketController : ControllerBase{    private readonly ILogger<WebSocketController> _logger;    private readonly WebSocketHandler _socketHandler;    public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler)    {_logger = logger;_socketHandler = socketHandler;    }    //這里的id代表當前連接的客戶端唯一標識比如用戶唯一標識    [HttpGet("/chat/user/{id}")]    public async Task ChatUser(string id)    {//判斷是否是WebSocket請求if (HttpContext.WebSockets.IsWebSocketRequest){    _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");    var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();    //處理請求相關    await _socketHandler.Handle(id, webSocket);}else{    HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}    }}

這里的WebSocketHandler是用來處理具體邏輯用的,咱們看一下相關代碼

public class WebSocketHandler:IDisposable{    //存儲當前服務用戶的集合    private readonly UserConnection UserConnection = new();    //redis頻道前綴    private readonly string userPrefix = "user:";    //用戶對應的redis頻道    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();    private readonly ILogger<WebSocketHandler> _logger;    //redis客戶端    private readonly RedisClient _redisClient;    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)    {_logger = logger;_redisClient = redisClient;    }    public async Task Handle(string id, WebSocket webSocket)    {//把當前用戶連接存儲起來_ = UserConnection.GetOrAdd(id, webSocket);//訂閱一個當前用戶的頻道await SubMsg($"{userPrefix}{id}");var buffer = new byte[1024 * 4];//接收發送過來的消息,這個方法是阻塞的,如果沒收到消息則一直阻塞var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//循環接收消息while (webSocket.State == WebSocketState.Open){    try    {//因為緩沖區長度是固定的所以要獲取實際長度string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd("\0");//接收的到消息轉換成實體MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);//發送到其他客戶端的數據byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判斷目標客戶端是否在當前當前服務,如果在當前服務直接扎到目標連接直接發送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){    if (targetSocket.State == WebSocketState.Open)    {await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);    }}else{    //如果要發送的目標端不在當前服務,則發送給目標redis端的頻道    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };    //目標的redis頻道    _redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}//繼續阻塞循環接收消息receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);    }    catch (Exception ex)    {_logger.LogError(ex, ex.Message);break;    }}//循環結束意味著當前端已經退出//從當前用戶的集合移除當前用戶_ = UserConnection.TryRemove(id, out _);//關閉當前WebSocket連接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在當前訂閱集合移除當前用戶_disposables.TryRemove($"{userPrefix}{id}", out var disposable);//關閉當前用戶的通道disposable.Dispose();    }    private async Task SubMsg(string channel)    {//訂閱當前用戶頻道var sub = _redisClient.Subscribe(channel,  async (channel, data) => {    //接收過來當前頻道數據,說明發送端不在當前服務    ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());    byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");    //在當前服務找到目標的WebSocket連接并發送消息    if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket))    {if (targetSocket.State == WebSocketState.Open){    await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}    }});//把redis訂閱頻道添加到集合中_disposables.TryAdd(channel, sub);    }    //程序退出的時候取消當前服務訂閱的redis頻道    public void Dispose()    {foreach (var disposable in _disposables){    disposable.Value.Dispose();}_disposables.Clear();    }}

這里涉及到幾個輔助相關的類,其中UserConnection類是存儲注冊到當前服務的連接,MsgBody類用來接受客戶端發送過來的消息,ChannelMsgBody是用來發送redis頻道的相關消息,因為要把相關消息通過redis發布出去,咱們列一下這幾個類的相關代碼

//注冊到當前服務的連接public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>{    //存儲用戶唯一標識和WebSocket的對應關系    private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();    //當前服務的用戶數量    public int Count => _users.Count;    public WebSocket GetOrAdd(string userId, WebSocket webSocket)    {return _users.GetOrAdd(userId, webSocket);    }    public bool TryGetValue(string userId, out WebSocket webSocket)    {return _users.TryGetValue(userId, out webSocket);    }    public bool TryRemove(string userId, out WebSocket webSocket)    {return _users.TryRemove(userId, out webSocket);    }    public void Clear()    {_users.Clear();    }    public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator()    {return _users.GetEnumerator();    }    IEnumerator IEnumerable.GetEnumerator()    {return this.GetEnumerator();    }}//客戶端消息public class MsgBody{    //目標用戶標識    public string Id { get; set; }    //要發送的消息    public string Msg { get; set; }}//頻道訂閱消息public class ChannelMsgBody{    //用戶標識    public string FromId { get; set; }    //目標用戶標識,也就是要發送給誰    public string ToId { get; set; }    //要發送的消息    public string Msg { get; set; }}

這樣的話關于一對一發送消息的相關邏輯就實現完成了,啟動兩個Server端,由于nginx默認的負載均衡策略是輪詢,所以注冊兩個用戶的話會被分發到不同的服務里去

Postman連接三個連接唯一標識分別是1、2、3,模擬一下消息發送,效果如下,發送效果

接收效果

群組發送

上面我們展示了一對一發送的情況,接下來我們來看一下,群組發送的情況。群組發送的話就是只要大家都加入一個群組,只要客戶端在群組里發送一條消息,則注冊到當前群組內的所有客戶端都可以收到消息。相對于一對一的情況就是如果當前WebSocket服務端如果存在用戶加入某個群組,則當前當前WebSocket服務端則可以訂閱一個group:群組唯一標識的redis頻道,集群中的其他WebSocket服務器通過這個redis頻道接收群組消息,通過一張圖描述一下

群組的實現方式相對于一對一要簡單一點

  • 發送端可以不用考慮當前服務中的客戶端連接,一股腦的交給redis把消息發布出去
  • 如果有WebSocket服務中的用戶訂閱了當前分組則可以接受消息,獲取組內的用戶循環發送消息

展示一下代碼實現的方式,首先是定義一個action用于表示群組的相關場景

//包含兩個標識一個是組別標識一個是注冊到組別的用戶[HttpGet("/chat/group/{groupId}/{userId}")]public async Task ChatGroup(string groupId, string userId){    if (HttpContext.WebSockets.IsWebSocketRequest)    {_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//調用HandleGroup處理群組相關的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);    }    else    {HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;    }}

接下來看一下HandleGroup的相關邏輯,還是在WebSocketHandler類中,看一下代碼實現

public class WebSocketHandler:IDisposable{    private readonly UserConnection UserConnection = new();    private readonly GroupUser GroupUser = new();    private readonly SemaphoreSlim _lock = new(1, 1);    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();    private readonly string groupPrefix = "group:";    private readonly ILogger<WebSocketHandler> _logger;    private readonly RedisClient _redisClient;    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)    {_logger = logger;_redisClient = redisClient;    }    public async Task HandleGroup(string groupId, string userId, WebSocket webSocket)    {//因為群組的集合可能會存在很多用戶一起訪問所以限制訪問數量await _lock.WaitAsync();//初始化群組容器 群唯一標識為key 群員容器為valuevar currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//當前用戶加入當前群組_ = currentGroup.GetOrAdd(userId, webSocket);//只有有當前WebSocket服務的第一個加入當前組的時候才去訂閱群組頻道//如果不限制的話則會出現如果當前WebSocket服務有多個用戶在一個組內則會重復收到redis消息if (currentGroup.Count == 1){    //訂閱redis頻道    await SubGroupMsg($"{groupPrefix}{groupId}");}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//服務不退出的話則一直等待接收while (webSocket.State == WebSocketState.Open){    try    {string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd("\0");_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");//組裝redis頻道發布的消息,目標為群組標識ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };//通過redis發布消息_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);    }    catch (Exception ex)    {_logger.LogError(ex, ex.Message);break;    }}//如果客戶端退出則在當前群組集合刪除當前用戶_ = currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);    }    private async Task SubGroupMsg(string channel)    {var sub = _redisClient.Subscribe(channel, async (channel, data) => {    ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());    byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");    //在當前WebSocket服務器找到當前群組里的用戶    GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);    //循環當前WebSocket服務器里的用戶發送消息    foreach (var user in currentGroup)    {//不用給自己發送了if (user.Key == msgBody.FromId){    continue;}if (user.Value.State == WebSocketState.Open){    await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}    }});//把當前頻道加入訂閱集合_disposables.TryAdd(channel, sub);    }}

這里涉及到了GroupUser類,是來存儲群組和群組用戶的對應關系的,定義如下

public class GroupUser{    //key為群組的唯一標識    public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();}

演示一下把兩個用戶添加到一個群組內,然后發送接收消息的場景,用戶u1發送

用戶u2接收

發送所有人

發送給所有用戶的邏輯比較簡單,不用考慮到用戶限制,只要用戶連接到了WebSocket集群則都可以接收到這個消息,大致工作方式如下圖所示

這個比較簡單,咱們直接看實現代碼,首先是定義一個地址,用于發布消息

//把用戶注冊進去[HttpGet("/chat/all/{id}")]public async Task ChatAll(string id){    if (HttpContext.WebSockets.IsWebSocketRequest)    {_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);    }    else    {HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;    }}

具體的實現邏輯還是在HandleGroup類里,是HandleAll方法,看一下具體實現

public class WebSocketHandler:IDisposable{    private readonly UserConnection AllConnection = new();    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();    private readonly string all = "all";    private readonly ILogger<WebSocketHandler> _logger;    private readonly RedisClient _redisClient;    public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)    {_logger = logger;_redisClient = redisClient;    }    public async Task HandleAll(string id, WebSocket webSocket)    {await _lock.WaitAsync();//把用戶加入用戶集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每個服務只定義一次if (AllConnection.Count == 1){    await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){    try    {string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd("\0");_logger.LogInformation($"user {id} send:{msg}");//獲取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通過redis發布到集群中的其他服務_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);    }    catch (Exception ex)    {_logger.LogError(ex, ex.Message);break;    }}//用戶退出則刪除集合中的當前用戶信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);    }    private async Task SubAllMsg(string channel)    {var sub = _redisClient.Subscribe(channel, async (channel, data) => {    ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());    byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");    //接收到消息后遍歷用戶集合把消息發送給所有用戶    foreach (var user in AllConnection)    {   //如果包含當前用戶跳過if (user.Key == msgBody.FromId){    continue;}if (user.Value.State == WebSocketState.Open){    await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}    }});_disposables.TryAdd(channel, sub);    }}

效果在這里就不展示了,和群組的效果是類似的,只是一個是部分用戶,一個是全部的用戶。

整合到一起

上面我們分別展示了一對一、群組、所有人的場景,但是實際使用的時候,每個用戶只需要注冊到WebSocket集群一次也就是保持一個連接即可,而不是一對一一個連接、注冊群組一個連接、所有消息的時候一個連接。所以我們需要把上面的演示整合一下,一個用戶只需要連接到WebSocket集群一次即可,至于發送給誰,加入什么群組,接收全部消息等都是連接后通過一些標識區分的,而不必每個類型的操作都注冊一次,就和微信和QQ一樣我只要登錄了即可,至于其他操作都是靠數據標識區分的。接下來咱們就整合一下代碼達到這個效果,大致的思路是

  • 用戶連接到WebSocket集群,把用戶和連接保存到當前WebSocket服務器的用戶集合中去。
  • 一對一發送的時候,只需要在具體的服務器中找到具體的客戶端發送消息
  • 群組的時候,先把當前用戶標識加入群組集合即可,接收消息的時候根據群組集合里的用戶標識去用戶集合里去拿具體的WebSocket連接發送消息
  • 全員消息的時候,直接遍歷集群中的每個WebSocket服務里的用戶集合里的WebSocket連接訓話發送消息

這樣的話就保證了每個客戶端用戶在集群中只會綁定一個連接,首先還是單獨定義一個action,用于讓客戶端用戶連接上來,具體實現代碼如下所示

public class WebSocketChannelController : ControllerBase{    private readonly ILogger<WebSocketController> _logger;    private readonly WebSocketChannelHandler _webSocketChannelHandler;    public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler)    {_logger = logger;_webSocketChannelHandler = webSocketChannelHandler;    }    //只需要把當前用戶連接到服務即可    [HttpGet("/chat/channel/{id}")]    public async Task Channel(string id)    {if (HttpContext.WebSockets.IsWebSocketRequest){    _logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");    var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();    await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{    HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}    }}

接下來看一下WebSocketChannelHandler類的HandleChannel方法實現,用于處理不同的消息,比如一對一、群組、全員消息等不同類型的消息

public class WebSocketChannelHandler : IDisposable{    //用于存儲當前WebSocket服務器鏈接上來的所有用戶對應關系    private readonly UserConnection UserConnection = new();    //用于存儲群組和用戶關系,用戶集合采用HashSet保證每個用戶只加入一個群組一次    private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();    private readonly SemaphoreSlim _lock = new(1, 1);    //存放redis訂閱實例    private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();    //一對一redis頻道前綴    private readonly string userPrefix = "user:";    //群組redis頻道前綴    private readonly string groupPrefix = "group:";    //全員redis頻道    private readonly string all = "all";    private readonly ILogger<WebSocketHandler> _logger;    private readonly RedisClient _redisClient;    public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient)    {_logger = logger;_redisClient = redisClient;    }    public async Task HandleChannel(string id, WebSocket webSocket)    {await _lock.WaitAsync();//每次連接進來就添加到用戶集合_ = UserConnection.GetOrAdd(id, webSocket);//每個WebSocket服務實例只需要訂閱一次全員消息頻道await SubMsg($"{userPrefix}{id}");if (UserConnection.Count == 1){    await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//接收客戶端消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){    try    {string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd("\0");//讀取客戶端消息ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);//判斷消息類型switch (channelData.Method){    //一對一    case "One":await HandleOne(id, channelData.MsgBody, receiveResult);break;    //把用戶加入群組    case "UserGroup":await AddUserGroup(id, channelData.Group, webSocket);break;    //處理群組消息    case "Group":await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;    //處理全員消息    default:await HandleAll(id, channelData.MsgBody);break;}receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);    }    catch (Exception ex)    {_logger.LogError(ex, ex.Message);break;    }}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群組中移除當前用戶foreach (var users in GroupUser.Values){    lock (users)    {users.Remove(id);    }}//當前客戶端用戶退出則移除連接_ = UserConnection.TryRemove(id, out _);//取消用戶頻道訂閱_disposables.Remove($"{userPrefix}{id}", out var sub);sub?.Dispose();    }    public void Dispose()    {foreach (var disposable in _disposables){    disposable.Value.Dispose();}_disposables.Clear();    }}

這里涉及到了ChannelData類是用于接收客戶端消息的類模板,具體定義如下

public class ChannelData{    //消息類型 比如一對一 群組 全員    public string Method { get; set; }    //群組標識    public string Group { get; set; }    //消息體    public object MsgBody { get; set; }}

類中并不會包含當前用戶信息,因為連接到當前服務的時候已經提供了客戶端唯一標識。結合上面的處理代碼我們可以看出,客戶端用戶連接到WebSocket實例之后,先注冊當前用戶的redis訂閱頻道并且當前實例僅注冊一次全員消息的redis頻道,用于處理非當前實例注冊客戶端的一對一消息處理和全員消息處理,然后等待接收客戶端消息,根據客戶端消息的消息類型來判斷是進行一對一、群組、或者全員的消息類型處理,它的工作流程入下圖所示

由代碼和上面的流程圖可知,它根據不同的標識去處理不同類型的消息,接下來我們可以看下每種消息類型的處理方式。

一對一處理

首先是一對一的消息處理情況,看一下具體的處理邏輯,首先是一對一發布消息

 private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult) {    MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));    byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");    _logger.LogInformation($"user {id} send:{msgBody.Msg}");    //判斷目標用戶是否在當前WebSocket服務器    if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket))    {if (targetSocket.State == WebSocketState.Open){    await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}    }    else    {//如果不在當前服務器,則直接把消息發布到具體的用戶頻道去,由具體用戶去訂閱ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));    }}

接下來是用于處理訂閱其他用戶發送過來消息的邏輯,這個和整合之前的邏輯是一致的,在當前服務器中找到用戶對應的連接,發送消息

private async Task SubMsg(string channel){    var sub = _redisClient.Subscribe(channel, async (channel, data) =>    {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){    if (targetSocket.State == WebSocketState.Open)    {await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);    }    else    {_ = UserConnection.TryRemove(msgBody.FromId, out _);    }}    });    //把訂閱實例加入集合    _disposables.TryAdd(channel, sub);}

如果給某個用戶發送消息則可以使用如下的消息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method為One代表著是私聊一對一的情況,消息體內Id為要發送給的具體用戶標識和消息體。

群組處理

接下來看群組處理方式,這個和之前的邏輯是有出入的,首先是用戶要先加入到某個群組然后才能接收群組消息或者在群組中發送消息,之前是一個用戶對應多個連接,整合了之后集群中每個用戶只關聯唯一的一個WebSocket連接,首先看用戶加入群組的邏輯

private async Task AddUserGroup(string user, string group, WebSocket webSocket){    //獲取群組信息    var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());    lock (currentGroup)    {       //把用戶標識加入當前組_ = currentGroup.Add(user);    }    //每個組的redis頻道,在每臺WebSocket服務器實例只注冊一次訂閱    if (currentGroup.Count == 1)    {//訂閱當前組消息await SubGroupMsg($"{groupPrefix}{group}");    }    string addMsg = $"user 【{user}】 add  to group 【{group}】";    byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);    await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);    //如果有用戶加入群組,則通知其他群成員    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };    _redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));}

用戶想要在群組內發消息,則必須先加入到一個具體的群組內,具體的加入群組的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method為UserGroup代表著用戶加入群組的業務類型,Group代表著你要加入的群組唯一標識。接下來就看下,用戶發送群組消息的邏輯了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody){    //判斷群組是否存在    var hasValue = GroupUser.TryGetValue(groupId, out var users);    if (!hasValue)    {byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;    }    //只有加入到當前群組,才能在群組內發送消息    if (!users.Contains(userId))    {byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;    }    _logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");    //發送群組消息    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };    _redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));}

加入群組之后則可以發送和接收群組內的消息了,給群組發送消息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method為Group代表著用戶加入群組的業務類型,Group則代表你要發送到具體的群組的唯一標識,MsgBody則是發送到群組內的消息。最后再來看下訂閱群組內消息的情況,也就是處理群組消息的邏輯

private async Task SubGroupMsg(string channel){    var sub = _redisClient.Subscribe(channel, async (channel, data) =>    {//接收群組訂閱消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//獲取當前服務器實例中當前群組的所有用戶連接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){    if (user == msgBody.FromId)    {continue;    }    //通過群組內的用戶標識去用戶集合獲取用戶集合里的用戶唯一連接發送消息    if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open)    {await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);    }    else    {currentGroup.Remove(user);    }}    });    _disposables.TryAdd(channel, sub);}

全員消息處理

全員消息處理相對來說思路比較簡單,因為當服務啟動的時候就會監聽redis的全員消息頻道,這樣的話具體的實現也就只包含發送和接收全員消息了,首先看一下全員消息發送的邏輯

private async Task HandleAll(string id, object msgBody){    _logger.LogInformation($"user {id} send:{msgBody}");    //直接給redis的全員頻道發送消息    ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };    _redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));}

全員消息的發送數據格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method為All代表著全員消息類型,MsgBody則代表著具體消息。接收消息出里同樣很簡單,訂閱redis全員消息頻道,然后遍歷當前WebSocket服務器實例內的所有用戶獲取連接發送消息,具體邏輯如下

private async Task SubAllMsg(string channel){    var sub = _redisClient.Subscribe(channel, async (channel, data) =>    {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//獲取當前服務器實例內所有用戶的連接foreach (var user in UserConnection){    //不給自己發送消息,因為發送的時候可以通過具體的業務代碼處理    if (user.Key == msgBody.FromId)    {continue;    }    //給每個用戶發送消息    if (user.Value.State == WebSocketState.Open)    {await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);    }    else    {_ = UserConnection.TryRemove(user.Key, out _);    }}    });    _disposables.TryAdd(channel, sub);}

示例源碼

由于篇幅有限,沒辦法設計到全部的相關源碼,因此在這里貼出來github相關的地址,方便大家查看和運行源碼。相關的源碼我這里實現了兩個版本,一個是基于asp.net core的版本,一個是基于golang的版本。兩份源碼的實現思路是一致的,所以這兩份代碼可以運行在一套集群示例里,配置在一套nginx里,并且連接到同一個redis實例里即可

  • asp.net core源碼示例https://github.com/softlgl/WebsocketCluster
  • golang源碼示例https://github.com/softlgl/websocket-cluster

倉庫里還涉及到本人閑暇之余開源的其他倉庫,由于本人能力有限難登大雅之堂,就不做廣告了,有興趣的同學可以自行瀏覽一下。

總結

本文基于ASP.NET Core框架提供了一個基于WebSocket做集群的示例,由于思想是通用的,所以基于這個思路樓主也實現了golang版本。其實在之前就想自己動手搞一搞關于WebSocket集群方面的設計,本篇文章算是對之前想法的一個落地操作。其核心思路文章已經做了相關介紹,由于這些只是博主關于構思的實現,可能有很多細節尚未體現到,還希望大家多多理解。其核心思路總結一下

  • 首先是,利用可以構建WebSocket服務的框架,在當前服務實例中保存當前客戶端用戶和WebSocket的連接關系
  • 如果消息的目標客戶端不在當前服務器,可以利用redis頻道、消息隊列相關、甚至是數據庫類的共享回話發送的消息,由目標服務器獲取目標是否屬于自己的ws會話
  • 本文設計的思路使用的是無狀態的方式,即WebSocket服務實例之間不存在直接的消息通信和相互的服務地址存儲,當然也可以利用redis等存儲在線用戶信息等,這個可以參考具體業務自行設計

讀萬卷書,行萬里路。在這個時刻都在變化點的環境里,唯有不斷的進化自己,多接觸多嘗試不用的事物,多擴展自己的認知思維,方能構建自己的底層邏輯。畢竟越底層越抽象,越通用越抽象。面對未知的挑戰,自身作為自己堅強的后盾,可能才會讓自己更踏實。

以上就是ASP.NET Core WebSocket集群實現思路詳解的詳細內容,更多關于ASP.NET Core WebSocket的資料請關注其它相關文章!

標簽: ASP.NET
主站蜘蛛池模板: 息与子猛烈交尾一区二区 | 最新国产网址 | 国产午夜亚洲精品午夜鲁丝片 | 99年国精产品一二二区传媒 | jjzz日本女人 | 久久91视频 | 国产欧美一区二区三区鸳鸯浴 | 懂色在线 | 婷婷在线观看视频 | 亚洲大尺度无码无码专区 | 无码高潮爽到爆的喷水视频app | 国产亚洲va综合人人澡精品 | 国产精品无码久久综合网 | 在线观看老湿视频福利 | 啪啪免费网站 | 亚洲啪啪综合av一区 | 久久精品久久久精品美女 | 国产成年视频 | 亚洲最大色综合成人av | 亚洲精品午夜久久久 | 在线黄视频 | 国产精品男女啪啪 | 久久久麻豆 | 亚洲第一色网站 | 国产视频久久久久久久 | 妺妺窝人体色www聚色窝 | 国产精品综合一区二区三区 | 又白又嫩毛又多15p 国产热の有码热の无码视频 | 又色又爽又黄gif动态图 | 五月激情小说网 | 伊人久久精品无码麻豆一区 | 亚洲精品18 | 亚洲国产精品久久网午夜 | 免费观看黄色网址 | 国产伦精品一区二区三区在线 | 99久久精品免费看国产免费粉嫩 | 在线观看av网页 | 久操视频在线观看免费 | 在线a| 嫩草av影院 | 国产妇女馒头高清泬20p多 | 一级黄色片网址 | 欧美另类肥妇 | 欧美日韩精品一区二区天天拍小说 | 欧洲av在线免费观看 | 91精品国产综合久久久密臀九色 | 九色porny丨精品自拍视频 | 亚洲www.| 国产精品一区二区久久乐夜夜嗨 | 国产精品一区二区含羞草 | 黑鬼大战白妞高潮喷白浆 | 91pony九色丨交换 | 国产精品一区二区在线免费观看 | 青青草视频在线观看 | 7777奇米四色眼影国产馆 | 深夜福利成人 | 五月婷婷六月情 | 一区二区三区视频免费看 | 久久久亚洲一区二区三区 | 亚洲精品一区二区三区蜜桃 | 成人网站免费看黄a站视频 欧美性猛交xxxx黑人 | 亚洲码欧美码一区二区三区 | 国产成人精品视频一区二区不卡 | 亚洲免费不卡视频 | 国产主播喷水 | 一级成人欧美一区在线观看 | 欧美极品jiizzhd欧美爆 | www五月天com | 一起艹在线观看 | 雨宫琴音一区二区三区 | 国产日韩在线观看一区 | 日本黄页网站免费大全 | 91精品国产色综合久久 | 日日狠狠久久偷偷四色综合免费 | 欧美性猛交久久久乱大交小说 | 夜色综合网 | 午夜小视频在线播放 | 国产a三级 | 99久久婷婷国产精品综合 | 最新精品国偷自产在线下载 | 热久久伊人 | 娇小xxxx性开放国产精 | 午夜av亚洲翘臀国产精网 | 美女翘臀少妇啪啪呻吟流水 | 尤物97国产精品久久精品国产 | 五月天婷婷激情网 | 亚洲午夜久久久精品一区二区三区 | 国产精品亚洲精品一区二区三区 | 免费无码肉片在线观看 | 夜夜躁狠狠躁日日躁2020 | 久久99精品久久久大学生 | 一a级毛片| 精品国产乱码久久久久久浪潮 | 国产精品久久久久久久久久久久午夜 | 最新国产福利在线观看精品 | 国产视频精选 | 天堂草在线观看 | 成人综合婷婷国产精品久久 | 一区二区av在线 | 日韩精品91偷拍在线观看 | 看三级毛片| 国产精品久久香蕉免费播放 | 中文字幕久久熟女人妻av免费 | 国内乱子对白免费在线 | 精品少妇一区二区三区在线观看 | 青青草原成人网 | 农村少妇无套内谢粗又长 | 人妻无码一区二区三区免费 | 超碰免费在线观看 | 色婷婷综合在线 | 永久黄网站色视频免费直播 | 国产性猛交普通话对白 | 少妇白浆高潮无码免费区 | 777欧美| 韩国美女黄色片 | 国产精品suv一区二区三区 | 色播在线视频 | 99久久人妻精品免费一区 | 香港三日本三级少妇三99 | 国内精品人妻无码久久久影院导航 | 91色乱码一区二区三区 | 国产成人精品三级麻豆 | 亚洲色偷精品一区二区三区 | 狠狠色噜噜狠狠狠狠色综合网 | 亚洲精品乱码久久久久久久久久久久 | 日韩国产在线一区 | 亚色在线观看 | 大香伊人中文字幕精品 | 99这里只有精品 | 老司机一区二区 | 国产91 精品高潮白浆喷水 | 一区二区不卡在线 | 久久精品女人天堂av免费观看 | 免费国产黄色网址 | 男人添女人下部高潮视频 | 亚洲精品免费在线 | 蜜臀av性久久久久av蜜臀妖精 | 亚洲粉嫩美女无套露脸 | 国产片网址 | 国内视频一区二区三区 | 天天做夜夜爱 | 亚洲第一成年网 | 国产免费又爽又色又粗视频 | 樱花草在线播放免费中文 | 暖暖日本在线观看 | 嫩草视屏 | 成人免费视频软件网站 | 国产精品久久久久久久久久久免费看 | 亚洲一区福利视频 | 97国产精品 | 国产伦精品一区二区三区无广告 | 亚洲精选一区二区 | 好吊色欧美一区二区三区视频 | 91超碰在线免费观看 | 寂寞寡妇让我吃奶 | 天天射天天色天天干 | 欧美天堂色 | 国产成人欧美一区二区三区 | 中文天堂资源在线www | 成在线人免费无码高潮喷水 | 女同舌吻互慰一区二区 | 色网址在线 | 天天干天天射天天爽 | 97精品伊人久久久大香线蕉 | 高柳家在线观看 | 日本黄网在线观看 | 99久久久无码国产精品试看 | 日韩亚洲欧美中文在线 | jizz在线播放| 人妻互换一二三区激情视频 | 深夜视频在线免费观看 | 最近日本免费观看高清视频 | 日韩一级伦理片 | 国产精品18久久久久久首页狼 | 麻花豆传媒剧国产免费mv入口 | 黄色一级片在线免费观看 | 国自产拍偷拍精品啪啪一区二区 | 色噜噜狠狠色综合免费视频 | 一区二区不卡免费视频 | 久久午夜激情 | av国产传媒精品免费 | 不卡的av在线播放 | 亚洲国产理论片在线播放 | 91啪国产在线 | 国产精品女丝袜白丝袜 | 综合久久国产 | 国产精品久久久久久一区二区三区 | 亚洲精品视频二区 | 亚洲国产一区二区天堂 | 琪琪五月天| 蜜臀av首页 | 18中国性生交xxxxxhd | 97超级碰碰人国产在线观看 | 红杏成av人影院在线观看 | 妞干网av | 亚洲欧洲日韩一区二区三区 | www.av网址| 免费日本黄色 | 亚洲国产毛片aaaaa无费看 | 日韩伦理av| 97操碰 | 伊人五月天婷婷 | 狠狠操五月天 | 韩日少妇 | 国产三男一女4p免费男黑人 | 精品久久久久久久久久国产潘金莲 | 成人做爰视频www | 日韩精品极品视频在线观看免费 | 免费观看全黄做爰大片小说 | 久久久久久久av | 九九色| 少妇人妻偷人精品免费视频 | 国产97在线观看 | 久久人人爽人人爽人人片av卡 | av在线亚洲男人的天堂 | 毛片网站在线观看视频 | 好吊妞视频988gao免费 | 黑人超碰 | 久久久久久中文 | 丰满少妇又爽又紧又丰满在线观看 | 天天噜日日噜 | 欧美精品区 | 日韩精品一区二区三区免费视频 | 国产一区二区福利 | 国产一级二级日本在线 | 国产精品午夜小视频观看 | 蜜桃aaa | 精品国产a∨无码一区二区三区 | 国产精品一区在线 | 成人免费xxxxxxx | 欧美 日本 国产 | 永久www成人看片 | 国产精品无码一区二区三区不卡 | 99精品国产综合久久久久久 | 亚洲一区精品在线观看 | а√天堂8资源中文在线 | 国产手机在线αⅴ片无码观看 | 小鲜肉自慰网站 | 国产亚洲精品美女久久久 | 波多野结衣一区二区三区 | 亚洲国产成人久久 | av片亚洲| av免费网| 国内视频自拍 | 国产精品九色 | 亚洲日韩精品a∨片无码 | 性生交生活片1 | 中韩乱幕日产无线码一区 | 青青草成人影视 | 人人看人人乐 | 日本xxxx高潮少妇 | 中文字幕日韩精品亚洲一区小树林 | 久久影视传媒 | 久久久久爽爽爽爽一区老女人 | av网站免费看| 粉豆av| 好吊爽视频988gaocom | 国产成人精品免费视频 | 国产清纯白嫩美女正无套播放 | 国产在线观看免费 | 国产精品久久久久久久久久免 | 一本一道久久久a久久久精品91 | 韩国黄色网 | 粉嫩av久久一区二区三区 | 欧美激情va永久在线播放 | 久久久亚洲欧洲日产国码二区 | 精品福利一区 | 三级全黄做爰龚玥菲在线 | 国产无套水多在线观看 | 无码av中文一区二区三区桃花岛 | 国产福利一区二区三区视频 | 日本三级成本人网站 | 国产精品99久久精品爆乳 | 小镇姑娘国语版在线观看免费 | 国产中文字幕乱人伦在线观看 | 一级全黄少妇性色生活免费看 | 懂色av噜噜一区二区三区av88 | 精品无码人妻一区二区三区不卡 | 国产69精品久久久久999小说 | 欧美人妻精品一区二区三区 | 怡红院a∨人人爰人人爽 | 日韩一区2区 | 色婷婷av一区二区三区之一色屋 | 又粗又黄又硬又爽的免费视频 | 精品中文字幕在线 | 国产色视频一区二区三区qq号 | 超碰免费在线 | 色综合久久蜜芽国产精品 | 国产午夜久久久 | 瑜伽美女健身视频集锦 | 天天干狠狠操 | 手机国产乱子伦精品视频 | 精品久久久久久18免费网站 | 国产这里只有精品 | 天堂一码二码三码四码区乱码 | 久久久久爱| 国产成人一区二区三区别 | 一曲二曲三曲在线观看中文字幕动漫 | 免费日本视频 | 99久久久无码国产精品9 | 国产一级片网址 | 波多野结衣丝袜 | 欧美日韩在线视频一区二区三区 | 激情开心成人网 | 国产毛片一区二区精品 | 香蕉网在线播放 | 可以免费看毛片的网站 | 中文字幕在线观看av | 国产在线视频一区二区董小宛性色 | 吃奶呻吟打开双腿 | 无码中文av有码中文a | 性xxx法国hd极品 | 日韩在线精品成人av在线 | 国产激情啪啪 | 国产午夜精品久久精品电影 | 叼嘿视频在线免费观看 | 欧美国产在线视频 | 91抖音在线观看 | 亚洲性一区二区 | 夫の部长が调教中文字幕 | 在线播放日韩精品 | 亚洲视频网 | 日本一区二区三区在线视频 | 天堂中文视频在线 | www精品国产| 香蕉中文网| 久久久久国产精品嫩草影院 | 色婷婷88av视频一二三区 | 免费视频精品 | 国产又黄又大视频 | 欧洲熟妇色xxxx欧美老妇老头多毛 | 天堂网www网在线最新版 | 久草福利在线观看 | eeuss影院www在线窝窝 | 天天插天天射 | 18禁无遮挡免费视频网站 | 久久久久久a亚洲欧洲av | 成人午夜网 | 50路60路老熟妇啪啪 | 欧美 丝袜 自拍 制服 另类 | 亚洲欧洲日韩 | 一级a爰片久久毛片 | 风韵犹存少妇69xx视频 | 日韩精品视频免费 | 91看片淫黄大片一级在线观看 | 叼嘿视频在线免费观看 | 热久久这里只有精品 | 久久综合给久久狠狠97色 | 日欧137片内射在线视频播放 | 亚洲精品欧美精品 | 日本高清视频网站www | 日本www免费 | 日本理伦少妇4做爰 | 国产福利91 | 天天躁夜夜躁狠狠躁2020色戒 | 国产黑色丝袜在线视频 | 欧美视频中文在线看 | 人妻少妇精品视频专区 | 国产妇女乱码一区二区三区 | 第一区免费在线观看 | 欧美精品一区二区三区蜜臀 | 麻豆精品影院 | 久久久精品影视 | 四色网址 | 国产五区| 俺也去av | 国产无遮挡又黄又爽又色 | 中文字幕乱码亚洲无线码小说 | 免费在线日韩av | 嫩草影院黄色 | 国产一区欧美 | 欧美日本一区 | 国产成人福利av综合导航 | 亚洲啪啪少妇裸体艺术 | 一级视频毛片 | 少妇性xxxx性开放黄色 | 色翁荡息又大又硬又粗视频 | 国产一区二区中文字幕 | 欧美性生活视频 | 无码人妻aⅴ一区二区三区69岛 | 91精品情国产情侣高潮对白文档 | 又粗又硬又大又爽免费视频播放 | 亚洲色爱图小说专区 | 妺妺窝人体色www聚色窝仙踪 | 蜜桃av噜噜一区二区三区 | 国产精品嫩草影院久久久 | 国产精品嫩草影院入口日本一区二 | 国产精品久久久久久久久久妞妞 | 国产成人精品久久二区二区91 | 中文字幕在线日韩 | 青青草久久 | h片在线观看视频 | 丰满少妇熟乱xxxxx视频 | 黄色.com| 99黄色网| 中文在线www | 日韩欧美一区在线观看 | 国产精品视频看看 | 西西午夜无码大胆啪啪国模 | 欧美大荫蒂xxx | 久久99精品久久久久久噜噜 | 国产免费xoxo在线视频 | 亚洲性久久久 | 国产黄色www | 日本高潮视频 | www国产精品一区 | 亚洲蜜芽在线精品一区 | 欧美狠狠| 婷婷综合另类小说色区 | 婷婷综合激情 | 日本韩国一级淫片a免费 | 欧美日韩国产成人在线 | 男女啪啪猛烈无遮挡猛进猛出 | 精品中文字幕一区二区三区av | 成人av高清在线观看 | 亚洲精品久久久久成人2007 | 欧美精选一区二区三区 | 亚洲第一成年免费网站 | 亚欧激情 | 亚洲成人精品一区二区三区 | 91九色国产ts另类人妖 | 亚洲天堂网一区二区 | 中文字幕亚洲天堂 | 羞羞色院91精品网站 | 嫩草精品 | 欧美日韩成人在线观看 | 精品无人码麻豆乱码1区2区 | 91精品国产综合久久久久久久久久 | 亚洲精品久久久久中文字幕二区 | 毛片无遮挡 | 国产精品一区二区三区四区在线观看 | 在线免费av网站 | 久久国产精品久久久久久 | 久久精品视 | 91成人免费看片 | 人与兽黄色毛片 | 成人午夜大片 | 色妇网| 日韩日韩日韩日韩日韩 | 亚洲高清精品视频 | 国内精品久久久久久99蜜桃 | 欧日韩一区二区三区 | 精品国产一二区 | 乱视频在线| 欧美黑人又粗又大的性格特点 | 亚洲中文字幕久久久一区 | 国产一二视频 | 麻豆私人影院 | 色狠狠av| 国产一二在线 | 国产精品无码久久久久久久久久 | 综合久久五月 | 伊人激情视频 | 麻豆tv在线观看 | 又大又黄又粗又爽的免费视频 | 久久国产加勒比精品无码 | 快色视频网站 | 蜜桃av噜噜一区二区三 | 少妇被粗大的猛进出69影院 | 国产一级爽片 | 91快播视频 | 国产中文欧美日韩在线 | 国产羞羞| 国产妇女乱一性一交 | 国产精品无码一本二本三本色 | 一起草最新网址 | 国产乱肥老妇国产一区二 | 中文字幕伊人 | 亚洲综合一二三 | 成人av影院 | 欧美夜夜爽 | 国产精品99久久久精品 | 精品黄色网 | 青青草91 | 欧美性猛交ⅹxxx乱大交妖精 | 欧美一级免费观看 | 韩国三级做爰视频 | 久草视频一区 | 久久久99精品免费观看 | 日韩一级片免费在线观看 | 好紧好湿好黄的视频 | 久久久人| lutube成人福利在线观看污 | 亚洲 欧美 国产 制服 动漫 | 乱女午夜精品一区二区三区 | 国产午夜在线 | 在线观看亚洲国产 | 亚洲国产日韩精品一区二区三区 | 成人国产精品入麻豆 | 朝鲜大乳女奶水奶水吃奶视频在线 | 人人爽久久涩噜噜噜小说 | 爱情岛亚洲论坛入口首页 | 欧美xxxxhd| 久久亚洲精品国产一区 | 亚洲一区二区三区在线观看视频 | 国产亚州精品女人久久久久久 | 狂野欧美性猛交免费视频 | 亚洲精品国产精品久久99热 | 两口子真实刺激高潮视频 | 欧美一级黄色片免费看 | 黄色小视频链接 | 天天综合日韩 | 成人h片在线观看 | 激情婷婷av | 久久久久成人精品免费播放动漫 | 色就是色网站 | 玖玖爱资源站 | 狠狠躁天天躁综合网 | 久久天天躁夜夜躁狠狠85麻豆 | 亚洲熟女少妇一区二区 | av无码久久久久不卡网站下载 | 东京热一本无码av | 午夜精品久久久久久久久久久久 | 久久99精品久久久久久国产越南 | 91麻豆产精品久久久久久夏晴子 | 福利视频二区 | 8090理论片午夜理伦片 | 亚洲色婷婷久久精品av蜜桃久久 | 日本19禁啪啪无遮挡网站 | 综合天堂av久久久久久久 | 色插综合| 国产又大又黑又粗 | 国产av亚洲精品ai换脸电影 | 95av成人女人啪啪 | 国产第99页 | 夜夜夜夜猛噜噜噜噜噜婷婷 | 无码人妻丰满熟妇精品区 | 午夜精品久久久久久久99热额 | 久久99国产精品久久99果冻传媒新版本 | 久久久夜夜夜 | 亚洲精品一区二区三区在线 | 日韩专区av | 久久久a级片| 国产裸体歌舞一区二区 | 亚洲免费区 | 亚洲欧美中文日韩v在线观看 | 人人爽人人片人人片av | 久久国内精品自在自线 | 黄色网在线 | 视频一区在线观看 | 唐人社导航福利精品 | 小荡货奶真大水多好紧视频 | 色噜噜狠狠色综合免费视频 | 九九视频九九热 | 精品国产自在精品国产精小说 | 午夜精品久久久久久中宇牛牛影视 | 日本japanese丰满少妇 | 久久99国产亚洲高清观看首页 | 狠狠综合久久久久综合网 | 无码人妻av免费一区二区三区 | 少妇逼逼 | 亚洲国产日韩精品二三四区竹菊 | 亚洲中字幕 | 久久精品99国产国产精 | 亚洲视频在线免费播放 | 欧美成人在线影院 | 国产精品人人妻人人爽 | 欧美精品免费在线观看 | 精品久久ai | 亚洲精品久久久中文字幕痴女 | 欧产日产国产精品乱噜噜 | 国产亚洲成av片在线观看 | 国产精品日 | 久草在线免费资源 | 精品无码午夜福利理论片 | 亚洲嫩草影院 | 少妇被粗大的猛烈进出免费视频 | 亚洲最大成人网站 | 韩日一区二区三区 | 日韩视频专区 | 国产精品一品二区三区的使用体验 | 人妻熟妇乱又伦精品视频app | 亚洲一级理论片 | 国产成人无码av一区二区 | 欧美日韩免费一区 | 精品久久福利 | 亚洲欧洲国产精品 | 川上奈美侵犯中文字幕在线 | 久久国产精品久久精品国产 | 国产一级二级三级视频 | 99精品偷拍在线中文字幕 | 制服丝袜快播 | 免费人成网站在线观看欧美高清 | 日本japanese学生丰满 | 喷水少妇 | 91亚洲乱码卡一卡二卡新区豆 | 综合色婷婷一区二区亚洲欧美国产 | 久久夜靖品2区 | 久久精品久久久久久久久久16 | 一级二级在线观看 | 欧美一区二区三区国产 | 亚洲网av | 国产精品2区 | 日韩激情视频一区二区 | 国产午夜精品久久久久久 | 日本欧美一区 | 亚洲三级免费 | 亚洲国产精品成人午夜在线观看 | 国产人与禽zoz0性伦多活几年 | 少妇太爽了在线观看免费 | 澳门永久av免费网站 | 日本乱轮视频 | 亚洲精品欧美二区三区中文字幕 | 少妇人妻偷人精品无码视频新浪 | 精品久久久久久久中文字幕 | 91精品国产99久久久久 | 女教师淫辱の教室蜜av臀 | 自拍一级片 | 污污的网站在线免费观看 | 久久久国产精品视频 | 色哟哟国产精品色哟哟 | 精品国产制服丝袜高跟 | 三级三级三级a级全黄网站 三级三级三级三级 | www91视频聊天com | 97成人精品 |