import condorAPI.*;
import condorAPI.event.*;
import java.io.*;
import java.net.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Iterator;
import java.util.Calendar;
import java.util.Collections;
import java.sql.*;
import java.util.*;

/**
 * Drives a distributed web crawl: pages unprocessed urls out of the
 * PostgreSQL table {@code url_map} into per-job input files, submits one
 * Condor job per file ({@code condor_crawl.py}), and folds each job's output
 * (newly discovered urls) and status back into the table.
 *
 * url_map.processed values: 0 = pending, 1 = crawled, 2 = errored.
 */
public class Crawl_Condor_Db {

    // Instance Variables
    private int url_id = 1;          // next url_id assigned when inserting discovered urls
    private int file_id = 1;         // sequence for the "file.N" header line in condor input files
    private int dir_id = 1;          // sequence for the "dir.N" header line in condor input files
    private Connection con = null;   // shared JDBC connection, opened once in the constructor
    int num_urls_per_condor_job = 2500; // urls handed to each condor job

    /**
     * Opens the shared PostgreSQL connection used by every database helper.
     * On failure the stack trace is printed and {@code con} stays null
     * (subsequent db calls will then fail), matching the original
     * best-effort behavior.
     */
    public Crawl_Condor_Db() {
        // Change the connection string according to your db, ip, username and password
        String connectionURL = "jdbc:postgresql://postgres.cs.wisc.edu:5432/statusnet";
        try {
            // Load the Driver class; use the right driver for any other database.
            Class.forName("org.postgresql.Driver");
            Properties props = new Properties();
            props.setProperty("user", "rashmi123");
            props.setProperty("password", "postgres");
            props.setProperty("ssl", "true");
            props.setProperty("sslfactory", "org.postgresql.ssl.NonValidatingFactory");
            // Create the connection using the static getConnection method
            con = DriverManager.getConnection(connectionURL, props);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Submits {@code num_condor_jobs} crawl jobs (input.N files must already
     * exist) and blocks until the whole cluster finishes.  Each successful
     * job's output file is folded into url_map and its input/error urls are
     * marked processed/errored.
     *
     * @param num_condor_jobs number of queue entries to submit
     * @throws CondorException if the submission fails
     */
    private void call_condor(int num_condor_jobs) throws CondorException {
        // create a Condor object
        Condor condor = new Condor();

        // Describe one crawl job per input.N file; Condor substitutes $(Process).
        JobDescription jd = new JobDescription();
        jd.addAttribute("Executable", "condor_crawl.py");
        jd.addAttribute("Universe", "Vanilla");
        jd.addAttribute("arguments", "input.$(Process)");
        jd.addAttribute("input", "input.$(Process)");
        jd.addAttribute("output", "output.$(Process)");
        jd.addAttribute("should_transfer_files", "yes");
        jd.addAttribute("when_to_transfer_output", "on_exit");
        jd.addAttribute("notification", "Never");
        jd.addAttribute("error", "error.$(Process)");
        jd.addQueue(num_condor_jobs);

        jd.setHandlerOnSuccess(new Handler() {
            public void handle(Event e) {
                System.out.print(e.getJob() + " success ");
                JobId job_id = e.getJobId();
                int jobNo = job_id.jobNo;
                // Fold newly discovered urls into url_map, then mark this job's
                // input urls processed (1) and its errored urls failed (2).
                readCondorOutFile("output." + jobNo);
                updateDatabase("input." + jobNo, "Input");
                updateDatabase("error." + jobNo, "Error");
                System.out.printf("%tc\n", Calendar.getInstance());
                System.out.println("Urls To Crawl " + getNumUrls(0));
            }
        });

        jd.setHandlerOnFailure(new Handler() {
            public void handle(Event e) {
                System.out.print(e.getJob() + " failed ");
                System.out.printf("%tc\n", Calendar.getInstance());
            }
        });

        // submit the jobDescription and get Cluster
        Cluster c = condor.submit(jd);
        System.out.println("submitted " + c);

        // Wait for every job, then give the success handlers a minute to
        // finish their database updates before reporting.
        c.waitFor();
        try {
            Thread.sleep(60000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        System.out.print("Condor Finished Processing " + num_condor_jobs + " jobs");
        System.out.printf(" %tc\n", Calendar.getInstance());
    }

    /**
     * Rewrites {@code url}'s host name as its resolved IP address, using
     * {@code dns_cache} (host -&gt; ip) to avoid repeated DNS lookups.
     *
     * @return the rewritten url, or "" when the url is malformed or the host
     *         cannot be resolved (callers skip empty results)
     */
    private String changeDomainNameToIp(String url, HashMap<String, String> dns_cache) {
        String domain_name = "";
        try {
            URL u = new URL(url);
            domain_name = u.getHost();
            String host_address = dns_cache.get(domain_name); // single lookup; cache never stores null
            if (host_address == null) {
                host_address = InetAddress.getByName(domain_name).getHostAddress();
                dns_cache.put(domain_name, host_address);
            }
            // NOTE: replaces every occurrence of the host string in the url.
            return url.replace(domain_name, host_address);
        } catch (UnknownHostException e) {
            return ""; // unresolvable host name
        } catch (Exception e) {
            return ""; // malformed url
        }
    }

    /**
     * Extracts the original url from one line of a condor file.
     *
     * Input-file lines ({@code id_present == 'Y'}) have the form
     * {@code <ip-rewritten url><original url>,<url_id>} -- the two urls are
     * concatenated with no separator (see writeCondorInputFiles).  The
     * trailing ",&lt;url_id&gt;" field is stripped first (urls themselves may
     * contain commas), then the line is split on "http://": index 1 is the
     * rewritten url's tail and index 2 the original url's tail, which is
     * returned with its scheme restored.  Error-file lines ('N') are plain
     * urls and are returned unchanged.
     */
    private String getUrlFromLine(String line, char id_present) {
        if (id_present == 'N') {
            return line;
        }
        String res[] = line.split(",");
        int length = res.length;
        // Rebuild everything before the final comma-separated field (the id).
        StringBuilder strBuf = new StringBuilder(); // no sync needed, unlike StringBuffer
        for (int i = 0; i < length - 2; i++) {
            strBuf.append(res[i]);
            strBuf.append(',');
        }
        strBuf.append(res[length - 2]);
        res = strBuf.toString().split("http://");
        return ("http://" + res[2]);
    }

    /**
     * Marks every url listed in {@code file_name} in url_map: Input files set
     * processed = 1 (crawled), Error files set processed = 2 (errored).
     * Updates are batched and flushed once at the end.
     *
     * @param file_name condor input.N or error.N file to read back
     * @param file_type "Input" or "Error" (case-insensitive)
     */
    private void updateDatabase(String file_name, String file_type) {
        String update_query;
        if (file_type.equalsIgnoreCase("Error")) {
            // 2 indicates urls errored out while crawling
            update_query = "update url_map set processed = 2 where url_name = ?";
            System.out.print("Started Updating Database for Error file: " + file_name);
            System.out.printf(" %tc\n", Calendar.getInstance());
        } else if (file_type.equalsIgnoreCase("Input")) {
            // 1 indicates urls are processed
            update_query = "update url_map set processed = 1 where url_name = ?";
            System.out.print("Started Updating Database for Input file: " + file_name);
            System.out.printf(" %tc\n", Calendar.getInstance());
        } else {
            // FIX: the original fell through with a null query and crashed in
            // prepareStatement(null) right after printing this message.
            System.out.println("Error with input file type for updating database");
            return;
        }

        try {
            PreparedStatement st = con.prepareStatement(update_query);
            BufferedReader br = new BufferedReader(new FileReader(file_name));
            if (file_type.equalsIgnoreCase("Input")) {
                // Input files begin with two header lines (file.N / dir.N).
                br.readLine();
                br.readLine();
            }
            String tmp = br.readLine();
            int lineCount = 1;
            while (tmp != null) {
                String url;
                if (file_type.equalsIgnoreCase("Error")) {
                    url = getUrlFromLine(tmp, 'N'); // error lines are bare urls
                } else {
                    url = getUrlFromLine(tmp, 'Y'); // input lines carry the trailing id
                }
                st.setString(1, url);
                st.addBatch();
                tmp = br.readLine();
                if (lineCount++ % 1000 == 0) {
                    System.out.print("Read " + lineCount + " lines from file: " + file_name);
                    System.out.printf(" %tc\n", Calendar.getInstance());
                }
            }
            try {
                st.executeBatch();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            br.close();
            st.close();
        } catch (FileNotFoundException e1) {
            System.out.println("Input file does not exist");
        } catch (IOException e2) {
            System.out.println("IO Exception");
        } catch (SQLException e) {
            e.printStackTrace();
        }

        System.out.print("Finished Updating Database for Error/Input files: " + file_name);
        System.out.printf(" %tc\n", Calendar.getInstance());
    }

    /**
     * Inserts every url (one per line) from a condor output file -- or the
     * seed file -- into url_map as unprocessed (processed = 0), assigning
     * sequential url_ids.  Per-row insert failures (e.g. duplicate urls) are
     * deliberately ignored so the run continues; note url_id still advances
     * on a failed insert, as in the original.
     */
    private void readCondorOutFile(String file_name) {
        try {
            String insert_query = "Insert into url_map values(?,?,? )";
            PreparedStatement st = con.prepareStatement(insert_query);

            System.out.print("Started Reading Condor Output file: " + file_name);
            System.out.printf(" %tc\n", Calendar.getInstance());

            BufferedReader br = new BufferedReader(new FileReader(file_name));
            String tmp = br.readLine();
            int lineCount = 1;
            while (tmp != null) {
                st.setString(1, tmp);
                st.setInt(2, url_id++);
                st.setInt(3, 0); // 0 indicates urls are not processed
                try {
                    st.executeUpdate();
                } catch (SQLException e) {
                    // Ignored on purpose: duplicates must not stop the crawl.
                }
                tmp = br.readLine();
                if (lineCount++ % 1000 == 0) {
                    System.out.print("Read " + lineCount + " lines from file: " + file_name);
                    System.out.printf(" %tc\n", Calendar.getInstance());
                }
            }
            br.close();
            st.close();
        } catch (FileNotFoundException e1) {
            System.out.println("Input file does not exist");
        } catch (IOException e2) {
            System.out.println("IO Exception");
        } catch (SQLException e) {
            e.printStackTrace();
        }

        System.out.print("Finished Reading Condor Output file: " + file_name);
        System.out.printf(" %tc\n", Calendar.getInstance());
    }

    /**
     * Writes one condor input file per job (file_name + index), paging
     * through unprocessed urls with limit/offset.  Each file starts with two
     * header lines (file.N / dir.N) followed by one
     * {@code <ip url><original url>,<id>} line per resolvable url.
     *
     * FIX: the original query concatenated "...limit 2500offset 0" (missing
     * space) so every query failed; the SQLException was swallowed and
     * file_index never advanced, spinning this loop forever.  The space is
     * restored and a persistent failure now aborts the loop instead.
     */
    private void writeCondorInputFiles(int num_condor_jobs,
                                       HashMap<String, String> dns_cache,
                                       String file_name) {
        int file_index = 0;
        int offset_index = 0;
        while (file_index < num_condor_jobs) {
            try {
                Statement stmt = con.createStatement();
                // FIX: " offset " needs a leading space, otherwise the query
                // reads "limit 2500offset 0" and is rejected by the server.
                String select_query = "select url_name,url_id from url_map where processed = 0 limit "
                        + num_urls_per_condor_job
                        + " offset " + offset_index;
                ResultSet rs = stmt.executeQuery(select_query);

                BufferedWriter output = new BufferedWriter(new FileWriter(file_name + file_index));
                // Two header lines consumed by the crawler and skipped again by
                // updateDatabase when the file is read back.
                output.write("file." + String.valueOf(file_id++) + "\n");
                output.write("dir." + String.valueOf(dir_id++) + "\n");
                while (rs.next()) {
                    String url = rs.getString("url_name");
                    int id = rs.getInt("url_id");
                    String changed_url = changeDomainNameToIp(url, dns_cache);
                    if (!changed_url.isEmpty()) {
                        // Line format: <ip url><original url>,<id>.  The two urls
                        // are deliberately concatenated with no separator; see
                        // getUrlFromLine, which splits on "http://".
                        output.write(changed_url);
                        output.write(url);
                        output.write(",");
                        output.write(String.valueOf(id));
                        output.write("\n");
                    }
                }
                output.close();
                rs.close();
                stmt.close();
                System.out.print("Written to file input#: " + file_index);
                System.out.printf(" %tc\n", Calendar.getInstance());
                offset_index = offset_index + num_urls_per_condor_job;
                file_index++;
            } catch (IOException ie) {
                System.out.println("Error while writing to file");
                return; // a stuck file would otherwise retry forever
            } catch (SQLException e) {
                e.printStackTrace();
                return; // avoid spinning forever on a persistent database error
            }
        }
    }

    /**
     * Exports every crawled url (processed = 1) as "url,id" lines into
     * {@code file_name}.
     *
     * FIX: the original query was "select * from url_map values where ..." --
     * the stray "values" keyword is invalid SQL, so the export always failed.
     */
    private void writeToFile(String file_name) {
        try {
            Statement stmt = con.createStatement();
            String select_query = "select * from url_map where processed = 1";
            ResultSet rs = stmt.executeQuery(select_query);

            BufferedWriter output = new BufferedWriter(new FileWriter(file_name));
            while (rs.next()) {
                String url = rs.getString("url_name");
                output.write(url);
                output.write(",");
                int id = rs.getInt("url_id");
                output.write(Integer.toString(id));
                output.write("\n");
            }
            output.close();
            rs.close();
            stmt.close();
        } catch (IOException ie) {
            System.out.println("Error while writing to file");
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts url_map rows in the given state.
     *
     * @param processed 0 = pending, 1 = crawled, 2 = errored
     * @return number of matching rows, or 0 on database error
     */
    private int getNumUrls(int processed) {
        int num_urls = 0;
        try {
            Statement stmt = con.createStatement();
            String select_query = "select count(*) as count from  url_map where processed = " + processed;
            ResultSet rs = stmt.executeQuery(select_query);
            if (rs.next()) { // count(*) yields exactly one row
                num_urls = rs.getInt("count");
            }
            rs.close();
            stmt.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return num_urls;
    }

    /**
     * @return ceil(pending urls / num_urls_per_condor_job) -- the number of
     *         condor jobs needed for one crawl round (0 when nothing is pending)
     */
    private int getNumberOfJobs() {
        int num_urls = getNumUrls(0);
        int num_condor_jobs = num_urls / num_urls_per_condor_job;
        if (num_urls % num_urls_per_condor_job != 0) {
            num_condor_jobs = num_condor_jobs + 1; // round up for the partial batch
        }
        return num_condor_jobs;
    }

    /**
     * Entry point.  Seeds url_map from the file named in args[0], runs crawl
     * rounds up to the configured depth, then exports all crawled urls to a
     * file named "url_map".
     */
    public static void main(String[] args) throws CondorException {
        if (args.length < 1) {
            // FIX: the original indexed args[0] unchecked and threw
            // ArrayIndexOutOfBoundsException when launched without a seed file.
            System.out.println("Usage: java Crawl_Condor_Db <seed_file>");
            return;
        }
        // Host-name -> ip cache shared across every crawl round.
        HashMap<String, String> dns_cache = new HashMap<String, String>();

        Crawl_Condor_Db cd = new Crawl_Condor_Db();
        // Read the seed file and populate url_map with the starting urls.
        String seed_file = args[0];
        cd.readCondorOutFile(seed_file);

        int depth = 1; // crawl rounds; raise to follow discovered links further
        int depthCount = 0;
        while (depthCount < depth) {
            System.out.print("Urls Crawled " + cd.getNumUrls(1));
            System.out.printf("%tc\n", Calendar.getInstance());
            System.out.println("Urls To Crawl " + cd.getNumUrls(0));

            // Work out how many queue jobs this round needs.
            int num_condor_jobs = cd.getNumberOfJobs();
            System.out.println("No of Condor Jobs " + num_condor_jobs);
            if (num_condor_jobs == 0) {
                System.out.println("All the urls in the database are processed");
                break;
            }
            // Write the input files, then hand them to condor and wait.
            cd.writeCondorInputFiles(num_condor_jobs, dns_cache, "input.");
            cd.call_condor(num_condor_jobs);
            depthCount++;
        }
        System.out.print("Urls Crawled " + cd.getNumUrls(1));
        System.out.printf("%tc\n", Calendar.getInstance());

        System.out.println("Started Writing Crawled Urls to File");
        cd.writeToFile("url_map");

        System.out.print("Finished Writing..");
        System.out.printf("%tc\n", Calendar.getInstance());
    }
}
