From 81ef96b80dacecc17fc053dcb24a6925b14de061 Mon Sep 17 00:00:00 2001 From: giberta1 Date: Fri, 19 Apr 2002 15:22:14 +0000 Subject: [PATCH] Rewrite the loading engine, Add configurable feeld and message breaks, Add new command line options: -fb and -mb. --- MQSLoad.java | 382 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 331 insertions(+), 51 deletions(-) diff --git a/MQSLoad.java b/MQSLoad.java index b5563a9..26486e5 100644 --- a/MQSLoad.java +++ b/MQSLoad.java @@ -1,16 +1,14 @@ // $RCSfile: MQSLoad.java,v $ -// $Revision: 1.2 $ +// $Revision: 1.3 $ // $Name: $ -// $Date: 2002/04/15 10:46:14 $ +// $Date: 2002/04/19 15:22:14 $ // $Author: giberta1 $ /* - * 07/12/2001 - 11:45:14 - * - * MQSLoad.java - MQ/Seires data file to queue loader - * Copyright (C) 2001 Arnaud G. Gibert - * arnaud.gibert@midas-kapiti.fr - * www.midas-kapiti.fr + * MQSLoad.java - Data file to MQ/Series queue loader + * Copyright (C) 2001-2002 Arnaud G. Gibert + * arnaud.gibert@misys.com + * www.miys-ibs.fr * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -42,16 +40,163 @@ public class MQSLoad private String MsgQ_Name; private int MsgQ_Open_Options = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; private String File_Name; + private String Field_Break = ""; + private String Message_Break = "\n"; - - public static void main( String args[]) + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + public static void main( String args[]) { new MQSLoad( args); } - private void MQSInit() throws Exception + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + public static String Str_Format( String Fmt) + { + int idx; + boolean esc = false; + String result = ""; + + + for( idx = 0; idx < Fmt.length(); idx++) + { + if( esc) + { + switch( Fmt.charAt( idx)) + { + case '/': + { + result = result + "\\"; + break; + } + + case 'n': + { + result = result + "\n"; + break; + } + + case 'r': + { + result = result + "\r"; + break; + } + + case 't': + { + result = result + "\t"; + break; + } + + default: + { + result = result + "?"; + } + } + + esc = false; + } + else + { + if( Fmt.charAt( idx) == '\\') + { + esc = true; + } + else + { + result = result + Fmt.charAt( idx); + } + } + } + + return( result); + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Arg_Parse( String args[]) throws Exception + { + int argc = 0; + + + try + { + while( argc < args.length) + { + if( args[argc].equals( "-fb")) + { + if( argc < ( args.length + 1)) + { + Field_Break = Str_Format( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else if ( args[argc].equals( "-mb")) + { + if( argc < ( args.length + 1)) + { + Message_Break = Str_Format( args[++argc]); + } + else + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + else + { + MsgQ_Name = args[argc++]; + File_Name = args[argc++]; + + if( argc < args.length) + { + System.out.println( "Invalid number of command line options..."); + throw new Exception(); + } + } + + ++argc; + } + } + + catch( Exception Expt) + { + throw Expt; + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void MQSInit() throws Exception { try { @@ -79,8 +224,14 @@ public class MQSLoad } } + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + private void MQSDeInit() throws Exception { try @@ -99,6 +250,12 @@ public class MQSLoad + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + private void MQSPut_Msg( MQMessage Msg) throws Exception { try @@ -114,71 +271,194 @@ public class MQSLoad } + + - public MQSLoad( String args[]) + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Counter_Print( int Msg_Nb) + { + if( ( Msg_Nb % 50) == 0) + { + System.out.print( "\n(" + Msg_Nb + ")\t"); + } + + if( ( Msg_Nb % 10) == 0) + { + System.out.print( " "); + } + + System.out.print( "."); + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + private void Load_File( BufferedInputStream Input_File) throws Exception { byte input_char; int msg_nb = 0; MQMessage output_msg = new MQMessage(); + int buffer_size, cur_buffer_size, window_size, cur_window_size, prefetch_size, buffer_offset, read_size; + int cur_pos, end_pos, next_message, next_field; + byte[] buffer_byte; + String buffer_string, out_string = null; + try + { + output_msg.format = MQC.MQFMT_STRING; + + + window_size = 1024 * 64; + prefetch_size = Field_Break.length() + Message_Break.length(); + buffer_size = window_size + prefetch_size; + + buffer_offset = 0; + + buffer_byte = new byte[buffer_size]; + + do + { + read_size = Input_File.read( buffer_byte, buffer_offset, buffer_size - buffer_offset); +// System.out.println( "Load buffer read_size: (" + read_size + ") asked: (" + (buffer_size - buffer_offset) + ")"); + + if( read_size > 0) + { + cur_window_size = Math.min( ( read_size + buffer_offset), window_size); + cur_buffer_size = ( read_size + buffer_offset); + + buffer_string = new String( buffer_byte, 0, cur_buffer_size); + cur_pos = 0; + + while( cur_pos < cur_window_size) + { +// System.out.println( "- CurPos: (" + cur_pos + ")"); + next_message = buffer_string.indexOf( Message_Break, cur_pos); + + if( ( next_message == -1) || ( next_message > cur_window_size)) + { + next_message = cur_window_size + 1; + end_pos = cur_window_size; + } + else + { +// System.out.println( "+ Get message next_message: (" + next_message + ")"); + end_pos = next_message; + } + + while( cur_pos < end_pos) + { +// System.out.println( "+ CurPos: (" + cur_pos + ") end_pos: (" + end_pos + ")"); + if( Field_Break.length() > 0) + { + next_field = buffer_string.indexOf( Field_Break, cur_pos); + } + else + { + next_field = -1; + } + + if( ( next_field == -1) || ( next_field > end_pos)) + { + out_string = buffer_string.substring( cur_pos, end_pos); + cur_pos = end_pos; + } + else + { +// System.out.println( "+ Get field next_field: (" + next_field + ")"); + out_string = buffer_string.substring( cur_pos, next_field); + cur_pos = next_field + Field_Break.length(); + } + + output_msg.writeString( out_string); + } + + if( next_message <= cur_window_size) + { +// System.out.println( "* Write Msg CurPos: (" + cur_pos + ")"); + output_msg.messageId = MQC.MQMI_NONE; + MQSPut_Msg( output_msg); + output_msg.clearMessage(); + out_string = null; + cur_pos += Message_Break.length(); + + Counter_Print( msg_nb++); + } + } + + if( cur_buffer_size > cur_window_size) + { + buffer_offset = cur_buffer_size - cur_window_size; + System.arraycopy( buffer_byte, cur_window_size, buffer_byte, 0, buffer_offset); + } + } + } + while( read_size > 0); + + if( out_string != null) + { +// System.out.println( "* Flushing Msg"); + output_msg.messageId = MQC.MQMI_NONE; + MQSPut_Msg( output_msg); + + Counter_Print( msg_nb++); + } + + System.out.println( "\n"); + } + + catch( Exception Expt) + { + throw Expt; + } + } + + + + + + //------------------------------------------------------------------------------------------------------------------------- + // + //------------------------------------------------------------------------------------------------------------------------- + + public MQSLoad( String args[]) + { try { - if( args.length != 2) + if( ( args.length < 2) || ( args.length > 6)) { - System.out.println( "Usage: MQSLoad "); + System.out.println( "Usage: MQSLoad [-fb \"field_break\"] [-mb \"message_break\"] "); } else { System.out.println( "MQS Load Starting..."); - - MsgQ_Name = args[0]; - File_Name = args[1]; - + + Arg_Parse( args); + System.out.println( "Output MsgQueue Name: (" + MsgQ_Name + ") Input File Name: (" + File_Name + ")"); - + System.out.println( "Field Break: (" + Field_Break + ") Message_Break: (" + Message_Break + ")"); MQSInit(); try - { - output_msg.format = MQC.MQFMT_STRING; - - BufferedInputStream input_file = new BufferedInputStream( new FileInputStream(File_Name)); + { + BufferedInputStream input_file = new BufferedInputStream( new FileInputStream( File_Name)); try { System.out.println("Input File Open: (" + input_file + ") !"); + + Load_File( input_file); - while( ( input_char = (byte)input_file.read()) != -1) - { - switch( input_char) - { - case '\r': - case ';': - { - break; - } - - case '\n': - { - System.out.println( "Sending Msg Nb (" + ++msg_nb + ") !"); - - output_msg.messageId = MQC.MQMI_NONE; - MQSPut_Msg(output_msg); - output_msg.clearMessage(); - - break; - } - - default: - { - output_msg.writeByte( input_char); - break; - } - } - } } catch( Exception Expt)