Calcul de VWAP avec Hadoop en StandAlone

Par | Classé dans Intermédiaire, Java, Performance | Le 12/11/2012

Tags « »

5

Hadoop dans les grandes lignes 

Hadoop est un framework java destiné à faciliter la création d’applications distribuées et scalables. Il permet aux applications de travailler avec des milliers de nœuds et des pétaoctets de données.

Hadoop repose sur son système de fichier (HDFS) ainsi que sur son algorithme de Map/Reduce.

Généralement, tous les tutoriaux ou exemples d’implémentation d’Hadoop concernent un calcul basique de calcul du nombre de différents mots contenu dans des fichiers.

Dans cet article, je vais m’attacher à vous présenter un cas concret d’utilisation d’Hadoop pour le calcul de VWAP.

 

Qu’est ce qu’une VWAP et pourquoi utiliser Hadoop pour la calculer ?

La VWAP (Volume-weighted average price) est un indice financier correspondant au ratio de la valeur échangée par le volume échangé. Plus d’info ici

Notre but est de calculer la VWAP pour tous les titres, sur tout l’historique disponible à des heures données.

Les fichiers historiques sont de la forme txt. Il y a un fichier par jour, ce fichier pèse en moyenne 1GigaOctet. Donc selon la taille de notre historique, ce calcul peut s’avérer très fastidieux et couteux en ressources.

Voici à quoi ressemble un fichier en entrée (enfin une infime partie) :

C;FR0000130338;09:00:03;3;310;24.6;50;24.5
C;FR0000130338;09:00:03;4;231;24.62;665;24.4
C;FR0000130338;09:00:03;5;686;24.63;309;24.37
C;BE0003796134;09:00:03;1;3360;4.51;5306;4.5
N;FR0000120222;09:00:00;00000042;67.99
N;FR0000120222;09:00:00;00000019;67.99
N;FR0000120222;09:00:00;00000012;67.99
N;FR0000120222;09:00:00;00000002;67.99
N;FR0000120222;09:00:00;00000041;67.99

En séparant par “;”

  1. C ou N, C pour carnet d’ordre N pour une transaction
  2. Code ISIN du titre
  3. Heure de la transaction
  4. Volume de titres échangés
  5. Prix

Hadoop va donc être utilisé pour ces fonctionnalitées de calcul distribués afin de répartir la charge de parsing et calcul sur différentes machines et tout aggréger pour donner en sortie un fichier avec les vwap demandées.

Voici un extrait du fichier de sortie :

FR0010112524-20100104-09:00:00     25.656256
FR0010112524-20100104-12:00:00     26.125364
FR0010112524-20100104-17:00:00     26.28817
FR0010112524-20100106-09:00:00     26.560003
FR0010112524-20100106-12:00:00     26.684067
FR0010112524-20100106-17:00:00     26.756487
FR0010112524-20100217-09:00:00     25.949999

Implémentation d’Hadoop pour un calcul de VWAP

En entrée, on a donc toute une série de fichiers txt. Chaque fichier correspond aux extractions Euronext des transactions d’un jour donné.

Seules les lignes commençant par «N;» nous intéressent, les autres correspondant aux carnets d’ordre.

Au niveau du Mapper, nous allons extraire toutes les lignes commençant par «N;» pour un ou plusieurs code ISIN donnés.

Dans le Reducer, on va vouloir récupérer toutes les lignes correspondant à notre/nos ISIN triées par jour puis heure afin de calculer la Vwap pour les heures données. Or Hadoop ne permet pas de trier directement la liste de <Values> reçue par le Reducer

Tri des <Values> grâce à une clé composite

Avec Hadoop il est impossible d’effectuer un tri sur les valeurs. Par contre, à la place on peut placer les données qu’on veut trier dans la clé et ainsi utiliser une clé composite. Ensuite on utilise une classe de Partition spécifique ainsi que deux classes RawComparator afin de trier et partitionner notre map en sortie correctement.

La solution est d’utiliser un système de clé composite, dans notre exemple, je la nomme IsinKey.

L’objet IsinKey sera utilisé comme la Clé et contient :

  • Une String key qui est la concaténation de l’ISIN puis du jour de l’extraction (par exemple : FR0010112524-20120913)
  • Une String hour qui correspond à l’heure d’une transaction

Un objet Tick sera lui utilisé comme Valeur pour stocker le volume et le prix.

Ensuite, il va nous falloir définir 3 réglages dans le JobConf d’Hadoop :

  • setPartitionerClass
  • setOutputValueGroupingComparator
  • setOutputKeyComparatorClass

Le Partitioner

La première classe qu’on doit setter étend de org.apache.hadoop.mapred.Partitioner. Cette classe a pour unique fonction de déterminer dans quelle partition notre map de sortie doit aller. On ne peut aller en dessous de 0 ou au dessus de numPartitions -1 dans notre retour. En général il est préférable de récupérer le HashCode d’une partie de la Clé et d’y appliquer un modulo du nombre de partitions.

Dans notre exemple, on récupère le hashcode de notre Clé qu’on module ensuite avec le numPartitions.

public int getPartition(IsinKey key, Tick val, int numPartitions) {
   int hash = key.getKey().hashCode();
   return hash % numPartitions;
}

L’ Output Value Grouping Comparator

L’OutputValueGroupingComparator dans les paramètres du JobConf utilise un org.apache.hadoop.io.RawComparator. Le RawComparator est utilisé pour déterminer dans quel Reducer notre ligne de sortie de mapping doit aller. Ce RawComparator ne trie pas la liste de Valeurs du Reducer, mais trie l’entrée du Reducer,  pour que le Reducer sache quand un nouveau groupe commence.

Dans notre exemple, le grouping comparator va trier par la «Key» de notre object IsinKey (c’est a dire «Isin-date»

public int compare(WritableComparable w1, WritableComparable w2) {
   IsinKey k1 = (IsinKey)w1;
   IsinKey k2 = (IsinKey)w2;
   return k1.getKey().compareTo(k2.getKey());
}

L’ Output Key Comparator

L’outputKeyComparatorClass dans les paramètre du JobConf utilise aussi un org.apache.hadoop.io.RawComparator. Ce RawComparator est utilisé pour trier les Clés que reçoit le Reducer. Les données envoyées au comparateur sont nos objets clés (IsinKey dans notre exemple). Il est très important aussi de retenir que le KeyComparator doit comprendre la règle définie dans l’OutputValueComparator.

Dans notre exemple, ça veut dire qu’on va d’abord comparer la Key de l’objet IsinKey, puis, si ces Key sont égales, alors on va comparer les champs Hours de l’objet IsinKey.

public int compare(WritableComparable w1, WritableComparable w2) {
   IsinKey k1 = (IsinKey)w1;
   IsinKey k2 = (IsinKey)w2;
   SimpleDateFormat f = new SimpleDateFormat("HH:mm:ss");
   int result = k1.getKey().compareTo(k2.getKey());

   if(0 == result) {
      try {
         result = f.parse(k1.getDate()).compareTo(f.parse(k2.getDate()));
      } catch (Exception e) {
         e.printStackTrace();
      }
   }
return result;
}

Grâce à ces trois classes redéfinies, le Reducer sera appelé une fois par clé IsinDate, puis lorsque l’on itère sur les «values», l’heure de la «Key» prend successivement ses différentes valeurs, ce qui nous permet  de déterminer la vwap à chaque tick, et de repartir à 0 pour chaque nouvelle journée

Lancement en “Stand-Alone” et reste des classes

Afin de démarrer le projet en mode «Stand-Alone», il suffit d’inclure les sources d’hadoop dans un projet eclipse par exemple, puis d’inclure ce projet dans le projet courant.

CalcVWAPDriver

public class CalcVWAPDriver {
   public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

      Configuration conf = new Configuration();
      GenericOptionsParser parser = new GenericOptionsParser(conf, args);
      args = parser.getRemainingArgs();
      conf.setStrings("ISIN", "FR0010112524");
      conf.setStrings("DATES", "09:00:00", "12:00:00", "17:00:00");
      Job job = new Job(conf, "calcVWAP");
      job.setPartitionerClass(NaturalKeyPartitioner.class);
      job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
      job.setSortComparatorClass(CompositeKeyComparator.class);
      job.setMapOutputKeyClass(IsinKey.class);
      job.setMapOutputValueClass(Tick.class);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setMapperClass(CalcVWAPMapper.class);
      job.setReducerClass(CalcVWAPReducer.class);
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.out.println(job.waitForCompletion(true));
   }
}

Dans l’exemple, on attribue en dur des valeurs ISIN et DATES dans la configuration d’Hadoop (lignes 7 et 8) afin d’y avoir accès dans le Mapper et le Reducer. Il est tout à fait envisageable de le rendre paramétrable avec les arguments passés à la méthode main.

CalcVWAPMapper

public class CalcVWAPMapper extends Mapper<LongWritable, Text, IsinKey, Tick> {
   private String currFile = null;
   private String currDate = null;
   private String currHour = null;
   private String currIsin = null;
   private List<String> isins = null;

   public String getFileName(Context context) {
      FileSplit fileSplit = (FileSplit)context.getInputSplit();
      return fileSplit.getPath().getName().toString();
   }

   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      if (!getFileName(context).equals(currFile)) {
         currFile = getFileName(context);
         currDate = currFile.replace(".txt", "");
      }
      if (isins == null) {
         isins = Arrays.asList(context.getConfiguration().getStrings("ISIN"));
      }

      String line = value.toString();
      if (line.startsWith("N;")) {
         String[] result = line.split(";");
         if (result.length >= 5) {
            if (isins.contains(result[1])) {
               if (!result[1].equals(currIsin)) {
                  currIsin = result[1];
               }
               if (!result[2].equals(currHour)) {
                  currHour = result[2];
               }
               String volume = result[3];
               String price = result[4];
               context.write(new IsinKey(currIsin+"-"+currDate, currHour), new Tick(volume, price));
            }
         }
      }
   }
}

Le Mapper est assez simple. Si ce n’est pas déjà fait, on prend le nom du fichier traité afin d’en extraire la date. Puis pareil, on récupère la liste d’ISIN entrée  en paramètre dans la JobConf Hadoop. (lignes 14 à 20)

Ensuite, on lit ligne par ligne le fichier. Si la ligne commence par “N;” alors on la split (lignes 23 et 24). L’Isin est la deuxième partie, l’heure la troisième, tandis que le volume et le prix sont respectivement les quatrièmes et cinquièmes parties.

Enfin, on écrit dans le context nos objets. D’un côté, la Clé composé de l’Isin-Date et de l’heure, et de l’autre côté la Valeur qui est composé du volume et du prix. (ligne 35)

CalcVWAPReducer

public class CalcVWAPReducer extends Reducer<IsinKey, Tick, IsinKey, OutputVWAP> {

   private String isin = null;
   private float vwap = 0.0f;
   private long totalVolume = 0;
   private List<Date> hoursList = null;
   private SimpleDateFormat f = new SimpleDateFormat("HH:mm:ss");

   protected void reduce(IsinKey key, Iterable<Tick> values, Context context) throws IOException, InterruptedException {
      IsinKey lastKey = null;
      OutputVWAP lastOutput = null;
      int currIdx = 0;

      if (hoursList == null) {
         hoursList = new ArrayList<Date>();
         String []dates = context.getConfiguration().getStrings("DATES");
         for (int i = 0; i < dates.length; i++) {
            try {
               hoursList.add(f.parse(dates[i]));
            } catch (Exception e) {
               e.printStackTrace();
            }
         }
      }

      for (Tick value : values) {
         try {
            if (currIdx >= hoursList.size()) {
               return;
            }
            if (f.parse(key.getDate()).after(hoursList.get(currIdx))) {
               context.write(lastKey, lastOutput);
               if (currIdx < hoursList.size()-1) {
                  currIdx++;
               } else {
                  return;
               }
            }
            Float price = Float.parseFloat(value.getPrice());
            Integer volume = Integer.parseInt(value.getVolume());
            vwap = (volume * price + totalVolume * vwap)
               / (totalVolume + volume);
            totalVolume += volume;

            lastKey = new IsinKey(key.getKey(), f.format(hoursList.get(currIdx)));
            lastOutput = new OutputVWAP(vwap);
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   context.write(lastKey, lastOutput);
   }
}

Pour le Reducer, dans un premier temps, on récupère les heures rentrés dans la configuration d’hadoop (lignes 14 à 24). Le but est d’écrire en sortie la vwap pour les heures voulues et ordonnées.

Puis on itère sur les <Values>. A chaque itération on calcule la vwap et on stocke le retour. (lignes 26 à 50)

Si à l’itération d’après l’heure de la clé dépasse l’heure en paramètre, alors on écrit la dernière vwap calculé dans le context. Puis on passe à l’heure d’après en paramètre. Une fois qu’on a traité toutes les heures voulues, on sort de la méthode. (lignes 28 à 38)

Vous pouvez récupérer les sources complètes sur le GitHub Infine

Conclusion

Nous avons donc vu comment calculer les vwap en utilisant le framework Hadoop en mode Stand-Alone.
Mais la où Hadoop tire toute sa puissance, c’est dans la parallélisation et le clustering.
Je vous montrerai donc dans un prochain article comment monter, configurer et utiliser un cluster avec Hadoop.

Partagez !    |
  • JavaGeekFr

    Juste une question :
    Comment tu charges le ou les fichiers en entrée ? Ca vient du file system de l’os et ca vient d’un fichier sur hdfs ?

    • JavaGeekFr

      J’ai aussi oublié fe demander cimbien de temps et de ressources ca prend pour traiter un fichier de 1Go ?

    • Jonathan Blaisius

      Les fichiers en entrée sont chargés via cette instruction : FileInputFormat.setInputPaths(job, new Path(args[0]));
      En StandAlone (lancement via l’IDE), c’est le file system de l’os qui est utilisé.
      Pour utiliser HDFS il faut exporter le jar et le lancer en ligne de commande sur une VM Hadoop (ça sera traité dans un prochain article avec le clustering)

    • Jonathan Blaisius

      Les fichiers en entrée sont chargés via cette instruction : FileInputFormat.setInputPaths(job, new Path(args[0]));
      En StandAlone (lancement via l’IDE), c’est le file system de l’os qui est utilisé.
      Pour utiliser HDFS il faut exporter le jar et le lancer en ligne de commande sur une VM Hadoop (ça sera traité dans un prochain article avec le clustering)

  • Chouaib Seghier

    bnsr ,

    j’ai un ptt prb de la partie de mapper Mr, vous êtes donner un fichier de 1GB pour une seule mapper? (dans ce cas n’ont pas le parallélisation). ou vous avez donné une seule ligne pour chaque mapper (est si le cas précise a nous svp j’ai besoin a ton aide )