Kubernetes – How the Flannel network works

A software-based packet routing mechanism that works without NAT.
https://blog.laputa.io/kubernetes-flannel-networking-6a1cb1f8ec7c

Advertisements
Posted in Kubernetes, Programming | Leave a comment

HTTP – Streaming events delivery with HTTP protocol

HTTP/1.1 provides the following header for streaming data to clients over an open TCP connection.

Transfer-Encoding: chunked

With the above header, Content-Length is omitted. Instead, each chunk begins with its length in hexadecimal, followed by ‘\r\n’, then the chunk data itself, followed by another ‘\r\n’. A final zero-length chunk (‘0\r\n\r\n’) marks the end of the body.

HTTP/1.1 200 OK 
Content-Type: text/plain 
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n 
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n 
\r\n

The Kubernetes API server uses this mechanism for its watch API (watch=true): each change event is streamed to the client as a separate JSON object.
Example:

$ curl -X GET -i http://127.0.0.1:8080/api/v1/pods?watch=true
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Date: Tue, 15 Mar 2016 08:00:09 GMT
Content-Type: text/plain; charset=utf-8
Transfer-Encoding: chunked

{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228082","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","securityContext":{}},"status":{"phase":"Pending"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228084","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"10.240.0.4","securityContext":{}},"status":{"phase":"Pending"}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228088","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"10.240.0.4","securityContext":{}},"status":{"phase":"Pending","conditions":[{"type":"Ready","status":"False","lastProbeTime":null,"lastTransitionTime":"2016-03-15T08:00:51Z","reason":"ContainersNotReady","message":"containers with unready status: [nginx-app]"}],"hostIP":"10.240.0.4","startTime":"2016-03-15T08:00:51Z","containerStatuses":[{"name":"nginx-app","state":{"waiting":{"reason":"ContainerCreating","message":"Image: nginx is ready, container is creating"}},"lastState":{},"ready":false,"restartCount":0,"image":"nginx","imageID":""}]}}}
{"type":"MODIFIED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"nginx-app-o0kvl","generateName":"nginx-app-","namespace":"default","selfLink":"/api/v1/namespaces/default/pods/nginx-app-o0kvl","uid":"090cc0c8-ea84-11e5-8c79-42010af00003","resourceVersion":"228094","creationTimestamp":"2016-03-15T08:00:51Z","labels":{"run":"nginx-app"},"annotations":{"kubernetes.io/created-by":"{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"nginx-app\",\"uid\":\"090bfb57-ea84-11e5-8c79-42010af00003\",\"apiVersion\":\"v1\",\"resourceVersion\":\"228081\"}}\n"}},"spec":{"volumes":[{"name":"default-token-wpfjn","secret":{"secretName":"default-token-wpfjn"}}],"containers":[{"name":"nginx-app","image":"nginx","ports":[{"containerPort":80,"protocol":"TCP"}],"resources":{},"volumeMounts":[{"name":"default-token-wpfjn","readOnly":true,"mountPath":"/var/run/secrets/kubernetes.io/serviceaccount"}],"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Always","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","serviceAccountName":"default","serviceAccount":"default","nodeName":"10.240.0.4","securityContext":{}},"status":{"phase":"Running","conditions":[{"type":"Ready","status":"True","lastProbeTime":null,"lastTransitionTime":"2016-03-15T08:00:52Z"}],"hostIP":"10.240.0.4","podIP":"172.16.49.2","startTime":"2016-03-15T08:00:51Z","containerStatuses":[{"name":"nginx-app","state":{"running":{"startedAt":"2016-03-15T08:00:52Z"}},"lastState":{},"ready":true,"restartCount":0,"image":"nginx","imageID":"docker://sha256:af4b3d7d5401624ed3a747dc20f88e2b5e92e0ee9954aab8f1b5724d7edeca5e","containerID":"docker://b97168314ad58404dbce7cb94291db7a976d2cb824b39e5864bf4bdaf27af255"}]}}}
Posted in Programming, Web | Leave a comment

MapReduce – Reduce side join example

https://www.edureka.co/blog/mapreduce-example-reduce-side-join/

Posted in map_reduce, Programming | Leave a comment

Kubernetes – Examples of Kubernetes

https://blog.octo.com/en/how-does-it-work-kubernetes-episode-1-kubernetes-general-architecture/
https://blog.octo.com/en/how-does-it-work-kubernetes-episode-2-kubernetes-networking/
https://blog.octo.com/en/how-does-it-work-kubernetes-episode-3-infrastructure-as-code-the-tools-of-the-trade/
https://blog.octo.com/en/how-does-it-work-kubernetes-episode-4-how-to-ansible-your-coreos-and-etcd/
https://blog.octo.com/en/how-does-it-work-kubernetes-episode-5-master-and-worker-at-last/

Posted in Kubernetes | Leave a comment

Docker – Differences between RUN, CMD and ENTRYPOINT

1. CMD vs ENTRYPOINT
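The ENTRYPOINT specifies the command that is always executed when the container starts. The CMD specifies default arguments that are passed to the ENTRYPOINT; they are replaced by any arguments given to `docker run`. (If no ENTRYPOINT is set, CMD itself is the default command.)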

2. RUN vs CMD
RUN – executes a command while the Docker image is being built; the result is committed into the image.
CMD – provides the command (or default arguments) that runs when a container is started from the built image.

Posted in docker, Programming | Leave a comment

Java – Loading an entire Cassandra table in parallel

Sometimes you need to load every row of a Cassandra table, but doing so from a single thread is very slow.
Suppose we have the following table in Cassandra:

CREATE TABLE users (
  userid text PRIMARY KEY,
  first_name text,
  last_name text,
  emails set<text>,
  top_scores list<int>,
  todo map<timestamp, text>
);

If we load the entire table with the following CQL statement in a single thread, it can take several hours when the table holds around 10 million rows.

select * from users;

Fortunately, there is a way to load an entire table in parallel. Astyanax has a class called AllRowsReader that shows how to do this.
The basic idea is to use the token() function when selecting data. Cassandra distributes rows across hosts by hashing each row's partition key into a token, so if we split the full token range into sub-ranges, each sub-range can be loaded in parallel.
For example, the CQL statement below loads the users rows whose partition-key token lies between 0 and 10000; the token of each row is computed by the partitioner currently configured in Cassandra.

select * from users where token(userid) >= 0 and token(userid) <= 10000;

The code below summarizes this idea.

public class TokenRange
{
    private final BigInteger beginToken;
    private final BigInteger endToken;

    public TokenRange(BigInteger beginToken, BigInteger endToken)
    {
        this.beginToken = beginToken;
        this.endToken = endToken;
    }

    public BigInteger getBeginToken()
    {
        return beginToken;
    }

    public BigInteger getEndToken()
    {
        return endToken;
    }
}
/**
 * Strategy for splitting the token space of a Cassandra partitioner into ranges,
 * so that each range can be queried independently with token() bounds.
 */
public interface TokenPartitioner
{
    /**
     * Splits the partitioner's token space.
     *
     * @param rangeCount how many ranges the caller would like
     * @return the resulting token ranges
     */
    List<TokenRange> splitTokenRange(int rangeCount);
}
/**
 * Splits the Murmur3Partitioner token space, the signed 64-bit range
 * [Long.MIN_VALUE, Long.MAX_VALUE], into consecutive ranges.
 * Adjacent ranges share their boundary token: range i ends where range i+1 begins.
 */
public class MurmurPartitioner implements TokenPartitioner
{
    private static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    private static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /**
     * @param count number of ranges to produce; must be at least 1
     * @return {@code count} ranges covering [MIN, MAX], in ascending token order
     * @throws IllegalArgumentException if {@code count < 1}
     */
    @Override
    public List<TokenRange> splitTokenRange(int count)
    {
        List<BigInteger> splits = splitRange(MIN, MAX, count);
        List<TokenRange> ranges = new ArrayList<>(count);
        // Pair each split point with its successor.
        for (int i = 1; i < splits.size(); i++)
        {
            ranges.add(new TokenRange(splits.get(i - 1), splits.get(i)));
        }
        return ranges;
    }

    /**
     * Returns {@code count + 1} split points from {@code first} to {@code last}, both
     * included, spaced by (last - first) / count. The last interval also absorbs the
     * remainder of that division, so it may be slightly larger than the others.
     *
     * @param first lowest split point
     * @param last  highest split point
     * @param count number of intervals; must be at least 1
     * @return the split points in ascending order when {@code first <= last}
     * @throws IllegalArgumentException if {@code count < 1}; previously 0 caused an
     *         ArithmeticException and negative values silently produced one interval
     */
    public static List<BigInteger> splitRange(BigInteger first, BigInteger last, int count)
    {
        if (count < 1)
        {
            throw new IllegalArgumentException("count must be at least 1: " + count);
        }
        BigInteger delta = last.subtract(first).divide(BigInteger.valueOf(count));
        List<BigInteger> points = new ArrayList<>(count + 1);
        points.add(first);
        BigInteger current = first;
        for (int i = 1; i < count; i++)
        {
            current = current.add(delta);
            points.add(current);
        }
        points.add(last);
        return points;
    }
}
public class AllRowReader<T>
{
    private static final Logger log = LoggerFactory.getLogger(AllRowCqlReader.class);

    private final Cluster cluster;
    private final Session session;
    private final static int DEFAULT_PAGE_SIZE = 100;
    private final String tableName;
    private final List<String> partitionKeyNames;
    private final String realm;
    private final Class<T> klass;
    private int pageSize;
    private int concurrencyLevel;

    private final Map<String, TokenPartitioner> partitionerMap = ImmutableMap.of(
            "org.apache.cassandra.dht.Murmur3Partitioner", new MurmurPartitioner(),
            "org.apache.cassandra.dht.RandomPartitioner", new RandomPartitioner()
    );

    public static class Builder<T>
    {
        private final Cluster cluster;
        private final Session session;
        private final String tableName;
        private final List<String> partitionKeyNames;
        private final String realm;
        private final Class<T> klass;
        private int pageSize = DEFAULT_PAGE_SIZE;
        private int concurrencyLevel = 4;

        public Builder(Cluster cluster,
                       Session session,
                       String tableName,
                       List<String> partitionKeyNames,
                       String realm,
                       Class<T> klass)
        {
            this.cluster = cluster;
            this.Session = session;
            this.tableName = tableName;
            this.partitionKeyNames = partitionKeyNames;
            this.realm = realm;
            this.klass = klass;
        }

        public Builder<T> withPageSize(int pageSize)
        {
            this.pageSize = pageSize;
            return this;
        }

        public Builder<T> withConcurrencyLevel(int level)
        {
            this.concurrencyLevel = level;
            return this;
        }

        public AllRowReader<T> build()
        {
            return new AllRowCqlReader<>(cluster,
                    session,
                    tableName,
                    partitionKeyNames,
                    realm,
                    klass,
                    pageSize,
                    concurrencyLevel);
        }
    }

    private AllRowCqlReader(Cluster cluster,
                            Session session,
                            String tableName,
                            List<String> partitionKeyNames,
                            String realm,
                            Class<T> klass,
                            int pageSize,
                            int concurrencyLevel)
    {
        this.cluster = cluster;
        this.session = session;
        this.tableName = tableName;
        this.partitionKeyNames = partitionKeyNames;
        this.realm = realm;
        this.klass = klass;
        this.pageSize = pageSize;
        this.concurrencyLevel = concurrencyLevel;
    }


    public void executeWithCallback(RequestContext ctx,
                                    Function<List<T>, Boolean> callback)
    {
        TokenPartitioner partitioner = findTokenPartitioner(ctx, realm);
        List<TokenRange> tokens = partitioner.splitTokenRange(concurrencyLevel);

        List<Callable<Boolean>> tasks = new ArrayList<>(concurrencyLevel);
        for (TokenRange token : tokens)
        {
            tasks.add(createLoadTaskInRange(ctx, token, callback));
        }

        try
        {
            ExecutorService localExecutor = Executors.newFixedThreadPool(concurrencyLevel,
                    new ThreadFactoryBuilder().setDaemon(true)
                            .setNameFormat("AllRowCqlReaderExecutor-%d")
                            .build());

            try
            {
                List<Future<Boolean>> futures = localExecutor.invokeAll(tasks);
                waitForTasksToFinish(futures);
            }
            finally
            {
                localExecutor.shutdownNow();
            }
        }
        catch (Exception e)
        {
            log.error("failed to load a table {}", tableName, e);
        }
    }

    private void waitForTasksToFinish(List<Future<Boolean>> futures) throws Exception
    {
        for (Future<Boolean> future : futures)
        {
            future.get();
        }
    }

    private TokenPartitioner findTokenPartitioner(RequestContext ctx, String realm)
    {
        String partitioner = cluster.getMetadata().getPartitioner();
        TokenPartitioner found = partitionerMap.get(partitioner);
        if (found == null)
        {
            throw new IllegalArgumentException("Not supported partitioner: " + partitioner);
        }
        return found;
    }

    private Callable<Boolean> createLoadTaskInRange(RequestContext ctx,
                                                    TokenRange token,
                                                    Function<List<T>, Boolean> callback)
    {
        return () ->
        {
            try
            {
                StringBuilder sb = new StringBuilder();
                sb.append("token(");
                sb.append(String.join(",", partitionKeyNames));
                sb.append(")");
                String tokenOfKey = sb.toString();

                BigInteger beginTokenB = token.getBeginToken();
                BigInteger endTokenB = token.getEndToken();

                Statement statement = QueryBuilder.select()
                        .all()
                        .from(tableName)
                        .where(QueryBuilder.gte(tokenOfKey, beginTokenB.longValue()))
                        .and(QueryBuilder.lte(tokenOfKey, endTokenB.longValue()))
                        .setFetchSize(pageSize);

                MappingManager mappingManager = new MappingManager(session);
                ResultSet rs = session.execute(statement);
                Result<T> mappedResultSet = mappingManager.mapper(klass).map(rs);

                List<T> rows = new ArrayList<>(pageSize);
                while (mappedResultSet.iterator().hasNext())
                {
                    rows.add(mappedResultSet.iterator().next());
                    if (rows.size() >= pageSize)
                    {
                        callback.apply(rows);
                        rows = new ArrayList<>(pageSize);
                    }
                }

                if (rows.size() > 0)
                {
                    callback.apply(rows);
                }

                return true;
            }
            catch (Exception e)
            {
                log.error("failed to load rows in range: {} - {}", token.getBeginToken().toString(), token.getEndToken().toString(), e);
            }

            return false;
        };
    }
}
Posted in Java, Programming | Leave a comment

C++ 17 – Generative Programming in C++ 17

An interesting example of generative programming in C++17. I think it shows why compile-time reflection and a declarative programming approach are needed for clean, readable C++ code.

Posted in C++, Programming | Leave a comment