#### Librerias #### library(ff) library(ffbase) #library(bigmemory) #library(bit) library(readr) library(snowfall) #### Funciones ##### # Usemos "resumen" para calcula el mínimo, media, maximo y largo de un vector resumen = function(x) c(min = min(x), media = mean(x), max=max(x),largo=length(x)) # Veamos un ejemplo para el siguiente vector v = c(1,5,3) resumen(v) ######### LECTURA #### # Tamaño del archivo csv de interes en Mb file.size("2008V.csv")/1024/1024 # MOSTRANDO LA OCUPACIÓN DE MEMORIA # Lectura comun start<-Sys.time() aero1 <- read_csv("2008V.csv") dim(aero1) # dimension del arreglo filas x columnas format(object.size(aero1),"Mb") # megas que ocupa de memoria el arreglo object.size(aero1) print(Sys.time() - start) # Lectura con ff start<-Sys.time() aero2=read.csv.ffdf(file="2008V.csv",VERBOSE = TRUE,first.rows = -1,header=T) dim(aero2) # dimension del arreglo format(object.size(aero2),"Mb") # megas que ocupa de memoria el arreglo object.size(aero2) print(Sys.time() - start) # Uso de memoria # memory.limit(max = F) disponible solo en Windows # memory.limit() disponible solo en Windows # Informa sobre la memoria y ayuda a limpiar gc() # la ultima columna muestra el espacio máximo utilizado #### Borrando aero1 ya que aero2 es lo mismo y ocupa menos mmemoria rm(aero1) # lectura tradicional # memory.size (max = F) # memory.limit() # Informa sobre la memoria y ayuda a limpiar, a?n usando rm puede quedar # ocupada la memoria. Esto la limpia gc() #### Revisando el arreglo que quedo # Muestra su forma de lista y al final la de arreglo aero2 # un mapa del archivo # Lista de variables (columnas) names(aero2) # Se puede consultar el nombre del archivo *.ff en disco que almacena a cada variable basename(filename(aero2$Year)) basename(filename(aero2$Origin)) # mostrar variables por lineas [desde:hasta] aero2$Distance[2000:2005] aero2[,'Distance'][2000:2005] aero2[[19]][2000:2005] # Estudiar que valores toman las variables (tipo de variable) tab1 = table(aero2[,'CancellationCode']) tab1 tab2 = names(table(aero2[,'Origin'])) tab2 tab3 = table(aero2[,'Distance']) tab3 # Aplicar funciones a variables (aplicar summary a la columna "distancia") descri = summary(aero2[,'Distance'],na.rm=TRUE) descri ######## TRABAJAR POR BLOQUES # Guardar numero de filas del arreglo y la variable de distancias de vuelo n = dim(aero2)[1] # [1] da filas, [2] columnas, [] da ambos n dist = aero2$Distance # Seleccionar por variable, en este caso: distancia dist # En secuencial # chunk se usa con objetos que provengan de "ff" # calcular en 10 paquetes (10 particiones). Usando función "resumen" p1 <- lapply( chunk(dist,length = 10) , function(i) resumen(dist[i])) p1 crbind(p1) # Tambien puedo hacer el corte de las particiones selectivas: ejemplo desde 1 a 100 # calcular los 100 primeros valores en paquetes de tamaño 10 p2 <- lapply( chunk(dist,from=1,to=100,by=10), function(i) resumen(dist[i]) ) crbind(p2) # los paquetes se cortan de acuerdo al criterio de R # calcular todo en paquetes de tamaño 100 mil (1e5) (no respeta el tamaño solicitado) p3 <- lapply( chunk(dist,from=1,to=n,by=1e5), function(i) resumen(dist[i]) ) crbind(p3) # tampoco los corta como yo quiero # R pone como prioridad la carga equiliobrada # SOLUCION: le indico exactamente donde quiero que pique la partición # calcular 500 mil exactos en paquetes de tamaño 100 mil (1e5) p4 <- lapply( chunk(dist,from=1,to=5e5,by=1e5), function(i) resumen(dist[i]) ) crbind(p4) # Pero sobra un trozo de datos # se calcula la cola restante, de 500.001 hasta n p5 <- lapply( chunk(dist,from=5e5+1,to=n,by=1e5), function(i) resumen(dist[i]) ) crbind(p5) # Unimos p4 y p5 para tener juntos los resultados de todos los paquetes y calcular el resultado final res = rbind(crbind(p4),crbind(p5)) fin = c(min=min(res[,1]),med=mean(res[,2]),max=max(res[,3])) fin ########### SECUENCIAL vs PARALELO ## Secuencial (el mismo calculo para p4) t1 = system.time( s1 <- lapply( chunk(dist,from=1,to=5e5,by=1e5), function(i) resumen(dist[i]) )) r1 = crbind(s1) t1 ## 2 cpus # inicialización de los nodos con "ff": sfInit, sfExport, sfClusterEval sfInit(parallel=TRUE, cpus=2, type="SOCK") sfLibrary(ff) sfExport("dist",'resumen') # do not export the same ff multiple times sfClusterEval(open(dist)) # explicitely opening avoids a gc problem t2 = system.time( s2 <- sfLapply( chunk(dist,from=1,to=5e5,by=1e5), function(i) resumen(dist[i])) ) # sfLapply para ejecutar en paralelo sfClusterEval(close(dist)) # for completeness r2 = crbind(s2) sfStop() t2 ## 4 cpus sfInit(parallel=TRUE, cpus=4, type="SOCK") sfLibrary(ff) sfExport("dist",'resumen') # do not export the same ff multiple times sfClusterEval(open(dist)) # explicitely opening avoids a gc problem t4 = system.time( s4 <- sfLapply( chunk(dist,from=1,to=5e5,by=1e5), function(i) resumen(dist[i])) ) sfClusterEval(close(dist)) # for completeness r4 = crbind(s2) sfStop() t4 ######## Comparacion ts = rbind(t1,t2,t4)[,1:3] dimnames(ts)[[1]] = c('secuencial','2 nodos','4 nodos') ts