非同期メッセージングの使用
メッセージバスの非同期オプションは、「ファイア・アンド・フォーゲット」動作を提供します。メッセージを送信し、応答を待たずに処理を続行します。
非同期メッセージは、 シリアル または パラレル の宛先に送信されます。
-
シリアル宛先の場合、メッセージバスはメッセージをキューに入れ、メッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを順番に処理します。
-
パラレル宛先の場合、メッセージバスはメッセージをキューに入れ、1つのメッセージリスナーにつきメッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを同時に処理します。
別のクラス(メッセージリスナー)がリッスンしているシリアル宛先にメッセージを送信することから始めます。
メッセージを送る
新しいLiferay インスタンスを起動し、以下を実行します。
docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.132-ga132
http://localhost:8080でLiferayにサインインします。 メールアドレス test@liferay.com とパスワード testを使用してください。 プロンプトが表示されたら、パスワードを learnに変更します。
サンプルプロジェクトでメッセージを送信することから始めます。
-
サンプルをダウンロードして解凍します。
curl https://resources.learn.liferay.com/examples/liferay-n8k5.zip -Ounzip liferay-n8k5.zip -
宛先モジュール
n8k5-able-implをビルドしてデプロイします。cd liferay-n8k5/n8k5-able-impl../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)注このコマンドは、モジュール JAR を Docker コンテナ上の
/opt/liferay/osgi/modulesにコピーするのと同じです。Dockerコンテナコンソールに、モジュールが起動されたことが示されます。
STARTED com.acme.n8k5.able.impl_1.0.0 -
リスナーモジュール
n8k5-charlie-implをビルドしてデプロイします。cd ../n8k5-charlie-impl../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)Dockerコンテナコンソールに、モジュールが起動されたことが示されます。
STARTED com.acme.n8k5.charlie.impl_1.0.0 -
送信者モジュール
n8k5-baker-implをビルドしてデプロイします。cd ../n8k5-baker-impl../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)Docker コンテナ コンソールで、
N8K5Bakerがメッセージを送信し、N8K5CharlieMessageListenerがメッセージを受信し、n8k5-baker-implモジュールが起動したことを確認します。INFO [pipe-start 2025][N8K5Baker:24] Sent message to acme/n8k5_able INFO [acme/n8k5_able-4][N8K5CharlieMessageListener:21] Received message payload N8K5Baker#_activate STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
N8K5Bakerは、宛先acme/n8k5_ableにメッセージを送信したことを報告しました。 N8K5CharlieMessageListenerは、宛先acme/n8k5_ableでペイロードN8K5Baker#_activateを含むメッセージを受信しました。 これで、サンプルコードを調べることができます。
プロジェクト概要
この例の3つのモジュールには、それぞれ1つのクラスがあります。 各クラスは、メッセージングコンポーネントの1つ(宛先、送信者、リスナー)を表します。
クラスの例:
| クラス | 説明 |
|---|---|
n8k5-able-impl の N8K5AbleMessagingConfigurator | acme/n8k5_ableという名前のメッセージ宛先を作成し、メッセージバスに登録します。 |
n8k5-baker-impl の N8K5Baker | acme/n8k5_able宛先にメッセージを送信します。 |
n8k5-charlie-impl の N8K5CharlieMessageListener | acme/n8k5_able宛先に送信されたメッセージをリッスンします。 |
これらがどのように相互作用するかを以下に示します。
N8K5Bakerが有効になり(たとえば、n8k5-baker-implモジュールが起動したとき)、acme/n8k5_able宛先にメッセージを送信します。- メッセージバスがメッセージを
N8K5CharlieMessageListenerに送信します。 N8K5CharlieMessageListenerがメッセージを受信します。
宛先構成と送信者クラスを調べます。 リスナー クラス N8K5CharlieMessageListener は、 メッセージのリッスン で示されているのと同じ方法で登録されます。
宛先構成を調べる
n8k5-able-implモジュールのN8K5AbleMessagingConfiguratorクラスは、宛先を作成して構成します。 コードは次のとおりです。
@Component
public class N8K5AbleMessagingConfigurator {
@Activate
private void _activate(BundleContext bundleContext) {
Destination destination = _destinationFactory.createDestination(
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/n8k5_able"));
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
}
どのクラスでも宛先を作成して構成できますが、 コンポーネント には、 DestinationFactoryのような注入された依存関係がある場合があります。 _destinationFactory フィールドの @Reference アノテーションは、Liferay の OSGi フレームワークに DestinationFactory インスタンスを挿入するように指示します。
_activate メソッドで、 N8K5AbleMessagingConfigurator は DestinationFactory と DestinationConfiguration を使用して、 serial 宛先 acme/n8k5_ableを作成します。 次に、OSGiフレームワークBundleContextを使用して、Destinationに対するサービスを登録します。 N8K5AbleMessagingConfiguratorが無効化されると、_deactivateメソッドはサービスの登録を解除します。
送信者を調べる
以下の N8K5Baker クラスは、ペイロード "N8K5Baker#_activate" を持つメッセージを acme/n8k5_ableという名前の宛先に送信します。
@Activate
private void _activate() {
Message message = new Message();
message.setPayload("N8K5Baker");
_messageBus.sendMessage("acme/n8k5_able", message);
}
@Reference
private MessageBus _messageBus;
コンポーネントとして、N8K5Bakerは@Referenceアノテーションを使用してMessageBusインスタンスを挿入します。
コンポーネントのアクティブ化時に、N8K5Bakerは、アクティブ化メソッド_activate()を介してメッセージを作成して送信します。 Message インスタンスを構築し、それにペイロードを追加します。 ペイロードは、Messageに入力できるもののうちの1つです。
主なメッセージ入力方法は次のとおりです。
| メソッド | 説明 |
|---|---|
setPayload(Object) | Messageのメインコンテンツを追加します。 |
setResponseDestinationName(String) | 応答を受信するためのDestinationを参照します。 |
setValues(Map<String,Object>) | Mapから追加データを提供します。 |
N8K5Baker は、 MessageBusの sendMessage(String, Message) メソッドを呼び出して、 Destination 、 acme/n8k5_able にメッセージを送信します。 MessageBus は新しいスレッドを開始し、 Message を MessageListener インスタンス ( acme/n8k5_able Destinationに登録されています) に送信します。 N8K5Bakerのスレッドが継続します。
Messageへの応答を受信する場合は、 Message に応答の宛先を設定し、 N8K5Bakerなどのクラスを MessageListener としてその宛先に登録します。 詳細については、 メッセージのリッスン を参照してください。
応答処理の追加
メッセージ受信者からの応答が必要な場合は、返信の応答先を設定します。
- メッセージ応答用に別の宛先を登録します。
- クラス(例えば、元の送信者)を
MessageListenerとして応答先に登録します。 - メッセージで応答先を渡します。
MessageListenerに応答ロジックを追加します。
ステップ1:応答の宛先を登録する
N8K5AbleDestinationConfiguratorが宛先を管理するのと同じ方法で、応答先を管理するようにN8K5Bakerを変更できます。 _activate()メソッドのシグネチャを_activate(BundleContext bundleContext)に置き換え、acme/n8k5_baker応答先のサービスを作成、構成、および登録するコードを追加します。 サービスの登録を解除する_deactivate()メソッドを追加します。 _activate(BundleContext bundleContext)メソッドと_deactivate()メソッドは次のようになります。
@Activate
private void _activate(BundleContext bundleContext) {
Destination destination = _destinationFactory.createDestination(
DestinationConfiguration.createSerialDestinationConfiguration(
"acme/n8k5_baker"));
_serviceRegistration = bundleContext.registerService(
Destination.class, destination,
MapUtil.singletonDictionary(
"destination.name", destination.getName()));
Message message = new Message();
message.setPayload("N8K5Baker#_activate");
_messageBus.sendMessage("acme/n8k5_able", message);
}
@Deactivate
private void _deactivate() {
if (_serviceRegistration != null) {
_serviceRegistration.unregister();
}
}
@Reference
private DestinationFactory _destinationFactory;
private ServiceRegistration<Destination> _serviceRegistration;
ステップ2:N8K5Bakerを応答先のリスナーとして登録する
送信者N8K5Bakerの変更点は次のとおりです。
@Componentアノテーションを更新し、N8K5BakerタイプMessageListener.classのサービスを宣言し、プロパティ"destination.name=acme/n8k5_baker"を介してN8K5Bakerをその応答宛先にマッピングします。MessageListenerインターフェースを実装します。receive(Message)メソッドをメッセージ処理ロジックでオーバーライドします。
送信者の変更は次のようになります。
@Component(
property = "destination.name=acme/n8k5_baker",
service = MessageListener.class
)
public class N8K5Baker implements MessageListener {
@Override
public void receive(Message message) {
Object payload = message.getPayload();
_log.info("Received message payload " + payload.toString());
}
// Existing methods and fields
private static final Log _log = LogFactoryUtil.getLog(N8K5Baker.class);
}
ステップ3:メッセージの応答先を渡す
N8K5Bakerが送信するメッセージの応答先としてacme/n8k5_bakerを設定します。 次のようになります。
@Activate
private void _activate(BundleContext bundleContext) {
// Destination setup
Message message = new Message();
message.setPayload("N8K5Baker#_activate");
message.setResponseDestinationName("acme/n8k5_baker");
_messageBus.sendMessage("acme/n8k5_able", message);
}
ステップ4:MessageListenerに応答ロジックを追加する
MessageListenerのreceive(Message)メソッドで、応答を設定し、メッセージから応答先を取得し、MessageBusインスタンスを使用して応答メッセージを応答先に送信します。 次のようになります。
public void receive(Message message) {
// Message processing
message.setResponse("N8K5CharlieMessageListener");
Message responseMessage = new Message();
responseMessage.setDestinationName(
message.getResponseDestinationName());
responseMessage.setPayload("N8K5CharlieMessageListener");
responseMessage.setResponseId(message.getResponseId());
_messageBus.sendMessage(
message.getResponseDestinationName(), responseMessage);
}
// Existing methods and fields
@Reference
private MessageBus _messageBus;
変更をテストする
サンプルプロジェクトを再デプロイして、変更をテストします。
cd ../../liferay-n8k5.zip
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
出力は次のようになります。
STARTED com.acme.n8k5.charlie.impl_1.0.0 [2020]
STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
INFO [acme/n8k5_able-2][N8K5CharlieMessageListener:23] Received message payload N8K5Baker#_activate
INFO [acme/n8k5_baker-2][N8K5Baker:30] Received message payload N8K5CharlieMessageListener
N8K5CharlieMessageListenerは、N8K5Bakerのメッセージを受信してから、応答メッセージを応答先に送信します。 N8K5Bakerは応答メッセージを受信し、メッセージペイロードを出力します。
クラスが再度メッセージを交換するようにしたい場合は、 Gogo Shellでモジュール (OSGi バンドル) を再起動できます。 バンドルを一覧表示して (lb)、バンドル ID を取得し、バンドルを停止して (stop <id>)、バンドルを再起動します (start <id>)。
OSGi コンポーネントではないクラスでは、 MessageBusUtil と Destination、 DestinationConfiguration、 Message、および MessageListener インスタンスを使用してメッセージを送信できます。
示されているように Destination サービスを登録できますが、 BundleContext を別の方法で取得する必要があります (たとえば、次の呼び出しを行う必要があります: Bundle bundle = FrameworkUtil.getBundle(YourClass.class); BundleContext bundleContext = bundle.getBundleContext())。
2つのクラス間で非同期的にメッセージを交換しました。
次のステップ
非同期メッセージングに慣れてきたので、最適なパフォーマンスになるように調整できます。 方法については、 メッセージング パフォーマンスのチューニングを参照してください。
デフォルト モードと 直接 モードを使用した同期メッセージングについて詳しく知りたい場合は、詳細については 以前のバージョンでの直接同期メッセージングの使用 および 以前のバージョンでのデフォルトの同期メッセージングの使用 を参照してください。