"""Average TIFF frames fetched over SFTP and subtract the averaged pattern.

The module lists scene folders on a remote host, averages the frames of one
sensor channel, and uses the high-frequency part of that average to correct
individual frames.
"""

# The star import stays first so that the explicit imports below keep taking
# precedence over any same-named symbol it exports (same order as before).
# NOTE(review): np, Image and plt are never imported explicitly in this file;
# presumably they are provided by this star import -- confirm.
from WorkingPyDemo import *

import os
import posixpath
from contextlib import contextmanager

import paramiko
from PIL import ImageFilter
from scipy.ndimage import gaussian_filter
from scipy.signal import convolve2d
from scipy.stats import multivariate_normal
from skimage.restoration import wiener

REMOTE_HOST = "192.168.0.107"
REMOTE_USER = "elphel"
DEFAULT_SCENES_DIR = "/media/elphel/NVME/lwir16-proc/te0607/scenes/"


def _open_ssh_client(host, username):
    """Return a connected paramiko.SSHClient using the system host keys."""
    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.connect(host, username=username)
    return client


@contextmanager
def _sftp_session(host=REMOTE_HOST, username=REMOTE_USER):
    """Yield an SFTP client and close it and its SSH connection on exit."""
    client = _open_ssh_client(host, username)
    try:
        sftp_client = client.open_sftp()
        try:
            yield sftp_client
        finally:
            sftp_client.close()
    finally:
        client.close()


def _load_frame(file_or_path):
    """Open an image and return it as an array without its first row.

    NOTE(review): the first row is dropped everywhere in this file;
    presumably it holds non-image data -- confirm.
    """
    with Image.open(file_or_path) as image_object:
        return np.array(image_object)[1:]


def setup_remote_sftpclient(host=REMOTE_HOST, username=REMOTE_USER):
    """
    Setup the SFTP client

    Parameters:
        host (str): address of the remote machine
        username (str): login name on the remote machine

    Returns:
        sftp_client (paramiko.SFTPClient): the SFTP client

    The SSH client that owns the connection is not returned, so the caller
    can only close the SFTP client itself. Code in this module uses
    ``_sftp_session`` instead, which closes both.
    """
    client = _open_ssh_client(host, username)
    return client.open_sftp()


def create_average(images, which_sensor):
    """
    Create an average image from a list of local images

    Parameters:
        images (list): a list of image paths
        which_sensor (int): the sensor number

    Returns:
        average_image (np.array): the float64 average of the matching images

    Raises:
        ValueError: if no image in ``images`` belongs to ``which_sensor``
    """
    same_sensor_images = find_only_in_channel(images, str(which_sensor))
    if not same_sensor_images:
        raise ValueError("no images found for sensor " + str(which_sensor))

    # Accumulate in float64: summing in the images' own integer dtype
    # can silently overflow.
    total = None
    for image_name in same_sensor_images:
        frame = _load_frame(image_name).astype(np.float64)
        total = frame if total is None else total + frame
    return total / len(same_sensor_images)


def remote_create_average(images, which_sensor):
    """
    Create an average image from a list of remote images

    Every entry of ``images`` is averaged; no channel filtering happens here.

    Parameters:
        images (list): a list of remote image paths
        which_sensor (int): the sensor number (accepted for interface
            compatibility; it is not used to select images)

    Returns:
        average_image (np.array): the float64 average of all images

    Raises:
        ValueError: if ``images`` is empty
    """
    image_count = len(images)
    if image_count == 0:
        raise ValueError("cannot average an empty list of images")

    # A single running sum weights every frame equally, including the last
    # ones that do not fill a whole batch of 100.
    total = None
    with _sftp_session() as sftp_client:
        for i, image_name in enumerate(images):
            with sftp_client.open(str(image_name)) as remote_file:
                frame = _load_frame(remote_file).astype(np.float64)
            total = frame if total is None else total + frame
            if i % 100 == 0 and i != 0:
                print(i)  # progress indicator
    return total / image_count


def remote_file_extractor(headname=DEFAULT_SCENES_DIR):
    """Find all the scene folders two levels below a remote directory

    Entries of ``headname`` whose name contains a "." are skipped.

    Parameters:
        headname (str): the remote directory name

    Returns:
        scenes (list): paths of the form headname/<folder>/<entry>
    """
    scenes = []
    with _sftp_session() as sftp_client:
        for curr_folder in sftp_client.listdir(headname):
            if "." in curr_folder:
                continue
            folder_path = posixpath.join(headname, curr_folder)
            for small_folder in sftp_client.listdir(folder_path):
                scenes.append(posixpath.join(folder_path, small_folder))
    return scenes


def remote_image_extractor(scenes):
    """Find all the remote .tiff files in the given scene directories

    Files ending in "_6.tiff" are excluded.

    Parameters:
        scenes (list): remote directory paths

    Returns:
        image_folder (list): a list of all the kept file paths
    """
    image_folder = []
    with _sftp_session() as sftp_client:
        for scene in scenes:
            for file_name in sftp_client.listdir(scene):
                if not file_name.endswith(".tiff"):
                    continue
                if file_name.endswith("_6.tiff"):
                    continue
                # Remote paths are POSIX paths whatever the local OS is.
                image_folder.append(posixpath.join(scene, file_name))
    return image_folder


def find_only_in_channel(images, channel_name="10"):
    """
    Find the images that are only in the specified channel

    The two characters just before the 5-character extension are compared
    with the channel: "NN" for channels above 9, "_N" otherwise.

    Parameters:
        images (list): a list of image paths
        channel_name (str): the channel name

    Returns:
        same_sensor_images (list): the images of the specified channel
    """
    channel_name = str(channel_name)
    if int(channel_name) > 9:
        wanted_tag = channel_name
    else:
        wanted_tag = "_" + channel_name
    return [
        image_name for image_name in images
        if image_name[-7:-5] == wanted_tag
    ]


def adjust_to_original(new_image, average_image):
    """
    Remove the fine detail of the average image from a new image

    The average is mean-centred, its Gaussian-blurred version (sigma=1) is
    subtracted to keep only the fine detail, and that detail is subtracted
    from ``new_image``. The result is rescaled to the min/max range of
    ``new_image``.

    Parameters:
        new_image (np.array): the new image
        average_image (np.array): the average image

    Returns:
        adjusted_image (np.array): the adjusted image as uint16
    """
    original_image_min = np.min(new_image)
    original_image_max = np.max(new_image)
    original_span = float(original_image_max) - float(original_image_min)

    centred_average = average_image - np.mean(average_image)
    fine_detail = centred_average - gaussian_filter(centred_average, sigma=1)

    adjusted_image = new_image - fine_detail
    adjusted_image = adjusted_image - np.min(adjusted_image)
    peak = np.max(adjusted_image)
    if peak > 0:
        # A perfectly flat result has peak == 0; skip the rescale then
        # instead of dividing by zero.
        adjusted_image = adjusted_image * original_span / peak
    adjusted_image = adjusted_image + original_image_min
    return adjusted_image.astype(np.uint16)


def color_adjust(visual_array):
    """
    Rescale an array linearly to the range [0, 1] for display

    Parameters:
        visual_array (np.array): the array to rescale

    Returns:
        adjusted_array (np.array): the rescaled array; all zeros when the
            input is constant
    """
    shifted = visual_array - np.min(visual_array)
    peak = np.max(shifted)
    if peak == 0:
        # Constant input: avoid 0/0 and return a float array of zeros.
        return np.zeros_like(shifted, dtype=float)
    return shifted / peak


def save_new_average(quantity_of_images=1500, channel="10",
                     head_dir=DEFAULT_SCENES_DIR):
    """
    Save the average image of a channel as a TIFF file

    The file is written to "Average_On_Channel(<channel>).tiff" in the
    current working directory.

    Parameters:
        quantity_of_images (int or "all"): how many randomly chosen images
            to average; "all", or a number at least as large as the number
            of available images, uses every image
        channel (str): the channel name
        head_dir (str): the remote head directory
    """
    scenes = remote_file_extractor(head_dir)
    images = find_only_in_channel(remote_image_extractor(scenes), channel)
    if quantity_of_images != "all" and len(images) > quantity_of_images:
        # Sampling without replacement needs more images than requested.
        images = np.random.choice(
            images, quantity_of_images, replace=False
        ).tolist()
    average_image = remote_create_average(images, channel)
    average_savable_image = Image.fromarray(average_image)
    average_savable_image.save("Average_On_Channel(" + channel + ").tiff")


def save_testable_images(images, selected_channel, quantity_of_images,
                         start_index=10000):
    """
    Save a run of original frames together with their corrected versions

    Originals go to "original_images(<channel>)/innerfolder/" with the
    prefix "original"; corrected frames go to
    "averaged_images(<channel>)/innerfolder/". The correction uses the local
    file "Average_On_Channel(<channel>).tiff".

    Parameters:
        images (list): a list of remote image paths
        selected_channel (str): the channel name
        quantity_of_images (int): the number of consecutive images to save
        start_index (int): index of the first image within the channel

    Raises:
        IndexError: if the channel has fewer than
            ``start_index + quantity_of_images`` images
    """
    images = find_only_in_channel(images, selected_channel)
    stop_index = start_index + quantity_of_images
    if stop_index > len(images):
        raise IndexError(
            "requested images %d..%d but channel %s only has %d"
            % (start_index, stop_index - 1, selected_channel, len(images))
        )
    selected_images = images[start_index:stop_index]

    average_image = np.array(
        Image.open("Average_On_Channel(" + selected_channel + ").tiff")
    )

    original_dir = "original_images(" + selected_channel + ")/innerfolder"
    averaged_dir = "averaged_images(" + selected_channel + ")/innerfolder"
    os.makedirs(original_dir, exist_ok=True)
    os.makedirs(averaged_dir, exist_ok=True)

    with _sftp_session() as sftp_client:
        for item in selected_images:
            with sftp_client.open(item) as remote_file:
                image = _load_frame(remote_file)
            file_name = posixpath.basename(item)

            Image.fromarray(image).save(
                original_dir + "/original" + file_name
            )
            altered_image = adjust_to_original(image, average_image)
            Image.fromarray(altered_image).save(
                averaged_dir + "/" + file_name
            )


def save_new_gauss(cov=None, step=0.44, path="gaussian_kernel.tiff"):
    """
    Save a normalised 2-D Gaussian kernel as a TIFF file

    Parameters:
        cov (array-like): 2x2 covariance matrix; it must be symmetric and
            positive semidefinite
        step (float): grid spacing on [-1, 1) along both axes
        path (str): output file name
    """
    if cov is None:
        # NOTE(review): the previous hard-coded matrix [[1.1, 5], [2, 1]]
        # is neither symmetric nor positive semidefinite, so it is not a
        # valid covariance. Only its diagonal is kept here -- confirm the
        # intended off-diagonal term.
        cov = [[1.1, 0.0], [0.0, 1.0]]
    x, y = np.mgrid[-1:1:step, -1:1:step]
    print(x.shape)
    pos = np.dstack((x, y))
    normal_grid = multivariate_normal([0, 0], cov).pdf(pos)
    normal_grid = normal_grid / np.sum(normal_grid)  # weights sum to 1
    end_image = Image.fromarray(normal_grid)
    end_image.save(path)


def main():
    """Save sample frames for channel 11 and plot one corrected frame."""
    channel = "11"
    scenes = remote_file_extractor(DEFAULT_SCENES_DIR)
    images = find_only_in_channel(remote_image_extractor(scenes), channel)

    save_testable_images(images, channel, 5)
    print(len(images))

    # Local average over the 20 frames around index 5000.
    average_image = remote_create_average(
        images[5000 - 10: 5000 + 10], channel
    )

    with _sftp_session() as sftp_client:
        with sftp_client.open(images[10000]) as remote_file:
            test_image = _load_frame(remote_file)

    plt.subplot(121)
    plt.imshow(
        color_adjust(adjust_to_original(test_image, average_image)),
        cmap='gray', vmin=0, vmax=1,
    )
    # NOTE(review): plt.show() was commented out in the original, so the
    # figure is built but never displayed -- confirm this is intended.


if __name__ == "__main__":
    main()