Thursday, November 20, 2008

Oracle Streams Advanced Queuing (AQ)

Advanced Queuing provides database-integrated message queuing functionality. Advanced Queuing leverages the functions of the Oracle database so that messages can be stored persistently, propagated between queues on different machines and databases, and transmitted using Oracle Net Services, HTTP(S), and SMTP.

You must set up the following data structures for certain examples to work:
-- Creates the two demo accounts used by the examples:
--   aqadm : queue administrator (may call DBMS_AQADM)
--   aq    : queue user          (may call DBMS_AQ)
-- Run as a DBA. On a first run the DROP USER statements report ORA-01918
-- (user does not exist); that error is harmless and the script can continue.
CONNECT system/manager;

-- Administrator account. The user must exist before anything is granted to
-- it, so CREATE USER comes directly after the DROP.
DROP USER aqadm CASCADE;
CREATE USER aqadm IDENTIFIED BY aqadm;
GRANT CONNECT, RESOURCE TO aqadm;
GRANT EXECUTE ON DBMS_AQADM TO aqadm;
GRANT AQ_ADMINISTRATOR_ROLE TO aqadm;

-- Ordinary queue user account.
DROP USER aq CASCADE;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE TO aq;
GRANT EXECUTE ON DBMS_AQ TO aq;
Creating a Queue Table and Queue of Object Type
-- Payload type carried by every message on the queue.
CREATE OR REPLACE TYPE TEST_TRANSACTION AS OBJECT (
    f_user_id     VARCHAR2(40),
    f_points      NUMBER,
    f_update_date DATE
);
/

-- No COMMIT is needed after CREATE TYPE: DDL commits implicitly.

-- Creates a multi-consumer queue table typed on TEST_TRANSACTION, creates
-- TEST_QUEUE inside it, and starts the queue.
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'TEST_TRANSACTION_TABLE',
        multiple_consumers => TRUE,
        queue_payload_type => 'TEST_TRANSACTION'
    );

    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => 'TEST_QUEUE',
        queue_table => 'TEST_TRANSACTION_TABLE'
    );

    DBMS_AQADM.START_QUEUE(
        queue_name => 'TEST_QUEUE'
    );
END;
/
Adding a subscriber to the queue
-- Registers the agent named 'SUDHEER' as a subscriber of TEST_QUEUE.
-- The second and third AQ$_AGENT arguments are passed as NULL.
DECLARE
    subscriber SYS.AQ$_AGENT;
BEGIN
    subscriber := SYS.AQ$_AGENT('SUDHEER', NULL, NULL);

    SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'TEST_QUEUE',
        subscriber => subscriber
    );
END;
/

COMMIT;
Procedure to Enqueue
-- Enqueues one TEST_TRANSACTION object onto TEST_QUEUE and commits.
--
-- Parameters:
--   message : the payload object to enqueue.
--
-- The enqueue options and message properties are left at their record
-- defaults. The message id returned by DBMS_AQ.ENQUEUE is captured in
-- message_handle but not returned to the caller.
CREATE OR REPLACE PROCEDURE ENQUEUE_OBJ_MSG (message IN TEST_TRANSACTION)
IS
    enqueue_options    DBMS_AQ.enqueue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    message_handle     RAW(16);
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => 'TEST_QUEUE',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );

    COMMIT;
END ENQUEUE_OBJ_MSG;
/

-- Example call: enqueue a sample message.
BEGIN
    ENQUEUE_OBJ_MSG(TEST_TRANSACTION('TESUSER', 5, NULL));
END;
/
-- Shows the payload of the messages through the AQ$ view of the queue table.
SELECT USER_DATA
FROM AQ$TEST_TRANSACTION_TABLE;
Java agent to access the queue
/**
 * Dequeues one message from the queue as consumer "SUDHEER", converts its
 * object payload to a TransDAO, commits the dequeue and prints the user id.
 *
 * Any failure is reported with a stack trace and the open transaction is
 * rolled back; the queue handle, AQ session and JDBC connection are always
 * released, whether or not the dequeue succeeded.
 *
 * NOTE(review): the JDBC URL, credentials, schema and queue name are
 * hard-coded sample values; move them to configuration before real use.
 */
public static void main(String args[]){
    Connection connection = null;
    AQSession aqsession = null;
    AQQueue queue = null;

    try {
        // Load the JDBC driver and the AQ driver.
        Class.forName("oracle.jdbc.driver.OracleDriver");
        Class.forName("oracle.AQ.AQOracleDriver");

        connection = DriverManager.getConnection( "jdbc:oracle:thin:@10.1.66.109:1521:real", "ppokerops", "ppokerops");
        // The dequeue is only made permanent by the explicit commit() below.
        connection.setAutoCommit(false);

        aqsession = AQDriverManager.createAQSession(connection);
        queue = aqsession.getQueue("ppokerops","QUEUE1");
        queue.start();

        // Dequeue on behalf of the named subscriber.
        AQDequeueOption dq_option = new AQDequeueOption();
        dq_option.setConsumerName("SUDHEER");

        AQMessage message = queue.dequeue(dq_option, Class.forName("com.pg.gateway.AQ.TransDAO"));
        TransDAO data = (TransDAO)message.getObjectPayload().getPayloadData();

        connection.commit();
        System.out.println("HERE"+data.getF_userid());
    } catch(Exception e) {
        e.printStackTrace();
        // Roll back explicitly: closing the connection in the finally block
        // could otherwise commit a half-processed dequeue.
        if (connection != null) {
            try {
                connection.rollback();
            } catch (Exception rollbackError) {
                rollbackError.printStackTrace();
            }
        }
    } finally {
        // Release resources in reverse order of acquisition. Each close is
        // guarded separately so one failure does not skip the others.
        if (queue != null) {
            try {
                queue.close();
            } catch (Exception closeError) {
                closeError.printStackTrace();
            }
        }
        if (aqsession != null) {
            try {
                aqsession.close();
            } catch (Exception closeError) {
                closeError.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception closeError) {
                closeError.printStackTrace();
            }
        }
    }
}
REFERENCE
http://download.oracle.com/docs/cd/B10500_01/appdev.920/a96587/qintro.htm
http://www.itk.ilstu.edu/docs/oracle/server.101/b10785/aq_start.htm#i634413

 
Free Domain Names @ .co.nr!