Basic SocketAsyncEventArgs Server and Client

A while ago I wrote a simple UDP server and client built on the SocketAsyncEventArgs class. I also used BlockingCollection, which was introduced in .NET 4, to queue outgoing and incoming packets.

Here’s the client executable:

using System;
using System.Text;
using System.Net;
using System.Threading;
using UdpLibrary;

namespace Client
{
    /// <summary>
    /// Console client: every line typed on standard input is sent to the server
    /// on the loopback address, and every datagram received is printed.
    /// </summary>
    class Client
    {
        // Port the server listens on (loopback).
        const int ServerPort = 10100;

        // How many times each input line is sent.
        const int RepeatCount = 100;

        static UdpSocket socket;

        /// <summary>
        /// Binds a UDP socket to an OS-assigned port, starts the printer thread,
        /// then forwards console input to the server until standard input ends.
        /// </summary>
        static void Main(string[] args)
        {
            socket = new UdpSocket();
            socket.Bind(IPAddress.Any);

            // Start a thread that prints anything sent to us
            new Thread(printit).Start();

            while (true)
            {
                string line = Console.ReadLine();
                if (line == null)
                {
                    // End of input (Ctrl+Z / Ctrl+D or a closed pipe). Passing null to
                    // GetBytes would throw. The printer thread loops forever, so simply
                    // returning from Main would not end the process; exit explicitly.
                    Environment.Exit(0);
                }

                // Encode once; the same payload is queued for every repetition.
                byte[] payload = Encoding.UTF8.GetBytes(line);

                // Send input to server 100 times
                for (int i = 0; i < RepeatCount; i++)
                {
                    socket.Send(IPAddress.Loopback, ServerPort, payload);
                }
            }
        }

        /// <summary>
        /// Loops forever, printing "sender text" for each datagram taken from the socket.
        /// </summary>
        static void printit()
        {
            while (true)
            {
                EndPoint sender;
                byte[] data;
                if (!socket.Receive(out sender, out data))
                {
                    // Nothing was delivered; the out values are null in that case.
                    continue;
                }

                // Print out anything we receive
                string text = Encoding.UTF8.GetString(data);
                Console.WriteLine(sender.ToString() + " " + text);
            }
        }
    }
}

The server executable:

using System;
using System.Text;
using System.Net;
using UdpLibrary;

namespace Server
{
    /// <summary>
    /// Console echo server: prints every datagram it receives on the loopback
    /// address and sends the same bytes back to the sender.
    /// </summary>
    class Server
    {
        // Loopback port the server listens on.
        const int ListenPort = 10100;

        /// <summary>
        /// Binds to the loopback listen port and echoes datagrams forever.
        /// </summary>
        static void Main(string[] args)
        {
            UdpSocket socket = new UdpSocket();
            socket.Bind(IPAddress.Loopback, ListenPort);

            while (true)
            {
                EndPoint ip;
                byte[] buffer;
                if (!socket.Receive(out ip, out buffer))
                {
                    // Nothing was delivered; the out values are null in that case.
                    continue;
                }

                IPEndPoint sender = (IPEndPoint)ip;

                // Print out what you recv then send back echo
                Console.WriteLine(sender.ToString() + " " + Encoding.UTF8.GetString(buffer));

                // Echo the received bytes themselves. Decoding to a string and
                // re-encoding would alter any payload that is not valid UTF-8.
                socket.Send(sender.Address, sender.Port, buffer);
            }
        }
    }
}

The “UdpLibrary” class library that both executables reference:

using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Collections.Concurrent;

namespace UdpLibrary
{
    /// <summary>
    /// UDP endpoint whose <see cref="Send"/> and <see cref="Receive"/> calls exchange
    /// packets through in-memory queues. <see cref="Bind"/> opens the socket and starts
    /// a worker thread that drains the send queue with SendToAsync, while receive
    /// completion callbacks fill the receive queue. Bind can also simulate latency and
    /// packet loss on incoming datagrams.
    /// </summary>
    /// <remarks>
    /// The worker thread is a foreground thread and this class offers no way to stop it
    /// or close the socket once Bind has succeeded.
    /// </remarks>
    public class UdpSocket
    {
        // Size of the single receive buffer. A larger datagram completes with
        // SocketError.MessageSize and is dropped.
        const int ReceiveBufferSize = 1300;

        // True once Bind has succeeded; written by the caller of Bind and read by the worker thread.
        volatile bool running;

        // Milliseconds each received datagram is held back before it is queued (0 = none).
        int _transferDelay;

        // Probability in [0, 1) with which a received datagram is discarded (0 = never).
        double _loss;

        // Source of randomness for simulated loss; guarded by lock (rnd).
        readonly Random rnd;

        Socket socket;

        // One datagram together with its remote endpoint (destination when sending,
        // origin when receiving).
        struct PacketStruct
        {
            public EndPoint ip;
            public byte[] buffer;
        }

        readonly BlockingCollection<PacketStruct> sendQueue;
        readonly BlockingCollection<PacketStruct> receiveQueue;

        /// <summary>Creates an unbound socket with empty send and receive queues.</summary>
        public UdpSocket()
        {
            running = false;
            sendQueue = new BlockingCollection<PacketStruct>();
            receiveQueue = new BlockingCollection<PacketStruct>();
            rnd = new Random();
        }

        /// <summary>
        /// Queues a datagram for transmission. Nothing is put on the wire here; the
        /// worker thread started by <see cref="Bind"/> performs the actual send.
        /// </summary>
        /// <param name="address">Destination address.</param>
        /// <param name="port">Destination port.</param>
        /// <param name="buffer">Payload; it is copied, so the caller may reuse the array.</param>
        /// <param name="timeout">Milliseconds to wait for queue space, or -1 to wait indefinitely.</param>
        /// <returns>True if the datagram was queued within the timeout.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public bool Send(IPAddress address, int port, byte[] buffer, int timeout = -1)
        {
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            // Don't really send, but add to the queue to be sent out
            PacketStruct packet;
            packet.ip = new IPEndPoint(address, port);
            packet.buffer = (byte[])buffer.Clone();
            return sendQueue.TryAdd(packet, timeout);
        }

        /// <summary>
        /// Takes the next received datagram from the receive queue.
        /// </summary>
        /// <param name="ip">Endpoint the datagram came from, or null if none was taken.</param>
        /// <param name="buffer">Payload of the datagram, or null if none was taken.</param>
        /// <param name="timeout">Milliseconds to wait for a datagram, or -1 to wait indefinitely.</param>
        /// <returns>True if a datagram was taken within the timeout.</returns>
        public bool Receive(out EndPoint ip, out byte[] buffer, int timeout = -1)
        {
            // See if there's anything in the queue for us to receive
            PacketStruct packet;
            if (receiveQueue.TryTake(out packet, timeout))
            {
                ip = packet.ip;
                // The queued array is already a private copy made when the datagram
                // arrived, so it can be handed to the caller without copying again.
                buffer = packet.buffer;
                return true;
            }

            ip = null;
            buffer = null;
            return false;
        }

        /// <summary>
        /// Binds an IPv4 UDP socket to the given local endpoint and starts the worker
        /// thread. Does nothing if already bound. If binding fails the failure is
        /// swallowed and the instance stays unbound.
        /// </summary>
        /// <param name="address">Local address to bind to.</param>
        /// <param name="port">Local port, or 0 to let the OS choose one.</param>
        /// <param name="transferDelay">Milliseconds to delay each received datagram (0 = none).</param>
        /// <param name="loss">Probability with which a received datagram is discarded (0 = never).</param>
        public void Bind(IPAddress address, int port = 0, int transferDelay = 0, double loss = 0)
        {
            if (running)
            {
                return;
            }

            Socket candidate = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            try
            {
                candidate.Bind(new IPEndPoint(address, port));
            }
            catch
            {
                // Bind is best-effort by contract (callers get no error), but the
                // half-created socket must not be leaked.
                candidate.Close();
                return;
            }

            socket = candidate;
            _transferDelay = transferDelay;
            _loss = loss;

            // Mark as running before the thread starts so that a second Bind call
            // issued right away is rejected by the guard above.
            running = true;

            // Bind to port and start a thread
            Thread thread = new Thread(ThreadProc);
            thread.Start();
        }

        /// <summary>
        /// Worker thread body: posts the first receive, then blocks on the send queue
        /// and starts a send chain for every datagram it takes.
        /// </summary>
        void ThreadProc()
        {
            byte[] buffer = new byte[ReceiveBufferSize];
            SocketAsyncEventArgs receiveEvent = new SocketAsyncEventArgs();
            receiveEvent.Completed += receiveEvent_Completed;
            receiveEvent.RemoteEndPoint = new IPEndPoint(IPAddress.Any, 0);
            receiveEvent.SetBuffer(buffer, 0, buffer.Length);

            if (!socket.ReceiveMessageFromAsync(receiveEvent))
            {
                // A false return means the receive completed synchronously and the
                // Completed event will not be raised. Process it on the thread pool
                // so this thread stays free for sending.
                ThreadPool.QueueUserWorkItem(delegate { receiveEvent_Completed(socket, receiveEvent); });
            }

            while (running)
            {
                PacketStruct packet;
                if (!sendQueue.TryTake(out packet, Timeout.Infinite))
                {
                    // With an infinite timeout this only happens once the queue has
                    // been marked complete and is empty; there is nothing left to send.
                    break;
                }

                SocketAsyncEventArgs sendEvent = new SocketAsyncEventArgs();
                sendEvent.Completed += sendEvent_Completed;
                SendChain(socket, sendEvent, packet);
            }
        }

        /// <summary>
        /// Sends <paramref name="packet"/> and, for as long as sends complete
        /// synchronously, keeps sending whatever else is waiting in the send queue.
        /// Returns when a send is pending (sendEvent_Completed then continues the
        /// chain) or when the queue is empty (the event args are released).
        /// </summary>
        void SendChain(Socket target, SocketAsyncEventArgs e, PacketStruct packet)
        {
            while (true)
            {
                e.RemoteEndPoint = packet.ip;
                e.SetBuffer(packet.buffer, 0, packet.buffer.Length);

                if (target.SendToAsync(e))
                {
                    return;
                }

                // Completed synchronously: the Completed event is not raised, so the
                // result has to be examined here.
                CheckSendResult(e);

                if (!sendQueue.TryTake(out packet))
                {
                    ReleaseSendEvent(e);
                    return;
                }
            }
        }

        /// <summary>
        /// Completion callback for asynchronous sends: validates the result and either
        /// continues the chain with the next queued datagram or releases the event args.
        /// </summary>
        void sendEvent_Completed(object sender, SocketAsyncEventArgs e)
        {
            CheckSendResult(e);

            PacketStruct next;
            if (sendQueue.TryTake(out next))
            {
                SendChain((Socket)sender, e, next);
            }
            else
            {
                ReleaseSendEvent(e);
            }
        }

        /// <summary>
        /// Throws if a completed send failed with anything other than ConnectionReset.
        /// </summary>
        static void CheckSendResult(SocketAsyncEventArgs e)
        {
            // On a UDP socket ConnectionReset reports that an earlier datagram was
            // answered with ICMP "port unreachable"; treat it as a lost datagram.
            if (e.SocketError != SocketError.Success && e.SocketError != SocketError.ConnectionReset)
            {
                throw new SocketException((int)e.SocketError);
            }

            if (e.LastOperation != SocketAsyncOperation.SendTo)
            {
                throw new InvalidOperationException("Unexpected socket operation: " + e.LastOperation);
            }
        }

        /// <summary>Unhooks and disposes event args whose send chain has ended.</summary>
        void ReleaseSendEvent(SocketAsyncEventArgs e)
        {
            e.Completed -= sendEvent_Completed;
            e.Dispose();
        }

        /// <summary>
        /// Completion callback for receives: processes the completed receive and
        /// re-arms it, looping while re-arming completes synchronously (in which case
        /// the Completed event is not raised again).
        /// </summary>
        void receiveEvent_Completed(object sender, SocketAsyncEventArgs e)
        {
            Socket source = (Socket)sender;
            do
            {
                ProcessReceive(e);
            }
            while (!source.ReceiveMessageFromAsync(e));
        }

        /// <summary>
        /// Handles one completed receive: drops it on a recoverable error or simulated
        /// loss, otherwise applies the simulated delay and queues a copy of the payload.
        /// </summary>
        void ProcessReceive(SocketAsyncEventArgs e)
        {
            switch (e.SocketError)
            {
                case SocketError.Success:
                    break;

                case SocketError.MessageSize:
                    // Datagram did not fit into the receive buffer; drop it.
                    return;

                case SocketError.ConnectionReset:
                    // An earlier datagram we sent was answered with ICMP "port
                    // unreachable". Nothing was received; keep listening.
                    return;

                default:
                    throw new SocketException((int)e.SocketError);
            }

            if (e.LastOperation != SocketAsyncOperation.ReceiveMessageFrom)
            {
                throw new InvalidOperationException("Unexpected socket operation: " + e.LastOperation);
            }

            if (_loss != 0)
            {
                bool drop;
                // Random is not thread-safe; hold the lock only for the draw itself.
                lock (rnd)
                {
                    drop = rnd.NextDouble() < _loss;
                }

                if (drop)
                {
                    return;
                }
            }

            if (_transferDelay != 0)
            {
                Thread.Sleep(_transferDelay);
            }

            // Copy the payload out: the receive buffer is reused for the next datagram.
            PacketStruct packet;
            packet.ip = e.RemoteEndPoint;
            packet.buffer = new byte[e.BytesTransferred];
            Buffer.BlockCopy(e.Buffer, e.Offset, packet.buffer, 0, e.BytesTransferred);
            receiveQueue.TryAdd(packet, Timeout.Infinite);
        }
    }
}

Maybe this can help someone?

Leave a Reply

Your email address will not be published. Required fields are marked *