[PLSQL]Oracle Advanced Queue (DBMS_AQ/DMBS_AQADM)

摘要:
AQ其实类似于一个messagequeue,至于为什么叫Advancedqueue,就不清楚了:)messagequeue典型的应用场景是“生产者-消费者”模式。在这个过程中,最重要的还是subscriber怎么知道何时来queuetable里面取message,当然subscriber可以一直不停地查询queuetable或者每隔多长时间来查询一次,这显然都不是很好,最好还是有种方式来提醒subscriber。AQ支持所谓的callback方式来让subscriber及时获得queuetable中的数据。AQ相关接口OracleAdvancedQueue主要提供了DBMS_AQ和DBMS_AQADM两个package程序接口。


扯在前面

Oracle通过AQ (Oracle Streams Advanced Queuing)来提供“进程间” (或者跨会话 -- inter-session) 通信的功能。关于inter-session communication, 貌似DBMS_PIPE也可以做到,这个打算写另外一片水文来介绍,在此不表。 AQ 其实类似于一个message queue, 至于为什么叫Advanced queue,就不清楚了:) message queue典型的应用场景是“生产者-消费者” 模式 (or 发布/订阅 (publisher/subscriber))。

AQ基础在于有个实实在在的"queue table", 因为数据表中的数据是持久化的,共享的,可以被多个session同时访问,因此很容易就实现了多个session信息传递的目的。有了queue table是不够的,自然还是需要有个queue, 然后可以通过这个介质publisher可以把 信息发布到queue table中,subscriber然后可以通过queue来访问queue table来取得数据。在这个过程中,最重要的还是subscriber 怎么知道何时来queue table里面取message, 当然subscriber可以一直不停地查询queue table 或者每隔多长时间来查询一次,这显然都不是很好,最好还是有种方式来提醒subscriber。 AQ支持所谓的callback方式来让subscriber及时获得queue table中的数据。

AQ相关接口

Oracle Advanced Queue 主要提供了DBMS_AQ 和 DBMS_AQADM两个package程序接口。有必要先了解这两个package都提供了什么东西...

相关类型定义

主要包括如下一些:
- ENQUEUE_OPTIONS_T

- DEQUEUE_OPTIONS_T

- MESSAGE_PROPERTIES_T

- AQ$_REG_INFO

- AQ$_DESCRIPTOR

DBMS_AQ


这个包主要定义了enqueue/dequeue等过程。 如下所示 (Note that this is far from a complete list of interfaces)

- ENQUEUE procedure

- ENQUEUE_ARRAY function

- DEQUEUE procedure

- DEQUEUE_ARRAY function

- REGISTER Procedure (Register for message notifications)

- UNREGISTER procedure (Unregister a subscription which turns off notification)

- LISTEN procedures ( Listen to one or more queues on behalf of a list of agents)

- POST procedures (Posts to an anonymous subscription which allows all clients who are registered for the subscription to get notifications)

AQ 比较有用的应该是它提供的 callback procedure来支持异步调用的功能。不过有个限制就是自定义的callback procedure必须满足一定的接口规范,如下所示:

如果message的类型是RAW, 则接口如下

procedureplsqlcallback(
context
INRAW,
reginfo
INSYS.AQ$_REG_INFO,
descr
INSYS.AQ$_DESCRIPTOR,
payload
INRAW,
payloadl
INNUMBER);

如果message的类型的自定义的object类型,则接口如下:

procedureplsqlcallback(
context
INRAW,
reginfo
INSYS.AQ$_REG_INFO,
descr
INSYS.AQ$_DESCRIPTOR,
payload
INVARCHAR2,
payloadl
INNUMBER);

DBMS_AQADM

顾名思义,这个包提供了用来管理AQ的接口。 主要包括以下一些接口,

- CREATE_QUEUE_TABLE procedure

- CREATE_QUEUE procedure

- DROP_QUEUE procedure

- DROP_QUEUE_TABLE procedure

- PURGE_QUEUE_TABLE procedure

- START_QUEUE procedure

- STOP_QUEUE procedure

- ADD_SUBSCRIBER procedure

- REMOVE_SUBSCRIBER procedure

AQ in Action

1. Create a message type ( a.k.a. payload type)

SQL>createtype test_msg_type as2object (message varchar2(4000));
3/
Type created.

2. Create a queue table based on the payload type just created.

SQL>begin2dbms_aqadm.create_queue_table
3( queue_table =>'test_queue_table',
4queue_payload_type =>'test_msg_type');
5end;
6/
PL
/SQL proceduresuccessfully completed.

3. Create a queue and start the queue

SQL>begin2dbms_aqadm.create_queue
3( queue_name =>'test_queue',
4queue_table =>'test_queue_table');
56dbms_aqadm.start_queue
7( queue_name =>'test_queue');
89end;
10/
PL
/SQL proceduresuccessfully completed.

Now let's see what Oracle has created behind the scene so far.

SQL>selectobject_name, object_type fromuser_objects;
OBJECT_NAMEOBJECT_TYPE
---------------------------------------- --------------------TEST_QUEUE_TABLE TABLE
TEST_MSG_TYPE TYPE
SYS_C0054669
INDEX
SYS_LOB0000262448C00030$$ LOB
AQ$_TEST_QUEUE_TABLE_T
INDEX
AQ$_TEST_QUEUE_TABLE_I
INDEX
AQ$_TEST_QUEUE_TABLE_E QUEUE
AQ$_TEST_QUEUE_TABLE_F
VIEW
AQ$TEST_QUEUE_TABLE
VIEW
TEST_QUEUE QUEUE
10rows selected.

Note there is another queue -- AQ$_TEST_QUEUE_TABLE_E created for us. Just as the suffix "E" implies, this queue will be used to store the message if the AQ cannot retrieve a message from our user-queue.

Now let's see what is inside our queue table. Obviously, nothing!

SQL>select*fromtest_queue_table;
no rows selected
SQL
>

4. Enqueue messages

1declare2v_enqueue_options dbms_aq.enqueue_options_t;
3v_message_properties dbms_aq.message_properties_t;
4v_message_handle raw(16);
5v_payload test_msg_type;
6begin7v_payload :=test_msg_type('Hello There');
8dbms_aq.enqueue
9( queue_name =>'test_queue',
10enqueue_options =>v_enqueue_options,
11message_properties =>v_message_properties,
12payload =>v_payload,
13msgid =>v_message_handle);
14commit;
15*end;
16/
PL
/SQL proceduresuccessfully completed.
SQL
>

Note the enqueue action is essentially a transaction (insert into the queue table), hence we needed to commit it to let other sessions can see the data in the queue table.

Now let's see what's inside the queue table.

SQL>selectcount(*) fromtest_queue_table;
COUNT(*)
----------1
SQL
>selectcount(*) fromaq$test_queue_table;
COUNT(*)
----------1
SQL>selectuser_data fromaq$test_queue_table;
USER_DATA(MESSAGE)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------TEST_MSG_TYPE('Hello There')
SQL
>

5. Browsing messages

DBMS_AQ.DEQUEUE can be used to either dequeue message from the queue (remove the message from the queue table) which is the default behavior, or browse the message from the queue ( will not remove the message from the queue).

To browse the messages, we can set the dequeue message mode to be DBMS_AQ.BROWSE. See below an example,

SQL>declare2v_dequeue_options dbms_aq.dequeue_options_t;
3v_message_properties dbms_aq.message_properties_t;
4v_message_handle raw(16);
5v_payload test_msg_type;
6begin78v_dequeue_options.dequeue_mode :=dbms_aq.browse;
910dbms_aq.dequeue(
11queue_name =>'test_queue',
12dequeue_options =>v_dequeue_options,
13message_properties =>v_message_properties,
14payload =>v_payload,
15msgid =>v_message_handle);
1617dbms_output.put_line('Browsed message: '||v_payload.message);
1819end;
20/
PL
/SQL proceduresuccessfully completed.
SQL
>setserveroutput on
SQL
>/
Browsed message: Hello There
PL
/SQL proceduresuccessfully completed.
SQL
>

We can verify the message is still out there in the queue (table) by querying the view aq$test_queue_table.

SQL>selectuser_data fromaq$test_queue_table;

USER_DATA(MESSAGE)
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------TEST_MSG_TYPE('Hello There')

SQL>

6. Dequeue messages

Now let's do the real dequeuing operation. Note this doesn't have to be from the same session since enqueues are committed transactions and AQ is table-based. Similarly, dequeue is also a transaction. If we are happy with the message, we must commit the dequeue as well.

1declare2v_dequeue_options dbms_aq.dequeue_options_t;
3v_message_properties dbms_aq.message_properties_t;
4v_message_handle raw(16);
5v_payload test_msg_type;
6begin7dbms_aq.dequeue(
8queue_name =>'test_queue',
9dequeue_options =>v_dequeue_options,
10message_properties =>v_message_properties,
11payload =>v_payload,
12msgid =>v_message_handle);
13dbms_output.put_line('Dequeue message: '||v_payload.message);
14 commit;
15*end;
16/
Dequeue message: Hello There
PL
/SQL proceduresuccessfully completed.
SQL
>

We can confirm the message is gone by querying the queue table...

SQL>selectcount(*) fromtest_queue_table;
COUNT(*)
----------0

7. Clean Up

Before going on to the topic of notification, let's do the clean up first.
SQL>begin2dbms_aqadm.stop_queue('test_queue');
3dbms_aqadm.drop_queue('test_queue');
4dbms_aqadm.drop_queue_table('test_queue_table');
5end;
6/
PL
/SQL proceduresuccessfully completed.
SQL
>
SQL>selectobject_name, object_type fromuser_objects;
OBJECT_NAMEOBJECT_TYPE
---------------------------------------- --------------------TEST_MSG_TYPE TYPE
SQL
>

8. Notification

The examples above shows how to dequeue the messages manually. This is not pleasant in the real world. Most of the time, we'd like there would be some mechanism to notify dequeuing instead of dequeuing initiatively.

First create queue table for multiple consumers..

1begin2dbms_aqadm.create_queue_table
3( queue_table =>'test_queue_table',
4queue_payload_type =>'test_msg_type',
5multiple_consumers =>true);
6*end;
SQL
>/
PL
/SQL proceduresuccessfully completed.
SQL
>

Then create the queue and start it as usual,

SQL>begin2dbms_aqadm.create_queue
3( queue_name =>'test_queue',
4queue_table =>'test_queue_table');
56dbms_aqadm.start_queue
7( queue_name =>'test_queue');
8end;
9/
PL
/SQL proceduresuccessfully completed.
SQL
>

To demonstrate the asynchronous nature of notification via callback, we are going to store the queued message in a normal application table.

SQL>createtabletest_message_table
2( message varchar2(4000));
Tablecreated.

Now the key point comes, we create a callback plsql procedure. This procedure will dequeue the message and save it in the table test_message_table when there is notification. Remember the callback procedure interface signature must be as follows,

1createorreplaceproceduretest_queue_callback_procedure
2( context raw,
3reginfo sys.aq$_reg_info,
4descr sys.aq$_descriptor,
5payload raw,
6payloadl number)
7AS8v_dequeue_options dbms_aq.dequeue_options_t;
9v_message_properties dbms_aq.message_properties_t;
10v_message_handle raw(16);
11v_payload test_msg_type;
12begin13v_dequeue_options.msgid :=descr.msg_id;
14v_dequeue_options.consumer_name :=descr.consumer_name;
15dbms_aq.dequeue
16( queue_name =>descr.queue_name,
17dequeue_options =>v_dequeue_options,
18message_properties =>v_message_properties,
19payload =>v_payload,
20msgid =>v_message_handle);
21insertintotest_message_table(message)
22values('Message ['||v_payload.message ||']'||23'dequeued at ['||to_char(systimestamp, 'yyyy-mm-dd hh24:mi:ss.FF3') ||']');
24commit;
25*end;
SQL
>/Procedurecreated.

We need to add a named subscriber to the queue and register the action that the subscriber will take on notification.

1begin2dbms_aqadm.add_subscriber
3( queue_name =>'test_queue',
4subscriber =>sys.aq$_agent
5('test_queue_subscriber',
6null,
7null)
8);
9dbms_aq.register
10(
11sys.aq$_reg_info_list
12( sys.aq$_reg_info
13( 'test_queue:test_queue_suscriber',
14dbms_aq.namespace_aq,
15'plsql://test_queue_callback_procedure',
16HEXTORAW('FF')
17)
18),
19120);
21*end;
22/
PL
/SQL proceduresuccessfully completed.
SQL
>

Refer toAQ$_REG_INFO Typefor the detailed definition.

Now let's see what will happen when we enqueue a message...

1declare2v_enqueue_options dbms_aq.enqueue_options_t;
3v_message_properties dbms_aq.message_properties_t;
4v_message_handle raw(16);
5v_payload test_msg_type;
6begin7v_payload :=test_msg_type(
8to_char(systimestamp,
9'yyyy-mm-dd hh24:mi:ss.ff3'));
10dbms_aq.enqueue
11(
12queue_name =>'test_queue',
13enqueue_options =>v_enqueue_options,
14message_properties =>v_message_properties,
15payload =>v_payload,
16msgid =>v_message_handle);
17commit;
18*end;
SQL
>/
PL
/SQL proceduresuccessfully completed.
SQL
>

To see if the message was automatically dequeued, let's check out the table test_message_table,

SQL>select*fromtest_message_table;
MESSAGE
----------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------- ----------------------Message [2010-11-06 16:07:47.537]dequeued at [2010-11-06 16:07:51.599]
SQL
>selectcount(*) fromtest_queue_table;
COUNT(*)
----------0
SQL
>

Clean up: remove the subscriber as follows,

1begin2DBMS_AQADM.REMOVE_SUBSCRIBER
3( queue_name =>'test_queue',
4subscriber =>sys.aq$_agent
5('test_queue_subscriber', null, null)
6);
7*end;
SQL
>/
PL
/SQL proceduresuccessfully completed.

Acknowledgements

本文的例子来自Adrian Billington的 introduction to advanced queuing

免责声明:文章转载自《[PLSQL]Oracle Advanced Queue (DBMS_AQ/DMBS_AQADM)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Python中的MySQL接口:PyMySQL & MySQLdbc#操作文件夹得读写权限下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

oracle中Blob和Clob类型的区别

一、oracle中Blob和Clob类型的区别BLOB和CLOB都是大字段类型,BLOB是按二进制来存储的,而CLOB是可以直接存储文字的。其实两个是可以互换的的,或者可以直接用LOB字段代替这两个。但是为了更好的管理ORACLE数据库,通常像图片、文件、音乐等信息就用BLOB字段来存储,先将文件转为二进制再存储进去。而像文章或者是较长的文字,就用CLOB...

R语言做正态分布检验

摘自:吴喜之:《非参数统计》(第二版),中国统计出版社,2006年10月:P164-165 1、ks.test() 例如零假设为N(15,0.2),则ks.test(x,"pnorm",15,0.2)。如果不是正态分布,还可以选"pexp", "pgamma"等。 2、shapiro.test() 可以进行关于正态分布的Shapiro-Wilk检验。...

pytest文档69-Hook函数之参数化生成测试用例pytest_generate_tests

前言 pytest 实现参数化有三种方式 pytest.fixture() 使用 fixture 传 params 参数实现参数化 @ pytest.mark.parametrize 允许在测试函数或类中定义多组参数,在用例中实现参数化 pytest_generate_tests 允许定义自定义参数化方案或扩展。 pytest_generate_tes...

如何同步两个SQLServer数据库的内容 dodo

如何同步两个SQLServer数据库的内容? 程序代码可以有版本管理CVS进行同步管理,可是数据库同步就非常麻烦,只能自己改了一个后再去改另一个,如果忘记了更改另一个经常造成两个数据库的结构或内容上不一致. 分发与复制 用强制   订阅实现数据库同步操作  大量和批量的数据可以用数据库的同步机制处理: // 说明: 为方便操作,所有操作均在发布服...

Oracle常见的33个等待事件

Buffer busy waits         原因:        当一个会话试图修改一个数据块,但这个数据块正在被另一个会话修改时。        当一个会话需要读取一个数据块,但这个数据块正在被另一个会话读取到内存中时。        备注:数据处理的最小单位是块 select name,parameter1,parameter2,paramet...

Oracle常用函数汇总

在Oracle OCP考试中,相当一部分知识点涉及到对于Oracle常见函数的考查。尽管Oracle官方文档SQL Language Reference中Functions一章内列举了所有Oracle自带函数,但如果要系统的看一遍,还是要花费相当的精力,更何况还是英文呢。如果碰到一个不熟悉的,就查一下,不经常用,又很容易遗忘。下面就对Oracle常见函数做...