在分布式调度系统中,如果要实现调度服务器与多台计算节点服务器之间通信,采用socket来实现是一种实现方式,当然我们也可以通过数据存储任务,子节点来完成任务,但是往往使用数据作为任务存储都需要定制开发,要维护数据库中任务记录状态等等。开发的东西还是有点多,而且还不够灵活。因此,我个人是比较偏向于使用socket来实现任务的调度工作。原因:使用socket实现调度比较灵活,而且扩展性都比较好。
实现思路:调度服务器要实现调度工作,它必须与所有计算节点之间建立连接。而且他需要知道每台计算节点的任务状况,因此服务器节点必须存储与所有计算节点的socket连接对象。
在客户端唯一需要知道的就是它归属的调度服务器的通信IP和端口,因此client是发送连接的主动方,由调度服务器监听是否有client请求建立连接,当建立连接成功后,把该连接信息存储到一个结合中以便监控client的存货状态及通信使用。
扩展:
由于server端是存储了所有server与client的连接对象,因此我们是可以基于此demo的基础上实现聊天系统:
* 每当一个与用户发言时,是由server接收到的某个用户的发言信息的,此时服务器端可以通过循环发送该用户发送的信息给每个已经连接连接的用户(排除发送者)。
Server端代码(Window Console Project):
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Net.Sockets; using System.Net; namespace SocketServerAcceptMultipleClient { public class SocketServer { // 创建一个和客户端通信的套接字 static Socket socketwatch = null; //定义一个集合,存储客户端信息 static Dictionary<string, Socket> clientConnectionItems = new Dictionary<string, Socket> { }; public static void Main(string[] args) { //定义一个套接字用于监听客户端发来的消息,包含三个参数(IP4寻址协议,流式连接,Tcp协议) socketwatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); //服务端发送信息需要一个IP地址和端口号 IPAddress address = IPAddress.Parse("127.0.0.1"); //将IP地址和端口号绑定到网络节点point上 IPEndPoint point = new IPEndPoint(address, 8098); //此端口专门用来监听的 //监听绑定的网络节点 socketwatch.Bind(point); //将套接字的监听队列长度限制为20 socketwatch.Listen(20); //负责监听客户端的线程:创建一个监听线程 Thread threadwatch = new Thread(watchconnecting); //将窗体线程设置为与后台同步,随着主线程结束而结束 threadwatch.IsBackground = true; //启动线程 threadwatch.Start(); Console.WriteLine("开启监听。。。"); Console.WriteLine("点击输入任意数据回车退出程序。。。"); Console.ReadKey(); Console.WriteLine("退出监听,并关闭程序。"); } //监听客户端发来的请求 static void watchconnecting() { Socket connection = null; //持续不断监听客户端发来的请求 while (true) { try { connection = socketwatch.Accept(); } catch (Exception ex) { //提示套接字监听异常 Console.WriteLine(ex.Message); break; } //获取客户端的IP和端口号 IPAddress clientIP = (connection.RemoteEndPoint as IPEndPoint).Address; int clientPort = (connection.RemoteEndPoint as IPEndPoint).Port; //让客户显示"连接成功的"的信息 string sendmsg = "连接服务端成功!rn" + "本地IP:" + clientIP + ",本地端口" + clientPort.ToString(); byte[] arrSendMsg = Encoding.UTF8.GetBytes(sendmsg); connection.Send(arrSendMsg); //客户端网络结点号 string remoteEndPoint = connection.RemoteEndPoint.ToString(); //显示与客户端连接情况 Console.WriteLine("成功与" + remoteEndPoint + "客户端建立连接!tn"); //添加客户端信息 clientConnectionItems.Add(remoteEndPoint, connection); //IPEndPoint netpoint = new IPEndPoint(clientIP,clientPort); IPEndPoint netpoint = connection.RemoteEndPoint as IPEndPoint; //创建一个通信线程 ParameterizedThreadStart pts = new ParameterizedThreadStart(recv); Thread thread = new Thread(pts); //设置为后台线程,随着主线程退出而退出 thread.IsBackground = true; //启动线程 thread.Start(connection); } } /// <summary> /// 接收客户端发来的信息,客户端套接字对象 /// </summary> /// <param name="socketclientpara"></param> static void recv(object socketclientpara) { Socket socketServer = socketclientpara as Socket; while (true) { //创建一个内存缓冲区,其大小为1024*1024字节 即1M byte[] arrServerRecMsg = new byte[1024 * 1024]; //将接收到的信息存入到内存缓冲区,并返回其字节数组的长度 try { int length = socketServer.Receive(arrServerRecMsg); //将机器接受到的字节数组转换为人可以读懂的字符串 string strSRecMsg = Encoding.UTF8.GetString(arrServerRecMsg, 0, length); //将发送的字符串信息附加到文本框txtMsg上 Console.WriteLine("客户端:" + socketServer.RemoteEndPoint + ",time:" + GetCurrentTime() + "rn" + strSRecMsg + "rnn"); socketServer.Send(Encoding.UTF8.GetBytes("测试server 是否可以发送数据给client ")); } catch (Exception ex) { clientConnectionItems.Remove(socketServer.RemoteEndPoint.ToString()); Console.WriteLine("Client Count:" + clientConnectionItems.Count); //提示套接字监听异常 Console.WriteLine("客户端" + socketServer.RemoteEndPoint + "已经中断连接" + "rn" + ex.Message + "rn" + ex.StackTrace + "rn"); //关闭之前accept出来的和客户端进行通信的套接字 socketServer.Close(); break; } } } /// /// 获取当前系统时间的方法 /// 当前时间 static DateTime GetCurrentTime() { DateTime currentTime = new DateTime(); currentTime = DateTime.Now; return currentTime; } } }
Client端代码(Window Form Project):
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.Threading; using System.Net.Sockets; using System.Net; using System.Diagnostics; namespace SocketClient { public partial class Main : Form { //创建 1个客户端套接字 和1个负责监听服务端请求的线程 Thread threadclient = null; Socket socketclient = null; public Main() { InitializeComponent(); StartPosition = FormStartPosition.CenterScreen; //关闭对文本框的非法线程操作检查 TextBox.CheckForIllegalCrossThreadCalls = false; this.btnSendMessage.Enabled = false; this.btnSendMessage.Visible = false; this.txtMessage.Visible = false; } private void btnConnection_Click(object sender, EventArgs e) { this.btnConnection.Enabled = false; //定义一个套接字监听 socketclient = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); //获取文本框中的IP地址 IPAddress address = IPAddress.Parse("127.0.0.1"); //将获取的IP地址和端口号绑定在网络节点上 IPEndPoint point = new IPEndPoint(address, 8098); try { //客户端套接字连接到网络节点上,用的是Connect socketclient.Connect(point); this.btnSendMessage.Enabled = true; this.btnSendMessage.Visible = true; this.txtMessage.Visible = true; } catch (Exception) { Debug.WriteLine("连接失败rn"); this.txtDebugInfo.AppendText("连接失败rn"); return; } threadclient = new Thread(recv); threadclient.IsBackground = true; threadclient.Start(); } // 接收服务端发来信息的方法 void recv() { int x = 0; //持续监听服务端发来的消息 while (true) { try { //定义一个1M的内存缓冲区,用于临时性存储接收到的消息 byte[] arrRecvmsg = new byte[1024 * 1024]; //将客户端套接字接收到的数据存入内存缓冲区,并获取长度 int length = socketclient.Receive(arrRecvmsg); //将套接字获取到的字符数组转换为人可以看懂的字符串 string strRevMsg = Encoding.UTF8.GetString(arrRecvmsg, 0, length); if (x == 1) { this.txtDebugInfo.AppendText("服务器:" + GetCurrentTime() + "rn" + strRevMsg + "rnn"); Debug.WriteLine("服务器:" + GetCurrentTime() + "rn" + strRevMsg + "rnn"); } else { this.txtDebugInfo.AppendText(strRevMsg + "rnn"); Debug.WriteLine(strRevMsg + "rnn"); x = 1; } } catch (Exception ex) { Debug.WriteLine("远程服务器已经中断连接" + "rnn"); Debug.WriteLine("远程服务器已经中断连接" + "rn"); break; } } } //获取当前系统时间 DateTime GetCurrentTime() { DateTime currentTime = new DateTime(); currentTime = DateTime.Now; return currentTime; } //发送字符信息到服务端的方法 void ClientSendMsg(string sendMsg) { //将输入的内容字符串转换为机器可以识别的字节数组 byte[] arrClientSendMsg = Encoding.UTF8.GetBytes(sendMsg); //调用客户端套接字发送字节数组 socketclient.Send(arrClientSendMsg); //将发送的信息追加到聊天内容文本框中 Debug.WriteLine("hello...." + ": " + GetCurrentTime() + "rn" + sendMsg + "rnn"); this.txtDebugInfo.AppendText("hello...." + ": " + GetCurrentTime() + "rn" + sendMsg + "rnn"); } private void btnSendMessage_Click(object sender, EventArgs e) { //调用ClientSendMsg方法 将文本框中输入的信息发送给服务端 ClientSendMsg(this.txtMessage.Text.Trim()); this.txtMessage.Clear(); } } }
测试结果截图:
server端:
client端:
代码下载地址
链接:http://pan.baidu.com/s/1kVBUOD5 密码:16ib
https://www.cnblogs.com/yy3b2007com/p/7476458.html
原文链接:https://blog.csdn.net/ba_wang_mao/article/details/106045479
原创文章,作者:优速盾-小U,如若转载,请注明出处:https://www.cdnb.net/bbs/archives/6130