旧バージョンのデフォルトの同期メッセージングを使用する
Liferay DXP 7.4 U48/Portal GA49 以下
デフォルトの同期メッセージングでは、メッセージバススレッドが登録されたメッセージリスナーにメッセージをディスパッチしている間、送信者はブロックします。 応答メッセージが受信されるか、送信者スレッドがタイムアウトすると、送信者はブロックを解除します。
送信者は、受信した 最初の 応答メッセージでブロックを解除します。
サンプルプロジェクトを使用して、デフォルトの同期メッセージを送信します。 次に、例を変更してメッセージをタイムアウトにします。
デフォルトの同期メッセージを送信する
サンプルプロジェクトでは、デフォルトモードでSynchronousMessageSenderを使用してメッセージを送信し、応答を待ちます。
サンプルプロジェクトでは、SynchronousMessageSenderを使用して、2つのリスナーに直接メッセージを送信します。
新しいLiferay インスタンスを起動し、以下を実行します。
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.48-ga48
http://localhost:8080でLiferayにサインインします。 メールアドレス test@liferay.com とパスワード testを使用してください。 プロンプトが表示されたら、パスワードを learnに変更します。
次に、以下の手順に従います。
-
サンプルをダウンロードして解凍します。
curl https://resources.learn.liferay.com/examples/liferay-m4q7.zip -Ounzip liferay-m4q7.zip -
サンプルのプロジェクトモジュールをビルドしてデプロイします。
cd liferay-m4q7./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)注このコマンドは、モジュールJARをDockerコンテナ上の
/opt/liferay/osgi/modulesにコピーするのと同じです。 -
Dockerコンテナコンソールにモジュールの起動が表示されます。
STARTED com.acme.m4q7.able.impl_1.0.0 STARTED com.acme.m4q7.baker.impl_1.0.0 STARTED com.acme.m4q7.charlie.impl_1.0.0 -
ブラウザで
http://localhost:8080にあるLiferayインスタンスにアクセスし、認証情報を使用してサインインします。 -
Gogo シェルを開きます。
-
Gogoシェルコマンドフィールドに、
m4q7:sendMessageと入力し、その後にメッセージを入力します。 例えば、m4q7:sendMessage foo -
出力が次のようになっていることを確認します。
INFO [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo INFO [acme/m4q7_baker-2][M4Q7BakerMessageListener:21] Received message payload M4Q7CharlieMessageListener INFO [pipe-m4q7:sendMessage foo][M4Q7BakerOSGiCommands:28] Response: M4Q7CharlieMessageListener
acme/m4q7_able宛先で、M4Q7CharlieMessageListenerがGogoシェルメッセージを受信しました。 acme/m4q7_baker宛先で、M4Q7BakerMessageListenerがM4Q7CharlieMessageListenerから応答メッセージを受信しました。 最後に、M4Q7BakerOSGiCommandsのsendMessageメソッドが、メッセージ送信者から返された応答オブジェクトをログに記録しました。
プロジェクト概要
3つのサンプルモジュールクラスは、宛先を管理し、メッセージをリッスンし、メッセージを送信します。
m4q7-able-implモジュール:M4Q7AbleMessagingConfiguratorは、acme/m4q7_ableという名前のメッセージ宛先を作成し、それをメッセージバスに登録します。
m4q7-baker-implモジュール:
M4Q7BakerOSGiCommandsは、acme/m4q7_able宛先にメッセージを送信し、応答をログに記録します。M4Q7BakerMessagingConfiguratorは、acme/m4q7_bakerという名前のメッセージ宛先を作成し、それをメッセージバスに登録します。M4Q7BakerMessageListenerは、acme/m4q7_baker宛先に送信されたメッセージをリッスンし、メッセージペイロードをログに記録します。
m4q7-charlie-implモジュール:M4Q7CharlieMessageListenerは、acme/m4q7_able宛先に送信されたメッセージをリッスンし、メッセージペイロードをログに記録し、元のメッセージの応答先に応答メッセージを送信します。
イベントフローは次のとおりです。
m4q7:sendMessageM4Q7BakerOSGiCommandsのsendMessage(String)メソッドは、Gogoシェルコマンドでトリガーし、メッセージ内のコマンド引数をacme/m4q7_able宛先に送信します。- メッセージバススレッドは、メッセージを
M4Q7CharlieMessageListenerに配信します。 M4Q7CharlieMessageListenerは、メッセージペイロードをログに記録し、応答メッセージ内の独自のクラス名を元のメッセージの応答先acme/m4q7_bakerに送信します。M4Q7BakerMessageListenerは応答メッセージを受信し、そのペイロードをログに記録します。- 処理は
M4Q7BakerOSGiCommandsに戻り、元のメッセージへの応答をログに記録します。
次に、宛先コンフィギュレーターから順に、各クラスを調べます。
宛先コンフィグレーターを調べる
m4q7-able-implモジュールとm4q7-baker-implモジュールには、それぞれ宛先コンフィギュレータークラスM4Q7AbleMessagingConfiguratorとM4Q7BakerMessagingConfiguratorがあります。 それぞれが宛先を作成して構成します。
M4Q7AbleMessagingConfiguratorクラスは、acme/m4q7_able宛先を構成します。
@Component
public class M4Q7AbleMessagingConfigurator {
@Activate
private void _activate(BundleContext bundleContext) {
DestinationConfiguration destinationConfiguration =
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/m4q7_able");
Destination destination = _destinationFactory.createDestination(
destinationConfiguration);
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
}
M4Q7BakerMessagingConfiguratorクラスは、acme/m4q7_baker宛先を構成します。
@Component
public class M4Q7BakerMessagingConfigurator {
@Activate
private void _activate(BundleContext bundleContext) {
DestinationConfiguration destinationConfiguration =
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/m4q7_baker");
Destination destination = _destinationFactory.createDestination(
destinationConfiguration);
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
}
両方のコンフィギュレータは コンポーネント クラスです。 これらは、 @Reference アノテーションを使用して、 DestinationFactory インスタンスを挿入します。
_activate(BundleContext) メソッドは、 DestinationFactory と DestinationConfiguration を使用して、 シリアル 宛先を作成します。 最後に、 _activate(BundleContext) メソッドは、 BundleContextを使用して、 OSGi サービスに Destination を登録します。
警告
デフォルトの同期メッセージングでは、シリアルまたはパラレルの宛先のみを使用してください。 これらは、 DestinationConfigurationの createSerialDestinationConfiguration(String) メソッドと createParallelDestinationConfiguration(String) メソッドを呼び出すことによって作成できます。
メッセージ送信者のタイムアウトが無効になるため、デフォルトの同期メッセージングでは同期宛先を使用しないでください。
コンフィギュレーターが無効になると、それらの_deactivate()メソッドは宛先サービスの登録を解除します。
リスナーを調べる
m4q7-charlie-impl モジュールの M4Q7CharlieMessageListener クラスは、 acme/m4q7_able Destinationに送信されるメッセージをリッスンします。 これは、 メッセージのリッスン で示されているのと同じ方法で登録されます。
M4Q7CharlieMessageListenerクラス:
@Component(
property = "destination.name=acme/m4q7_able",
service = MessageListener.class
)
public class M4Q7CharlieMessageListener implements MessageListener {
@Override
public void receive(Message message) {
if (_log.isInfoEnabled()) {
_log.info("Received message payload " + message.getPayload());
}
_messageBus.sendMessage(
message.getResponseDestinationName(),
new Message() {
{
setPayload("M4Q7CharlieMessageListener");
setResponseId(message.getResponseId());
}
});
}
private static final Log _log = LogFactoryUtil.getLog(
M4Q7CharlieMessageListener.class);
@Reference
private MessageBus _messageBus;
}
M4Q7CharlieMessageListenerがメッセージを受信すると、そのreceive(Message)メソッドはメッセージペイロードをログに記録し、元のメッセージの応答先に応答メッセージを送信します。 このメソッドは、応答メッセージのペイロードをリスナークラス名に設定し、応答メッセージIDを元のメッセージの応答IDに設定します。
重要
デフォルトの同期メッセージングでは、応答メッセージは元のメッセージの応答 ID を使用する必要があり、 を応答の宛先に送信する必要があります。
m4q7-baker-implモジュールのM4Q7BakerMessageListenerクラスは、M4Q7BakerOSGiCommandsのメッセージの応答先であるacme/m4q7_bakerに送信されたメッセージをリッスンします。
M4Q7BakerMessageListenerクラス:
@Component(
property = "destination.name=acme/m4q7_baker",
service = MessageListener.class
)
public class M4Q7BakerMessageListener implements MessageListener {
@Override
public void receive(Message message) {
if (_log.isInfoEnabled()) {
_log.info("Received message payload " + message.getPayload());
}
}
private static final Log _log = LogFactoryUtil.getLog(
M4Q7BakerMessageListener.class);
}
M4Q7BakerMessageListenerがメッセージを受信すると、そのreceive(Message)メソッドはメッセージペイロードをログに記録します。
送信者を調べる
m4q7-baker-impl モジュールの M4Q7BakerOSGiCommands クラスは、コマンド引数をメッセージで "acme/m4q7_able" 宛先に送信することをトリガーする OSGi コマンドを提供します。
@Component(
property = {"osgi.command.function=sendMessage", "osgi.command.scope=m4q7"},
service = M4Q7BakerOSGiCommands.class
)
public class M4Q7BakerOSGiCommands {
public void sendMessage(String payload) throws MessageBusException {
Message message = new Message();
message.setPayload(payload);
message.setResponseDestinationName("acme/m4q7_baker");
Object response = _synchronousMessageSender.send(
"acme/m4q7_able", message, 10000);
if (_log.isInfoEnabled()) {
_log.info("Response: " + response);
}
}
private static final Log _log = LogFactoryUtil.getLog(
M4Q7BakerOSGiCommands.class);
@Reference(target = "(mode=DEFAULT)")
private SynchronousMessageSender _synchronousMessageSender;
}
M4Q7BakerOSGiCommandsは、独自のクラスタイプのサービスComponentです。 @Reference アノテーションを使用して、 SynchronousMessageSender を挿入します。これは、 デフォルト モード (アノテーションの target = "(mode=DEFAULT)" 属性によって指定されます) に設定されます。
注
デフォルト モードでは、 SynchronousMessageSenderの send メソッドは、応答メッセージが受信されるか、送信者がタイムアウトするまで、呼び出し元のクラスをブロックします。
M4Q7BakerOSGiCommandsの@Componentプロパティは、m4q7スコープでsendMessageと呼ばれるGogoシェルコマンド関数を定義します。 このコマンドは入力Stringを受け取り、M4Q7BakerOSGiCommandsのsendMessage(String)メソッドにマッピングします。
sendMessage(String) メソッドは、Gogo シェル コマンドの String をペイロードとして、 "acme/m4q7_baker" を応答の宛先として、 Message を作成します。
sendMessage(String) メソッドは、 SynchronousMessageSenderの send(String, Message, long) メソッドを呼び出して、 "acme/m4q7_able" 送信先名、メッセージ インスタンス、および 10000 ミリ秒のタイムアウトを渡してメッセージを送信します。 デフォルトモードでは、SynchronousMessageSenderはメッセージバススレッドを使用してメッセージをメッセージリスナーに配信します。 元のメッセージの応答 ID を持つメッセージが "acme/m4q7_baker" 応答宛先で受信されるまで、実行は M4Q7BakerOSGiCommands クラスでブロックされます。 レスポンスを受信すると、 M4Q7BakerOSGiCommands sendMessage(String) メソッドで実行が継続され、メッセージ応答をログに記録します。 一致する応答メッセージを受信する前にタイムアウトが期限切れになると、SynchronousMessageSenderのsend(String, Message, long)メソッドはMessageBusExceptionをスローします。
重要
デフォルトの同期メッセージングでは、応答メッセージは元のメッセージの応答 ID を使用する必要があり、 を応答の宛先に送信する必要があります。
メッセージリスナーが応答メッセージを返すのを確認したので、応答のタイムアウトをテストできます。
応答タイムアウトのデモを実行する
メッセージ応答ロジックをオフにしてタイムアウトを強制する方法は次のとおりです。
-
M4Q7CharlieMessageListenerのreceive(Message)メソッドで、_messageBus.sendMessage(...)呼び出しをコメントアウトします。
@Override
public void receive(Message message) {
if (_log.isInfoEnabled()) {
Object payload = message.getPayload();
_log.info("Received message payload " + payload.toString());
}
// _messageBus.sendMessage(
// message.getResponseDestinationName(),
// new Message() {
// {
// setPayload("M4Q7CharlieMessageListener");
// setResponseId(message.getResponseId());
// }
// });
}
-
サンプルプロジェクトを再デプロイします。
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
-
Gogoシェルコマンドフィールドに、m4q7:sendMessageと入力し、その後にメッセージを入力します。 例えば、
m4q7:sendMessage foo
-
Gogoシェルページが次のようになっていることを確認します。

-
Dockerコンソールのメッセージが次のようになっていることを確認します。
INFO [acme/m4q7_able-2][M4Q7CharlieMessageListener:23] Received message payload foo
M4Q7CharlieMessageListenerはメッセージを受信しましたが、応答しませんでした。 SynchronousMessageSenderは、Gogoシェルページに出力されたMessageBusExceptionをスローしました。
タイムアウトと同期してメッセージを送信しました。
次のステップ
ダイレクト モードを使用した同期メッセージングについて詳しく知りたい場合は、「 以前のバージョンでのダイレクト同期メッセージングの使用」を参照してください。
メッセージを送信した後すぐに処理を続行する場合は、「 非同期メッセージングの使用」を参照してください。
関連トピック