#### Librerias #### library(ff) library(readr) library(snowfall) # para paralelizar library(tictoc) # medidas de tiempo para script complejos ######## # Ejemplo sencillo # Generando ejemplo de datos df (dataframe). Ejemplo muy sencillo id = 1:12 genero = sample(factor(c("masculino","femenino","no_ide")), 12, TRUE) rating = matrix(sample(1:6, 12*10, TRUE), 12, 10) # rellenar esa matriz colnames(rating) = paste("r", 1:10, sep="") # Les ponemos una columnas df = data.frame(id, genero, rating) head(df,12) is(df) # que tipo de objeto esel objeto creados. Debe ser yb dataframe # Llevandolo a formato ff fid = as.ff(id) # Para llevarlo a un dataframe tipo ff fgenero = as.ff(genero) frating = as.ff(rating) fdf = ffdf(id=fid, genero = fgenero, frating) head(fdf,12) is(fdf) # que tipo de objeto es # Aunque df es un data.frame y fdf un objeto ff al compararlos se toman por identicos identical(df, fdf[,]) # Si solicitamos filas o columnas de fdf son de tipo data.frame fdf[1:3,] # data.frame is(fdf[1:3,]) fdf[,1:4] # data.frame is(fdf[,1:4]) # Si solicitamos vector, lista o variable de fdf son de tipo ffdf o ff_vector fdf[1:4] # ffdf is(fdf[1:4]) fdf[] # ffdf is(fdf[]) fdf[[2]] # ff_vector is(fdf[[2]]) fdf$genero # ff_vector is(fdf$genero) # Usemos "resumen" para calcula el mínimo, media, maximo y largo de un vector resumen = function(x) c(min = min(x,na.rm=TRUE), media = mean(x,na.rm=TRUE), max = max(x,na.rm=TRUE),largo = length(x)) # Veamos un ejemplo para el siguiente vector v = c(1,5,3) resumen(v) ############ # Archivo de 875 Mb archivo1 = "~/Desktop/R ZORA Ejemplos/CLASE 1/minitrain.csv" # archivo1 = "~/Desktop/R ZORA Ejemplos/CLASE 1/train_data2.csv" # Archivo de 15 Gb archivo2 = "~/Desktop/R ZORA Ejemplos/CLASE 1/train_data.csv" # Lectura comun train1 = read_csv(archivo1) dim(train1) # dimension del arreglo os1 = object.size(train1) # tamaño que ocupa el arreglo en memoria format(os1, units = 'KB') file.size(archivo1)/1024 #megas # Lectura con ff diri_ff = 'ff_diri' if(!dir.exists(diri_ff)) dir.create(diri_ff) options(fftempdir=diri_ff) train2 = read.csv.ffdf(file = archivo1,VERBOSE = TRUE,first.rows = -1,header=T) dim(train2) # dimension del arreglo os2 = object.size(train2) # tamaño que ocupa el arreglo en memoria format(os2, units = 'KB') # Lectura con ff por partes train3 = read.csv.ffdf(file = archivo1,VERBOSE = TRUE,next.rows=5000) dim(train3) os3 = object.size(train3) # tamaño que ocupa el arreglo en memoria format(os3, units = 'KB') # Comparacion de tiempos de uso tic('En_RAM') table(train1[,10]) t1 = toc() tic('En_disco') table(train3[,10]) t3 = toc() # Tarda más en procesarse el leido por ff t1$callback_msg t3$callback_msg ## Nos quedamos con la que usa menor espacio en memoria basura1 = gc() #### Borrando train1 ya que train3 es lo mismo y ocupa menos mmemoria. También train2 rm(train1,train2) basura2 = gc() basura1 basura2 # Lectura con ff por partes de la de 15 GB archivo2 = "train_data.csv" train_big = read.csv.ffdf(file = archivo2,VERBOSE = TRUE,next.rows=100000) dim(train_big) is(train_big) os4 = object.size(train_big) # tamaño que ocupa el arreglo en memoria format(os4, units = 'KB') format(os4, units = 'MB') file.size(archivo2) #### Revisando el arreglo grande names(train_big) head(train_big,15) # Correcto para arreglos, aqu? da error para objetos ffdf en algunas funciones res_a = resumen(train_big$D_39) # Estas otras funcionan res_1 = resumen(train_big[,'D_39']) res_1 res_2 = resumen(train_big$D_39[]) res_2 res_3 = resumen(train_big[,4]) res_3 # Se puede mostrar variables por lineas [desde:hasta] train_big$D_39[2000:2005] train_big[,'D_39'][2000:2005] train_big[[4]][2000:2005] # Se puede consultar el nombre del archivo *.ff en disco que almacena a cada variable basename(filename(train_big$D_39)) basename(filename(train_big$B_24)) # Convirtiendo a dataframe is(train_big) f39 = train_big$D_39 is(f39) head(f39) format(object.size(f39),units = 'KB') d39 = as.data.frame(train_big[,'D_39']) is(d39) head(d39) format(object.size(d39),units = 'KB') format(object.size(d39),units = 'MB') ########### SECUENCIAL vs PARALELO ## Secuencial t1 = system.time( s1 <- lapply( chunk(f39,from=1,to=5531000,by=500000), function(i) resumen(f39[i]) )) resultado_s = crbind(s1) head(resultado_s) ## 4 cpus # inicializaci?n de los nodos con "ff": sfInit, sfExport, sfClusterEval sfInit(parallel=TRUE, cpus=4, type="SOCK") sfLibrary(ff) sfExport("f39",'resumen') # no exporta el mismo ff multiples veces sfClusterEval(open(f39)) # explicitely opening avoids a gc problem t2 = system.time( s2 <- sfLapply( chunk(f39,from=1,to=5531000,by=500000), function(i) resumen(f39[i])) ) # sfLapply para ejecutar en paralelo sfClusterEval(close(f39)) sfStop() resultado_p = crbind(s2) head(resultado_p) compara = rbind(t1,t2) rownames(compara) = c('Secuencial','Paralelo') compara