import condorAPI.*;
import condorAPI.event.*;
import java.io.*;
import java.net.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Iterator;
import java.util.Calendar;
import java.util.Collections;

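/**
 * Drives a breadth-first web crawl on a Condor pool. Urls waiting to be
 * crawled are split across input files, one Condor job per file runs
 * condor_crawl.py over its share, and the urls discovered in each job's
 * output file are queued for the next round.
 *
 * Usage: java Crawl_Condor <seed_file>
 */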
public class Crawl_Condor {
private static int url_id = 1;	// next id assigned to a newly discovered url

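/**
 * Queues num_condor_jobs instances of condor_crawl.py, one per input.N
 * file, and blocks until the whole cluster finishes. The success and
 * failure handlers fold each job's results back into submit_urls and
 * comp_urls as the jobs complete.
 */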
private static void call_condor(int num_condor_jobs, final Map<String,Integer> comp_urls,
				final Map<String,Integer> submit_urls) throws CondorException{

	// create a Condor object
	Condor condor = new Condor();

	// build the job description programmatically: each job runs
	// condor_crawl.py on its own input.$(Process) file and writes
	// output.$(Process) and error.$(Process)
	JobDescription jd = new JobDescription();
	jd.addAttribute("Executable","condor_crawl.py");
	jd.addAttribute("Universe","Vanilla");
	jd.addAttribute("arguments","input.$(Process)");
	jd.addAttribute("input","input.$(Process)");
	jd.addAttribute("output","output.$(Process)");
	jd.addAttribute("should_transfer_files","yes");
	jd.addAttribute("when_to_transfer_output","on_exit");
	jd.addAttribute("notification","Never");
	jd.addAttribute("error","error.$(Process)");
	//jd.addAttribute("transfer_input_files","bs4,mechanize");
	jd.addQueue(num_condor_jobs);


	// on success, harvest the urls discovered by this job from its
	// output file, then drop any urls listed in its error file from
	// the completed set so they are no longer counted as crawled
	jd.setHandlerOnSuccess(new Handler(){
	  public void handle(Event e){
		System.out.print(e.getJob() + " success ");
		System.out.printf("%tc\n", Calendar.getInstance());
		System.out.println("Urls To Crawl " + submit_urls.size());
		JobId job_id = e.getJobId();
		int jobNo = job_id.jobNo;
		readCondorOutFile(submit_urls, comp_urls, "output." + jobNo);
		updateHashMap(comp_urls, "error." + jobNo);
	  }
	});

	// on failure, roll the job back: remove its input urls from the
	// completed set and re-queue them from the input file
	jd.setHandlerOnFailure(new Handler(){
	  public void handle(Event e){
		System.out.print(e.getJob() + " failed ");
		System.out.printf("%tc\n", Calendar.getInstance());
		JobId job_id = e.getJobId();
		int jobNo = job_id.jobNo;
		updateHashMap(comp_urls, "input." + jobNo);
		readCondorOutFile(submit_urls, comp_urls, "input." + jobNo);
	  }
	});
	// submit the job description and get back its Cluster
	Cluster c = condor.submit(jd);

	System.out.println("submitted " + c);

	// block until every job in the cluster has finished
	c.waitFor();
	System.out.print("Condor Finished Processing " + num_condor_jobs + " jobs");
	System.out.printf(" %tc\n", Calendar.getInstance());
	//System.out.println(c.dump());	// dump cluster status if needed
}

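/**
 * Rewrites the host part of a url to its IP address, caching lookups in
 * dns_cache so each domain is resolved only once. Returns "" for a
 * malformed url or an unresolvable host. Currently unused: the call in
 * writeCondorInputFiles is commented out.
 */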
private static String changeDomainNameToIp(String url, HashMap<String,String> dns_cache){
	String domain_name = "";
	String host_address;

	try{
		URL u = new URL(url);
		domain_name = u.getHost();
		if(dns_cache.containsKey(domain_name)){
			host_address = dns_cache.get(domain_name);		
		}
		else{
			InetAddress address = InetAddress.getByName(domain_name);
			host_address = address.getHostAddress();
			dns_cache.put(domain_name,host_address);
		}

		// String.replace swaps every occurrence of the domain string,
		// which is safe as long as it only appears in the host part
		url = url.replace(domain_name, host_address);
		//System.out.println("Domain Name is " +domain_name);
		//System.out.println("IP Address is "+host_address);
	 	//System.out.println("Replaced url is "+url);
	
	}catch(UnknownHostException e){
		//System.out.println("Unknown Host Name is: "+domain_name);
		return "";

	}catch(Exception e){
		//System.out.println("Invalid  url: "+url);
		return "";
	}	

        return url;
}

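/**
 * Removes from the map every url listed in the given file. Lines are
 * expected in the same "url,id" form the input files are written in;
 * the trailing ",id" is stripped before the lookup, since the map is
 * keyed on the bare url.
 */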
private static void updateHashMap(Map<String,Integer> urls, String file_name){

	System.out.print("Started updating HashMap from file: " + file_name);
	System.out.printf(" %tc\n", Calendar.getInstance());
	int lineCount = 0;
	try(BufferedReader br = new BufferedReader(new FileReader(file_name))){
		String tmp = br.readLine();
		while(tmp != null){
			// the maps are keyed on the bare url, so strip the trailing
			// ",id" field; the url itself may contain commas, so cut at
			// the last one
			int cut = tmp.lastIndexOf(',');
			String url = (cut >= 0) ? tmp.substring(0, cut) : tmp;
			urls.remove(url);
			tmp = br.readLine();
			if(++lineCount % 1000 == 0){
				System.out.print("Read " + lineCount + " lines from file: " + file_name);
				System.out.printf(" %tc\n", Calendar.getInstance());
			}
		}
	}catch(FileNotFoundException e1){
		System.out.println("Input file does not exist: " + file_name);
	}catch(IOException e2){
		System.out.println("IO exception while reading: " + file_name);
	}

	System.out.print("Finished updating HashMap from file: " + file_name);
	System.out.printf(" %tc\n", Calendar.getInstance());
}
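/**
 * Adds every url (one per line) from the given file to submit_urls with
 * a fresh id, skipping urls that are already queued or crawled. Also
 * used to load the seed file. Stops early once submit_urls grows past
 * ten million entries.
 */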
private static void readCondorOutFile(Map<String,Integer> submit_urls, Map<String,Integer> comp_urls, String file_name){
	System.out.print("Started Reading Condor Output file: " + file_name);
	System.out.printf(" %tc\n", Calendar.getInstance());
	int lineCount = 0;
	try(BufferedReader br = new BufferedReader(new FileReader(file_name))){
		String tmp = br.readLine();
		while(tmp != null){
			// queue the url only if it is neither waiting nor crawled
			if(!submit_urls.containsKey(tmp) && !comp_urls.containsKey(tmp)){
				submit_urls.put(tmp, url_id);
				url_id++;
			}
			tmp = br.readLine();
			if(++lineCount % 1000 == 0){
				if(submit_urls.size() > 10000000){
					System.out.println("Reached Max Limit");
					break;
				}
				System.out.print("Read " + lineCount + " lines from file: " + file_name);
				System.out.printf(" %tc\n", Calendar.getInstance());
			}
		}
	}catch(FileNotFoundException e1){
		System.out.println("Input file does not exist: " + file_name);
	}catch(IOException e2){
		System.out.println("IO exception while reading: " + file_name);
	}

	System.out.print("Finished Reading Condor Output file: " + file_name);
	System.out.printf(" %tc\n", Calendar.getInstance());
}

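/**
 * Drains up to urls_per_input_file urls per file from the pending map
 * into num_condor_jobs files named file_name0, file_name1, ... as
 * "url,id" lines. Every url written is moved into comp_urls so it is
 * treated as in-flight for the rest of the round.
 */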
private static void writeCondorInputFiles(int num_condor_jobs,
					  int urls_per_input_file,
					  Map<String,Integer> urls,
					  Map<String,Integer> comp_urls,
					  HashMap<String,String> dns_cache,
					  String file_name){
	int num_input_files = num_condor_jobs;
	Iterator<String> itr = urls.keySet().iterator();
	int file_index = 0;
	while(file_index < num_input_files){
		try(BufferedWriter output = new BufferedWriter(new FileWriter(file_name + file_index))){
			int count = 0;
			while(itr.hasNext() && count < urls_per_input_file){
				String url = itr.next();
				// the host could be pre-resolved here via
				// changeDomainNameToIp(url, dns_cache), skipping urls
				// that fail to resolve
				output.write(url);
				output.write(",");
				Integer id = urls.get(url);
				output.write(Integer.toString(id));
				output.write("\n");
				count++;
				// mark the url as handed off to a condor job
				comp_urls.put(url, id);
				itr.remove();
			}
			System.out.print("Wrote input file #" + file_index);
			System.out.printf(" %tc\n", Calendar.getInstance());
		}catch(IOException ie){
			System.out.println("Error while writing file: " + file_name + file_index);
		}
		file_index++;
	}
}
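/**
 * Dumps the map to file_name as "url,id" lines, one entry per line.
 */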
private static void writeToFile(Map<String,Integer> urls, String file_name){
	try(BufferedWriter output = new BufferedWriter(new FileWriter(file_name))){
		for(Map.Entry<String,Integer> entry : urls.entrySet()){
			output.write(entry.getKey());
			output.write(",");
			output.write(Integer.toString(entry.getValue()));
			output.write("\n");
		}
	}catch(IOException ie){
		System.out.println("Error while writing file: " + file_name);
	}
}

	/**
	 * @param args args[0] is the seed file, one url per line
	 */
public static void main(String[] args)
  throws CondorException{
	if(args.length < 1){
		System.out.println("Usage: java Crawl_Condor <seed_file>");
		return;
	}
	// submit_urls holds urls waiting to be crawled; comp_urls holds
	// urls handed to condor or already crawled; dns_cache memoizes
	// domain-to-IP lookups for changeDomainNameToIp
	Map<String,Integer> submit_urls = Collections.synchronizedMap(new HashMap<String,Integer>(26000000));
	HashMap<String,String> dns_cache = new HashMap<String,String>();
	Map<String,Integer> comp_urls = Collections.synchronizedMap(new HashMap<String,Integer>(15000000));
	// read the seed file and populate submit_urls
	String seed_file = args[0];
	readCondorOutFile(submit_urls, comp_urls, seed_file);
	
	// number of urls handed to each condor job
	int num_urls_per_condor_job = 2500;

	// number of crawl rounds (breadth-first depth)
	int depth = 1;
	int depthCount = 0;
	while(depthCount < depth && submit_urls.size() > 0){
		System.out.print("Urls Crawled " + comp_urls.size());
		System.out.printf(" %tc\n", Calendar.getInstance());
		System.out.println("Urls To Crawl " + submit_urls.size());
		// one job per batch of num_urls_per_condor_job urls, rounded up
		int num_condor_jobs = submit_urls.size() / num_urls_per_condor_job;
		if(submit_urls.size() % num_urls_per_condor_job != 0){
			num_condor_jobs++;
		}
		// write the input files consumed by the condor jobs
		writeCondorInputFiles(num_condor_jobs, num_urls_per_condor_job, submit_urls, comp_urls, dns_cache, "input.");

		call_condor(num_condor_jobs, comp_urls, submit_urls);
		depthCount++;
	}
	System.out.print("Urls Crawled "+ comp_urls.size());						
	System.out.printf("%tc\n",Calendar.getInstance());

	System.out.println("Started Writing Crawled Urls to File");
	writeToFile(comp_urls,"url_map");	

	System.out.print("Finished Writing..");						
	System.out.printf("%tc\n",Calendar.getInstance());
}
}
