forked from ServiceStack/ServiceStack
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPacketProcessingClient.cs
More file actions
136 lines (117 loc) · 3.76 KB
/
PacketProcessingClient.cs
File metadata and controls
136 lines (117 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#if !SL5
using System;
using System.Collections.Generic;
using System.Net.Sockets;
namespace ServiceStack.Messaging.Rcon
{
/// <summary>
/// Processing client used to interface with ServiceStack and allow a message to be processed.
/// Not an actual client: it wraps one received packet, hands that packet's payload out exactly
/// once through Get, and forwards every publish/notify to the owning server together with the
/// originating socket and the packet's sequence number.
/// </summary>
internal class ProcessingClient : IMessageQueueClient, IOneWayClient
{
    // Assigned once in the constructor and never replaced afterwards.
    readonly Packet thePacket;
    readonly Socket theClient;
    readonly Server theServer;

    // Becomes true after the first Get so the packet is only ever handed out once.
    bool givenPacket = false;

    /// <summary>
    /// Creates a processing client bound to a single packet.
    /// </summary>
    /// <param name="packet">The packet whose payload will be returned by Get.</param>
    /// <param name="client">The socket passed along to the server on publish/notify.</param>
    /// <param name="server">The server that publish/notify calls are forwarded to.</param>
    public ProcessingClient(Packet packet, Socket client, Server server)
    {
        thePacket = packet;
        theClient = client;
        theServer = server;
    }

    /// <summary>
    /// Publishes <paramref name="messageBody"/>. When T is itself a message type it is published
    /// as-is to its in-queue; otherwise it is wrapped in a new Message of T first.
    /// </summary>
    public void Publish<T>(T messageBody)
    {
        if (typeof(IMessage).IsAssignableFrom(typeof(T)))
        {
            // Route directly to the (queueName, message) overload. Calling Publish(IMessage)
            // here would bind back to Publish<IMessage>(T) and recurse until the stack overflows.
            var message = (IMessage)messageBody;
            Publish(message.ToInQueueName(), message);
        }
        else
        {
            Publish<T>(new Message<T>(messageBody));
        }
    }

    /// <summary>
    /// Publishes a typed message to the in-queue derived from the message itself.
    /// </summary>
    public void Publish<T>(IMessage<T> message)
    {
        Publish(message.ToInQueueName(), message);
    }

    /// <summary>
    /// Wraps <paramref name="requestDto"/> in a message and publishes it to that message's in-queue.
    /// </summary>
    public void SendOneWay(object requestDto)
    {
        // Use the explicit-queue overload; the generic Publish<T>(T) cannot be used for a
        // non-generic message without re-entering itself.
        var message = MessageFactory.Create(requestDto);
        Publish(message.ToInQueueName(), message);
    }

    /// <summary>
    /// Wraps <paramref name="requestDto"/> in a message and publishes it to <paramref name="queueName"/>.
    /// </summary>
    public void SendOneWay(string queueName, object requestDto)
    {
        Publish(queueName, MessageFactory.Create(requestDto));
    }

    /// <summary>
    /// Sends each request one-way, in enumeration order. A null sequence is ignored.
    /// </summary>
    public void SendAllOneWay(IEnumerable<object> requests)
    {
        if (requests == null)
        {
            return;
        }

        foreach (var request in requests)
        {
            SendOneWay(request);
        }
    }

    /// <summary>
    /// Publish the specified message into the durable queue @queueName
    /// </summary>
    public void Publish(string queueName, IMessage message)
    {
        var messageBytes = message.ToBytes();
        theServer.Publish(queueName, messageBytes, theClient, thePacket.Sequence);
    }

    /// <summary>
    /// Publish the specified message into the transient queue @queueName
    /// </summary>
    public void Notify(string queueName, IMessage message)
    {
        var messageBytes = message.ToBytes();
        theServer.Notify(queueName, messageBytes, theClient, thePacket.Sequence);
    }

    /// <summary>
    /// Synchronous get. Returns the wrapped packet's payload (its second word) converted to a
    /// message on the first call, and null on every later call.
    /// Neither <paramref name="queueName"/> nor <paramref name="timeOut"/> is consulted.
    /// </summary>
    public IMessage<T> Get<T>(string queueName, TimeSpan? timeOut = null)
    {
        if (givenPacket)
        {
            return null;
        }

        var payload = thePacket.Words[1];
        givenPacket = true;
        return payload.ToMessage<T>();
    }

    /// <summary>
    /// Non blocking get message. Delegates to Get, which ignores the timeout it is given.
    /// </summary>
    public IMessage<T> GetAsync<T>(string queueName)
    {
        return Get<T>(queueName, TimeSpan.MinValue);
    }

    /// <summary>
    /// Acknowledging a message is a no-op for this client.
    /// </summary>
    public void Ack(IMessage message)
    {
    }

    /// <summary>
    /// Negatively acknowledges a message by re-publishing it.
    /// When not requeueing and the exception is a MessagingException carrying a response DTO,
    /// that DTO is published to its dead-letter queue instead of the original message.
    /// Otherwise the original message goes back to its in-queue (requeue) or to its
    /// dead-letter queue (no requeue).
    /// </summary>
    public void Nak(IMessage message, bool requeue, Exception exception = null)
    {
        var msgEx = exception as MessagingException;
        if (!requeue && msgEx != null && msgEx.ResponseDto != null)
        {
            var msg = MessageFactory.Create(msgEx.ResponseDto);
            Publish(msg.ToDlqQueueName(), msg);
            return;
        }

        var queueName = requeue
            ? message.ToInQueueName()
            : message.ToDlqQueueName();

        Publish(queueName, message);
    }

    /// <summary>
    /// Casts an already-materialised response back to a typed message.
    /// </summary>
    public IMessage<T> CreateMessage<T>(object mqResponse)
    {
        return (IMessage<T>)mqResponse;
    }

    /// <summary>
    /// Returns a new temporary queue name from QueueNames.
    /// </summary>
    public string GetTempQueueName()
    {
        return QueueNames.GetTempQueueName();
    }

    /// <summary>
    /// Single-argument negative acknowledge; intentionally does nothing.
    /// </summary>
    public void Nak(IMessage message)
    {
    }

    /// <summary>
    /// Nothing to release here: this method leaves the socket and server untouched.
    /// </summary>
    public void Dispose()
    {
    }
}
}
#endif