From ffb13ee948489d9ad2834d4212be32aa3e329151 Mon Sep 17 00:00:00 2001 From: agibert Date: Tue, 9 Dec 2008 15:01:02 +0000 Subject: [PATCH] - Total loader rewrite using byte data type: Raw file loading implemenred, - Add MQ character set support, - Add Character Set option (-cs). --- MQSLoad.java | 1442 +++++++++++++++++++++++++++----------------------- 1 file changed, 776 insertions(+), 666 deletions(-) diff --git a/MQSLoad.java b/MQSLoad.java index acebeed..9805970 100644 --- a/MQSLoad.java +++ b/MQSLoad.java @@ -1,7 +1,7 @@ // $RCSfile: MQSLoad.java,v $ -// $Revision: 1.15 $ +// $Revision: 1.16 $ // $Name: $ -// $Date: 2008/12/04 15:02:16 $ +// $Date: 2008/12/09 15:01:02 $ // $Author: agibert $ /* @@ -35,768 +35,878 @@ import com.ibm.mq.*; public class MQSLoad { - private String MQSLoad_Revision = "$Revision: 1.15 $"; - private String MQSLoad_Tag = "$Name: $"; - private String MQSLoad_Date = "$Date: 2008/12/04 15:02:16 $"; - private String MQSLoad_Author = "$Author: agibert $"; - private MQQueueManager QMng = null; - private String QMng_Name = ""; - private MQQueue MsgQ; - private String MsgQ_Name; - private int MsgQ_Open_Options = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; - private int Sleep_Time = 0; - private String Field_Break = ""; - private String Message_Break = "\r\n"; - private String Message_Tail = ""; - private int Message_Skip = 0; - private int Message_Count = 0; - private int Repeat_Count = 1; - private boolean Keep_Message_Break = false; - private String File_Name; - private int Msg_Counter = 0; + private String MQSLoad_Revision = "$Revision: 1.16 $"; + private String MQSLoad_Tag = "$Name: $"; + private String MQSLoad_Date = "$Date: 2008/12/09 15:01:02 $"; + private String MQSLoad_Author = "$Author: agibert $"; + private MQQueueManager QMng = null; + private String QMng_Name = ""; + private MQQueue MsgQ; + private String MsgQ_Name; + private int MsgQ_Open_Options = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; + private int Character_Set = 1208; + private int Sleep_Time = 0; + private String Field_Break = ""; + private String Message_Break = "\r\n"; + private String Message_Tail = ""; + private int Message_Skip = 0; + private int Message_Count = 0; + private int Repeat_Count = 1; + private boolean Keep_Message_Break = false; + private String File_Name; + private int Msg_Counter = 0; + private int Msg_Read_Nb = 0; + private int Msg_Skiped_Nb = 0; + + - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + public static void main( String args[]) + { + new MQSLoad( args); + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- - public static void main( String args[]) - { - new MQSLoad( args); - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + public static String Str_Format( String UnFmt) + { + int idx; + boolean esc = false; + String fmt = ""; - public static String Str_Format( String UnFmt) - { - int idx; - boolean esc = false; - String fmt = ""; - - - for( idx = 0; idx < UnFmt.length(); idx++) + + for( idx = 0; idx < UnFmt.length(); idx++) + { + if( esc) { - if( esc) - { - switch( UnFmt.charAt( idx)) - { - case '\\': - { - fmt += "\\"; - break; - } - - case 'n': - { - fmt += "\n"; - break; - } - - case 'r': - { - fmt += "\r"; - break; - } - - case 't': - { - fmt += "\t"; - break; - } - - default: - { - fmt += "?"; - break; - } - } - - esc = false; - } - else - { - if( UnFmt.charAt( idx) == '\\') - { - esc = true; - } - else - { - fmt += UnFmt.charAt( idx); - } - } - } - - return( fmt); - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - public static String Str_UnFormat( String Fmt) - { - int idx; - String unfmt = ""; - - - for( idx = 0; idx < Fmt.length(); idx++) - { - switch( Fmt.charAt( idx)) + switch( UnFmt.charAt( idx)) { case '\\': { - unfmt += "\\\\"; + fmt += "\\"; break; } - - case '\n': + + case 'n': { - unfmt += "\\n"; + fmt += "\n"; break; } - - case '\r': + + case 'r': { - unfmt += "\\r"; + fmt += "\r"; break; } - - case '\t': + + case 't': { - unfmt += "\\t"; + fmt += "\t"; break; } - + default: { - unfmt += Fmt.charAt( idx); + fmt += "?"; break; } } + + esc = false; } - - return( unfmt); - } + else + { + if( UnFmt.charAt( idx) == '\\') + { + esc = true; + } + else + { + fmt += UnFmt.charAt( idx); + } + } + } + + return( fmt); + } - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- - private void Arg_Parse( String args[]) throws Exception - { - int argc = 0; + public static String Str_UnFormat( String Fmt) + { + int idx; + String unfmt = ""; - try + for( idx = 0; idx < Fmt.length(); idx++) + { + switch( Fmt.charAt( idx)) { - while( argc < args.length) + case '\\': { - if( args[argc].equals( "-qm")) + unfmt += "\\\\"; + break; + } + + case '\n': + { + unfmt += "\\n"; + break; + } + + case '\r': + { + unfmt += "\\r"; + break; + } + + case '\t': + { + unfmt += "\\t"; + break; + } + + default: + { + unfmt += Fmt.charAt( idx); + break; + } + } + } + + return( unfmt); + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Arg_Parse( String args[]) throws Exception + { + int argc = 0; + + + try + { + while( argc < args.length) + { + if( args[argc].equals( "-qm")) + { + if( argc < ( args.length + 1)) { - if( argc < ( args.length + 1)) - { - QMng_Name = Str_Format( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if( args[argc].equals( "-fb")) - { - if( argc < ( args.length + 1)) - { - Field_Break = Str_Format( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-mb")) - { - if( argc < ( args.length + 1)) - { - Message_Break = Str_Format( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-mt")) - { - if( argc < ( args.length + 1)) - { - Message_Tail = Str_Format( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-st")) - { - if( argc < ( args.length + 1)) - { - Sleep_Time = Integer.parseInt( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-ms")) - { - if( argc < ( args.length + 1)) - { - Message_Skip = Integer.parseInt( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-mc")) - { - if( argc < ( args.length + 1)) - { - Message_Count = Integer.parseInt( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-rc")) - { - if( argc < ( args.length + 1)) - { - Repeat_Count = Integer.parseInt( args[++argc]); - } - else - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } - } - else if ( args[argc].equals( "-kmb")) - { - Keep_Message_Break = true; + QMng_Name = Str_Format( args[++argc]); } else { - MsgQ_Name = args[argc++]; - File_Name = args[argc++]; - - if( argc < args.length) - { - System.out.println( "Invalid number of command line options..."); - throw new Exception(); - } + System.out.println( "Invalid number of command line options..."); + throw new Exception(); } - - ++argc; } - } - - catch( Exception Expt) - { - throw Expt; + else if ( args[argc].equals( "-cs")) + { + if( argc < ( args.length + 1)) + { + Character_Set = Integer.parseInt( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-st")) + { + if( argc < ( args.length + 1)) + { + Sleep_Time = Integer.parseInt( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if( args[argc].equals( "-fb")) + { + if( argc < ( args.length + 1)) + { + Field_Break = Str_Format( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-mb")) + { + if( argc < ( args.length + 1)) + { + Message_Break = Str_Format( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-mt")) + { + if( argc < ( args.length + 1)) + { + Message_Tail = Str_Format( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-ms")) + { + if( argc < ( args.length + 1)) + { + Message_Skip = Integer.parseInt( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-mc")) + { + if( argc < ( args.length + 1)) + { + Message_Count = Integer.parseInt( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-rc")) + { + if( argc < ( args.length + 1)) + { + Repeat_Count = Integer.parseInt( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-kmb")) + { + Keep_Message_Break = true; + } + else + { + MsgQ_Name = args[argc++]; + File_Name = args[argc++]; + + if( argc < args.length) + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + + ++argc; } } + + catch( Exception Expt) + { + throw Expt; + } + } - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- - private void MQSInit() throws Exception - { - short retry = 0; - final short RETRY_MAX = 10; + private void MQSInit() throws Exception + { + short retry = 0; + final short RETRY_MAX = 10; - while( QMng == null) - { - try - { - QMng = new MQQueueManager( QMng_Name); -// System.out.println( "QManager Open: (" + QMng + ") !"); - } - - catch( Exception Expt) - { - System.out.print("!"); - - if( retry++ > RETRY_MAX) - { - throw Expt; - } - } - } - + while( QMng == null) + { try { - MsgQ = QMng.accessQueue( MsgQ_Name, MsgQ_Open_Options, null, null, null); -// System.out.println( "MsgQ Open: (" + MsgQ + ") !"); + QMng = new MQQueueManager( QMng_Name); + // System.out.println( "QManager Open: (" + QMng + ") !"); } + + catch( Exception Expt) + { + System.out.print("!"); + + if( retry++ > RETRY_MAX) + { + throw Expt; + } + } + } + + try + { + MsgQ = QMng.accessQueue( MsgQ_Name, MsgQ_Open_Options, null, null, null); + // System.out.println( "MsgQ Open: (" + MsgQ + ") !"); + } - catch( Exception Expt) - { - QMng.disconnect(); -// System.out.println( "QManager Close: (" + QMng + ") !"); + catch( Exception Expt) + { + QMng.disconnect(); + // System.out.println( "QManager Close: (" + QMng + ") !"); - throw Expt; - } - } + throw Expt; + } + } - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- - private void MQSDeInit() throws Exception - { - try - { - MsgQ.close(); -// System.out.println( "MsgQ Close: (" + MsgQ + ") !"); - QMng.disconnect(); -// System.out.println( "QManager Close: (" + QMng + ") !"); - } + private void MQSDeInit() throws Exception + { + try + { + MsgQ.close(); + // System.out.println( "MsgQ Close: (" + MsgQ + ") !"); + QMng.disconnect(); + // System.out.println( "QManager Close: (" + QMng + ") !"); + } - catch( Exception Expt) - { - throw Expt; - } - } + catch( Exception Expt) + { + throw Expt; + } + } - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- - private void MQSPut_Msg( MQMessage Msg) throws Exception - { - try - { - MQPutMessageOptions pmo = new MQPutMessageOptions(); - MsgQ.put( Msg, pmo); - } + private void MQSPut_Msg( MQMessage Msg) throws Exception + { + try + { + MQPutMessageOptions pmo = new MQPutMessageOptions(); + MsgQ.put( Msg, pmo); + } - catch( Exception Expt) - { - throw Expt; - } - } + catch( Exception Expt) + { + throw Expt; + } + } - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- - private void Counter_Print( String Mark) + private int Buffer_Lookup( byte[] Buffer, int Offset, int End, byte[] Message_Break) + { + int i = Offset; + int j = 0; + + + while( ( i < End) && ( j < Message_Break.length)) { - if( ( Msg_Counter % 50) == 0) + if( Buffer[i++] == Message_Break[j]) { - System.out.print( "\n(" + Msg_Counter + ")\t"); - } - - if( ( Msg_Counter % 10) == 0) - { - System.out.print( " "); - } - - System.out.print( Mark); - - Msg_Counter++; - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - private void Stand_By(int timer) throws Exception - { - if( timer > 0) - { - try - { - synchronized( this) - { - wait( timer); - } - } - catch(InterruptedException ie) - { - ie.printStackTrace(); - } - } - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - private int Put_Msg( MQMessage Output_Msg, int Msg_Id, int Msg_Nb, int Msg_Skip, int Msg_Count) throws Exception - { - if( Msg_Id < Msg_Skip) - { - Counter_Print( "*"); + j++; } else { - if( ( Msg_Count != 0) && ( Msg_Id >= ( Msg_Skip + Msg_Count))) - { - Counter_Print( "#"); - } - else - { - Output_Msg.messageId = MQC.MQMI_NONE; - MQSPut_Msg( Output_Msg); - Output_Msg.clearMessage(); - - Counter_Print( "."); - - Msg_Nb++; - - Stand_By( Sleep_Time); - } + j = 0; } - - return(Msg_Nb); } - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - private int Load_File( RandomAccessFile Input_File, int Msg_Nb, int Msg_Skip, int Msg_Count) throws Exception - { - byte input_char; - MQMessage output_msg = new MQMessage(); - int buffer_size, cur_buffer_size, window_size, cur_window_size, prefetch_size, buffer_offset, read_size; - int cur_pos, end_pos, next_message, next_field; - int msg_id = 0; - byte[] buffer_byte; - String buffer_string, out_string = null; - - - try - { - output_msg.format = MQC.MQFMT_STRING; - - - /* Default window size */ - window_size = 1024 * 64; - prefetch_size = Field_Break.length() + Message_Break.length(); - buffer_size = window_size + prefetch_size; - - buffer_offset = 0; - - buffer_byte = new byte[buffer_size]; - - do - { - read_size = Input_File.read( buffer_byte, buffer_offset, buffer_size - buffer_offset); -// System.out.println( "Load buffer read_size: (" + read_size + ") asked: (" + (buffer_size - buffer_offset) + ")"); - - if( read_size == -1) - { - read_size = 0; - } - - /* Compute current buffer and window size */ - cur_window_size = Math.min( ( read_size + buffer_offset), window_size); - cur_buffer_size = ( read_size + buffer_offset); - - buffer_string = new String( buffer_byte, 0, cur_buffer_size); - cur_pos = 0; - -// System.out.println( " CurWS: (" + cur_window_size + ") CurBS: (" + cur_buffer_size + ") Buffer: [" + buffer_string + "]"); - - while( cur_pos < cur_window_size) - { -// System.out.println( "- CurPos: (" + cur_pos + ")"); - next_message = buffer_string.indexOf( Message_Break, cur_pos); - - if( ( next_message == -1) || ( next_message > cur_window_size)) - { - next_message = cur_window_size + 1; - end_pos = cur_window_size; - } - else - { -// System.out.println( "+ Get message next_message: (" + next_message + ")"); - end_pos = next_message; - } - - while( cur_pos < end_pos) - { -// System.out.println( "+ CurPos: (" + cur_pos + ") end_pos: (" + end_pos + ")"); - if( Field_Break.length() > 0) - { - next_field = buffer_string.indexOf( Field_Break, cur_pos); - } - else - { - next_field = -1; - } - - if( ( next_field == -1) || ( next_field > end_pos)) - { - out_string = buffer_string.substring( cur_pos, end_pos); - cur_pos = end_pos; - } - else - { -// System.out.println( "+ Get field next_field: (" + next_field + ")"); - out_string = buffer_string.substring( cur_pos, next_field); - cur_pos = next_field + Field_Break.length(); - } - - output_msg.writeString( out_string); -// System.out.println( "* Msg: [" + out_string + "]"); - - } - - if( next_message <= cur_window_size) - { - /* A message break has been found: send the data... */ -// System.out.println( "* Write Msg CurPos: (" + cur_pos + ")"); - output_msg.writeString( Message_Tail); - - if( Keep_Message_Break) - { - output_msg.writeString( Message_Break); - } - -// System.out.println( "(" + Msg_Nb + ")"); - Msg_Nb = Put_Msg( output_msg, msg_id, Msg_Nb, Msg_Skip, Msg_Count); - - msg_id++; - out_string = null; - cur_pos += Message_Break.length(); - } - } - - buffer_offset = cur_buffer_size - cur_pos; -// System.out.println( "% CurPos: (" + cur_pos + ") BufOff: (" + buffer_offset + ")"); - - if( buffer_offset > 0) - { - /* Copy back prefetch area */ - System.arraycopy( buffer_byte, cur_pos, buffer_byte, 0, buffer_offset); - } - } - while( cur_buffer_size > 0); - - if( out_string != null) - { -// System.out.println( "* Flushing Msg"); - Msg_Nb = Put_Msg( output_msg, msg_id, Msg_Nb, Msg_Skip, Msg_Count); - } - - return( Msg_Nb); - } - - catch( Exception Expt) - { - throw Expt; - } - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - private void Print_Usage( ) throws Exception - { - System.out.println( "Usage: MQSLoad [-qm \"Output_QueueMng_Name\"] [-st \"sleep_time\"] [-fb \"field_break\"] [-mb \"message_break\"] [-mt \"message_tail\"] [-kmb] [-ms \"message_skip\"] [-mc \"message_count\"] [-rc \"repeat_count\"] "); - System.out.println( " Default: Output QueueMng Name: (" + QMng_Name + ") Sleep Time: (" + Sleep_Time + ") Field Break: (" + Str_UnFormat( Field_Break) + ") Message Break: (" + Str_UnFormat( Message_Break) + ") Message Tail: (" + Str_UnFormat( Message_Tail) + ") Keep Message Break: (" + Keep_Message_Break + ") Message Skip: (" + Message_Skip + ") Message Count: (" + Message_Count + ") Repeat Count: (" + Repeat_Count + ")"); - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - private void Print_Args( ) throws Exception - { - System.out.println( "Output QueueMng Name: (" + QMng_Name + ") Output MsgQueue Name: (" + MsgQ_Name + ") Input File Name: (" + File_Name + ")"); - System.out.println( "Sleep Time: (" + Sleep_Time + ") Field Break: (" + Str_UnFormat( Field_Break) + ") Message Break: (" + Str_UnFormat( Message_Break) + ") Message Tail: (" + Str_UnFormat( Message_Tail) + ") Keep Message Break: (" + Keep_Message_Break + ") Message Skip: (" + Message_Skip + ") Message Count: (" + Message_Count + ") Repeat Count: (" + Repeat_Count + ")"); - } - - - - - - //------------------------------------------------------------------------------------------------------------------------- - // - //------------------------------------------------------------------------------------------------------------------------- - - public MQSLoad( String args[]) + if( j == Message_Break.length) { - int msg_nb = 0; - int loop; - long time_begin, time_end; - double time_elapsed, speed; - NumberFormat nf = NumberFormat.getInstance(); - - - try + return( i - j); + } + else + { + return( -1); + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Msg_Write( MQMessage Msg, byte[] Buffer, int Offset, int End, byte[] Field_Break) throws Exception + { + int cur_pos = Offset; + int fld_break; + + + /* scan into the buffer */ + while( cur_pos < End) + { + if( Field_Break.length == 0) { - System.out.println("MQSLoad: " + MQSLoad_Tag + " / " + MQSLoad_Date + " / " + MQSLoad_Author); - - if( ( args.length < 2) || ( args.length > 19)) - { - Print_Usage(); - System.exit( 1); - } - else - { - System.out.println( "MQS Load Starting..."); - - Arg_Parse( args); + /* No Field Break */ + // System.out.println( "* Skip FB Lookup..."); - Print_Args(); - - MQSInit(); + fld_break = End; + } + else + { + /* Field Break Lookup */ + // System.out.println( "* FB Lookup: (" + fld_break + ")"); - try - { - RandomAccessFile input_file = new RandomAccessFile( File_Name, "r"); - - try - { -// System.out.println("Input File Open: (" + input_file + ") !"); + fld_break = Buffer_Lookup( Buffer, cur_pos, End, Field_Break); + } + + if( fld_break == -1) + { + /* Write the rest of the buffer */ + // System.out.println( "* Last Field"); - time_begin = System.currentTimeMillis(); - - for( loop = 0; loop < Repeat_Count; loop++ ) - { - try - { - input_file.seek(0); - } - - catch( Exception Expt) - { - if( Repeat_Count > 1) - { - System.out.println( "Repeat Count should be 1 for non seekable file !"); - throw Expt; - } - } - - msg_nb = Load_File( input_file, msg_nb, Message_Skip, Message_Count); - } - - time_end = System.currentTimeMillis(); - - time_elapsed = ( time_end - time_begin) / 1000.0; - speed = msg_nb / time_elapsed; - - nf.setMinimumFractionDigits(2); - nf.setMaximumFractionDigits(2); - - System.out.println( "\n"); - System.out.println( "Loaded Message Nb: (" + msg_nb - + ") Elapsed Time: (" + nf.format(time_elapsed) - + ") s Speed: (" + nf.format(speed) - + ") msg/s"); - } - - catch( Exception Expt) - { - input_file.close(); - throw Expt; - } - - input_file.close(); - } - - catch( Exception Expt) - { - MQSDeInit(); - throw Expt; - } - - MQSDeInit(); - System.out.println( "MQS Load Completed !"); - - System.exit( 0); - } + fld_break = End; } - catch( Exception Expt) + /* Write to MQ-Message */ + // System.out.println( "* Write Field: cur_pos(" + cur_pos + ") Len: (" + ( fld_break - cur_pos) + ")"); + + Msg.write( Buffer, cur_pos, ( fld_break - cur_pos)); + + cur_pos = fld_break + Field_Break.length; + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Counter_Print( String Mark) + { + if( ( Msg_Counter % 50) == 0) + { + System.out.print( "\n(" + Msg_Counter + ")\t"); + } + + if( ( Msg_Counter % 10) == 0) + { + System.out.print( " "); + } + + System.out.print( Mark); + + Msg_Counter++; + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Stand_By(int timer) throws Exception + { + if( timer > 0) + { + try { - System.out.println("Exception: (" + Expt + ") !"); - Expt.printStackTrace(); - + synchronized( this) + { + wait( timer); + } + } + catch(InterruptedException ie) + { + ie.printStackTrace(); + } + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private int Msg_Put( MQMessage Output_Msg, int Msg_Id, int Msg_Nb, int Msg_Skip, int Msg_Count) throws Exception + { + if( Msg_Id < Msg_Skip) + { + Counter_Print( "*"); + Msg_Skiped_Nb++; + } + else + { + if( ( Msg_Count != 0) && ( Msg_Id >= ( Msg_Skip + Msg_Count))) + { + Counter_Print( "#"); + Msg_Skiped_Nb++; + } + else + { + Output_Msg.messageId = MQC.MQMI_NONE; + MQSPut_Msg( Output_Msg); + Output_Msg.clearMessage(); + + Counter_Print( "."); + + Msg_Nb++; + Msg_Read_Nb++; + + Stand_By( Sleep_Time); + } + } + + Msg_Id++; + + if( ( Msg_Count != 0) && ( Msg_Id >= ( Msg_Skip + Msg_Count))) + { + /* Last Message to be read */ + return( -1); + } + else + { + return( Msg_Nb); + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void File_Load( RandomAccessFile Input_File, int Msg_Skip, int Msg_Count) throws Exception + { + MQMessage output_msg = new MQMessage(); + byte[] buffer; + byte[] message_break = Message_Break.getBytes(); + byte[] field_break = Field_Break.getBytes(); + byte[] message_tail = Message_Tail.getBytes(); + int buf_size, work_win_size, break_win_size, prefetch_size, fetch_size; + int cur_buf_size, cur_work_win_size, cur_pos, msg_break; + int msg_id = 0; + int msg_nb = 0; + + + try + { + output_msg.format = MQC.MQFMT_STRING; + output_msg.characterSet = Character_Set; + + work_win_size = 1024 * 64; + break_win_size = Field_Break.length() + message_break.length; + + buf_size = work_win_size + break_win_size; + buffer = new byte[buf_size]; + + prefetch_size = 0; + + do + { + /* Fill the buffer */ + do + { + fetch_size = Input_File.read( buffer, prefetch_size, buf_size - prefetch_size); + // System.out.println( "Load buffer fetch_size: (" + fetch_size + ") asked: (" + (buf_size - prefetch_size) + ")"); + + if( fetch_size == -1) + { + fetch_size = 0; + } + + /* compute the current buffer size, work window size and new prefetch size */ + cur_buf_size = prefetch_size + fetch_size; + cur_work_win_size = Math.min( work_win_size, cur_buf_size); + + if( cur_buf_size < break_win_size) + { + prefetch_size = cur_buf_size; + } + + cur_pos = 0; + + // System.out.println( " Cur Work Win Size: (" + cur_work_win_size + ") Cur Buf Size: (" + cur_buf_size + ") Prefetch Size: (" + prefetch_size + ")"); + } + while( ( fetch_size != 0) && ( cur_buf_size < break_win_size)); + + + + /* scan into the work window */ + while( cur_pos < cur_work_win_size) + { + // System.out.println( "- CurPos: (" + cur_pos + ")"); + + msg_break = Buffer_Lookup( buffer, cur_pos, cur_buf_size, message_break); + + if( msg_break == -1) + { + /* write the message to the MQ buffer */ + // System.out.println( "+ No msg break"); + // System.out.println( "* Write MQ Buffer Offset: (" + cur_pos + ") End: (" + cur_work_win_size + ")"); + Msg_Write( output_msg, buffer, cur_pos, cur_work_win_size, field_break); + + cur_pos = cur_work_win_size; + } + else + { + // System.out.println( "+ Get msg break : (" + msg_break + ")"); + + /* write the message to the MQ buffer */ + Msg_Write( output_msg, buffer, cur_pos, msg_break, field_break); + + /* write the message tail to the MQ buffer */ + output_msg.write( message_tail, 0, message_tail.length); + + if( Keep_Message_Break) + { + /* write the message break to the MQ buffer */ + // System.out.println( "* Write MQ Buffer Offset: (" + cur_pos + ") End: (" + ( msg_break + message_break.length) + ")"); + output_msg.write( buffer, msg_break, message_break.length); + } + + /* send the message */ + // System.out.println( "* Put Msg()"); + + if( ( msg_nb = Msg_Put( output_msg, msg_id, msg_nb, Msg_Skip, Msg_Count)) == -1) + { + return; + } + + msg_id++; + cur_pos = msg_break + message_break.length; + } + } + + + + prefetch_size = cur_buf_size - cur_pos; + // System.out.println( "- Prefetch size: (" + prefetch_size + ")"); + + if( prefetch_size > 0) + { + /* Copy back prefetch area */ + System.arraycopy( buffer, cur_pos, buffer, 0, prefetch_size); + } + } + while( cur_buf_size > 0); + + + + if( output_msg.getMessageLength() > 0) + { + /* flush the message */ + // System.out.println( "* Put Msg()"); + Msg_Put( output_msg, msg_id, msg_nb, Msg_Skip, Msg_Count); + } + + return; + } + + catch( Exception Expt) + { + throw Expt; + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Print_Usage( ) throws Exception + { + System.out.println( "Usage: MQSLoad [-qm \"Output_QueueMng_Name\"] [-cs \"character_set\"] [-st \"sleep_time\"] [-fb \"field_break\"] [-mb \"message_break\"] [-mt \"message_tail\"] [-kmb] [-ms \"message_skip\"] [-mc \"message_count\"] [-rc \"repeat_count\"] "); + System.out.println( " Default: Output QueueMng Name: (" + QMng_Name + ") Character Set: (" + Character_Set + ") Sleep Time: (" + Sleep_Time + ") Field Break: (" + Str_UnFormat( Field_Break) + ") Message Break: (" + Str_UnFormat( Message_Break) + ") Message Tail: (" + Str_UnFormat( Message_Tail) + ") Keep Message Break: (" + Keep_Message_Break + ") Message Skip: (" + Message_Skip + ") Message Count: (" + Message_Count + ") Repeat Count: (" + Repeat_Count + ")"); + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Print_Args( ) throws Exception + { + System.out.println( "Output QueueMng Name: (" + QMng_Name + ") Output MsgQueue Name: (" + MsgQ_Name + ") Input File Name: (" + File_Name + ")"); + System.out.println( "Character Set: (" + Character_Set + ") Sleep Time: (" + Sleep_Time + ") Field Break: (" + Str_UnFormat( Field_Break) + ") Message Break: (" + Str_UnFormat( Message_Break) + ") Message Tail: (" + Str_UnFormat( Message_Tail) + ") Keep Message Break: (" + Keep_Message_Break + ") Message Skip: (" + Message_Skip + ") Message Count: (" + Message_Count + ") Repeat Count: (" + Repeat_Count + ")"); + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + public MQSLoad( String args[]) + { + int loop; + long time_begin, time_end; + double time_elapsed, speed; + NumberFormat nf = NumberFormat.getInstance(); + + + try + { + System.out.println("MQSLoad: " + MQSLoad_Tag + " / " + MQSLoad_Date + " / " + MQSLoad_Author); + + if( ( args.length < 2) || ( args.length > 19)) + { + Print_Usage(); System.exit( 1); } + else + { + System.out.println( "MQS Load Starting..."); + + Arg_Parse( args); + Print_Args(); + + MQSInit(); + + try + { + RandomAccessFile input_file = new RandomAccessFile( File_Name, "r"); + + try + { + // System.out.println("Input File Open: (" + input_file + ") !"); + + time_begin = System.currentTimeMillis(); + + for( loop = 0; loop < Repeat_Count; loop++ ) + { + try + { + input_file.seek(0); + } + + catch( Exception Expt) + { + if( Repeat_Count > 1) + { + System.out.println( "Repeat Count should be 1 for non seekable file !"); + throw Expt; + } + } + + File_Load( input_file, Message_Skip, Message_Count); + } + + time_end = System.currentTimeMillis(); + + time_elapsed = ( time_end - time_begin) / 1000.0; + speed = Msg_Read_Nb / time_elapsed; + + nf.setMinimumFractionDigits(2); + nf.setMaximumFractionDigits(2); + + System.out.println( "\n"); + System.out.println( "Loaded Message Nb: (" + Msg_Read_Nb + + ") Skiped Messages Nb: (" + Msg_Skiped_Nb + + ") Elapsed Time: (" + nf.format(time_elapsed) + + ") s Speed: (" + nf.format(speed) + + ") msg/s"); + } + + catch( Exception Expt) + { + input_file.close(); + throw Expt; + } + + input_file.close(); + } + + catch( Exception Expt) + { + MQSDeInit(); + throw Expt; + } + + MQSDeInit(); + System.out.println( "MQS Load Completed !"); + + System.exit( 0); + } } + catch( Exception Expt) + { + System.out.println("Exception: (" + Expt + ") !"); + Expt.printStackTrace(); + + System.exit( 1); + } + } - //------------------------------------------------------------------------------------------------------------------------- + + //------------------------------------------------------------------------------------------------------------------------- }